Module livekit.agents.multimodal

Sub-modules

livekit.agents.multimodal.agent_playout
livekit.agents.multimodal.multimodal_agent

Classes

class AgentTranscriptionOptions (user_transcription: bool = True,
agent_transcription: bool = True,
agent_transcription_speed: float = 1.0,
sentence_tokenizer: tokenize.SentenceTokenizer = <livekit.agents.tokenize.basic.SentenceTokenizer object>,
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>)
Expand source code
@dataclass(frozen=True)
class AgentTranscriptionOptions:
    user_transcription: bool = True
    """Whether to forward the user transcription to the client"""
    agent_transcription: bool = True
    """Whether to forward the agent transcription to the client"""
    agent_transcription_speed: float = 1.0
    """The speed at which the agent's speech transcription is forwarded to the client.
    We try to mimic the agent's speech speed by adjusting the transcription speed."""
    sentence_tokenizer: tokenize.SentenceTokenizer = tokenize.basic.SentenceTokenizer()
    """The tokenizer used to split the speech into sentences.
    This is used to decide when to mark a transcript as final for the agent transcription."""
    word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
        ignore_punctuation=False
    )
    """The tokenizer used to split the speech into words.
    This is used to simulate the "interim results" of the agent transcription."""
    hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word
    """A function that takes a string (word) as input and returns a list of strings,
    representing the hyphenated parts of the word."""

AgentTranscriptionOptions(user_transcription: 'bool' = True, agent_transcription: 'bool' = True, agent_transcription_speed: 'float' = 1.0, sentence_tokenizer: 'tokenize.SentenceTokenizer' = , word_tokenizer: 'tokenize.WordTokenizer' = , hyphenate_word: 'Callable[[str], list[str]]' = )

Class variables

var agent_transcription : bool

Whether to forward the agent transcription to the client

var agent_transcription_speed : float

The speed at which the agent's speech transcription is forwarded to the client. We try to mimic the agent's speech speed by adjusting the transcription speed.

var sentence_tokenizerSentenceTokenizer

The tokenizer used to split the speech into sentences. This is used to decide when to mark a transcript as final for the agent transcription.

var user_transcription : bool

Whether to forward the user transcription to the client

var word_tokenizerWordTokenizer

The tokenizer used to split the speech into words. This is used to simulate the "interim results" of the agent transcription.

Methods

def hyphenate_word(word: str) ‑> list[str]
Expand source code
def hyphenate_word(word: str) -> list[str]:
    return _basic_hyphenator.hyphenate_word(word)
