Module livekit.agents.multimodal.multimodal_agent
Classes
class AgentTranscriptionOptions (user_transcription: bool = True,
agent_transcription: bool = True,
agent_transcription_speed: float = 1.0,
sentence_tokenizer: tokenize.SentenceTokenizer = <livekit.agents.tokenize.basic.SentenceTokenizer object>,
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>)-
Expand source code
@dataclass(frozen=True) class AgentTranscriptionOptions: user_transcription: bool = True """Whether to forward the user transcription to the client""" agent_transcription: bool = True """Whether to forward the agent transcription to the client""" agent_transcription_speed: float = 1.0 """The speed at which the agent's speech transcription is forwarded to the client. We try to mimic the agent's speech speed by adjusting the transcription speed.""" sentence_tokenizer: tokenize.SentenceTokenizer = tokenize.basic.SentenceTokenizer() """The tokenizer used to split the speech into sentences. This is used to decide when to mark a transcript as final for the agent transcription.""" word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer( ignore_punctuation=False ) """The tokenizer used to split the speech into words. This is used to simulate the "interim results" of the agent transcription.""" hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word """A function that takes a string (word) as input and returns a list of strings, representing the hyphenated parts of the word."""
AgentTranscriptionOptions(user_transcription: 'bool' = True, agent_transcription: 'bool' = True, agent_transcription_speed: 'float' = 1.0, sentence_tokenizer: 'tokenize.SentenceTokenizer' =
, word_tokenizer: 'tokenize.WordTokenizer' = , hyphenate_word: 'Callable[[str], list[str]]' = ) Instance variables
var agent_transcription : bool
-
Whether to forward the agent transcription to the client
var agent_transcription_speed : float
-
The speed at which the agent's speech transcription is forwarded to the client. We try to mimic the agent's speech speed by adjusting the transcription speed.
var sentence_tokenizer : SentenceTokenizer
-
The tokenizer used to split the speech into sentences. This is used to decide when to mark a transcript as final for the agent transcription.
var user_transcription : bool
-
Whether to forward the user transcription to the client
var word_tokenizer : WordTokenizer
-
The tokenizer used to split the speech into words. This is used to simulate the "interim results" of the agent transcription.
Methods
def hyphenate_word(word: str) ‑> list[str]
-
Expand source code
def hyphenate_word(word: str) -> list[str]: return _basic_hyphenator.hyphenate_word(word)
class MultimodalAgent (*,
model: _RealtimeAPI,
vad: vad.VAD | None = None,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(user_transcription=True, agent_transcription=True, agent_transcription_speed=1.0, sentence_tokenizer=<livekit.agents.tokenize.basic.SentenceTokenizer object>, word_tokenizer=<livekit.agents.tokenize.basic.WordTokenizer object>, hyphenate_word=<function hyphenate_word>),
max_text_response_retries: int = 5,
loop: asyncio.AbstractEventLoop | None = None,
noise_cancellation: rtc.NoiseCancellationOptions | None = None)-
Expand source code
class MultimodalAgent(utils.EventEmitter[EventTypes]): def __init__( self, *, model: _RealtimeAPI, vad: vad.VAD | None = None, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(), max_text_response_retries: int = 5, loop: asyncio.AbstractEventLoop | None = None, noise_cancellation: rtc.NoiseCancellationOptions | None = None, ): """Create a new MultimodalAgent. Args: model: RealtimeAPI instance. vad: Voice Activity Detection (VAD) instance. chat_ctx: Chat context for the assistant. fnc_ctx: Function context for the assistant. transcription: Options for assistant transcription. max_text_response_retries: Maximum number of retries to recover from text responses to audio mode. OpenAI's realtime API has a chance to return text responses instead of audio if the chat context includes text system or assistant messages. The agent will attempt to recover to audio mode by deleting the text response and appending an empty audio message to the conversation. loop: Event loop to use. Default to asyncio.get_event_loop(). """ super().__init__() self._loop = loop or asyncio.get_event_loop() self._model = model self._vad = vad self._chat_ctx = chat_ctx self._fnc_ctx = fnc_ctx self._opts = _ImplOptions( transcription=transcription, ) # audio input self._read_micro_atask: asyncio.Task | None = None self._subscribed_track: rtc.RemoteAudioTrack | None = None self._input_audio_ch = utils.aio.Chan[rtc.AudioFrame]() # audio output self._playing_handle: agent_playout.PlayoutHandle | None = None self._linked_participant: rtc.RemoteParticipant | None = None self._started, self._closed = False, False self._update_state_task: asyncio.Task | None = None self._http_session: aiohttp.ClientSession | None = None self._text_response_retries = 0 self._max_text_response_retries = max_text_response_retries self._noise_cancellation = noise_cancellation @property def vad(self) -> vad.VAD | None: return self._vad @property def fnc_ctx(self) -> llm.FunctionContext | None: return self._session.fnc_ctx @fnc_ctx.setter def fnc_ctx(self, value: llm.FunctionContext | None) -> None: self._session.fnc_ctx = value def chat_ctx_copy(self) -> llm.ChatContext: return self._session.chat_ctx_copy() async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: await self._session.set_chat_ctx(ctx) def start( self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None ) -> None: if self._started: raise RuntimeError("voice assistant already started") room.on("participant_connected", self._on_participant_connected) room.on("track_published", self._subscribe_to_microphone) room.on("track_subscribed", self._subscribe_to_microphone) self._room, self._participant = room, participant if participant is not None: if isinstance(participant, rtc.RemoteParticipant): self._link_participant(participant.identity) else: self._link_participant(participant) else: # no participant provided, try to find the first participant in the room for participant in self._room.remote_participants.values(): self._link_participant(participant.identity) break self._session = self._model.session( chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx ) # Create a task to wait for initialization and start the main task async def _init_and_start(): try: await self._session._init_sync_task logger.info("Session initialized with chat context") self._main_atask = asyncio.create_task(self._main_task()) except Exception as e: logger.exception("Failed to initialize session") raise e # Schedule the initialization and start task asyncio.create_task(_init_and_start()) @self._session.on("response_content_added") def _on_content_added(message: _ContentProto): tr_fwd = transcription.TTSSegmentsForwarder( room=self._room, participant=self._room.local_participant, speed=self._opts.transcription.agent_transcription_speed, sentence_tokenizer=self._opts.transcription.sentence_tokenizer, word_tokenizer=self._opts.transcription.word_tokenizer, hyphenate_word=self._opts.transcription.hyphenate_word, ) self._playing_handle = self._agent_playout.play( item_id=message.item_id, content_index=message.content_index, transcription_fwd=tr_fwd, text_stream=message.text_stream, audio_stream=message.audio_stream, ) @self._session.on("response_content_done") def _response_content_done(message: _ContentProto): if message.content_type == "text": if self._text_response_retries >= self._max_text_response_retries: raise RuntimeError( f"The OpenAI Realtime API returned a text response " f"after {self._max_text_response_retries} retries. " f"Please try to reduce the number of text system or " f"assistant messages in the chat context." ) self._text_response_retries += 1 logger.warning( "The OpenAI Realtime API returned a text response instead of audio. " "Attempting to recover to audio mode...", extra={ "item_id": message.item_id, "text": message.text, "retries": self._text_response_retries, }, ) self._session._recover_from_text_response(message.item_id) else: self._text_response_retries = 0 @self._session.on("input_speech_committed") def _input_speech_committed(): self._stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text="")], ) ) @self._session.on("input_speech_transcription_completed") def _input_speech_transcription_completed(ev: _InputTranscriptionProto): self._stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text=ev.transcript)], ) ) if self._model.capabilities.supports_truncate: user_msg = ChatMessage.create( text=ev.transcript, role="user", id=ev.item_id ) self._session._update_conversation_item_content( ev.item_id, user_msg.content ) self._emit_speech_committed("user", ev.transcript) @self._session.on("agent_speech_transcription_completed") def _agent_speech_transcription_completed(ev: _InputTranscriptionProto): self._agent_stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text=ev.transcript)], ) ) self._emit_speech_committed("agent", ev.transcript) # Similar to _input_speech_started, this handles updating the state to "listening" when the agent's speech is complete. # However, since Gemini doesn't support VAD events, we are not emitting the `user_started_speaking` event here. @self._session.on("agent_speech_stopped") def _agent_speech_stopped(): self.interrupt() @self._session.on("input_speech_started") def _input_speech_started(): self.emit("user_started_speaking") self.interrupt() @self._session.on("input_speech_stopped") def _input_speech_stopped(): self.emit("user_stopped_speaking") @self._session.on("function_calls_collected") def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]): self.emit("function_calls_collected", fnc_call_infos) @self._session.on("function_calls_finished") def _function_calls_finished(called_fncs: list[llm.CalledFunction]): self.emit("function_calls_finished", called_fncs) @self._session.on("metrics_collected") def _metrics_collected(metrics: MultimodalLLMMetrics): self.emit("metrics_collected", metrics) def interrupt(self) -> None: if self._playing_handle is not None and not self._playing_handle.done(): self._playing_handle.interrupt() if self._model.capabilities.supports_truncate: self._session.cancel_response() # Only supported by OpenAI self._session._truncate_conversation_item( item_id=self._playing_handle.item_id, content_index=self._playing_handle.content_index, audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000), ) self._update_state("listening") def generate_reply( self, on_duplicate: Literal[ "cancel_existing", "cancel_new", "keep_both" ] = "cancel_existing", ) -> None: """Generate a reply from the agent""" if not self._session.server_vad_enabled: self._session.commit_audio_buffer() self._session.create_response(on_duplicate=on_duplicate) def _update_state(self, state: AgentState, delay: float = 0.0): """Set the current state of the agent""" @utils.log_exceptions(logger=logger) async def _run_task(delay: float) -> None: await asyncio.sleep(delay) if self._room.isconnected(): await self._room.local_participant.set_attributes( {ATTRIBUTE_AGENT_STATE: state} ) if self._update_state_task is not None: self._update_state_task.cancel() self._update_state_task = asyncio.create_task(_run_task(delay)) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: self._update_state("initializing") self._audio_source = rtc.AudioSource(24000, 1) track = rtc.LocalAudioTrack.create_audio_track( "assistant_voice", self._audio_source ) self._agent_publication = await self._room.local_participant.publish_track( track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE) ) self._agent_stt_forwarder = transcription.STTSegmentsForwarder( room=self._room, participant=self._room.local_participant, track=track, ) self._agent_playout = agent_playout.AgentPlayout( audio_source=self._audio_source ) def _on_playout_started() -> None: if self._session.playout_complete is not None: self._session.playout_complete.clear() self.emit("agent_started_speaking") self._update_state("speaking") def _on_playout_stopped(interrupted: bool) -> None: if self._session.playout_complete is not None: self._session.playout_complete.set() self.emit("agent_stopped_speaking") self._update_state("listening") if self._playing_handle is not None: collected_text = self._playing_handle._tr_fwd.played_text if interrupted: collected_text += "..." if self._model.capabilities.supports_truncate and collected_text: msg = ChatMessage.create( text=collected_text, role="assistant", id=self._playing_handle.item_id, ) self._session._update_conversation_item_content( self._playing_handle.item_id, msg.content ) self._emit_speech_committed("agent", collected_text, interrupted) self._agent_playout.on("playout_started", _on_playout_started) self._agent_playout.on("playout_stopped", _on_playout_stopped) await self._agent_publication.wait_for_subscription() bstream = utils.audio.AudioByteStream( 24000, 1, samples_per_channel=2400, ) async for frame in self._input_audio_ch: for f in bstream.write(frame.data.tobytes()): self._session._push_audio(f) def _on_participant_connected(self, participant: rtc.RemoteParticipant): if self._linked_participant is None: return self._link_participant(participant.identity) def _link_participant(self, participant_identity: str) -> None: self._linked_participant = self._room.remote_participants.get( participant_identity ) if self._linked_participant is None: logger.error("_link_participant must be called with a valid identity") return self._subscribe_to_microphone() async def _micro_task(self, track: rtc.LocalAudioTrack) -> None: sample_rate = self._model.capabilities.input_audio_sample_rate if sample_rate is None: sample_rate = 24000 input_stream = rtc.AudioStream( track, sample_rate=sample_rate, num_channels=1, noise_cancellation=self._noise_cancellation, ) async for ev in input_stream: self._input_audio_ch.send_nowait(ev.frame) def _subscribe_to_microphone(self, *args, **kwargs) -> None: """Subscribe to the participant microphone if found""" if self._linked_participant is None: return for publication in self._linked_participant.track_publications.values(): if publication.source != rtc.TrackSource.SOURCE_MICROPHONE: continue if not publication.subscribed: publication.set_subscribed(True) if ( publication.track is not None and publication.track != self._subscribed_track ): self._subscribed_track = publication.track # type: ignore self._stt_forwarder = transcription.STTSegmentsForwarder( room=self._room, participant=self._linked_participant, track=self._subscribed_track, ) if self._read_micro_atask is not None: self._read_micro_atask.cancel() self._read_micro_atask = asyncio.create_task( self._micro_task(self._subscribed_track) # type: ignore ) break def _ensure_session(self) -> aiohttp.ClientSession: if not self._http_session: self._http_session = utils.http_context.http_session() return self._http_session def _emit_speech_committed( self, speaker: Literal["user", "agent"], msg: str, interrupted: bool = False ): if speaker == "user": self.emit("user_speech_committed", msg) else: if interrupted: self.emit("agent_speech_interrupted", msg) else: self.emit("agent_speech_committed", msg) logger.debug( f"committed {speaker} speech", extra={ f"{speaker}_transcript": msg, "interrupted": interrupted, }, )
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Create a new MultimodalAgent.
Args
model
- RealtimeAPI instance.
vad
- Voice Activity Detection (VAD) instance.
chat_ctx
- Chat context for the assistant.
fnc_ctx
- Function context for the assistant.
transcription
- Options for assistant transcription.
max_text_response_retries
- Maximum number of retries to recover from text responses to audio mode. OpenAI's realtime API has a chance to return text responses instead of audio if the chat context includes text system or assistant messages. The agent will attempt to recover to audio mode by deleting the text response and appending an empty audio message to the conversation.
loop
- Event loop to use. Default to asyncio.get_event_loop().
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop fnc_ctx : llm.FunctionContext | None
-
Expand source code
@property def fnc_ctx(self) -> llm.FunctionContext | None: return self._session.fnc_ctx
prop vad : vad.VAD | None
-
Expand source code
@property def vad(self) -> vad.VAD | None: return self._vad
Methods
def chat_ctx_copy(self) ‑> ChatContext
-
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext: return self._session.chat_ctx_copy()
def generate_reply(self,
on_duplicate: "Literal['cancel_existing', 'cancel_new', 'keep_both']" = 'cancel_existing') ‑> None-
Expand source code
def generate_reply( self, on_duplicate: Literal[ "cancel_existing", "cancel_new", "keep_both" ] = "cancel_existing", ) -> None: """Generate a reply from the agent""" if not self._session.server_vad_enabled: self._session.commit_audio_buffer() self._session.create_response(on_duplicate=on_duplicate)
Generate a reply from the agent
def interrupt(self) ‑> None
-
Expand source code
def interrupt(self) -> None: if self._playing_handle is not None and not self._playing_handle.done(): self._playing_handle.interrupt() if self._model.capabilities.supports_truncate: self._session.cancel_response() # Only supported by OpenAI self._session._truncate_conversation_item( item_id=self._playing_handle.item_id, content_index=self._playing_handle.content_index, audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000), ) self._update_state("listening")
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
-
Expand source code
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: await self._session.set_chat_ctx(ctx)
def start(self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None) ‑> None
-
Expand source code
def start( self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None ) -> None: if self._started: raise RuntimeError("voice assistant already started") room.on("participant_connected", self._on_participant_connected) room.on("track_published", self._subscribe_to_microphone) room.on("track_subscribed", self._subscribe_to_microphone) self._room, self._participant = room, participant if participant is not None: if isinstance(participant, rtc.RemoteParticipant): self._link_participant(participant.identity) else: self._link_participant(participant) else: # no participant provided, try to find the first participant in the room for participant in self._room.remote_participants.values(): self._link_participant(participant.identity) break self._session = self._model.session( chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx ) # Create a task to wait for initialization and start the main task async def _init_and_start(): try: await self._session._init_sync_task logger.info("Session initialized with chat context") self._main_atask = asyncio.create_task(self._main_task()) except Exception as e: logger.exception("Failed to initialize session") raise e # Schedule the initialization and start task asyncio.create_task(_init_and_start()) @self._session.on("response_content_added") def _on_content_added(message: _ContentProto): tr_fwd = transcription.TTSSegmentsForwarder( room=self._room, participant=self._room.local_participant, speed=self._opts.transcription.agent_transcription_speed, sentence_tokenizer=self._opts.transcription.sentence_tokenizer, word_tokenizer=self._opts.transcription.word_tokenizer, hyphenate_word=self._opts.transcription.hyphenate_word, ) self._playing_handle = self._agent_playout.play( item_id=message.item_id, content_index=message.content_index, transcription_fwd=tr_fwd, text_stream=message.text_stream, audio_stream=message.audio_stream, ) @self._session.on("response_content_done") def _response_content_done(message: _ContentProto): if message.content_type == "text": if self._text_response_retries >= self._max_text_response_retries: raise RuntimeError( f"The OpenAI Realtime API returned a text response " f"after {self._max_text_response_retries} retries. " f"Please try to reduce the number of text system or " f"assistant messages in the chat context." ) self._text_response_retries += 1 logger.warning( "The OpenAI Realtime API returned a text response instead of audio. " "Attempting to recover to audio mode...", extra={ "item_id": message.item_id, "text": message.text, "retries": self._text_response_retries, }, ) self._session._recover_from_text_response(message.item_id) else: self._text_response_retries = 0 @self._session.on("input_speech_committed") def _input_speech_committed(): self._stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text="")], ) ) @self._session.on("input_speech_transcription_completed") def _input_speech_transcription_completed(ev: _InputTranscriptionProto): self._stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text=ev.transcript)], ) ) if self._model.capabilities.supports_truncate: user_msg = ChatMessage.create( text=ev.transcript, role="user", id=ev.item_id ) self._session._update_conversation_item_content( ev.item_id, user_msg.content ) self._emit_speech_committed("user", ev.transcript) @self._session.on("agent_speech_transcription_completed") def _agent_speech_transcription_completed(ev: _InputTranscriptionProto): self._agent_stt_forwarder.update( stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[stt.SpeechData(language="", text=ev.transcript)], ) ) self._emit_speech_committed("agent", ev.transcript) # Similar to _input_speech_started, this handles updating the state to "listening" when the agent's speech is complete. # However, since Gemini doesn't support VAD events, we are not emitting the `user_started_speaking` event here. @self._session.on("agent_speech_stopped") def _agent_speech_stopped(): self.interrupt() @self._session.on("input_speech_started") def _input_speech_started(): self.emit("user_started_speaking") self.interrupt() @self._session.on("input_speech_stopped") def _input_speech_stopped(): self.emit("user_stopped_speaking") @self._session.on("function_calls_collected") def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]): self.emit("function_calls_collected", fnc_call_infos) @self._session.on("function_calls_finished") def _function_calls_finished(called_fncs: list[llm.CalledFunction]): self.emit("function_calls_finished", called_fncs) @self._session.on("metrics_collected") def _metrics_collected(metrics: MultimodalLLMMetrics): self.emit("metrics_collected", metrics)
Inherited members