Module livekit.agents.multimodal
Sub-modules
livekit.agents.multimodal.agent_playout
livekit.agents.multimodal.multimodal_agent
Classes
class AgentTranscriptionOptions (user_transcription: bool = True,
agent_transcription: bool = True,
agent_transcription_speed: float = 1.0,
sentence_tokenizer: tokenize.SentenceTokenizer = <livekit.agents.tokenize.basic.SentenceTokenizer object>,
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>)
-
@dataclass(frozen=True)
class AgentTranscriptionOptions:
    user_transcription: bool = True
    """Whether to forward the user transcription to the client"""
    agent_transcription: bool = True
    """Whether to forward the agent transcription to the client"""
    agent_transcription_speed: float = 1.0
    """The speed at which the agent's speech transcription is forwarded to the client.
    We try to mimic the agent's speech speed by adjusting the transcription speed."""
    sentence_tokenizer: tokenize.SentenceTokenizer = tokenize.basic.SentenceTokenizer()
    """The tokenizer used to split the speech into sentences.
    This is used to decide when to mark a transcript as final for the agent transcription."""
    word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
        ignore_punctuation=False
    )
    """The tokenizer used to split the speech into words.
    This is used to simulate the "interim results" of the agent transcription."""
    hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word
    """A function that takes a string (word) as input and returns a list of strings,
    representing the hyphenated parts of the word."""
Class variables
var agent_transcription : bool
-
Whether to forward the agent transcription to the client
var agent_transcription_speed : float
-
The speed at which the agent's speech transcription is forwarded to the client. We try to mimic the agent's speech speed by adjusting the transcription speed.
var sentence_tokenizer : SentenceTokenizer
-
The tokenizer used to split the speech into sentences. This is used to decide when to mark a transcript as final for the agent transcription.
var user_transcription : bool
-
Whether to forward the user transcription to the client
var word_tokenizer : WordTokenizer
-
The tokenizer used to split the speech into words. This is used to simulate the "interim results" of the agent transcription.
Methods
def hyphenate_word(word: str) ‑> list[str]
-
def hyphenate_word(word: str) -> list[str]:
    return _basic_hyphenator.hyphenate_word(word)
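These options are normally passed to MultimodalAgent through its transcription argument. The snippet below is a minimal usage sketch, not a recommended configuration; the specific values are illustrative only.

from livekit.agents import multimodal, tokenize

# Illustrative overrides: slow down the forwarded agent transcript slightly and
# strip punctuation from the word tokenizer used for the interim results.
opts = multimodal.AgentTranscriptionOptions(
    user_transcription=True,
    agent_transcription_speed=0.9,
    word_tokenizer=tokenize.basic.WordTokenizer(ignore_punctuation=True),
)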
class MultimodalAgent (*,
model: _RealtimeAPI,
vad: vad.VAD | None = None,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(user_transcription=True, agent_transcription=True, agent_transcription_speed=1.0, sentence_tokenizer=<livekit.agents.tokenize.basic.SentenceTokenizer object>, word_tokenizer=<livekit.agents.tokenize.basic.WordTokenizer object>, hyphenate_word=<function hyphenate_word>),
max_text_response_retries: int = 5,
loop: asyncio.AbstractEventLoop | None = None)
-
class MultimodalAgent(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        *,
        model: _RealtimeAPI,
        vad: vad.VAD | None = None,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
        transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(),
        max_text_response_retries: int = 5,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """Create a new MultimodalAgent.

        Args:
            model: RealtimeAPI instance.
            vad: Voice Activity Detection (VAD) instance.
            chat_ctx: Chat context for the assistant.
            fnc_ctx: Function context for the assistant.
            transcription: Options for assistant transcription.
            max_text_response_retries: Maximum number of retries to recover
                from text responses to audio mode. OpenAI's realtime API has a
                chance to return text responses instead of audio if the chat
                context includes text system or assistant messages. The agent
                will attempt to recover to audio mode by deleting the text
                response and appending an empty audio message to the conversation.
            loop: Event loop to use. Default to asyncio.get_event_loop().
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()

        self._model = model
        self._vad = vad
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        self._opts = _ImplOptions(
            transcription=transcription,
        )

        # audio input
        self._read_micro_atask: asyncio.Task | None = None
        self._subscribed_track: rtc.RemoteAudioTrack | None = None
        self._input_audio_ch = utils.aio.Chan[rtc.AudioFrame]()

        # audio output
        self._playing_handle: agent_playout.PlayoutHandle | None = None

        self._linked_participant: rtc.RemoteParticipant | None = None
        self._started, self._closed = False, False

        self._update_state_task: asyncio.Task | None = None
        self._http_session: aiohttp.ClientSession | None = None

        self._text_response_retries = 0
        self._max_text_response_retries = max_text_response_retries

    @property
    def vad(self) -> vad.VAD | None:
        return self._vad

    @property
    def fnc_ctx(self) -> llm.FunctionContext | None:
        return self._session.fnc_ctx

    @fnc_ctx.setter
    def fnc_ctx(self, value: llm.FunctionContext | None) -> None:
        self._session.fnc_ctx = value

    def chat_ctx_copy(self) -> llm.ChatContext:
        return self._session.chat_ctx_copy()

    async def set_chat_ctx(self, ctx: llm.ChatContext) -> None:
        await self._session.set_chat_ctx(ctx)

    def start(
        self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
    ) -> None:
        if self._started:
            raise RuntimeError("voice assistant already started")

        room.on("participant_connected", self._on_participant_connected)
        room.on("track_published", self._subscribe_to_microphone)
        room.on("track_subscribed", self._subscribe_to_microphone)

        self._room, self._participant = room, participant

        if participant is not None:
            if isinstance(participant, rtc.RemoteParticipant):
                self._link_participant(participant.identity)
            else:
                self._link_participant(participant)
        else:
            # no participant provided, try to find the first participant in the room
            for participant in self._room.remote_participants.values():
                self._link_participant(participant.identity)
                break

        self._session = self._model.session(
            chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
        )

        # Create a task to wait for initialization and start the main task
        async def _init_and_start():
            try:
                await self._session._init_sync_task
                logger.info("Session initialized with chat context")
                self._main_atask = asyncio.create_task(self._main_task())
            except Exception as e:
                logger.exception("Failed to initialize session")
                raise e

        # Schedule the initialization and start task
        asyncio.create_task(_init_and_start())

        @self._session.on("response_content_added")
        def _on_content_added(message: _ContentProto):
            tr_fwd = transcription.TTSSegmentsForwarder(
                room=self._room,
                participant=self._room.local_participant,
                speed=self._opts.transcription.agent_transcription_speed,
                sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
                word_tokenizer=self._opts.transcription.word_tokenizer,
                hyphenate_word=self._opts.transcription.hyphenate_word,
            )

            self._playing_handle = self._agent_playout.play(
                item_id=message.item_id,
                content_index=message.content_index,
                transcription_fwd=tr_fwd,
                text_stream=message.text_stream,
                audio_stream=message.audio_stream,
            )

        @self._session.on("response_content_done")
        def _response_content_done(message: _ContentProto):
            if message.content_type == "text":
                if self._text_response_retries >= self._max_text_response_retries:
                    raise RuntimeError(
                        f"The OpenAI Realtime API returned a text response "
                        f"after {self._max_text_response_retries} retries. "
                        f"Please try to reduce the number of text system or "
                        f"assistant messages in the chat context."
                    )

                self._text_response_retries += 1
                logger.warning(
                    "The OpenAI Realtime API returned a text response instead of audio. "
                    "Attempting to recover to audio mode...",
                    extra={
                        "item_id": message.item_id,
                        "text": message.text,
                        "retries": self._text_response_retries,
                    },
                )
                self._session._recover_from_text_response(message.item_id)
            else:
                self._text_response_retries = 0

        @self._session.on("input_speech_committed")
        def _input_speech_committed():
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text="")],
                )
            )

        @self._session.on("input_speech_transcription_completed")
        def _input_speech_transcription_completed(ev: _InputTranscriptionProto):
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text=ev.transcript)],
                )
            )
            if self._model.capabilities.supports_truncate:
                user_msg = ChatMessage.create(
                    text=ev.transcript, role="user", id=ev.item_id
                )
                self._session._update_conversation_item_content(
                    ev.item_id, user_msg.content
                )
            self._emit_speech_committed("user", ev.transcript)

        @self._session.on("agent_speech_transcription_completed")
        def _agent_speech_transcription_completed(ev: _InputTranscriptionProto):
            self._agent_stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text=ev.transcript)],
                )
            )
            self._emit_speech_committed("agent", ev.transcript)

        # Similar to _input_speech_started, this handles updating the state to
        # "listening" when the agent's speech is complete.
        # However, since Gemini doesn't support VAD events, we are not emitting
        # the `user_started_speaking` event here.
        @self._session.on("agent_speech_stopped")
        def _agent_speech_stopped():
            self.interrupt()

        @self._session.on("input_speech_started")
        def _input_speech_started():
            self.emit("user_started_speaking")
            self.interrupt()

        @self._session.on("input_speech_stopped")
        def _input_speech_stopped():
            self.emit("user_stopped_speaking")

        @self._session.on("function_calls_collected")
        def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
            self.emit("function_calls_collected", fnc_call_infos)

        @self._session.on("function_calls_finished")
        def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
            self.emit("function_calls_finished", called_fncs)

        @self._session.on("metrics_collected")
        def _metrics_collected(metrics: MultimodalLLMMetrics):
            self.emit("metrics_collected", metrics)

    def interrupt(self) -> None:
        if self._playing_handle is not None and not self._playing_handle.done():
            self._playing_handle.interrupt()

            if self._model.capabilities.supports_truncate:
                self._session.cancel_response()  # Only supported by OpenAI
                self._session._truncate_conversation_item(
                    item_id=self._playing_handle.item_id,
                    content_index=self._playing_handle.content_index,
                    audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
                )
        self._update_state("listening")

    def generate_reply(
        self,
        on_duplicate: Literal[
            "cancel_existing", "cancel_new", "keep_both"
        ] = "cancel_existing",
    ) -> None:
        """Generate a reply from the agent"""
        if not self._session.server_vad_enabled:
            self._session.commit_audio_buffer()
        self._session.create_response(on_duplicate=on_duplicate)

    def _update_state(self, state: AgentState, delay: float = 0.0):
        """Set the current state of the agent"""

        @utils.log_exceptions(logger=logger)
        async def _run_task(delay: float) -> None:
            await asyncio.sleep(delay)

            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: state}
                )

        if self._update_state_task is not None:
            self._update_state_task.cancel()

        self._update_state_task = asyncio.create_task(_run_task(delay))

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        self._update_state("initializing")
        self._audio_source = rtc.AudioSource(24000, 1)
        track = rtc.LocalAudioTrack.create_audio_track(
            "assistant_voice", self._audio_source
        )
        self._agent_publication = await self._room.local_participant.publish_track(
            track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
        )
        self._agent_stt_forwarder = transcription.STTSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            track=track,
        )
        self._agent_playout = agent_playout.AgentPlayout(
            audio_source=self._audio_source
        )

        def _on_playout_started() -> None:
            self.emit("agent_started_speaking")
            self._update_state("speaking")

        def _on_playout_stopped(interrupted: bool) -> None:
            self.emit("agent_stopped_speaking")
            self._update_state("listening")

            if self._playing_handle is not None:
                collected_text = self._playing_handle._tr_fwd.played_text
                if interrupted:
                    collected_text += "..."

                if self._model.capabilities.supports_truncate and collected_text:
                    msg = ChatMessage.create(
                        text=collected_text,
                        role="assistant",
                        id=self._playing_handle.item_id,
                    )
                    self._session._update_conversation_item_content(
                        self._playing_handle.item_id, msg.content
                    )

                self._emit_speech_committed("agent", collected_text, interrupted)

        self._agent_playout.on("playout_started", _on_playout_started)
        self._agent_playout.on("playout_stopped", _on_playout_stopped)

        await self._agent_publication.wait_for_subscription()

        bstream = utils.audio.AudioByteStream(
            24000,
            1,
            samples_per_channel=2400,
        )
        async for frame in self._input_audio_ch:
            for f in bstream.write(frame.data.tobytes()):
                self._session._push_audio(f)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant):
        if self._linked_participant is None:
            return

        self._link_participant(participant.identity)

    def _link_participant(self, participant_identity: str) -> None:
        self._linked_participant = self._room.remote_participants.get(
            participant_identity
        )
        if self._linked_participant is None:
            logger.error("_link_participant must be called with a valid identity")
            return

        self._subscribe_to_microphone()

    async def _micro_task(self, track: rtc.LocalAudioTrack) -> None:
        sample_rate = self._model.capabilities.input_audio_sample_rate
        if sample_rate is None:
            sample_rate = 24000
        input_stream = rtc.AudioStream(track, sample_rate=sample_rate, num_channels=1)
        async for ev in input_stream:
            self._input_audio_ch.send_nowait(ev.frame)

    def _subscribe_to_microphone(self, *args, **kwargs) -> None:
        """Subscribe to the participant microphone if found"""
        if self._linked_participant is None:
            return

        for publication in self._linked_participant.track_publications.values():
            if publication.source != rtc.TrackSource.SOURCE_MICROPHONE:
                continue

            if not publication.subscribed:
                publication.set_subscribed(True)

            if (
                publication.track is not None
                and publication.track != self._subscribed_track
            ):
                self._subscribed_track = publication.track  # type: ignore
                self._stt_forwarder = transcription.STTSegmentsForwarder(
                    room=self._room,
                    participant=self._linked_participant,
                    track=self._subscribed_track,
                )

                if self._read_micro_atask is not None:
                    self._read_micro_atask.cancel()

                self._read_micro_atask = asyncio.create_task(
                    self._micro_task(self._subscribed_track)  # type: ignore
                )
                break

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()

        return self._http_session

    def _emit_speech_committed(
        self, speaker: Literal["user", "agent"], msg: str, interrupted: bool = False
    ):
        if speaker == "user":
            self.emit("user_speech_committed", msg)
        else:
            if interrupted:
                self.emit("agent_speech_interrupted", msg)
            else:
                self.emit("agent_speech_committed", msg)

        logger.debug(
            f"committed {speaker} speech",
            extra={
                f"{speaker}_transcript": msg,
                "interrupted": interrupted,
            },
        )
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
        # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
Create a new MultimodalAgent.
Args
model
- RealtimeAPI instance.
vad
- Voice Activity Detection (VAD) instance.
chat_ctx
- Chat context for the assistant.
fnc_ctx
- Function context for the assistant.
transcription
- Options for assistant transcription.
max_text_response_retries
- Maximum number of retries when recovering from text responses back to audio mode. OpenAI's realtime API may return text responses instead of audio if the chat context includes text system or assistant messages. The agent attempts to recover to audio mode by deleting the text response and appending an empty audio message to the conversation.
loop
- Event loop to use. Defaults to asyncio.get_event_loop().
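As a usage sketch, the agent is typically constructed inside a job entrypoint with a realtime model from a plugin and then started in the room. The plugin import path and model arguments below (livekit.plugins.openai, RealtimeModel, voice="alloy") are assumptions based on the OpenAI realtime plugin and are not part of this module.

from livekit.agents import JobContext, llm, multimodal
from livekit.plugins import openai  # assumed plugin providing a realtime model

async def entrypoint(ctx: JobContext):
    await ctx.connect()
    participant = await ctx.wait_for_participant()

    # The realtime model fulfills the _RealtimeAPI protocol expected by `model`;
    # chat_ctx seeds the conversation.
    agent = multimodal.MultimodalAgent(
        model=openai.realtime.RealtimeModel(voice="alloy"),  # assumed constructor args
        chat_ctx=llm.ChatContext().append(
            role="system", text="You are a helpful voice assistant."
        ),
    )
    agent.start(ctx.room, participant)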
Ancestors
- EventEmitter
- typing.Generic
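Because MultimodalAgent is an EventEmitter, callers can subscribe to the events it emits (event names are taken from the source above; the handler signatures are a sketch, and `agent` refers to an already-constructed instance such as the one in the entrypoint example):

@agent.on("user_speech_committed")
def _on_user_speech(msg: str):
    print("user said:", msg)

@agent.on("agent_speech_interrupted")
def _on_agent_interrupted(msg: str):
    print("agent was interrupted after saying:", msg)

@agent.on("metrics_collected")
def _on_metrics(metrics):
    print("realtime metrics:", metrics)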
Instance variables
prop fnc_ctx : llm.FunctionContext | None
-
@property
def fnc_ctx(self) -> llm.FunctionContext | None:
    return self._session.fnc_ctx
prop vad : vad.VAD | None
-
@property
def vad(self) -> vad.VAD | None:
    return self._vad
Methods
def chat_ctx_copy(self) ‑> ChatContext
-
def chat_ctx_copy(self) -> llm.ChatContext:
    return self._session.chat_ctx_copy()
def generate_reply(self,
on_duplicate: "Literal['cancel_existing', 'cancel_new', 'keep_both']" = 'cancel_existing') ‑> None
-
def generate_reply(
    self,
    on_duplicate: Literal[
        "cancel_existing", "cancel_new", "keep_both"
    ] = "cancel_existing",
) -> None:
    """Generate a reply from the agent"""
    if not self._session.server_vad_enabled:
        self._session.commit_audio_buffer()
    self._session.create_response(on_duplicate=on_duplicate)
Generate a reply from the agent
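A common pattern, shown here as a sketch inside async code and reusing the agent instance from the constructor example above, is to inject a turn into the chat context and then trigger a reply (ChatContext.append with role/text keywords is assumed from the llm module):

chat_ctx = agent.chat_ctx_copy()
chat_ctx.append(role="user", text="Please summarize the conversation so far.")
await agent.set_chat_ctx(chat_ctx)

# If a response is already in flight, replace it with the new one.
agent.generate_reply(on_duplicate="cancel_existing")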
def interrupt(self) ‑> None
-
def interrupt(self) -> None:
    if self._playing_handle is not None and not self._playing_handle.done():
        self._playing_handle.interrupt()

        if self._model.capabilities.supports_truncate:
            self._session.cancel_response()  # Only supported by OpenAI
            self._session._truncate_conversation_item(
                item_id=self._playing_handle.item_id,
                content_index=self._playing_handle.content_index,
                audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
            )
    self._update_state("listening")
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
-
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None:
    await self._session.set_chat_ctx(ctx)
def start(self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None) ‑> None
-
def start(
    self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
) -> None:
    if self._started:
        raise RuntimeError("voice assistant already started")

    room.on("participant_connected", self._on_participant_connected)
    room.on("track_published", self._subscribe_to_microphone)
    room.on("track_subscribed", self._subscribe_to_microphone)

    self._room, self._participant = room, participant

    if participant is not None:
        if isinstance(participant, rtc.RemoteParticipant):
            self._link_participant(participant.identity)
        else:
            self._link_participant(participant)
    else:
        # no participant provided, try to find the first participant in the room
        for participant in self._room.remote_participants.values():
            self._link_participant(participant.identity)
            break

    self._session = self._model.session(
        chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
    )

    # Create a task to wait for initialization and start the main task
    async def _init_and_start():
        try:
            await self._session._init_sync_task
            logger.info("Session initialized with chat context")
            self._main_atask = asyncio.create_task(self._main_task())
        except Exception as e:
            logger.exception("Failed to initialize session")
            raise e

    # Schedule the initialization and start task
    asyncio.create_task(_init_and_start())

    @self._session.on("response_content_added")
    def _on_content_added(message: _ContentProto):
        tr_fwd = transcription.TTSSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            speed=self._opts.transcription.agent_transcription_speed,
            sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
            word_tokenizer=self._opts.transcription.word_tokenizer,
            hyphenate_word=self._opts.transcription.hyphenate_word,
        )

        self._playing_handle = self._agent_playout.play(
            item_id=message.item_id,
            content_index=message.content_index,
            transcription_fwd=tr_fwd,
            text_stream=message.text_stream,
            audio_stream=message.audio_stream,
        )

    @self._session.on("response_content_done")
    def _response_content_done(message: _ContentProto):
        if message.content_type == "text":
            if self._text_response_retries >= self._max_text_response_retries:
                raise RuntimeError(
                    f"The OpenAI Realtime API returned a text response "
                    f"after {self._max_text_response_retries} retries. "
                    f"Please try to reduce the number of text system or "
                    f"assistant messages in the chat context."
                )

            self._text_response_retries += 1
            logger.warning(
                "The OpenAI Realtime API returned a text response instead of audio. "
                "Attempting to recover to audio mode...",
                extra={
                    "item_id": message.item_id,
                    "text": message.text,
                    "retries": self._text_response_retries,
                },
            )
            self._session._recover_from_text_response(message.item_id)
        else:
            self._text_response_retries = 0

    @self._session.on("input_speech_committed")
    def _input_speech_committed():
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text="")],
            )
        )

    @self._session.on("input_speech_transcription_completed")
    def _input_speech_transcription_completed(ev: _InputTranscriptionProto):
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text=ev.transcript)],
            )
        )
        if self._model.capabilities.supports_truncate:
            user_msg = ChatMessage.create(
                text=ev.transcript, role="user", id=ev.item_id
            )
            self._session._update_conversation_item_content(
                ev.item_id, user_msg.content
            )
        self._emit_speech_committed("user", ev.transcript)

    @self._session.on("agent_speech_transcription_completed")
    def _agent_speech_transcription_completed(ev: _InputTranscriptionProto):
        self._agent_stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text=ev.transcript)],
            )
        )
        self._emit_speech_committed("agent", ev.transcript)

    # Similar to _input_speech_started, this handles updating the state to
    # "listening" when the agent's speech is complete.
    # However, since Gemini doesn't support VAD events, we are not emitting
    # the `user_started_speaking` event here.
    @self._session.on("agent_speech_stopped")
    def _agent_speech_stopped():
        self.interrupt()

    @self._session.on("input_speech_started")
    def _input_speech_started():
        self.emit("user_started_speaking")
        self.interrupt()

    @self._session.on("input_speech_stopped")
    def _input_speech_stopped():
        self.emit("user_stopped_speaking")

    @self._session.on("function_calls_collected")
    def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
        self.emit("function_calls_collected", fnc_call_infos)

    @self._session.on("function_calls_finished")
    def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
        self.emit("function_calls_finished", called_fncs)

    @self._session.on("metrics_collected")
    def _metrics_collected(metrics: MultimodalLLMMetrics):
        self.emit("metrics_collected", metrics)
Inherited members
class _RealtimeAPI (*args, **kwargs)
-
class _RealtimeAPI(Protocol):
    """Realtime API protocol"""

    @property
    def capabilities(self) -> _CapabilitiesProto: ...

    def session(
        self,
        *,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
    ) -> _RealtimeAPISession:
        """
        Create a new realtime session with the given chat and function contexts.
        """
        pass
Realtime API protocol
Ancestors
- typing.Protocol
- typing.Generic
Instance variables
prop capabilities : _CapabilitiesProto
-
@property
def capabilities(self) -> _CapabilitiesProto: ...
Methods
def session(self,
*,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None) ‑> livekit.agents.multimodal.multimodal_agent._RealtimeAPISession
-
def session(
    self,
    *,
    chat_ctx: llm.ChatContext | None = None,
    fnc_ctx: llm.FunctionContext | None = None,
) -> _RealtimeAPISession:
    """
    Create a new realtime session with the given chat and function contexts.
    """
    pass
Create a new realtime session with the given chat and function contexts.
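Since _RealtimeAPI is a Protocol, conformance is structural: any object exposing a compatible capabilities property and session() method can be passed as model to MultimodalAgent. The class and member bodies below are a purely illustrative sketch, not part of the library:

class MyRealtimeModel:
    """Hypothetical object that structurally satisfies _RealtimeAPI."""

    @property
    def capabilities(self):
        # would return an object with the fields MultimodalAgent reads,
        # e.g. supports_truncate and input_audio_sample_rate
        ...

    def session(self, *, chat_ctx=None, fnc_ctx=None):
        # would return an object satisfying _RealtimeAPISession
        ...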
class _RealtimeAPISession (*args, **kwargs)
-
class _RealtimeAPISession(Protocol):
    async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: ...

    @overload
    def on(self, event: str, callback: None = None) -> Callable[[T], T]: ...

    @overload
    def on(self, event: str, callback: T) -> T: ...

    def on(
        self, event: str, callback: Optional[T] = None
    ) -> Union[T, Callable[[T], T]]: ...

    def _push_audio(self, frame: rtc.AudioFrame) -> None: ...

    @property
    def fnc_ctx(self) -> llm.FunctionContext | None: ...

    @fnc_ctx.setter
    def fnc_ctx(self, value: llm.FunctionContext | None) -> None: ...

    def chat_ctx_copy(self) -> llm.ChatContext: ...

    def cancel_response(self) -> None: ...

    def create_response(
        self,
        on_duplicate: Literal[
            "cancel_existing", "cancel_new", "keep_both"
        ] = "keep_both",
    ) -> None: ...

    def commit_audio_buffer(self) -> None: ...

    @property
    def server_vad_enabled(self) -> bool: ...

    def _recover_from_text_response(self, item_id: str) -> None: ...

    def _update_conversation_item_content(
        self,
        item_id: str,
        content: llm.ChatContent | list[llm.ChatContent] | None = None,
    ) -> None: ...

    def _truncate_conversation_item(
        self, item_id: str, content_index: int, audio_end_ms: int
    ) -> None: ...
Base class for protocol classes.
Protocol classes are defined as::
class Proto(Protocol):
    def meth(self) -> int: ...
Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).
For example::
class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check
See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::
class GenProto[T](Protocol):
    def meth(self) -> T: ...
Ancestors
- typing.Protocol
- typing.Generic
Instance variables
prop fnc_ctx : llm.FunctionContext | None
-
@property
def fnc_ctx(self) -> llm.FunctionContext | None: ...
prop server_vad_enabled : bool
-
@property
def server_vad_enabled(self) -> bool: ...
Methods
def cancel_response(self) ‑> None
-
def cancel_response(self) -> None: ...
def chat_ctx_copy(self) ‑> ChatContext
-
def chat_ctx_copy(self) -> llm.ChatContext: ...
def commit_audio_buffer(self) ‑> None
-
def commit_audio_buffer(self) -> None: ...
def create_response(self,
on_duplicate: "Literal['cancel_existing', 'cancel_new', 'keep_both']" = 'keep_both') ‑> None
-
def create_response(
    self,
    on_duplicate: Literal[
        "cancel_existing", "cancel_new", "keep_both"
    ] = "keep_both",
) -> None: ...
def on(self, event: str, callback: Optional[T] = None) ‑> ~T | Callable[[~T], ~T]
-
def on(
    self, event: str, callback: Optional[T] = None
) -> Union[T, Callable[[T], T]]: ...
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
-
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: ...