Module livekit.agents.multimodal
Sub-modules
livekit.agents.multimodal.agent_playout
livekit.agents.multimodal.multimodal_agent
Classes
class AgentTranscriptionOptions (user_transcription: bool = True,
agent_transcription: bool = True,
agent_transcription_speed: float = 1.0,
sentence_tokenizer: tokenize.SentenceTokenizer = <livekit.agents.tokenize.basic.SentenceTokenizer object>,
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>)
-
Expand source code
@dataclass(frozen=True)
class AgentTranscriptionOptions:
    user_transcription: bool = True
    """Whether to forward the user transcription to the client"""

    agent_transcription: bool = True
    """Whether to forward the agent transcription to the client"""

    agent_transcription_speed: float = 1.0
    """The speed at which the agent's speech transcription is forwarded to the client.
    We try to mimic the agent's speech speed by adjusting the transcription speed."""

    sentence_tokenizer: tokenize.SentenceTokenizer = tokenize.basic.SentenceTokenizer()
    """The tokenizer used to split the speech into sentences.
    This is used to decide when to mark a transcript as final for the agent transcription."""

    word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
        ignore_punctuation=False
    )
    """The tokenizer used to split the speech into words.
    This is used to simulate the "interim results" of the agent transcription."""

    hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word
    """A function that takes a string (word) as input and returns a list of strings,
    representing the hyphenated parts of the word."""
Class variables
var agent_transcription : bool
-
Whether to forward the agent transcription to the client
var agent_transcription_speed : float
-
The speed at which the agent's speech transcription is forwarded to the client. We try to mimic the agent's speech speed by adjusting the transcription speed.
var sentence_tokenizer : SentenceTokenizer
-
The tokenizer used to split the speech into sentences. This is used to decide when to mark a transcript as final for the agent transcription.
var user_transcription : bool
-
Whether to forward the user transcription to the client
var word_tokenizer : WordTokenizer
-
The tokenizer used to split the speech into words. This is used to simulate the "interim results" of the agent transcription.
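For illustration, here is a minimal sketch of customizing these options; the slowed transcription speed and the punctuation setting are arbitrary example values, not recommendations:

from livekit.agents import tokenize
from livekit.agents.multimodal import AgentTranscriptionOptions

# Example values only: forward the agent transcript slightly slower than
# real time and strip punctuation from the simulated interim results.
opts = AgentTranscriptionOptions(
    agent_transcription_speed=0.9,
    word_tokenizer=tokenize.basic.WordTokenizer(ignore_punctuation=True),
)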
Methods
def hyphenate_word(word: str) ‑> list[str]
-
Expand source code
def hyphenate_word(word: str) -> list[str]:
    return _basic_hyphenator.hyphenate_word(word)
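As a quick illustration, the default hyphenator can be called directly through the options object; the split shown in the comment is indicative, not guaranteed:

from livekit.agents.multimodal import AgentTranscriptionOptions

opts = AgentTranscriptionOptions()
parts = opts.hyphenate_word("transcription")
print(parts)  # e.g. something like ["tran", "scrip", "tion"]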
class MultimodalAgent (*,
model: S2SModel,
vad: vad.VAD | None = None,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(user_transcription=True, agent_transcription=True, agent_transcription_speed=1.0, sentence_tokenizer=<livekit.agents.tokenize.basic.SentenceTokenizer object>, word_tokenizer=<livekit.agents.tokenize.basic.WordTokenizer object>, hyphenate_word=<function hyphenate_word>),
max_text_response_retries: int = 5,
loop: asyncio.AbstractEventLoop | None = None)
-
Expand source code
class MultimodalAgent(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        *,
        model: S2SModel,
        vad: vad.VAD | None = None,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
        transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(),
        max_text_response_retries: int = 5,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """Create a new MultimodalAgent.

        Args:
            model: S2SModel instance.
            vad: Voice Activity Detection (VAD) instance.
            chat_ctx: Chat context for the assistant.
            fnc_ctx: Function context for the assistant.
            transcription: Options for assistant transcription.
            max_text_response_retries: Maximum number of retries to recover
                from text responses to audio mode. OpenAI's realtime API has a
                chance to return text responses instead of audio if the chat
                context includes text system or assistant messages. The agent
                will attempt to recover to audio mode by deleting the text
                response and appending an empty audio message to the conversation.
            loop: Event loop to use. Default to asyncio.get_event_loop().
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()

        from livekit.plugins.openai import realtime

        assert isinstance(model, realtime.RealtimeModel)

        self._model = model
        self._vad = vad
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        self._opts = _ImplOptions(
            transcription=transcription,
        )

        # audio input
        self._read_micro_atask: asyncio.Task | None = None
        self._subscribed_track: rtc.RemoteAudioTrack | None = None
        self._input_audio_ch = utils.aio.Chan[rtc.AudioFrame]()

        # audio output
        self._playing_handle: agent_playout.PlayoutHandle | None = None

        self._linked_participant: rtc.RemoteParticipant | None = None
        self._started, self._closed = False, False
        self._update_state_task: asyncio.Task | None = None
        self._http_session: aiohttp.ClientSession | None = None

        self._text_response_retries = 0
        self._max_text_response_retries = max_text_response_retries

    @property
    def vad(self) -> vad.VAD | None:
        return self._vad

    @property
    def fnc_ctx(self) -> llm.FunctionContext | None:
        return self._session.fnc_ctx

    @fnc_ctx.setter
    def fnc_ctx(self, value: llm.FunctionContext | None) -> None:
        self._session.fnc_ctx = value

    def chat_ctx_copy(self) -> llm.ChatContext:
        return self._session.chat_ctx_copy()

    async def set_chat_ctx(self, ctx: llm.ChatContext) -> None:
        await self._session.set_chat_ctx(ctx)

    def start(
        self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
    ) -> None:
        if self._started:
            raise RuntimeError("voice assistant already started")

        room.on("participant_connected", self._on_participant_connected)
        room.on("track_published", self._subscribe_to_microphone)
        room.on("track_subscribed", self._subscribe_to_microphone)

        self._room, self._participant = room, participant

        if participant is not None:
            if isinstance(participant, rtc.RemoteParticipant):
                self._link_participant(participant.identity)
            else:
                self._link_participant(participant)
        else:
            # no participant provided, try to find the first participant in the room
            for participant in self._room.remote_participants.values():
                self._link_participant(participant.identity)
                break

        self._session = self._model.session(
            chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
        )

        # Create a task to wait for initialization and start the main task
        async def _init_and_start():
            try:
                await self._session._init_sync_task
                logger.info("Session initialized with chat context")
                self._main_atask = asyncio.create_task(self._main_task())
            except Exception as e:
                logger.exception("Failed to initialize session")
                raise e

        # Schedule the initialization and start task
        asyncio.create_task(_init_and_start())

        from livekit.plugins.openai import realtime

        @self._session.on("response_content_added")
        def _on_content_added(message: realtime.RealtimeContent):
            if message.content_type == "text":
                return

            tr_fwd = transcription.TTSSegmentsForwarder(
                room=self._room,
                participant=self._room.local_participant,
                speed=self._opts.transcription.agent_transcription_speed,
                sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
                word_tokenizer=self._opts.transcription.word_tokenizer,
                hyphenate_word=self._opts.transcription.hyphenate_word,
            )

            self._playing_handle = self._agent_playout.play(
                item_id=message.item_id,
                content_index=message.content_index,
                transcription_fwd=tr_fwd,
                text_stream=message.text_stream,
                audio_stream=message.audio_stream,
            )

        @self._session.on("response_content_done")
        def _response_content_done(message: realtime.RealtimeContent):
            if message.content_type == "text":
                if self._text_response_retries >= self._max_text_response_retries:
                    raise RuntimeError(
                        f"The OpenAI Realtime API returned a text response "
                        f"after {self._max_text_response_retries} retries. "
                        f"Please try to reduce the number of text system or "
                        f"assistant messages in the chat context."
                    )

                self._text_response_retries += 1
                logger.warning(
                    "The OpenAI Realtime API returned a text response instead of audio. "
                    "Attempting to recover to audio mode...",
                    extra={
                        "item_id": message.item_id,
                        "text": message.text,
                        "retries": self._text_response_retries,
                    },
                )
                self._session._recover_from_text_response(message.item_id)
            else:
                self._text_response_retries = 0

        @self._session.on("input_speech_committed")
        def _input_speech_committed():
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text="")],
                )
            )

        @self._session.on("input_speech_transcription_completed")
        def _input_speech_transcription_completed(
            ev: realtime.InputTranscriptionCompleted,
        ):
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text=ev.transcript)],
                )
            )

            user_msg = ChatMessage.create(
                text=ev.transcript, role="user", id=ev.item_id
            )
            self._session._update_conversation_item_content(
                ev.item_id, user_msg.content
            )

            self.emit("user_speech_committed", user_msg)
            logger.debug(
                "committed user speech",
                extra={"user_transcript": ev.transcript},
            )

        @self._session.on("input_speech_started")
        def _input_speech_started():
            self.emit("user_started_speaking")
            self._update_state("listening")
            if self._playing_handle is not None and not self._playing_handle.done():
                self._playing_handle.interrupt()

                self._session.conversation.item.truncate(
                    item_id=self._playing_handle.item_id,
                    content_index=self._playing_handle.content_index,
                    audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
                )

        @self._session.on("input_speech_stopped")
        def _input_speech_stopped():
            self.emit("user_stopped_speaking")

        @self._session.on("function_calls_collected")
        def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
            self.emit("function_calls_collected", fnc_call_infos)

        @self._session.on("function_calls_finished")
        def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
            self.emit("function_calls_finished", called_fncs)

        @self._session.on("metrics_collected")
        def _metrics_collected(metrics: MultimodalLLMMetrics):
            self.emit("metrics_collected", metrics)

    def _update_state(self, state: AgentState, delay: float = 0.0):
        """Set the current state of the agent"""

        @utils.log_exceptions(logger=logger)
        async def _run_task(delay: float) -> None:
            await asyncio.sleep(delay)

            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: state}
                )

        if self._update_state_task is not None:
            self._update_state_task.cancel()

        self._update_state_task = asyncio.create_task(_run_task(delay))

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        self._update_state("initializing")
        self._audio_source = rtc.AudioSource(24000, 1)
        self._agent_playout = agent_playout.AgentPlayout(
            audio_source=self._audio_source
        )

        def _on_playout_started() -> None:
            self.emit("agent_started_speaking")
            self._update_state("speaking")

        def _on_playout_stopped(interrupted: bool) -> None:
            self.emit("agent_stopped_speaking")
            self._update_state("listening")

            if self._playing_handle is not None:
                collected_text = self._playing_handle._tr_fwd.played_text
                if interrupted:
                    collected_text += "..."

                msg = ChatMessage.create(
                    text=collected_text,
                    role="assistant",
                    id=self._playing_handle.item_id,
                )
                self._session._update_conversation_item_content(
                    self._playing_handle.item_id, msg.content
                )

                if interrupted:
                    self.emit("agent_speech_interrupted", msg)
                else:
                    self.emit("agent_speech_committed", msg)

                logger.debug(
                    "committed agent speech",
                    extra={
                        "agent_transcript": collected_text,
                        "interrupted": interrupted,
                    },
                )

        self._agent_playout.on("playout_started", _on_playout_started)
        self._agent_playout.on("playout_stopped", _on_playout_stopped)

        track = rtc.LocalAudioTrack.create_audio_track(
            "assistant_voice", self._audio_source
        )
        self._agent_publication = await self._room.local_participant.publish_track(
            track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
        )

        await self._agent_publication.wait_for_subscription()

        bstream = utils.audio.AudioByteStream(
            24000,
            1,
            samples_per_channel=2400,
        )
        async for frame in self._input_audio_ch:
            for f in bstream.write(frame.data.tobytes()):
                self._session.input_audio_buffer.append(f)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant):
        if self._linked_participant is None:
            return

        self._link_participant(participant.identity)

    def _link_participant(self, participant_identity: str) -> None:
        self._linked_participant = self._room.remote_participants.get(
            participant_identity
        )
        if self._linked_participant is None:
            logger.error("_link_participant must be called with a valid identity")
            return

        self._subscribe_to_microphone()

    async def _micro_task(self, track: rtc.LocalAudioTrack) -> None:
        stream_24khz = rtc.AudioStream(track, sample_rate=24000, num_channels=1)
        async for ev in stream_24khz:
            self._input_audio_ch.send_nowait(ev.frame)

    def _subscribe_to_microphone(self, *args, **kwargs) -> None:
        """Subscribe to the participant microphone if found"""
        if self._linked_participant is None:
            return

        for publication in self._linked_participant.track_publications.values():
            if publication.source != rtc.TrackSource.SOURCE_MICROPHONE:
                continue

            if not publication.subscribed:
                publication.set_subscribed(True)

            if (
                publication.track is not None
                and publication.track != self._subscribed_track
            ):
                self._subscribed_track = publication.track  # type: ignore
                self._stt_forwarder = transcription.STTSegmentsForwarder(
                    room=self._room,
                    participant=self._linked_participant,
                    track=self._subscribed_track,
                )
                if self._read_micro_atask is not None:
                    self._read_micro_atask.cancel()

                self._read_micro_atask = asyncio.create_task(
                    self._micro_task(self._subscribed_track)  # type: ignore
                )
                break

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()

        return self._http_session
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
Create a new MultimodalAgent.
Args
model
- S2SModel instance.
vad
- Voice Activity Detection (VAD) instance.
chat_ctx
- Chat context for the assistant.
fnc_ctx
- Function context for the assistant.
transcription
- Options for assistant transcription.
max_text_response_retries
- Maximum number of attempts to recover from a text response back to audio mode. OpenAI's Realtime API may return text responses instead of audio when the chat context includes text system or assistant messages. The agent attempts to recover by deleting the text response and appending an empty audio message to the conversation.
loop
- Event loop to use. Defaults to asyncio.get_event_loop().
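A minimal usage sketch, assuming the OpenAI Realtime plugin and a standard agent entrypoint; the system prompt and the entrypoint name are illustrative:

from livekit.agents import JobContext, llm
from livekit.agents.multimodal import MultimodalAgent
from livekit.plugins.openai import realtime

async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()

    # Keep text system/assistant messages minimal to reduce the chance of
    # text-only responses (see max_text_response_retries above).
    chat_ctx = llm.ChatContext().append(
        role="system", text="You are a helpful voice assistant."
    )

    agent = MultimodalAgent(
        model=realtime.RealtimeModel(),  # assumes OPENAI_API_KEY is set
        chat_ctx=chat_ctx,
    )
    agent.start(ctx.room)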
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop fnc_ctx : llm.FunctionContext | None
-
Expand source code
@property def fnc_ctx(self) -> llm.FunctionContext | None: return self._session.fnc_ctx
prop vad : vad.VAD | None
-
Expand source code
@property def vad(self) -> vad.VAD | None: return self._vad
Methods
def chat_ctx_copy(self) ‑> ChatContext
-
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext: return self._session.chat_ctx_copy()
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
-
Expand source code
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: await self._session.set_chat_ctx(ctx)
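These two methods are typically paired, since the copy is detached from the live session state; a brief sketch (the appended message is illustrative):

async def add_context(agent: MultimodalAgent) -> None:
    ctx = agent.chat_ctx_copy()  # detached snapshot of the session's context
    ctx.append(role="system", text="The user has switched to billing questions.")
    await agent.set_chat_ctx(ctx)  # push the modified context back to the session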
def start(self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None) ‑> None
-
Expand source code
def start(
    self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
) -> None:
    if self._started:
        raise RuntimeError("voice assistant already started")

    room.on("participant_connected", self._on_participant_connected)
    room.on("track_published", self._subscribe_to_microphone)
    room.on("track_subscribed", self._subscribe_to_microphone)

    self._room, self._participant = room, participant

    if participant is not None:
        if isinstance(participant, rtc.RemoteParticipant):
            self._link_participant(participant.identity)
        else:
            self._link_participant(participant)
    else:
        # no participant provided, try to find the first participant in the room
        for participant in self._room.remote_participants.values():
            self._link_participant(participant.identity)
            break

    self._session = self._model.session(
        chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
    )

    # Create a task to wait for initialization and start the main task
    async def _init_and_start():
        try:
            await self._session._init_sync_task
            logger.info("Session initialized with chat context")
            self._main_atask = asyncio.create_task(self._main_task())
        except Exception as e:
            logger.exception("Failed to initialize session")
            raise e

    # Schedule the initialization and start task
    asyncio.create_task(_init_and_start())

    from livekit.plugins.openai import realtime

    @self._session.on("response_content_added")
    def _on_content_added(message: realtime.RealtimeContent):
        if message.content_type == "text":
            return

        tr_fwd = transcription.TTSSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            speed=self._opts.transcription.agent_transcription_speed,
            sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
            word_tokenizer=self._opts.transcription.word_tokenizer,
            hyphenate_word=self._opts.transcription.hyphenate_word,
        )

        self._playing_handle = self._agent_playout.play(
            item_id=message.item_id,
            content_index=message.content_index,
            transcription_fwd=tr_fwd,
            text_stream=message.text_stream,
            audio_stream=message.audio_stream,
        )

    @self._session.on("response_content_done")
    def _response_content_done(message: realtime.RealtimeContent):
        if message.content_type == "text":
            if self._text_response_retries >= self._max_text_response_retries:
                raise RuntimeError(
                    f"The OpenAI Realtime API returned a text response "
                    f"after {self._max_text_response_retries} retries. "
                    f"Please try to reduce the number of text system or "
                    f"assistant messages in the chat context."
                )

            self._text_response_retries += 1
            logger.warning(
                "The OpenAI Realtime API returned a text response instead of audio. "
                "Attempting to recover to audio mode...",
                extra={
                    "item_id": message.item_id,
                    "text": message.text,
                    "retries": self._text_response_retries,
                },
            )
            self._session._recover_from_text_response(message.item_id)
        else:
            self._text_response_retries = 0

    @self._session.on("input_speech_committed")
    def _input_speech_committed():
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text="")],
            )
        )

    @self._session.on("input_speech_transcription_completed")
    def _input_speech_transcription_completed(
        ev: realtime.InputTranscriptionCompleted,
    ):
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text=ev.transcript)],
            )
        )

        user_msg = ChatMessage.create(
            text=ev.transcript, role="user", id=ev.item_id
        )
        self._session._update_conversation_item_content(
            ev.item_id, user_msg.content
        )

        self.emit("user_speech_committed", user_msg)
        logger.debug(
            "committed user speech",
            extra={"user_transcript": ev.transcript},
        )

    @self._session.on("input_speech_started")
    def _input_speech_started():
        self.emit("user_started_speaking")
        self._update_state("listening")
        if self._playing_handle is not None and not self._playing_handle.done():
            self._playing_handle.interrupt()

            self._session.conversation.item.truncate(
                item_id=self._playing_handle.item_id,
                content_index=self._playing_handle.content_index,
                audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
            )

    @self._session.on("input_speech_stopped")
    def _input_speech_stopped():
        self.emit("user_stopped_speaking")

    @self._session.on("function_calls_collected")
    def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
        self.emit("function_calls_collected", fnc_call_infos)

    @self._session.on("function_calls_finished")
    def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
        self.emit("function_calls_finished", called_fncs)

    @self._session.on("metrics_collected")
    def _metrics_collected(metrics: MultimodalLLMMetrics):
        self.emit("metrics_collected", metrics)
Inherited members