class MultimodalAgent (*,
model: _RealtimeAPI,
vad: vad.VAD | None = None,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(user_transcription=True, agent_transcription=True, agent_transcription_speed=1.0, sentence_tokenizer=<livekit.agents.tokenize.basic.SentenceTokenizer object>, word_tokenizer=<livekit.agents.tokenize.basic.WordTokenizer object>, hyphenate_word=<function hyphenate_word>),
max_text_response_retries: int = 5,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class MultimodalAgent(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        *,
        model: _RealtimeAPI,
        vad: vad.VAD | None = None,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
        transcription: AgentTranscriptionOptions = AgentTranscriptionOptions(),
        max_text_response_retries: int = 5,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """Create a new MultimodalAgent.

        Args:
            model: RealtimeAPI instance.
            vad: Voice Activity Detection (VAD) instance.
            chat_ctx: Chat context for the assistant.
            fnc_ctx: Function context for the assistant.
            transcription: Options for assistant transcription.
            max_text_response_retries: Maximum number of retries to recover
                from text responses to audio mode. OpenAI's realtime API has a
                chance to return text responses instead of audio if the chat
                context includes text system or assistant messages. The agent will
                attempt to recover to audio mode by deleting the text response
                and appending an empty audio message to the conversation.
            loop: Event loop to use. Default to asyncio.get_event_loop().
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()

        self._model = model
        self._vad = vad
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx

        self._opts = _ImplOptions(
            transcription=transcription,
        )

        # audio input
        self._read_micro_atask: asyncio.Task | None = None
        self._subscribed_track: rtc.RemoteAudioTrack | None = None
        self._input_audio_ch = utils.aio.Chan[rtc.AudioFrame]()

        # audio output
        self._playing_handle: agent_playout.PlayoutHandle | None = None

        self._linked_participant: rtc.RemoteParticipant | None = None
        self._started, self._closed = False, False

        self._update_state_task: asyncio.Task | None = None
        self._http_session: aiohttp.ClientSession | None = None

        self._text_response_retries = 0
        self._max_text_response_retries = max_text_response_retries

    @property
    def vad(self) -> vad.VAD | None:
        return self._vad

    @property
    def fnc_ctx(self) -> llm.FunctionContext | None:
        return self._session.fnc_ctx

    @fnc_ctx.setter
    def fnc_ctx(self, value: llm.FunctionContext | None) -> None:
        self._session.fnc_ctx = value

    def chat_ctx_copy(self) -> llm.ChatContext:
        return self._session.chat_ctx_copy()

    async def set_chat_ctx(self, ctx: llm.ChatContext) -> None:
        await self._session.set_chat_ctx(ctx)

    def start(
        self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
    ) -> None:
        if self._started:
            raise RuntimeError("voice assistant already started")

        room.on("participant_connected", self._on_participant_connected)
        room.on("track_published", self._subscribe_to_microphone)
        room.on("track_subscribed", self._subscribe_to_microphone)

        self._room, self._participant = room, participant

        if participant is not None:
            if isinstance(participant, rtc.RemoteParticipant):
                self._link_participant(participant.identity)
            else:
                self._link_participant(participant)
        else:
            # no participant provided, try to find the first participant in the room
            for participant in self._room.remote_participants.values():
                self._link_participant(participant.identity)
                break

        self._session = self._model.session(
            chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
        )

        # Create a task to wait for initialization and start the main task
        async def _init_and_start():
            try:
                await self._session._init_sync_task
                logger.info("Session initialized with chat context")
                self._main_atask = asyncio.create_task(self._main_task())
            except Exception as e:
                logger.exception("Failed to initialize session")
                raise e

        # Schedule the initialization and start task
        asyncio.create_task(_init_and_start())

        @self._session.on("response_content_added")
        def _on_content_added(message: _ContentProto):
            tr_fwd = transcription.TTSSegmentsForwarder(
                room=self._room,
                participant=self._room.local_participant,
                speed=self._opts.transcription.agent_transcription_speed,
                sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
                word_tokenizer=self._opts.transcription.word_tokenizer,
                hyphenate_word=self._opts.transcription.hyphenate_word,
            )

            self._playing_handle = self._agent_playout.play(
                item_id=message.item_id,
                content_index=message.content_index,
                transcription_fwd=tr_fwd,
                text_stream=message.text_stream,
                audio_stream=message.audio_stream,
            )

        @self._session.on("response_content_done")
        def _response_content_done(message: _ContentProto):
            if message.content_type == "text":
                if self._text_response_retries >= self._max_text_response_retries:
                    raise RuntimeError(
                        f"The OpenAI Realtime API returned a text response "
                        f"after {self._max_text_response_retries} retries. "
                        f"Please try to reduce the number of text system or "
                        f"assistant messages in the chat context."
                    )

                self._text_response_retries += 1
                logger.warning(
                    "The OpenAI Realtime API returned a text response instead of audio. "
                    "Attempting to recover to audio mode...",
                    extra={
                        "item_id": message.item_id,
                        "text": message.text,
                        "retries": self._text_response_retries,
                    },
                )
                self._session._recover_from_text_response(message.item_id)
            else:
                self._text_response_retries = 0

        @self._session.on("input_speech_committed")
        def _input_speech_committed():
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text="")],
                )
            )

        @self._session.on("input_speech_transcription_completed")
        def _input_speech_transcription_completed(ev: _InputTranscriptionProto):
            self._stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text=ev.transcript)],
                )
            )
            if self._model.capabilities.supports_truncate:
                user_msg = ChatMessage.create(
                    text=ev.transcript, role="user", id=ev.item_id
                )

                self._session._update_conversation_item_content(
                    ev.item_id, user_msg.content
                )

            self._emit_speech_committed("user", ev.transcript)

        @self._session.on("agent_speech_transcription_completed")
        def _agent_speech_transcription_completed(ev: _InputTranscriptionProto):
            self._agent_stt_forwarder.update(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[stt.SpeechData(language="", text=ev.transcript)],
                )
            )
            self._emit_speech_committed("agent", ev.transcript)

        # Similar to _input_speech_started, this handles updating the state to "listening" when the agent's speech is complete.
        # However, since Gemini doesn't support VAD events, we are not emitting the `user_started_speaking` event here.
        @self._session.on("agent_speech_stopped")
        def _agent_speech_stopped():
            self.interrupt()

        @self._session.on("input_speech_started")
        def _input_speech_started():
            self.emit("user_started_speaking")
            self.interrupt()

        @self._session.on("input_speech_stopped")
        def _input_speech_stopped():
            self.emit("user_stopped_speaking")

        @self._session.on("function_calls_collected")
        def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
            self.emit("function_calls_collected", fnc_call_infos)

        @self._session.on("function_calls_finished")
        def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
            self.emit("function_calls_finished", called_fncs)

        @self._session.on("metrics_collected")
        def _metrics_collected(metrics: MultimodalLLMMetrics):
            self.emit("metrics_collected", metrics)

    def interrupt(self) -> None:
        if self._playing_handle is not None and not self._playing_handle.done():
            self._playing_handle.interrupt()

            if self._model.capabilities.supports_truncate:
                self._session.cancel_response()  # Only supported by OpenAI

                self._session._truncate_conversation_item(
                    item_id=self._playing_handle.item_id,
                    content_index=self._playing_handle.content_index,
                    audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
                )
        self._update_state("listening")

    def generate_reply(
        self,
        on_duplicate: Literal[
            "cancel_existing", "cancel_new", "keep_both"
        ] = "cancel_existing",
    ) -> None:
        """Generate a reply from the agent"""
        if not self._session.server_vad_enabled:
            self._session.commit_audio_buffer()
        self._session.create_response(on_duplicate=on_duplicate)

    def _update_state(self, state: AgentState, delay: float = 0.0):
        """Set the current state of the agent"""

        @utils.log_exceptions(logger=logger)
        async def _run_task(delay: float) -> None:
            await asyncio.sleep(delay)

            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: state}
                )

        if self._update_state_task is not None:
            self._update_state_task.cancel()

        self._update_state_task = asyncio.create_task(_run_task(delay))

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        self._update_state("initializing")
        self._audio_source = rtc.AudioSource(24000, 1)
        track = rtc.LocalAudioTrack.create_audio_track(
            "assistant_voice", self._audio_source
        )
        self._agent_publication = await self._room.local_participant.publish_track(
            track, rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
        )
        self._agent_stt_forwarder = transcription.STTSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            track=track,
        )
        self._agent_playout = agent_playout.AgentPlayout(
            audio_source=self._audio_source
        )

        def _on_playout_started() -> None:
            self.emit("agent_started_speaking")
            self._update_state("speaking")

        def _on_playout_stopped(interrupted: bool) -> None:
            self.emit("agent_stopped_speaking")
            self._update_state("listening")

            if self._playing_handle is not None:
                collected_text = self._playing_handle._tr_fwd.played_text
                if interrupted:
                    collected_text += "..."

                if self._model.capabilities.supports_truncate and collected_text:
                    msg = ChatMessage.create(
                        text=collected_text,
                        role="assistant",
                        id=self._playing_handle.item_id,
                    )
                    self._session._update_conversation_item_content(
                        self._playing_handle.item_id, msg.content
                    )

                    self._emit_speech_committed("agent", collected_text, interrupted)

        self._agent_playout.on("playout_started", _on_playout_started)
        self._agent_playout.on("playout_stopped", _on_playout_stopped)

        await self._agent_publication.wait_for_subscription()

        bstream = utils.audio.AudioByteStream(
            24000,
            1,
            samples_per_channel=2400,
        )
        async for frame in self._input_audio_ch:
            for f in bstream.write(frame.data.tobytes()):
                self._session._push_audio(f)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant):
        if self._linked_participant is None:
            return

        self._link_participant(participant.identity)

    def _link_participant(self, participant_identity: str) -> None:
        self._linked_participant = self._room.remote_participants.get(
            participant_identity
        )
        if self._linked_participant is None:
            logger.error("_link_participant must be called with a valid identity")
            return

        self._subscribe_to_microphone()

    async def _micro_task(self, track: rtc.LocalAudioTrack) -> None:
        sample_rate = self._model.capabilities.input_audio_sample_rate
        if sample_rate is None:
            sample_rate = 24000

        input_stream = rtc.AudioStream(track, sample_rate=sample_rate, num_channels=1)
        async for ev in input_stream:
            self._input_audio_ch.send_nowait(ev.frame)

    def _subscribe_to_microphone(self, *args, **kwargs) -> None:
        """Subscribe to the participant microphone if found"""

        if self._linked_participant is None:
            return

        for publication in self._linked_participant.track_publications.values():
            if publication.source != rtc.TrackSource.SOURCE_MICROPHONE:
                continue

            if not publication.subscribed:
                publication.set_subscribed(True)

            if (
                publication.track is not None
                and publication.track != self._subscribed_track
            ):
                self._subscribed_track = publication.track  # type: ignore
                self._stt_forwarder = transcription.STTSegmentsForwarder(
                    room=self._room,
                    participant=self._linked_participant,
                    track=self._subscribed_track,
                )

                if self._read_micro_atask is not None:
                    self._read_micro_atask.cancel()

                self._read_micro_atask = asyncio.create_task(
                    self._micro_task(self._subscribed_track)  # type: ignore
                )
                break

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()

        return self._http_session

    def _emit_speech_committed(
        self, speaker: Literal["user", "agent"], msg: str, interrupted: bool = False
    ):
        if speaker == "user":
            self.emit("user_speech_committed", msg)
        else:
            if interrupted:
                self.emit("agent_speech_interrupted", msg)
            else:
                self.emit("agent_speech_committed", msg)

        logger.debug(
            f"committed {speaker} speech",
            extra={
                f"{speaker}_transcript": msg,
                "interrupted": interrupted,
            },
        )

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Create a new MultimodalAgent.

Args

model
RealtimeAPI instance.
vad
Voice Activity Detection (VAD) instance.
chat_ctx
Chat context for the assistant.
fnc_ctx
Function context for the assistant.
transcription
Options for assistant transcription.
max_text_response_retries
Maximum number of retries to recover from text responses to audio mode. OpenAI's realtime API has a chance to return text responses instead of audio if the chat context includes text system or assistant messages. The agent will attempt to recover to audio mode by deleting the text response and appending an empty audio message to the conversation.
loop
Event loop to use. Default to asyncio.get_event_loop().

Ancestors

Instance variables

prop fnc_ctx : llm.FunctionContext | None
Expand source code
@property
def fnc_ctx(self) -> llm.FunctionContext | None:
    return self._session.fnc_ctx
prop vad : vad.VAD | None
Expand source code
@property
def vad(self) -> vad.VAD | None:
    return self._vad

Methods

def chat_ctx_copy(self) ‑> ChatContext
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext:
    return self._session.chat_ctx_copy()
def generate_reply(self,
on_duplicate: "Literal['cancel_existing', 'cancel_new', 'keep_both']" = 'cancel_existing') ‑> None
Expand source code
def generate_reply(
    self,
    on_duplicate: Literal[
        "cancel_existing", "cancel_new", "keep_both"
    ] = "cancel_existing",
) -> None:
    """Generate a reply from the agent"""
    if not self._session.server_vad_enabled:
        self._session.commit_audio_buffer()
    self._session.create_response(on_duplicate=on_duplicate)

Generate a reply from the agent

def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    if self._playing_handle is not None and not self._playing_handle.done():
        self._playing_handle.interrupt()

        if self._model.capabilities.supports_truncate:
            self._session.cancel_response()  # Only supported by OpenAI

            self._session._truncate_conversation_item(
                item_id=self._playing_handle.item_id,
                content_index=self._playing_handle.content_index,
                audio_end_ms=int(self._playing_handle.audio_samples / 24000 * 1000),
            )
    self._update_state("listening")
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
Expand source code
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None:
    await self._session.set_chat_ctx(ctx)
def start(self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None) ‑> None
Expand source code
def start(
    self, room: rtc.Room, participant: rtc.RemoteParticipant | str | None = None
) -> None:
    if self._started:
        raise RuntimeError("voice assistant already started")

    room.on("participant_connected", self._on_participant_connected)
    room.on("track_published", self._subscribe_to_microphone)
    room.on("track_subscribed", self._subscribe_to_microphone)

    self._room, self._participant = room, participant

    if participant is not None:
        if isinstance(participant, rtc.RemoteParticipant):
            self._link_participant(participant.identity)
        else:
            self._link_participant(participant)
    else:
        # no participant provided, try to find the first participant in the room
        for participant in self._room.remote_participants.values():
            self._link_participant(participant.identity)
            break

    self._session = self._model.session(
        chat_ctx=self._chat_ctx, fnc_ctx=self._fnc_ctx
    )

    # Create a task to wait for initialization and start the main task
    async def _init_and_start():
        try:
            await self._session._init_sync_task
            logger.info("Session initialized with chat context")
            self._main_atask = asyncio.create_task(self._main_task())
        except Exception as e:
            logger.exception("Failed to initialize session")
            raise e

    # Schedule the initialization and start task
    asyncio.create_task(_init_and_start())

    @self._session.on("response_content_added")
    def _on_content_added(message: _ContentProto):
        tr_fwd = transcription.TTSSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            speed=self._opts.transcription.agent_transcription_speed,
            sentence_tokenizer=self._opts.transcription.sentence_tokenizer,
            word_tokenizer=self._opts.transcription.word_tokenizer,
            hyphenate_word=self._opts.transcription.hyphenate_word,
        )

        self._playing_handle = self._agent_playout.play(
            item_id=message.item_id,
            content_index=message.content_index,
            transcription_fwd=tr_fwd,
            text_stream=message.text_stream,
            audio_stream=message.audio_stream,
        )

    @self._session.on("response_content_done")
    def _response_content_done(message: _ContentProto):
        if message.content_type == "text":
            if self._text_response_retries >= self._max_text_response_retries:
                raise RuntimeError(
                    f"The OpenAI Realtime API returned a text response "
                    f"after {self._max_text_response_retries} retries. "
                    f"Please try to reduce the number of text system or "
                    f"assistant messages in the chat context."
                )

            self._text_response_retries += 1
            logger.warning(
                "The OpenAI Realtime API returned a text response instead of audio. "
                "Attempting to recover to audio mode...",
                extra={
                    "item_id": message.item_id,
                    "text": message.text,
                    "retries": self._text_response_retries,
                },
            )
            self._session._recover_from_text_response(message.item_id)
        else:
            self._text_response_retries = 0

    @self._session.on("input_speech_committed")
    def _input_speech_committed():
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text="")],
            )
        )

    @self._session.on("input_speech_transcription_completed")
    def _input_speech_transcription_completed(ev: _InputTranscriptionProto):
        self._stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text=ev.transcript)],
            )
        )
        if self._model.capabilities.supports_truncate:
            user_msg = ChatMessage.create(
                text=ev.transcript, role="user", id=ev.item_id
            )

            self._session._update_conversation_item_content(
                ev.item_id, user_msg.content
            )

        self._emit_speech_committed("user", ev.transcript)

    @self._session.on("agent_speech_transcription_completed")
    def _agent_speech_transcription_completed(ev: _InputTranscriptionProto):
        self._agent_stt_forwarder.update(
            stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[stt.SpeechData(language="", text=ev.transcript)],
            )
        )
        self._emit_speech_committed("agent", ev.transcript)

    # Similar to _input_speech_started, this handles updating the state to "listening" when the agent's speech is complete.
    # However, since Gemini doesn't support VAD events, we are not emitting the `user_started_speaking` event here.
    @self._session.on("agent_speech_stopped")
    def _agent_speech_stopped():
        self.interrupt()

    @self._session.on("input_speech_started")
    def _input_speech_started():
        self.emit("user_started_speaking")
        self.interrupt()

    @self._session.on("input_speech_stopped")
    def _input_speech_stopped():
        self.emit("user_stopped_speaking")

    @self._session.on("function_calls_collected")
    def _function_calls_collected(fnc_call_infos: list[llm.FunctionCallInfo]):
        self.emit("function_calls_collected", fnc_call_infos)

    @self._session.on("function_calls_finished")
    def _function_calls_finished(called_fncs: list[llm.CalledFunction]):
        self.emit("function_calls_finished", called_fncs)

    @self._session.on("metrics_collected")
    def _metrics_collected(metrics: MultimodalLLMMetrics):
        self.emit("metrics_collected", metrics)

Inherited members

class _RealtimeAPI (*args, **kwargs)
Expand source code
class _RealtimeAPI(Protocol):
    """Realtime API protocol"""

    @property
    def capabilities(self) -> _CapabilitiesProto: ...
    def session(
        self,
        *,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
    ) -> _RealtimeAPISession:
        """
        Create a new realtime session with the given chat and function contexts.
        """
        pass

Realtime API protocol

Ancestors

  • typing.Protocol
  • typing.Generic

Instance variables

prop capabilities : _CapabilitiesProto
Expand source code
@property
def capabilities(self) -> _CapabilitiesProto: ...

Methods

def session(self,
*,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None) ‑> livekit.agents.multimodal.multimodal_agent._RealtimeAPISession
Expand source code
def session(
    self,
    *,
    chat_ctx: llm.ChatContext | None = None,
    fnc_ctx: llm.FunctionContext | None = None,
) -> _RealtimeAPISession:
    """
    Create a new realtime session with the given chat and function contexts.
    """
    pass

Create a new realtime session with the given chat and function contexts.

class _RealtimeAPISession (*args, **kwargs)
Expand source code
class _RealtimeAPISession(Protocol):
    async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: ...
    @overload
    def on(self, event: str, callback: None = None) -> Callable[[T], T]: ...
    @overload
    def on(self, event: str, callback: T) -> T: ...
    def on(
        self, event: str, callback: Optional[T] = None
    ) -> Union[T, Callable[[T], T]]: ...

    def _push_audio(self, frame: rtc.AudioFrame) -> None: ...
    @property
    def fnc_ctx(self) -> llm.FunctionContext | None: ...
    @fnc_ctx.setter
    def fnc_ctx(self, value: llm.FunctionContext | None) -> None: ...

    def chat_ctx_copy(self) -> llm.ChatContext: ...

    def cancel_response(self) -> None: ...
    def create_response(
        self,
        on_duplicate: Literal[
            "cancel_existing", "cancel_new", "keep_both"
        ] = "keep_both",
    ) -> None: ...
    def commit_audio_buffer(self) -> None: ...
    @property
    def server_vad_enabled(self) -> bool: ...

    def _recover_from_text_response(self, item_id: str) -> None: ...
    def _update_conversation_item_content(
        self,
        item_id: str,
        content: llm.ChatContent | list[llm.ChatContent] | None = None,
    ) -> None: ...
    def _truncate_conversation_item(
        self, item_id: str, content_index: int, audio_end_ms: int
    ) -> None: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic

Instance variables

prop fnc_ctx : llm.FunctionContext | None
Expand source code
@property
def fnc_ctx(self) -> llm.FunctionContext | None: ...
prop server_vad_enabled : bool
Expand source code
@property
def server_vad_enabled(self) -> bool: ...

Methods

def cancel_response(self) ‑> None
Expand source code
def cancel_response(self) -> None: ...
def chat_ctx_copy(self) ‑> ChatContext
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext: ...
def commit_audio_buffer(self) ‑> None
Expand source code
def commit_audio_buffer(self) -> None: ...
def create_response(self,
on_duplicate: "Literal['cancel_existing', 'cancel_new', 'keep_both']" = 'keep_both') ‑> None
Expand source code
def create_response(
    self,
    on_duplicate: Literal[
        "cancel_existing", "cancel_new", "keep_both"
    ] = "keep_both",
) -> None: ...
def on(self, event: str, callback: Optional[T] = None) ‑> ~T | Callable[[~T], ~T]
Expand source code
def on(
    self, event: str, callback: Optional[T] = None
) -> Union[T, Callable[[T], T]]: ...
async def set_chat_ctx(self, ctx: llm.ChatContext) ‑> None
Expand source code
async def set_chat_ctx(self, ctx: llm.ChatContext) -> None: ...