Module livekit.agents.voice

Sub-modules

livekit.agents.voice.avatar
livekit.agents.voice.background_audio
livekit.agents.voice.io
livekit.agents.voice.room_io
livekit.agents.voice.run_result

Classes

class Agent (*,
instructions: str,
chat_ctx: NotGivenOr[llm.ChatContext | None] = NOT_GIVEN,
tools: list[llm.FunctionTool | llm.RawFunctionTool] | None = None,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
mcp_servers: NotGivenOr[list[mcp.MCPServer] | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
min_consecutive_speech_delay: NotGivenOr[float] = NOT_GIVEN,
use_tts_aligned_transcript: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class Agent:
    def __init__(
        self,
        *,
        instructions: str,
        chat_ctx: NotGivenOr[llm.ChatContext | None] = NOT_GIVEN,
        tools: list[llm.FunctionTool | llm.RawFunctionTool] | None = None,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        mcp_servers: NotGivenOr[list[mcp.MCPServer] | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        min_consecutive_speech_delay: NotGivenOr[float] = NOT_GIVEN,
        use_tts_aligned_transcript: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """
        Store the agent's instructions, tools and optional per-agent overrides.

        All arguments are keyword-only.

        Args:
            instructions: Stored as-is and exposed through the ``instructions`` property.
            chat_ctx: When truthy, a copy made with ``tools=`` set to the agent's tools is
                stored; otherwise ``ChatContext.empty()`` is used.
            tools: Explicit tools. ``None`` is treated as an empty list; a copy of the list
                is combined with whatever ``find_function_tools(self)`` returns.
            turn_detection: Stored unchanged.
            stt: Stored unchanged.
            vad: Stored unchanged.
            llm: Stored unchanged.
            tts: Stored unchanged.
            mcp_servers: Stored unchanged, except that an empty list is replaced by ``None``.
            allow_interruptions: Stored unchanged.
            min_consecutive_speech_delay: Stored unchanged.
            use_tts_aligned_transcript: Stored unchanged.
        """
        tools = tools or []
        self._instructions = instructions
        # NOTE(review): find_function_tools(self) runs while most attributes (including
        # `_activity`) are still unset; presumably it inspects the instance, so keep this
        # statement order unless that helper is confirmed not to touch instance attributes
        self._tools = tools.copy() + find_function_tools(self)
        # `ChatContext` is referenced by its bare name here: the `llm` parameter shadows the
        # `llm` module inside this method
        self._chat_ctx = chat_ctx.copy(tools=self._tools) if chat_ctx else ChatContext.empty()
        self._turn_detection = turn_detection
        self._stt = stt
        self._llm = llm
        self._tts = tts
        self._vad = vad
        self._allow_interruptions = allow_interruptions
        self._min_consecutive_speech_delay = min_consecutive_speech_delay
        self._use_tts_aligned_transcript = use_tts_aligned_transcript

        if isinstance(mcp_servers, list) and len(mcp_servers) == 0:
            mcp_servers = None  # treat empty list as None (but keep NOT_GIVEN)

        self._mcp_servers = mcp_servers
        # no activity yet: `_get_activity_or_raise` raises while this is None
        self._activity: AgentActivity | None = None

    @property
    def label(self) -> str:
        """
        Identify the agent by its fully qualified class name.

        Returns:
            str: The label of the agent, formatted as ``<module>.<class name>``.
        """
        agent_cls = type(self)
        return f"{agent_cls.__module__}.{agent_cls.__name__}"

    @property
    def instructions(self) -> str:
        """
        The instructions currently stored on the agent.

        Returns:
            str: The core instructions that guide the agent's behavior.
        """
        current = self._instructions
        return current

    @property
    def tools(self) -> list[llm.FunctionTool | llm.RawFunctionTool]:
        """
        The function tools available to the agent.

        A copy is handed out, so changing the returned list leaves the agent's own
        tool list untouched.

        Returns:
            list[llm.FunctionTool | llm.RawFunctionTool]:
                A copy of the agent's list of function tools.
        """
        snapshot = self._tools.copy()
        return snapshot

    @property
    def chat_ctx(self) -> llm.ChatContext:
        """
        Expose the agent's current chat context as a read-only view.

        Returns:
            llm.ChatContext: A read-only version of the agent's conversation history.

        See Also:
            update_chat_ctx: Method to update the internal chat context.
        """
        items = self._chat_ctx.items
        return _ReadOnlyChatContext(items)

    async def update_instructions(self, instructions: str) -> None:
        """
        Replace the agent's instructions.

        While the agent is running, the update is delegated to its activity; in realtime
        mode this also updates the instructions of the ongoing realtime session. When the
        agent is not running, the new instructions are simply stored on the agent.

        Args:
            instructions (str):
                The new instructions to set for the agent.

        Raises:
            llm.RealtimeError: If updating the realtime session instructions fails.
        """
        activity = self._activity
        if activity is not None:
            await activity.update_instructions(instructions)
            return

        self._instructions = instructions

    async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) -> None:
        """
        Updates the agent's available function tools.

        If the agent is running in realtime mode, this method also updates
        the tools for the ongoing realtime session.

        Args:
            tools (list[llm.FunctionTool | llm.RawFunctionTool]):
                The new list of function tools available to the agent. When the agent is
                not running, duplicates are dropped and the first occurrence of each tool
                keeps its position.

        Raises:
            llm.RealtimeError: If updating the realtime session tools fails.
        """
        if self._activity is None:
            # dict.fromkeys de-duplicates exactly like set() did, but keeps the caller's
            # order, so the resulting tool list no longer depends on hash ordering
            self._tools = list(dict.fromkeys(tools))
            self._chat_ctx = self._chat_ctx.copy(tools=self._tools)
            return

        await self._activity.update_tools(tools)

    async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
        """
        Replace the agent's chat context.

        While the agent is running, the update is delegated to its activity; in realtime
        mode this also updates the chat context of the ongoing realtime session. When the
        agent is not running, a copy of ``chat_ctx`` (made with the agent's tools) is stored.

        Args:
            chat_ctx (llm.ChatContext):
                The new or updated chat context for the agent.

        Raises:
            llm.RealtimeError: If updating the realtime session chat context fails.
        """
        activity = self._activity
        if activity is not None:
            await activity.update_chat_ctx(chat_ctx)
            return

        self._chat_ctx = chat_ctx.copy(tools=self._tools)

    # -- Pipeline nodes --
    # They can all be overridden by subclasses. By default they use the STT/LLM/TTS specified in
    # the constructor of the Agent (or provided by the AgentSession)

    async def on_enter(self) -> None:
        """Hook called when the task is entered.

        Does nothing by default; override it in a subclass to react to the event.
        """

    async def on_exit(self) -> None:
        """Hook called when the task is exited.

        Does nothing by default; override it in a subclass to react to the event.
        """

    async def on_user_turn_completed(
        self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
    ) -> None:
        """Hook called when the user has finished speaking, and the LLM is about to respond.

        Override it to update the chat context or edit the new message before it is sent
        to the LLM. The default implementation does nothing.
        """

    def stt_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> (
        AsyncIterable[stt.SpeechEvent | str]
        | Coroutine[Any, Any, AsyncIterable[stt.SpeechEvent | str]]
        | Coroutine[Any, Any, None]
    ):
        """
        Pipeline node that transcribes incoming audio frames into speech events.

        The stock behavior delegates to ``Agent.default.stt_node``, which relies on the
        Speech-To-Text (STT) capability of the current agent. An STT implementation without
        native streaming support needs a VAD (Voice Activity Detection) to wrap it.

        Override this node for more flexibility, e.g. custom pre-processing of audio,
        additional buffering, or alternative STT strategies.

        Args:
            audio (AsyncIterable[rtc.AudioFrame]): An asynchronous stream of audio frames.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            stt.SpeechEvent: An event containing transcribed text or other STT-related data.
        """
        default_impl = Agent.default.stt_node
        return default_impl(self, audio, model_settings)

    def llm_node(
        self,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool | RawFunctionTool],
        model_settings: ModelSettings,
    ) -> (
        AsyncIterable[llm.ChatChunk | str]
        | Coroutine[Any, Any, AsyncIterable[llm.ChatChunk | str]]
        | Coroutine[Any, Any, str]
        | Coroutine[Any, Any, llm.ChatChunk]
        | Coroutine[Any, Any, None]
    ):
        """
        Pipeline node that runs text generation with an LLM.

        The stock behavior delegates to ``Agent.default.llm_node``, which feeds the provided
        context to the agent's LLM. The node may yield plain text (as `str`) for
        straightforward text generation, or `llm.ChatChunk` objects that can carry text and
        optional tool calls. `ChatChunk` is the richer form, suited to function calls, usage
        statistics, or other metadata.

        Override this node to customize how the LLM is used or how tool invocations and
        responses are handled.

        Args:
            chat_ctx (llm.ChatContext): The context for the LLM (the conversation history).
            tools (list[FunctionTool | RawFunctionTool]):
                A list of callable tools that the LLM may invoke.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields/Returns:
            str: Plain text output from the LLM.
            llm.ChatChunk: An object that can contain both text and optional tool calls.
        """
        default_impl = Agent.default.llm_node
        return default_impl(self, chat_ctx, tools, model_settings)

    def transcription_node(
        self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
    ) -> (
        AsyncIterable[str | TimedString]
        | Coroutine[Any, Any, AsyncIterable[str | TimedString]]
        | Coroutine[Any, Any, None]
    ):
        """
        Pipeline node that finalizes transcriptions from text segments.

        Use it to adjust or post-process text coming from an LLM (or any other source) into
        its final transcribed form, for example to clean up formatting or fix punctuation.
        The stock behavior delegates to ``Agent.default.transcription_node``.

        Override this node to customize the post-processing logic.

        Args:
            text (AsyncIterable[str | TimedString]): An asynchronous stream of text segments.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            str: Finalized or post-processed text segments.
        """
        default_impl = Agent.default.transcription_node
        return default_impl(self, text, model_settings)

    def tts_node(
        self, text: AsyncIterable[str], model_settings: ModelSettings
    ) -> (
        AsyncIterable[rtc.AudioFrame]
        | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
        | Coroutine[Any, Any, None]
    ):
        """
        Pipeline node that synthesizes audio from text segments.

        The stock behavior delegates to ``Agent.default.tts_node``, which converts incoming
        text into audio frames using the Text-To-Speech from the agent. A TTS implementation
        without native streaming support is driven through a sentence tokenizer that splits
        the text for incremental synthesis.

        Override this node to provide different text chunking behavior, a custom TTS engine,
        or any other specialized processing.

        Args:
            text (AsyncIterable[str]): An asynchronous stream of text segments to be synthesized.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            rtc.AudioFrame: Audio frames synthesized from the provided text.
        """
        default_impl = Agent.default.tts_node
        return default_impl(self, text, model_settings)

    def realtime_audio_output_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> (
        AsyncIterable[rtc.AudioFrame]
        | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
        | Coroutine[Any, Any, None]
    ):
        """
        Pipeline node that processes the audio from the realtime LLM session before playout.

        The stock behavior delegates to ``Agent.default.realtime_audio_output_node``.
        """
        default_impl = Agent.default.realtime_audio_output_node
        return default_impl(self, audio, model_settings)

    def _get_activity_or_raise(self) -> AgentActivity:
        """Return the current activity context for this task (internal).

        Raises:
            RuntimeError: If no activity is attached, i.e. the agent is not running.
        """
        activity = self._activity
        if activity is not None:
            return activity

        raise RuntimeError("no activity context found, the agent is not running")

    class default:
        @staticmethod
        async def stt_node(
            agent: Agent, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
        ) -> AsyncGenerator[stt.SpeechEvent, None]:
            """
            Default implementation for `Agent.stt_node`

            Pushes every frame from ``audio`` into an STT stream and yields the events that
            stream produces. An STT that does not support streaming is wrapped in
            ``stt.StreamAdapter`` together with the activity's VAD.

            Args:
                agent (Agent): The agent whose running activity provides the STT and VAD.
                audio (AsyncIterable[rtc.AudioFrame]): Audio frames to transcribe.
                model_settings (ModelSettings): Not read by this default implementation.

            Yields:
                stt.SpeechEvent: Events read from the STT stream.

            Raises:
                RuntimeError: If the agent is not running, or if the STT does not support
                    streaming and no VAD is available to wrap it.
            """
            activity = agent._get_activity_or_raise()
            assert activity.stt is not None, "stt_node called but no STT node is available"

            wrapped_stt = activity.stt

            if not activity.stt.capabilities.streaming:
                if not activity.vad:
                    # the two literals are concatenated, so the first one must end with a
                    # separator to keep the sentences apart in the final message
                    raise RuntimeError(
                        f"The STT ({activity.stt.label}) does not support streaming, add a VAD to the AgentTask/VoiceAgent to enable streaming. "  # noqa: E501
                        "Or manually wrap your STT in a stt.StreamAdapter"
                    )

                wrapped_stt = stt.StreamAdapter(stt=wrapped_stt, vad=activity.vad)

            conn_options = activity.session.conn_options.stt_conn_options
            async with wrapped_stt.stream(conn_options=conn_options) as stream:

                @utils.log_exceptions(logger=logger)
                async def _forward_input() -> None:
                    async for frame in audio:
                        stream.push_frame(frame)

                # feed the stream in the background while its events are consumed below
                forward_task = asyncio.create_task(_forward_input())
                try:
                    async for event in stream:
                        yield event
                finally:
                    # stop forwarding once the consumer is done or has failed
                    await utils.aio.cancel_and_wait(forward_task)

        @staticmethod
        async def llm_node(
            agent: Agent,
            chat_ctx: llm.ChatContext,
            tools: list[FunctionTool | RawFunctionTool],
            model_settings: ModelSettings,
        ) -> AsyncGenerator[llm.ChatChunk | str, None]:
            """Default implementation for `Agent.llm_node`

            Opens a chat stream on the activity's LLM and relays every chunk unchanged.
            """
            activity = agent._get_activity_or_raise()
            assert activity.llm is not None, "llm_node called but no LLM node is available"
            assert isinstance(activity.llm, llm.LLM), (
                "llm_node should only be used with LLM (non-multimodal/realtime APIs) nodes"
            )

            # fall back to NOT_GIVEN when there are no (truthy) model settings
            if model_settings:
                tool_choice = model_settings.tool_choice
            else:
                tool_choice = NOT_GIVEN

            model = activity.llm
            options = activity.session.conn_options.llm_conn_options
            chat_stream = model.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=tool_choice, conn_options=options
            )
            async with chat_stream as stream:
                async for chunk in stream:
                    yield chunk

        @staticmethod
        async def tts_node(
            agent: Agent, text: AsyncIterable[str], model_settings: ModelSettings
        ) -> AsyncGenerator[rtc.AudioFrame, None]:
            """
            Default implementation for `Agent.tts_node`

            Pushes every text chunk into a TTS stream, ends the input once ``text`` is
            exhausted, and yields the ``frame`` of each event the stream produces. A TTS
            that does not support streaming is wrapped in ``tts.StreamAdapter`` with a
            blingfire sentence tokenizer.

            Args:
                agent (Agent): The agent whose running activity provides the TTS.
                text (AsyncIterable[str]): Text chunks to synthesize.
                model_settings (ModelSettings): Not read by this default implementation.

            Yields:
                rtc.AudioFrame: The frame of each event read from the TTS stream.

            Raises:
                RuntimeError: If the agent is not running.
            """
            activity = agent._get_activity_or_raise()
            assert activity.tts is not None, "tts_node called but no TTS node is available"

            wrapped_tts = activity.tts

            if not activity.tts.capabilities.streaming:
                wrapped_tts = tts.StreamAdapter(
                    tts=wrapped_tts,
                    sentence_tokenizer=tokenize.blingfire.SentenceTokenizer(retain_format=True),
                )

            conn_options = activity.session.conn_options.tts_conn_options
            async with wrapped_tts.stream(conn_options=conn_options) as stream:

                # same guard as the forwarding task in `stt_node`: this task is never awaited
                # directly (only through `cancel_and_wait`), so presumably a failure while
                # reading `text` would otherwise go unreported
                # NOTE(review): confirm `utils.log_exceptions` logs and re-raises
                @utils.log_exceptions(logger=logger)
                async def _forward_input() -> None:
                    async for chunk in text:
                        stream.push_text(chunk)

                    # tell the stream that no more text is coming
                    stream.end_input()

                # feed the stream in the background while its events are consumed below
                forward_task = asyncio.create_task(_forward_input())
                try:
                    async for ev in stream:
                        yield ev.frame
                finally:
                    # stop forwarding once the consumer is done or has failed
                    await utils.aio.cancel_and_wait(forward_task)

        @staticmethod
        async def transcription_node(
            agent: Agent, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
        ) -> AsyncGenerator[str | TimedString, None]:
            """Default implementation for `Agent.transcription_node`

            Relays every text segment unchanged.
            """
            async for segment in text:
                yield segment

        @staticmethod
        async def realtime_audio_output_node(
            agent: Agent, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
        ) -> AsyncGenerator[rtc.AudioFrame, None]:
            """Default implementation for `Agent.realtime_audio_output_node`

            Checks that a realtime LLM session exists, then relays every frame unchanged.
            """
            rt_session = agent._get_activity_or_raise().realtime_llm_session
            assert rt_session is not None, (
                "realtime_audio_output_node called but no realtime LLM session is available"
            )

            async for audio_frame in audio:
                yield audio_frame

    @property
    def realtime_llm_session(self) -> llm.RealtimeSession:
        """
        The realtime LLM session associated with the current agent.

        Raises:
            RuntimeError: If the agent is not running or the realtime LLM session is not available
        """
        rt_session = self._get_activity_or_raise().realtime_llm_session
        if rt_session is None:
            raise RuntimeError("no realtime LLM session")

        return rt_session

    @property
    def turn_detection(self) -> NotGivenOr[TurnDetectionMode | None]:
        """
        Turn detection mode used to identify conversational turns.

        When no mode was set at Agent creation and the ``AgentSession`` provides a turn
        detection, the session's turn detection mode is used at runtime instead.

        Returns:
            NotGivenOr[TurnDetectionMode | None]:
                An optional turn detection mode for managing conversation flow.
        """
        mode = self._turn_detection
        return mode

    @property
    def stt(self) -> NotGivenOr[stt.STT | None]:
        """
        Speech-To-Text component configured on the agent.

        When no STT was set at Agent creation and the ``AgentSession`` provides an STT
        component, the session's STT is used at runtime instead.

        Returns:
            NotGivenOr[stt.STT | None]: An optional STT component.
        """
        stt_component = self._stt
        return stt_component

    @property
    def llm(self) -> NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
        """
        Language Model or RealtimeModel used for text generation.

        When no model was set at Agent creation and the ``AgentSession`` provides an LLM or
        RealtimeModel, the session's model is used at runtime instead.

        Returns:
            NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
                The language model for text generation.
        """
        model = self._llm
        return model

    @property
    def tts(self) -> NotGivenOr[tts.TTS | None]:
        """
        Text-To-Speech component configured on the agent.

        When no TTS was set at Agent creation and the ``AgentSession`` provides a TTS
        component, the session's TTS is used at runtime instead.

        Returns:
            NotGivenOr[tts.TTS | None]: An optional TTS component for generating audio output.
        """
        tts_component = self._tts
        return tts_component

    @property
    def mcp_servers(self) -> NotGivenOr[list[mcp.MCPServer] | None]:
        """
        Model Context Protocol (MCP) servers that provide external tools to the agent.

        When no servers were set at Agent creation and the ``AgentSession`` provides MCP
        servers, the session's MCP servers are used at runtime instead.

        Returns:
            NotGivenOr[list[mcp.MCPServer] | None]: An optional list of MCP servers.
        """
        servers = self._mcp_servers
        return servers

    @property
    def vad(self) -> NotGivenOr[vad.VAD | None]:
        """
        Voice Activity Detection component configured on the agent.

        When no VAD was set at Agent creation and the ``AgentSession`` provides a VAD
        component, the session's VAD is used at runtime instead.

        Returns:
            NotGivenOr[vad.VAD | None]: An optional VAD component for detecting voice activity.
        """
        vad_component = self._vad
        return vad_component

    @property
    def allow_interruptions(self) -> NotGivenOr[bool]:
        """
        Whether interruptions (e.g., stopping TTS playback) are allowed.

        When no value was set at Agent creation and the ``AgentSession`` provides one for
        allowing interruptions, the session's value is used at runtime instead.

        Returns:
            NotGivenOr[bool]: Whether interruptions are permitted.
        """
        allowed = self._allow_interruptions
        return allowed

    @property
    def min_consecutive_speech_delay(self) -> NotGivenOr[float]:
        """
        Minimum consecutive speech delay configured on the agent.

        When no value was set at Agent creation and the ``AgentSession`` provides one for
        the minimum consecutive speech delay, the session's value is used at runtime instead.

        Returns:
            NotGivenOr[float]: The minimum consecutive speech delay.
        """
        delay = self._min_consecutive_speech_delay
        return delay

    @property
    def use_tts_aligned_transcript(self) -> NotGivenOr[bool]:
        """
        Whether the TTS-aligned transcript is used as the input of the
        ``transcription_node``.

        When no value was set at Agent creation and the ``AgentSession`` provides one for
        the use of TTS-aligned transcript, the session's value is used at runtime instead.

        Returns:
            NotGivenOr[bool]: Whether to use TTS-aligned transcript.
        """
        use_aligned = self._use_tts_aligned_transcript
        return use_aligned

    @property
    def session(self) -> AgentSession:
        """
        Retrieve the AgentSession associated with the current agent.

        The session is read from the agent's current activity.

        Returns:
            AgentSession: The session of the activity the agent is running in.

        Raises:
            RuntimeError: If the agent is not running
        """
        return self._get_activity_or_raise().session

Subclasses

  • livekit.agents.voice.agent.AgentTask

Class variables

var default

Instance variables

prop allow_interruptions : NotGivenOr[bool]
Expand source code
@property
def allow_interruptions(self) -> NotGivenOr[bool]:
    """
    Whether interruptions (e.g., stopping TTS playback) are allowed.

    When no value was set at Agent creation and the ``AgentSession`` provides one for
    allowing interruptions, the session's value is used at runtime instead.

    Returns:
        NotGivenOr[bool]: Whether interruptions are permitted.
    """
    allowed = self._allow_interruptions
    return allowed

Indicates whether interruptions (e.g., stopping TTS playback) are allowed.

If this property was not set at Agent creation, but an AgentSession provides a value for allowing interruptions, the session's value will be used at runtime instead.

Returns

NotGivenOr[bool]
Whether interruptions are permitted.
prop chat_ctx : llm.ChatContext
Expand source code
@property
def chat_ctx(self) -> llm.ChatContext:
    """
    Expose the agent's current chat context as a read-only view.

    Returns:
        llm.ChatContext: A read-only version of the agent's conversation history.

    See Also:
        update_chat_ctx: Method to update the internal chat context.
    """
    items = self._chat_ctx.items
    return _ReadOnlyChatContext(items)

Provides a read-only view of the agent's current chat context.

Returns

llm.ChatContext
A read-only version of the agent's conversation history.

See Also: update_chat_ctx: Method to update the internal chat context.

prop instructions : str
Expand source code
@property
def instructions(self) -> str:
    """
    The instructions currently stored on the agent.

    Returns:
        str: The core instructions that guide the agent's behavior.
    """
    current = self._instructions
    return current

Returns

str
The core instructions that guide the agent's behavior.
prop label : str
Expand source code
@property
def label(self) -> str:
    """
    Identify the agent by its fully qualified class name.

    Returns:
        str: The label of the agent, formatted as ``<module>.<class name>``.
    """
    agent_cls = type(self)
    return f"{agent_cls.__module__}.{agent_cls.__name__}"

Returns

str
The label of the agent.
prop llm : NotGivenOr[llm.LLM | llm.RealtimeModel | None]
Expand source code
@property
def llm(self) -> NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
    """
    Language Model or RealtimeModel used for text generation.

    When no model was set at Agent creation and the ``AgentSession`` provides an LLM or
    RealtimeModel, the session's model is used at runtime instead.

    Returns:
        NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
            The language model for text generation.
    """
    model = self._llm
    return model

Retrieves the Language Model or RealtimeModel used for text generation.

If this property was not set at Agent creation, but an AgentSession provides an LLM or RealtimeModel, the session's model will be used at runtime instead.

Returns

NotGivenOr[llm.LLM | llm.RealtimeModel | None]
The language model for text generation.
prop mcp_servers : NotGivenOr[list[mcp.MCPServer] | None]
Expand source code
@property
def mcp_servers(self) -> NotGivenOr[list[mcp.MCPServer] | None]:
    """
    Model Context Protocol (MCP) servers that provide external tools to the agent.

    When no servers were set at Agent creation and the ``AgentSession`` provides MCP
    servers, the session's MCP servers are used at runtime instead.

    Returns:
        NotGivenOr[list[mcp.MCPServer] | None]: An optional list of MCP servers.
    """
    servers = self._mcp_servers
    return servers

Retrieves the list of Model Context Protocol (MCP) servers providing external tools.

If this property was not set at Agent creation, but an AgentSession provides MCP servers, the session's MCP servers will be used at runtime instead.

Returns

NotGivenOr[list[mcp.MCPServer] | None]
An optional list of MCP servers.
prop min_consecutive_speech_delay : NotGivenOr[float]
Expand source code
@property
def min_consecutive_speech_delay(self) -> NotGivenOr[float]:
    """
    Minimum consecutive speech delay configured on the agent.

    When no value was set at Agent creation and the ``AgentSession`` provides one for
    the minimum consecutive speech delay, the session's value is used at runtime instead.

    Returns:
        NotGivenOr[float]: The minimum consecutive speech delay.
    """
    delay = self._min_consecutive_speech_delay
    return delay

Retrieves the minimum consecutive speech delay for the agent.

If this property was not set at Agent creation, but an AgentSession provides a value for the minimum consecutive speech delay, the session's value will be used at runtime instead.

Returns

NotGivenOr[float]
The minimum consecutive speech delay.
prop realtime_llm_session : llm.RealtimeSession
Expand source code
@property
def realtime_llm_session(self) -> llm.RealtimeSession:
    """
    The realtime LLM session associated with the current agent.

    Raises:
        RuntimeError: If the agent is not running or the realtime LLM session is not available
    """
    rt_session = self._get_activity_or_raise().realtime_llm_session
    if rt_session is None:
        raise RuntimeError("no realtime LLM session")

    return rt_session

Retrieve the realtime LLM session associated with the current agent.

Raises

RuntimeError
If the agent is not running or the realtime LLM session is not available
prop session : AgentSession
Expand source code
@property
def session(self) -> AgentSession:
    """
    Retrieve the AgentSession associated with the current agent.

    The session is read from the agent's current activity.

    Returns:
        AgentSession: The session of the activity the agent is running in.

    Raises:
        RuntimeError: If the agent is not running
    """
    return self._get_activity_or_raise().session

Retrieve the AgentSession associated with the current agent.

Raises

RuntimeError
If the agent is not running
prop stt : NotGivenOr[stt.STT | None]
Expand source code
@property
def stt(self) -> NotGivenOr[stt.STT | None]:
    """
    Speech-To-Text component configured on the agent.

    When no STT was set at Agent creation and the ``AgentSession`` provides an STT
    component, the session's STT is used at runtime instead.

    Returns:
        NotGivenOr[stt.STT | None]: An optional STT component.
    """
    stt_component = self._stt
    return stt_component

Retrieves the Speech-To-Text component for the agent.

If this property was not set at Agent creation, but an AgentSession provides an STT component, the session's STT will be used at runtime instead.

Returns

NotGivenOr[stt.STT | None]
An optional STT component.
prop tools : list[llm.FunctionTool | llm.RawFunctionTool]
Expand source code
@property
def tools(self) -> list[llm.FunctionTool | llm.RawFunctionTool]:
    """
    The function tools available to the agent.

    A copy is handed out, so changing the returned list leaves the agent's own
    tool list untouched.

    Returns:
        list[llm.FunctionTool | llm.RawFunctionTool]:
            A copy of the agent's list of function tools.
    """
    snapshot = self._tools.copy()
    return snapshot

Returns

list[llm.FunctionTool | llm.RawFunctionTool]: A list of function tools available to the agent.

prop tts : NotGivenOr[tts.TTS | None]
Expand source code
@property
def tts(self) -> NotGivenOr[tts.TTS | None]:
    """
    Text-To-Speech component configured on the agent.

    When no TTS was set at Agent creation and the ``AgentSession`` provides a TTS
    component, the session's TTS is used at runtime instead.

    Returns:
        NotGivenOr[tts.TTS | None]: An optional TTS component for generating audio output.
    """
    tts_component = self._tts
    return tts_component

Retrieves the Text-To-Speech component for the agent.

If this property was not set at Agent creation, but an AgentSession provides a TTS component, the session's TTS will be used at runtime instead.

Returns

NotGivenOr[tts.TTS | None]
An optional TTS component for generating audio output.
prop turn_detection : NotGivenOr[TurnDetectionMode | None]
Expand source code
@property
def turn_detection(self) -> NotGivenOr[TurnDetectionMode | None]:
    """
    Turn detection mode used to identify conversational turns.

    When no mode was set at Agent creation and the ``AgentSession`` provides a turn
    detection, the session's turn detection mode is used at runtime instead.

    Returns:
        NotGivenOr[TurnDetectionMode | None]:
            An optional turn detection mode for managing conversation flow.
    """
    mode = self._turn_detection
    return mode

Retrieves the turn detection mode for identifying conversational turns.

If this property was not set at Agent creation, but an AgentSession provides a turn detection, the session's turn detection mode will be used at runtime instead.

Returns

NotGivenOr[TurnDetectionMode | None]
An optional turn detection mode for managing conversation flow.
prop use_tts_aligned_transcript : NotGivenOr[bool]
Expand source code
@property
def use_tts_aligned_transcript(self) -> NotGivenOr[bool]:
    """
    Whether the TTS-aligned transcript is used as the input of the
    ``transcription_node``.

    When no value was set at Agent creation and the ``AgentSession`` provides one for
    the use of TTS-aligned transcript, the session's value is used at runtime instead.

    Returns:
        NotGivenOr[bool]: Whether to use TTS-aligned transcript.
    """
    use_aligned = self._use_tts_aligned_transcript
    return use_aligned

Indicates whether to use TTS-aligned transcript as the input of the transcription_node.

If this property was not set at Agent creation, but an AgentSession provides a value for the use of TTS-aligned transcript, the session's value will be used at runtime instead.

Returns

NotGivenOr[bool]
Whether to use TTS-aligned transcript.
prop vad : NotGivenOr[vad.VAD | None]
Expand source code
@property
def vad(self) -> NotGivenOr[vad.VAD | None]:
    """
    Voice Activity Detection component configured on the agent.

    When no VAD was set at Agent creation and the ``AgentSession`` provides a VAD
    component, the session's VAD is used at runtime instead.

    Returns:
        NotGivenOr[vad.VAD | None]: An optional VAD component for detecting voice activity.
    """
    vad_component = self._vad
    return vad_component

Retrieves the Voice Activity Detection component for the agent.

If this property was not set at Agent creation, but an AgentSession provides a VAD component, the session's VAD will be used at runtime instead.

Returns

NotGivenOr[vad.VAD | None]
An optional VAD component for detecting voice activity.

Methods

def llm_node(self,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool | RawFunctionTool],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[livekit.agents.llm.llm.ChatChunk | str] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[livekit.agents.llm.llm.ChatChunk | str]] | collections.abc.Coroutine[typing.Any, typing.Any, str] | collections.abc.Coroutine[typing.Any, typing.Any, livekit.agents.llm.llm.ChatChunk] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool | RawFunctionTool],
    model_settings: ModelSettings,
) -> (
    AsyncIterable[llm.ChatChunk | str]
    | Coroutine[Any, Any, AsyncIterable[llm.ChatChunk | str]]
    | Coroutine[Any, Any, str]
    | Coroutine[Any, Any, llm.ChatChunk]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node in charge of text generation with an LLM.

    The stock behaviour is delegated to ``Agent.default.llm_node``, which runs the agent's
    LLM over the supplied context. The output may be plain text (``str``) for simple
    generation, or ``llm.ChatChunk`` objects, which can carry text together with optional
    tool calls, usage statistics, or other metadata.

    Override this node to change how the LLM is driven or how tool invocations and their
    responses are handled.

    Args:
        chat_ctx (llm.ChatContext): The context for the LLM (the conversation history).
        tools (list[FunctionTool | RawFunctionTool]): Callable tools the LLM may invoke.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields/Returns:
        str: Plain text output from the LLM.
        llm.ChatChunk: An object that can contain both text and optional tool calls.
    """
    default_impl = Agent.default.llm_node
    return default_impl(self, chat_ctx, tools, model_settings)

A node in the processing pipeline that processes text generation with an LLM.

By default, this node uses the agent's LLM to process the provided context. It may yield plain text (as str) for straightforward text generation, or llm.ChatChunk objects that can include text and optional tool calls. ChatChunk is helpful for capturing more complex outputs such as function calls, usage statistics, or other metadata.

You can override this node to customize how the LLM is used or how tool invocations and responses are handled.

Args

chat_ctx : llm.ChatContext
The context for the LLM (the conversation history).
tools : list[FunctionTool | RawFunctionTool]
A list of callable tools that the LLM may invoke.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields/Returns

str
Plain text output from the LLM.
llm.ChatChunk
An object that can contain both text and optional tool calls.

async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Hook called when the task is entered; this base implementation does nothing."""
    return None

Called when the task is entered

async def on_exit(self) ‑> None
Expand source code
async def on_exit(self) -> None:
    """Hook called when the task is exited; this base implementation does nothing."""
    return None

Called when the task is exited

async def on_user_turn_completed(self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage) ‑> None
Expand source code
async def on_user_turn_completed(
    self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
) -> None:
    """Hook called once the user has finished speaking, right before the LLM responds.

    Use it to adjust the chat context or to edit the new message before it is sent to
    the LLM. This base implementation does nothing.

    Args:
        turn_ctx (llm.ChatContext): The chat context for the turn.
        new_message (llm.ChatMessage): The message produced by the user's turn.
    """
    return None

Called when the user has finished speaking, and the LLM is about to respond

This is a good opportunity to update the chat context or edit the new message before it is sent to the LLM.

def realtime_audio_output_node(self,
audio: AsyncIterable[rtc.AudioFrame],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[AudioFrame] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[AudioFrame]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def realtime_audio_output_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> (
    AsyncIterable[rtc.AudioFrame]
    | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
    | Coroutine[Any, Any, None]
):
    """Pipeline node that handles realtime LLM session audio before it is played out.

    The stock behaviour is delegated to ``Agent.default.realtime_audio_output_node``.

    Args:
        audio (AsyncIterable[rtc.AudioFrame]): Audio frames coming from the realtime session.
        model_settings (ModelSettings): Configuration and parameters for model execution.
    """
    default_impl = Agent.default.realtime_audio_output_node
    return default_impl(self, audio, model_settings)

A node processing the audio from the realtime LLM session before it is played out.

def stt_node(self,
audio: AsyncIterable[rtc.AudioFrame],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[livekit.agents.stt.stt.SpeechEvent | str] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[livekit.agents.stt.stt.SpeechEvent | str]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> (
    AsyncIterable[stt.SpeechEvent | str]
    | Coroutine[Any, Any, AsyncIterable[stt.SpeechEvent | str]]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node that transcribes incoming audio frames into speech events.

    The stock behaviour is delegated to ``Agent.default.stt_node``, which uses the
    Speech-To-Text (STT) capability of the current agent. When the STT implementation has
    no native streaming support, a VAD (Voice Activity Detection) mechanism is required
    to wrap it.

    Override this node for more flexibility, e.g. custom audio pre-processing, additional
    buffering, or an alternative STT strategy.

    Args:
        audio (AsyncIterable[rtc.AudioFrame]): An asynchronous stream of audio frames.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        stt.SpeechEvent: An event containing transcribed text or other STT-related data.
    """
    default_impl = Agent.default.stt_node
    return default_impl(self, audio, model_settings)

A node in the processing pipeline that transcribes audio frames into speech events.

By default, this node uses a Speech-To-Text (STT) capability from the current agent. If the STT implementation does not support streaming natively, a VAD (Voice Activity Detection) mechanism is required to wrap the STT.

You can override this node with your own implementation for more flexibility (e.g., custom pre-processing of audio, additional buffering, or alternative STT strategies).

Args

audio : AsyncIterable[rtc.AudioFrame]
An asynchronous stream of audio frames.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

stt.SpeechEvent
An event containing transcribed text or other STT-related data.
def transcription_node(self,
text: AsyncIterable[str | TimedString],
model_settings: ModelSettings) ‑> AsyncIterable[str | TimedString] | Coroutine[Any, Any, AsyncIterable[str | TimedString]] | Coroutine[Any, Any, None]
Expand source code
def transcription_node(
    self, text: AsyncIterable[str | TimedString], model_settings: ModelSettings
) -> (
    AsyncIterable[str | TimedString]
    | Coroutine[Any, Any, AsyncIterable[str | TimedString]]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node that finalizes transcriptions from text segments.

    The stock behaviour is delegated to ``Agent.default.transcription_node``. The node is
    the place to adjust or post-process text coming from an LLM (or any other source)
    into its final transcribed form: cleaning up formatting, fixing punctuation, or
    applying any other text transformation.

    Override this node to plug in your own post-processing logic.

    Args:
        text (AsyncIterable[str | TimedString]): An asynchronous stream of text segments.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        str: Finalized or post-processed text segments.
    """
    default_impl = Agent.default.transcription_node
    return default_impl(self, text, model_settings)

A node in the processing pipeline that finalizes transcriptions from text segments.

This node can be used to adjust or post-process text coming from an LLM (or any other source) into a final transcribed form. For instance, you might clean up formatting, fix punctuation, or perform any other text transformations here.

You can override this node to customize post-processing logic according to your needs.

Args

text : AsyncIterable[str | TimedString]
An asynchronous stream of text segments.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

str
Finalized or post-processed text segments.
def tts_node(self,
text: AsyncIterable[str],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[AudioFrame] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[AudioFrame]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def tts_node(
    self, text: AsyncIterable[str], model_settings: ModelSettings
) -> (
    AsyncIterable[rtc.AudioFrame]
    | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node that synthesizes audio from text segments.

    The stock behaviour is delegated to ``Agent.default.tts_node``, which converts the
    incoming text into audio frames using the agent's Text-To-Speech (TTS). When the TTS
    implementation has no native streaming support, a sentence tokenizer is used to split
    the text for incremental synthesis.

    Override this node to change the text chunking behaviour, to use a custom TTS engine,
    or for any other specialized processing.

    Args:
        text (AsyncIterable[str]): An asynchronous stream of text segments to be synthesized.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        rtc.AudioFrame: Audio frames synthesized from the provided text.
    """
    default_impl = Agent.default.tts_node
    return default_impl(self, text, model_settings)

A node in the processing pipeline that synthesizes audio from text segments.

By default, this node converts incoming text into audio frames using the Text-To-Speech from the agent. If the TTS implementation does not support streaming natively, it uses a sentence tokenizer to split text for incremental synthesis.

You can override this node to provide different text chunking behavior, a custom TTS engine, or any other specialized processing.

Args

text : AsyncIterable[str]
An asynchronous stream of text segments to be synthesized.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

rtc.AudioFrame
Audio frames synthesized from the provided text.
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) ‑> None
Expand source code
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
    """
    Replace the chat context of the agent.

    While the agent has a running activity the update is forwarded to it; per the
    library's documentation this also refreshes the chat context of an ongoing realtime
    session. Without an activity, a copy of ``chat_ctx`` bound to the agent's current
    tools is stored on the agent.

    Args:
        chat_ctx (llm.ChatContext):
            The new or updated chat context for the agent.

    Raises:
        llm.RealtimeError: If updating the realtime session chat context fails.
    """
    activity = self._activity
    if activity is not None:
        await activity.update_chat_ctx(chat_ctx)
        return

    self._chat_ctx = chat_ctx.copy(tools=self._tools)

Updates the agent's chat context.

If the agent is running in realtime mode, this method also updates the chat context for the ongoing realtime session.

Args

chat_ctx : llm.ChatContext
The new or updated chat context for the agent.

Raises

llm.RealtimeError
If updating the realtime session chat context fails.
async def update_instructions(self, instructions: str) ‑> None
Expand source code
async def update_instructions(self, instructions: str) -> None:
    """
    Replace the instructions of the agent.

    While the agent has a running activity the update is forwarded to it; per the
    library's documentation this also refreshes the instructions of an ongoing realtime
    session. Without an activity, the new instructions are simply stored on the agent.

    Args:
        instructions (str):
            The new instructions to set for the agent.

    Raises:
        llm.RealtimeError: If updating the realtime session instructions fails.
    """
    activity = self._activity
    if activity is not None:
        await activity.update_instructions(instructions)
        return

    self._instructions = instructions

Updates the agent's instructions.

If the agent is running in realtime mode, this method also updates the instructions for the ongoing realtime session.

Args

instructions : str
The new instructions to set for the agent.

Raises

llm.RealtimeError
If updating the realtime session instructions fails.
async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) ‑> None
Expand source code
async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) -> None:
    """
    Updates the agent's available function tools.

    If the agent is running in realtime mode, this method also updates
    the tools for the ongoing realtime session.

    When the agent has no running activity, duplicate entries in ``tools`` are dropped
    (first occurrence wins, caller order is preserved) and the agent's chat context is
    re-copied against the resulting tool list. Otherwise the update is forwarded to the
    running activity unchanged.

    Args:
        tools (list[llm.FunctionTool | llm.RawFunctionTool]):
            The new list of function tools available to the agent.

    Raises:
        llm.RealtimeError: If updating the realtime session tools fails.
    """
    if self._activity is None:
        # dict.fromkeys de-duplicates like set() but keeps insertion order, so the
        # resulting tool list is deterministic instead of hash-ordered.
        self._tools = list(dict.fromkeys(tools))
        self._chat_ctx = self._chat_ctx.copy(tools=self._tools)
        return

    await self._activity.update_tools(tools)

Updates the agent's available function tools.

If the agent is running in realtime mode, this method also updates the tools for the ongoing realtime session.

Args

tools : list[llm.FunctionTool | llm.RawFunctionTool]
The new list of function tools available to the agent.

Raises

llm.RealtimeError
If updating the realtime session tools fails.
class AgentFalseInterruptionEvent (**data: Any)
Expand source code
class AgentFalseInterruptionEvent(BaseModel):
    # Pydantic model carrying the payload of an `agent_false_interruption` event.
    # `type` is a fixed literal tag whose only allowed value is also its default.
    type: Literal["agent_false_interruption"] = "agent_false_interruption"
    # Required field (no default); may be None.
    message: ChatMessage | None
    """The `assistant` message that got interrupted"""
    extra_instructions: str | None = None
    """Optional instructions originally passed to `AgentSession.generate_reply` via the `instructions` argument.
    Populated only if the user interrupted a speech response generated using `session.generate_reply`.
    Useful for understanding what the agent was attempting to convey before the interruption."""
    # Filled with `time.time()` at instantiation unless a value is passed explicitly.
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var extra_instructions : str | None

Optional instructions originally passed to AgentSession.generate_reply() via the instructions argument. Populated only if the user interrupted a speech response generated using session.generate_reply. Useful for understanding what the agent was attempting to convey before the interruption.

var message : livekit.agents.llm.chat_context.ChatMessage | None

The assistant message that got interrupted

var model_config
var type : Literal['agent_false_interruption']
class AgentSession (*,
turn_detection: NotGivenOr[TurnDetectionMode] = NOT_GIVEN,
stt: NotGivenOr[stt.STT] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS] = NOT_GIVEN,
mcp_servers: NotGivenOr[list[mcp.MCPServer]] = NOT_GIVEN,
userdata: NotGivenOr[Userdata_T] = NOT_GIVEN,
allow_interruptions: bool = True,
discard_audio_if_uninterruptible: bool = True,
min_interruption_duration: float = 0.5,
min_interruption_words: int = 0,
min_endpointing_delay: float = 0.4,
max_endpointing_delay: float = 6.0,
max_tool_steps: int = 3,
video_sampler: NotGivenOr[_VideoSampler | None] = NOT_GIVEN,
user_away_timeout: float | None = 15.0,
agent_false_interruption_timeout: float | None = 4.0,
min_consecutive_speech_delay: float = 0.0,
use_tts_aligned_transcript: bool = False,
preemptive_generation: bool = False,
conn_options: NotGivenOr[SessionConnectOptions] = NOT_GIVEN,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class AgentSession(rtc.EventEmitter[EventTypes], Generic[Userdata_T]):
    def __init__(
        self,
        *,
        turn_detection: NotGivenOr[TurnDetectionMode] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS] = NOT_GIVEN,
        mcp_servers: NotGivenOr[list[mcp.MCPServer]] = NOT_GIVEN,
        userdata: NotGivenOr[Userdata_T] = NOT_GIVEN,
        allow_interruptions: bool = True,
        discard_audio_if_uninterruptible: bool = True,
        min_interruption_duration: float = 0.5,
        min_interruption_words: int = 0,
        min_endpointing_delay: float = 0.4,
        max_endpointing_delay: float = 6.0,
        max_tool_steps: int = 3,
        video_sampler: NotGivenOr[_VideoSampler | None] = NOT_GIVEN,
        user_away_timeout: float | None = 15.0,
        agent_false_interruption_timeout: float | None = 4.0,
        min_consecutive_speech_delay: float = 0.0,
        use_tts_aligned_transcript: bool = False,
        preemptive_generation: bool = False,
        conn_options: NotGivenOr[SessionConnectOptions] = NOT_GIVEN,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """`AgentSession` is the LiveKit Agents runtime that glues together
        media streams, speech/LLM components, and tool orchestration into a
        single real-time voice agent.

        It links audio, video, and text I/O with STT, VAD, TTS, and the LLM;
        handles turn detection, endpointing, interruptions, and multi-step
        tool calls; and exposes everything through event callbacks so you can
        focus on writing function tools and simple hand-offs rather than
        low-level streaming logic.

        Args:
            turn_detection (TurnDetectionMode, optional): Strategy for deciding
                when the user has finished speaking.

                * ``"stt"`` – rely on speech-to-text end-of-utterance cues
                * ``"vad"`` – rely on Voice Activity Detection start/stop cues
                * ``"realtime_llm"`` – use server-side detection from a
                  realtime LLM
                * ``"manual"`` – caller controls turn boundaries explicitly
                * ``_TurnDetector`` instance – plug-in custom detector

                If *NOT_GIVEN*, the session chooses the best available mode in
                priority order ``realtime_llm → vad → stt → manual``; it
                automatically falls back if the necessary model is missing.
            stt (stt.STT, optional): Speech-to-text backend.
            vad (vad.VAD, optional): Voice-activity detector.
            llm (llm.LLM | llm.RealtimeModel, optional): LLM or RealtimeModel.
            tts (tts.TTS, optional): Text-to-speech engine.
            mcp_servers (list[mcp.MCPServer], optional): List of MCP servers
                providing external tools for the agent to use.
            userdata (Userdata_T, optional): Arbitrary per-session user data.
            allow_interruptions (bool): Whether the user can interrupt the
                agent mid-utterance. Default ``True``.
            discard_audio_if_uninterruptible (bool): When ``True``, buffered
                audio is dropped while the agent is speaking and cannot be
                interrupted. Default ``True``.
            min_interruption_duration (float): Minimum speech length (s) to
                register as an interruption. Default ``0.5`` s.
            min_interruption_words (int): Minimum number of words to consider
                an interruption, only used if stt enabled. Default ``0``.
            min_endpointing_delay (float): Minimum time-in-seconds the agent
                must wait after a potential end-of-utterance signal (from VAD
                or an EOU model) before it declares the user’s turn complete.
                Default ``0.4`` s.
            max_endpointing_delay (float): Maximum time-in-seconds the agent
                will wait before terminating the turn. Default ``6.0`` s.
            max_tool_steps (int): Maximum consecutive tool calls per LLM turn.
                Default ``3``.
            video_sampler (_VideoSampler, optional): Uses
                :class:`VoiceActivityVideoSampler` when *NOT_GIVEN*; that sampler
                captures video at ~1 fps while the user is speaking and ~0.3 fps
                when silent by default.
            user_away_timeout (float, optional): If set, set the user state as
                "away" after this amount of time after user and agent are silent.
                Default ``15.0`` s, set to ``None`` to disable.
            agent_false_interruption_timeout (float, optional): If set, emit an
                `agent_false_interruption` event after this amount of time if
                the user is silent and no user transcript is detected after
                the interruption. Set to ``None`` to disable. Default ``4.0`` s.
            min_consecutive_speech_delay (float, optional): The minimum delay between
                consecutive speech. Default ``0.0`` s.
            use_tts_aligned_transcript (bool, optional): Whether to use TTS-aligned
                transcript as the input of the ``transcription_node``. Only applies
                if ``TTS.capabilities.aligned_transcript`` is ``True`` or ``streaming``
                is ``False``. Default ``False``.
            preemptive_generation (bool):
                Whether to speculatively begin LLM and TTS requests before an end-of-turn is
                detected. When True, the agent sends inference calls as soon as a user
                transcript is received rather than waiting for a definitive turn boundary. This
                can reduce response latency by overlapping model inference with user audio,
                but may incur extra compute if the user interrupts or revises mid-utterance.
                Defaults to ``False``.
            conn_options (SessionConnectOptions, optional): Connection options for
                stt, llm, and tts.
            loop (asyncio.AbstractEventLoop, optional): Event loop to bind the
                session to. Falls back to :pyfunc:`asyncio.get_event_loop()`.
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()

        if not is_given(video_sampler):
            video_sampler = VoiceActivityVideoSampler(speaking_fps=1.0, silent_fps=0.3)

        self._video_sampler = video_sampler

        # This is the "global" chat_context, it holds the entire conversation history
        self._chat_ctx = ChatContext.empty()
        self._opts = VoiceOptions(
            allow_interruptions=allow_interruptions,
            discard_audio_if_uninterruptible=discard_audio_if_uninterruptible,
            min_interruption_duration=min_interruption_duration,
            min_interruption_words=min_interruption_words,
            min_endpointing_delay=min_endpointing_delay,
            max_endpointing_delay=max_endpointing_delay,
            max_tool_steps=max_tool_steps,
            user_away_timeout=user_away_timeout,
            agent_false_interruption_timeout=agent_false_interruption_timeout,
            min_consecutive_speech_delay=min_consecutive_speech_delay,
            preemptive_generation=preemptive_generation,
            use_tts_aligned_transcript=use_tts_aligned_transcript,
        )
        self._conn_options = conn_options or SessionConnectOptions()
        self._started = False
        self._turn_detection = turn_detection or None
        self._stt = stt or None
        self._vad = vad or None
        self._llm = llm or None
        self._tts = tts or None
        self._mcp_servers = mcp_servers or None

        # unrecoverable error counts, reset after agent speaking
        self._llm_error_counts = 0
        self._tts_error_counts = 0

        # configurable IO
        self._input = io.AgentInput(self._on_video_input_changed, self._on_audio_input_changed)
        self._output = io.AgentOutput(
            self._on_video_output_changed,
            self._on_audio_output_changed,
            self._on_text_output_changed,
        )

        self._forward_audio_atask: asyncio.Task[None] | None = None
        self._forward_video_atask: asyncio.Task[None] | None = None
        self._update_activity_atask: asyncio.Task[None] | None = None
        self._activity_lock = asyncio.Lock()
        self._lock = asyncio.Lock()

        # used to keep a reference to the room io
        # this is not exposed, if users want access to it, they can create their own RoomIO
        self._room_io: room_io.RoomIO | None = None

        self._agent: Agent | None = None
        self._activity: AgentActivity | None = None
        self._next_activity: AgentActivity | None = None
        self._user_state: UserState = "listening"
        self._agent_state: AgentState = "initializing"
        self._user_away_timer: asyncio.TimerHandle | None = None

        # used to emit the agent false interruption event
        self._false_interruption_timer: asyncio.TimerHandle | None = None
        self._false_interrupted_event: AgentFalseInterruptionEvent | None = None

        self._userdata: Userdata_T | None = userdata if is_given(userdata) else None
        self._closing_task: asyncio.Task[None] | None = None
        self._job_context_cb_registered: bool = False

        self._global_run_state: RunResult | None = None

        # trace
        self._user_speaking_span: trace.Span | None = None
        self._agent_speaking_span: trace.Span | None = None
        self._session_span: trace.Span | None = None
        self._root_span_context: otel_context.Context | None = None

    @property
    def userdata(self) -> Userdata_T:
        """Per-session user data.

        Raises:
            ValueError: If no user data is currently set (the stored value is ``None``).
        """
        data = self._userdata
        if data is not None:
            return data

        raise ValueError("VoiceAgent userdata is not set")

    @userdata.setter
    def userdata(self, value: Userdata_T) -> None:
        """Replace the per-session user data with ``value``."""
        self._userdata = value

    @property
    def turn_detection(self) -> TurnDetectionMode | None:
        """Session-level turn detection mode; ``None`` when the constructor received no (or a falsy) value."""  # noqa: E501
        return self._turn_detection

    @property
    def mcp_servers(self) -> list[mcp.MCPServer] | None:
        """MCP servers passed to the constructor; ``None`` when none were given or the list was empty."""  # noqa: E501
        return self._mcp_servers

    @property
    def input(self) -> io.AgentInput:
        """The session's ``io.AgentInput`` object, created in the constructor."""
        return self._input

    @property
    def output(self) -> io.AgentOutput:
        """The session's ``io.AgentOutput`` object, created in the constructor."""
        return self._output

    @property
    def options(self) -> VoiceOptions:
        """The ``VoiceOptions`` assembled from the constructor's keyword arguments."""
        return self._opts

    @property
    def conn_options(self) -> SessionConnectOptions:
        """Connection options given at construction, or a default ``SessionConnectOptions()``."""
        return self._conn_options

    @property
    def history(self) -> llm.ChatContext:
        """The session-wide ("global") chat context holding the entire conversation history."""
        return self._chat_ctx

    @property
    def current_speech(self) -> SpeechHandle | None:
        """``current_speech`` of the active activity, or ``None`` when there is no activity."""
        activity = self._activity
        if activity is None:
            return None

        return activity.current_speech

    @property
    def user_state(self) -> UserState:
        """Current user state as tracked by the session (initialised to ``"listening"``)."""
        return self._user_state

    @property
    def agent_state(self) -> AgentState:
        """Current agent state as tracked by the session (initialised to ``"initializing"``)."""
        return self._agent_state

    @property
    def current_agent(self) -> Agent:
        """The agent currently attached to the session.

        Raises:
            RuntimeError: If no agent is attached yet (the stored agent is ``None``).
        """
        agent = self._agent
        if agent is not None:
            return agent

        raise RuntimeError("VoiceAgent isn't running")

    def run(self, *, user_input: str, output_type: type[Run_T] | None = None) -> RunResult[Run_T]:
        """Start a run for ``user_input`` and return the ``RunResult`` tracking it.

        A new ``RunResult`` is registered as the session's global run state and a reply
        is requested through ``generate_reply`` with the same user input.

        Args:
            user_input: The user input handed to ``generate_reply``.
            output_type: Optional output type forwarded to ``RunResult``.

        Raises:
            RuntimeError: If a previously started run has not finished yet.
        """
        previous = self._global_run_state
        if previous is not None and not previous.done():
            raise RuntimeError("nested runs are not supported")

        result = RunResult(user_input=user_input, output_type=output_type)
        # Register the run before generating the reply so the reply is attributed to it.
        self._global_run_state = result
        self.generate_reply(user_input=user_input)
        return result

    @tracer.start_as_current_span("agent_session", end_on_exit=False)
    async def start(
        self,
        agent: Agent,
        *,
        room: NotGivenOr[rtc.Room] = NOT_GIVEN,
        room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
        room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
    ) -> None:
        """Start the voice agent.

        If the console flag is provided, start a ChatCLI. Otherwise, when a room is
        given and no RoomIO is attached yet, create a default RoomIO; RoomIO channels
        whose input/output is already set on the session are disabled.
        Calling ``start`` on an already started session returns immediately.

        Args:
            agent: The agent to activate for this session
            room: The room to use for input and output
            room_input_options: Options for the room input
            room_output_options: Options for the room output
        """
        async with self._lock:
            if self._started:
                # NOTE(review): the "agent_session" span opened by the decorator is
                # not ended on this path (end_on_exit=False) — confirm this is intended
                return

            self._root_span_context = otel_context.get_current()
            # the span is kept on the session so that closing the session can end it
            self._session_span = current_span = trace.get_current_span()
            current_span.set_attribute(trace_types.ATTR_AGENT_LABEL, agent.label)
            current_span.set_attribute(
                trace_types.ATTR_SESSION_OPTIONS, json.dumps(asdict(self._opts))
            )

            self._agent = agent
            self._update_agent_state("initializing")

            tasks: list[asyncio.Task[None]] = []
            if cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.console:
                from .chat_cli import ChatCLI

                if (
                    self.input.audio is not None
                    or self.output.audio is not None
                    or self.output.transcription is not None
                ):
                    logger.warning(
                        "agent started with the console subcommand, but input.audio or output.audio "  # noqa: E501
                        "or output.transcription is already set, overriding.."
                    )

                chat_cli = ChatCLI(self)
                tasks.append(asyncio.create_task(chat_cli.start(), name="_chat_cli_start"))

            elif is_given(room) and not self._room_io:
                # work on copies so the caller's (or the default) option objects
                # are not mutated below
                room_input_options = copy.copy(
                    room_input_options or room_io.DEFAULT_ROOM_INPUT_OPTIONS
                )
                room_output_options = copy.copy(
                    room_output_options or room_io.DEFAULT_ROOM_OUTPUT_OPTIONS
                )

                if self.input.audio is not None:
                    if room_input_options.audio_enabled:
                        logger.warning(
                            "RoomIO audio input is enabled but input.audio is already set, ignoring.."  # noqa: E501
                        )
                    room_input_options.audio_enabled = False

                if self.output.audio is not None:
                    if room_output_options.audio_enabled:
                        logger.warning(
                            "RoomIO audio output is enabled but output.audio is already set, ignoring.."  # noqa: E501
                        )
                    room_output_options.audio_enabled = False

                if self.output.transcription is not None:
                    if room_output_options.transcription_enabled:
                        logger.warning(
                            "RoomIO transcription output is enabled but output.transcription is already set, ignoring.."  # noqa: E501
                        )
                    room_output_options.transcription_enabled = False

                self._room_io = room_io.RoomIO(
                    room=room,
                    agent_session=self,
                    input_options=room_input_options,
                    output_options=room_output_options,
                )
                tasks.append(asyncio.create_task(self._room_io.start(), name="_room_io_start"))

            # session can be restarted, register the callbacks only once
            try:
                job_ctx = get_job_context()
                current_span.set_attribute(trace_types.ATTR_ROOM_NAME, job_ctx.room.name)
                current_span.set_attribute(trace_types.ATTR_JOB_ID, job_ctx.job.id)
                current_span.set_attribute(trace_types.ATTR_AGENT_NAME, job_ctx.job.agent_name)
                if self._room_io:
                    # automatically connect to the room when room io is used
                    tasks.append(asyncio.create_task(job_ctx.connect(), name="_job_ctx_connect"))

                if not self._job_context_cb_registered:
                    job_ctx.add_shutdown_callback(
                        lambda: self._aclose_impl(reason=CloseReason.JOB_SHUTDOWN)
                    )
                    self._job_context_cb_registered = True
            except RuntimeError:
                pass  # ignore

            # run alongside the IO start-up tasks; there is no previous task to drain
            tasks.append(
                asyncio.create_task(self._update_activity(self._agent, wait_on_enter=False))
            )

            try:
                await asyncio.gather(*tasks)
            finally:
                # if one task failed, make sure the others do not outlive this call
                await utils.aio.cancel_and_wait(*tasks)

            # important: no await should be done after this!

            if self.input.audio is not None:
                self._forward_audio_atask = asyncio.create_task(
                    self._forward_audio_task(), name="_forward_audio_task"
                )

            if self.input.video is not None:
                self._forward_video_atask = asyncio.create_task(
                    self._forward_video_task(), name="_forward_video_task"
                )

            self._started = True
            self._update_agent_state("listening")
            if self._room_io and self._room_io.subscribed_fut:

                def on_room_io_subscribed(_: asyncio.Future[None]) -> None:
                    # _set_user_away_timer skips the timer while subscribed_fut is
                    # pending, so arm it once the future completes
                    if self._user_state == "listening" and self._agent_state == "listening":
                        self._set_user_away_timer()

                self._room_io.subscribed_fut.add_done_callback(on_room_io_subscribed)

            # log used IO
            def _collect_source(
                inp: io.AudioInput | io.VideoInput | None,
            ) -> list[io.AudioInput | io.VideoInput]:
                return [] if inp is None else [inp] + _collect_source(inp.source)

            def _collect_chain(
                out: io.TextOutput | io.VideoOutput | io.AudioOutput | None,
            ) -> list[io.VideoOutput | io.AudioOutput | io.TextOutput]:
                return [] if out is None else [out] + _collect_chain(out.next_in_chain)

            # inputs are collected from the session outwards; reverse so the log
            # reads in data-flow order
            audio_input = _collect_source(self.input.audio)[::-1]
            video_input = _collect_source(self.input.video)[::-1]

            audio_output = _collect_chain(self.output.audio)
            video_output = _collect_chain(self.output.video)
            transcript_output = _collect_chain(self.output.transcription)

            logger.debug(
                "using audio io: %s -> `AgentSession` -> %s",
                " -> ".join([f"`{out.label}`" for out in audio_input]) or "(none)",
                " -> ".join([f"`{out.label}`" for out in audio_output]) or "(none)",
            )

            logger.debug(
                "using transcript io: `AgentSession` -> %s",
                " -> ".join([f"`{out.label}`" for out in transcript_output]) or "(none)",
            )

            if video_input or video_output:
                logger.debug(
                    "using video io: %s > `AgentSession` > %s",
                    " -> ".join([f"`{out.label}`" for out in video_input]) or "(none)",
                    " -> ".join([f"`{out.label}`" for out in video_output]) or "(none)",
                )

    async def drain(self) -> None:
        """Drain the current activity.

        Raises:
            RuntimeError: If the session has no current activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        await activity.drain()

    def _close_soon(
        self,
        *,
        reason: CloseReason,
        drain: bool = False,
        error: llm.LLMError | stt.STTError | tts.TTSError | llm.RealtimeModelError | None = None,
    ) -> None:
        """Schedule the session to close in a background task.

        Does nothing if a closing task is already registered.

        Args:
            reason: Why the session is closing; forwarded to ``_aclose_impl``.
            drain: Forwarded to ``_aclose_impl``.
            error: Optional error forwarded to ``_aclose_impl``.
        """
        if self._closing_task:
            return

        def on_close_done(_: asyncio.Task[None]) -> None:
            # same bookkeeping as _on_error: forget the finished task so a restarted
            # session can be closed again
            self._closing_task = None

        self._closing_task = asyncio.create_task(
            self._aclose_impl(error=error, drain=drain, reason=reason)
        )
        self._closing_task.add_done_callback(on_close_done)

    @utils.log_exceptions(logger=logger)
    async def _aclose_impl(
        self,
        *,
        reason: CloseReason,
        drain: bool = False,
        error: llm.LLMError | stt.STTError | tts.TTSError | llm.RealtimeModelError | None = None,
    ) -> None:
        """Close the session and reset it so it can be started again.

        Under the session lock: stops the current activity, detaches inputs and
        outputs, cancels the forwarding tasks, closes the RoomIO, ends open spans,
        emits the ``close`` event and resets state and error counters.
        Returns immediately if the session is not started.

        Args:
            reason: Why the session is closing; reported in the ``close`` event.
            drain: When False, the current activity is interrupted before draining.
            error: Optional error reported in the ``close`` event.
        """
        if self._root_span_context:
            # make `activity.drain` and `on_exit` under the root span
            otel_context.attach(self._root_span_context)

        async with self._lock:
            if not self._started:
                return

            if self._activity is not None:
                if not drain:
                    try:
                        await self._activity.interrupt()
                    except RuntimeError:
                        # uninterruptible speech
                        # TODO(long): force interrupt or wait for it to finish?
                        # it might be an audio played from the error callback
                        pass

                await self._activity.drain()

                # wait any uninterruptible speech to finish
                if self._activity.current_speech:
                    await self._activity.current_speech

                # detach the inputs and outputs
                self.input.audio = None
                self.input.video = None
                self.output.audio = None
                self.output.transcription = None

                await self._activity.aclose()
                self._activity = None

            if self._agent_speaking_span:
                self._agent_speaking_span.end()
                self._agent_speaking_span = None

            if self._user_speaking_span:
                self._user_speaking_span.end()
                self._user_speaking_span = None

            if self._forward_audio_atask is not None:
                await utils.aio.cancel_and_wait(self._forward_audio_atask)

            # the video forwarder is created next to the audio one in start();
            # stop it as well so it does not outlive the session
            if self._forward_video_atask is not None:
                await utils.aio.cancel_and_wait(self._forward_video_atask)

            if self._room_io:
                await self._room_io.aclose()
                self._room_io = None

            self._started = False
            if self._session_span:
                self._session_span.end()
                self._session_span = None
            self.emit("close", CloseEvent(error=error, reason=reason))

            # reset timers, states and counters
            self._cancel_user_away_timer()
            self._cancel_agent_false_interruption()
            self._user_state = "listening"
            self._agent_state = "initializing"
            self._llm_error_counts = 0
            self._tts_error_counts = 0
            self._root_span_context = None

        logger.debug("session closed", extra={"reason": reason.value, "error": error})

    async def aclose(self) -> None:
        """Close the session, reporting ``CloseReason.USER_INITIATED`` as the reason."""
        close_reason = CloseReason.USER_INITIATED
        await self._aclose_impl(reason=close_reason)

    def update_options(self) -> None:
        """Do nothing; currently a placeholder with no implementation."""
        return None

    def say(
        self,
        text: str | AsyncIterable[str],
        *,
        audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        add_to_chat_ctx: bool = True,
    ) -> SpeechHandle:
        """Forward a ``say`` request to the activity that can currently schedule speech.

        While the current activity has scheduling paused, the request goes to the
        next activity instead. All arguments are passed through unchanged.

        Returns:
            The handle returned by the activity's ``say``.

        Raises:
            RuntimeError: If the session has no activity, or if scheduling is paused
                and there is no next activity.
        """
        current = self._activity
        if current is None:
            raise RuntimeError("AgentSession isn't running")

        run_state = self._global_run_state

        target = current
        if current.scheduling_paused:
            target = self._next_activity
            if target is None:
                raise RuntimeError("AgentSession is closing, cannot use say()")

        handle = target.say(
            text,
            audio=audio,
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
        )
        if run_state:
            # let the active run track this speech
            run_state._watch_handle(handle)

        return handle

    def generate_reply(
        self,
        *,
        user_input: NotGivenOr[str] = NOT_GIVEN,
        instructions: NotGivenOr[str] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> SpeechHandle:
        """Ask the agent to generate a reply to speak to the user.

        While the current activity has scheduling paused, the request is forwarded
        to the next activity instead.

        Args:
            user_input (NotGivenOr[str], optional): User input the reply may respond to;
                when given it is wrapped in a ``user`` chat message.
            instructions (NotGivenOr[str], optional): Extra instructions for this reply.
            tool_choice (NotGivenOr[llm.ToolChoice], optional): Which external tool to use
                for this reply. Defaults to "none" when called from inside a function_tool.
            allow_interruptions (NotGivenOr[bool], optional): Whether the user may
                interrupt this speech.

        Returns:
            SpeechHandle: A handle to the generated reply.

        Raises:
            RuntimeError: If the session has no activity, or if scheduling is paused
                and there is no next activity.
        """
        current = self._activity
        if current is None:
            raise RuntimeError("AgentSession isn't running")

        user_message: NotGivenOr[llm.ChatMessage]
        if is_given(user_input):
            user_message = llm.ChatMessage(role="user", content=[user_input])
        else:
            user_message = NOT_GIVEN

        run_state = self._global_run_state

        target = current
        if current.scheduling_paused:
            target = self._next_activity
            if target is None:
                raise RuntimeError("AgentSession is closing, cannot use generate_reply()")

        handle = target._generate_reply(
            user_message=user_message,
            instructions=instructions,
            tool_choice=tool_choice,
            allow_interruptions=allow_interruptions,
        )
        if run_state:
            # let the active run track this speech
            run_state._watch_handle(handle)

        return handle

    def interrupt(self) -> asyncio.Future[None]:
        """Interrupt the speech currently being generated.

        Returns:
            The asyncio.Future returned by the activity; it completes once the
            interruption is fully processed and the chat context has been updated.

        Raises:
            RuntimeError: If the session has no current activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        return activity.interrupt()

    def clear_user_turn(self) -> None:
        """Clear the transcription or input audio buffer of the user turn.

        Raises:
            RuntimeError: If the session has no current activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        activity.clear_user_turn()

    def commit_user_turn(self, *, transcript_timeout: float = 2.0) -> None:
        """Commit the user turn so that a reply is generated.

        Args:
            transcript_timeout (float, optional): How long to wait for the final
                transcript after the turn is committed. Raise it when the STT
                is slow to respond.

        Raises:
            RuntimeError: If the AgentSession isn't running.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        activity.commit_user_turn(transcript_timeout=transcript_timeout)

    def update_agent(self, agent: Agent) -> None:
        """Set ``agent`` as the session's agent and, if started, schedule the switch.

        When the session is not started only the reference is stored. Otherwise a
        background task is created that first awaits the previous transition task
        (if any) and then activates the new agent.
        """
        self._agent = agent
        if not self._started:
            return

        previous_task = self._update_activity_atask
        task = asyncio.create_task(
            self._update_activity_task(previous_task, self._agent),
            name="_update_activity_task",
        )
        self._update_activity_atask = task

        run_state = self._global_run_state
        if run_state:
            # keep the RunResult open while the agent transition is in progress, so the
            # AgentHandoffResult is recorded before the run is marked as done
            run_state._watch_handle(task)

    async def _update_activity(
        self,
        agent: Agent,
        *,
        previous_activity: Literal["close", "pause"] = "close",
        new_activity: Literal["start", "resume"] = "start",
        blocked_tasks: list[asyncio.Task] | None = None,
        wait_on_enter: bool = True,
    ) -> None:
        """Switch the session to ``agent``'s activity.

        Under ``self._activity_lock``: prepares the next activity (a new
        ``AgentActivity`` or the agent's existing one), closes or pauses the current
        activity, swaps the two, records the handoff on the active run (if any) and
        starts or resumes the new activity.

        Args:
            agent: The agent whose activity becomes current.
            previous_activity: ``"close"`` drains and closes the current activity;
                ``"pause"`` pauses it.
            new_activity: ``"start"`` creates a new activity for ``agent``;
                ``"resume"`` reuses ``agent._activity``.
            blocked_tasks: Passed to ``pause`` of the current activity
                (an empty list when None).
            wait_on_enter: When True, wait for the new activity's ``_on_enter_task``
                after the lock is released.

        Raises:
            RuntimeError: If ``new_activity`` is ``"start"`` but the agent already has
                an activity, or ``"resume"`` but it has none.
        """
        async with self._activity_lock:
            # _update_activity is called directly sometimes, update for redundancy
            self._agent = agent

            if new_activity == "start":
                if agent._activity is not None:
                    raise RuntimeError("cannot start agent: an activity is already running")

                self._next_activity = AgentActivity(agent, self)
            elif new_activity == "resume":
                if agent._activity is None:
                    raise RuntimeError("cannot resume agent: no existing active activity to resume")

                self._next_activity = agent._activity

            # remember the outgoing activity for the handoff record below
            previous_activity_v = self._activity
            if self._activity is not None:
                if previous_activity == "close":
                    await self._activity.drain()
                    await self._activity.aclose()
                elif previous_activity == "pause":
                    await self._activity.pause(blocked_tasks=blocked_tasks or [])

            # the next activity becomes current only after the previous one is
            # closed/paused; until then say()/generate_reply() route to _next_activity
            # when the current activity has scheduling paused
            self._activity = self._next_activity
            self._next_activity = None

            run_state = self._global_run_state
            if run_state:
                run_state._agent_handoff(
                    old_agent=previous_activity_v.agent if previous_activity_v else None,
                    new_agent=self._activity.agent,
                )

            if new_activity == "start":
                await self._activity.start()
            elif new_activity == "resume":
                await self._activity.resume()

        # move it outside the lock to allow calling _update_activity in on_enter of a new agent
        if wait_on_enter:
            assert self._activity._on_enter_task is not None
            # shield: cancelling this coroutine must not cancel the on_enter task itself
            await asyncio.shield(self._activity._on_enter_task)

    @utils.log_exceptions(logger=logger)
    async def _update_activity_task(
        self, old_task: asyncio.Task[None] | None, agent: Agent
    ) -> None:
        """Wait for the previous transition task, then switch the session to ``agent``."""
        if old_task is not None:
            await old_task

        root_context = self._root_span_context
        if root_context is not None:
            # re-attach the root span context so that on_exit, on_enter and future
            # turns become direct children of the root span rather than of a tool call
            otel_context.attach(root_context)

        await self._update_activity(agent)

    def _on_error(
        self,
        error: llm.LLMError | stt.STTError | tts.TTSError | llm.RealtimeModelError,
    ) -> None:
        """React to a component error, closing the session when it is unrecoverable.

        Recoverable errors, and errors received while a close is already pending,
        are ignored. LLM and TTS errors are counted and tolerated until the count
        exceeds ``conn_options.max_unrecoverable_errors``; any other unrecoverable
        error closes the session right away.
        """
        if self._closing_task or error.recoverable:
            return

        counter_attr = {
            "llm_error": "_llm_error_counts",
            "tts_error": "_tts_error_counts",
        }.get(error.type)
        if counter_attr is not None:
            count = getattr(self, counter_attr) + 1
            setattr(self, counter_attr, count)
            if count <= self.conn_options.max_unrecoverable_errors:
                return

        logger.error("AgentSession is closing due to unrecoverable error", exc_info=error.error)

        def on_close_done(_: asyncio.Task[None]) -> None:
            # forget the finished close task
            self._closing_task = None

        close_task = asyncio.create_task(self._aclose_impl(error=error, reason=CloseReason.ERROR))
        self._closing_task = close_task
        close_task.add_done_callback(on_close_done)

    @utils.log_exceptions(logger=logger)
    async def _forward_audio_task(self) -> None:
        """Push frames from the session's audio input into the current activity.

        Returns immediately when no audio input is set. Frames received while the
        session has no activity are dropped.
        """
        source = self.input.audio
        if source is None:
            return

        async for frame in source:
            activity = self._activity
            if activity is None:
                continue
            activity.push_audio(frame)

    @utils.log_exceptions(logger=logger)
    async def _forward_video_task(self) -> None:
        """Push frames from the session's video input into the current activity.

        Returns immediately when no video input is set. Frames are dropped while
        the session has no activity, or when a video sampler is configured and
        returns a falsy value for the frame.
        """
        source = self.input.video
        if source is None:
            return

        async for frame in source:
            if self._activity is None:
                continue

            sampler = self._video_sampler
            if sampler is not None and not sampler(frame, self):
                continue  # the sampler rejected this frame

            self._activity.push_video(frame)

    def _set_user_away_timer(self) -> None:
        """(Re)arm the timer that switches the user state to ``"away"``.

        Any existing timer is cancelled first. No new timer is created when
        ``user_away_timeout`` is None, or while the RoomIO's ``subscribed_fut``
        is still pending.
        """
        self._cancel_user_away_timer()

        timeout = self._opts.user_away_timeout
        if timeout is None:
            return

        room_io = self._room_io
        if room_io:
            subscribed = room_io.subscribed_fut
            if subscribed and not subscribed.done():
                # skip the timer before user join the room
                return

        self._user_away_timer = self._loop.call_later(timeout, self._update_user_state, "away")

    def _cancel_user_away_timer(self) -> None:
        """Cancel the pending user-away timer, if there is one."""
        timer = self._user_away_timer
        if timer is None:
            return

        timer.cancel()
        self._user_away_timer = None

    def _update_agent_state(self, state: AgentState) -> None:
        """Move the agent to ``state`` and emit ``agent_state_changed``.

        Does nothing when the state is unchanged. Entering ``"speaking"`` resets the
        LLM/TTS error counters and opens an ``agent_speaking`` span; any other state
        closes an open one. The user-away timer is armed only when both agent and
        user are ``"listening"``, and a pending false-interruption event is dropped
        whenever the agent is not ``"listening"``.
        """
        previous = self._agent_state
        if previous == state:
            return

        span = self._agent_speaking_span
        if state == "speaking":
            self._llm_error_counts = 0
            self._tts_error_counts = 0

            if span is None:
                span = tracer.start_span("agent_speaking")
                self._agent_speaking_span = span
                span.set_attribute(trace_types.ATTR_START_TIME, time.time())
        elif span is not None:
            span.set_attribute(trace_types.ATTR_END_TIME, time.time())
            span.end()
            self._agent_speaking_span = None

        now_listening = state == "listening"
        if now_listening and self._user_state == "listening":
            self._set_user_away_timer()
        else:
            self._cancel_user_away_timer()

        if not now_listening:
            self._cancel_agent_false_interruption()

        self._agent_state = state
        self.emit(
            "agent_state_changed",
            AgentStateChangedEvent(old_state=previous, new_state=state),
        )

    def _update_user_state(
        self, state: UserState, *, last_speaking_time: float | None = None
    ) -> None:
        """Move the user to ``state`` and emit ``user_state_changed``.

        Does nothing when the state is unchanged.

        Args:
            state: The new user state.
            last_speaking_time: Used as the end time of the ``user_speaking`` span
                when one is closed; falls back to ``time.time()`` when None (or 0).
                Presumably seconds since the epoch, like ``time.time()`` — confirm
                against callers.
        """
        if self._user_state == state:
            return

        # open a span when the user starts speaking; any other transition closes
        # an open span
        if state == "speaking" and self._user_speaking_span is None:
            self._user_speaking_span = tracer.start_span("user_speaking")
            self._user_speaking_span.set_attribute(trace_types.ATTR_START_TIME, time.time())
        elif self._user_speaking_span is not None:
            end_time = last_speaking_time or time.time()
            self._user_speaking_span.set_attribute(trace_types.ATTR_END_TIME, end_time)
            self._user_speaking_span.end(end_time=int(end_time * 1_000_000_000))  # nanoseconds
            self._user_speaking_span = None

        # the away timer only runs while both user and agent are listening
        if state == "listening" and self._agent_state == "listening":
            self._set_user_away_timer()
        else:
            self._cancel_user_away_timer()

        # pause the false interruption timer if user is speaking and recreate it after user stops
        if state == "speaking" and self._false_interruption_timer:
            # _cancel_agent_false_interruption also clears the stored event, so save
            # it first and put it back for the later reschedule
            ev = self._false_interrupted_event
            self._cancel_agent_false_interruption()
            self._false_interrupted_event = ev
        elif state == "listening" and self._false_interrupted_event:
            self._schedule_agent_false_interruption(self._false_interrupted_event)

        old_state = self._user_state
        self._user_state = state
        self.emit(
            "user_state_changed",
            UserStateChangedEvent(old_state=old_state, new_state=state),
        )

    def _user_input_transcribed(self, ev: UserInputTranscribedEvent) -> None:
        """Emit ``user_input_transcribed``; a final transcript also cancels the false interruption."""
        self.emit("user_input_transcribed", ev)
        if not ev.is_final:
            return

        # a final user transcript arrived, so the interruption was real:
        # drop the pending false-interruption event entirely
        self._cancel_agent_false_interruption()

    def _conversation_item_added(self, message: llm.ChatMessage) -> None:
        """Insert ``message`` into the session chat context and emit ``conversation_item_added``."""
        self._chat_ctx.insert(message)
        event = ConversationItemAddedEvent(item=message)
        self.emit("conversation_item_added", event)

    def _schedule_agent_false_interruption(self, ev: AgentFalseInterruptionEvent) -> None:
        """Schedule ``agent_false_interruption`` to be emitted after the configured timeout.

        Does nothing when ``agent_false_interruption_timeout`` is None. A previously
        scheduled event is cancelled first. When the timer fires, the event is only
        emitted if both the agent and the user are ``"listening"``.
        """
        timeout = self._opts.agent_false_interruption_timeout
        if timeout is None:
            return

        def _fire() -> None:
            both_listening = self._agent_state == "listening" and self._user_state == "listening"
            if not both_listening:
                return

            self.emit("agent_false_interruption", ev)
            self._false_interruption_timer = None

        self._cancel_agent_false_interruption()
        self._false_interruption_timer = self._loop.call_later(timeout, _fire)
        # kept so that _update_user_state can re-schedule the event after a pause
        self._false_interrupted_event = ev

    def _cancel_agent_false_interruption(self) -> None:
        """Cancel the pending false-interruption timer and forget its event."""
        timer = self._false_interruption_timer
        if timer is not None:
            timer.cancel()
            self._false_interruption_timer = None

        self._false_interrupted_event = None

    # move them to the end to avoid shadowing the same named modules for mypy
    @property
    def stt(self) -> stt.STT | None:
        """Return the speech-to-text backend stored on the session, or None."""
        return self._stt

    @property
    def llm(self) -> llm.LLM | llm.RealtimeModel | None:
        """Return the LLM or RealtimeModel stored on the session, or None."""
        return self._llm

    @property
    def tts(self) -> tts.TTS | None:
        """Return the text-to-speech engine stored on the session, or None."""
        return self._tts

    @property
    def vad(self) -> vad.VAD | None:
        """Return the voice-activity detector stored on the session, or None."""
        return self._vad

    # -- User changed input/output streams/sinks --

    def _on_video_input_changed(self) -> None:
        """Restart the video forwarding task; a no-op until the session is started."""
        if not self._started:
            return

        stale_task = self._forward_video_atask
        if stale_task is not None:
            # the old task still reads from the previous input
            stale_task.cancel()

        self._forward_video_atask = asyncio.create_task(
            self._forward_video_task(), name="_forward_video_task"
        )

    def _on_audio_input_changed(self) -> None:
        """Restart the audio forwarding task; a no-op until the session is started."""
        if not self._started:
            return

        stale_task = self._forward_audio_atask
        if stale_task is not None:
            # the old task still reads from the previous input
            stale_task.cancel()

        self._forward_audio_atask = asyncio.create_task(
            self._forward_audio_task(), name="_forward_audio_task"
        )

    def _on_video_output_changed(self) -> None:
        """Do nothing; no action is taken when the video output changes."""
        return None

    def _on_audio_output_changed(self) -> None:
        """Do nothing; no action is taken when the audio output changes."""
        return None

    def _on_text_output_changed(self) -> None:
        """Do nothing; no action is taken when the text output changes."""
        return None

    # ---

    async def __aenter__(self) -> AgentSession:
        """Enter the async context; returns the session itself without starting it."""
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Leave the async context by closing the session.

        The exception arguments are ignored, and since None is returned an
        exception raised inside the ``async with`` body is not suppressed.
        """
        await self.aclose()

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

AgentSession is the LiveKit Agents runtime that glues together media streams, speech/LLM components, and tool orchestration into a single real-time voice agent.

It links audio, video, and text I/O with STT, VAD, TTS, and the LLM; handles turn detection, endpointing, interruptions, and multi-step tool calls; and exposes everything through event callbacks so you can focus on writing function tools and simple hand-offs rather than low-level streaming logic.

Args

turn_detection : TurnDetectionMode, optional

Strategy for deciding when the user has finished speaking.

  • "stt" – rely on speech-to-text end-of-utterance cues
  • "vad" – rely on Voice Activity Detection start/stop cues
  • "realtime_llm" – use server-side detection from a realtime LLM
  • "manual" – caller controls turn boundaries explicitly
  • _TurnDetector instance – plug-in custom detector

If NOT_GIVEN, the session chooses the best available mode in priority order realtime_llm → vad → stt → manual; it automatically falls back if the necessary model is missing.

stt : stt.STT, optional
Speech-to-text backend.
vad : vad.VAD, optional
Voice-activity detector
llm : llm.LLM | llm.RealtimeModel, optional
LLM or RealtimeModel
tts : tts.TTS, optional
Text-to-speech engine.
mcp_servers : list[mcp.MCPServer], optional
List of MCP servers providing external tools for the agent to use.
userdata : Userdata_T, optional
Arbitrary per-session user data.
allow_interruptions : bool
Whether the user can interrupt the agent mid-utterance. Default True.
discard_audio_if_uninterruptible : bool
When True, buffered audio is dropped while the agent is speaking and cannot be interrupted. Default True.
min_interruption_duration : float
Minimum speech length (s) to register as an interruption. Default 0.5 s.
min_interruption_words : int
Minimum number of words to consider an interruption, only used if stt enabled. Default 0.
min_endpointing_delay : float
Minimum time-in-seconds the agent must wait after a potential end-of-utterance signal (from VAD or an EOU model) before it declares the user’s turn complete. Default 0.4 s.
max_endpointing_delay : float
Maximum time-in-seconds the agent will wait before terminating the turn. Default 6.0 s.
max_tool_steps : int
Maximum consecutive tool calls per LLM turn. Default 3.
video_sampler : _VideoSampler, optional
Uses `VoiceActivityVideoSampler` when NOT_GIVEN; by default that sampler captures video at ~1 fps while the user is speaking and at ~0.3 fps while the user is silent.
user_away_timeout : float, optional
If set, the user state is changed to "away" once both the user and the agent have been silent for this amount of time. Default 15.0 s; set to None to disable.
agent_false_interruption_timeout : float, optional
If set, emit an agent_false_interruption event after this amount of time if the user is silent and no user transcript is detected after the interruption. Set to None to disable. Default 4.0 s.
min_consecutive_speech_delay : float, optional
The minimum delay between consecutive speech. Default 0.0 s.
use_tts_aligned_transcript : bool, optional
Whether to use TTS-aligned transcript as the input of the transcription_node. Only applies if TTS.capabilities.aligned_transcript is True or streaming is False.
preemptive_generation : bool
Whether to use preemptive generation, i.e. to speculatively begin LLM and TTS requests before an end-of-turn is detected. When True, the agent sends inference calls as soon as a user transcript is received rather than waiting for a definitive turn boundary. This can reduce response latency by overlapping model inference with user audio, but may incur extra compute if the user interrupts or revises mid-utterance. Default False.
conn_options : SessionConnectOptions, optional
Connection options for stt, llm, and tts.
loop : asyncio.AbstractEventLoop, optional
Event loop to bind the session to. Falls back to `asyncio.get_event_loop()`.

Ancestors

Instance variables

prop agent_state : AgentState
Expand source code
@property
def agent_state(self) -> AgentState:
    """Return the agent state currently tracked by the session."""
    return self._agent_state
prop conn_options : SessionConnectOptions
Expand source code
@property
def conn_options(self) -> SessionConnectOptions:
    """Return the connection options stored on the session."""
    return self._conn_options
prop current_agent : Agent
Expand source code
@property
def current_agent(self) -> Agent:
    """Return the agent currently attached to the session.

    Raises:
        RuntimeError: If no agent is attached (``self._agent`` is None).
    """
    if self._agent is None:
        # the class is AgentSession; "VoiceAgent" was a stale name in this message
        raise RuntimeError("AgentSession isn't running")

    return self._agent
prop current_speech : SpeechHandle | None
Expand source code
@property
def current_speech(self) -> SpeechHandle | None:
    """Return the current activity's ``current_speech``, or None without an activity."""
    activity = self._activity
    if activity is None:
        return None
    return activity.current_speech
prop history : llm.ChatContext
Expand source code
@property
def history(self) -> llm.ChatContext:
    """Return the chat context stored on the session."""
    return self._chat_ctx
prop input : AgentInput
Expand source code
@property
def input(self) -> io.AgentInput:
    """Return the session's ``AgentInput`` object."""
    return self._input
prop llm : llm.LLM | llm.RealtimeModel | None
Expand source code
@property
def llm(self) -> llm.LLM | llm.RealtimeModel | None:
    """Return the LLM or RealtimeModel stored on the session, or None."""
    return self._llm
prop mcp_servers : list[mcp.MCPServer] | None
Expand source code
@property
def mcp_servers(self) -> list[mcp.MCPServer] | None:
    """Return the list of MCP servers stored on the session, or None."""
    return self._mcp_servers
prop options : VoiceOptions
Expand source code
@property
def options(self) -> VoiceOptions:
    """Return the ``VoiceOptions`` stored on the session."""
    return self._opts
prop output : AgentOutput
Expand source code
@property
def output(self) -> io.AgentOutput:
    """Return the session's ``AgentOutput`` object."""
    return self._output
prop stt : stt.STT | None
Expand source code
@property
def stt(self) -> stt.STT | None:
    """Return the STT instance stored on this session, if any."""
    return self._stt
prop tts : tts.TTS | None
Expand source code
@property
def tts(self) -> tts.TTS | None:
    """Return the TTS instance stored on this session, if any."""
    return self._tts
prop turn_detection : TurnDetectionMode | None
Expand source code
@property
def turn_detection(self) -> TurnDetectionMode | None:
    """Return the turn detection mode stored on this session, if any."""
    return self._turn_detection
prop user_state : UserState
Expand source code
@property
def user_state(self) -> UserState:
    """Return the user state currently stored on this session."""
    return self._user_state
prop userdata : Userdata_T
Expand source code
@property
def userdata(self) -> Userdata_T:
    """Return the user data attached to this session.

    Raises:
        ValueError: If no user data has been set (the stored value is None).
    """
    if self._userdata is None:
        # Name the session class consistently with the other session errors.
        raise ValueError("AgentSession userdata is not set")

    return self._userdata
prop vad : vad.VAD | None
Expand source code
@property
def vad(self) -> vad.VAD | None:
    """Return the VAD instance stored on this session, if any."""
    return self._vad

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the session, passing CloseReason.USER_INITIATED as the close reason."""
    await self._aclose_impl(reason=CloseReason.USER_INITIATED)
def clear_user_turn(self) ‑> None
Expand source code
def clear_user_turn(self) -> None:
    """Clear the transcription or input audio buffer of the user turn.

    Raises:
        RuntimeError: If the AgentSession isn't running.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    activity.clear_user_turn()
def commit_user_turn(self, *, transcript_timeout: float = 2.0) ‑> None
Expand source code
def commit_user_turn(self, *, transcript_timeout: float = 2.0) -> None:
    """Commit the user turn and generate a reply.

    Args:
        transcript_timeout (float, optional): How long to wait for the final
            transcript after the user turn has been committed. Raise this value
            when the STT is slow to respond.

    Raises:
        RuntimeError: If the AgentSession isn't running.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    activity.commit_user_turn(transcript_timeout=transcript_timeout)

Commit the user turn and generate a reply.

Args

transcript_timeout : float, optional
The timeout for the final transcript to be received after committing the user turn. Increase this value if the STT is slow to respond.

Raises

RuntimeError
If the AgentSession isn't running.
async def drain(self) ‑> None
Expand source code
async def drain(self) -> None:
    """Await the current activity's drain().

    Raises:
        RuntimeError: If the AgentSession isn't running.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    await activity.drain()
def generate_reply(self,
*,
user_input: NotGivenOr[str] = NOT_GIVEN,
instructions: NotGivenOr[str] = NOT_GIVEN,
tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
def generate_reply(
    self,
    *,
    user_input: NotGivenOr[str] = NOT_GIVEN,
    instructions: NotGivenOr[str] = NOT_GIVEN,
    tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
    allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
) -> SpeechHandle:
    """Generate a reply for the agent to speak to the user.

    Args:
        user_input (NotGivenOr[str], optional): The user's input that may influence
            the reply, such as answering a question.
        instructions (NotGivenOr[str], optional): Additional instructions for
            generating the reply.
        tool_choice (NotGivenOr[llm.ToolChoice], optional): Specifies the external
            tool to use when generating the reply. If generate_reply is invoked
            within a function_tool, defaults to "none".
        allow_interruptions (NotGivenOr[bool], optional): Indicates whether the user
            can interrupt this speech.

    Returns:
        SpeechHandle: A handle to the generated reply.

    Raises:
        RuntimeError: If the AgentSession isn't running, or if scheduling on the
            current activity is paused and there is no next activity.
    """
    if self._activity is None:
        raise RuntimeError("AgentSession isn't running")

    # Wrap the raw text in a user chat message only when a value was supplied.
    if is_given(user_input):
        user_message = llm.ChatMessage(role="user", content=[user_input])
    else:
        user_message = NOT_GIVEN

    run_state = self._global_run_state

    # When the current activity no longer schedules speech, the reply is
    # handed to the upcoming activity instead.
    target = self._activity
    if target.scheduling_paused:
        target = self._next_activity
        if target is None:
            raise RuntimeError("AgentSession is closing, cannot use generate_reply()")

    handle = target._generate_reply(
        user_message=user_message,
        instructions=instructions,
        tool_choice=tool_choice,
        allow_interruptions=allow_interruptions,
    )
    if run_state:
        # Let an ongoing run track this speech handle.
        run_state._watch_handle(handle)

    return handle

Generate a reply for the agent to speak to the user.

Args

user_input : NotGivenOr[str], optional
The user's input that may influence the reply, such as answering a question.
instructions : NotGivenOr[str], optional
Additional instructions for generating the reply.
tool_choice : NotGivenOr[llm.ToolChoice], optional
Specifies the external tool to use when generating the reply. If generate_reply is invoked within a function_tool, defaults to "none".
allow_interruptions : NotGivenOr[bool], optional
Indicates whether the user can interrupt this speech.

Returns

SpeechHandle
A handle to the generated reply.
def interrupt(self) ‑> _asyncio.Future[None]
Expand source code
def interrupt(self) -> asyncio.Future[None]:
    """Interrupt the current speech generation.

    Returns:
        An asyncio.Future that completes when the interruption is fully processed
        and chat context has been updated.

    Raises:
        RuntimeError: If the AgentSession isn't running.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    return activity.interrupt()

Interrupt the current speech generation.

Returns

An asyncio.Future that completes when the interruption is fully processed and chat context has been updated.

def run(self, *, user_input: str, output_type: type[Run_T] | None = None) ‑> RunResult[~Run_T]
Expand source code
def run(self, *, user_input: str, output_type: type[Run_T] | None = None) -> RunResult[Run_T]:
    """Start a run for ``user_input`` and return the RunResult tracking it.

    The new RunResult is stored as the session's global run state before the
    reply is requested through ``generate_reply``.

    Raises:
        RuntimeError: If a previous run exists and is not done yet.
    """
    pending = self._global_run_state
    if pending is not None and not pending.done():
        raise RuntimeError("nested runs are not supported")

    result = RunResult(user_input=user_input, output_type=output_type)
    self._global_run_state = result
    self.generate_reply(user_input=user_input)
    return result
def say(self,
text: str | AsyncIterable[str],
*,
audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
add_to_chat_ctx: bool = True) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
def say(
    self,
    text: str | AsyncIterable[str],
    *,
    audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
    allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    add_to_chat_ctx: bool = True,
) -> SpeechHandle:
    """Ask the active activity to say ``text`` and return the resulting handle.

    All arguments are forwarded unchanged to the activity's ``say``.

    Raises:
        RuntimeError: If the AgentSession isn't running, or if scheduling on the
            current activity is paused and there is no next activity.
    """
    if self._activity is None:
        raise RuntimeError("AgentSession isn't running")

    run_state = self._global_run_state

    # When the current activity no longer schedules speech, the request is
    # handed to the upcoming activity instead.
    target = self._activity
    if target.scheduling_paused:
        target = self._next_activity
        if target is None:
            raise RuntimeError("AgentSession is closing, cannot use say()")

    handle = target.say(
        text,
        audio=audio,
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
    )
    if run_state:
        # Let an ongoing run track this speech handle.
        run_state._watch_handle(handle)

    return handle
async def start(self,
agent: Agent,
*,
room: NotGivenOr[rtc.Room] = NOT_GIVEN,
room_input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN) ‑> None
Expand source code
@tracer.start_as_current_span("agent_session", end_on_exit=False)
async def start(
    self,
    agent: Agent,
    *,
    room: NotGivenOr[rtc.Room] = NOT_GIVEN,
    room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
    room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
) -> None:
    """Start the voice agent.

    If the console flag is provided, start a ChatCLI. Otherwise, when a room is
    given and no RoomIO exists yet, create a RoomIO; any input/output that is
    already set on the session is disabled in the RoomIO options (with a warning
    if it was enabled). Calling this on an already started session is a no-op.

    Args:
        room: The room to use for input and output
        room_input_options: Options for the room input
        room_output_options: Options for the room output
    """
    async with self._lock:
        if self._started:
            return

        self._root_span_context = otel_context.get_current()
        # The session span and the local alias refer to the same current span.
        self._session_span = current_span = trace.get_current_span()
        current_span.set_attribute(trace_types.ATTR_AGENT_LABEL, agent.label)
        current_span.set_attribute(
            trace_types.ATTR_SESSION_OPTIONS, json.dumps(asdict(self._opts))
        )

        self._agent = agent
        self._update_agent_state("initializing")

        # Startup coroutines that are run concurrently and awaited together below.
        tasks: list[asyncio.Task[None]] = []
        if cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.console:
            from .chat_cli import ChatCLI

            if (
                self.input.audio is not None
                or self.output.audio is not None
                or self.output.transcription is not None
            ):
                logger.warning(
                    "agent started with the console subcommand, but input.audio or output.audio "  # noqa: E501
                    "or output.transcription is already set, overriding.."
                )

            chat_cli = ChatCLI(self)
            tasks.append(asyncio.create_task(chat_cli.start(), name="_chat_cli_start"))

        elif is_given(room) and not self._room_io:
            # Work on copies so the caller's (or the default) options are not mutated.
            room_input_options = copy.copy(
                room_input_options or room_io.DEFAULT_ROOM_INPUT_OPTIONS
            )
            room_output_options = copy.copy(
                room_output_options or room_io.DEFAULT_ROOM_OUTPUT_OPTIONS
            )

            if self.input.audio is not None:
                if room_input_options.audio_enabled:
                    logger.warning(
                        "RoomIO audio input is enabled but input.audio is already set, ignoring.."  # noqa: E501
                    )
                room_input_options.audio_enabled = False

            if self.output.audio is not None:
                if room_output_options.audio_enabled:
                    logger.warning(
                        "RoomIO audio output is enabled but output.audio is already set, ignoring.."  # noqa: E501
                    )
                room_output_options.audio_enabled = False

            if self.output.transcription is not None:
                if room_output_options.transcription_enabled:
                    logger.warning(
                        "RoomIO transcription output is enabled but output.transcription is already set, ignoring.."  # noqa: E501
                    )
                room_output_options.transcription_enabled = False

            self._room_io = room_io.RoomIO(
                room=room,
                agent_session=self,
                input_options=room_input_options,
                output_options=room_output_options,
            )
            tasks.append(asyncio.create_task(self._room_io.start(), name="_room_io_start"))

        # session can be restarted, register the callbacks only once
        # NOTE(review): the RuntimeError handler presumably covers the case where
        # no job context is available -- confirm against get_job_context().
        try:
            job_ctx = get_job_context()
            current_span.set_attribute(trace_types.ATTR_ROOM_NAME, job_ctx.room.name)
            current_span.set_attribute(trace_types.ATTR_JOB_ID, job_ctx.job.id)
            current_span.set_attribute(trace_types.ATTR_AGENT_NAME, job_ctx.job.agent_name)
            if self._room_io:
                # automatically connect to the room when room io is used
                tasks.append(asyncio.create_task(job_ctx.connect(), name="_job_ctx_connect"))

            if not self._job_context_cb_registered:
                job_ctx.add_shutdown_callback(
                    lambda: self._aclose_impl(reason=CloseReason.JOB_SHUTDOWN)
                )
                self._job_context_cb_registered = True
        except RuntimeError:
            pass  # ignore

        # there is no previous task to drain, so the first activity update is
        # scheduled right away and runs alongside the other startup tasks
        tasks.append(
            asyncio.create_task(self._update_activity(self._agent, wait_on_enter=False))
        )

        try:
            await asyncio.gather(*tasks)
        finally:
            # Make sure no startup task is left running, including when one failed.
            await utils.aio.cancel_and_wait(*tasks)

        # important: no await should be done after this!

        if self.input.audio is not None:
            self._forward_audio_atask = asyncio.create_task(
                self._forward_audio_task(), name="_forward_audio_task"
            )

        if self.input.video is not None:
            self._forward_video_atask = asyncio.create_task(
                self._forward_video_task(), name="_forward_video_task"
            )

        self._started = True
        self._update_agent_state("listening")
        if self._room_io and self._room_io.subscribed_fut:

            def on_room_io_subscribed(_: asyncio.Future[None]) -> None:
                # Arm the user-away timer only if both sides are still listening
                # once the subscription future completes.
                if self._user_state == "listening" and self._agent_state == "listening":
                    self._set_user_away_timer()

            self._room_io.subscribed_fut.add_done_callback(on_room_io_subscribed)

        # log used IO
        def _collect_source(
            inp: io.AudioInput | io.VideoInput | None,
        ) -> list[io.AudioInput | io.VideoInput]:
            # Follow the `.source` links and return the inputs, starting at `inp`.
            return [] if inp is None else [inp] + _collect_source(inp.source)

        def _collect_chain(
            out: io.TextOutput | io.VideoOutput | io.AudioOutput | None,
        ) -> list[io.VideoOutput | io.AudioOutput | io.TextOutput]:
            # Follow the `.next_in_chain` links and return the outputs, starting at `out`.
            return [] if out is None else [out] + _collect_chain(out.next_in_chain)

        # Inputs are reversed so the log reads from the innermost source to the session.
        audio_input = _collect_source(self.input.audio)[::-1]
        video_input = _collect_source(self.input.video)[::-1]

        audio_output = _collect_chain(self.output.audio)
        video_output = _collect_chain(self.output.video)
        transcript_output = _collect_chain(self.output.transcription)

        logger.debug(
            "using audio io: %s -> `AgentSession` -> %s",
            " -> ".join([f"`{out.label}`" for out in audio_input]) or "(none)",
            " -> ".join([f"`{out.label}`" for out in audio_output]) or "(none)",
        )

        logger.debug(
            "using transcript io: `AgentSession` -> %s",
            " -> ".join([f"`{out.label}`" for out in transcript_output]) or "(none)",
        )

        if video_input or video_output:
            logger.debug(
                "using video io: %s > `AgentSession` > %s",
                " -> ".join([f"`{out.label}`" for out in video_input]) or "(none)",
                " -> ".join([f"`{out.label}`" for out in video_output]) or "(none)",
            )

Start the voice agent.

Create a default RoomIO if the input or output audio is not already set. If the console flag is provided, start a ChatCLI.

Args

room
The room to use for input and output
room_input_options
Options for the room input
room_output_options
Options for the room output
def update_agent(self,
agent: Agent) ‑> None
Expand source code
def update_agent(self, agent: Agent) -> None:
    """Set ``agent`` as the session's agent.

    When the session has already been started, an activity-update task is also
    scheduled; it receives the previously scheduled update task and the new agent.
    """
    self._agent = agent

    if not self._started:
        return

    previous_update = self._update_activity_atask
    transition = asyncio.create_task(
        self._update_activity_task(previous_update, self._agent),
        name="_update_activity_task",
    )
    self._update_activity_atask = transition

    run_state = self._global_run_state
    if run_state:
        # don't mark the RunResult as done, if there is currently an agent transition happening.  # noqa: E501
        # (used to make sure we're correctly adding the AgentHandoffResult before completion)  # noqa: E501
        run_state._watch_handle(transition)
def update_options(self) ‑> None
Expand source code
def update_options(self) -> None:
    """Placeholder: currently does nothing."""
    pass

Inherited members

class AgentStateChangedEvent (**data: Any)
Expand source code
# Event model carrying the previous and the new agent state.
class AgentStateChangedEvent(BaseModel):
    # Fixed tag for this event kind; the only accepted value is "agent_state_changed".
    type: Literal["agent_state_changed"] = "agent_state_changed"
    # Agent state before the change.
    old_state: AgentState
    # Agent state after the change.
    new_state: AgentState
    # Defaults to time.time() evaluated when the event instance is created.
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var model_config
var new_state : Literal['initializing', 'idle', 'listening', 'thinking', 'speaking']
var old_state : Literal['initializing', 'idle', 'listening', 'thinking', 'speaking']
var type : Literal['agent_state_changed']
class AgentTask (*,
instructions: str,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
tools: list[llm.FunctionTool | llm.RawFunctionTool] | None = None,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
mcp_servers: NotGivenOr[list[mcp.MCPServer] | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class AgentTask(Agent, Generic[TaskResult_T]):
    """An Agent that can be awaited to obtain a result of type ``TaskResult_T``.

    Awaiting the task asks the session to switch its activity to this task
    (with the previous activity set to "pause"), waits until ``complete()`` is
    called, and then asks the session to resume the previous agent. A task can
    only be awaited once.
    """

    def __init__(
        self,
        *,
        instructions: str,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        tools: list[llm.FunctionTool | llm.RawFunctionTool] | None = None,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        mcp_servers: NotGivenOr[list[mcp.MCPServer] | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Forward all arguments to ``Agent.__init__`` and create the result future."""
        tools = tools or []
        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            tools=tools,
            turn_detection=turn_detection,
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            mcp_servers=mcp_servers,
            allow_interruptions=allow_interruptions,
        )

        # set once the task has been awaited; guards against awaiting it twice
        self.__started = False
        # resolved (with a result or an exception) by complete()
        self.__fut = asyncio.Future[TaskResult_T]()

    def done(self) -> bool:
        """Return True once the task's result future is done."""
        return self.__fut.done()

    def complete(self, result: TaskResult_T | Exception) -> None:
        """Finish the task with ``result``.

        An ``Exception`` instance is set as the future's exception; any other
        value is set as its result. If a speech handle is present in the current
        context, ``result`` is also stored on it as ``_maybe_run_final_output``.

        Raises:
            RuntimeError: If the task is already done.
        """
        if self.__fut.done():
            raise RuntimeError(f"{self.__class__.__name__} is already done")

        if isinstance(result, Exception):
            self.__fut.set_exception(result)
        else:
            self.__fut.set_result(result)

        self.__fut.exception()  # silence exc not retrieved warnings

        from .agent_activity import _SpeechHandleContextVar

        speech_handle = _SpeechHandleContextVar.get(None)

        if speech_handle:
            speech_handle._maybe_run_final_output = result

        # if not self.__inline_mode:
        #    session._close_soon(reason=CloseReason.TASK_COMPLETED, drain=True)

    async def __await_impl(self) -> TaskResult_T:
        """Run the task to completion and return (or raise) its result.

        Raises:
            RuntimeError: If the task was already awaited, if there is no current
                asyncio task, or if the current asyncio task has no activity task
                info marked as an inline task.
        """
        if self.__started:
            raise RuntimeError(f"{self.__class__.__name__} is not re-entrant, await only once")

        self.__started = True

        current_task = asyncio.current_task()
        if current_task is None:
            raise RuntimeError(
                f"{self.__class__.__name__} must be executed inside an async context"
            )

        task_info = _get_activity_task_info(current_task)
        if not task_info or not task_info.inline_task:
            raise RuntimeError(
                f"{self.__class__.__name__} should only be awaited inside tool_functions or the on_enter/on_exit methods of an Agent"  # noqa: E501
            )

        def _handle_task_done(_: asyncio.Task[Any]) -> None:
            if self.__fut.done():
                return

            # if the asyncio.Task running the InlineTask completes before the InlineTask itself, log
            # an error and attempt to recover by terminating the InlineTask.
            logger.error(
                f"The asyncio.Task finished before {self.__class__.__name__} was completed."
            )

            self.complete(
                RuntimeError(
                    f"The asyncio.Task finished before {self.__class__.__name__} was completed."
                )
            )

        current_task.add_done_callback(_handle_task_done)

        from .agent_activity import _AgentActivityContextVar, _SpeechHandleContextVar

        # TODO(theomonnom): add a global lock for inline tasks
        # This may currently break in the case we use parallel tool calls.

        # capture the activity/agent that awaited this task so it can be resumed later
        speech_handle = _SpeechHandleContextVar.get(None)
        old_activity = _AgentActivityContextVar.get()
        old_agent = old_activity.agent
        session = old_activity.session

        # TODO(theomonnom): could the RunResult watcher & the blocked_tasks share the same logic?
        await session._update_activity(
            self, previous_activity="pause", blocked_tasks=[current_task]
        )

        # NOTE: _update_activity is calling the on_enter method, so the RunResult can capture all speeches
        run_state = session._global_run_state
        if speech_handle and run_state and not run_state.done():
            # make sure to not deadlock on the current speech handle
            run_state._unwatch_handle(speech_handle)
            # it is OK to call _mark_done_if_needed here, the above _update_activity will call on_enter
            # so handles added inside the on_enter will make sure we're not completing the run_state too early.
            run_state._mark_done_if_needed(None)

        try:
            # shield the result future so cancelling this await does not cancel it
            return await asyncio.shield(self.__fut)

        finally:
            # run_state could have changed after self.__fut
            run_state = session._global_run_state

            if session.current_agent != self:
                logger.warning(
                    f"{self.__class__.__name__} completed, but the agent has changed in the meantime. "
                    "Ignoring handoff to the previous agent, likely due to `AgentSession.update_agent` being invoked."
                )
                await old_activity.aclose()
            else:
                # re-attach the speech handle that was unwatched before waiting
                if speech_handle and run_state and not run_state.done():
                    run_state._watch_handle(speech_handle)

                merged_chat_ctx = old_agent.chat_ctx.merge(
                    self.chat_ctx, exclude_function_call=True, exclude_instructions=True
                )
                # set the chat_ctx directly, `session._update_activity` will sync it to the rt_session if needed
                old_agent._chat_ctx.items[:] = merged_chat_ctx.items
                # await old_agent.update_chat_ctx(merged_chat_ctx)

                await session._update_activity(
                    old_agent, new_activity="resume", wait_on_enter=False
                )

    def __await__(self) -> Generator[None, None, TaskResult_T]:
        """Make the task awaitable by delegating to ``__await_impl``."""
        return self.__await_impl().__await__()

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.Agent
  • typing.Generic

Subclasses

Methods

def complete(self, result: TaskResult_T | Exception) ‑> None
Expand source code
def complete(self, result: TaskResult_T | Exception) -> None:
    """Finish the task with ``result``.

    An ``Exception`` instance is set as the future's exception; any other value
    is set as its result. If a speech handle is present in the current context,
    ``result`` is also stored on it as ``_maybe_run_final_output``.

    Raises:
        RuntimeError: If the task is already done.
    """
    if self.__fut.done():
        raise RuntimeError(f"{self.__class__.__name__} is already done")

    if isinstance(result, Exception):
        self.__fut.set_exception(result)
    else:
        self.__fut.set_result(result)

    self.__fut.exception()  # silence exc not retrieved warnings

    from .agent_activity import _SpeechHandleContextVar

    speech_handle = _SpeechHandleContextVar.get(None)

    if speech_handle:
        speech_handle._maybe_run_final_output = result

    # if not self.__inline_mode:
    #    session._close_soon(reason=CloseReason.TASK_COMPLETED, drain=True)
def done(self) ‑> bool
Expand source code
def done(self) -> bool:
    """Return True once the task's result future is done."""
    return self.__fut.done()
class ChatCLI (agent_session: AgentSession,
*,
sync_transcription: bool = True,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class ChatCLI:
    """Interactive console front-end for an ``AgentSession``.

    Connects the local sound devices (via ``sounddevice``) and the terminal's
    stdin/stdout to the session's audio and transcription I/O. ``Ctrl+B``
    toggles between audio mode (microphone + speaker, with a level meter) and
    text mode (typed messages submitted through ``generate_reply``).
    """

    def __init__(
        self,
        agent_session: AgentSession,
        *,
        sync_transcription: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Set up sinks, audio processing and optional recording.

        Args:
            agent_session: Session whose input/output is driven by this CLI.
            sync_transcription: When true, audio and text output are chained
                through a ``TranscriptSynchronizer``.
            loop: Event loop used for channels, the stdin reader and thread
                hand-off; defaults to ``asyncio.get_event_loop()``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._session = agent_session
        self._done_fut = asyncio.Future[None]()
        self._micro_db = INPUT_DB_MIN

        self._audio_input_ch = aio.Chan[rtc.AudioFrame](loop=self._loop)

        self._input_stream: sd.InputStream | None = None
        self._output_stream: sd.OutputStream | None = None
        self._cli_mode: Literal["text", "audio"] = "audio"

        # Default label for the level meter; replaced with the real device name
        # once a microphone is opened. Without it, rendering audio mode on a
        # machine with no input device would raise AttributeError.
        self._input_device_name: str = "Microphone"

        self._text_input_buf: list[str] = []

        self._text_sink = _TextOutput(self)
        self._audio_sink = _AudioOutput(self)
        self._transcript_syncer: TranscriptSynchronizer | None = None
        if sync_transcription:
            self._transcript_syncer = TranscriptSynchronizer(
                next_in_chain_audio=self._audio_sink,
                next_in_chain_text=self._text_sink,
            )

        self._apm = rtc.AudioProcessingModule(
            echo_cancellation=True,
            noise_suppression=True,
            high_pass_filter=True,
            auto_gain_control=True,
        )

        # latest device latencies reported by the sounddevice callbacks (seconds)
        self._output_delay = 0.0
        self._input_delay = 0.0

        self._main_atask: asyncio.Task[None] | None = None

        self._input_audio: io.AudioInput = _AudioInput(self)
        self._output_audio: io.AudioOutput = (
            self._transcript_syncer.audio_output if self._transcript_syncer else self._audio_sink
        )

        self._recorder_io: RecorderIO | None = None
        if cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.record:
            # wrap both directions so the recorder sees what is captured and played
            self._recorder_io = RecorderIO(agent_session=agent_session)
            self._input_audio = self._recorder_io.record_input(self._input_audio)
            self._output_audio = self._recorder_io.record_output(self._output_audio)

    async def start(self) -> None:
        """Start recording (if enabled), open the audio devices and spawn the main task."""
        if self._recorder_io:
            timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
            filename = f"console_{timestamp}.ogg"
            await self._recorder_io.start(output_path=filename)

            try:
                job_ctx = get_job_context()
                job_ctx.add_shutdown_callback(self._recorder_io.aclose)
            except RuntimeError:
                pass  # ignore

        if self._transcript_syncer:
            self._update_text_output(enable=True, stdout_enable=False)

        self._update_microphone(enable=True)
        self._update_speaker(enable=True)
        self._main_atask = asyncio.create_task(self._main_task(), name="_main_task")

    @log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Read keystrokes from stdin and run the input/render tasks until done.

        On POSIX the terminal is switched to cbreak mode for the duration and
        restored afterwards; on Windows keys are read with ``msvcrt.getch`` in
        the default executor.
        """
        stdin_ch = aio.Chan[str](loop=self._loop)

        if sys.platform == "win32":
            import msvcrt

            async def win_reader() -> None:
                while True:
                    ch = await self._loop.run_in_executor(None, msvcrt.getch)

                    if ch == b"\x03":  # Ctrl+C on Windows
                        break

                    try:
                        decoded = ch.decode("utf-8")
                    except UnicodeDecodeError:
                        # e.g. the 0xE0 prefix byte of special keys; the channel
                        # carries ``str`` only, so undecodable bytes are dropped
                        continue
                    await stdin_ch.send(decoded)

            self._win_read_task = asyncio.create_task(win_reader())
        else:
            import termios
            import tty

            fd = sys.stdin.fileno()
            old_settings = termios.tcgetattr(fd)
            tty.setcbreak(fd)

            def on_input() -> None:
                try:
                    ch = sys.stdin.read(1)
                    stdin_ch.send_nowait(ch)
                except Exception:
                    stdin_ch.close()

            self._loop.add_reader(fd, on_input)

        def on_input_task_done(_: asyncio.Task[None]) -> None:
            # the future may already be resolved, or cancelled together with
            # this task; set_result would then raise InvalidStateError
            if not self._done_fut.done():
                self._done_fut.set_result(None)

        input_cli_task: asyncio.Task[None] | None = None
        render_cli_task: asyncio.Task[None] | None = None
        try:
            input_cli_task = asyncio.create_task(self._input_cli_task(stdin_ch))
            input_cli_task.add_done_callback(on_input_task_done)
            render_cli_task = asyncio.create_task(self._render_cli_task())

            await self._done_fut
            await aio.cancel_and_wait(render_cli_task)

            self._update_microphone(enable=False)
            self._update_speaker(enable=False)
        finally:
            # do not leave helper tasks running when this task is cancelled or fails
            for task in (input_cli_task, render_cli_task):
                if task is not None and not task.done():
                    task.cancel()

            if sys.platform != "win32":
                import termios

                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
                self._loop.remove_reader(fd)

    def _update_microphone(self, *, enable: bool) -> None:
        """Open the default input device and attach it to the session, or tear it down."""
        import sounddevice as sd

        input_device, _ = sd.default.device
        if input_device is not None and enable:
            device_info = sd.query_devices(input_device)
            assert isinstance(device_info, dict)

            self._input_device_name = device_info.get("name", "Microphone")
            self._input_stream = sd.InputStream(
                callback=self._sd_input_callback,
                dtype="int16",
                channels=1,
                device=input_device,
                samplerate=24000,
                blocksize=2400,
            )
            self._input_stream.start()
            self._session.input.audio = self._input_audio
        elif self._input_stream is not None:
            self._input_stream.stop()
            self._input_stream.close()
            self._input_stream = None
            self._session.input.audio = None

    def _update_speaker(self, *, enable: bool) -> None:
        """Open the default output device and attach it to the session, or tear it down."""
        import sounddevice as sd

        _, output_device = sd.default.device
        if output_device is not None and enable:
            self._output_stream = sd.OutputStream(
                callback=self._sd_output_callback,
                dtype="int16",
                channels=1,
                device=output_device,
                samplerate=24000,
                blocksize=2400,  # 100ms
            )
            self._output_stream.start()
            self._session.output.audio = self._output_audio
        elif self._output_stream is not None:
            self._output_stream.close()
            self._output_stream = None
            self._session.output.audio = None

    def _update_text_output(self, *, enable: bool, stdout_enable: bool) -> None:
        """Attach or detach the session's transcription output.

        Args:
            enable: Attach the text output (synchronized when a transcript
                syncer exists); when false, detach it and clear the typed buffer.
            stdout_enable: Passed to the text sink's ``set_enabled``; only used
                when ``enable`` is true.
        """
        if enable:
            self._session.output.transcription = (
                self._transcript_syncer.text_output if self._transcript_syncer else self._text_sink
            )
            self._text_sink.set_enabled(stdout_enable)
        else:
            self._session.output.transcription = None
            self._text_input_buf = []

    def _sd_output_callback(self, outdata: np.ndarray, frames: int, time, *_) -> None:  # type: ignore
        """sounddevice output callback: fill ``outdata`` from the audio sink buffer.

        Missing samples are zero-filled. The played audio is then fed to the
        audio processing module's reverse stream in 240-sample chunks.
        """
        self._output_delay = time.outputBufferDacTime - time.currentTime

        FRAME_SAMPLES = 240
        with self._audio_sink.lock:
            bytes_needed = frames * 2  # int16 mono: 2 bytes per sample
            if len(self._audio_sink.audio_buffer) < bytes_needed:
                # underrun: play what is buffered and pad the rest with silence
                available_bytes = len(self._audio_sink.audio_buffer)
                outdata[: available_bytes // 2, 0] = np.frombuffer(
                    self._audio_sink.audio_buffer,
                    dtype=np.int16,
                    count=available_bytes // 2,
                )
                outdata[available_bytes // 2 :, 0] = 0
                del self._audio_sink.audio_buffer[:available_bytes]
            else:
                chunk = self._audio_sink.audio_buffer[:bytes_needed]
                outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frames)
                del self._audio_sink.audio_buffer[:bytes_needed]

        num_chunks = frames // FRAME_SAMPLES
        for i in range(num_chunks):
            start = i * FRAME_SAMPLES
            end = start + FRAME_SAMPLES
            render_chunk = outdata[start:end, 0]
            render_frame_for_aec = rtc.AudioFrame(
                data=render_chunk.tobytes(),
                samples_per_channel=FRAME_SAMPLES,
                sample_rate=24000,
                num_channels=1,
            )
            self._apm.process_reverse_stream(render_frame_for_aec)

    def _sd_input_callback(self, indata: np.ndarray, frame_count: int, time, *_) -> None:  # type: ignore
        """sounddevice input callback: process captured audio and forward it.

        Each 10 ms chunk goes through the audio processing module, updates the
        level shown by the meter and is handed to the event loop via
        ``call_soon_threadsafe``.
        """
        self._input_delay = time.currentTime - time.inputBufferAdcTime
        total_delay = self._output_delay + self._input_delay

        try:
            self._apm.set_stream_delay_ms(int(total_delay * 1000))
        except RuntimeError:
            pass  # setting stream delay in console mode fails often, so we silently continue

        FRAME_SAMPLES = 240  # 10ms at 24000 Hz
        num_frames = frame_count // FRAME_SAMPLES

        for i in range(num_frames):
            start = i * FRAME_SAMPLES
            end = start + FRAME_SAMPLES
            capture_chunk = indata[start:end]

            capture_frame_for_aec = rtc.AudioFrame(
                data=capture_chunk.tobytes(),
                samples_per_channel=FRAME_SAMPLES,
                sample_rate=24000,
                num_channels=1,
            )
            self._apm.process_stream(capture_frame_for_aec)

            # level is measured on the frame after process_stream
            # (presumably processed in place -- confirm against rtc API)
            in_data_aec = np.frombuffer(capture_frame_for_aec.data, dtype=np.int16)
            rms = np.sqrt(np.mean(in_data_aec.astype(np.float32) ** 2))
            max_int16 = np.iinfo(np.int16).max
            self._micro_db = 20.0 * np.log10(rms / max_int16 + 1e-6)

            self._loop.call_soon_threadsafe(self._audio_input_ch.send_nowait, capture_frame_for_aec)

    @log_exceptions(logger=logger)
    async def _input_cli_task(self, in_ch: aio.Chan[str]) -> None:
        """Consume keystrokes: toggle modes on Ctrl+B and edit/submit text input."""
        while True:
            char = await in_ch.recv()
            if char is None:
                break

            if char == "\x02":  # Ctrl+B
                if self._cli_mode == "audio":
                    self._cli_mode = "text"
                    self._update_text_output(enable=True, stdout_enable=True)
                    self._update_microphone(enable=False)
                    self._update_speaker(enable=False)
                    click.echo("\nSwitched to Text Input Mode.", nl=False)
                else:
                    self._cli_mode = "audio"
                    self._update_text_output(enable=True, stdout_enable=False)
                    self._update_microphone(enable=True)
                    self._update_speaker(enable=True)
                    self._text_input_buf = []
                    click.echo("\nSwitched to Audio Input Mode.", nl=False)
                continue  # the toggle key itself is never text input

            if self._cli_mode == "text":  # Read input
                if char in ("\r", "\n"):
                    text = "".join(self._text_input_buf)
                    if text:
                        self._text_input_buf = []
                        self._session.interrupt()
                        self._session.generate_reply(user_input=text)
                        click.echo("\n", nl=False)
                elif char in ("\x7f", "\x08"):  # Backspace (DEL, or BS e.g. on Windows)
                    if self._text_input_buf:
                        self._text_input_buf.pop()
                        sys.stdout.write("\b \b")
                        sys.stdout.flush()
                elif char.isprintable():
                    self._text_input_buf.append(char)
                    click.echo(char, nl=False)
                    sys.stdout.flush()

    async def _render_cli_task(self) -> None:
        """Redraw the status line for the current mode at ``FPS`` frames per second."""
        next_frame = time.perf_counter()
        while True:
            next_frame += 1 / FPS
            if self._cli_mode == "audio":
                self._print_audio_mode()
            elif self._cli_mode == "text" and not self._text_sink._capturing:
                self._print_text_mode()

            await asyncio.sleep(max(0, next_frame - time.perf_counter()))

    def _print_audio_mode(self) -> None:
        """Draw the microphone level meter on the current terminal line."""
        amplitude_db = _normalize_db(self._micro_db, db_min=INPUT_DB_MIN, db_max=INPUT_DB_MAX)
        nb_bar = round(amplitude_db * MAX_AUDIO_BAR)

        color_code = 31 if amplitude_db > 0.75 else 33 if amplitude_db > 0.5 else 32
        bar = "#" * nb_bar + "-" * (MAX_AUDIO_BAR - nb_bar)
        sys.stdout.write(
            f"\r[Audio] {self._input_device_name[-20:]} [{self._micro_db:6.2f} dBFS] {_esc(color_code)}[{bar}]{_esc(0)}"  # noqa: E501
        )
        sys.stdout.flush()

    def _print_text_mode(self) -> None:
        """Draw the text prompt followed by the characters typed so far."""
        sys.stdout.write("\r")
        sys.stdout.flush()
        prompt = "Enter your message: "
        # "[Text] " mirrors the "[Audio] " prefix; the bracket was previously left unclosed
        sys.stdout.write(f"[Text] {prompt}{''.join(self._text_input_buf)}")
        sys.stdout.flush()

Methods

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Begin recording when a recorder is configured, open the audio devices and spawn the main task."""
    recorder = self._recorder_io
    if recorder:
        stamp = datetime.now().strftime("%Y-%m-%d_%H-%M")
        await recorder.start(output_path=f"console_{stamp}.ogg")

        try:
            get_job_context().add_shutdown_callback(recorder.aclose)
        except RuntimeError:
            # best effort: a RuntimeError here (presumably no active job
            # context) just means the shutdown hook is not registered
            pass

    if self._transcript_syncer:
        self._update_text_output(enable=True, stdout_enable=False)

    self._update_microphone(enable=True)
    self._update_speaker(enable=True)

    main_coro = self._main_task()
    self._main_atask = asyncio.create_task(main_coro, name="_main_task")
class CloseEvent (**data: Any)
Expand source code
class CloseEvent(BaseModel):
    # Event model describing a close, carrying the reason and an optional error.
    # NOTE(review): presumably emitted when the agent session closes -- confirm
    # against the emitting code.

    # constant tag identifying this event kind
    type: Literal["close"] = "close"
    # error associated with the close, if any; defaults to None
    error: LLMError | STTError | TTSError | RealtimeModelError | None = None
    # required: why the close happened
    reason: CloseReason
    # defaults to time.time() evaluated when the event is constructed
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var error : livekit.agents.llm.llm.LLMError | livekit.agents.stt.stt.STTError | livekit.agents.tts.tts.TTSError | livekit.agents.llm.realtime.RealtimeModelError | None
var model_config
var reason : livekit.agents.voice.events.CloseReason
var type : Literal['close']
class CloseReason (*args, **kwds)
Expand source code
@unique
class CloseReason(str, Enum):
    # String-valued enum (members are also ``str`` instances) used as the type
    # of ``CloseEvent.reason``. ``@unique`` makes class creation fail if two
    # members share the same value.
    ERROR = "error"
    JOB_SHUTDOWN = "job_shutdown"
    PARTICIPANT_DISCONNECTED = "participant_disconnected"
    USER_INITIATED = "user_initiated"
    TASK_COMPLETED = "task_completed"

str(object='') -> str
str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var ERROR
var JOB_SHUTDOWN
var PARTICIPANT_DISCONNECTED
var TASK_COMPLETED
var USER_INITIATED
class ConversationItemAddedEvent (**data: Any)
Expand source code
class ConversationItemAddedEvent(BaseModel):
    # Event model carrying a conversation item together with a creation timestamp.

    # constant tag identifying this event kind
    type: Literal["conversation_item_added"] = "conversation_item_added"
    # required: the item itself, either a ChatMessage or a _TypeDiscriminator
    item: ChatMessage | _TypeDiscriminator
    # defaults to time.time() evaluated when the event is constructed
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var item : livekit.agents.llm.chat_context.ChatMessage | livekit.agents.voice.events._TypeDiscriminator
var model_config
var type : Literal['conversation_item_added']
class ErrorEvent (**data: Any)
Expand source code
class ErrorEvent(BaseModel):
    # Event model pairing an error with the component it came from.

    # lets pydantic accept field types it cannot build a schema for
    # (presumably needed for the LLM / STT / TTS / RealtimeModel classes -- confirm)
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # constant tag identifying this event kind
    type: Literal["error"] = "error"
    # required: the error; one of the typed errors, or any other object
    error: LLMError | STTError | TTSError | RealtimeModelError | Any
    # required: the object the error is attributed to
    source: LLM | STT | TTS | RealtimeModel | Any
    # defaults to time.time() evaluated when the event is constructed
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var error : livekit.agents.llm.llm.LLMError | livekit.agents.stt.stt.STTError | livekit.agents.tts.tts.TTSError | livekit.agents.llm.realtime.RealtimeModelError | typing.Any
var model_config
var source : livekit.agents.llm.llm.LLM | livekit.agents.stt.stt.STT | livekit.agents.tts.tts.TTS | livekit.agents.llm.realtime.RealtimeModel | typing.Any
var type : Literal['error']
class FunctionToolsExecutedEvent (**data: Any)
Expand source code
class FunctionToolsExecutedEvent(BaseModel):
    # Event model listing function calls and, index-aligned, their outputs
    # (an output may be None). Both lists must have the same length.
    type: Literal["function_tools_executed"] = "function_tools_executed"
    function_calls: list[FunctionCall]
    function_call_outputs: list[FunctionCallOutput | None]
    created_at: float = Field(default_factory=time.time)

    def zipped(self) -> list[tuple[FunctionCall, FunctionCallOutput | None]]:
        """Return ``(function_call, output)`` pairs matched by position."""
        pairs = zip(self.function_calls, self.function_call_outputs)
        return [(call, output) for call, output in pairs]

    @model_validator(mode="after")
    def verify_lists_length(self) -> Self:
        """Reject events whose two lists differ in length.

        Raises:
            ValueError: If the lengths of ``function_calls`` and
                ``function_call_outputs`` differ.
        """
        n_calls = len(self.function_calls)
        n_outputs = len(self.function_call_outputs)
        if n_calls == n_outputs:
            return self

        raise ValueError("The number of function_calls and function_call_outputs must match.")

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var function_call_outputs : list[livekit.agents.llm.chat_context.FunctionCallOutput | None]
var function_calls : list[livekit.agents.llm.chat_context.FunctionCall]
var model_config
var type : Literal['function_tools_executed']

Methods

def verify_lists_length(self) ‑> Self
Expand source code
@model_validator(mode="after")
def verify_lists_length(self) -> Self:
    """Validate that there is exactly one output slot per function call.

    Raises:
        ValueError: If the lengths of ``function_calls`` and
            ``function_call_outputs`` differ.
    """
    n_calls = len(self.function_calls)
    n_outputs = len(self.function_call_outputs)
    if n_calls == n_outputs:
        return self

    raise ValueError("The number of function_calls and function_call_outputs must match.")
def zipped(self) ‑> list[tuple[livekit.agents.llm.chat_context.FunctionCall, livekit.agents.llm.chat_context.FunctionCallOutput | None]]
Expand source code
def zipped(self) -> list[tuple[FunctionCall, FunctionCallOutput | None]]:
    """Return ``(function_call, output)`` pairs matched by position."""
    pairs = zip(self.function_calls, self.function_call_outputs)
    return [(call, output) for call, output in pairs]
class MetricsCollectedEvent (**data: Any)
Expand source code
class MetricsCollectedEvent(BaseModel):
    # Event model wrapping one collected metrics object with a creation timestamp.

    # constant tag identifying this event kind
    type: Literal["metrics_collected"] = "metrics_collected"
    # required: the metrics payload (AgentMetrics)
    metrics: AgentMetrics
    # defaults to time.time() evaluated when the event is constructed
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var metrics : livekit.agents.metrics.base.STTMetrics | livekit.agents.metrics.base.LLMMetrics | livekit.agents.metrics.base.TTSMetrics | livekit.agents.metrics.base.VADMetrics | livekit.agents.metrics.base.EOUMetrics | livekit.agents.metrics.base.RealtimeModelMetrics
var model_config
var type : Literal['metrics_collected']
class ModelSettings (tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN)
Expand source code
@dataclass
class ModelSettings:
    # Settings container for LLM calls; currently holds a single option.
    # The field defaults to the NOT_GIVEN sentinel
    # (presumably meaning "no explicit choice" -- confirm against the consumer).
    tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN
    """The tool choice to use when calling the LLM."""

ModelSettings(tool_choice: 'NotGivenOr[llm.ToolChoice]' = NOT_GIVEN)

Instance variables

var tool_choice : livekit.agents.llm.tool_context.NamedToolChoice | Literal['auto', 'required', 'none'] | livekit.agents.types.NotGiven

The tool choice to use when calling the LLM.

class RunContext (*,
session: AgentSession[Userdata_T],
speech_handle: SpeechHandle,
function_call: FunctionCall)
Expand source code
class RunContext(Generic[Userdata_T]):
    """Bundle of the session, speech handle and function call for one function-call execution."""

    # private ctor
    def __init__(
        self,
        *,
        session: AgentSession[Userdata_T],
        speech_handle: SpeechHandle,
        function_call: FunctionCall,
    ) -> None:
        self._session = session
        self._speech_handle = speech_handle
        self._function_call = function_call

        # index of the speech handle's latest step at construction time;
        # `wait_for_playout` waits on exactly this step
        self._initial_step_idx = speech_handle.num_steps - 1

    @property
    def session(self) -> AgentSession[Userdata_T]:
        """The session this context was created with."""
        return self._session

    @property
    def speech_handle(self) -> SpeechHandle:
        """The speech handle this context was created with."""
        return self._speech_handle

    @property
    def function_call(self) -> FunctionCall:
        """The function call this context was created with."""
        return self._function_call

    @property
    def userdata(self) -> Userdata_T:
        """Shortcut for ``session.userdata``."""
        return self.session.userdata

    def disallow_interruptions(self) -> None:
        """Turn off interruptions for this FunctionCall.

        Sets ``allow_interruptions`` to False on the SpeechHandle; that setter
        raises a RuntimeError when the handle has already been interrupted.

        Raises:
            RuntimeError: If the SpeechHandle is already interrupted.
        """
        self.speech_handle.allow_interruptions = False

    async def wait_for_playout(self) -> None:
        """Wait for the speech playout belonging to this function call step.

        `SpeechHandle.wait_for_playout` waits for the whole assistant turn
        (including all function tools); this method only waits until the
        assistant's spoken response preceding this tool has finished playing."""
        step = self._initial_step_idx
        await self.speech_handle._wait_for_generation(step_idx=step)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • typing.Generic

Instance variables

prop function_call : FunctionCall
Expand source code
@property
def function_call(self) -> FunctionCall:
    """The `FunctionCall` this context was created with."""
    return self._function_call
prop session : AgentSession[Userdata_T]
Expand source code
@property
def session(self) -> AgentSession[Userdata_T]:
    """The `AgentSession` this context was created with."""
    return self._session
prop speech_handle : SpeechHandle
Expand source code
@property
def speech_handle(self) -> SpeechHandle:
    """The `SpeechHandle` this context was created with."""
    return self._speech_handle
prop userdata : Userdata_T
Expand source code
@property
def userdata(self) -> Userdata_T:
    """Shortcut for `self.session.userdata`."""
    return self.session.userdata

Methods

def disallow_interruptions(self) ‑> None
Expand source code
def disallow_interruptions(self) -> None:
    """Disable interruptions for the speech handle tied to this FunctionCall.

    Delegates to the SpeechHandle.allow_interruptions setter (assigning False),
    which will raise a RuntimeError if the handle is already interrupted.

    Raises:
        RuntimeError: If the SpeechHandle is already interrupted.
    """
    self.speech_handle.allow_interruptions = False

Disable interruptions for this FunctionCall.

Delegates to the SpeechHandle.allow_interruptions setter, which will raise a RuntimeError if the handle is already interrupted.

Raises

RuntimeError
If the SpeechHandle is already interrupted.
async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """Waits for the speech playout corresponding to this function call step.

    Unlike `SpeechHandle.wait_for_playout`, which waits for the full
    assistant turn to complete (including all function tools),
    this method only waits for the assistant's spoken response prior to running
    this tool to finish playing."""
    # wait on the generation step recorded when this context was constructed
    await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)

Waits for the speech playout corresponding to this function call step.

Unlike SpeechHandle.wait_for_playout(), which waits for the full assistant turn to complete (including all function tools), this method only waits for the assistant's spoken response prior to running this tool to finish playing.

class SpeechCreatedEvent (**data: Any)
Expand source code
class SpeechCreatedEvent(BaseModel):
    """Event payload describing a newly created speech handle."""

    # presumably required because `speech_handle` is a plain (non-pydantic) class
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # discriminator tag; the only accepted value is "speech_created"
    type: Literal["speech_created"] = "speech_created"
    user_initiated: bool
    """True if the speech was created using public methods like `say` or `generate_reply`"""
    source: Literal["say", "generate_reply", "tool_response"]
    """Source indicating how the speech handle was created"""
    # required field (`...`) that is excluded from serialized output (`exclude=True`)
    speech_handle: SpeechHandle = Field(..., exclude=True)
    """The speech handle that was created"""
    # defaults to `time.time()` evaluated when the model instance is created
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var model_config
var source : Literal['say', 'generate_reply', 'tool_response']

Source indicating how the speech handle was created

var speech_handle : livekit.agents.voice.speech_handle.SpeechHandle

The speech handle that was created

var type : Literal['speech_created']
var user_initiated : bool

True if the speech was created using public methods like say or generate_reply

class SpeechHandle (*, speech_id: str, allow_interruptions: bool)
Expand source code
class SpeechHandle:
    """Tracks the lifecycle of one speech: scheduling, interruption and completion.

    State is held in asyncio futures (interrupted / done / scheduled) plus one
    future per authorized generation step. Callbacks can be registered for
    completion and for chat items being added.
    """

    SPEECH_PRIORITY_LOW = 0
    """Priority for messages that should be played after all other messages in the queue"""
    SPEECH_PRIORITY_NORMAL = 5
    """Every speech generated by the VoiceAgent defaults to this priority."""
    SPEECH_PRIORITY_HIGH = 10
    """Priority for important messages that should be played before others."""

    def __init__(self, *, speech_id: str, allow_interruptions: bool) -> None:
        """Create a handle.

        Args:
            speech_id: Identifier exposed through the `id` property.
            allow_interruptions: Whether `interrupt()` is initially permitted.
        """
        self._id = speech_id
        self._allow_interruptions = allow_interruptions

        self._interrupt_fut = asyncio.Future[None]()
        self._done_fut = asyncio.Future[None]()
        self._scheduled_fut = asyncio.Future[None]()
        self._authorize_event = asyncio.Event()

        # one future per authorized generation step, in order of authorization
        self._generations: list[asyncio.Future[None]] = []

        # indicate if the speech was interrupted by a user turn
        self._interrupted_by_user: bool = False

        # internal tasks used by this generation
        self._tasks: list[asyncio.Task] = []
        self._chat_items: list[llm.ChatItem] = []
        self._num_steps = 1

        self._item_added_callbacks: set[Callable[[llm.ChatItem], None]] = set()
        self._done_callbacks: set[Callable[[SpeechHandle], None]] = set()

        def _on_done(_: asyncio.Future[None]) -> None:
            # Iterate over a snapshot: a callback may register/unregister callbacks
            # on this handle, and mutating a set while iterating it raises
            # RuntimeError, which would skip the remaining callbacks.
            for cb in tuple(self._done_callbacks):
                cb(self)

        self._done_fut.add_done_callback(_on_done)
        self._maybe_run_final_output: Any = None  # kept private

    @staticmethod
    def create(allow_interruptions: bool = True) -> SpeechHandle:
        """Build a handle with a freshly generated id prefixed with ``speech_``."""
        return SpeechHandle(
            speech_id=utils.shortuuid("speech_"),
            allow_interruptions=allow_interruptions,
        )

    @property
    def num_steps(self) -> int:
        """Current step count (initialized to 1 by the constructor)."""
        return self._num_steps

    @property
    def id(self) -> str:
        """The identifier passed as `speech_id` at construction."""
        return self._id

    @property
    def scheduled(self) -> bool:
        """True once the internal scheduled future has completed."""
        return self._scheduled_fut.done()

    @property
    def interrupted(self) -> bool:
        """True once the internal interrupt future has completed."""
        return self._interrupt_fut.done()

    @property
    def allow_interruptions(self) -> bool:
        """Whether this handle currently allows interruptions."""
        return self._allow_interruptions

    @allow_interruptions.setter
    def allow_interruptions(self, value: bool) -> None:
        """Allow or disallow interruptions on this SpeechHandle.

        When set to False, the SpeechHandle will no longer accept any incoming
        interruption requests until re-enabled. If the handle is already
        interrupted, clearing interruptions is not allowed.

        Args:
            value (bool): True to allow interruptions, False to disallow.

        Raises:
            RuntimeError: If attempting to disable interruptions when already interrupted.
        """
        if self.interrupted and not value:
            raise RuntimeError(
                "Cannot set allow_interruptions to False, the SpeechHandle is already interrupted"
            )

        self._allow_interruptions = value

    @property
    def chat_items(self) -> list[llm.ChatItem]:
        """Chat items recorded so far; this is the internal list, not a copy."""
        return self._chat_items

    def done(self) -> bool:
        """Return True once the internal done future has completed."""
        return self._done_fut.done()

    def interrupt(self) -> SpeechHandle:
        """Interrupt the current speech generation.

        Has no effect if the handle is already done.

        Raises:
            RuntimeError: If this speech handle does not allow interruptions.

        Returns:
            SpeechHandle: The same speech handle that was interrupted.
        """
        if not self._allow_interruptions:
            raise RuntimeError("This generation handle does not allow interruptions")

        self._cancel()
        return self

    async def wait_for_playout(self) -> None:
        """Waits for the entire assistant turn to complete playback.

        This method waits until the assistant has fully finished speaking,
        including any finalization steps beyond initial response generation.
        This is appropriate to call when you want to ensure the speech output
        has entirely played out, including any tool calls and response follow-ups.

        Raises:
            RuntimeError: If called from the function tool task that owns this
                handle, since that would be a circular wait.
        """

        # raise an error to avoid developer mistakes
        from .agent import _get_activity_task_info

        if task := asyncio.current_task():
            info = _get_activity_task_info(task)
            if info and info.function_call and info.speech_handle == self:
                raise RuntimeError(
                    f"cannot call `SpeechHandle.wait_for_playout()` from inside the function tool `{info.function_call.name}` that owns this SpeechHandle. "
                    "This creates a circular wait: the speech handle is waiting for the function tool to complete, "
                    "while the function tool is simultaneously waiting for the speech handle.\n"
                    "To wait for the assistant’s spoken response prior to running this tool, use `RunContext.wait_for_playout()` instead."
                )

        # shield so that cancelling the waiter does not cancel the shared future
        await asyncio.shield(self._done_fut)

    def __await__(self) -> Generator[None, None, SpeechHandle]:
        """Make ``await handle`` equivalent to `wait_for_playout()`, returning the handle."""

        async def _await_impl() -> SpeechHandle:
            await self.wait_for_playout()
            return self

        return _await_impl().__await__()

    def add_done_callback(self, callback: Callable[[SpeechHandle], None]) -> None:
        """Register `callback` to be invoked with this handle when it becomes done.

        Callbacks are stored in a set, so registering the same one twice has no
        additional effect.
        """
        self._done_callbacks.add(callback)

    def remove_done_callback(self, callback: Callable[[SpeechHandle], None]) -> None:
        """Unregister `callback`; a callback that is not registered is ignored."""
        self._done_callbacks.discard(callback)

    async def wait_if_not_interrupted(self, aw: list[asyncio.futures.Future[Any]]) -> None:
        """Wait until every future in `aw` completes or the handle is interrupted.

        Returns as soon as either happens. Exceptions from `aw` are collected by
        the gather (``return_exceptions=True``) rather than raised, and the
        futures are not cancelled when the interruption wins.
        """
        fs: list[asyncio.Future[Any]] = [
            asyncio.gather(*aw, return_exceptions=True),
            self._interrupt_fut,
        ]
        await asyncio.wait(fs, return_when=asyncio.FIRST_COMPLETED)

    def _cancel(self) -> SpeechHandle:
        """Resolve the interrupt future unless the handle is already done."""
        if self.done():
            return self

        # a second cancel is a no-op: the future is already resolved
        with contextlib.suppress(asyncio.InvalidStateError):
            self._interrupt_fut.set_result(None)

        return self

    def _add_item_added_callback(self, callback: Callable[[llm.ChatItem], Any]) -> None:
        """Register `callback` to be invoked for every chat item passed to `_item_added`."""
        self._item_added_callbacks.add(callback)

    def _remove_item_added_callback(self, callback: Callable[[llm.ChatItem], Any]) -> None:
        """Unregister an item-added callback; unknown callbacks are ignored."""
        self._item_added_callbacks.discard(callback)

    def _item_added(self, items: Sequence[llm.ChatItem]) -> None:
        """Notify item-added callbacks for each item, then record it in `chat_items`."""
        for item in items:
            # snapshot for the same reason as in `_on_done`: callbacks may
            # unregister themselves while being notified
            for cb in tuple(self._item_added_callbacks):
                cb(item)

            self._chat_items.append(item)

    def _authorize_generation(self) -> None:
        """Append a new pending generation future and set the authorization event."""
        fut = asyncio.Future[None]()
        self._generations.append(fut)
        self._authorize_event.set()

    def _clear_authorization(self) -> None:
        """Clear the authorization event."""
        self._authorize_event.clear()

    async def _wait_for_authorization(self) -> None:
        """Block until the authorization event is set."""
        await self._authorize_event.wait()

    async def _wait_for_generation(self, step_idx: int = -1) -> None:
        """Wait for the generation future at `step_idx` (default: the latest one).

        Raises:
            RuntimeError: If no generation has been authorized yet.
        """
        if not self._generations:
            raise RuntimeError("cannot use wait_for_generation: no active generation is running.")

        await asyncio.shield(self._generations[step_idx])

    async def _wait_for_scheduled(self) -> None:
        """Wait until the handle has been marked as scheduled."""
        await asyncio.shield(self._scheduled_fut)

    def _mark_generation_done(self) -> None:
        """Resolve the latest generation future (no-op if it is already resolved).

        Raises:
            RuntimeError: If no generation has been authorized yet.
        """
        if not self._generations:
            raise RuntimeError("cannot use mark_generation_done: no active generation is running.")

        with contextlib.suppress(asyncio.InvalidStateError):
            self._generations[-1].set_result(None)

    def _mark_done(self) -> None:
        """Resolve the done future and, if one exists, the latest generation future.

        If the done future was already resolved, nothing else happens.
        """
        with contextlib.suppress(asyncio.InvalidStateError):
            # will raise InvalidStateError if the future is already done (interrupted)
            self._done_fut.set_result(None)
            if self._generations:
                self._mark_generation_done()  # preemptive generation could be cancelled before being scheduled

    def _mark_scheduled(self) -> None:
        """Resolve the scheduled future (no-op if it is already resolved)."""
        with contextlib.suppress(asyncio.InvalidStateError):
            self._scheduled_fut.set_result(None)

    def _mark_interrupted_by_user(self) -> None:
        """Record that the interruption was caused by a user turn."""
        self._interrupted_by_user = True

Class variables

var SPEECH_PRIORITY_HIGH

Priority for important messages that should be played before others.

var SPEECH_PRIORITY_LOW

Priority for messages that should be played after all other messages in the queue

var SPEECH_PRIORITY_NORMAL

Every speech generated by the VoiceAgent defaults to this priority.

Static methods

def create(allow_interruptions: bool = True) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
@staticmethod
def create(allow_interruptions: bool = True) -> SpeechHandle:
    """Build a handle with a freshly generated id prefixed with ``speech_``."""
    return SpeechHandle(
        speech_id=utils.shortuuid("speech_"),
        allow_interruptions=allow_interruptions,
    )

Instance variables

prop allow_interruptions : bool
Expand source code
@property
def allow_interruptions(self) -> bool:
    """Whether this handle currently allows interruptions."""
    return self._allow_interruptions
prop chat_items : list[llm.ChatItem]
Expand source code
@property
def chat_items(self) -> list[llm.ChatItem]:
    """Chat items recorded so far; this is the internal list, not a copy."""
    return self._chat_items
prop id : str
Expand source code
@property
def id(self) -> str:
    """The identifier passed as `speech_id` at construction."""
    return self._id
prop interrupted : bool
Expand source code
@property
def interrupted(self) -> bool:
    """True once the internal interrupt future has completed."""
    return self._interrupt_fut.done()
prop num_steps : int
Expand source code
@property
def num_steps(self) -> int:
    """Current step count (initialized to 1 by the constructor)."""
    return self._num_steps
prop scheduled : bool
Expand source code
@property
def scheduled(self) -> bool:
    """True once the internal scheduled future has completed."""
    return self._scheduled_fut.done()

Methods

def add_done_callback(self,
callback: Callable[[SpeechHandle], None]) ‑> None
Expand source code
def add_done_callback(self, callback: Callable[[SpeechHandle], None]) -> None:
    """Register `callback` to be invoked with this handle when it becomes done.

    Callbacks are stored in a set, so registering the same one twice has no
    additional effect.
    """
    self._done_callbacks.add(callback)
def done(self) ‑> bool
Expand source code
def done(self) -> bool:
    """Return True once the internal done future has completed."""
    return self._done_fut.done()
def interrupt(self) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
def interrupt(self) -> SpeechHandle:
    """Interrupt the current speech generation.

    Has no effect if the handle is already done.

    Raises:
        RuntimeError: If this speech handle does not allow interruptions.

    Returns:
        SpeechHandle: The same speech handle that was interrupted.
    """
    if not self._allow_interruptions:
        raise RuntimeError("This generation handle does not allow interruptions")

    self._cancel()
    return self

Interrupt the current speech generation.

Raises

RuntimeError
If this speech handle does not allow interruptions.

Returns

SpeechHandle
The same speech handle that was interrupted.
def remove_done_callback(self,
callback: Callable[[SpeechHandle], None]) ‑> None
Expand source code
def remove_done_callback(self, callback: Callable[[SpeechHandle], None]) -> None:
    """Unregister `callback`; a callback that is not registered is ignored."""
    self._done_callbacks.discard(callback)
async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """Waits for the entire assistant turn to complete playback.

    This method waits until the assistant has fully finished speaking,
    including any finalization steps beyond initial response generation.
    This is appropriate to call when you want to ensure the speech output
    has entirely played out, including any tool calls and response follow-ups.

    Raises:
        RuntimeError: If called from the function tool task that owns this
            handle, since that would be a circular wait.
    """

    # raise an error to avoid developer mistakes
    from .agent import _get_activity_task_info

    if task := asyncio.current_task():
        info = _get_activity_task_info(task)
        if info and info.function_call and info.speech_handle == self:
            raise RuntimeError(
                f"cannot call `SpeechHandle.wait_for_playout()` from inside the function tool `{info.function_call.name}` that owns this SpeechHandle. "
                "This creates a circular wait: the speech handle is waiting for the function tool to complete, "
                "while the function tool is simultaneously waiting for the speech handle.\n"
                "To wait for the assistant’s spoken response prior to running this tool, use `RunContext.wait_for_playout()` instead."
            )

    # shield so that cancelling the waiter does not cancel the shared future
    await asyncio.shield(self._done_fut)

Waits for the entire assistant turn to complete playback.

This method waits until the assistant has fully finished speaking, including any finalization steps beyond initial response generation. This is appropriate to call when you want to ensure the speech output has entirely played out, including any tool calls and response follow-ups.

async def wait_if_not_interrupted(self, aw: list[asyncio.futures.Future[Any]]) ‑> None
Expand source code
async def wait_if_not_interrupted(self, aw: list[asyncio.futures.Future[Any]]) -> None:
    """Wait until every future in `aw` completes or the handle is interrupted.

    Returns as soon as either happens. Exceptions from `aw` are collected by
    the gather rather than raised, and nothing is cancelled when the
    interruption wins.
    """
    all_done = asyncio.gather(*aw, return_exceptions=True)
    await asyncio.wait(
        [all_done, self._interrupt_fut],
        return_when=asyncio.FIRST_COMPLETED,
    )
class TranscriptSynchronizer (*,
next_in_chain_audio: AudioOutput,
next_in_chain_text: TextOutput,
speed: float = 1.0,
hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>,
split_words: Callable[[str], list[tuple[str, int, int]]] = functools.partial(<function split_words>, ignore_punctuation=False, split_character=True),
sentence_tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN)
Expand source code
class TranscriptSynchronizer:
    """
    Keeps forwarded text aligned with the timing of audio playback.

    The synchronizer exposes a paired `audio_output` / `text_output`, and a fresh
    per-segment implementation object is created every time the segment is rotated.
    It currently assumes that the first push_audio is starting the audio playback of a segment.
    """

    def __init__(
        self,
        *,
        next_in_chain_audio: io.AudioOutput,
        next_in_chain_text: io.TextOutput,
        speed: float = 1.0,
        hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word,
        split_words: Callable[[str], list[tuple[str, int, int]]] = functools.partial(
            tokenize.basic.split_words, ignore_punctuation=False, split_character=True
        ),
        sentence_tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
    ) -> None:
        super().__init__()

        self._text_output = _SyncedTextOutput(self, next_in_chain=next_in_chain_text)
        self._audio_output = _SyncedAudioOutput(self, next_in_chain=next_in_chain_audio)

        # both outputs start out attached
        self._text_attached = True
        self._audio_attached = True

        # fall back to the basic tokenizer when the caller did not supply one
        tokenizer = sentence_tokenizer or tokenize.basic.SentenceTokenizer(retain_format=True)
        self._opts = _TextSyncOptions(
            speed=speed,
            hyphenate_word=hyphenate_word,
            split_words=split_words,
            sentence_tokenizer=tokenizer,
            speaking_rate_detector=SpeakingRateDetector(),
        )
        self._enabled = True
        self._closed = False

        # implementation for the first segment; replaced on every rotation
        self._impl = _SegmentSynchronizerImpl(options=self._opts, next_in_chain=next_in_chain_text)
        self._rotate_segment_atask = asyncio.create_task(self._rotate_segment_task(None))

    @property
    def audio_output(self) -> _SyncedAudioOutput:
        """The audio output created by this synchronizer."""
        return self._audio_output

    @property
    def text_output(self) -> _SyncedTextOutput:
        """The text output created by this synchronizer."""
        return self._text_output

    @property
    def enabled(self) -> bool:
        """Whether synchronization is currently enabled."""
        return self._enabled

    async def aclose(self) -> None:
        """Mark the synchronizer closed, wait for pending rotations, then close the segment."""
        self._closed = True
        await self.barrier()
        await self._impl.aclose()

    def set_enabled(self, enabled: bool) -> None:
        """Switch synchronization on or off; a real change also rotates the segment."""
        if self._enabled != enabled:
            self._enabled = enabled
            self.rotate_segment()

    def _on_attachment_changed(
        self,
        *,
        audio_attached: NotGivenOr[bool] = NOT_GIVEN,
        text_attached: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Record new attachment state; enabled only while both outputs are attached."""
        if is_given(audio_attached):
            self._audio_attached = audio_attached

        if is_given(text_attached):
            self._text_attached = text_attached

        both_attached = self._audio_attached and self._text_attached
        self.set_enabled(both_attached)

    async def _rotate_segment_task(self, old_task: asyncio.Task[None] | None) -> None:
        """Close the current segment implementation and install a new one.

        Waits for `old_task` first, so that rotations run one after another.
        """
        if old_task:
            await old_task

        await self._impl.aclose()
        next_in_chain = self._text_output._next_in_chain
        self._impl = _SegmentSynchronizerImpl(options=self._opts, next_in_chain=next_in_chain)

    def rotate_segment(self) -> None:
        """Schedule a rotation chained after the previous one; no-op once closed."""
        if self._closed:
            return

        previous = self._rotate_segment_atask
        if not previous.done():
            logger.warning("rotate_segment called while previous segment is still being rotated")

        self._rotate_segment_atask = asyncio.create_task(self._rotate_segment_task(previous))

    async def barrier(self) -> None:
        """Wait until no segment rotation is pending."""
        if self._rotate_segment_atask is None:
            return

        # re-read the attribute every round in case rotate_segment is called again
        # while we wait (this should not happen, but a warning is logged if it does)
        while True:
            pending = self._rotate_segment_atask
            if pending.done():
                break
            await pending

Synchronizes text with audio playback timing.

This class is responsible for synchronizing text with audio playback timing. It currently assumes that the first push_audio is starting the audio playback of a segment.

Instance variables

prop audio_output : _SyncedAudioOutput
Expand source code
@property
def audio_output(self) -> _SyncedAudioOutput:
    """The `_SyncedAudioOutput` held by this synchronizer."""
    return self._audio_output
prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    """Whether synchronization is currently enabled."""
    return self._enabled
prop text_output : _SyncedTextOutput
Expand source code
@property
def text_output(self) -> _SyncedTextOutput:
    """The `_SyncedTextOutput` held by this synchronizer."""
    return self._text_output

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the synchronizer.

    Sets the closed flag, waits for any pending segment rotation via
    `barrier`, then closes the current segment implementation.
    """
    self._closed = True
    await self.barrier()
    await self._impl.aclose()
async def barrier(self) ‑> None
Expand source code
async def barrier(self) -> None:
    """Wait until no segment rotation is pending."""
    if self._rotate_segment_atask is None:
        return

    # re-read the attribute every round in case rotate_segment is called again
    # while we wait (this should not happen, but a warning is logged if it does)
    while True:
        pending = self._rotate_segment_atask
        if pending.done():
            break
        await pending
def rotate_segment(self) ‑> None
Expand source code
def rotate_segment(self) -> None:
    """Schedule a rotation chained after the previous one; no-op once closed."""
    if self._closed:
        return

    previous = self._rotate_segment_atask
    if not previous.done():
        logger.warning("rotate_segment called while previous segment is still being rotated")

    # the new task receives the previous one so that it can wait for it first
    self._rotate_segment_atask = asyncio.create_task(self._rotate_segment_task(previous))
def set_enabled(self, enabled: bool) ‑> None
Expand source code
def set_enabled(self, enabled: bool) -> None:
    """Switch synchronization on or off; a real change also rotates the segment."""
    if self._enabled != enabled:
        self._enabled = enabled
        self.rotate_segment()
class UserInputTranscribedEvent (**data: Any)
Expand source code
class UserInputTranscribedEvent(BaseModel):
    """Event payload carrying a transcript of user input."""

    # discriminator tag; the only accepted value is "user_input_transcribed"
    type: Literal["user_input_transcribed"] = "user_input_transcribed"
    transcript: str
    # NOTE(review): presumably False for interim results and True for the final
    # transcript of an utterance — confirm against the emitter
    is_final: bool
    # optional metadata; both default to None when not supplied
    speaker_id: str | None = None
    language: str | None = None
    # defaults to `time.time()` evaluated when the model instance is created
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var is_final : bool
var language : str | None
var model_config
var speaker_id : str | None
var transcript : str
var type : Literal['user_input_transcribed']
class UserStateChangedEvent (**data: Any)
Expand source code
class UserStateChangedEvent(BaseModel):
    """Event payload describing a user state transition from `old_state` to `new_state`."""

    # discriminator tag; the only accepted value is "user_state_changed"
    type: Literal["user_state_changed"] = "user_state_changed"
    old_state: UserState
    new_state: UserState
    # defaults to `time.time()` evaluated when the model instance is created
    created_at: float = Field(default_factory=time.time)

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var created_at : float
var model_config
var new_state : Literal['speaking', 'listening', 'away']
var old_state : Literal['speaking', 'listening', 'away']
var type : Literal['user_state_changed']
class VoiceActivityVideoSampler (*, speaking_fps: float = 1.0, silent_fps: float = 0.3)
Expand source code
class VoiceActivityVideoSampler:
    """Callable that rate-limits video frames depending on whether the user is speaking.

    Calling the instance returns True when the frame should be sampled: the
    first call always does, and later calls do so only once at least
    ``1 / fps`` seconds have elapsed since the last sampled frame, where
    ``fps`` is `speaking_fps` while ``session.user_state == "speaking"`` and
    `silent_fps` otherwise.
    """

    def __init__(self, *, speaking_fps: float = 1.0, silent_fps: float = 0.3) -> None:
        """Configure the two sampling rates.

        Args:
            speaking_fps: Frames per second to sample while the user is speaking.
            silent_fps: Frames per second to sample otherwise.

        Raises:
            ValueError: If either rate is zero or negative.
        """
        if speaking_fps <= 0 or silent_fps <= 0:
            raise ValueError("FPS values must be greater than zero")

        self.speaking_fps = speaking_fps
        self.silent_fps = silent_fps
        # monotonic timestamp of the last sampled frame; None until the first call
        self._last_sampled_time: float | None = None

    def __call__(self, frame: rtc.VideoFrame, session: AgentSession) -> bool:
        """Return True if `frame` should be sampled now, updating the internal timestamp.

        `frame` itself is not inspected; only the session's user state and the
        elapsed time matter.
        """
        # Use the monotonic clock: wall-clock time can jump (NTP, manual changes),
        # which would either stall sampling or let frames through too early.
        now = time.monotonic()
        is_speaking = session.user_state == "speaking"
        target_fps = self.speaking_fps if is_speaking else self.silent_fps
        min_frame_interval = 1.0 / target_fps

        last = self._last_sampled_time
        if last is None or (now - last) >= min_frame_interval:
            self._last_sampled_time = now
            return True

        return False
class _ParticipantAudioOutput (room: rtc.Room,
*,
sample_rate: int,
num_channels: int,
track_publish_options: rtc.TrackPublishOptions,
track_name: str = 'roomio_audio',
queue_size_ms: int = 100000)
Expand source code
class _ParticipantAudioOutput(io.AudioOutput):
    """Audio output that plays frames into the room through a published local audio track.

    Captured frames are forwarded to an ``rtc.AudioSource``. ``flush()`` starts a
    background task that waits for either playout to complete or an interruption
    requested via ``clear_buffer()``, and then calls ``on_playback_finished``.
    """

    def __init__(
        self,
        room: rtc.Room,
        *,
        sample_rate: int,
        num_channels: int,
        track_publish_options: rtc.TrackPublishOptions,
        track_name: str = "roomio_audio",
        queue_size_ms: int = 100_000,  # TODO(long): move buffer to python
    ) -> None:
        """Create the audio source; the track itself is published by ``start()``.

        Args:
            room: Room whose local participant publishes the track.
            sample_rate: Sample rate passed to the audio source and to the base class.
            num_channels: Channel count passed to the audio source.
            track_publish_options: Options used each time the track is published.
            track_name: Name given to the local audio track.
            queue_size_ms: Queue size passed to the audio source.
        """
        super().__init__(label="RoomIO", next_in_chain=None, sample_rate=sample_rate)
        self._room = room
        self._track_name = track_name
        # serializes _publish_track() calls (initial publish vs. republish on reconnect)
        self._lock = asyncio.Lock()
        self._audio_source = rtc.AudioSource(sample_rate, num_channels, queue_size_ms)
        self._publish_options = track_publish_options
        self._publication: rtc.LocalTrackPublication | None = None
        # resolved by _publish_track() after wait_for_subscription() first returns
        self._subscribed_fut = asyncio.Future[None]()

        # used to republish track on reconnection
        self._republish_task: asyncio.Task[None] | None = None
        self._flush_task: asyncio.Task[None] | None = None
        # set by clear_buffer(); consumed and cleared by _wait_for_playout()
        self._interrupted_event = asyncio.Event()

        # total frame duration pushed since the last finished flush
        self._pushed_duration: float = 0.0
        self._interrupted: bool = False

    async def _publish_track(self) -> None:
        """Publish a new local audio track backed by the audio source and wait for subscription."""
        async with self._lock:
            track = rtc.LocalAudioTrack.create_audio_track(self._track_name, self._audio_source)
            self._publication = await self._room.local_participant.publish_track(
                track, self._publish_options
            )
            await self._publication.wait_for_subscription()
            # a republish after reconnection finds the future already resolved
            if not self._subscribed_fut.done():
                self._subscribed_fut.set_result(None)

    @property
    def subscribed(self) -> asyncio.Future[None]:
        """Future resolved by ``_publish_track()`` once ``wait_for_subscription()`` has returned."""
        return self._subscribed_fut

    async def start(self) -> None:
        """Publish the track, then start listening for room reconnections."""
        await self._publish_track()
        self._room.on("reconnected", self._on_reconnected)

    async def aclose(self) -> None:
        """Stop listening for reconnections, cancel background tasks and close the audio source."""
        self._room.off("reconnected", self._on_reconnected)
        if self._republish_task:
            await utils.aio.cancel_and_wait(self._republish_task)
        if self._flush_task:
            await utils.aio.cancel_and_wait(self._flush_task)

        await self._audio_source.aclose()

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Push ``frame`` to the audio source once the track subscription future is resolved.

        If a flush is still in progress, an error is logged and the frame waits for that
        flush to finish before being pushed.
        """
        await self._subscribed_fut

        await super().capture_frame(frame)

        if self._flush_task and not self._flush_task.done():
            logger.error("capture_frame called while flush is in progress")
            await self._flush_task

        self._pushed_duration += frame.duration
        await self._audio_source.capture_frame(frame)

    def flush(self) -> None:
        """Start waiting for playout of everything pushed so far.

        Does nothing beyond the base-class flush when no audio has been pushed.
        """
        super().flush()

        if not self._pushed_duration:
            return

        if self._flush_task and not self._flush_task.done():
            # shouldn't happen if only one active speech handle at a time
            logger.error("flush called while playback is in progress")
            self._flush_task.cancel()

        self._flush_task = asyncio.create_task(self._wait_for_playout())

    def clear_buffer(self) -> None:
        """Request an interruption of the current playback; no-op if nothing was pushed."""
        if not self._pushed_duration:
            return
        self._interrupted_event.set()

    async def _wait_for_playout(self) -> None:
        """Wait for playout or interruption, then report via ``on_playback_finished``."""
        wait_for_interruption = asyncio.create_task(self._interrupted_event.wait())
        wait_for_playout = asyncio.create_task(self._audio_source.wait_for_playout())
        try:
            await asyncio.wait(
                [wait_for_playout, wait_for_interruption],
                return_when=asyncio.FIRST_COMPLETED,
            )
        except asyncio.CancelledError:
            # asyncio.wait() does not cancel its awaitables when the waiter is cancelled
            # (flush() / aclose() cancel this task); don't leave the helpers pending.
            wait_for_interruption.cancel()
            wait_for_playout.cancel()
            raise

        interrupted = wait_for_interruption.done()
        pushed_duration = self._pushed_duration

        if interrupted:
            # what is still queued in the source was never played: subtract it (clamped at 0)
            pushed_duration = max(pushed_duration - self._audio_source.queued_duration, 0)
            self._audio_source.clear_queue()
            wait_for_playout.cancel()
        else:
            wait_for_interruption.cancel()

        self._pushed_duration = 0
        self._interrupted_event.clear()
        self.on_playback_finished(playback_position=pushed_duration, interrupted=interrupted)

    def _on_reconnected(self) -> None:
        """Republish the track after the room reconnects, replacing any pending republish."""
        if self._republish_task:
            self._republish_task.cancel()
        self._republish_task = asyncio.create_task(self._publish_track())

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate
The sample rate required by the audio sink. If None, any sample rate is accepted.

Ancestors

Instance variables

prop subscribed : asyncio.Future[None]
Expand source code
@property
def subscribed(self) -> asyncio.Future[None]:
    """Future resolved by ``_publish_track()`` once ``wait_for_subscription()`` has returned."""
    return self._subscribed_fut

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Detach the reconnect handler, cancel background tasks and close the audio source."""
    self._room.off("reconnected", self._on_reconnected)

    # republish task first, then flush task; each attribute is read right before use
    for task_attr in ("_republish_task", "_flush_task"):
        background_task = getattr(self, task_attr)
        if background_task:
            await utils.aio.cancel_and_wait(background_task)

    await self._audio_source.aclose()
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Publish the audio track, then start listening for room reconnections."""
    await self._publish_track()
    # registered only after the initial publish has completed; the handler republishes
    self._room.on("reconnected", self._on_reconnected)

Inherited members

class _ParticipantStreamTranscriptionOutput (room: rtc.Room,
*,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
attributes: dict[str, str] | None = None)
Expand source code
class _ParticipantStreamTranscriptionOutput:
    """Publish transcription text for one participant over room text streams.

    In delta mode a single stream writer is kept open per segment and every captured
    text is appended to it; ``flush()`` closes it with the "final" attribute. In
    non-delta mode every captured text is sent on its own short-lived stream and
    ``flush()`` sends the latest text once more, marked as final.
    """

    def __init__(
        self,
        room: rtc.Room,
        *,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        attributes: dict[str, str] | None = None,
    ):
        """
        Args:
            room: Room used to publish text streams and to observe track publications.
            is_delta_stream: Whether captured texts are appended to one stream per segment.
            participant: Participant (or identity) the transcription is attributed to.
            attributes: Extra stream attributes, added when not already present.
        """
        self._room, self._is_delta_stream = room, is_delta_stream
        self._track_id: str | None = None
        self._participant_identity: str | None = None
        self._additional_attributes = attributes or {}

        self._writer: rtc.TextStreamWriter | None = None

        self._room.on("track_published", self._on_track_published)
        self._room.on("local_track_published", self._on_local_track_published)
        self._flush_atask: asyncio.Task[None] | None = None

        self._reset_state()
        self.set_participant(participant)

    def set_participant(
        self,
        participant: rtc.Participant | str | None,
    ) -> None:
        """Switch the participant the transcription is attributed to.

        When the new identity is not None, the microphone track id is looked up, any
        segment in progress is flushed and the segment state is reset.
        """
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.Participant) else participant
        )
        if self._participant_identity is None:
            return

        try:
            self._track_id = find_micro_track_id(self._room, self._participant_identity)
        except ValueError:
            # track id is optional for TextStream when audio is not published
            self._track_id = None

        # flush() snapshots the segment id and text, so the reset below cannot
        # clobber what the scheduled flush task publishes
        self.flush()
        self._reset_state()

    def _reset_state(self) -> None:
        """Start a new segment: fresh segment id, not capturing, no text."""
        self._current_id = utils.shortuuid("SG_")
        self._capturing = False
        self._latest_text = ""

    async def _create_text_writer(
        self,
        attributes: dict[str, str] | None = None,
        *,
        segment_id: str | None = None,
    ) -> rtc.TextStreamWriter:
        """Open a text stream on the transcription topic for the current participant.

        Args:
            attributes: Stream attributes; when empty or None, non-final defaults are
                used. The dict passed in is updated in place.
            segment_id: Segment id to tag the stream with; defaults to the current one.
        """
        assert self._participant_identity is not None, "participant_identity is not set"

        if not attributes:
            attributes = {
                ATTRIBUTE_TRANSCRIPTION_FINAL: "false",
            }
            if self._track_id:
                attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id
        attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID] = (
            segment_id if segment_id is not None else self._current_id
        )

        for key, val in self._additional_attributes.items():
            if key not in attributes:
                attributes[key] = val

        return await self._room.local_participant.stream_text(
            topic=TOPIC_TRANSCRIPTION,
            sender_identity=self._participant_identity,
            attributes=attributes,
        )

    @utils.log_exceptions(logger=logger)
    async def capture_text(self, text: str) -> None:
        """Publish ``text`` for the current segment; no-op when no participant is set.

        Waits for a pending flush first, and starts a new segment if none is in progress.
        Publishing failures are logged as warnings, not raised.
        """
        if self._participant_identity is None:
            return

        if self._flush_atask and not self._flush_atask.done():
            await self._flush_atask

        if not self._capturing:
            self._reset_state()
            self._capturing = True

        self._latest_text = text

        try:
            if self._room.isconnected():
                if self._is_delta_stream:  # reuse the existing writer
                    if self._writer is None:
                        self._writer = await self._create_text_writer()

                    await self._writer.write(text)
                else:  # always create a new writer
                    tmp_writer = await self._create_text_writer()
                    await tmp_writer.write(text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    async def _flush_task(
        self,
        writer: rtc.TextStreamWriter | None,
        segment_id: str | None = None,
        text: str | None = None,
    ) -> None:
        """Publish the final marker for a segment.

        Args:
            writer: Open delta-stream writer to close, if any.
            segment_id: Id of the segment being finalized; defaults to the current one.
            text: Final text for non-delta mode; defaults to the latest captured text.
        """
        if text is None:
            text = self._latest_text

        attributes = {ATTRIBUTE_TRANSCRIPTION_FINAL: "true"}
        if self._track_id:
            attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id

        try:
            if self._room.isconnected():
                if self._is_delta_stream:
                    if writer:
                        await writer.aclose(attributes=attributes)
                else:
                    tmp_writer = await self._create_text_writer(
                        attributes=attributes, segment_id=segment_id
                    )
                    await tmp_writer.write(text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    def flush(self) -> None:
        """Finalize the segment in progress in a background task; no-op if there is none."""
        if self._participant_identity is None or not self._capturing:
            return

        self._capturing = False
        curr_writer = self._writer
        self._writer = None
        # Snapshot the segment id and text now: the task only starts running later, and
        # set_participant() resets both right after calling flush().
        self._flush_atask = asyncio.create_task(
            self._flush_task(curr_writer, self._current_id, self._latest_text)
        )

    def _on_track_published(
        self, track: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
    ) -> None:
        """Record the microphone track id when the tracked remote participant publishes one."""
        if (
            self._participant_identity is None
            or participant.identity != self._participant_identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return

        self._track_id = track.sid

    def _on_local_track_published(self, track: rtc.LocalTrackPublication, _: rtc.Track) -> None:
        """Record the microphone track id when the tracked participant is the local one."""
        if (
            self._participant_identity is None
            or self._participant_identity != self._room.local_participant.identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return

        self._track_id = track.sid

Methods

async def capture_text(self, text: str) ‑> None
Expand source code
@utils.log_exceptions(logger=logger)
async def capture_text(self, text: str) -> None:
    """Publish ``text`` for the current segment; no-op when no participant is set.

    Waits for a pending flush first and starts a new segment when none is in progress.
    Publishing failures are logged as warnings instead of being raised.
    """
    if self._participant_identity is None:
        return

    pending_flush = self._flush_atask
    if pending_flush and not pending_flush.done():
        await pending_flush

    if not self._capturing:
        self._reset_state()
        self._capturing = True

    self._latest_text = text

    try:
        if not self._room.isconnected():
            return

        if not self._is_delta_stream:
            # non-delta: every update goes out on its own short-lived stream
            one_shot_writer = await self._create_text_writer()
            await one_shot_writer.write(text)
            await one_shot_writer.aclose()
            return

        # delta: open the writer on first use, then keep appending to it
        if self._writer is None:
            self._writer = await self._create_text_writer()
        await self._writer.write(text)
    except Exception as e:
        logger.warning("failed to publish transcription", exc_info=e)
def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Finalize the segment in progress in a background task; no-op if there is none."""
    nothing_to_flush = self._participant_identity is None or not self._capturing
    if nothing_to_flush:
        return

    self._capturing = False
    # detach the writer from the instance; the flush task receives it
    closing_writer, self._writer = self._writer, None
    self._flush_atask = asyncio.create_task(self._flush_task(closing_writer))
def set_participant(self, participant: rtc.Participant | str | None) ‑> None
Expand source code
def set_participant(self, participant: rtc.Participant | str | None) -> None:
    """Switch the participant the transcription is attributed to.

    Accepts a participant object, an identity string, or None. With a non-None
    identity, the microphone track id is looked up, the segment in progress is flushed
    and the segment state is reset.
    """
    if isinstance(participant, rtc.Participant):
        identity = participant.identity
    else:
        identity = participant

    self._participant_identity = identity
    if identity is None:
        return

    try:
        self._track_id = find_micro_track_id(self._room, identity)
    except ValueError:
        # track id is optional for TextStream when audio is not published
        self._track_id = None

    self.flush()
    self._reset_state()
class _ParticipantTranscriptionOutput (*,
room: rtc.Room,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
next_in_chain: TextOutput | None = None)
Expand source code
class _ParticipantTranscriptionOutput(io.TextOutput):
    """Text output that fans transcription out to a legacy output and a text-stream output.

    Every call is applied to both internal outputs and then forwarded to
    ``next_in_chain`` when one is set.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        next_in_chain: io.TextOutput | None = None,
    ) -> None:
        """
        Args:
            room: Room handed to both internal outputs.
            is_delta_stream: Forwarded unchanged to both internal outputs.
            participant: Participant (or identity) handed to both internal outputs.
            next_in_chain: Optional output that also receives every text and flush.
        """
        super().__init__(label="RoomIO", next_in_chain=next_in_chain)

        # legacy output first, stream output second; both get identical arguments
        self.__outputs: list[
            _ParticipantLegacyTranscriptionOutput | _ParticipantStreamTranscriptionOutput
        ] = [
            output_cls(room=room, is_delta_stream=is_delta_stream, participant=participant)
            for output_cls in (
                _ParticipantLegacyTranscriptionOutput,
                _ParticipantStreamTranscriptionOutput,
            )
        ]

    def set_participant(self, participant: rtc.Participant | str | None) -> None:
        """Point both internal outputs at ``participant``."""
        for output in self.__outputs:
            output.set_participant(participant)

    async def capture_text(self, text: str) -> None:
        """Send ``text`` to both internal outputs concurrently, then to the next output."""
        await asyncio.gather(*(output.capture_text(text) for output in self.__outputs))

        downstream = self.next_in_chain
        if downstream:
            await downstream.capture_text(text)

    def flush(self) -> None:
        """Flush both internal outputs, then the next output in the chain."""
        for output in self.__outputs:
            output.flush()

        downstream = self.next_in_chain
        if downstream:
            downstream.flush()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Methods

def set_participant(self, participant: rtc.Participant | str | None) ‑> None
Expand source code
def set_participant(self, participant: rtc.Participant | str | None) -> None:
    """Point every internal transcription output at ``participant``."""
    for output in self.__outputs:
        output.set_participant(participant)

Inherited members