Module livekit.agents

Sub-modules

livekit.agents.jupyter
livekit.agents.resources

Functions

def function_tool(f: F | Raw_F | None = None,
*,
name: str | None = None,
description: str | None = None,
raw_schema: RawFunctionDescription | dict | None = None) ‑> livekit.agents.llm.tool_context.FunctionTool | livekit.agents.llm.tool_context.RawFunctionTool | Callable[[~F | ~Raw_F], livekit.agents.llm.tool_context.FunctionTool | livekit.agents.llm.tool_context.RawFunctionTool]
Expand source code
def function_tool(
    f: F | Raw_F | None = None,
    *,
    name: str | None = None,
    description: str | None = None,
    raw_schema: RawFunctionDescription | dict | None = None,
) -> FunctionTool | RawFunctionTool | Callable[[F | Raw_F], FunctionTool | RawFunctionTool]:
    """Tag a callable as a function tool by attaching tool metadata to it.

    Works both as a bare decorator (``@function_tool``) and as a decorator
    factory (``@function_tool(name=...)``). The callable itself is returned
    unchanged apart from the metadata attribute that is set on it.

    Args:
        f: The callable to tag. When omitted, a decorator is returned instead.
        name: Tool name; defaults to ``func.__name__``. Ignored when
            ``raw_schema`` is given.
        description: Tool description; defaults to the description parsed from
            the callable's docstring. Ignored when ``raw_schema`` is given.
        raw_schema: A raw function description. It must carry a non-empty
            ``"name"`` and a ``"parameters"`` entry (which may be empty).

    Returns:
        The tagged callable when ``f`` is given, otherwise the decorator.

    Raises:
        ValueError: If ``raw_schema`` lacks a name or a ``"parameters"`` entry.
    """

    def deco(func: F | Raw_F) -> RawFunctionTool | FunctionTool:
        if raw_schema is not None:
            # An empty ``parameters`` mapping is a legitimate schema for a tool
            # that takes no arguments, so only a missing/None entry is rejected.
            if not raw_schema.get("name") or raw_schema.get("parameters") is None:
                raise ValueError("raw function description must contain a name and parameters key")

            # Shallow-copy the schema so later mutation of the caller's dict
            # does not leak into the stored tool info.
            info = _RawFunctionToolInfo(raw_schema={**raw_schema}, name=raw_schema["name"])
            setattr(func, "__livekit_raw_tool_info", info)
            return cast(RawFunctionTool, func)
        else:
            # Imported lazily: only needed on the docstring-driven path.
            from docstring_parser import parse_from_object

            docstring = parse_from_object(func)
            info = _FunctionToolInfo(
                name=name or func.__name__,
                description=description or docstring.description,
            )
            setattr(func, "__livekit_tool_info", info)
            return cast(FunctionTool, func)

    if f is not None:
        return deco(f)

    return deco
def get_job_context() ‑> livekit.agents.job.JobContext
Expand source code
def get_job_context() -> JobContext:
    """Return the job context stored in ``_JobContextVar``.

    Raises:
        RuntimeError: If no job context is currently set.
    """
    current = _JobContextVar.get(None)
    if current is not None:
        return current

    raise RuntimeError("no job context found, are you running this code inside a job entrypoint?")

Classes

class APIConnectOptions (max_retry: int = 3, retry_interval: float = 2.0, timeout: float = 10.0)
Expand source code
@dataclass(frozen=True)
class APIConnectOptions:
    """Immutable retry/timeout settings used when connecting to an API."""

    max_retry: int = 3
    """
    How many times a connection to the API may be retried at most.
    """

    retry_interval: float = 2.0
    """
    Delay, in seconds, between two connection retries.
    """

    timeout: float = 10.0
    """
    Connection timeout, in seconds.
    """

    def __post_init__(self):
        # Fields are checked in declaration order so the first offending one
        # is the one reported.
        for field_name in ("max_retry", "retry_interval", "timeout"):
            if getattr(self, field_name) < 0:
                raise ValueError(f"{field_name} must be greater than or equal to 0")

    def _interval_for_retry(self, num_retries: int) -> float:
        """
        Return how long to wait before the next retry.

        The first retry (``num_retries == 0``) waits a fixed 0.1 seconds; every
        later retry waits ``retry_interval`` seconds.
        """
        return 0.1 if num_retries == 0 else self.retry_interval

APIConnectOptions(max_retry: int = 3, retry_interval: float = 2.0, timeout: float = 10.0)

Instance variables

var max_retry : int

Maximum number of retries to connect to the API.

var retry_interval : float

Interval between retries to connect to the API in seconds.

var timeout : float

Timeout for connecting to the API in seconds.

class APIConnectionError (message: str = 'Connection error.', *, retryable: bool = True)
Expand source code
class APIConnectionError(APIError):
    """Signals that an API request could not complete because of a connection problem."""

    def __init__(
        self,
        message: str = "Connection error.",
        *,
        retryable: bool = True,
    ) -> None:
        # No response body is passed on for this error type.
        super().__init__(message, body=None, retryable=retryable)

Raised when an API request failed due to a connection error.

Ancestors

  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • livekit.agents._exceptions.APITimeoutError
class APIError (message: str, *, body: object | None, retryable: bool = True)
Expand source code
class APIError(Exception):
    """Raised when an API request failed.
    This is used on our TTS/STT/LLM plugins."""

    message: str
    """
    The error message returned by the API.
    """

    body: object | None
    """The API response body, if available.


    If the API returned a valid json, the body will contains
    the decodede result.
    """

    retryable: bool = False
    """Whether the error can be retried."""

    def __init__(self, message: str, *, body: object | None, retryable: bool = True) -> None:
        super().__init__(message)

        self.message = message
        self.body = body
        self.retryable = retryable

Raised when an API request failed. This is used on our TTS/STT/LLM plugins.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • livekit.agents._exceptions.APIConnectionError
  • livekit.agents._exceptions.APIStatusError

Class variables

var body : object | None

The API response body, if available.

If the API returned valid JSON, the body will contain the decoded result.

var message : str

The error message returned by the API.

var retryable : bool

Whether the error can be retried.

class APIStatusError (message: str,
*,
status_code: int = -1,
request_id: str | None = None,
body: object | None = None,
retryable: bool | None = None)
Expand source code
class APIStatusError(APIError):
    """Error for an API response carrying a 4xx or 5xx status code."""

    status_code: int
    """HTTP status code of the API response."""

    request_id: str | None
    """Request ID of the API response, when one is available."""

    def __init__(
        self,
        message: str,
        *,
        status_code: int = -1,
        request_id: str | None = None,
        body: object | None = None,
        retryable: bool | None = None,
    ) -> None:
        if retryable is None:
            # Unless the caller decides, 4xx responses are not retried;
            # every other status code is.
            retryable = not (400 <= status_code < 500)

        super().__init__(message, body=body, retryable=retryable)

        self.request_id = request_id
        self.status_code = status_code

    def __str__(self):
        details = f"status_code={self.status_code}, request_id={self.request_id}, body={self.body}"
        return f"{self.message} ({details})"

Raised when an API response has a status code of 4xx or 5xx.

Ancestors

  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException

Class variables

var request_id : str | None

The request ID of the API response, if available.

var status_code : int

The status code of the API response.

class APITimeoutError (message: str = 'Request timed out.', *, retryable: bool = True)
Expand source code
class APITimeoutError(APIConnectionError):
    """Signals that an API request did not complete before timing out."""

    def __init__(
        self,
        message: str = "Request timed out.",
        *,
        retryable: bool = True,
    ) -> None:
        super().__init__(message, retryable=retryable)

Raised when an API request timed out.

Ancestors

  • livekit.agents._exceptions.APIConnectionError
  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException
class Agent (*,
instructions: str,
chat_ctx: NotGivenOr[llm.ChatContext | None] = NOT_GIVEN,
tools: list[llm.FunctionTool] | None = None,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class Agent:
    """Definition of an agent: its instructions, tools and optional model components.

    Components that are not provided are stored as ``NOT_GIVEN``; as described on the
    individual properties, the owning ``AgentSession`` value is used at runtime in that case.

    The ``*_node`` methods are overridable pipeline hooks. Each default implementation
    delegates to the static method of the same name on ``Agent.default``.
    """

    def __init__(
        self,
        *,
        instructions: str,
        chat_ctx: NotGivenOr[llm.ChatContext | None] = NOT_GIVEN,
        tools: list[llm.FunctionTool] | None = None,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            instructions: The core instructions that guide the agent's behavior.
            chat_ctx: Initial chat context. It is copied (with the agent's tools attached);
                an empty context is created when none is given.
            tools: Extra function tools. They are combined with the tools returned by
                ``find_function_tools(self)``.
            turn_detection: Turn detection mode, see the ``turn_detection`` property.
            stt: Speech-To-Text component, see the ``stt`` property.
            vad: Voice Activity Detection component, see the ``vad`` property.
            llm: Language model or realtime model, see the ``llm`` property.
            tts: Text-To-Speech component, see the ``tts`` property.
            allow_interruptions: Whether interruptions are allowed, see the
                ``allow_interruptions`` property.
        """
        tools = tools or []
        self._instructions = instructions
        # Copy the caller's list so later mutation of it does not affect the agent.
        self._tools = tools.copy() + find_function_tools(self)
        self._chat_ctx = chat_ctx.copy(tools=self._tools) if chat_ctx else ChatContext.empty()
        self._turn_detection = turn_detection
        self._stt = stt
        self._llm = llm
        self._tts = tts
        self._vad = vad
        self._allow_interruptions = allow_interruptions
        # Set while the agent is running; ``None`` means "not running".
        self._activity: AgentActivity | None = None

    @property
    def instructions(self) -> str:
        """
        Returns:
            str: The core instructions that guide the agent's behavior.
        """
        return self._instructions

    @property
    def tools(self) -> list[llm.FunctionTool]:
        """
        Returns:
            list[llm.FunctionTool]: A copy of the list of function tools available to the agent.
        """
        return self._tools.copy()

    @property
    def chat_ctx(self) -> llm.ChatContext:
        """
        Provides a read-only view of the agent's current chat context.

        Returns:
            llm.ChatContext: A read-only version of the agent's conversation history.

        See Also:
            update_chat_ctx: Method to update the internal chat context.
        """
        return _ReadOnlyChatContext(self._chat_ctx.items)

    async def update_instructions(self, instructions: str) -> None:
        """
        Updates the agent's instructions.

        If the agent is running in realtime mode, this method also updates
        the instructions for the ongoing realtime session.

        Args:
            instructions (str):
                The new instructions to set for the agent.

        Raises:
            llm.RealtimeError: If updating the realtime session instructions fails.
        """
        if self._activity is None:
            # Not running: only the stored value needs to change.
            self._instructions = instructions
            return

        await self._activity.update_instructions(instructions)

    async def update_tools(self, tools: list[llm.FunctionTool]) -> None:
        """
        Updates the agent's available function tools.

        If the agent is running in realtime mode, this method also updates
        the tools for the ongoing realtime session.

        Args:
            tools (list[llm.FunctionTool]):
                The new list of function tools available to the agent. Duplicates are
                dropped; the order of first occurrence is kept.

        Raises:
            llm.RealtimeError: If updating the realtime session tools fails.
        """
        if self._activity is None:
            # De-duplicate while keeping the caller's order (a plain ``set`` would
            # make the resulting tool order arbitrary).
            self._tools = list(dict.fromkeys(tools))
            self._chat_ctx = self._chat_ctx.copy(tools=self._tools)
            return

        await self._activity.update_tools(tools)

    async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
        """
        Updates the agent's chat context.

        If the agent is running in realtime mode, this method also updates
        the chat context for the ongoing realtime session.

        Args:
            chat_ctx (llm.ChatContext):
                The new or updated chat context for the agent.

        Raises:
            llm.RealtimeError: If updating the realtime session chat context fails.
        """
        if self._activity is None:
            self._chat_ctx = chat_ctx.copy(tools=self._tools)
            return

        await self._activity.update_chat_ctx(chat_ctx)

    @property
    def turn_detection(self) -> NotGivenOr[TurnDetectionMode | None]:
        """
        Retrieves the turn detection mode for identifying conversational turns.

        If this property was not set at Agent creation, but an ``AgentSession`` provides a turn detection,
        the session's turn detection mode will be used at runtime instead.

        Returns:
            NotGivenOr[TurnDetectionMode | None]: An optional turn detection mode for managing conversation flow.
        """  # noqa: E501
        return self._turn_detection

    @property
    def stt(self) -> NotGivenOr[stt.STT | None]:
        """
        Retrieves the Speech-To-Text component for the agent.

        If this property was not set at Agent creation, but an ``AgentSession`` provides an STT component,
        the session's STT will be used at runtime instead.

        Returns:
            NotGivenOr[stt.STT | None]: An optional STT component.
        """  # noqa: E501
        return self._stt

    @property
    def llm(self) -> NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
        """
        Retrieves the Language Model or RealtimeModel used for text generation.

        If this property was not set at Agent creation, but an ``AgentSession`` provides an LLM or RealtimeModel,
        the session's model will be used at runtime instead.

        Returns:
            NotGivenOr[llm.LLM | llm.RealtimeModel | None]: The language model for text generation.
        """  # noqa: E501
        return self._llm

    @property
    def tts(self) -> NotGivenOr[tts.TTS | None]:
        """
        Retrieves the Text-To-Speech component for the agent.

        If this property was not set at Agent creation, but an ``AgentSession`` provides a TTS component,
        the session's TTS will be used at runtime instead.

        Returns:
            NotGivenOr[tts.TTS | None]: An optional TTS component for generating audio output.
        """  # noqa: E501
        return self._tts

    @property
    def vad(self) -> NotGivenOr[vad.VAD | None]:
        """
        Retrieves the Voice Activity Detection component for the agent.

        If this property was not set at Agent creation, but an ``AgentSession`` provides a VAD component,
        the session's VAD will be used at runtime instead.

        Returns:
            NotGivenOr[vad.VAD | None]: An optional VAD component for detecting voice activity.
        """  # noqa: E501
        return self._vad

    @property
    def allow_interruptions(self) -> NotGivenOr[bool]:
        """
        Indicates whether interruptions (e.g., stopping TTS playback) are allowed.

        If this property was not set at Agent creation, but an ``AgentSession`` provides a value for
        allowing interruptions, the session's value will be used at runtime instead.

        Returns:
            NotGivenOr[bool]: Whether interruptions are permitted.
        """
        return self._allow_interruptions

    @property
    def realtime_llm_session(self) -> llm.RealtimeSession:
        """
        Retrieve the realtime LLM session associated with the current agent.

        Raises:
            RuntimeError: If the agent is not running or the realtime LLM session is not available
        """
        if (rt_session := self._get_activity_or_raise().realtime_llm_session) is None:
            raise RuntimeError("no realtime LLM session")

        return rt_session

    @property
    def session(self) -> AgentSession:
        """
        Retrieve the AgentSession associated with the current agent.

        Raises:
            RuntimeError: If the agent is not running
        """
        return self._get_activity_or_raise().session

    # -- Pipeline nodes --
    # They can all be overridden by subclasses. By default they delegate to `Agent.default`,
    # which uses the STT/LLM/TTS exposed by the agent's current activity.

    async def on_enter(self) -> None:
        """Called when the task is entered"""
        pass

    async def on_exit(self) -> None:
        """Called when the task is exited"""
        pass

    async def on_user_turn_completed(
        self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
    ) -> None:
        """Called when the user has finished speaking, and the LLM is about to respond

        This is a good opportunity to update the chat context or edit the new message before it is
        sent to the LLM.
        """
        pass

    def stt_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> (
        AsyncIterable[stt.SpeechEvent | str]
        | Coroutine[Any, Any, AsyncIterable[stt.SpeechEvent | str]]
        | Coroutine[Any, Any, None]
    ):
        """
        A node in the processing pipeline that transcribes audio frames into speech events.

        By default, this node uses a Speech-To-Text (STT) capability from the current agent.
        If the STT implementation does not support streaming natively, a VAD (Voice Activity
        Detection) mechanism is required to wrap the STT.

        You can override this node with your own implementation for more flexibility (e.g.,
        custom pre-processing of audio, additional buffering, or alternative STT strategies).

        Args:
            audio (AsyncIterable[rtc.AudioFrame]): An asynchronous stream of audio frames.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            stt.SpeechEvent: An event containing transcribed text or other STT-related data.
        """
        return Agent.default.stt_node(self, audio, model_settings)

    def llm_node(
        self,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool],
        model_settings: ModelSettings,
    ) -> (
        AsyncIterable[llm.ChatChunk | str]
        | Coroutine[Any, Any, AsyncIterable[llm.ChatChunk | str]]
        | Coroutine[Any, Any, str]
        | Coroutine[Any, Any, llm.ChatChunk]
        | Coroutine[Any, Any, None]
    ):
        """
        A node in the processing pipeline that processes text generation with an LLM.

        By default, this node uses the agent's LLM to process the provided context. It may yield
        plain text (as `str`) for straightforward text generation, or `llm.ChatChunk` objects that
        can include text and optional tool calls. `ChatChunk` is helpful for capturing more complex
        outputs such as function calls, usage statistics, or other metadata.

        You can override this node to customize how the LLM is used or how tool invocations
        and responses are handled.

        Args:
            chat_ctx (llm.ChatContext): The context for the LLM (the conversation history).
            tools (list[FunctionTool]): A list of callable tools that the LLM may invoke.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields/Returns:
            str: Plain text output from the LLM.
            llm.ChatChunk: An object that can contain both text and optional tool calls.
        """
        return Agent.default.llm_node(self, chat_ctx, tools, model_settings)

    def transcription_node(
        self, text: AsyncIterable[str], model_settings: ModelSettings
    ) -> AsyncIterable[str] | Coroutine[Any, Any, AsyncIterable[str]] | Coroutine[Any, Any, None]:
        """
        A node in the processing pipeline that finalizes transcriptions from text segments.

        This node can be used to adjust or post-process text coming from an LLM (or any other
        source) into a final transcribed form. For instance, you might clean up formatting, fix
        punctuation, or perform any other text transformations here.

        You can override this node to customize post-processing logic according to your needs.

        Args:
            text (AsyncIterable[str]): An asynchronous stream of text segments.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            str: Finalized or post-processed text segments.
        """
        return Agent.default.transcription_node(self, text, model_settings)

    def tts_node(
        self, text: AsyncIterable[str], model_settings: ModelSettings
    ) -> (
        AsyncGenerator[rtc.AudioFrame, None]
        | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
        | Coroutine[Any, Any, None]
    ):
        """
        A node in the processing pipeline that synthesizes audio from text segments.

        By default, this node converts incoming text into audio frames using the Text-To-Speech
        from the agent.
        If the TTS implementation does not support streaming natively, it uses a sentence tokenizer
        to split text for incremental synthesis.

        You can override this node to provide different text chunking behavior, a custom TTS engine,
        or any other specialized processing.

        Args:
            text (AsyncIterable[str]): An asynchronous stream of text segments to be synthesized.
            model_settings (ModelSettings): Configuration and parameters for model execution.

        Yields:
            rtc.AudioFrame: Audio frames synthesized from the provided text.
        """
        return Agent.default.tts_node(self, text, model_settings)

    def realtime_audio_output_node(
        self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
    ) -> (
        AsyncIterable[rtc.AudioFrame]
        | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
        | Coroutine[Any, Any, None]
    ):
        """A node processing the audio from the realtime LLM session before it is played out."""
        return Agent.default.realtime_audio_output_node(self, audio, model_settings)

    def _get_activity_or_raise(self) -> AgentActivity:
        """Get the current activity context for this task (internal)

        Raises:
            RuntimeError: If the agent has no activity, i.e. it is not running.
        """
        if self._activity is None:
            raise RuntimeError("no activity context found, this task is not running")

        return self._activity

    class default:
        """Namespace holding the default implementation of every pipeline node."""

        @staticmethod
        async def stt_node(
            agent: Agent, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
        ) -> AsyncGenerator[stt.SpeechEvent, None]:
            """Default implementation for `Agent.stt_node`"""
            activity = agent._get_activity_or_raise()
            assert activity.stt is not None, "stt_node called but no STT node is available"

            wrapped_stt = activity.stt

            if not activity.stt.capabilities.streaming:
                # A non-streaming STT can only be used through a VAD-backed StreamAdapter.
                if not activity.vad:
                    raise RuntimeError(
                        f"The STT ({activity.stt.label}) does not support streaming, add a VAD to the AgentTask/VoiceAgent to enable streaming. "  # noqa: E501
                        "Or manually wrap your STT in a stt.StreamAdapter"
                    )

                wrapped_stt = stt.StreamAdapter(stt=wrapped_stt, vad=activity.vad)

            async with wrapped_stt.stream() as stream:

                @utils.log_exceptions(logger=logger)
                async def _forward_input():
                    async for frame in audio:
                        stream.push_frame(frame)

                # Feed audio into the stream concurrently while events are consumed below.
                forward_task = asyncio.create_task(_forward_input())
                try:
                    async for event in stream:
                        yield event
                finally:
                    # Always stop the feeder, including when the consumer stops early.
                    await utils.aio.cancel_and_wait(forward_task)

        @staticmethod
        async def llm_node(
            agent: Agent,
            chat_ctx: llm.ChatContext,
            tools: list[FunctionTool],
            model_settings: ModelSettings,
        ) -> AsyncGenerator[llm.ChatChunk | str, None]:
            """Default implementation for `Agent.llm_node`"""
            activity = agent._get_activity_or_raise()
            assert activity.llm is not None, "llm_node called but no LLM node is available"
            assert isinstance(activity.llm, llm.LLM), (
                "llm_node should only be used with LLM (non-multimodal/realtime APIs) nodes"
            )

            tool_choice = model_settings.tool_choice if model_settings else NOT_GIVEN
            activity_llm = activity.llm

            async with activity_llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=tool_choice
            ) as stream:
                async for chunk in stream:
                    yield chunk

        @staticmethod
        async def tts_node(
            agent: Agent, text: AsyncIterable[str], model_settings: ModelSettings
        ) -> AsyncGenerator[rtc.AudioFrame, None]:
            """Default implementation for `Agent.tts_node`"""
            activity = agent._get_activity_or_raise()
            assert activity.tts is not None, "tts_node called but no TTS node is available"

            wrapped_tts = activity.tts

            if not activity.tts.capabilities.streaming:
                # A non-streaming TTS is wrapped so text is synthesized sentence by sentence.
                wrapped_tts = tts.StreamAdapter(
                    tts=wrapped_tts, sentence_tokenizer=tokenize.basic.SentenceTokenizer()
                )

            async with wrapped_tts.stream() as stream:

                async def _forward_input():
                    async for chunk in text:
                        stream.push_text(chunk)

                    # Signal that no more text will be pushed.
                    stream.end_input()

                # Feed text into the stream concurrently while audio is consumed below.
                forward_task = asyncio.create_task(_forward_input())
                try:
                    async for ev in stream:
                        yield ev.frame
                finally:
                    # Always stop the feeder, including when the consumer stops early.
                    await utils.aio.cancel_and_wait(forward_task)

        @staticmethod
        async def transcription_node(
            agent: Agent, text: AsyncIterable[str], model_settings: ModelSettings
        ) -> AsyncGenerator[str, None]:
            """Default implementation for `Agent.transcription_node`"""
            # Pass-through: text segments are forwarded unchanged.
            async for delta in text:
                yield delta

        @staticmethod
        async def realtime_audio_output_node(
            agent: Agent, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
        ) -> AsyncGenerator[rtc.AudioFrame, None]:
            """Default implementation for `Agent.realtime_audio_output_node`"""
            activity = agent._get_activity_or_raise()
            assert activity.realtime_llm_session is not None, (
                "realtime_audio_output_node called but no realtime LLM session is available"
            )

            # Pass-through: audio frames are forwarded unchanged.
            async for frame in audio:
                yield frame

Subclasses

  • livekit.agents.voice.agent.InlineTask

Class variables

var default

Instance variables

prop allow_interruptions : NotGivenOr[bool]
Expand source code
@property
def allow_interruptions(self) -> NotGivenOr[bool]:
    """
    Whether interruptions (for example stopping TTS playback) are permitted.

    When no value was given at Agent creation but the ``AgentSession`` provides one,
    the session's value is used at runtime instead.

    Returns:
        NotGivenOr[bool]: The value stored on the agent.
    """
    return self._allow_interruptions

Indicates whether interruptions (e.g., stopping TTS playback) are allowed.

If this property was not set at Agent creation, but an AgentSession provides a value for allowing interruptions, the session's value will be used at runtime instead.

Returns

NotGivenOr[bool]
Whether interruptions are permitted.
prop chat_ctx : llm.ChatContext
Expand source code
@property
def chat_ctx(self) -> llm.ChatContext:
    """
    Read-only view of the agent's current chat context.

    Returns:
        llm.ChatContext: A read-only wrapper built from the items of the agent's
        conversation history.

    See Also:
        update_chat_ctx: Method to update the internal chat context.
    """
    history_items = self._chat_ctx.items
    return _ReadOnlyChatContext(history_items)

Provides a read-only view of the agent's current chat context.

Returns

llm.ChatContext
A read-only version of the agent's conversation history.

See Also: update_chat_ctx: Method to update the internal chat context.

prop instructions : str
Expand source code
@property
def instructions(self) -> str:
    """
    The core instructions that guide the agent's behavior.

    Returns:
        str: The instructions currently stored on the agent.
    """
    return self._instructions

Returns

str
The core instructions that guide the agent's behavior.
prop llm : NotGivenOr[llm.LLM | llm.RealtimeModel | None]
Expand source code
@property
def llm(self) -> NotGivenOr[llm.LLM | llm.RealtimeModel | None]:
    """
    The Language Model or RealtimeModel used for text generation.

    When no model was given at Agent creation but the ``AgentSession`` provides an
    LLM or RealtimeModel, the session's model is used at runtime instead.

    Returns:
        NotGivenOr[llm.LLM | llm.RealtimeModel | None]: The model stored on the agent.
    """
    return self._llm

Retrieves the Language Model or RealtimeModel used for text generation.

If this property was not set at Agent creation, but an AgentSession provides an LLM or RealtimeModel, the session's model will be used at runtime instead.

Returns

NotGivenOr[llm.LLM | llm.RealtimeModel | None]
The language model for text generation.
prop realtime_llm_session : llm.RealtimeSession
Expand source code
@property
def realtime_llm_session(self) -> llm.RealtimeSession:
    """
    The realtime LLM session of the agent's current activity.

    Raises:
        RuntimeError: If the agent is not running or the realtime LLM session is not available
    """
    activity = self._get_activity_or_raise()
    rt_session = activity.realtime_llm_session
    if rt_session is not None:
        return rt_session

    raise RuntimeError("no realtime LLM session")

Retrieve the realtime LLM session associated with the current agent.

Raises

RuntimeError
If the agent is not running or the realtime LLM session is not available
prop session : AgentSession
Expand source code
@property
def session(self) -> AgentSession:
    """
    The AgentSession of the agent's current activity.

    Raises:
        RuntimeError: If the agent is not running
    """
    activity = self._get_activity_or_raise()
    return activity.session

Retrieve the AgentSession associated with the current agent.

Raises

RuntimeError
If the agent is not running
prop stt : NotGivenOr[stt.STT | None]
Expand source code
@property
def stt(self) -> NotGivenOr[stt.STT | None]:
    """
    The Speech-To-Text component of the agent.

    When no STT was given at Agent creation but the ``AgentSession`` provides one,
    the session's STT is used at runtime instead.

    Returns:
        NotGivenOr[stt.STT | None]: The STT component stored on the agent.
    """
    return self._stt

Retrieves the Speech-To-Text component for the agent.

If this property was not set at Agent creation, but an AgentSession provides an STT component, the session's STT will be used at runtime instead.

Returns

NotGivenOr[stt.STT | None]
An optional STT component.
prop tools : list[llm.FunctionTool]
Expand source code
@property
def tools(self) -> list[llm.FunctionTool]:
    """
    The function tools available to the agent.

    Returns:
        list[llm.FunctionTool]: A copy of the agent's tool list, so changing the
        returned list does not affect the agent.
    """
    snapshot = self._tools.copy()
    return snapshot

Returns

list[llm.FunctionTool]
A list of function tools available to the agent.
prop tts : NotGivenOr[tts.TTS | None]
Expand source code
@property
def tts(self) -> NotGivenOr[tts.TTS | None]:
    """
    The Text-To-Speech component of the agent.

    When no TTS was given at Agent creation but the ``AgentSession`` provides one,
    the session's TTS is used at runtime instead.

    Returns:
        NotGivenOr[tts.TTS | None]: The TTS component stored on the agent.
    """
    return self._tts

Retrieves the Text-To-Speech component for the agent.

If this property was not set at Agent creation, but an AgentSession provides a TTS component, the session's TTS will be used at runtime instead.

Returns

NotGivenOr[tts.TTS | None]
An optional TTS component for generating audio output.
prop turn_detection : NotGivenOr[TurnDetectionMode | None]
Expand source code
@property
def turn_detection(self) -> NotGivenOr[TurnDetectionMode | None]:
    """
    The turn detection mode used to identify conversational turns.

    When no mode was given at Agent creation but the ``AgentSession`` provides one,
    the session's turn detection mode is used at runtime instead.

    Returns:
        NotGivenOr[TurnDetectionMode | None]: The turn detection mode stored on the agent.
    """
    return self._turn_detection

Retrieves the turn detection mode for identifying conversational turns.

If this property was not set at Agent creation, but an AgentSession provides a turn detection, the session's turn detection mode will be used at runtime instead.

Returns

NotGivenOr[TurnDetectionMode | None]
An optional turn detection mode for managing conversation flow.
prop vad : NotGivenOr[vad.VAD | None]
Expand source code
@property
def vad(self) -> NotGivenOr[vad.VAD | None]:
    """
    The Voice Activity Detection component configured on this agent.

    When no VAD was supplied at Agent creation but the ``AgentSession``
    provides one, the session's VAD is what gets used at runtime.

    Returns:
        NotGivenOr[vad.VAD | None]: The agent-level VAD component used to
        detect voice activity, if any.
    """
    agent_vad = self._vad
    return agent_vad

Retrieves the Voice Activity Detection component for the agent.

If this property was not set at Agent creation, but an AgentSession provides a VAD component, the session's VAD will be used at runtime instead.

Returns

NotGivenOr[vad.VAD | None]
An optional VAD component for detecting voice activity.

Methods

def llm_node(self,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[livekit.agents.llm.llm.ChatChunk | str] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[livekit.agents.llm.llm.ChatChunk | str]] | collections.abc.Coroutine[typing.Any, typing.Any, str] | collections.abc.Coroutine[typing.Any, typing.Any, livekit.agents.llm.llm.ChatChunk] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings,
) -> (
    AsyncIterable[llm.ChatChunk | str]
    | Coroutine[Any, Any, AsyncIterable[llm.ChatChunk | str]]
    | Coroutine[Any, Any, str]
    | Coroutine[Any, Any, llm.ChatChunk]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node responsible for text generation with an LLM.

    The default implementation delegates to ``Agent.default.llm_node``, which
    runs the agent's LLM over the given context. Output may be plain ``str``
    chunks for simple text generation, or ``llm.ChatChunk`` objects when
    richer data (text plus optional tool calls, usage statistics, other
    metadata) needs to be carried.

    Override this node to change how the LLM is called or how tool
    invocations and their responses are handled.

    Args:
        chat_ctx (llm.ChatContext): The conversation history given to the LLM.
        tools (list[FunctionTool]): Callable tools the LLM is allowed to invoke.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields/Returns:
        str: Plain text output from the LLM.
        llm.ChatChunk: An object that can contain both text and optional tool calls.
    """
    default_impl = Agent.default.llm_node
    return default_impl(self, chat_ctx, tools, model_settings)

A node in the processing pipeline that processes text generation with an LLM.

By default, this node uses the agent's LLM to process the provided context. It may yield plain text (as str) for straightforward text generation, or llm.ChatChunk objects that can include text and optional tool calls. ChatChunk is helpful for capturing more complex outputs such as function calls, usage statistics, or other metadata.

You can override this node to customize how the LLM is used or how tool invocations and responses are handled.

Args

chat_ctx : llm.ChatContext
The context for the LLM (the conversation history).
tools : list[FunctionTool]
A list of callable tools that the LLM may invoke.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields/Returns

str
Plain text output from the LLM.
llm.ChatChunk
An object that can contain both text and optional tool calls.

async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Hook called when the task is entered; the default implementation does nothing."""
    return None

Called when the task is entered

async def on_exit(self) ‑> None
Expand source code
async def on_exit(self) -> None:
    """Hook called when the task is exited; the default implementation does nothing."""
    return None

Called when the task is exited

async def on_user_turn_completed(self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage) ‑> None
Expand source code
async def on_user_turn_completed(
    self, turn_ctx: llm.ChatContext, new_message: llm.ChatMessage
) -> None:
    """Hook called once the user has finished speaking, right before the LLM responds.

    Override it to adjust the chat context or edit the new message before it
    is sent to the LLM. The default implementation does nothing.

    Args:
        turn_ctx (llm.ChatContext): Chat context for the turn.
        new_message (llm.ChatMessage): The user message about to be sent to the LLM.
    """
    return None

Called when the user has finished speaking, and the LLM is about to respond

This is a good opportunity to update the chat context or edit the new message before it is sent to the LLM.

def realtime_audio_output_node(self,
audio: AsyncIterable[rtc.AudioFrame],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[AudioFrame] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[AudioFrame]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def realtime_audio_output_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> (
    AsyncIterable[rtc.AudioFrame]
    | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
    | Coroutine[Any, Any, None]
):
    """Pipeline node applied to realtime LLM session audio before it is played out.

    The default implementation delegates to
    ``Agent.default.realtime_audio_output_node``.

    Args:
        audio (AsyncIterable[rtc.AudioFrame]): Audio frames from the realtime session.
        model_settings (ModelSettings): Configuration and parameters for model execution.
    """
    default_impl = Agent.default.realtime_audio_output_node
    return default_impl(self, audio, model_settings)

A node processing the audio from the realtime LLM session before it is played out.

def stt_node(self,
audio: AsyncIterable[rtc.AudioFrame],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[livekit.agents.stt.stt.SpeechEvent | str] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[livekit.agents.stt.stt.SpeechEvent | str]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> (
    AsyncIterable[stt.SpeechEvent | str]
    | Coroutine[Any, Any, AsyncIterable[stt.SpeechEvent | str]]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node that turns audio frames into speech events.

    The default implementation delegates to ``Agent.default.stt_node``, which
    relies on the current agent's Speech-To-Text (STT) capability. An STT
    implementation without native streaming support must be wrapped with a
    VAD (Voice Activity Detection) mechanism.

    Override this node for more control, e.g. custom audio pre-processing,
    extra buffering, or an alternative STT strategy.

    Args:
        audio (AsyncIterable[rtc.AudioFrame]): An asynchronous stream of audio frames.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        stt.SpeechEvent: An event containing transcribed text or other STT-related data.
    """
    default_impl = Agent.default.stt_node
    return default_impl(self, audio, model_settings)

A node in the processing pipeline that transcribes audio frames into speech events.

By default, this node uses a Speech-To-Text (STT) capability from the current agent. If the STT implementation does not support streaming natively, a VAD (Voice Activity Detection) mechanism is required to wrap the STT.

You can override this node with your own implementation for more flexibility (e.g., custom pre-processing of audio, additional buffering, or alternative STT strategies).

Args

audio : AsyncIterable[rtc.AudioFrame]
An asynchronous stream of audio frames.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

stt.SpeechEvent
An event containing transcribed text or other STT-related data.
def transcription_node(self,
text: AsyncIterable[str],
model_settings: ModelSettings) ‑> collections.abc.AsyncIterable[str] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[str]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def transcription_node(
    self, text: AsyncIterable[str], model_settings: ModelSettings
) -> AsyncIterable[str] | Coroutine[Any, Any, AsyncIterable[str]] | Coroutine[Any, Any, None]:
    """
    Pipeline node that finalizes transcriptions from text segments.

    Use it to post-process text coming from an LLM (or any other source) into
    its final transcribed form: cleaning up formatting, fixing punctuation, or
    any other text transformation. The default implementation delegates to
    ``Agent.default.transcription_node``.

    Override this node to plug in your own post-processing logic.

    Args:
        text (AsyncIterable[str]): An asynchronous stream of text segments.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        str: Finalized or post-processed text segments.
    """
    default_impl = Agent.default.transcription_node
    return default_impl(self, text, model_settings)

A node in the processing pipeline that finalizes transcriptions from text segments.

This node can be used to adjust or post-process text coming from an LLM (or any other source) into a final transcribed form. For instance, you might clean up formatting, fix punctuation, or perform any other text transformations here.

You can override this node to customize post-processing logic according to your needs.

Args

text : AsyncIterable[str]
An asynchronous stream of text segments.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

str
Finalized or post-processed text segments.
def tts_node(self,
text: AsyncIterable[str],
model_settings: ModelSettings) ‑> collections.abc.AsyncGenerator[AudioFrame, None] | collections.abc.Coroutine[typing.Any, typing.Any, collections.abc.AsyncIterable[AudioFrame]] | collections.abc.Coroutine[typing.Any, typing.Any, None]
Expand source code
def tts_node(
    self, text: AsyncIterable[str], model_settings: ModelSettings
) -> (
    AsyncGenerator[rtc.AudioFrame, None]
    | Coroutine[Any, Any, AsyncIterable[rtc.AudioFrame]]
    | Coroutine[Any, Any, None]
):
    """
    Pipeline node that synthesizes audio from text segments.

    The default implementation delegates to ``Agent.default.tts_node``, which
    converts the incoming text into audio frames with the agent's
    Text-To-Speech. A TTS implementation without native streaming support is
    fed through a sentence tokenizer so that synthesis stays incremental.

    Override this node to change the text chunking, use a custom TTS engine,
    or add any other specialized processing.

    Args:
        text (AsyncIterable[str]): An asynchronous stream of text segments to be synthesized.
        model_settings (ModelSettings): Configuration and parameters for model execution.

    Yields:
        rtc.AudioFrame: Audio frames synthesized from the provided text.
    """
    default_impl = Agent.default.tts_node
    return default_impl(self, text, model_settings)

A node in the processing pipeline that synthesizes audio from text segments.

By default, this node converts incoming text into audio frames using the Text-To-Speech from the agent. If the TTS implementation does not support streaming natively, it uses a sentence tokenizer to split text for incremental synthesis.

You can override this node to provide different text chunking behavior, a custom TTS engine, or any other specialized processing.

Args

text : AsyncIterable[str]
An asynchronous stream of text segments to be synthesized.
model_settings : ModelSettings
Configuration and parameters for model execution.

Yields

rtc.AudioFrame
Audio frames synthesized from the provided text.
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) ‑> None
Expand source code
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
    """
    Replace the agent's chat context.

    While the agent has no running activity, a copy of ``chat_ctx`` bound to
    the agent's current tools is stored locally. Otherwise the update is
    delegated to the running activity; in realtime mode this also updates the
    chat context of the ongoing realtime session.

    Args:
        chat_ctx (llm.ChatContext):
            The new or updated chat context for the agent.

    Raises:
        llm.RealtimeError: If updating the realtime session chat context fails.
    """
    activity = self._activity
    if activity is not None:
        await activity.update_chat_ctx(chat_ctx)
        return

    self._chat_ctx = chat_ctx.copy(tools=self._tools)

Updates the agent's chat context.

If the agent is running in realtime mode, this method also updates the chat context for the ongoing realtime session.

Args

chat_ctx (llm.ChatContext): The new or updated chat context for the agent.

Raises

llm.RealtimeError
If updating the realtime session chat context fails.
async def update_instructions(self, instructions: str) ‑> None
Expand source code
async def update_instructions(self, instructions: str) -> None:
    """
    Replace the agent's instructions.

    While the agent has no running activity, the new instructions are simply
    stored on the agent. Otherwise the update is delegated to the running
    activity; in realtime mode this also updates the instructions of the
    ongoing realtime session.

    Args:
        instructions (str):
            The new instructions to set for the agent.

    Raises:
        llm.RealtimeError: If updating the realtime session instructions fails.
    """
    activity = self._activity
    if activity is not None:
        await activity.update_instructions(instructions)
        return

    self._instructions = instructions

Updates the agent's instructions.

If the agent is running in realtime mode, this method also updates the instructions for the ongoing realtime session.

Args

instructions (str): The new instructions to set for the agent.

Raises

llm.RealtimeError
If updating the realtime session instructions fails.
async def update_tools(self, tools: list[llm.FunctionTool]) ‑> None
Expand source code
async def update_tools(self, tools: list[llm.FunctionTool]) -> None:
    """
    Updates the agent's available function tools.

    While the agent has no running activity, the deduplicated tool list is
    stored on the agent and the agent's chat context is re-created with those
    tools. Otherwise the update is delegated to the running activity; in
    realtime mode this also updates the tools of the ongoing realtime session.

    Args:
        tools (list[llm.FunctionTool]):
            The new list of function tools available to the agent. Duplicates
            are dropped; the first occurrence of each tool keeps its position.

    Raises:
        llm.RealtimeError: If updating the realtime session tools fails.
    """
    if self._activity is None:
        # dict.fromkeys removes duplicates like set() does, but keeps the caller's
        # ordering instead of producing a hash-dependent, non-deterministic order.
        self._tools = list(dict.fromkeys(tools))
        self._chat_ctx = self._chat_ctx.copy(tools=self._tools)
        return

    await self._activity.update_tools(tools)

Updates the agent's available function tools.

If the agent is running in realtime mode, this method also updates the tools for the ongoing realtime session.

Args

tools (list[llm.FunctionTool]): The new list of function tools available to the agent.

Raises

llm.RealtimeError
If updating the realtime session tools fails.
class AgentSession (*,
turn_detection: NotGivenOr[TurnDetectionMode] = NOT_GIVEN,
stt: NotGivenOr[stt.STT] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS] = NOT_GIVEN,
userdata: NotGivenOr[Userdata_T] = NOT_GIVEN,
allow_interruptions: bool = True,
discard_audio_if_uninterruptible: bool = True,
min_interruption_duration: float = 0.5,
min_endpointing_delay: float = 0.5,
max_endpointing_delay: float = 6.0,
max_tool_steps: int = 3,
video_sampler: NotGivenOr[_VideoSampler | None] = NOT_GIVEN,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class AgentSession(rtc.EventEmitter[EventTypes], Generic[Userdata_T]):
    def __init__(
        self,
        *,
        turn_detection: NotGivenOr[TurnDetectionMode] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS] = NOT_GIVEN,
        userdata: NotGivenOr[Userdata_T] = NOT_GIVEN,
        allow_interruptions: bool = True,
        discard_audio_if_uninterruptible: bool = True,
        min_interruption_duration: float = 0.5,
        min_endpointing_delay: float = 0.5,
        max_endpointing_delay: float = 6.0,
        max_tool_steps: int = 3,
        video_sampler: NotGivenOr[_VideoSampler | None] = NOT_GIVEN,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """`AgentSession` is the LiveKit Agents runtime that glues together
        media streams, speech/LLM components, and tool orchestration into a
        single real-time voice agent.

        It links audio, video, and text I/O with STT, VAD, TTS, and the LLM;
        handles turn detection, endpointing, interruptions, and multi-step
        tool calls; and exposes everything through event callbacks so you can
        focus on writing function tools and simple hand-offs rather than
        low-level streaming logic.

        Args:
            turn_detection (TurnDetectionMode, optional): Strategy for deciding
                when the user has finished speaking.

                * ``"stt"`` – rely on speech-to-text end-of-utterance cues
                * ``"vad"`` – rely on Voice Activity Detection start/stop cues
                * ``"realtime_llm"`` – use server-side detection from a
                  realtime LLM
                * ``"manual"`` – caller controls turn boundaries explicitly
                * ``_TurnDetector`` instance – plug-in custom detector

                If *NOT_GIVEN*, the session chooses the best available mode in
                priority order ``realtime_llm → vad → stt → manual``; it
                automatically falls back if the necessary model is missing.
            stt (stt.STT, optional): Speech-to-text backend.
            vad (vad.VAD, optional): Voice-activity detector.
            llm (llm.LLM | llm.RealtimeModel, optional): LLM or RealtimeModel.
            tts (tts.TTS, optional): Text-to-speech engine.
            userdata (Userdata_T, optional): Arbitrary per-session user data.
            allow_interruptions (bool): Whether the user can interrupt the
                agent mid-utterance. Default ``True``.
            discard_audio_if_uninterruptible (bool): When ``True``, buffered
                audio is dropped while the agent is speaking and cannot be
                interrupted. Default ``True``.
            min_interruption_duration (float): Minimum speech length (s) to
                register as an interruption. Default ``0.5`` s.
            min_endpointing_delay (float): Minimum time-in-seconds the agent
                must wait after a potential end-of-utterance signal (from VAD
                or an EOU model) before it declares the user’s turn complete.
                Default ``0.5`` s.
            max_endpointing_delay (float): Maximum time-in-seconds the agent
                will wait before terminating the turn. Default ``6.0`` s.
            max_tool_steps (int): Maximum consecutive tool calls per LLM turn.
                Default ``3``.
            video_sampler (_VideoSampler, optional): Uses
                :class:`VoiceActivityVideoSampler` when *NOT_GIVEN*; that sampler
                captures video at ~1 fps while the user is speaking and ~0.3 fps
                when silent by default.
            loop (asyncio.AbstractEventLoop, optional): Event loop to bind the
                session to. Falls back to :func:`asyncio.get_event_loop`.
        """
        super().__init__()
        self._loop = loop or asyncio.get_event_loop()

        if not is_given(video_sampler):
            video_sampler = VoiceActivityVideoSampler(speaking_fps=1.0, silent_fps=0.3)

        self._video_sampler = video_sampler

        # This is the "global" chat_context, it holds the entire conversation history
        self._chat_ctx = ChatContext.empty()
        self._opts = VoiceOptions(
            allow_interruptions=allow_interruptions,
            discard_audio_if_uninterruptible=discard_audio_if_uninterruptible,
            min_interruption_duration=min_interruption_duration,
            min_endpointing_delay=min_endpointing_delay,
            max_endpointing_delay=max_endpointing_delay,
            max_tool_steps=max_tool_steps,
        )
        self._started = False
        # NOT_GIVEN is normalised to None so the public properties only expose "value or None"
        self._turn_detection = turn_detection or None
        self._stt = stt or None
        self._vad = vad or None
        self._llm = llm or None
        self._tts = tts or None

        # configurable IO
        self._input = io.AgentInput(self._on_video_input_changed, self._on_audio_input_changed)
        self._output = io.AgentOutput(
            self._on_video_output_changed,
            self._on_audio_output_changed,
            self._on_text_output_changed,
        )

        self._forward_audio_atask: asyncio.Task | None = None
        # assigned by start() when a video input is set; declared here so that
        # reading it before/without that assignment cannot raise AttributeError
        self._forward_video_atask: asyncio.Task | None = None
        self._update_activity_atask: asyncio.Task | None = None
        self._activity_lock = asyncio.Lock()
        self._lock = asyncio.Lock()

        # used to keep a reference to the room io
        # this is not exposed, if users want access to it, they can create their own RoomIO
        self._room_io: room_io.RoomIO | None = None

        self._agent: Agent | None = None
        self._activity: AgentActivity | None = None
        # read by say()/generate_reply() while the current activity is draining
        self._next_activity: AgentActivity | None = None
        self._user_state: UserState = "listening"
        self._agent_state: AgentState = "initializing"

        self._userdata: Userdata_T | None = userdata if is_given(userdata) else None
        self._closing_task: asyncio.Task | None = None

    @property
    def userdata(self) -> Userdata_T:
        """Per-session user data.

        Raises:
            ValueError: If no user data has been set on the session.
        """
        data = self._userdata
        if data is not None:
            return data

        raise ValueError("VoiceAgent userdata is not set")

    @userdata.setter
    def userdata(self, value: Userdata_T) -> None:
        """Replace the per-session user data."""
        self._userdata = value

    @property
    def turn_detection(self) -> TurnDetectionMode | None:
        """Turn detection mode given to the session, or ``None`` if none was given."""
        mode = self._turn_detection
        return mode

    @property
    def stt(self) -> stt.STT | None:
        """Speech-to-text backend given to the session, or ``None`` if none was given."""
        session_stt = self._stt
        return session_stt

    @property
    def llm(self) -> llm.LLM | llm.RealtimeModel | None:
        """LLM or realtime model given to the session, or ``None`` if none was given."""
        session_llm = self._llm
        return session_llm

    @property
    def tts(self) -> tts.TTS | None:
        """Text-to-speech engine given to the session, or ``None`` if none was given."""
        session_tts = self._tts
        return session_tts

    @property
    def vad(self) -> vad.VAD | None:
        """Voice-activity detector given to the session, or ``None`` if none was given."""
        session_vad = self._vad
        return session_vad

    @property
    def input(self) -> io.AgentInput:
        """The session's configurable ``io.AgentInput`` (audio/video input)."""
        agent_input = self._input
        return agent_input

    @property
    def output(self) -> io.AgentOutput:
        """The session's configurable ``io.AgentOutput`` (video/audio/text output)."""
        agent_output = self._output
        return agent_output

    @property
    def options(self) -> VoiceOptions:
        """The ``VoiceOptions`` built from the constructor arguments."""
        voice_options = self._opts
        return voice_options

    @property
    def history(self) -> llm.ChatContext:
        """The session-wide chat context holding the entire conversation history."""
        conversation = self._chat_ctx
        return conversation

    @property
    def current_speech(self) -> SpeechHandle | None:
        """Current speech of the running activity, or ``None`` when no activity is running."""
        if self._activity is None:
            return None

        return self._activity.current_speech

    @property
    def user_state(self) -> UserState:
        """Current user state as tracked by the session (initially ``"listening"``)."""
        state = self._user_state
        return state

    @property
    def agent_state(self) -> AgentState:
        """Current agent state as tracked by the session (initially ``"initializing"``)."""
        state = self._agent_state
        return state

    @property
    def current_agent(self) -> Agent:
        """The agent currently attached to the session.

        Raises:
            RuntimeError: If no agent has been set yet.
        """
        agent = self._agent
        if agent is not None:
            return agent

        raise RuntimeError("VoiceAgent isn't running")

    async def start(
        self,
        agent: Agent,
        *,
        room: NotGivenOr[rtc.Room] = NOT_GIVEN,
        room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
        room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
    ) -> None:
        """Start the voice agent.

        Create a default RoomIO if the input or output audio is not already set.
        If the console flag is provided, start a ChatCLI.

        The whole start-up runs under the session lock; calling ``start()`` on a
        session that is already started returns immediately.

        Args:
            agent: The agent to run in this session
            room: The room to use for input and output
            room_input_options: Options for the room input
            room_output_options: Options for the room output
        """
        async with self._lock:
            if self._started:
                return

            self._agent = agent
            self._update_agent_state("initializing")

            # console mode takes precedence over any room passed by the caller
            if cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.console:
                from .chat_cli import ChatCLI

                if (
                    self.input.audio is not None
                    or self.output.audio is not None
                    or self.output.transcription is not None
                ):
                    logger.warning(
                        "agent started with the console subcommand, but input.audio or output.audio "  # noqa: E501
                        "or output.transcription is already set, overriding.."
                    )

                chat_cli = ChatCLI(self)
                await chat_cli.start()

            elif is_given(room) and not self._room_io:
                # work on copies: the flags below are switched off in place and the
                # caller's option objects must not be modified
                room_input_options = copy.deepcopy(room_input_options)
                room_output_options = copy.deepcopy(room_output_options)

                # anything the user already wired manually wins over the RoomIO equivalent
                if (
                    self.input.audio is not None
                    and is_given(room_input_options)
                    and room_input_options.audio_enabled
                ):
                    logger.warning(
                        "RoomIO audio input is enabled but input.audio is already set, ignoring.."
                    )
                    room_input_options.audio_enabled = False

                if (
                    self.output.audio is not None
                    and is_given(room_output_options)
                    and room_output_options.audio_enabled
                ):
                    logger.warning(
                        "RoomIO audio output is enabled but output.audio is already set, ignoring.."
                    )
                    room_output_options.audio_enabled = False

                if (
                    self.output.transcription is not None
                    and is_given(room_output_options)
                    and room_output_options.transcription_enabled
                ):
                    logger.warning(
                        "RoomIO transcription output is enabled but output.transcription is already set, ignoring.."  # noqa: E501
                    )
                    room_output_options.transcription_enabled = False

                # options that were not given fall back to the room_io module defaults
                self._room_io = room_io.RoomIO(
                    room=room,
                    agent_session=self,
                    input_options=(room_input_options or room_io.DEFAULT_ROOM_INPUT_OPTIONS),
                    output_options=(room_output_options or room_io.DEFAULT_ROOM_OUTPUT_OPTIONS),
                )
                await self._room_io.start()

            else:
                # neither console nor a new RoomIO: warn when nothing can carry the agent's output
                if not self._room_io and not self.output.audio and not self.output.transcription:
                    logger.warning(
                        "session starts without output, forgetting to pass `room` to `AgentSession.start()`?"  # noqa: E501
                    )

            # tracing is best-effort: get_job_context() raises RuntimeError outside of a job
            try:
                job_ctx = get_job_context()
                job_ctx.add_tracing_callback(self._trace_chat_ctx)
            except RuntimeError:
                pass  # ignore

            # it is ok to await it directly, there is no previous task to drain
            await self._update_activity_task(self._agent)

            # important: no await should be done after this!

            if self.input.audio is not None:
                self._forward_audio_atask = asyncio.create_task(
                    self._forward_audio_task(), name="_forward_audio_task"
                )

            if self.input.video is not None:
                self._forward_video_atask = asyncio.create_task(
                    self._forward_video_task(), name="_forward_video_task"
                )

            self._started = True
            self._update_agent_state("listening")

    async def _trace_chat_ctx(self) -> None:
        """Tracing callback: store the active agent's chat context and the session history."""
        activity = self._activity
        if activity is None:
            return  # can happen at startup

        agent_ctx = activity.agent.chat_ctx
        for key, ctx in (("chat_ctx", agent_ctx), ("history", self.history)):
            debug.Tracing.store_kv(key, ctx.to_dict(exclude_function_call=False))

    async def drain(self) -> None:
        """Drain the running activity.

        Raises:
            RuntimeError: If the session has no running activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        await activity.drain()

    async def _aclose_impl(
        self,
        *,
        error: llm.LLMError | stt.STTError | tts.TTSError | None = None,
    ) -> None:
        """Shut the session down.

        Emits the ``close`` event, then closes the running activity, cancels the
        audio and video forwarding tasks and closes the RoomIO, all under the
        session lock. Does nothing if the session was never started.

        Args:
            error: Error forwarded in the ``CloseEvent``; ``None`` for a regular close.
        """
        async with self._lock:
            if not self._started:
                return

            self.emit("close", CloseEvent(error=error))

            if self._activity is not None:
                await self._activity.aclose()

            if self._forward_audio_atask is not None:
                await utils.aio.cancel_and_wait(self._forward_audio_atask)

            # start() also spawns a video forwarding task when a video input is set, and
            # it used to be left running after close. getattr() because the attribute is
            # only guaranteed to exist once start() has created that task.
            forward_video_atask = getattr(self, "_forward_video_atask", None)
            if forward_video_atask is not None:
                await utils.aio.cancel_and_wait(forward_video_atask)

            if self._room_io:
                await self._room_io.aclose()

    async def aclose(self) -> None:
        """Close the session without reporting an error."""
        await self._aclose_impl(error=None)

    def emit(self, event: EventTypes, ev: AgentEvent) -> None:  # type: ignore
        """Log the event to the debug tracer, then emit it through the base ``EventEmitter``.

        Args:
            event: Name of the event being emitted.
            ev: Event payload passed to the listeners.
        """
        # don't log VAD metrics as they are too verbose
        is_vad_metrics = ev.type == "metrics_collected" and ev.metrics.type == "vad_metrics"
        if not is_vad_metrics:
            debug.Tracing.log_event(f'agent.on("{event}")', ev.model_dump())

        return super().emit(event, ev)

    def update_options(self) -> None:
        """Placeholder: currently does nothing."""
        return None

    def say(
        self,
        text: str | AsyncIterable[str],
        *,
        audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        add_to_chat_ctx: bool = True,
    ) -> SpeechHandle:
        """Forward a ``say()`` request to the activity that should handle it.

        The request goes to the current activity, or to the next activity when
        the current one is draining. All arguments are passed through unchanged.

        Args:
            text: Text, or an async stream of text, passed to the activity.
            audio: Optional audio frames passed to the activity.
            allow_interruptions: Passed to the activity unchanged.
            add_to_chat_ctx: Passed to the activity unchanged.

        Returns:
            SpeechHandle: The handle returned by the activity.

        Raises:
            RuntimeError: If the session isn't running, or if it is draining
                and there is no next activity to take over.
        """
        target = self._activity
        if target is None:
            raise RuntimeError("AgentSession isn't running")

        if target.draining:
            # the current activity is on its way out: hand over to its successor
            target = self._next_activity
            if target is None:
                raise RuntimeError("AgentSession is closing, cannot use say()")

        return target.say(
            text,
            audio=audio,
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
        )

    def generate_reply(
        self,
        *,
        user_input: NotGivenOr[str] = NOT_GIVEN,
        instructions: NotGivenOr[str] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> SpeechHandle:
        """Generate a reply for the agent to speak to the user.

        The request is handled by the current activity, or by the next activity
        when the current one is draining.

        Args:
            user_input (NotGivenOr[str], optional): The user's input that may influence the
                reply, such as answering a question. When given, it is wrapped in a ``user``
                chat message.
            instructions (NotGivenOr[str], optional): Additional instructions for generating
                the reply.
            tool_choice (NotGivenOr[llm.ToolChoice], optional): Specifies the external tool to
                use when generating the reply. If generate_reply is invoked within a
                function_tool, defaults to "none".
            allow_interruptions (NotGivenOr[bool], optional): Indicates whether the user can
                interrupt this speech.

        Returns:
            SpeechHandle: A handle to the generated reply.

        Raises:
            RuntimeError: If the session isn't running, or if it is draining
                and there is no next activity to take over.
        """
        target = self._activity
        if target is None:
            raise RuntimeError("AgentSession isn't running")

        if is_given(user_input):
            user_message = llm.ChatMessage(role="user", content=[user_input])
        else:
            user_message = NOT_GIVEN

        if target.draining:
            # the current activity is on its way out: hand over to its successor
            target = self._next_activity
            if target is None:
                raise RuntimeError("AgentSession is closing, cannot use generate_reply()")

        return target._generate_reply(
            user_message=user_message,
            instructions=instructions,
            tool_choice=tool_choice,
            allow_interruptions=allow_interruptions,
        )

    def interrupt(self) -> asyncio.Future:
        """Interrupt the current speech generation.

        Returns:
            The asyncio.Future produced by the current activity's
            ``interrupt()``; it completes when the interruption is fully
            processed and chat context has been updated.

        Raises:
            RuntimeError: If the session has no current activity.

        Example:
            ```python
            await session.interrupt()
            ```
        """
        activity = self._activity
        if activity is not None:
            return activity.interrupt()

        raise RuntimeError("AgentSession isn't running")

    def clear_user_turn(self) -> None:
        """Clear the transcription or input audio buffer of the user turn.

        Raises:
            RuntimeError: If the session has no current activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        activity.clear_user_turn()

    def commit_user_turn(self) -> None:
        """Commit the user turn and generate a reply.

        Raises:
            RuntimeError: If the session has no current activity.
        """
        activity = self._activity
        if activity is None:
            raise RuntimeError("AgentSession isn't running")

        activity.commit_user_turn()

    def update_agent(self, agent: Agent) -> None:
        """Set the session's agent and, if already started, switch activities.

        Before the session is started only the stored agent is replaced. Once
        started, a background task running ``_update_activity_task`` is
        scheduled and kept in ``_update_activity_atask``.

        Args:
            agent: The agent the session should use from now on.
        """
        self._agent = agent
        if not self._started:
            return

        switch = self._update_activity_task(self._agent)
        self._update_activity_atask = asyncio.create_task(switch, name="_update_activity_task")

    def _on_error(
        self,
        error: llm.LLMError | stt.STTError | tts.TTSError | llm.RealtimeModelError,
    ) -> None:
        """React to a component error by draining and closing the session.

        Nothing happens when a closing task is already pending or when the
        error is flagged as recoverable. Otherwise a task is scheduled that
        drains the session and then closes it with ``error``; the task
        reference is reset to None once the task is done.

        Args:
            error: The error reported by the LLM, STT, TTS or realtime model.
        """
        if self._closing_task:
            return
        if error.recoverable:
            return

        async def _shutdown() -> None:
            await self.drain()
            await self._aclose_impl(error=error)

        def _forget_closing_task(_: asyncio.Task) -> None:
            self._closing_task = None

        closing = asyncio.create_task(_shutdown())
        self._closing_task = closing
        closing.add_done_callback(_forget_closing_task)

    @utils.log_exceptions(logger=logger)
    async def _update_activity_task(self, task: Agent) -> None:
        """Replace the session's current activity with a new one for ``task``.

        While holding ``_activity_lock``: a new ``AgentActivity`` is created and
        published as ``_next_activity``, the existing activity (if any) is
        drained and closed, and then the new activity becomes ``_activity`` and
        is started.

        Args:
            task: The agent the new activity is created for.
        """
        async with self._activity_lock:
            # publish the upcoming activity first: say() and generate_reply()
            # route to `_next_activity` while the current activity is draining
            self._next_activity = AgentActivity(task, self)

            if self._activity is not None:
                await self._activity.drain()
                await self._activity.aclose()

            # promote the new activity and clear the "next" slot before starting it
            self._activity = self._next_activity
            self._next_activity = None
            await self._activity.start()

    @utils.log_exceptions(logger=logger)
    async def _forward_audio_task(self) -> None:
        """Pump frames from the audio input into the current activity.

        Returns immediately when no audio input is set. Frames that arrive
        while the session has no activity are dropped.
        """
        source = self.input.audio
        if source is None:
            return

        async for frame in source:
            activity = self._activity
            if activity is None:
                continue

            activity.push_audio(frame)

    @utils.log_exceptions(logger=logger)
    async def _forward_video_task(self) -> None:
        """Pump frames from the video input into the current activity.

        Returns immediately when no video input is set. Frames are dropped
        while the session has no activity, and also when a video sampler is
        configured and returns a falsy value for the frame.
        """
        source = self.input.video
        if source is None:
            return

        async for frame in source:
            if self._activity is None:
                continue

            sampler = self._video_sampler
            if sampler is not None and not sampler(frame, self):
                continue  # ignore this frame

            self._activity.push_video(frame)

    def _update_agent_state(self, state: AgentState) -> None:
        """Store a new agent state and emit ``agent_state_changed``.

        Nothing is stored or emitted when ``state`` equals the current state.

        Args:
            state: The state to switch to.
        """
        previous = self._agent_state
        if previous == state:
            return

        self._agent_state = state
        event = AgentStateChangedEvent(old_state=previous, new_state=state)
        self.emit("agent_state_changed", event)

    def _update_user_state(self, state: UserState) -> None:
        """Store a new user state and emit ``user_state_changed``.

        Nothing is stored or emitted when ``state`` equals the current state.

        Args:
            state: The state to switch to.
        """
        previous = self._user_state
        if previous == state:
            return

        self._user_state = state
        event = UserStateChangedEvent(old_state=previous, new_state=state)
        self.emit("user_state_changed", event)

    def _conversation_item_added(self, message: llm.ChatMessage) -> None:
        """Append ``message`` to the session chat context and emit an event.

        Args:
            message: The chat message to record; it is also carried by the
                emitted ``conversation_item_added`` event.
        """
        self._chat_ctx.items.append(message)
        event = ConversationItemAddedEvent(item=message)
        self.emit("conversation_item_added", event)

    # -- User changed input/output streams/sinks --

    def _on_video_input_changed(self) -> None:
        if not self._started:
            return

        if self._forward_video_atask is not None:
            self._forward_video_atask.cancel()

        self._forward_video_atask = asyncio.create_task(
            self._forward_video_task(), name="_forward_video_task"
        )

    def _on_audio_input_changed(self) -> None:
        if not self._started:
            return

        if self._forward_audio_atask is not None:
            self._forward_audio_atask.cancel()

        self._forward_audio_atask = asyncio.create_task(
            self._forward_audio_task(), name="_forward_audio_task"
        )

    def _on_video_output_changed(self) -> None:
        """Handle a change of the video output; currently a no-op."""
        pass

    def _on_audio_output_changed(self) -> None:
        """Handle a change of the audio output; currently a no-op."""
        pass

    def _on_text_output_changed(self) -> None:
        """Handle a change of the text output; currently a no-op."""
        pass

    # ---

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

AgentSession is the LiveKit Agents runtime that glues together media streams, speech/LLM components, and tool orchestration into a single real-time voice agent.

It links audio, video, and text I/O with STT, VAD, TTS, and the LLM; handles turn detection, endpointing, interruptions, and multi-step tool calls; and exposes everything through event callbacks so you can focus on writing function tools and simple hand-offs rather than low-level streaming logic.

Args

turn_detection : TurnDetectionMode, optional

Strategy for deciding when the user has finished speaking.

  • "stt" – rely on speech-to-text end-of-utterance cues
  • "vad" – rely on Voice Activity Detection start/stop cues
  • "realtime_llm" – use server-side detection from a realtime LLM
  • "manual" – caller controls turn boundaries explicitly
  • _TurnDetector instance – plug-in custom detector

If NOT_GIVEN, the session chooses the best available mode in priority order realtime_llm → vad → stt → manual; it automatically falls back if the necessary model is missing.

stt : stt.STT, optional
Speech-to-text backend.
vad : vad.VAD, optional
Voice-activity detector.
llm : llm.LLM | llm.RealtimeModel, optional
Language model: either an LLM or a RealtimeModel.
tts : tts.TTS, optional
Text-to-speech engine.
userdata : Userdata_T, optional
Arbitrary per-session user data.
allow_interruptions : bool
Whether the user can interrupt the agent mid-utterance. Default True.
discard_audio_if_uninterruptible : bool
When True, buffered audio is dropped while the agent is speaking and cannot be interrupted. Default True.
min_interruption_duration : float
Minimum speech length (s) to register as an interruption. Default 0.5 s.
min_endpointing_delay : float
Minimum time-in-seconds the agent must wait after a potential end-of-utterance signal (from VAD or an EOU model) before it declares the user’s turn complete. Default 0.5 s.
max_endpointing_delay : float
Maximum time-in-seconds the agent will wait before terminating the turn. Default 6.0 s.
max_tool_steps : int
Maximum consecutive tool calls per LLM turn. Default 3.
video_sampler : _VideoSampler, optional
Defaults to `VoiceActivityVideoSampler` when NOT_GIVEN; by default that sampler captures video at ~1 fps while the user is speaking and ~0.3 fps when silent.
loop : asyncio.AbstractEventLoop, optional
Event loop to bind the session to. Falls back to `asyncio.get_event_loop()`.

Ancestors

Instance variables

prop agent_state : AgentState
Expand source code
@property
def agent_state(self) -> AgentState:
    """Return the agent state currently recorded on the session."""
    return self._agent_state
prop current_agent : Agent
Expand source code
@property
def current_agent(self) -> Agent:
    """Return the agent currently assigned to the session.

    Raises:
        RuntimeError: If no agent has been set on the session.
    """
    if self._agent is None:
        # same wording as the other AgentSession guards; the previous message
        # still used the outdated "VoiceAgent" name
        raise RuntimeError("AgentSession isn't running")

    return self._agent
prop current_speech : SpeechHandle | None
Expand source code
@property
def current_speech(self) -> SpeechHandle | None:
    """Return the current activity's speech handle, or None without an activity."""
    activity = self._activity
    if activity is None:
        return None

    return activity.current_speech
prop history : llm.ChatContext
Expand source code
@property
def history(self) -> llm.ChatContext:
    """Return the chat context held by the session."""
    return self._chat_ctx
prop input : io.AgentInput
Expand source code
@property
def input(self) -> io.AgentInput:
    """Return the session's agent input object."""
    return self._input
prop llm : llm.LLM | llm.RealtimeModel | None
Expand source code
@property
def llm(self) -> llm.LLM | llm.RealtimeModel | None:
    """Return the LLM or realtime model stored on the session, if any."""
    return self._llm
prop options : VoiceOptions
Expand source code
@property
def options(self) -> VoiceOptions:
    """Return the voice options stored on the session."""
    return self._opts
prop output : io.AgentOutput
Expand source code
@property
def output(self) -> io.AgentOutput:
    """Return the session's agent output object."""
    return self._output
prop stt : stt.STT | None
Expand source code
@property
def stt(self) -> stt.STT | None:
    """Return the speech-to-text instance stored on the session, if any."""
    return self._stt
prop tts : tts.TTS | None
Expand source code
@property
def tts(self) -> tts.TTS | None:
    """Return the text-to-speech instance stored on the session, if any."""
    return self._tts
prop turn_detection : TurnDetectionMode | None
Expand source code
@property
def turn_detection(self) -> TurnDetectionMode | None:
    """Return the turn detection mode stored on the session, if any."""
    return self._turn_detection
prop user_state : UserState
Expand source code
@property
def user_state(self) -> UserState:
    """Return the user state currently recorded on the session."""
    return self._user_state
prop userdata : Userdata_T
Expand source code
@property
def userdata(self) -> Userdata_T:
    """Return the user data attached to the session.

    Raises:
        ValueError: If no user data has been set.
    """
    if self._userdata is None:
        # name the class users actually interact with; the previous message
        # still used the outdated "VoiceAgent" name
        raise ValueError("AgentSession userdata is not set")

    return self._userdata
prop vad : vad.VAD | None
Expand source code
@property
def vad(self) -> vad.VAD | None:
    """Return the voice-activity detector stored on the session, if any."""
    return self._vad

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the session by awaiting ``_aclose_impl()`` without arguments."""
    await self._aclose_impl()
def clear_user_turn(self) ‑> None
Expand source code
def clear_user_turn(self) -> None:
    """Clear the transcription or input audio buffer of the user turn.

    Raises:
        RuntimeError: If the session has no current activity.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    activity.clear_user_turn()
def commit_user_turn(self) ‑> None
Expand source code
def commit_user_turn(self) -> None:
    """Commit the user turn and generate a reply.

    Raises:
        RuntimeError: If the session has no current activity.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    activity.commit_user_turn()
async def drain(self) ‑> None
Expand source code
async def drain(self) -> None:
    """Await the current activity's ``drain()``.

    Raises:
        RuntimeError: If the session has no current activity.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    await activity.drain()
def generate_reply(self,
*,
user_input: NotGivenOr[str] = NOT_GIVEN,
instructions: NotGivenOr[str] = NOT_GIVEN,
tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
def generate_reply(
    self,
    *,
    user_input: NotGivenOr[str] = NOT_GIVEN,
    instructions: NotGivenOr[str] = NOT_GIVEN,
    tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN,
    allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
) -> SpeechHandle:
    """Ask the agent to produce a spoken reply for the user.

    When the current activity is draining, the request is routed to the
    next activity instead.

    Args:
        user_input (NotGivenOr[str], optional): The user's input that may
            influence the reply, such as answering a question. When given,
            it is wrapped in a user chat message.
        instructions (NotGivenOr[str], optional): Additional instructions
            for generating the reply.
        tool_choice (NotGivenOr[llm.ToolChoice], optional): Specifies the
            external tool to use when generating the reply. If
            generate_reply is invoked within a function_tool, defaults to
            "none".
        allow_interruptions (NotGivenOr[bool], optional): Indicates whether
            the user can interrupt this speech.

    Returns:
        SpeechHandle: A handle to the generated reply.

    Raises:
        RuntimeError: If the session has no activity, or the current
            activity is draining and no next activity exists.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    user_message = NOT_GIVEN
    if is_given(user_input):
        user_message = llm.ChatMessage(role="user", content=[user_input])

    if activity.draining:
        # the current activity no longer accepts work; use its successor
        activity = self._next_activity
        if activity is None:
            raise RuntimeError("AgentSession is closing, cannot use generate_reply()")

    return activity._generate_reply(
        user_message=user_message,
        instructions=instructions,
        tool_choice=tool_choice,
        allow_interruptions=allow_interruptions,
    )

Generate a reply for the agent to speak to the user.

Args

user_input : NotGivenOr[str], optional
The user's input that may influence the reply, such as answering a question.
instructions : NotGivenOr[str], optional
Additional instructions for generating the reply.
tool_choice : NotGivenOr[llm.ToolChoice], optional
Specifies the external tool to use when generating the reply. If generate_reply is invoked within a function_tool, defaults to "none".
allow_interruptions : NotGivenOr[bool], optional
Indicates whether the user can interrupt this speech.

Returns

SpeechHandle
A handle to the generated reply.
def interrupt(self) ‑> _asyncio.Future
Expand source code
def interrupt(self) -> asyncio.Future:
    """Interrupt the current speech generation.

    Returns:
        The asyncio.Future produced by the current activity's
        ``interrupt()``; it completes when the interruption is fully
        processed and chat context has been updated.

    Raises:
        RuntimeError: If the session has no current activity.

    Example:
        ```python
        await session.interrupt()
        ```
    """
    activity = self._activity
    if activity is not None:
        return activity.interrupt()

    raise RuntimeError("AgentSession isn't running")

Interrupt the current speech generation.

Returns

An asyncio.Future that completes when the interruption is fully processed and chat context has been updated.

Example

await session.interrupt()
def say(self,
text: str | AsyncIterable[str],
*,
audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
add_to_chat_ctx: bool = True) ‑> livekit.agents.voice.speech_handle.SpeechHandle
Expand source code
def say(
    self,
    text: str | AsyncIterable[str],
    *,
    audio: NotGivenOr[AsyncIterable[rtc.AudioFrame]] = NOT_GIVEN,
    allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    add_to_chat_ctx: bool = True,
) -> SpeechHandle:
    """Forward a say request to the active (or upcoming) activity.

    When the current activity is draining, the request is routed to the
    next activity instead.

    Args:
        text: A string or an async iterable of strings, passed through to
            the activity's ``say``.
        audio: Optional async iterable of audio frames, passed through.
        allow_interruptions: Passed through unchanged.
        add_to_chat_ctx: Passed through unchanged.

    Returns:
        SpeechHandle: Whatever the activity's ``say`` returns.

    Raises:
        RuntimeError: If the session has no activity, or the current
            activity is draining and no next activity exists.
    """
    activity = self._activity
    if activity is None:
        raise RuntimeError("AgentSession isn't running")

    if activity.draining:
        # the current activity no longer accepts work; use its successor
        activity = self._next_activity
        if activity is None:
            raise RuntimeError("AgentSession is closing, cannot use say()")

    return activity.say(
        text,
        audio=audio,
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
    )
async def start(self,
agent: Agent,
*,
room: NotGivenOr[rtc.Room] = NOT_GIVEN,
room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN) ‑> None
Expand source code
async def start(
    self,
    agent: Agent,
    *,
    room: NotGivenOr[rtc.Room] = NOT_GIVEN,
    room_input_options: NotGivenOr[room_io.RoomInputOptions] = NOT_GIVEN,
    room_output_options: NotGivenOr[room_io.RoomOutputOptions] = NOT_GIVEN,
) -> None:
    """Start the voice agent.

    Does nothing if the session is already started. If the console flag is
    provided, start a ChatCLI. Otherwise, when ``room`` is given and the
    session has no RoomIO yet, create and start a RoomIO, disabling the
    RoomIO audio/transcription options for which the session already has an
    input or output set.

    Args:
        agent: The agent to run; stored on the session and used to create
            the initial activity
        room: The room to use for input and output
        room_input_options: Options for the room input
        room_output_options: Options for the room output
    """
    async with self._lock:
        if self._started:
            return

        self._agent = agent
        self._update_agent_state("initializing")

        # console mode takes precedence over any room that was passed in
        if cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.console:
            from .chat_cli import ChatCLI

            if (
                self.input.audio is not None
                or self.output.audio is not None
                or self.output.transcription is not None
            ):
                logger.warning(
                    "agent started with the console subcommand, but input.audio or output.audio "  # noqa: E501
                    "or output.transcription is already set, overriding.."
                )

            chat_cli = ChatCLI(self)
            await chat_cli.start()

        elif is_given(room) and not self._room_io:
            # work on copies so the caller's option objects are not mutated below
            room_input_options = copy.deepcopy(room_input_options)
            room_output_options = copy.deepcopy(room_output_options)

            if (
                self.input.audio is not None
                and is_given(room_input_options)
                and room_input_options.audio_enabled
            ):
                logger.warning(
                    "RoomIO audio input is enabled but input.audio is already set, ignoring.."
                )
                room_input_options.audio_enabled = False

            if (
                self.output.audio is not None
                and is_given(room_output_options)
                and room_output_options.audio_enabled
            ):
                logger.warning(
                    "RoomIO audio output is enabled but output.audio is already set, ignoring.."
                )
                room_output_options.audio_enabled = False

            if (
                self.output.transcription is not None
                and is_given(room_output_options)
                and room_output_options.transcription_enabled
            ):
                logger.warning(
                    "RoomIO transcription output is enabled but output.transcription is already set, ignoring.."  # noqa: E501
                )
                room_output_options.transcription_enabled = False

            # falsy options are replaced by the module defaults
            self._room_io = room_io.RoomIO(
                room=room,
                agent_session=self,
                input_options=(room_input_options or room_io.DEFAULT_ROOM_INPUT_OPTIONS),
                output_options=(room_output_options or room_io.DEFAULT_ROOM_OUTPUT_OPTIONS),
            )
            await self._room_io.start()

        else:
            if not self._room_io and not self.output.audio and not self.output.transcription:
                logger.warning(
                    "session starts without output, forgetting to pass `room` to `AgentSession.start()`?"  # noqa: E501
                )

        # register the tracing callback only when running inside a job;
        # get_job_context() raises RuntimeError otherwise
        try:
            job_ctx = get_job_context()
            job_ctx.add_tracing_callback(self._trace_chat_ctx)
        except RuntimeError:
            pass  # ignore

        # it is ok to await it directly, there is no previous task to drain
        await self._update_activity_task(self._agent)

        # important: no await should be done after this!

        if self.input.audio is not None:
            self._forward_audio_atask = asyncio.create_task(
                self._forward_audio_task(), name="_forward_audio_task"
            )

        if self.input.video is not None:
            self._forward_video_atask = asyncio.create_task(
                self._forward_video_task(), name="_forward_video_task"
            )

        self._started = True
        self._update_agent_state("listening")

Start the voice agent.

Create a default RoomIO if the input or output audio is not already set. If the console flag is provided, start a ChatCLI.

Args

room
The room to use for input and output
room_input_options
Options for the room input
room_output_options
Options for the room output
def update_agent(self,
agent: Agent) ‑> None
Expand source code
def update_agent(self, agent: Agent) -> None:
    """Set the session's agent and, if already started, switch activities.

    Before the session is started only the stored agent is replaced. Once
    started, a background task running ``_update_activity_task`` is
    scheduled and kept in ``_update_activity_atask``.

    Args:
        agent: The agent the session should use from now on.
    """
    self._agent = agent
    if not self._started:
        return

    switch = self._update_activity_task(self._agent)
    self._update_activity_atask = asyncio.create_task(switch, name="_update_activity_task")
def update_options(self) ‑> None
Expand source code
def update_options(self) -> None:
    """Placeholder for updating session options; currently a no-op."""
    pass

Inherited members

class AgentStateChangedEvent (**data: Any)
Expand source code
# Event payload describing a transition of the agent state.
class AgentStateChangedEvent(BaseModel):
    # discriminator field; always the literal "agent_state_changed"
    type: Literal["agent_state_changed"] = "agent_state_changed"
    # state before the transition
    old_state: AgentState
    # state after the transition
    new_state: AgentState

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var new_state : Literal['initializing', 'idle', 'listening', 'thinking', 'speaking']
var old_state : Literal['initializing', 'idle', 'listening', 'thinking', 'speaking']
var type : Literal['agent_state_changed']
class AssignmentTimeoutError (*args, **kwargs)
Expand source code
class AssignmentTimeoutError(Exception):
    """Raised when a job was accepted but no assignment arrived within the
    specified timeout.

    The server may have chosen another worker to handle this job.
    """

Raised when accepting a job but not receiving an assignment within the specified timeout. The server may have chosen another worker to handle this job.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class AudioConfig (source: ForwardRef('AudioSource'),
volume: ForwardRef('float') = 1.0,
probability: ForwardRef('float') = 1.0)
Expand source code
class AudioConfig(NamedTuple):
    """
    Definition for the audio to be played in the background

    Args:
        source: The audio source to play
        volume: The volume of the audio (0.0-1.0)
        probability: The probability of the audio being played, when multiple
            AudioConfigs are provided (0.0-1.0)
    """

    source: AudioSource
    volume: float = 1.0
    probability: float = 1.0

Definition for the audio to be played in the background

Args

volume
The volume of the audio (0.0-1.0)
probability
The probability of the audio being played, when multiple AudioConfigs are provided (0.0-1.0)

Ancestors

  • builtins.tuple

Instance variables

var probability : float
Expand source code
class AudioConfig(NamedTuple):
    """
    Definition for the audio to be played in the background

    Args:
        source: The audio source to play
        volume: The volume of the audio (0.0-1.0)
        probability: The probability of the audio being played, when multiple
            AudioConfigs are provided (0.0-1.0)
    """

    source: AudioSource
    volume: float = 1.0
    probability: float = 1.0

Alias for field number 2

var source : AsyncIterator[AudioFrame] | str | livekit.agents.voice.background_audio.BuiltinAudioClip
Expand source code
class AudioConfig(NamedTuple):
    """
    Definition for the audio to be played in the background

    Args:
        source: The audio source to play
        volume: The volume of the audio (0.0-1.0)
        probability: The probability of the audio being played, when multiple
            AudioConfigs are provided (0.0-1.0)
    """

    source: AudioSource
    volume: float = 1.0
    probability: float = 1.0

Alias for field number 0

var volume : float
Expand source code
class AudioConfig(NamedTuple):
    """
    Definition for the audio to be played in the background

    Args:
        source: The audio source to play
        volume: The volume of the audio (0.0-1.0)
        probability: The probability of the audio being played, when multiple
            AudioConfigs are provided (0.0-1.0)
    """

    source: AudioSource
    volume: float = 1.0
    probability: float = 1.0

Alias for field number 1

class AutoSubscribe (*args, **kwds)
Expand source code
class AutoSubscribe(str, Enum):
    """String-valued enumeration of subscription modes.

    Members cover subscribing to everything, to nothing, to audio only, or
    to video only; each member's value is its lowercase name.
    """

    SUBSCRIBE_ALL = "subscribe_all"
    SUBSCRIBE_NONE = "subscribe_none"
    AUDIO_ONLY = "audio_only"
    VIDEO_ONLY = "video_only"

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var AUDIO_ONLY
var SUBSCRIBE_ALL
var SUBSCRIBE_NONE
var VIDEO_ONLY
class BackgroundAudioPlayer (*,
ambient_sound: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
thinking_sound: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN)
Expand source code
class BackgroundAudioPlayer:
    def __init__(
        self,
        *,
        ambient_sound: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
        thinking_sound: NotGivenOr[
            AudioSource | AudioConfig | list[AudioConfig] | None
        ] = NOT_GIVEN,
    ) -> None:
        """
        Sets up a background audio player with optional ambient and thinking sounds.

        The player mixes its sounds into one continuous audio track, which is published
        to a LiveKit room by `start()`. A sound source can be:
        - A BuiltinAudioClip enum value (a pre-defined sound from the package resources)
        - A file path (string) pointing to an audio file, which can be looped
        - An AsyncIterator that yields rtc.AudioFrame

        A source can also be wrapped in an AudioConfig, or supplied as a list of
        AudioConfig, to attach a volume and a selection probability:
        - The probability is the chance that this particular sound is picked for playback
        - When the probabilities add up to less than 1.0, the remainder is the chance
          that no sound is picked at all (silence)

        Args:
            ambient_sound: Sound played continuously once the player is started. File
                paths are looped; AsyncIterator sources must already be infinite or looped.
            thinking_sound: Sound played when the associated agent enters the "thinking"
                state. Either a single source or a list of AudioConfig objects.
        """
        # an omitted (NOT_GIVEN) sound is stored as None
        self._ambient_sound = None
        if is_given(ambient_sound):
            self._ambient_sound = ambient_sound

        self._thinking_sound = None
        if is_given(thinking_sound):
            self._thinking_sound = thinking_sound

        # audio pipeline: sounds are added to the mixer, whose output feeds the source
        self._audio_source = rtc.AudioSource(48000, 1, queue_size_ms=_AUDIO_SOURCE_BUFFER_MS)
        self._audio_mixer = rtc.AudioMixer(48000, 1, blocksize=4800, capacity=1)

        # publication state, guarded by the lock in start()/aclose()/republish
        self._lock = asyncio.Lock()
        self._publication: rtc.LocalTrackPublication | None = None

        # long-running tasks; both stay None until they are first needed
        self._mixer_atask: asyncio.Task | None = None
        self._republish_task: asyncio.Task | None = None  # republishes the track on reconnect

        # one task per sound that is currently playing
        self._play_tasks: list[asyncio.Task] = []

        self._ambient_handle: PlayHandle | None = None
        self._thinking_handle: PlayHandle | None = None

    def _select_sound_from_list(self, sounds: list[AudioConfig]) -> AudioConfig | None:
        """
        Selects a sound from a list of BackgroundSound based on their probabilities.
        Returns None if no sound is selected (when sum of probabilities < 1.0).
        """
        total_probability = sum(sound.probability for sound in sounds)
        if total_probability <= 0:
            return None

        if total_probability < 1.0 and random.random() > total_probability:
            return None

        normalize_factor = 1.0 if total_probability <= 1.0 else total_probability
        r = random.random() * min(total_probability, 1.0)
        cumulative = 0.0

        for sound in sounds:
            if sound.probability <= 0:
                continue

            norm_prob = sound.probability / normalize_factor
            cumulative += norm_prob

            if r <= cumulative:
                return sound

        return sounds[-1]

    def _normalize_sound_source(
        self, source: AudioSource | AudioConfig | list[AudioConfig] | None
    ) -> tuple[AudioSource, float] | None:
        """
        Reduces any accepted sound specification to a `(source, volume)` pair.

        Built-in clips are resolved to their file path in every branch, so callers
        can rely on `isinstance(source, str)` to recognise a file-backed sound
        (`start()` uses that check to decide whether the ambient sound loops).

        Args:
            source: A bare audio source, a single AudioConfig, a list of AudioConfig
                to pick from, or None.

        Returns:
            The source and its volume (1.0 for a bare source), or None when
            `source` is None or no sound was selected from the list.
        """
        if source is None:
            return None

        if isinstance(source, list):
            selected = self._select_sound_from_list(cast(list[AudioConfig], source))
            if selected is None:
                return None
            # resolve built-in clips here as well, exactly like the single
            # AudioConfig branch below does
            return self._normalize_builtin_audio(selected.source), selected.volume

        if isinstance(source, AudioConfig):
            return self._normalize_builtin_audio(source.source), source.volume

        # bare source (built-in clip, file path or frame iterator): full volume
        return self._normalize_builtin_audio(source), 1.0

    def _normalize_builtin_audio(self, source: AudioSource) -> AsyncIterator[rtc.AudioFrame] | str:
        """Resolves a BuiltinAudioClip to its file path; any other source is returned as-is."""
        return source.path() if isinstance(source, BuiltinAudioClip) else source

    def play(
        self,
        audio: AudioSource | AudioConfig | list[AudioConfig],
        *,
        loop: bool = False,
    ) -> PlayHandle:
        """
        Plays an audio once or in a loop.

        Args:
            audio (Union[AudioSource, AudioConfig, List[AudioConfig]]):
                The audio to play. Can be:
                - A string pointing to a file path
                - An AsyncIterator that yields `rtc.AudioFrame`
                - An AudioConfig object with volume and probability
                - A list of AudioConfig objects, where one will be selected based on probability

                If a string is provided and `loop` is True, the sound will be looped.
                If an AsyncIterator is provided, it is played until exhaustion (and cannot be looped
                automatically).
            loop (bool, optional):
                Whether to loop the audio. Only applicable if `audio` is a string or contains strings.
                Defaults to False.

        Returns:
            PlayHandle: An object representing the playback handle. This can be
            awaited or stopped manually.
        """  # noqa: E501
        if not self._mixer_atask:
            raise RuntimeError("BackgroundAudio is not started")

        normalized = self._normalize_sound_source(audio)
        if normalized is None:
            play_handle = PlayHandle()
            play_handle._mark_playout_done()
            return play_handle

        sound_source, volume = normalized

        if loop and isinstance(sound_source, AsyncIterator):
            raise ValueError(
                "Looping sound via AsyncIterator is not supported. Use a string file path or your own 'infinite' AsyncIterator with loop=False"  # noqa: E501
            )

        play_handle = PlayHandle()
        task = asyncio.create_task(self._play_task(play_handle, sound_source, volume, loop))
        task.add_done_callback(lambda _: self._play_tasks.remove(task))
        task.add_done_callback(lambda _: play_handle._mark_playout_done())
        self._play_tasks.append(task)
        return play_handle

    async def start(
        self,
        *,
        room: rtc.Room,
        agent_session: NotGivenOr[AgentSession] = NOT_GIVEN,
        track_publish_options: NotGivenOr[rtc.TrackPublishOptions] = NOT_GIVEN,
    ) -> None:
        """
        Publishes the audio track, starts the mixer and begins ambient playback.

        An ambient sound that resolves to a file path is looped automatically.
        Any other ambient source is played once as given, so an AsyncIterator
        is expected to be infinite or looped already.

        Args:
            room (rtc.Room):
                The LiveKit Room in which the audio track is published.
            agent_session (NotGivenOr[AgentSession], optional):
                The session whose "agent_state_changed" events drive the thinking
                sound. Required if `thinking_sound` is provided.
            track_publish_options (NotGivenOr[rtc.TrackPublishOptions], optional):
                Options used when publishing the audio track. If not given,
                defaults are used.
        """
        async with self._lock:
            self._room = room
            self._agent_session = agent_session or None
            self._track_publish_options = track_publish_options or None

            cli_args = cli.CLI_ARGUMENTS
            if cli_args is not None and cli_args.console:
                logger.warning(
                    "Background audio is not supported in console mode. Audio will not be played."
                )

            await self._publish_track()

            self._mixer_atask = asyncio.create_task(self._run_mixer_task())
            self._room.on("reconnected", self._on_reconnected)

            if self._agent_session:
                self._agent_session.on("agent_state_changed", self._agent_state_changed)

            if not self._ambient_sound:
                return

            ambient = self._normalize_sound_source(self._ambient_sound)
            if not ambient:
                return

            ambient_source, ambient_volume = ambient
            # only a file path is looped; every other source is played as given
            self._ambient_handle = self.play(
                AudioConfig(ambient_source, ambient_volume),
                loop=isinstance(ambient_source, str),
            )

    async def aclose(self) -> None:
        """
        Gracefully closes the background audio system, canceling all ongoing
        playback tasks and unpublishing the audio track.

        Returns immediately if the player was never started.
        """
        # NOTE(review): `_mixer_atask` is not reset here, so a second call runs the
        # whole teardown again and `play()` will not raise after closing - confirm
        # this is intended.
        async with self._lock:
            if not self._mixer_atask:
                return  # not started

            # stop every sound that is still playing
            await cancel_and_wait(*self._play_tasks)

            if self._republish_task:
                await cancel_and_wait(self._republish_task)

            # stop feeding frames before the source and the mixer are closed
            await cancel_and_wait(self._mixer_atask)

            await self._audio_source.aclose()
            await self._audio_mixer.aclose()

            # detach the event handlers registered in start()
            if self._agent_session:
                self._agent_session.off("agent_state_changed", self._agent_state_changed)

            self._room.off("reconnected", self._on_reconnected)

            # best effort: any error raised while unpublishing is ignored
            with contextlib.suppress(Exception):
                if self._publication is not None:
                    await self._room.local_participant.unpublish_track(self._publication.sid)

    def _on_reconnected(self) -> None:
        """Handles the room's "reconnected" event by scheduling a fresh track publication."""
        pending = self._republish_task
        if pending:
            # a previous republish attempt is superseded by this one
            pending.cancel()

        # forget the old publication so that `_publish_track()` publishes again
        self._publication = None
        self._republish_task = asyncio.create_task(self._republish_track_task())

    def _agent_state_changed(self, ev: AgentStateChangedEvent) -> None:
        if not self._thinking_sound:
            return

        if ev.new_state == "thinking":
            if self._thinking_handle and not self._thinking_handle.done():
                return

            self._thinking_handle = self.play(self._thinking_sound)

        elif self._thinking_handle:
            self._thinking_handle.stop()

    @log_exceptions(logger=logger)
    async def _play_task(
        self, play_handle: PlayHandle, sound: AudioSource, volume: float, loop: bool
    ) -> None:
        """
        Plays one sound through the mixer until it ends or the handle finishes.

        Args:
            play_handle: Handle that is marked done when playback is over.
            sound: A built-in clip, a file path, or an async iterator of frames.
            volume: Gain applied to every frame; 1.0 passes frames through untouched.
            loop: For file-backed sounds, whether the file is repeated.
        """
        if isinstance(sound, BuiltinAudioClip):
            sound = sound.path()

        # file paths are turned into frame iterators; iterators are used as given
        if isinstance(sound, str):
            if loop:
                sound = _loop_audio_frames(sound)
            else:
                sound = audio_frames_from_file(sound)

        async def _gen_wrapper() -> AsyncGenerator[rtc.AudioFrame, None]:
            # applies the volume to each frame and marks the handle done at the end
            async for frame in sound:
                if volume != 1.0:
                    # frame data is read as int16 samples and scaled in float32
                    data = np.frombuffer(frame.data, dtype=np.int16).astype(np.float32)
                    # 10 ** log10(volume) is algebraically just `volume`
                    # NOTE(review): volume == 0.0 goes through log10(0), which numpy
                    # reports with a divide-by-zero warning - confirm whether a
                    # plain multiplication is preferable here
                    data *= 10 ** (np.log10(volume))
                    # keep the samples inside the int16 range before converting back
                    np.clip(data, -32768, 32767, out=data)
                    yield rtc.AudioFrame(
                        data=data.astype(np.int16).tobytes(),
                        sample_rate=frame.sample_rate,
                        num_channels=frame.num_channels,
                        samples_per_channel=frame.samples_per_channel,
                    )
                else:
                    yield frame

            # TODO(theomonnom): the wait_for_playout() may be inaccurate by 400ms
            play_handle._mark_playout_done()

        gen = _gen_wrapper()
        try:
            self._audio_mixer.add_stream(gen)
            await play_handle.wait_for_playout()  # wait for playout or interruption
        finally:
            # the stream is detached and the generator closed only when the handle's
            # stop future is done (presumably resolved by PlayHandle.stop())
            if play_handle._stop_fut.done():
                self._audio_mixer.remove_stream(gen)
                await gen.aclose()

            play_handle._mark_playout_done()  # the task could be cancelled

    @log_exceptions(logger=logger)
    async def _run_mixer_task(self) -> None:
        """Forwards every frame produced by the mixer to the published audio source."""
        mixed_frames = self._audio_mixer
        async for mixed in mixed_frames:
            await self._audio_source.capture_frame(mixed)

    async def _publish_track(self) -> None:
        """Publishes the player's audio track to the room unless a publication already exists."""
        if self._publication is None:
            publish_options = self._track_publish_options or rtc.TrackPublishOptions()
            local_track = rtc.LocalAudioTrack.create_audio_track(
                "background_audio", self._audio_source
            )
            participant = self._room.local_participant
            self._publication = await participant.publish_track(local_track, publish_options)

    @log_exceptions(logger=logger)
    async def _republish_track_task(self) -> None:
        """Publishes the track again; scheduled by `_on_reconnected` after it clears the publication."""
        # used to republish the track on agent reconnect
        # the lock is the same one start() and aclose() hold, so they cannot interleave
        async with self._lock:
            await self._publish_track()

Initializes the BackgroundAudio component with optional ambient and thinking sounds.

This component creates and publishes a continuous audio track to a LiveKit room while managing the playback of ambient and agent “thinking” sounds. It supports three types of audio sources:

- A BuiltinAudioClip enum value, which will use a pre-defined sound from the package resources
- A file path (string) pointing to an audio file, which can be looped
- An AsyncIterator that yields rtc.AudioFrame

When a list (or AudioConfig) is supplied, the component considers each sound’s volume and probability:

- The probability value determines the chance that a particular sound is selected for playback.
- A total probability below 1.0 means there is a chance no sound will be selected (resulting in silence).

Args

ambient_sound (NotGivenOr[Union[AudioSource, AudioConfig, List[AudioConfig], None]], optional): The ambient sound to be played continuously. For file paths, the sound will be looped. For AsyncIterator sources, ensure the iterator is infinite or looped.

thinking_sound (NotGivenOr[Union[AudioSource, AudioConfig, List[AudioConfig], None]], optional): The sound to be played when the associated agent enters a “thinking” state. This can be a single sound source or a list of AudioConfig objects (with volume and probability settings).

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """
    Gracefully closes the background audio system, canceling all ongoing
    playback tasks and unpublishing the audio track.

    Returns immediately if the player was never started.
    """
    # NOTE(review): `_mixer_atask` is not reset here, so a second call runs the
    # whole teardown again and `play()` will not raise after closing - confirm
    # this is intended.
    async with self._lock:
        if not self._mixer_atask:
            return  # not started

        # stop every sound that is still playing
        await cancel_and_wait(*self._play_tasks)

        if self._republish_task:
            await cancel_and_wait(self._republish_task)

        # stop feeding frames before the source and the mixer are closed
        await cancel_and_wait(self._mixer_atask)

        await self._audio_source.aclose()
        await self._audio_mixer.aclose()

        # detach the event handlers registered in start()
        if self._agent_session:
            self._agent_session.off("agent_state_changed", self._agent_state_changed)

        self._room.off("reconnected", self._on_reconnected)

        # best effort: any error raised while unpublishing is ignored
        with contextlib.suppress(Exception):
            if self._publication is not None:
                await self._room.local_participant.unpublish_track(self._publication.sid)

Gracefully closes the background audio system, canceling all ongoing playback tasks and unpublishing the audio track.

def play(self,
audio: AudioSource | AudioConfig | list[AudioConfig],
*,
loop: bool = False) ‑> livekit.agents.voice.background_audio.PlayHandle
Expand source code
def play(
    self,
    audio: AudioSource | AudioConfig | list[AudioConfig],
    *,
    loop: bool = False,
) -> PlayHandle:
    """
    Starts playback of a sound, once or in a loop.

    Args:
        audio: What to play. One of:
            - a file path (string)
            - an AsyncIterator yielding `rtc.AudioFrame`, played until it is exhausted
            - an AudioConfig (source plus volume and probability)
            - a list of AudioConfig, from which one entry is picked by probability
        loop: Repeat the sound indefinitely. Supported for file paths only;
            an AsyncIterator cannot be restarted by the player. Defaults to False.

    Returns:
        PlayHandle: Handle for this playback; it can be awaited or stopped
        manually. If no sound was selected, the handle is already done.

    Raises:
        RuntimeError: If the player has not been started.
        ValueError: If `loop` is True and the selected source is an AsyncIterator.
    """
    if not self._mixer_atask:
        raise RuntimeError("BackgroundAudio is not started")

    resolved = self._normalize_sound_source(audio)
    if resolved is None:
        # nothing was selected: hand back a handle that has already finished
        finished = PlayHandle()
        finished._mark_playout_done()
        return finished

    source, gain = resolved
    if loop and isinstance(source, AsyncIterator):
        raise ValueError(
            "Looping sound via AsyncIterator is not supported. Use a string file path or your own 'infinite' AsyncIterator with loop=False"  # noqa: E501
        )

    handle = PlayHandle()
    task = asyncio.create_task(self._play_task(handle, source, gain, loop))

    def _forget_task(_: object) -> None:
        self._play_tasks.remove(task)

    def _finish_handle(_: object) -> None:
        handle._mark_playout_done()

    self._play_tasks.append(task)
    # kept as two separate callbacks so that each runs independently of the other
    task.add_done_callback(_forget_task)
    task.add_done_callback(_finish_handle)
    return handle

Plays an audio once or in a loop.

Args

audio (Union[AudioSource, AudioConfig, List[AudioConfig]]): The audio to play. Can be:

- A string pointing to a file path
- An AsyncIterator that yields rtc.AudioFrame
- An AudioConfig object with volume and probability
- A list of AudioConfig objects, where one will be selected based on probability

If a string is provided and loop is True, the sound will be looped. If an AsyncIterator is provided, it is played until exhaustion (and cannot be looped automatically).

loop (bool, optional): Whether to loop the audio. Only applicable if audio is a string or contains strings. Defaults to False.

Returns

PlayHandle
An object representing the playback handle. This can be awaited or stopped manually.

async def start(self,
*,
room: rtc.Room,
agent_session: NotGivenOr[AgentSession] = NOT_GIVEN,
track_publish_options: NotGivenOr[rtc.TrackPublishOptions] = NOT_GIVEN) ‑> None
Expand source code
async def start(
    self,
    *,
    room: rtc.Room,
    agent_session: NotGivenOr[AgentSession] = NOT_GIVEN,
    track_publish_options: NotGivenOr[rtc.TrackPublishOptions] = NOT_GIVEN,
) -> None:
    """
    Publishes the audio track, starts the mixer and begins ambient playback.

    An ambient sound that resolves to a file path is looped automatically.
    Any other ambient source is played once as given, so an AsyncIterator
    is expected to be infinite or looped already.

    Args:
        room (rtc.Room):
            The LiveKit Room in which the audio track is published.
        agent_session (NotGivenOr[AgentSession], optional):
            The session whose "agent_state_changed" events drive the thinking
            sound. Required if `thinking_sound` is provided.
        track_publish_options (NotGivenOr[rtc.TrackPublishOptions], optional):
            Options used when publishing the audio track. If not given,
            defaults are used.
    """
    async with self._lock:
        self._room = room
        self._agent_session = agent_session or None
        self._track_publish_options = track_publish_options or None

        cli_args = cli.CLI_ARGUMENTS
        if cli_args is not None and cli_args.console:
            logger.warning(
                "Background audio is not supported in console mode. Audio will not be played."
            )

        await self._publish_track()

        self._mixer_atask = asyncio.create_task(self._run_mixer_task())
        self._room.on("reconnected", self._on_reconnected)

        if self._agent_session:
            self._agent_session.on("agent_state_changed", self._agent_state_changed)

        if not self._ambient_sound:
            return

        ambient = self._normalize_sound_source(self._ambient_sound)
        if not ambient:
            return

        ambient_source, ambient_volume = ambient
        # only a file path is looped; every other source is played as given
        self._ambient_handle = self.play(
            AudioConfig(ambient_source, ambient_volume),
            loop=isinstance(ambient_source, str),
        )

Starts the background audio system, publishing the audio track and beginning playback of any configured ambient sound.

If ambient_sound is provided (and contains file paths), they will loop automatically. If ambient_sound contains AsyncIterators, they are assumed to be already infinite or looped.

Args

room (rtc.Room): The LiveKit Room object where the audio track will be published.

agent_session (NotGivenOr[AgentSession], optional): The session object used to track the agent's state (e.g., "thinking"). Required if thinking_sound is provided.

track_publish_options (NotGivenOr[rtc.TrackPublishOptions], optional): Options used when publishing the audio track. If not given, defaults will be used.

class BuiltinAudioClip (*args, **kwds)
Expand source code
class BuiltinAudioClip(enum.Enum):
    """Audio clips bundled in the `livekit.agents.resources` package; each value is a file name."""

    OFFICE_AMBIENCE = "office-ambience.ogg"
    KEYBOARD_TYPING = "keyboard-typing.ogg"
    KEYBOARD_TYPING2 = "keyboard-typing2.ogg"

    def path(self) -> str:
        """Returns a filesystem path for this clip as a string."""
        resource = files("livekit.agents.resources") / self.value
        # the `as_file` context is entered on the module's resource stack rather
        # than closed here, so the returned path outlives this call
        materialized = _resource_stack.enter_context(as_file(resource))
        return str(materialized)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var KEYBOARD_TYPING
var KEYBOARD_TYPING2
var OFFICE_AMBIENCE

Methods

def path(self) ‑> str
Expand source code
def path(self) -> str:
    """Returns a filesystem path for this clip as a string."""
    resource = files("livekit.agents.resources") / self.value
    # the `as_file` context is entered on the module's resource stack rather
    # than closed here, so the returned path outlives this call
    materialized = _resource_stack.enter_context(as_file(resource))
    return str(materialized)
class ChatContext (items: NotGivenOr[list[ChatItem]] = NOT_GIVEN)
Expand source code
class ChatContext:
    def __init__(self, items: NotGivenOr[list[ChatItem]] = NOT_GIVEN):
        """Creates a context backed by `items` itself (not a copy), or by a new empty list if omitted."""
        self._items: list[ChatItem]
        if is_given(items):
            self._items = items
        else:
            self._items = []

    @classmethod
    def empty(cls) -> ChatContext:
        """Returns a new instance of this class created from an empty item list."""
        return cls([])

    @property
    def items(self) -> list[ChatItem]:
        """The list of chat items held by this context (the list itself, not a copy)."""
        return self._items

    @items.setter
    def items(self, items: list[ChatItem]) -> None:
        # rebinds the backing list; the given list is stored as-is, not copied
        self._items = items

    def add_message(
        self,
        *,
        role: ChatRole,
        content: list[ChatContent] | str,
        id: NotGivenOr[str] = NOT_GIVEN,
        interrupted: NotGivenOr[bool] = NOT_GIVEN,
        created_at: NotGivenOr[float] = NOT_GIVEN,
    ) -> ChatMessage:
        """
        Builds a ChatMessage, appends it to the context and returns it.

        Args:
            role: Role of the message author.
            content: Message content; a plain string is wrapped in a one-element list.
            id: Forwarded to ChatMessage only when given.
            interrupted: Forwarded to ChatMessage only when given.
            created_at: Forwarded to ChatMessage only when given.

        Returns:
            The message that was appended.
        """
        optional_fields = {"id": id, "interrupted": interrupted, "created_at": created_at}
        # omitted fields are left out entirely so that ChatMessage applies its own defaults
        given_fields = {key: value for key, value in optional_fields.items() if is_given(value)}

        parts = [content] if isinstance(content, str) else content
        message = ChatMessage(role=role, content=parts, **given_fields)

        self._items.append(message)
        return message

    def get_by_id(self, item_id: str) -> ChatItem | None:
        return next((item for item in self.items if item.id == item_id), None)

    def index_by_id(self, item_id: str) -> int | None:
        return next((i for i, item in enumerate(self.items) if item.id == item_id), None)

    def copy(
        self,
        *,
        exclude_function_call: bool = False,
        exclude_instructions: bool = False,
        tools: NotGivenOr[list[FunctionTool | RawFunctionTool | str | Any]] = NOT_GIVEN,
    ) -> ChatContext:
        """
        Returns a new ChatContext holding a filtered selection of this context's items.

        The items themselves are shared with the original, not copied.

        Args:
            exclude_function_call: Drop every function call and function call output.
            exclude_instructions: Drop messages whose role is "system" or "developer".
            tools: When given, function calls and outputs are kept only if their name
                belongs to one of these tools. Entries may be tool names, function
                tools or raw function tools; any other entry is ignored.

        Returns:
            The new context.
        """
        from .tool_context import (
            get_function_info,
            get_raw_function_info,
            is_function_tool,
            is_raw_function_tool,
        )

        restrict_to_tools = is_given(tools)

        allowed_names = set()
        if restrict_to_tools:
            for tool in tools:
                if isinstance(tool, str):
                    allowed_names.add(tool)
                elif is_function_tool(tool):
                    allowed_names.add(get_function_info(tool).name)
                elif is_raw_function_tool(tool):
                    allowed_names.add(get_raw_function_info(tool).name)
                # TODO(theomonnom): other tools

        call_types = ("function_call", "function_call_output")
        instruction_roles = ("system", "developer")

        kept = []
        for entry in self.items:
            is_call = entry.type in call_types

            if is_call and exclude_function_call:
                continue

            if (
                exclude_instructions
                and entry.type == "message"
                and entry.role in instruction_roles
            ):
                continue

            if restrict_to_tools and is_call and entry.name not in allowed_names:
                continue

            kept.append(entry)

        return ChatContext(kept)

    def truncate(self, *, max_items: int) -> ChatContext:
        """Truncate the chat context to the last N items in place.

        Removes leading function calls to avoid partial function outputs.
        Preserves the first system message by adding it back to the beginning.
        """
        instructions = next(
            (item for item in self._items if item.type == "message" and item.role == "system"),
            None,
        )

        new_items = self._items[-max_items:]
        # chat ctx shouldn't start with function_call or function_call_output
        while new_items and new_items[0].type in [
            "function_call",
            "function_call_output",
        ]:
            new_items.pop(0)

        if instructions:
            new_items.insert(0, instructions)

        self._items[:] = new_items
        return self

    def to_dict(
        self,
        *,
        exclude_image: bool = True,
        exclude_audio: bool = True,
        exclude_timestamp: bool = True,
        exclude_function_call: bool = False,
    ) -> dict:
        """
        Serializes the context to a JSON-compatible dict of the form `{"items": [...]}`.

        Args:
            exclude_image: Strip image content from messages.
            exclude_audio: Strip audio content from messages.
            exclude_timestamp: Leave the `created_at` field out of every item.
            exclude_function_call: Leave out function calls and function call outputs.

        Returns:
            A dict with a single "items" key holding the dumped items. None values
            and fields left at their default are omitted from each item.
        """
        call_types = ("function_call", "function_call_output")

        selected = []
        for entry in self.items:
            if exclude_function_call and entry.type in call_types:
                continue

            if entry.type == "message":
                # filter on a copy so that the stored message keeps its full content
                entry = entry.model_copy()
                if exclude_image:
                    entry.content = [
                        part for part in entry.content if not isinstance(part, ImageContent)
                    ]
                if exclude_audio:
                    entry.content = [
                        part for part in entry.content if not isinstance(part, AudioContent)
                    ]

            selected.append(entry)

        omitted_fields = {"created_at"} if exclude_timestamp else set()

        serialized = [
            entry.model_dump(
                mode="json",
                exclude_none=True,
                exclude_defaults=True,
                exclude=omitted_fields,
            )
            for entry in selected
        ]
        return {"items": serialized}

    def find_insertion_index(self, *, created_at: float) -> int:
        """
        Returns the index to insert an item by creation time.

        Iterates in reverse, assuming items are sorted by `created_at`.
        Finds the position after the last item with `created_at <=` the given timestamp.
        """
        for i in reversed(range(len(self._items))):
            item = self._items[i]
            if item.type == "message" and item.created_at <= created_at:
                return i + 1

        return 0

    @classmethod
    def from_dict(cls, data: dict) -> ChatContext:
        """Builds a context from a dict whose "items" entry is validated as a list of chat items."""
        parsed_items = TypeAdapter(list[ChatItem]).validate_python(data["items"])
        return cls(parsed_items)

    @property
    def readonly(self) -> bool:
        """Whether this context is read-only; always False for this class."""
        return False

Subclasses

  • livekit.agents.llm.chat_context._ReadOnlyChatContext

Static methods

def empty() ‑> livekit.agents.llm.chat_context.ChatContext
def from_dict(data: dict) ‑> livekit.agents.llm.chat_context.ChatContext

Instance variables

prop items : list[ChatItem]
Expand source code
@property
def items(self) -> list[ChatItem]:
    """The list of chat items held by this context (the list itself, not a copy)."""
    return self._items
prop readonly : bool
Expand source code
@property
def readonly(self) -> bool:
    """Whether this context is read-only; always False for this class."""
    return False

Methods

def add_message(self,
*,
role: ChatRole,
content: list[ChatContent] | str,
id: NotGivenOr[str] = NOT_GIVEN,
interrupted: NotGivenOr[bool] = NOT_GIVEN,
created_at: NotGivenOr[float] = NOT_GIVEN) ‑> livekit.agents.llm.chat_context.ChatMessage
Expand source code
def add_message(
    self,
    *,
    role: ChatRole,
    content: list[ChatContent] | str,
    id: NotGivenOr[str] = NOT_GIVEN,
    interrupted: NotGivenOr[bool] = NOT_GIVEN,
    created_at: NotGivenOr[float] = NOT_GIVEN,
) -> ChatMessage:
    """
    Builds a ChatMessage, appends it to the context and returns it.

    Args:
        role: Role of the message author.
        content: Message content; a plain string is wrapped in a one-element list.
        id: Forwarded to ChatMessage only when given.
        interrupted: Forwarded to ChatMessage only when given.
        created_at: Forwarded to ChatMessage only when given.

    Returns:
        The message that was appended.
    """
    optional_fields = {"id": id, "interrupted": interrupted, "created_at": created_at}
    # omitted fields are left out entirely so that ChatMessage applies its own defaults
    given_fields = {key: value for key, value in optional_fields.items() if is_given(value)}

    parts = [content] if isinstance(content, str) else content
    message = ChatMessage(role=role, content=parts, **given_fields)

    self._items.append(message)
    return message
def copy(self,
*,
exclude_function_call: bool = False,
exclude_instructions: bool = False,
tools: NotGivenOr[list[FunctionTool | RawFunctionTool | str | Any]] = NOT_GIVEN) ‑> ChatContext
Expand source code
def copy(
    self,
    *,
    exclude_function_call: bool = False,
    exclude_instructions: bool = False,
    tools: NotGivenOr[list[FunctionTool | RawFunctionTool | str | Any]] = NOT_GIVEN,
) -> ChatContext:
    """
    Returns a new ChatContext holding a filtered selection of this context's items.

    The items themselves are shared with the original, not copied.

    Args:
        exclude_function_call: Drop every function call and function call output.
        exclude_instructions: Drop messages whose role is "system" or "developer".
        tools: When given, function calls and outputs are kept only if their name
            belongs to one of these tools. Entries may be tool names, function
            tools or raw function tools; any other entry is ignored.

    Returns:
        The new context.
    """
    from .tool_context import (
        get_function_info,
        get_raw_function_info,
        is_function_tool,
        is_raw_function_tool,
    )

    restrict_to_tools = is_given(tools)

    allowed_names = set()
    if restrict_to_tools:
        for tool in tools:
            if isinstance(tool, str):
                allowed_names.add(tool)
            elif is_function_tool(tool):
                allowed_names.add(get_function_info(tool).name)
            elif is_raw_function_tool(tool):
                allowed_names.add(get_raw_function_info(tool).name)
            # TODO(theomonnom): other tools

    call_types = ("function_call", "function_call_output")
    instruction_roles = ("system", "developer")

    kept = []
    for entry in self.items:
        is_call = entry.type in call_types

        if is_call and exclude_function_call:
            continue

        if (
            exclude_instructions
            and entry.type == "message"
            and entry.role in instruction_roles
        ):
            continue

        if restrict_to_tools and is_call and entry.name not in allowed_names:
            continue

        kept.append(entry)

    return ChatContext(kept)
def find_insertion_index(self, *, created_at: float) ‑> int
Expand source code
def find_insertion_index(self, *, created_at: float) -> int:
    """
    Locate where an item with the given creation time should be inserted.

    Walks backwards from the end of the item list (which is assumed to be
    ordered by `created_at`) and stops at the first message whose
    `created_at` is `<=` the given timestamp; the slot right after that
    message is returned. Non-message items are skipped during the search.
    Returns 0 when no such message exists.
    """
    position = len(self._items)
    while position > 0:
        candidate = self._items[position - 1]
        if candidate.type == "message" and candidate.created_at <= created_at:
            return position
        position -= 1

    return 0

Returns the index to insert an item by creation time.

Iterates in reverse, assuming items are sorted by created_at. Finds the position after the last item with created_at <= the given timestamp.

def get_by_id(self, item_id: str) ‑> livekit.agents.llm.chat_context.ChatMessage | livekit.agents.llm.chat_context.FunctionCall | livekit.agents.llm.chat_context.FunctionCallOutput | None
Expand source code
def get_by_id(self, item_id: str) -> ChatItem | None:
    """Return the first item whose ``id`` equals ``item_id``, or None if no item matches."""
    for candidate in self.items:
        if candidate.id == item_id:
            return candidate
    return None
def index_by_id(self, item_id: str) ‑> int | None
Expand source code
def index_by_id(self, item_id: str) -> int | None:
    """Return the position of the first item whose ``id`` equals ``item_id``, or None."""
    for position, candidate in enumerate(self.items):
        if candidate.id == item_id:
            return position
    return None
def to_dict(self,
*,
exclude_image: bool = True,
exclude_audio: bool = True,
exclude_timestamp: bool = True,
exclude_function_call: bool = False) ‑> dict
Expand source code
def to_dict(
    self,
    *,
    exclude_image: bool = True,
    exclude_audio: bool = True,
    exclude_timestamp: bool = True,
    exclude_function_call: bool = False,
) -> dict:
    """Serialize the chat items into a JSON-compatible dictionary.

    Args:
        exclude_image: Strip ImageContent parts from message content.
        exclude_audio: Strip AudioContent parts from message content.
        exclude_timestamp: Leave the ``created_at`` field out of each item.
        exclude_function_call: Skip ``function_call`` and
            ``function_call_output`` items entirely.

    Returns:
        ``{"items": [...]}`` where each entry is the item's ``model_dump`` in
        JSON mode, with ``None`` values and default values omitted.
    """
    function_item_types = ("function_call", "function_call_output")

    selected = []
    for entry in self.items:
        if exclude_function_call and entry.type in function_item_types:
            continue

        if entry.type == "message":
            # work on a copy so the stored message keeps its full content
            entry = entry.model_copy()
            hidden_types = []
            if exclude_image:
                hidden_types.append(ImageContent)
            if exclude_audio:
                hidden_types.append(AudioContent)
            if hidden_types:
                entry.content = [
                    part for part in entry.content if not isinstance(part, tuple(hidden_types))
                ]

        selected.append(entry)

    excluded_fields = {"created_at"} if exclude_timestamp else set()

    dumped = []
    for entry in selected:
        dumped.append(
            entry.model_dump(
                mode="json",
                exclude_none=True,
                exclude_defaults=True,
                exclude=excluded_fields,
            )
        )
    return {"items": dumped}
def truncate(self, *, max_items: int) ‑> livekit.agents.llm.chat_context.ChatContext
Expand source code
def truncate(self, *, max_items: int) -> ChatContext:
    """Truncate the chat context to the last N items in place.

    Removes leading function calls to avoid partial function outputs.
    Preserves the first system message: if truncation removed it, it is
    added back to the beginning; if it is still among the retained items it
    is left where it is (it is never duplicated).

    Args:
        max_items: Number of trailing items to keep. A value of zero or less
            keeps no items other than the preserved system message.

    Returns:
        This same context, to allow call chaining.
    """
    instructions = next(
        (item for item in self._items if item.type == "message" and item.role == "system"),
        None,
    )

    # `self._items[-0:]` is the whole list, so non-positive limits need an explicit empty tail
    new_items = self._items[-max_items:] if max_items > 0 else []

    # chat ctx shouldn't start with function_call or function_call_output
    start = 0
    while start < len(new_items) and new_items[start].type in (
        "function_call",
        "function_call_output",
    ):
        start += 1
    new_items = new_items[start:]

    # identity check: only re-insert the system message when truncation actually dropped it
    if instructions is not None and not any(item is instructions for item in new_items):
        new_items.insert(0, instructions)

    self._items[:] = new_items
    return self

Truncate the chat context to the last N items in place.

Removes leading function calls to avoid partial function outputs. Preserves the first system message by adding it back to the beginning.

class ChatMessage (**data: Any)
Expand source code
class ChatMessage(BaseModel):
    """A chat item of type ``"message"``: a role plus a list of content parts."""

    # generated with the "item_" prefix when the caller does not supply one
    id: str = Field(default_factory=lambda: utils.shortuuid("item_"))
    # constant tag distinguishing messages from other chat item kinds
    type: Literal["message"] = "message"
    role: ChatRole
    content: list[ChatContent]
    interrupted: bool = False
    hash: bytes | None = None
    # defaults to the wall-clock time at construction
    created_at: float = Field(default_factory=time.time)

    @property
    def text_content(self) -> str | None:
        """
        Concatenate the textual parts of this message.

        Only ``str`` entries of ``content`` are considered; they are joined
        with a newline. Returns None when the message has no ``str`` entries.
        """
        strings = list(filter(lambda part: isinstance(part, str), self.content))
        return "\n".join(strings) if strings else None

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var content : list[livekit.agents.llm.chat_context.ImageContent | livekit.agents.llm.chat_context.AudioContent | str]
var created_at : float
var hash : bytes | None
var id : str
var interrupted : bool
var model_config
var role : Literal['developer', 'system', 'user', 'assistant']
var type : Literal['message']

Instance variables

prop text_content : str | None
Expand source code
@property
def text_content(self) -> str | None:
    """
    Concatenate the textual parts of this message.

    Only ``str`` entries of ``content`` are considered; they are joined
    with a newline. Returns None when the message has no ``str`` entries.
    """
    strings = list(filter(lambda part: isinstance(part, str), self.content))
    return "\n".join(strings) if strings else None

Returns a string of all text content in the message.

Multiple text content items will be joined by a newline.

class CloseEvent (**data: Any)
Expand source code
class CloseEvent(BaseModel):
    """Event model tagged ``"close"`` that can carry an optional error."""

    # constant tag identifying this event kind
    type: Literal["close"] = "close"
    # an LLM/STT/TTS/realtime-model error, or None (the default) when no error is attached
    error: LLMError | STTError | TTSError | RealtimeModelError | None = None

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : livekit.agents.llm.llm.LLMError | livekit.agents.stt.stt.STTError | livekit.agents.tts.tts.TTSError | livekit.agents.llm.realtime.RealtimeModelError | None
var model_config
var type : Literal['close']
class ConversationItemAddedEvent (**data: Any)
Expand source code
class ConversationItemAddedEvent(BaseModel):
    """Event model tagged ``"conversation_item_added"`` that carries the added item."""

    # constant tag identifying this event kind
    type: Literal["conversation_item_added"] = "conversation_item_added"
    # the item itself; NOTE(review): `_TypeDiscriminator` is defined elsewhere and
    # presumably stands in for other item kinds — confirm against its definition
    item: ChatMessage | _TypeDiscriminator

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var item : livekit.agents.llm.chat_context.ChatMessage | livekit.agents.voice.events._TypeDiscriminator
var model_config
var type : Literal['conversation_item_added']
class ErrorEvent (**data: Any)
Expand source code
class ErrorEvent(BaseModel):
    """Event model tagged ``"error"`` that pairs an error with a source object."""

    # the fields below hold non-pydantic objects, so arbitrary types must be allowed
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # constant tag identifying this event kind
    type: Literal["error"] = "error"
    # one of the known error models, or any other object
    error: LLMError | STTError | TTSError | RealtimeModelError | Any
    # an LLM/STT/TTS/RealtimeModel instance or any other object;
    # presumably the component that raised the error — confirm with emitters
    source: LLM | STT | TTS | RealtimeModel | Any

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : livekit.agents.llm.llm.LLMError | livekit.agents.stt.stt.STTError | livekit.agents.tts.tts.TTSError | livekit.agents.llm.realtime.RealtimeModelError | typing.Any
var model_config
var source : livekit.agents.llm.llm.LLM | livekit.agents.stt.stt.STT | livekit.agents.tts.tts.TTS | livekit.agents.llm.realtime.RealtimeModel | typing.Any
var type : Literal['error']
class FunctionCall (**data: Any)
Expand source code
class FunctionCall(BaseModel):
    """A chat item of type ``"function_call"``: a named call with string arguments."""

    # generated with the "item_" prefix when the caller does not supply one
    id: str = Field(default_factory=lambda: utils.shortuuid("item_"))
    # constant tag distinguishing function calls from other chat item kinds
    type: Literal["function_call"] = "function_call"
    # identifier of the call; FunctionCallOutput has a field of the same name
    call_id: str
    # arguments kept as a string; presumably JSON-encoded — confirm with producers
    arguments: str
    # name of the function being called
    name: str

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var arguments : str
var call_id : str
var id : str
var model_config
var name : str
var type : Literal['function_call']
class FunctionCallOutput (**data: Any)
Expand source code
class FunctionCallOutput(BaseModel):
    """A chat item of type ``"function_call_output"``: the result of a function call."""

    # generated with the "item_" prefix when the caller does not supply one
    id: str = Field(default_factory=lambda: utils.shortuuid("item_"))
    # function name; defaults to an empty string when not provided
    name: str = Field(default="")
    # constant tag distinguishing outputs from other chat item kinds
    type: Literal["function_call_output"] = Field(default="function_call_output")
    # identifier of the call; FunctionCall has a field of the same name
    call_id: str
    # output payload as a string
    output: str
    # required flag; presumably True when `output` describes a failure — confirm with producers
    is_error: bool

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var call_id : str
var id : str
var is_error : bool
var model_config
var name : str
var output : str
var type : Literal['function_call_output']
class FunctionTool (*args, **kwargs)
Expand source code
@runtime_checkable
class FunctionTool(Protocol):
    """Structural type for a callable that carries function-tool metadata."""

    # NOTE(review): a double-underscore name declared inside a class body is subject
    # to Python's private name mangling, so this member is likely recorded as
    # `_FunctionTool__livekit_tool_info`, not the plain "__livekit_tool_info"
    # string used with setattr() — confirm before relying on isinstance() checks.
    __livekit_tool_info: _FunctionToolInfo

    def __call__(self, *args: Any, **kwargs: Any) -> Any: ...

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...

Ancestors

  • typing.Protocol
  • typing.Generic
class JobContext (*,
proc: JobProcess,
info: RunningJobInfo,
room: rtc.Room,
on_connect: Callable[[], None],
on_shutdown: Callable[[str], None],
inference_executor: InferenceExecutor)
Expand source code
class JobContext:
    """Context object for a running job.

    Gives access to the room, the job info, the process, an API client and
    the inference executor, and provides hooks for shutdown/tracing callbacks
    and per-participant entrypoints.
    """

    # private ctor
    def __init__(
        self,
        *,
        proc: JobProcess,
        info: RunningJobInfo,
        room: rtc.Room,
        on_connect: Callable[[], None],
        on_shutdown: Callable[[str], None],
        inference_executor: InferenceExecutor,
    ) -> None:
        self._proc = proc
        self._info = info
        self._room = room
        self._on_connect = on_connect
        self._on_shutdown = on_shutdown
        self._shutdown_callbacks: list[Callable[[str], Coroutine[None, None, None]]] = []
        self._tracing_callbacks: list[Callable[[], Coroutine[None, None, None]]] = []
        self._participant_entrypoints: list[
            tuple[
                Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]],
                list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType,
            ]
        ] = []
        self._participant_tasks = dict[tuple[str, Callable], asyncio.Task[None]]()
        self._pending_tasks = list[asyncio.Task]()
        self._room.on("participant_connected", self._participant_available)
        self._inf_executor = inference_executor

        # the record factory reads `_log_fields`, so it must exist before the factory is installed
        self._log_fields: dict[str, Any] = {}
        self._init_log_factory()

    def _init_log_factory(self) -> None:
        """Install a log record factory that injects this context's log fields.

        The new factory wraps whichever factory was active before. When the job
        is not running in its own process, fields are only injected into
        records created while this context is the current job context.
        """
        old_factory = logging.getLogRecordFactory()

        def record_factory(*args, **kwargs) -> logging.LogRecord:
            record = old_factory(*args, **kwargs)

            if self.proc.executor_type != JobExecutorType.PROCESS:
                try:
                    ctx = get_job_context()
                except RuntimeError:
                    return record
                else:
                    if ctx is not self:
                        return record

            for key, value in self._log_fields.items():
                setattr(record, key, value)

            return record

        logging.setLogRecordFactory(record_factory)

    @property
    def inference_executor(self) -> InferenceExecutor:
        """Returns the inference executor supplied at construction."""
        return self._inf_executor

    @functools.cached_property
    def api(self) -> api.LiveKitAPI:
        """Returns a LiveKitAPI client, created on first access and then cached."""
        return api.LiveKitAPI(session=http_context.http_session())

    @property
    def proc(self) -> JobProcess:
        """Returns the process running the job. Useful for storing process-specific state."""
        return self._proc

    @property
    def job(self) -> agent.Job:
        """Returns the current job that the worker is executing."""
        return self._info.job

    @property
    def worker_id(self) -> str:
        """Returns the id of the worker."""
        return self._info.worker_id

    @property
    def room(self) -> rtc.Room:
        """The Room object is the main interface that the worker should interact with.

        When the entrypoint is called, the worker has not connected to the Room yet.
        Certain properties of Room would not be available before calling JobContext.connect()
        """
        return self._room

    @property
    def agent(self) -> rtc.LocalParticipant:
        """Returns the local participant of the room."""
        return self._room.local_participant

    @property
    def log_context_fields(self) -> dict[str, Any]:
        """
        Returns the current dictionary of log fields that will be injected into log records.

        These fields enable enriched structured logging and can include job metadata,
        worker ID, trace IDs, or other diagnostic context.

        The returned dictionary can be directly edited, or entirely replaced via assignment
        (e.g., `job_context.log_context_fields = {...}`)
        """
        return self._log_fields

    @log_context_fields.setter
    def log_context_fields(self, fields: dict[str, Any]) -> None:
        """
        Sets the log fields to be injected into future log records.

        Args:
            fields (dict[str, Any]): A dictionary of key-value pairs representing
                structured data to attach to each log entry. Typically includes contextual
                information like job ID, trace information, or worker metadata.
        """
        self._log_fields = fields

    def add_tracing_callback(
        self,
        callback: Callable[[], Coroutine[None, None, None]],
    ) -> None:
        """
        Add a callback to be called when the job is about to receive a new tracing request.
        """
        self._tracing_callbacks.append(callback)

    def add_shutdown_callback(
        self,
        callback: Callable[[], Coroutine[None, None, None]]
        | Callable[[str], Coroutine[None, None, None]],
    ) -> None:
        """
        Add a callback to be called when the job is shutting down.
        Optionally the callback can take a single argument, the shutdown reason.

        The callback receives the reason when its call signature has at least one
        positional parameter; otherwise it is called without arguments.
        """
        import inspect

        # inspect.signature() already discounts the bound `self` of methods and also
        # understands functools.partial and callable objects, unlike `__code__.co_argcount`
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        accepts_reason = any(
            param.kind in positional_kinds
            for param in inspect.signature(callback).parameters.values()
        )

        if accepts_reason:
            self._shutdown_callbacks.append(callback)  # type: ignore
        else:

            async def wrapper(_: str) -> None:
                await callback()  # type: ignore

            self._shutdown_callbacks.append(wrapper)

    async def wait_for_participant(
        self,
        *,
        identity: str | None = None,
        kind: list[rtc.ParticipantKind.ValueType]
        | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
    ) -> rtc.RemoteParticipant:
        """
        Returns a participant that matches the given identity. If identity is None, the first
        participant that joins the room will be returned.
        If the participant has already joined, the function will return immediately.
        """
        return await wait_for_participant(self._room, identity=identity, kind=kind)

    async def connect(
        self,
        *,
        e2ee: rtc.E2EEOptions | None = None,
        auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
        rtc_config: rtc.RtcConfiguration | None = None,
    ) -> None:
        """Connect to the room. This method should be called only once.

        Args:
            e2ee: End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
            auto_subscribe: Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
            rtc_config: Custom RTC configuration to use when connecting to the room.
        """  # noqa: E501
        room_options = rtc.RoomOptions(
            e2ee=e2ee,
            auto_subscribe=auto_subscribe == AutoSubscribe.SUBSCRIBE_ALL,
            rtc_config=rtc_config,
        )

        await self._room.connect(self._info.url, self._info.token, options=room_options)
        self._on_connect()
        # participants already in the room get their entrypoints started as well
        for p in self._room.remote_participants.values():
            self._participant_available(p)

        _apply_auto_subscribe_opts(self._room, auto_subscribe)

    def delete_room(self) -> asyncio.Future[api.DeleteRoomResponse]:
        """Deletes the room and disconnects all participants."""
        task = asyncio.create_task(
            self.api.room.delete_room(api.DeleteRoomRequest(room=self._room.name))
        )
        # keep a reference until the task finishes
        self._pending_tasks.append(task)
        task.add_done_callback(lambda _: self._pending_tasks.remove(task))
        return task

    def add_sip_participant(
        self,
        *,
        call_to: str,
        trunk_id: str,
        participant_identity: str,
        participant_name: str | NotGivenOr[str] = "SIP-participant",
    ) -> asyncio.Future[api.SIPParticipantInfo]:
        """
        Add a SIP participant to the room.

        Args:
            call_to: The number or SIP destination to transfer the participant to.
                         This can either be a number (+12345555555) or a
                         sip host (sip:<user>@<host>)
            trunk_id: The ID of the SIP trunk to use
            participant_identity: The identity of the participant to add
            participant_name: The name of the participant to add

        Make sure you have an outbound SIP trunk created in LiveKit.
        See https://docs.livekit.io/sip/trunk-outbound/ for more information.
        """
        task = asyncio.create_task(
            self.api.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    room_name=self._room.name,
                    participant_identity=participant_identity,
                    sip_trunk_id=trunk_id,
                    sip_call_to=call_to,
                    participant_name=participant_name,
                )
            ),
        )
        self._pending_tasks.append(task)
        task.add_done_callback(lambda _: self._pending_tasks.remove(task))
        return task

    def transfer_sip_participant(
        self,
        participant: rtc.RemoteParticipant | str,
        transfer_to: str,
        play_dialtone: bool = False,
    ) -> asyncio.Future[api.SIPParticipantInfo]:
        """Transfer a SIP participant to another number.

        Args:
            participant: The participant to transfer, either as a participant object
                         or as its identity string. The SIP-kind check is only
                         performed when a participant object is given.
            transfer_to: The number or SIP destination to transfer the participant to.
                         This can either be a number (+12345555555) or a
                         sip host (sip:<user>@<host>)
            play_dialtone: Whether to play a dialtone during transfer. Defaults to False.


        Returns:
            Future that completes when the transfer is complete

        Make sure you have enabled call transfer on your provider SIP trunk.
        See https://docs.livekit.io/sip/transfer-cold/ for more information.
        """
        if isinstance(participant, str):
            # an identity string carries no kind information to validate
            participant_identity = participant
        else:
            assert participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP, (
                "Participant must be a SIP participant"
            )
            participant_identity = participant.identity

        task = asyncio.create_task(
            self.api.sip.transfer_sip_participant(
                api.TransferSIPParticipantRequest(
                    room_name=self._room.name,
                    participant_identity=participant_identity,
                    transfer_to=transfer_to,
                    play_dialtone=play_dialtone,
                )
            ),
        )
        self._pending_tasks.append(task)
        task.add_done_callback(lambda _: self._pending_tasks.remove(task))
        return task

    def shutdown(self, reason: str = "") -> None:
        """Invoke the shutdown handler supplied at construction with the given reason."""
        self._on_shutdown(reason)

    def add_participant_entrypoint(
        self,
        entrypoint_fnc: Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]],
        *_,
        kind: list[rtc.ParticipantKind.ValueType]
        | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
    ) -> None:
        """Adds an entrypoint function to be run when a participant joins the room. In cases where
        the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be
        added and they will each be run in parallel for each participant.

        Raises:
            ValueError: If the same entrypoint function was already added.
        """  # noqa: E501

        if any(existing == entrypoint_fnc for existing, _kind in self._participant_entrypoints):
            raise ValueError("entrypoints cannot be added more than once")

        self._participant_entrypoints.append((entrypoint_fnc, kind))

    def _participant_available(self, p: rtc.RemoteParticipant) -> None:
        """Start every registered entrypoint whose kind filter matches participant ``p``."""
        for coro, kind in self._participant_entrypoints:
            if isinstance(kind, list):
                if p.kind not in kind:
                    continue
            else:
                if p.kind != kind:
                    continue

            key = (p.identity, coro)
            if key in self._participant_tasks:
                logger.warning(
                    f"a participant has joined before a prior participant task matching the same identity has finished: '{p.identity}'"  # noqa: E501
                )
            task_name = f"part-entry-{p.identity}-{coro.__name__}"
            task = asyncio.create_task(coro(self, p), name=task_name)
            self._participant_tasks[key] = task
            task.add_done_callback(functools.partial(self._participant_task_done, key))

    def _participant_task_done(self, key: tuple[str, Callable], task: asyncio.Task[None]) -> None:
        """Forget a finished participant task, unless a newer task has replaced it."""
        # when a participant rejoins before the earlier task ends, the entry is overwritten;
        # the earlier task must then neither evict the newer one nor fail on a missing key
        if self._participant_tasks.get(key) is task:
            del self._participant_tasks[key]

Instance variables

prop agent : rtc.LocalParticipant
Expand source code
@property
def agent(self) -> rtc.LocalParticipant:
    """Return the local participant of the underlying room."""
    room = self._room
    return room.local_participant
var api : api.LiveKitAPI
Expand source code
@functools.cached_property
def api(self) -> api.LiveKitAPI:
    """Build a ``LiveKitAPI`` client on the session from ``http_context.http_session()``.

    Being a ``cached_property``, the client is created on first access and the
    same object is returned afterwards.
    """
    session = http_context.http_session()
    return api.LiveKitAPI(session=session)
prop inference_executor : InferenceExecutor
Expand source code
@property
def inference_executor(self) -> InferenceExecutor:
    """Return the inference executor held by this context."""
    executor = self._inf_executor
    return executor
prop job : agent.Job
Expand source code
@property
def job(self) -> agent.Job:
    """The job currently being executed by the worker."""
    info = self._info
    return info.job

Returns the current job that the worker is executing.

prop log_context_fields : dict[str, Any]
Expand source code
@property
def log_context_fields(self) -> dict[str, Any]:
    """
    The dictionary of log fields that is injected into log records.

    Typical entries are job metadata, the worker ID, trace IDs or any other
    diagnostic context useful for structured logging.

    The very same dictionary object is returned (not a copy), so it can be edited
    in place; it can also be replaced as a whole via assignment
    (e.g., `job_context.log_context_fields = {...}`)
    """
    fields = self._log_fields
    return fields

Returns the current dictionary of log fields that will be injected into log records.

These fields enable enriched structured logging and can include job metadata, worker ID, trace IDs, or other diagnostic context.

The returned dictionary can be directly edited, or entirely replaced via assignment (e.g., job_context.log_context_fields = {...})

prop proc : JobProcess
Expand source code
@property
def proc(self) -> JobProcess:
    """The process running the job; a convenient place for process-specific state."""
    process = self._proc
    return process

Returns the process running the job. Useful for storing process-specific state.

prop room : rtc.Room
Expand source code
@property
def room(self) -> rtc.Room:
    """The Room object, the main interface the worker should interact with.

    The worker is not yet connected to the Room when the entrypoint is called, so
    some Room properties are unavailable until JobContext.connect() has been called.
    """
    current_room = self._room
    return current_room

The Room object is the main interface that the worker should interact with.

When the entrypoint is called, the worker has not connected to the Room yet. Certain properties of Room would not be available before calling JobContext.connect()

prop worker_id : str
Expand source code
@property
def worker_id(self) -> str:
    """The id of the worker."""
    info = self._info
    return info.worker_id

Returns the id of the worker.

Methods

def add_participant_entrypoint(self,
entrypoint_fnc: Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]],
*_,
kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType = [3, 0])
Expand source code
def add_participant_entrypoint(
    self,
    entrypoint_fnc: Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]],
    *_,
    kind: list[rtc.ParticipantKind.ValueType]
    | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
):
    """Register an entrypoint to be run when a participant joins the room.

    If the participant has already joined, the entrypoint is run immediately.
    Several distinct entrypoints may be registered; each one is run in parallel
    for each participant.

    Args:
        entrypoint_fnc: Coroutine function taking the job context and the participant.
        kind: A single participant kind or a list of kinds; stored next to the
            entrypoint and used to filter which participants it is run for.

    Raises:
        ValueError: If ``entrypoint_fnc`` has already been registered.
    """
    already_registered = any(
        registered is entrypoint_fnc or registered == entrypoint_fnc
        for registered, _kind in self._participant_entrypoints
    )
    if already_registered:
        raise ValueError("entrypoints cannot be added more than once")

    self._participant_entrypoints.append((entrypoint_fnc, kind))

Adds an entrypoint function to be run when a participant joins the room. In cases where the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be added and they will each be run in parallel for each participant.

def add_shutdown_callback(self,
callback: Callable[[], Coroutine[None, None, None]] | Callable[[str], Coroutine[None, None, None]]) ‑> None
Expand source code
def add_shutdown_callback(
    self,
    callback: Callable[[], Coroutine[None, None, None]]
    | Callable[[str], Coroutine[None, None, None]],
) -> None:
    """
    Add a callback to be called when the job is shutting down.

    The callback may take no argument, or a single positional argument that
    receives the shutdown reason. Zero-argument callbacks are wrapped so that
    every stored callback can be invoked with the reason.

    Args:
        callback: Coroutine function (plain function, bound method or any other
            callable that ``inspect.signature`` can introspect).

    Raises:
        TypeError, ValueError: Propagated from ``inspect.signature`` when the
            callable cannot be introspected.
    """
    import inspect

    # Count parameters on the *bound* signature. ``__code__.co_argcount`` also
    # counts ``self`` for bound methods (so a zero-argument method looked like it
    # accepted the reason) and does not exist on partials or callable objects.
    positional_kinds = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    takes_reason = any(
        param.kind in positional_kinds
        for param in inspect.signature(callback).parameters.values()
    )

    if takes_reason:
        self._shutdown_callbacks.append(callback)  # type: ignore
        return

    async def wrapper(_: str) -> None:
        # discard the reason for callbacks that do not accept it
        await callback()  # type: ignore

    self._shutdown_callbacks.append(wrapper)

Add a callback to be called when the job is shutting down. Optionally the callback can take a single argument, the shutdown reason.

def add_sip_participant(self,
*,
call_to: str,
trunk_id: str,
participant_identity: str,
participant_name: str | NotGivenOr[str] = 'SIP-participant') ‑> _asyncio.Future[sip.SIPParticipantInfo]
Expand source code
def add_sip_participant(
    self,
    *,
    call_to: str,
    trunk_id: str,
    participant_identity: str,
    participant_name: str | NotGivenOr[str] = "SIP-participant",
) -> asyncio.Future[api.SIPParticipantInfo]:
    """
    Add a SIP participant to the room.

    The request is sent from a background task, which is kept in
    ``self._pending_tasks`` until it finishes and is returned to the caller.

    Args:
        call_to: The number or SIP destination to call.
                     This can either be a number (+12345555555) or a
                     sip host (sip:<user>@<host>)
        trunk_id: The ID of the SIP trunk to use
        participant_identity: The identity of the participant to add
        participant_name: The name of the participant to add

    Make sure you have an outbound SIP trunk created in LiveKit.
    See https://docs.livekit.io/sip/trunk-outbound/ for more information.
    """
    create_participant = self.api.sip.create_sip_participant
    request = api.CreateSIPParticipantRequest(
        room_name=self._room.name,
        participant_identity=participant_identity,
        sip_trunk_id=trunk_id,
        sip_call_to=call_to,
        participant_name=participant_name,
    )
    pending = asyncio.create_task(create_participant(request))
    self._pending_tasks.append(pending)

    def _untrack(_finished) -> None:
        # stop tracking the task once it has completed
        self._pending_tasks.remove(pending)

    pending.add_done_callback(_untrack)
    return pending

Add a SIP participant to the room.

Args

call_to
The number or SIP destination to transfer the participant to. This can either be a number (+12345555555) or a sip host (sip:<user>@<host>)
trunk_id
The ID of the SIP trunk to use
participant_identity
The identity of the participant to add
participant_name
The name of the participant to add

Make sure you have an outbound SIP trunk created in LiveKit. See https://docs.livekit.io/sip/trunk-outbound/ for more information.

def add_tracing_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
Expand source code
def add_tracing_callback(
    self,
    callback: Callable[[], Coroutine[None, None, None]],
) -> None:
    """
    Register a callback to be called when the job is about to receive a new tracing request.

    Args:
        callback: Zero-argument coroutine function; appended to the list of
            tracing callbacks in registration order.
    """
    callbacks = self._tracing_callbacks
    callbacks.append(callback)

Add a callback to be called when the job is about to receive a new tracing request.

async def connect(self,
*,
e2ee: rtc.E2EEOptions | None = None,
auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
rtc_config: rtc.RtcConfiguration | None = None) ‑> None
Expand source code
async def connect(
    self,
    *,
    e2ee: rtc.E2EEOptions | None = None,
    auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
    rtc_config: rtc.RtcConfiguration | None = None,
) -> None:
    """Connect to the room. This method should be called only once.

    After the connection is established the connect hook is invoked, every remote
    participant already present in the room is handed to the participant
    entrypoints, and finally the auto-subscribe options are applied to the room.

    Args:
        e2ee: End-to-end encryption options. If provided, the Agent will utilize
            end-to-end encryption. Note: clients will also need to handle E2EE.
        auto_subscribe: Whether to automatically subscribe to tracks.
            Default is AutoSubscribe.SUBSCRIBE_ALL.
        rtc_config: Custom RTC configuration to use when connecting to the room.
    """
    # the room-level flag is only switched on for the "subscribe to everything" mode
    subscribe_all = auto_subscribe == AutoSubscribe.SUBSCRIBE_ALL
    options = rtc.RoomOptions(
        e2ee=e2ee,
        auto_subscribe=subscribe_all,
        rtc_config=rtc_config,
    )

    current_room = self._room
    await current_room.connect(self._info.url, self._info.token, options=options)
    self._on_connect()

    for participant in self._room.remote_participants.values():
        self._participant_available(participant)

    _apply_auto_subscribe_opts(self._room, auto_subscribe)

Connect to the room. This method should be called only once.

Args

e2ee
End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
auto_subscribe
Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
rtc_config
Custom RTC configuration to use when connecting to the room.
def delete_room(self) ‑> _asyncio.Future[room.DeleteRoomResponse]
Expand source code
def delete_room(self) -> asyncio.Future[api.DeleteRoomResponse]:
    """Deletes the room and disconnects all participants.

    The request is sent from a background task, which is kept in
    ``self._pending_tasks`` until it finishes and is returned to the caller.
    """
    delete = self.api.room.delete_room
    request = api.DeleteRoomRequest(room=self._room.name)
    pending = asyncio.create_task(delete(request))
    self._pending_tasks.append(pending)

    def _untrack(_finished) -> None:
        # stop tracking the task once it has completed
        self._pending_tasks.remove(pending)

    pending.add_done_callback(_untrack)
    return pending

Deletes the room and disconnects all participants.

def shutdown(self, reason: str = '') ‑> None
Expand source code
def shutdown(self, reason: str = "") -> None:
    """Invoke the context's ``_on_shutdown`` handler with ``reason``.

    Args:
        reason: Text passed through to the handler unchanged; empty by default.
    """
    on_shutdown = self._on_shutdown
    on_shutdown(reason)
def transfer_sip_participant(self,
participant: rtc.RemoteParticipant | str,
transfer_to: str,
play_dialtone: bool = False) ‑> _asyncio.Future[sip.SIPParticipantInfo]
Expand source code
def transfer_sip_participant(
    self,
    participant: rtc.RemoteParticipant | str,
    transfer_to: str,
    play_dialtone: bool = False,
) -> asyncio.Future[api.SIPParticipantInfo]:
    """Transfer a SIP participant to another number.

    Args:
        participant: The participant to transfer, given either as a remote
                     participant object or as the participant's identity string.
        transfer_to: The number or SIP destination to transfer the participant to.
                     This can either be a number (+12345555555) or a
                     sip host (sip:<user>@<host>)
        play_dialtone: Whether to play a dialtone during transfer. Defaults to False.

    Returns:
        Future that completes when the transfer is complete.

    Raises:
        AssertionError: If a participant object is passed and its kind is not SIP.

    Make sure you have enabled call transfer on your provider SIP trunk.
    See https://docs.livekit.io/sip/transfer-cold/ for more information.
    """
    if isinstance(participant, str):
        # The signature allows a bare identity. A string has no ``kind`` or
        # ``identity`` attribute, so it is used as-is and cannot be kind-checked.
        participant_identity = participant
    else:
        assert participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP, (
            "Participant must be a SIP participant"
        )
        participant_identity = participant.identity

    task = asyncio.create_task(
        self.api.sip.transfer_sip_participant(
            api.TransferSIPParticipantRequest(
                room_name=self._room.name,
                participant_identity=participant_identity,
                transfer_to=transfer_to,
                play_dialtone=play_dialtone,
            )
        ),
    )
    # keep the task tracked until it completes
    self._pending_tasks.append(task)
    task.add_done_callback(lambda _: self._pending_tasks.remove(task))
    return task

Transfer a SIP participant to another number.

Args

participant
The participant to transfer
transfer_to
The number or SIP destination to transfer the participant to. This can either be a number (+12345555555) or a sip host (sip:<user>@<host>)
play_dialtone
Whether to play a dialtone during transfer. Defaults to False.

Returns

Future that completes when the transfer is complete.

Make sure you have enabled call transfer on your provider SIP trunk. See https://docs.livekit.io/sip/transfer-cold/ for more information.

async def wait_for_participant(self,
*,
identity: str | None = None,
kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType = [3, 0]) ‑> RemoteParticipant
Expand source code
async def wait_for_participant(
    self,
    *,
    identity: str | None = None,
    kind: list[rtc.ParticipantKind.ValueType]
    | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
) -> rtc.RemoteParticipant:
    """
    Wait for and return a participant matching the given identity.

    With ``identity=None`` the first participant to join the room is returned.
    A participant that has already joined is returned immediately.
    ``identity`` and ``kind`` are forwarded unchanged, together with this
    context's room, to the module-level ``wait_for_participant`` helper.
    """
    participant = await wait_for_participant(self._room, identity=identity, kind=kind)
    return participant

Returns a participant that matches the given identity. If identity is None, the first participant that joins the room will be returned. If the participant has already joined, the function will return immediately.

class JobExecutorType (*args, **kwds)
Expand source code
@unique
class JobExecutorType(Enum):
    """Kinds of job executor; ``@unique`` rejects duplicate member values."""

    # string values identify the executor kind
    PROCESS = "process"
    THREAD = "thread"

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class JobProcess (*,
executor_type: JobExecutorType,
user_arguments: Any | None,
http_proxy: str | None)
Expand source code
class JobProcess:
    """Read-only view of the process a job runs in, plus a mutable ``userdata`` dict."""

    def __init__(
        self,
        *,
        executor_type: JobExecutorType,
        user_arguments: Any | None,
        http_proxy: str | None,
    ) -> None:
        """Store the given settings and capture the current multiprocessing process.

        Args:
            executor_type: Executor type this process was started with.
            user_arguments: Arbitrary user-supplied value, stored as-is.
            http_proxy: Proxy setting, or None.
        """
        # captured at construction time from the process that creates this object
        self._mp_proc = mp.current_process()
        self._executor_type = executor_type
        self._user_arguments = user_arguments
        self._http_proxy: str | None = http_proxy
        # starts empty; callers fill it through the ``userdata`` property
        self._userdata: dict[str, Any] = {}

    @property
    def executor_type(self) -> JobExecutorType:
        """Executor type given at construction."""
        return self._executor_type

    @property
    def pid(self) -> int | None:
        """``pid`` of the multiprocessing process captured at construction."""
        return self._mp_proc.pid

    @property
    def userdata(self) -> dict:
        """The userdata dictionary itself (not a copy)."""
        return self._userdata

    @property
    def user_arguments(self) -> Any | None:
        """User arguments given at construction."""
        return self._user_arguments

    @property
    def http_proxy(self) -> str | None:
        """HTTP proxy given at construction, or None."""
        return self._http_proxy

Instance variables

prop executor_type : JobExecutorType
Expand source code
@property
def executor_type(self) -> JobExecutorType:
    """Return the stored executor type."""
    kind = self._executor_type
    return kind
prop http_proxy : str | None
Expand source code
@property
def http_proxy(self) -> str | None:
    """Return the stored HTTP proxy, or None."""
    proxy = self._http_proxy
    return proxy
prop pid : int | None
Expand source code
@property
def pid(self) -> int | None:
    """Return the ``pid`` attribute of the stored multiprocessing process."""
    process = self._mp_proc
    return process.pid
prop user_arguments : Any | None
Expand source code
@property
def user_arguments(self) -> Any | None:
    """Return the stored user arguments."""
    arguments = self._user_arguments
    return arguments
prop userdata : dict
Expand source code
@property
def userdata(self) -> dict:
    """Return the userdata dictionary itself (not a copy)."""
    data = self._userdata
    return data
class JobRequest (*,
job: agent.Job,
on_reject: Callable[[], Coroutine[None, None, None]],
on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]])
Expand source code
class JobRequest:
    """A job offered to this worker, which can be accepted or rejected."""

    def __init__(
        self,
        *,
        job: agent.Job,
        on_reject: Callable[[], Coroutine[None, None, None]],
        on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]],
    ) -> None:
        """Store the job and the coroutine functions that carry out reject/accept.

        Args:
            job: The job this request is about.
            on_reject: Awaited by ``reject()``.
            on_accept: Awaited by ``accept()`` with the assembled accept arguments.
        """
        self._job = job
        self._on_accept = on_accept
        self._on_reject = on_reject
        self._lock = asyncio.Lock()

    @property
    def id(self) -> str:
        """Id of the underlying job."""
        return self._job.id

    @property
    def job(self) -> agent.Job:
        """The underlying job object."""
        return self._job

    @property
    def room(self) -> models.Room:
        """The ``room`` field of the job."""
        return self._job.room

    @property
    def publisher(self) -> models.ParticipantInfo | None:
        """The ``participant`` field of the job."""
        return self._job.participant

    @property
    def agent_name(self) -> str:
        """The ``agent_name`` field of the job."""
        return self._job.agent_name

    async def reject(self) -> None:
        """Reject the job request. The job may be assigned to another worker"""
        await self._on_reject()

    async def accept(
        self,
        *,
        name: str = "",
        identity: str = "",
        metadata: str = "",
        attributes: dict[str, str] | None = None,
    ) -> None:
        """Accept the job request.

        The job is started if the LiveKit SFU assigns it to our worker. When
        ``identity`` is empty it defaults to ``"agent-"`` followed by the job id.
        """
        resolved_identity = identity or "agent-" + self.id

        await self._on_accept(
            JobAcceptArguments(
                name=name,
                identity=resolved_identity,
                metadata=metadata,
                attributes=attributes,
            )
        )

Instance variables

prop agent_name : str
Expand source code
@property
def agent_name(self) -> str:
    """Return the ``agent_name`` field of the underlying job."""
    job = self._job
    return job.agent_name
prop id : str
Expand source code
@property
def id(self) -> str:
    """Return the id of the underlying job."""
    job = self._job
    return job.id
prop job : agent.Job
Expand source code
@property
def job(self) -> agent.Job:
    """Return the underlying job object."""
    current = self._job
    return current
prop publisher : models.ParticipantInfo | None
Expand source code
@property
def publisher(self) -> models.ParticipantInfo | None:
    """Return the ``participant`` field of the underlying job."""
    job = self._job
    return job.participant
prop room : models.Room
Expand source code
@property
def room(self) -> models.Room:
    """Return the ``room`` field of the underlying job."""
    job = self._job
    return job.room

Methods

async def accept(self,
*,
name: str = '',
identity: str = '',
metadata: str = '',
attributes: dict[str, str] | None = None) ‑> None
Expand source code
async def accept(
    self,
    *,
    name: str = "",
    identity: str = "",
    metadata: str = "",
    attributes: dict[str, str] | None = None,
) -> None:
    """Accept the job request.

    The job is started if the LiveKit SFU assigns it to our worker. When
    ``identity`` is empty it defaults to ``"agent-"`` followed by the job id.
    """
    resolved_identity = identity or "agent-" + self.id

    await self._on_accept(
        JobAcceptArguments(
            name=name,
            identity=resolved_identity,
            metadata=metadata,
            attributes=attributes,
        )
    )

Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker.

async def reject(self) ‑> None
Expand source code
async def reject(self) -> None:
    """Reject the job request by awaiting the reject handler.

    The job may be assigned to another worker.
    """
    on_reject = self._on_reject
    await on_reject()

Reject the job request. The job may be assigned to another worker

class MetricsCollectedEvent (**data: Any)
Expand source code
# Event model carrying one collected metrics object.
class MetricsCollectedEvent(BaseModel):
    # constant tag naming the event kind; only "metrics_collected" is accepted
    type: Literal["metrics_collected"] = "metrics_collected"
    # the metrics object attached to this event (required, no default)
    metrics: AgentMetrics

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var metrics : livekit.agents.metrics.base.STTMetrics | livekit.agents.metrics.base.LLMMetrics | livekit.agents.metrics.base.TTSMetrics | livekit.agents.metrics.base.VADMetrics | livekit.agents.metrics.base.EOUMetrics
var model_config
var type : Literal['metrics_collected']
class ModelSettings (tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN)
Expand source code
@dataclass
class ModelSettings:
    """Settings used when calling the LLM; currently only the tool choice."""

    # defaults to the NOT_GIVEN sentinel when the caller does not specify one
    tool_choice: NotGivenOr[llm.ToolChoice] = NOT_GIVEN
    """The tool choice to use when calling the LLM."""

ModelSettings(tool_choice: 'NotGivenOr[llm.ToolChoice]' = NOT_GIVEN)

Instance variables

var tool_choice : livekit.agents.llm.tool_context.NamedToolChoice | Literal['auto', 'required', 'none'] | livekit.agents.types.NotGiven

The tool choice to use when calling the LLM.

class NotGiven
Expand source code
class NotGiven:
    """Type whose instances are always falsy and print as ``NOT_GIVEN``."""

    def __repr__(self) -> str:
        """Return the fixed text ``NOT_GIVEN``."""
        return "NOT_GIVEN"

    def __bool__(self) -> Literal[False]:
        """Instances always evaluate to False in a boolean context."""
        return False
class Plugin (title: str, version: str, package: str, logger: logging.Logger | None = None)
Expand source code
class Plugin(ABC):  # noqa: B024
    """Describes a plugin (title, version, package, optional logger).

    The class also holds the list of registered plugins and an event emitter
    that emits ``"plugin_registered"`` for every registration.
    """

    registered_plugins: list[Plugin] = []
    emitter: utils.EventEmitter[EventTypes] = utils.EventEmitter()

    # TODO(theomonnom): make logger mandatory once all plugins have been updated
    def __init__(
        self,
        title: str,
        version: str,
        package: str,
        logger: logging.Logger | None = None,
    ) -> None:
        """Store the descriptive fields of the plugin."""
        self._title, self._version, self._package = title, version, package
        self._logger = logger

    @classmethod
    def register_plugin(cls, plugin: Plugin) -> None:
        """Append ``plugin`` to the registry and emit ``"plugin_registered"``.

        Raises:
            RuntimeError: If called from any thread other than the main thread.
        """
        on_main_thread = threading.current_thread() == threading.main_thread()
        if not on_main_thread:
            raise RuntimeError("Plugins must be registered on the main thread")

        cls.registered_plugins.append(plugin)
        cls.emitter.emit("plugin_registered", plugin)

    # plugin can implement an optional download_files method
    def download_files(self) -> None:  # noqa: B027
        """Optional hook; the base implementation does nothing."""
        return None

    @property
    def package(self) -> str:
        """Package name given at construction."""
        return self._package

    @property
    def title(self) -> str:
        """Title given at construction."""
        return self._title

    @property
    def version(self) -> str:
        """Version string given at construction."""
        return self._version

    @property
    def logger(self) -> logging.Logger | None:
        """Logger given at construction, or None."""
        return self._logger

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.anthropic.AnthropicPlugin
  • livekit.plugins.assemblyai.AssemblyAIPlugin
  • livekit.plugins.aws.AWSPlugin
  • livekit.plugins.azure.AzurePlugin
  • livekit.plugins.bey.BeyPlugin
  • livekit.plugins.bithuman.BitHumanPlugin
  • livekit.plugins.cartesia.CartesiaPlugin
  • livekit.plugins.clova.ClovaSTTPlugin
  • livekit.plugins.deepgram.DeepgramPlugin
  • livekit.plugins.elevenlabs.ElevenLabsPlugin
  • livekit.plugins.fal.FalPlugin
  • livekit.plugins.gladia.GladiaPlugin
  • livekit.plugins.google.GooglePlugin
  • livekit.plugins.groq.GroqPlugin
  • MinimalPlugin
  • livekit.plugins.neuphonic.NeuphonicPlugin
  • livekit.plugins.nltk.NltkPlugin
  • livekit.plugins.openai.OpenAIPlugin
  • livekit.plugins.playai.PlayAIPlugin
  • livekit.plugins.resemble.ResemblePlugin
  • livekit.plugins.rime.RimePlugin
  • livekit.plugins.silero.SileroPlugin
  • livekit.plugins.speechify.SpeechifyPlugin
  • livekit.plugins.speechmatics.SpeechmaticsPlugin
  • livekit.plugins.tavus.TavusPlugin
  • livekit.plugins.turn_detector.EOUPlugin

Class variables

var emitter : EventEmitter[typing.Literal['plugin_registered']]
var registered_plugins : list[livekit.agents.plugin.Plugin]

Static methods

def register_plugin(plugin: Plugin) ‑> None

Instance variables

prop logger : logging.Logger | None
Expand source code
@property
def logger(self) -> logging.Logger | None:
    """Return the stored logger, or None."""
    stored = self._logger
    return stored
prop package : str
Expand source code
@property
def package(self) -> str:
    """Return the stored package name."""
    name = self._package
    return name
prop title : str
Expand source code
@property
def title(self) -> str:
    """Return the stored title."""
    text = self._title
    return text
prop version : str
Expand source code
@property
def version(self) -> str:
    """Return the stored version string."""
    number = self._version
    return number

Methods

def download_files(self) ‑> None
Expand source code
def download_files(self) -> None:  # noqa: B027
    """Optional hook; this implementation does nothing."""
    return None
class RoomIO (agent_session: AgentSession,
room: rtc.Room,
*,
participant: rtc.RemoteParticipant | str | None = None,
input_options: RoomInputOptions = RoomInputOptions(text_enabled=True, audio_enabled=True, video_enabled=False, audio_sample_rate=24000, audio_num_channels=1, noise_cancellation=None, text_input_cb=<function _default_text_input_cb>, participant_kinds=NOT_GIVEN, participant_identity=NOT_GIVEN),
output_options: RoomOutputOptions = RoomOutputOptions(transcription_enabled=True, audio_enabled=True, audio_sample_rate=24000, audio_num_channels=1, audio_publish_options=source: SOURCE_MICROPHONE , sync_transcription=NOT_GIVEN))
Expand source code
class RoomIO:
    """Connect an ``AgentSession`` to the media and text streams of a ``rtc.Room``.

    ``start()`` builds the inputs/outputs enabled by the options, waits for a
    participant to link to and installs them on the session. ``aclose()``
    undoes the event subscriptions and stops the streams and pending tasks.
    """

    def __init__(
        self,
        agent_session: AgentSession,
        room: rtc.Room,
        *,
        participant: rtc.RemoteParticipant | str | None = None,
        input_options: RoomInputOptions = DEFAULT_ROOM_INPUT_OPTIONS,
        output_options: RoomOutputOptions = DEFAULT_ROOM_OUTPUT_OPTIONS,
    ) -> None:
        """Store the configuration; no room or session state is touched here.

        Args:
            agent_session: Session whose inputs/outputs are set by ``start()``.
            room: Room whose participants and streams are used.
            participant: Participant (or its identity) to link to. When it is
                ``None``, ``input_options.participant_identity`` is used if given.
            input_options: Selects and configures the text/audio/video inputs.
            output_options: Selects and configures the audio/transcription outputs.
        """
        self._agent_session, self._room = agent_session, room
        self._input_options = input_options
        self._output_options = output_options
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.RemoteParticipant) else participant
        )
        if self._participant_identity is None and utils.is_given(
            input_options.participant_identity
        ):
            self._participant_identity = input_options.participant_identity

        self._audio_input: _ParticipantAudioInputStream | None = None
        self._video_input: _ParticipantVideoInputStream | None = None
        self._audio_output: _ParticipantAudioOutput | None = None
        self._user_tr_output: _ParallelTextOutput | None = None
        self._agent_tr_output: _ParallelTextOutput | None = None
        self._tr_synchronizer: TranscriptSynchronizer | None = None

        # resolved by _on_participant_connected / set_participant once the
        # participant to link to is present in the room
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

        self._tasks: set[asyncio.Task] = set()
        # latest agent-state publication; tracked separately from self._tasks
        self._update_state_task: asyncio.Task | None = None

    async def start(self) -> None:
        """Create the enabled inputs/outputs and attach them to the session.

        Does not return until a participant matching the configured identity
        (or, when none is configured, the first accepted participant) is
        available in the room.
        """
        self._room.on("participant_connected", self._on_participant_connected)
        self._room.on("participant_disconnected", self._on_participant_disconnected)

        # participants already in the room never fire "participant_connected"
        for participant in self._room.remote_participants.values():
            self._on_participant_connected(participant)

        if self._input_options.text_enabled:
            try:
                self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
            except ValueError:
                logger.warning(
                    f"text stream handler for topic '{TOPIC_CHAT}' already set, ignoring"
                )

        if self._input_options.video_enabled:
            self._video_input = _ParticipantVideoInputStream(self._room)

        if self._input_options.audio_enabled:
            self._audio_input = _ParticipantAudioInputStream(
                self._room,
                sample_rate=self._input_options.audio_sample_rate,
                num_channels=self._input_options.audio_num_channels,
                noise_cancellation=self._input_options.noise_cancellation,
            )

        def _create_transcription_output(
            is_delta_stream: bool, participant: rtc.Participant | str | None = None
        ) -> _ParallelTextOutput:
            # every transcript is sent through both the legacy and the current sink
            return _ParallelTextOutput(
                [
                    _ParticipantLegacyTranscriptionOutput(
                        room=self._room, is_delta_stream=is_delta_stream, participant=participant
                    ),
                    _ParticipantTranscriptionOutput(
                        room=self._room, is_delta_stream=is_delta_stream, participant=participant
                    ),
                ],
                next_in_chain=None,
            )

        if self._output_options.audio_enabled:
            self._audio_output = _ParticipantAudioOutput(
                self._room,
                sample_rate=self._output_options.audio_sample_rate,
                num_channels=self._output_options.audio_num_channels,
                track_publish_options=self._output_options.audio_publish_options,
            )

        if self._output_options.transcription_enabled:
            self._user_tr_output = _create_transcription_output(
                is_delta_stream=False, participant=self._participant_identity
            )
            self._agent_tr_output = _create_transcription_output(
                is_delta_stream=True, participant=self._room.local_participant
            )

            # use the RoomIO's audio output if available, otherwise use the agent's audio output
            # (e.g the audio output isn't using RoomIO with our avatar datastream impl)
            sync_transcription = True
            if utils.is_given(self._output_options.sync_transcription):
                sync_transcription = self._output_options.sync_transcription

            if sync_transcription and (
                audio_output := self._audio_output or self._agent_session.output.audio
            ):
                self._tr_synchronizer = TranscriptSynchronizer(
                    next_in_chain_audio=audio_output, next_in_chain_text=self._agent_tr_output
                )

        # TODO(theomonnom): ideally we're consistent and every input/output has a start method
        if self._audio_output:
            await self._audio_output.start()

        # wait for the specified participant or the first participant joined
        input_participant = await self._participant_available_fut
        self.set_participant(input_participant.identity)

        if self.audio_input:
            self._agent_session.input.audio = self.audio_input

        if self.video_input:
            self._agent_session.input.video = self.video_input

        if self.audio_output:
            self._agent_session.output.audio = self.audio_output

        if self.transcription_output:
            self._agent_session.output.transcription = self.transcription_output

        self._agent_session.on("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.on("user_input_transcribed", self._on_user_input_transcribed)
        self._agent_session._room_io = self

    async def aclose(self) -> None:
        """Unsubscribe from room/session events and stop streams and tasks."""
        self._room.off("participant_connected", self._on_participant_connected)
        self._room.off("participant_disconnected", self._on_participant_disconnected)

        # start() subscribes these on the session; drop them so a closed RoomIO
        # no longer reacts to session events (and spawns no new tasks)
        self._agent_session.off("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.off("user_input_transcribed", self._on_user_input_transcribed)

        if self._audio_input:
            await self._audio_input.aclose()
        if self._video_input:
            await self._video_input.aclose()

        if self._tr_synchronizer:
            await self._tr_synchronizer.aclose()

        # cancel and wait for all pending tasks, including the in-flight
        # agent-state update, which is tracked outside of self._tasks
        pending = list(self._tasks)
        if self._update_state_task is not None:
            pending.append(self._update_state_task)
            self._update_state_task = None
        await utils.aio.cancel_and_wait(*pending)
        self._tasks.clear()

    @property
    def audio_output(self) -> AudioOutput | None:
        """Audio output to use: the synchronizer's when one exists, else the raw one."""
        if self._tr_synchronizer:
            return self._tr_synchronizer.audio_output

        return self._audio_output

    @property
    def transcription_output(self) -> TextOutput | None:
        """Agent transcription output: the synchronizer's when one exists, else the raw one."""
        if self._tr_synchronizer:
            return self._tr_synchronizer.text_output

        return self._agent_tr_output

    @property
    def audio_input(self) -> AudioInput | None:
        """Audio input created by ``start()``, or ``None``."""
        return self._audio_input

    @property
    def video_input(self) -> VideoInput | None:
        """Video input created by ``start()``, or ``None``."""
        return self._video_input

    @property
    def linked_participant(self) -> rtc.RemoteParticipant | None:
        """Participant the availability future resolved to, or ``None`` while pending."""
        if not self._participant_available_fut.done():
            return None
        return self._participant_available_fut.result()

    def set_participant(self, participant_identity: str | None) -> None:
        """Switch audio and video streams to specified participant"""
        if participant_identity is None:
            self.unset_participant()
            return

        if (
            self._participant_identity is not None
            and self._participant_identity != participant_identity
        ):
            # reset future if switching to a different participant
            self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

            # check if new participant is already connected
            for participant in self._room.remote_participants.values():
                if participant.identity == participant_identity:
                    self._participant_available_fut.set_result(participant)
                    break

        # update participant identity and handlers
        self._participant_identity = participant_identity
        if self._audio_input:
            self._audio_input.set_participant(participant_identity)
        if self._video_input:
            self._video_input.set_participant(participant_identity)

        self._update_user_transcription(participant_identity)

    def unset_participant(self) -> None:
        """Forget the linked participant and detach the inputs and user transcription from it."""
        self._participant_identity = None
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        if self._audio_input:
            self._audio_input.set_participant(None)
        if self._video_input:
            self._video_input.set_participant(None)
        self._update_user_transcription(None)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None:
        """Resolve the availability future with the first acceptable participant."""
        if self._participant_available_fut.done():
            return

        if self._participant_identity is not None:
            if participant.identity != self._participant_identity:
                return
        # otherwise, skip participants that are marked as publishing for this agent
        elif (
            participant.attributes.get(ATTRIBUTE_PUBLISH_ON_BEHALF)
            == self._room.local_participant.identity
        ):
            return

        accepted_kinds = self._input_options.participant_kinds or DEFAULT_PARTICIPANT_KINDS
        if participant.kind not in accepted_kinds:
            # not an accepted participant kind, skip
            return

        self._participant_available_fut.set_result(participant)

    def _on_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        """Handle a participant leaving the room.

        NOTE(review): currently a no-op -- nothing follows the guard below, so
        the linked participant's disconnect does not reset any state. Confirm
        whether that is intended.
        """
        if self._participant_identity is None or self._participant_identity != participant.identity:
            return

    def _on_user_input_transcribed(self, ev: UserInputTranscribedEvent) -> None:
        """Forward a user transcript to the user transcription output in a tracked task."""

        async def _capture_text():
            if self._user_tr_output is None:
                return

            await self._user_tr_output.capture_text(ev.transcript)
            if ev.is_final:
                # TODO(theomonnom): should we wait for the end of turn before sending the final transcript?  # noqa: E501
                self._user_tr_output.flush()

        task = asyncio.create_task(_capture_text())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _on_user_text_input(self, reader: rtc.TextStreamReader, participant_identity: str) -> None:
        """Read a text stream from the linked participant and pass it to ``text_input_cb``."""
        # text from anyone other than the linked participant is ignored
        if participant_identity != self._participant_identity:
            return

        participant = self._room.remote_participants.get(participant_identity)
        if not participant:
            logger.warning("participant not found, ignoring text input")
            return

        async def _read_text():
            text = await reader.read_all()

            if self._input_options.text_input_cb:
                text_input_result = self._input_options.text_input_cb(
                    self._agent_session,
                    TextInputEvent(text=text, info=reader.info, participant=participant),
                )
                # the callback may be sync or async
                if asyncio.iscoroutine(text_input_result):
                    await text_input_result

        task = asyncio.create_task(_read_text())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _on_agent_state_changed(self, ev: AgentStateChangedEvent):
        """Publish the new agent state as a local-participant attribute."""

        @utils.log_exceptions(logger=logger)
        async def _set_state() -> None:
            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: ev.new_state}
                )

        # only the most recent state matters; drop a still-pending older update
        if self._update_state_task is not None:
            self._update_state_task.cancel()

        self._update_state_task = asyncio.create_task(_set_state())

    def _update_user_transcription(self, participant_identity: str | None) -> None:
        """Point every sink of the user transcription output at ``participant_identity``."""
        if not self._user_tr_output:
            return

        for sink in self._user_tr_output._sinks:
            assert isinstance(
                sink,
                (
                    _ParticipantLegacyTranscriptionOutput,
                    _ParticipantTranscriptionOutput,
                ),
            )
            sink.set_participant(participant_identity)

Instance variables

prop audio_input : AudioInput | None
Expand source code
@property
def audio_input(self) -> AudioInput | None:
    """Return the audio input held in ``self._audio_input``; may be ``None``."""
    return self._audio_input
prop audio_output : AudioOutput | None
Expand source code
@property
def audio_output(self) -> AudioOutput | None:
    """Return the synchronizer's audio output when a synchronizer exists,
    otherwise the plain audio output (which may be ``None``)."""
    synchronizer = self._tr_synchronizer
    if not synchronizer:
        return self._audio_output
    return synchronizer.audio_output
prop linked_participant : rtc.RemoteParticipant | None
Expand source code
@property
def linked_participant(self) -> rtc.RemoteParticipant | None:
    """Return the participant the availability future resolved to, or
    ``None`` while that future is still pending."""
    availability = self._participant_available_fut
    return availability.result() if availability.done() else None
prop transcription_output : TextOutput | None
Expand source code
@property
def transcription_output(self) -> TextOutput | None:
    """Return the synchronizer's text output when a synchronizer exists,
    otherwise the agent transcription output (which may be ``None``)."""
    synchronizer = self._tr_synchronizer
    if not synchronizer:
        return self._agent_tr_output
    return synchronizer.text_output
prop video_input : VideoInput | None
Expand source code
@property
def video_input(self) -> VideoInput | None:
    """Return the video input held in ``self._video_input``; may be ``None``."""
    return self._video_input

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Unsubscribe from room/session events and stop streams and tasks."""
    self._room.off("participant_connected", self._on_participant_connected)
    self._room.off("participant_disconnected", self._on_participant_disconnected)

    # start() subscribes these on the session; drop them so a closed RoomIO
    # no longer reacts to session events (and spawns no new tasks)
    self._agent_session.off("agent_state_changed", self._on_agent_state_changed)
    self._agent_session.off("user_input_transcribed", self._on_user_input_transcribed)

    if self._audio_input:
        await self._audio_input.aclose()
    if self._video_input:
        await self._video_input.aclose()

    if self._tr_synchronizer:
        await self._tr_synchronizer.aclose()

    # cancel and wait for all pending tasks, including the in-flight
    # agent-state update, which is tracked outside of self._tasks
    pending = list(self._tasks)
    if self._update_state_task is not None:
        pending.append(self._update_state_task)
        self._update_state_task = None
    await utils.aio.cancel_and_wait(*pending)
    self._tasks.clear()
def set_participant(self, participant_identity: str | None) ‑> None
Expand source code
def set_participant(self, participant_identity: str | None) -> None:
    """Switch audio and video streams to specified participant"""
    if participant_identity is None:
        self.unset_participant()
        return

    if (
        self._participant_identity is not None
        and self._participant_identity != participant_identity
    ):
        # reset future if switching to a different participant
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

        # check if new participant is already connected
        for participant in self._room.remote_participants.values():
            if participant.identity == participant_identity:
                self._participant_available_fut.set_result(participant)
                break

    # update participant identity and handlers
    self._participant_identity = participant_identity
    if self._audio_input:
        self._audio_input.set_participant(participant_identity)
    if self._video_input:
        self._video_input.set_participant(participant_identity)

    self._update_user_transcription(participant_identity)

Switch audio and video streams to specified participant

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Create the enabled inputs/outputs and attach them to the session.

    Does not return until a participant matching the configured identity
    (or, when none is configured, the first accepted participant) is
    available in the room.
    """
    self._room.on("participant_connected", self._on_participant_connected)
    self._room.on("participant_disconnected", self._on_participant_disconnected)

    # participants already in the room never fire "participant_connected"
    for participant in self._room.remote_participants.values():
        self._on_participant_connected(participant)

    if self._input_options.text_enabled:
        try:
            self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
        except ValueError:
            logger.warning(
                f"text stream handler for topic '{TOPIC_CHAT}' already set, ignoring"
            )

    if self._input_options.video_enabled:
        self._video_input = _ParticipantVideoInputStream(self._room)

    if self._input_options.audio_enabled:
        self._audio_input = _ParticipantAudioInputStream(
            self._room,
            sample_rate=self._input_options.audio_sample_rate,
            num_channels=self._input_options.audio_num_channels,
            noise_cancellation=self._input_options.noise_cancellation,
        )

    def _create_transcription_output(
        is_delta_stream: bool, participant: rtc.Participant | str | None = None
    ) -> _ParallelTextOutput:
        # every transcript is sent through both the legacy and the current sink
        return _ParallelTextOutput(
            [
                _ParticipantLegacyTranscriptionOutput(
                    room=self._room, is_delta_stream=is_delta_stream, participant=participant
                ),
                _ParticipantTranscriptionOutput(
                    room=self._room, is_delta_stream=is_delta_stream, participant=participant
                ),
            ],
            next_in_chain=None,
        )

    if self._output_options.audio_enabled:
        self._audio_output = _ParticipantAudioOutput(
            self._room,
            sample_rate=self._output_options.audio_sample_rate,
            num_channels=self._output_options.audio_num_channels,
            track_publish_options=self._output_options.audio_publish_options,
        )

    if self._output_options.transcription_enabled:
        self._user_tr_output = _create_transcription_output(
            is_delta_stream=False, participant=self._participant_identity
        )
        self._agent_tr_output = _create_transcription_output(
            is_delta_stream=True, participant=self._room.local_participant
        )

        # use the RoomIO's audio output if available, otherwise use the agent's audio output
        # (e.g the audio output isn't using RoomIO with our avatar datastream impl)
        # synchronization defaults to on when the option is not given
        sync_transcription = True
        if utils.is_given(self._output_options.sync_transcription):
            sync_transcription = self._output_options.sync_transcription

        if sync_transcription and (
            audio_output := self._audio_output or self._agent_session.output.audio
        ):
            self._tr_synchronizer = TranscriptSynchronizer(
                next_in_chain_audio=audio_output, next_in_chain_text=self._agent_tr_output
            )

    # TODO(theomonnom): ideally we're consistent and every input/output has a start method
    if self._audio_output:
        await self._audio_output.start()

    # wait for the specified participant or the first participant joined
    input_participant = await self._participant_available_fut
    self.set_participant(input_participant.identity)

    # the properties (not the private fields) are used below so that the
    # synchronizer-wrapped outputs are installed when a synchronizer exists
    if self.audio_input:
        self._agent_session.input.audio = self.audio_input

    if self.video_input:
        self._agent_session.input.video = self.video_input

    if self.audio_output:
        self._agent_session.output.audio = self.audio_output

    if self.transcription_output:
        self._agent_session.output.transcription = self.transcription_output

    self._agent_session.on("agent_state_changed", self._on_agent_state_changed)
    self._agent_session.on("user_input_transcribed", self._on_user_input_transcribed)
    self._agent_session._room_io = self
def unset_participant(self) ‑> None
Expand source code
def unset_participant(self) -> None:
    """Forget the linked participant and detach the inputs and the user
    transcription from it."""
    self._participant_identity = None
    # a fresh, pending future: no participant is linked any more
    self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
    for stream in (self._audio_input, self._video_input):
        if stream:
            stream.set_participant(None)
    self._update_user_transcription(None)
class RoomInputOptions (text_enabled: bool = True,
audio_enabled: bool = True,
video_enabled: bool = False,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
noise_cancellation: rtc.NoiseCancellationOptions | None = None,
text_input_cb: TextInputCallback = <function _default_text_input_cb>,
participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN,
participant_identity: NotGivenOr[str] = NOT_GIVEN)
Expand source code
@dataclass
class RoomInputOptions:
    """Options read by ``RoomIO`` to decide which participant inputs to create."""

    # register a text stream handler for the chat topic
    text_enabled: bool = True
    # create the participant audio input stream
    audio_enabled: bool = True
    # create the participant video input stream
    video_enabled: bool = False
    # passed to the audio input stream as sample_rate / num_channels
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    # passed through to the audio input stream
    noise_cancellation: rtc.NoiseCancellationOptions | None = None
    # called with the session and a TextInputEvent for each received text;
    # may return a coroutine, which RoomIO awaits
    text_input_cb: TextInputCallback = _default_text_input_cb
    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    """Participant kinds accepted for auto subscription. If not provided,
    accept `DEFAULT_PARTICIPANT_KINDS`."""
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    """The participant to link to. If not provided, link to the first participant.
    Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""

RoomInputOptions(text_enabled: 'bool' = True, audio_enabled: 'bool' = True, video_enabled: 'bool' = False, audio_sample_rate: 'int' = 24000, audio_num_channels: 'int' = 1, noise_cancellation: 'rtc.NoiseCancellationOptions | None' = None, text_input_cb: 'TextInputCallback' = <function _default_text_input_cb>, participant_kinds: 'NotGivenOr[list[rtc.ParticipantKind.ValueType]]' = NOT_GIVEN, participant_identity: 'NotGivenOr[str]' = NOT_GIVEN)

Instance variables

var audio_enabled : bool
var audio_num_channels : int
var audio_sample_rate : int
var noise_cancellation : rtc.NoiseCancellationOptions | None
var participant_identity : NotGivenOr[str]

The participant to link to. If not provided, link to the first participant. Can be overridden by the participant argument of RoomIO constructor or set_participant.

var participant_kinds : NotGivenOr[list[rtc.ParticipantKind.ValueType]]

Participant kinds accepted for auto subscription. If not provided, accept DEFAULT_PARTICIPANT_KINDS.

var text_enabled : bool
var video_enabled : bool

Methods

def text_input_cb(sess: AgentSession,
ev: TextInputEvent) ‑> None
Expand source code
def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    """Default text-input handler: interrupt the session, then request a
    reply using the received text as the user input."""
    # neither call's return value is used or awaited here
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)
class RoomOutputOptions (transcription_enabled: bool = True,
audio_enabled: bool = True,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
audio_publish_options: rtc.TrackPublishOptions = <factory>,
sync_transcription: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
@dataclass
class RoomOutputOptions:
    """Options read by ``RoomIO`` to decide which outputs to create."""

    # create the user and agent transcription outputs
    transcription_enabled: bool = True
    # create the participant audio output
    audio_enabled: bool = True
    # passed to the audio output as sample_rate / num_channels
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    # a fresh TrackPublishOptions per instance, with the microphone as source
    audio_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )
    # RoomIO treats NOT_GIVEN as True; with False no TranscriptSynchronizer is
    # created, i.e. "emitted as quickly as available" describes the False case
    sync_transcription: NotGivenOr[bool] = NOT_GIVEN
    """False to disable transcription synchronization with audio output.
    Otherwise, transcription is emitted as quickly as available."""

RoomOutputOptions(transcription_enabled: 'bool' = True, audio_enabled: 'bool' = True, audio_sample_rate: 'int' = 24000, audio_num_channels: 'int' = 1, audio_publish_options: 'rtc.TrackPublishOptions' = <factory>, sync_transcription: 'NotGivenOr[bool]' = NOT_GIVEN)

Instance variables

var audio_enabled : bool
var audio_num_channels : int
var audio_publish_options : room_pb2.TrackPublishOptions
var audio_sample_rate : int
var sync_transcription : bool | livekit.agents.types.NotGiven

Set to False to disable transcription synchronization with audio output; when it is disabled, transcription is emitted as quickly as it becomes available. If not given, synchronization is enabled.

var transcription_enabled : bool
class RunContext (*,
session: AgentSession,
speech_handle: SpeechHandle,
function_call: FunctionCall)
Expand source code
class RunContext(Generic[Userdata_T]):
    """Read-only bundle of the session, speech handle and function call it was
    constructed with."""

    # private ctor
    def __init__(
        self,
        *,
        session: AgentSession,
        speech_handle: SpeechHandle,
        function_call: FunctionCall,
    ) -> None:
        """Store the three keyword-only arguments unchanged."""
        self._session = session
        self._speech_handle = speech_handle
        self._function_call = function_call

    @property
    def session(self) -> AgentSession[Userdata_T]:
        """The session passed to the constructor."""
        return self._session

    @property
    def speech_handle(self) -> SpeechHandle:
        """The speech handle passed to the constructor."""
        return self._speech_handle

    @property
    def function_call(self) -> FunctionCall:
        """The function call passed to the constructor."""
        return self._function_call

    @property
    def userdata(self) -> Userdata_T:
        """Shortcut for ``self.session.userdata``."""
        return self.session.userdata

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • typing.Generic

Instance variables

prop function_call : FunctionCall
Expand source code
@property
def function_call(self) -> FunctionCall:
    """Return the function call stored in ``self._function_call``."""
    return self._function_call
prop session : AgentSession[Userdata_T]
Expand source code
@property
def session(self) -> AgentSession[Userdata_T]:
    """Return the session stored in ``self._session``."""
    return self._session
prop speech_handle : SpeechHandle
Expand source code
@property
def speech_handle(self) -> SpeechHandle:
    """Return the speech handle stored in ``self._speech_handle``."""
    return self._speech_handle
prop userdata : Userdata_T
Expand source code
@property
def userdata(self) -> Userdata_T:
    """Return ``self.session.userdata``."""
    return self.session.userdata
class SimulateJobInfo (room: str, participant_identity: str | None = None)
Expand source code
@dataclass
class SimulateJobInfo:
    """Plain record holding a room and an optional participant identity."""

    # required; presumably the room name to simulate a job for -- verify against callers
    room: str
    # optional; defaults to None
    participant_identity: str | None = None

SimulateJobInfo(room: 'str', participant_identity: 'str | None' = None)

Instance variables

var participant_identity : str | None
var room : str
class SpeechCreatedEvent (**data: Any)
Expand source code
# Event model tagged with type "speech_created".
class SpeechCreatedEvent(BaseModel):
    # SpeechHandle is not a pydantic type, hence arbitrary types are allowed
    model_config = ConfigDict(arbitrary_types_allowed=True)

    # fixed discriminator value for this event
    type: Literal["speech_created"] = "speech_created"
    user_initiated: bool
    """True if the speech was created using public methods like `say` or `generate_reply`"""
    source: Literal["say", "generate_reply", "tool_response"]
    """Source indicating how the speech handle was created"""
    # required field, excluded from serialization (exclude=True)
    speech_handle: SpeechHandle = Field(..., exclude=True)
    """The speech handle that was created"""

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var source : Literal['say', 'generate_reply', 'tool_response']

Source indicating how the speech handle was created

var speech_handle : livekit.agents.voice.speech_handle.SpeechHandle

The speech handle that was created

var type : Literal['speech_created']
var user_initiated : bool

True if the speech was created using public methods like say or generate_reply

class StopResponse
Expand source code
# Argument-less exception type; its meaning is described in __init__ below.
class StopResponse(Exception):
    def __init__(self) -> None:
        """
        Exception raised within AI functions.

        This exception can be raised by the user to indicate that
        the agent should not generate a response for the current
        function call.
        """
        # no message is passed on, so the exception's args stay empty
        super().__init__()

Common base class for all non-exit exceptions.

Exception raised within AI functions.

This exception can be raised by the user to indicate that the agent should not generate a response for the current function call.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ToolError (message: str)
Expand source code
class ToolError(Exception):
    def __init__(self, message: str) -> None:
        """
        Error that user code raises from inside an AI function.

        Raise it when something goes wrong while handling an AI
        operation. The message given here is exposed to the LLM, so
        it can take the failure into account during FunctionOutput
        generation.
        """
        # keep our own copy for the `message` property, then hand the same
        # text to Exception so str(err) and err.args carry it too
        self._message = message
        super().__init__(message)

    @property
    def message(self) -> str:
        """The message this error was created with."""
        return self._message

Common base class for all non-exit exceptions.

Exception raised within AI functions.

This exception should be raised by users when an error occurs in the context of AI operations. The provided message will be visible to the LLM, allowing it to understand the context of the error during FunctionOutput generation.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Instance variables

prop message : str
Expand source code
@property
def message(self) -> str:
    """Return the message stored in ``self._message``."""
    return self._message
class UserInputTranscribedEvent (**data: Any)
Expand source code
# Event model tagged with type "user_input_transcribed".
class UserInputTranscribedEvent(BaseModel):
    # fixed discriminator value for this event
    type: Literal["user_input_transcribed"] = "user_input_transcribed"
    # transcribed text carried by the event
    transcript: str
    # whether this transcript is final
    is_final: bool

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var is_final : bool
var model_config
var transcript : str
var type : Literal['user_input_transcribed']
class UserStateChangedEvent (**data: Any)
Expand source code
class UserStateChangedEvent(BaseModel):
    """Event payload carrying the user's previous and new state."""

    # Discriminator tag; fixed to the literal "user_state_changed".
    type: Literal["user_state_changed"] = "user_state_changed"
    # State the user was in before the change.
    old_state: UserState
    # State the user is in after the change.
    new_state: UserState

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var new_state : Literal['speaking', 'listening', 'away']
var old_state : Literal['speaking', 'listening', 'away']
var type : Literal['user_state_changed']
class Worker (opts: WorkerOptions,
*,
devmode: bool = True,
register: bool = True,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class Worker(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        opts: WorkerOptions,
        *,
        devmode: bool = True,
        register: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Validate the options and build the worker's executors, HTTP server and graphs.

        Nothing is started here; the components are started by ``run``.

        Args:
            opts: Worker configuration. It is updated in place: an empty ``ws_url``,
                ``api_key`` or ``api_secret`` is filled from ``LIVEKIT_URL``,
                ``LIVEKIT_API_KEY`` or ``LIVEKIT_API_SECRET``, and an ``http_proxy``
                that was not given is read from ``HTTPS_PROXY``/``HTTP_PROXY``.
            devmode: Forwarded to ``_WorkerEnvOption.getvalue`` when resolving
                options such as ``num_idle_processes``, ``port`` and ``load_threshold``.
            register: When true, ``run`` also starts the connection task.
            loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.

        Raises:
            ValueError: If the URL, API key or API secret is still empty after the
                environment fallback.
        """
        super().__init__()
        opts.ws_url = opts.ws_url or os.environ.get("LIVEKIT_URL") or ""
        opts.api_key = opts.api_key or os.environ.get("LIVEKIT_API_KEY") or ""
        opts.api_secret = opts.api_secret or os.environ.get("LIVEKIT_API_SECRET") or ""

        if not opts.ws_url:
            raise ValueError("ws_url is required, or add LIVEKIT_URL in your environment")

        if not opts.api_key:
            raise ValueError("api_key is required, or add LIVEKIT_API_KEY in your environment")

        if not opts.api_secret:
            raise ValueError(
                "api_secret is required, or add LIVEKIT_API_SECRET in your environment"
            )

        if opts.job_memory_limit_mb > 0 and opts.job_executor_type != JobExecutorType.PROCESS:
            # name the option that is actually being checked so the warning is actionable
            logger.warning(
                "job_memory_limit_mb is only supported for process-based job executors, "
                "ignoring job_memory_limit_mb"
            )

        if not is_given(opts.http_proxy):
            opts.http_proxy = os.environ.get("HTTPS_PROXY") or os.environ.get("HTTP_PROXY")

        self._opts = opts
        self._loop = loop or asyncio.get_event_loop()

        self._id = "unregistered"
        # the worker counts as closed until run() is called
        self._closed = True
        self._draining = False
        self._connecting = False
        self._tasks = set[asyncio.Task[Any]]()
        self._pending_assignments: dict[str, asyncio.Future[agent.JobAssignment]] = {}
        self._close_future: asyncio.Future[None] | None = None
        self._msg_chan = utils.aio.Chan[agent.WorkerMessage](128, loop=self._loop)
        self._devmode = devmode
        self._register = register

        # using spawn context for all platforms. We may have further optimizations for
        # Linux with forkserver, but for now, this is the safest option
        mp_ctx = mp.get_context("spawn")

        # an inference executor is only created when inference runners were registered
        self._inference_executor: ipc.inference_proc_executor.InferenceProcExecutor | None = None
        if len(_InferenceRunner.registered_runners) > 0:
            self._inference_executor = ipc.inference_proc_executor.InferenceProcExecutor(
                runners=_InferenceRunner.registered_runners,
                initialize_timeout=30,
                close_timeout=5,
                memory_warn_mb=2000,
                memory_limit_mb=0,  # no limit
                ping_interval=5,
                ping_timeout=60,
                high_ping_threshold=2.5,
                mp_ctx=mp_ctx,
                loop=self._loop,
                http_proxy=opts.http_proxy or None,
            )

        # resolved once; used for the pool size and as the upper bound of the idle graphs
        default_num_idle_processes = _WorkerEnvOption.getvalue(
            opts.num_idle_processes, self._devmode
        )

        self._proc_pool = ipc.proc_pool.ProcPool(
            initialize_process_fnc=opts.prewarm_fnc,
            job_entrypoint_fnc=opts.entrypoint_fnc,
            num_idle_processes=default_num_idle_processes,
            loop=self._loop,
            job_executor_type=opts.job_executor_type,
            inference_executor=self._inference_executor,
            mp_ctx=mp_ctx,
            initialize_timeout=opts.initialize_process_timeout,
            close_timeout=opts.shutdown_process_timeout,
            memory_warn_mb=opts.job_memory_warn_mb,
            memory_limit_mb=opts.job_memory_limit_mb,
            http_proxy=opts.http_proxy or None,
        )

        self._previous_status = agent.WorkerStatus.WS_AVAILABLE

        self._api: api.LiveKitAPI | None = None
        self._http_session: aiohttp.ClientSession | None = None
        self._http_server = http_server.HttpServer(
            opts.host,
            _WorkerEnvOption.getvalue(opts.port, self._devmode),
            loop=self._loop,
        )

        async def health_check(_: Any):
            return web.Response(text="OK")

        async def worker(_: Any):
            body = json.dumps(
                {
                    "agent_name": self._opts.agent_name,
                    "worker_type": agent.JobType.Name(self._opts.worker_type.value),
                    "active_jobs": len(self.active_jobs),
                }
            )
            return web.Response(body=body, content_type="application/json")

        # "/" answers health checks, "/worker" returns a JSON summary, "/debug" hosts tracing
        self._http_server.app.add_routes([web.get("/", health_check)])
        self._http_server.app.add_routes([web.get("/worker", worker)])
        self._http_server.app.add_subapp("/debug", tracing._create_tracing_app(self))

        self._conn_task: asyncio.Task[None] | None = None
        self._load_task: asyncio.Task[None] | None = None

        # all three graphs keep the same number of data points
        graph_data_points = int(1 / UPDATE_LOAD_INTERVAL * 30)

        self._worker_load: float = 0.0
        self._worker_load_graph = tracing.Tracing.add_graph(
            title="worker_load",
            x_label="time",
            y_label="load",
            x_type="time",
            y_range=(0, 1),
            max_data_points=graph_data_points,
        )

        self._num_idle_target_graph = tracing.Tracing.add_graph(
            title="num_idle_processes_target",
            x_label="time",
            y_label="target",
            x_type="time",
            y_range=(0, default_num_idle_processes),
            max_data_points=graph_data_points,
        )

        self._num_idle_process_graph = tracing.Tracing.add_graph(
            title="num_idle_processes",
            x_label="time",
            y_label="idle",
            x_type="time",
            y_range=(0, default_num_idle_processes),
            max_data_points=graph_data_points,
        )

    @property
    def worker_info(self) -> WorkerInfo:
        """Worker details; carries the port of the worker's HTTP server."""
        port = self._http_server.port
        return WorkerInfo(http_port=port)

    async def run(self):
        """Start the worker and block until its background tasks finish or fail.

        Starts the inference executor (when one exists), the HTTP server and the
        process pool, creates the HTTP session and API client, then runs the
        load-monitoring task and, when registration is enabled, the connection
        task. ``worker_started`` is emitted once those tasks are created. On exit
        the tasks are cancelled and the close future is resolved.

        Raises:
            Exception: If the worker is already running.
        """
        if not self._closed:
            raise Exception("worker is already running")

        logger.info(
            "starting worker",
            extra={"version": __version__, "rtc-version": rtc.__version__},
        )

        if self._inference_executor is not None:
            logger.info("starting inference executor")
            await self._inference_executor.start()
            await self._inference_executor.initialize()

        self._closed = False

        def _update_job_status(proc: ipc.job_executor.JobExecutor) -> None:
            # track the task so aclose() can wait for it
            t = self._loop.create_task(self._update_job_status(proc))
            self._tasks.add(t)
            t.add_done_callback(self._tasks.discard)

        await self._http_server.start()

        self._proc_pool.on("process_started", _update_job_status)
        self._proc_pool.on("process_closed", _update_job_status)
        self._proc_pool.on("process_job_launched", _update_job_status)
        await self._proc_pool.start()

        self._http_session = aiohttp.ClientSession(proxy=self._opts.http_proxy or None)
        self._api = api.LiveKitAPI(
            self._opts.ws_url, self._opts.api_key, self._opts.api_secret, session=self._http_session
        )
        self._close_future = asyncio.Future(loop=self._loop)

        @utils.log_exceptions(logger=logger)
        async def _load_task():
            """periodically check load"""
            interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)

            # load_fnc is called either as load_fnc() or load_fnc(worker); its arity does
            # not change between ticks, so inspect the signature once instead of per tick
            takes_worker = len(inspect.signature(self._opts.load_fnc).parameters) > 0

            def load_fnc():
                if takes_worker:
                    return self._opts.load_fnc(self)  # type: ignore

                return self._opts.load_fnc()  # type: ignore

            while True:
                await interval.tick()

                # load_fnc runs in the default executor, off the event loop
                self._worker_load = await asyncio.get_event_loop().run_in_executor(None, load_fnc)

                load_threshold = _WorkerEnvOption.getvalue(self._opts.load_threshold, self._devmode)
                default_num_idle_processes = _WorkerEnvOption.getvalue(
                    self._opts.num_idle_processes, self._devmode
                )

                if not math.isinf(load_threshold):
                    active_jobs = len(self.active_jobs)
                    if active_jobs > 0:
                        # average load per running job, used to estimate how many more
                        # jobs fit below the threshold
                        job_load = self._worker_load / active_jobs
                        if job_load > 0.0:
                            available_load = max(load_threshold - self._worker_load, 0.0)
                            available_job = min(
                                math.ceil(available_load / job_load), default_num_idle_processes
                            )
                            self._proc_pool.set_target_idle_processes(available_job)
                    else:
                        self._proc_pool.set_target_idle_processes(default_num_idle_processes)

                self._num_idle_target_graph.plot(time.time(), self._proc_pool.target_idle_processes)
                # NOTE(review): reads a private attribute of the process pool
                self._num_idle_process_graph.plot(
                    time.time(), self._proc_pool._warmed_proc_queue.qsize()
                )
                self._worker_load_graph.plot(time.time(), self._worker_load)

        tasks = []
        self._load_task = asyncio.create_task(_load_task(), name="load_task")
        tasks.append(self._load_task)

        if self._register:
            self._conn_task = asyncio.create_task(self._connection_task(), name="worker_conn_task")
            tasks.append(self._conn_task)

        self.emit("worker_started")

        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.cancel_and_wait(*tasks)
            # aclose() awaits this future, so it must be resolved on every exit path
            if not self._close_future.done():
                self._close_future.set_result(None)

    @property
    def id(self) -> str:
        """Worker id: "unregistered" until a register response replaces it."""
        return self._id

    @property
    def active_jobs(self) -> list[RunningJobInfo]:
        return [proc.running_job for proc in self._proc_pool.processes if proc.running_job]

    async def drain(self, timeout: int | None = None) -> None:
        """When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time."""  # noqa: E501
        if self._draining:
            return

        logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
        self._draining = True
        await self._update_worker_status()

        async def _join_jobs():
            for proc in self._proc_pool.processes:
                if proc.running_job:
                    await proc.join()

        if timeout:
            await asyncio.wait_for(_join_jobs(), timeout)  # raises asyncio.TimeoutError on timeout
        else:
            await _join_jobs()

    async def simulate_job(
        self,
        info: SimulateJobInfo | str,
    ) -> None:
        """
        Build a job locally and launch it on the process pool.

        Args:
            info: SimulateJobInfo or a join token for an existing room
        """
        assert self._api is not None
        # TODO(theomonnom): some fake information can still be found in the token

        from livekit.protocol.models import Room

        from_info = isinstance(info, SimulateJobInfo)
        room = info.room if from_info else "unknown-room"
        participant_identity = info.participant_identity if from_info else "unknown-participant"
        agent_id = utils.shortuuid("simulated-agent-")

        # placeholder room; replaced below when the room is created through the API
        room_info = Room(sid=utils.shortuuid("RM_"), name=room)
        participant_info = None

        if not from_info:
            # a plain string is used as the join token as-is
            token = info
        else:
            from .cli import cli

            console_mode = cli.CLI_ARGUMENTS is not None and cli.CLI_ARGUMENTS.console
            if not console_mode:
                room_info = await self._api.room.create_room(api.CreateRoomRequest(name=room))
                if participant_identity:
                    lookup = api.RoomParticipantIdentity(room=room, identity=participant_identity)
                    participant_info = await self._api.room.get_participant(lookup)

            grants = api.VideoGrants(room_join=True, room=room, agent=True)
            access_token = api.AccessToken(self._opts.api_key, self._opts.api_secret)
            token = (
                access_token.with_identity(agent_id).with_kind("agent").with_grants(grants).to_jwt()
            )

        simulated_job = agent.Job(
            id=utils.shortuuid("simulated-job-"),
            room=room_info,
            type=agent.JobType.JT_ROOM,
            participant=participant_info,
        )
        accept_arguments = JobAcceptArguments(identity=agent_id, name="", metadata="")

        await self._proc_pool.launch_job(
            RunningJobInfo(
                worker_id=self._id,
                accept_arguments=accept_arguments,
                job=simulated_job,
                url=self._opts.ws_url,
                token=token,
            )
        )

    async def aclose(self) -> None:
        """Shut the worker down and wait until ``run`` has finished.

        Cancels the connection and load tasks, closes the process pool, the
        inference executor (if any), the HTTP session, the HTTP server and the
        API client, waits for the tracked background tasks, then closes the
        message channel. When the worker is already closed, this only waits for
        the close future if one exists.
        """
        if self._closed:
            # already closed (or never started): just wait for a pending shutdown, if any
            if self._close_future is not None:
                await self._close_future
            return

        logger.info("shutting down worker", extra={"id": self.id})

        assert self._close_future is not None
        assert self._http_session is not None
        assert self._api is not None

        # set before cancelling so the connection loop stops instead of retrying
        self._closed = True

        if self._conn_task is not None:
            await utils.aio.cancel_and_wait(self._conn_task)

        if self._load_task is not None:
            await utils.aio.cancel_and_wait(self._load_task)

        await self._proc_pool.aclose()

        if self._inference_executor is not None:
            await self._inference_executor.aclose()

        await self._http_session.close()
        await self._http_server.aclose()
        await self._api.aclose()

        # wait for the remaining tracked tasks; their exceptions are collected, not raised
        await asyncio.gather(*self._tasks, return_exceptions=True)

        # await asyncio.sleep(0.25)  # see https://github.com/aio-libs/aiohttp/issues/1925
        self._msg_chan.close()
        # resolved in the finally block of run()
        await self._close_future

    async def _queue_msg(self, msg: agent.WorkerMessage) -> None:
        """_queue_msg raises aio.ChanClosed when the worker is closing/closed"""
        if self._connecting:
            which = msg.WhichOneof("message")
            if which == "update_worker":
                return
            elif which == "ping":
                return

        await self._msg_chan.send(msg)

    @utils.log_exceptions(logger=logger)
    async def _connection_task(self) -> None:
        """Keep the worker connected to the server's agent websocket.

        Each attempt opens the websocket, sends the register request, waits for
        the register response and then runs the connection via ``_run_ws``. Any
        failure is retried after a delay until the worker is closed.

        Raises:
            RuntimeError: When ``max_retry`` attempts have failed since the last
                successful websocket connect.
        """
        assert self._http_session is not None

        retry_count = 0
        ws: aiohttp.ClientWebSocketResponse | None = None
        while not self._closed:
            try:
                # while connecting, _queue_msg drops update_worker and ping messages
                self._connecting = True
                join_jwt = (
                    api.AccessToken(self._opts.api_key, self._opts.api_secret)
                    .with_grants(api.VideoGrants(agent=True))
                    .to_jwt()
                )

                headers = {"Authorization": f"Bearer {join_jwt}"}

                # map an http(s) URL to its websocket scheme (http -> ws, https -> wss)
                parse = urlparse(self._opts.ws_url)
                scheme = parse.scheme
                if scheme.startswith("http"):
                    scheme = scheme.replace("http", "ws")

                # NOTE(review): "/agent" is an absolute path, so urljoin replaces any path
                # component of ws_url rather than appending to it - confirm this is intended
                path_parts = [f"{scheme}://{parse.netloc}", parse.path, "/agent"]
                agent_url = reduce(urljoin, path_parts)

                ws = await self._http_session.ws_connect(
                    agent_url, headers=headers, autoping=True, proxy=self._opts.http_proxy or None
                )

                # the connect succeeded: start counting failures from zero again
                retry_count = 0

                # register the worker
                req = agent.WorkerMessage()
                req.register.type = self._opts.worker_type.value
                req.register.allowed_permissions.CopyFrom(
                    models.ParticipantPermission(
                        can_publish=self._opts.permissions.can_publish,
                        can_subscribe=self._opts.permissions.can_subscribe,
                        can_publish_data=self._opts.permissions.can_publish_data,
                        can_update_metadata=self._opts.permissions.can_update_metadata,
                        can_publish_sources=self._opts.permissions.can_publish_sources,
                        hidden=self._opts.permissions.hidden,
                        agent=True,
                    )
                )
                req.register.agent_name = self._opts.agent_name
                req.register.version = __version__
                await ws.send_bytes(req.SerializeToString())

                # wait for the register response before running this connection
                first_msg_b = await ws.receive_bytes()
                msg = agent.ServerMessage()
                msg.ParseFromString(first_msg_b)

                if not msg.HasField("register"):
                    raise Exception("expected register response as first message")

                self._handle_register(msg.register)
                self._connecting = False

                # runs until the connection ends; failures are handled by the retry logic below
                await self._run_ws(ws)
            except Exception as e:
                # errors raised while shutting down are expected; stop quietly
                if self._closed:
                    break

                if retry_count >= self._opts.max_retry:
                    # "from None" drops the underlying error from the traceback; each failed
                    # attempt was already logged with its exception below
                    raise RuntimeError(
                        f"failed to connect to livekit after {retry_count} attempts",
                    ) from None

                # delay grows by 2s per failed attempt (0, 2, 4, ...) and is capped at 10s
                retry_delay = min(retry_count * 2, 10)
                retry_count += 1

                logger.warning(
                    f"failed to connect to livekit, retrying in {retry_delay}s", exc_info=e
                )
                await asyncio.sleep(retry_delay)
            finally:
                # ws is not reset between attempts, so this may be the socket of an
                # earlier attempt when the current ws_connect failed
                if ws is not None:
                    await ws.close()

    async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse):
        """Drive one registered websocket connection until it ends or fails.

        Runs three tasks side by side: a periodic worker-status update, a sender
        that forwards queued messages to the socket, and a receiver that
        dispatches server messages. All three are cancelled on exit.
        """
        closing_ws = False
        close_types = (
            aiohttp.WSMsgType.CLOSE,
            aiohttp.WSMsgType.CLOSED,
            aiohttp.WSMsgType.CLOSING,
        )

        async def _status_loop():
            """periodically update worker status"""
            ticker = utils.aio.interval(UPDATE_STATUS_INTERVAL)
            while True:
                await ticker.tick()
                await self._update_worker_status()

        async def _sender():
            nonlocal closing_ws
            while True:
                try:
                    outgoing = await self._msg_chan.recv()
                    await ws.send_bytes(outgoing.SerializeToString())
                except utils.aio.ChanClosed:
                    # the channel is closed: a socket close is now expected
                    closing_ws = True
                    break

        async def _receiver():
            while True:
                frame = await ws.receive()
                if frame.type in close_types:
                    if not closing_ws:
                        raise Exception("worker connection closed unexpectedly")
                    return

                if frame.type != aiohttp.WSMsgType.BINARY:
                    logger.warning("unexpected message type: %s", frame.type)
                    continue

                server_msg = agent.ServerMessage()
                server_msg.ParseFromString(frame.data)
                kind = server_msg.WhichOneof("message")
                if kind == "termination":
                    # handled in a tracked background task
                    termination_task = self._loop.create_task(
                        self._handle_termination(server_msg.termination),
                        name="agent_job_termination",
                    )
                    self._tasks.add(termination_task)
                    termination_task.add_done_callback(self._tasks.discard)
                elif kind == "availability":
                    self._handle_availability(server_msg.availability)
                elif kind == "assignment":
                    self._handle_assignment(server_msg.assignment)

        tasks = [asyncio.create_task(coro) for coro in (_status_loop(), _sender(), _receiver())]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.cancel_and_wait(*tasks)

    async def _reload_jobs(self, jobs: list[RunningJobInfo]) -> None:
        """Relaunch the given jobs, each with its original token re-signed to expire in one hour.

        Raises:
            RuntimeError: If no API secret is configured.
        """
        opts = self._opts
        if not opts.api_secret:
            raise RuntimeError("api_secret is required to reload jobs")

        for previous in jobs:
            logger.log(
                DEV_LEVEL,
                "reloading job",
                extra={"job_id": previous.job.id, "agent_name": previous.job.agent_name},
            )

            # keep every claim the SFU generated for the original join token and only
            # push the expiry one hour into the future
            claims = jwt.decode(previous.token, opts.api_secret, algorithms=["HS256"])
            now = datetime.datetime.now(datetime.timezone.utc)
            claims["exp"] = int(now.timestamp()) + 3600
            extended_token = jwt.encode(claims, opts.api_secret, algorithm="HS256")

            await self._proc_pool.launch_job(
                RunningJobInfo(
                    accept_arguments=previous.accept_arguments,
                    job=previous.job,
                    url=opts.ws_url,
                    token=extended_token,
                    worker_id=previous.worker_id,
                )
            )

    def _handle_register(self, reg: agent.RegisterWorkerResponse):
        """Store the worker id from the register response and emit ``worker_registered``."""
        worker_id = reg.worker_id
        server_info = reg.server_info

        self._id = worker_id
        log_fields = {
            "id": worker_id,
            "url": self._opts.ws_url,
            "region": server_info.region,
            "protocol": server_info.protocol,
        }
        logger.info("registered worker", extra=log_fields)
        self.emit("worker_registered", worker_id, server_info)

    def _handle_availability(self, msg: agent.AvailabilityRequest):
        task = self._loop.create_task(self._answer_availability(msg))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _answer_availability(self, msg: agent.AvailabilityRequest):
        """Ask the user if they want to accept this job and forward the answer to the server.
        If we get the job assigned, we start a new process.

        The user's ``request_fnc`` runs in a tracked background task and receives a
        JobRequest whose accept/reject callbacks send the answer. If it returns (or
        fails) without answering, the job is rejected automatically.
        """

        answered = False

        async def _on_reject() -> None:
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = False
            await self._queue_msg(availability_resp)

        async def _on_accept(args: JobAcceptArguments) -> None:
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = True
            availability_resp.availability.participant_identity = args.identity
            availability_resp.availability.participant_name = args.name
            availability_resp.availability.participant_metadata = args.metadata
            if args.attributes:
                availability_resp.availability.participant_attributes.update(args.attributes)
            await self._queue_msg(availability_resp)

            # job_req is bound further down, before this callback can be invoked
            wait_assignment = asyncio.Future[agent.JobAssignment]()
            self._pending_assignments[job_req.id] = wait_assignment

            # the job was accepted by the user, wait for the server assignment
            try:
                await asyncio.wait_for(wait_assignment, ASSIGNMENT_TIMEOUT)
            except asyncio.TimeoutError:
                logger.warning(
                    f"assignment for job {job_req.id} timed out",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )
                raise AssignmentTimeoutError() from None
            finally:
                # _handle_assignment pops the entry when the assignment arrives; on a
                # timeout or cancellation nobody would, so remove our own future here to
                # avoid leaving a stale entry in the dict
                if self._pending_assignments.get(job_req.id) is wait_assignment:
                    del self._pending_assignments[job_req.id]

            job_assign = wait_assignment.result()
            running_info = RunningJobInfo(
                accept_arguments=args,
                job=msg.job,
                # fall back to the configured URL when the assignment carries none
                url=job_assign.url or self._opts.ws_url,
                token=job_assign.token,
                worker_id=self._id,
            )

            await self._proc_pool.launch_job(running_info)

        job_req = JobRequest(job=msg.job, on_reject=_on_reject, on_accept=_on_accept)

        logger.info(
            "received job request",
            extra={
                "job_id": msg.job.id,
                "dispatch_id": msg.job.dispatch_id,
                "room_name": msg.job.room.name,
                "agent_name": self._opts.agent_name,
                "resuming": msg.resuming,
            },
        )

        @utils.log_exceptions(logger=logger)
        async def _job_request_task():
            try:
                await self._opts.request_fnc(job_req)
            except Exception:
                logger.exception(
                    "job_request_fnc failed",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )

            # the server always gets an answer, even if request_fnc never gave one
            if not answered:
                logger.warning(
                    "no answer was given inside the job_request_fnc, automatically rejecting the job",  # noqa: E501
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )
                await _on_reject()

        user_task = self._loop.create_task(_job_request_task(), name="job_request")
        self._tasks.add(user_task)
        user_task.add_done_callback(self._tasks.discard)

    def _handle_assignment(self, assignment: agent.JobAssignment):
        if assignment.job.id in self._pending_assignments:
            with contextlib.suppress(asyncio.InvalidStateError):
                fut = self._pending_assignments.pop(assignment.job.id)
                fut.set_result(assignment)
        else:
            logger.warning(
                "received assignment for an unknown job",
                extra={"job": assignment.job, "agent_name": self._opts.agent_name},
            )

    async def _handle_termination(self, msg: agent.JobTermination):
        proc = self._proc_pool.get_by_job_id(msg.job_id)
        if not proc:
            # safe to ignore
            return
        await proc.aclose()

    async def _update_worker_status(self):
        """Queue an ``update_worker`` message with the worker's status and job count.

        A draining worker is always reported as full. Otherwise the worker is full
        when its load has reached the resolved load threshold, and the current load
        is included in the update. A change of status is logged once.
        """
        job_cnt = len(self.active_jobs)
        if self._draining:
            update = agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL, job_count=job_cnt)
            msg = agent.WorkerMessage(update_worker=update)
            await self._queue_msg(msg)
            return

        # from here on the worker is known not to be draining
        load_threshold = _WorkerEnvOption.getvalue(self._opts.load_threshold, self._devmode)
        is_full = self._worker_load >= load_threshold

        status = agent.WorkerStatus.WS_FULL if is_full else agent.WorkerStatus.WS_AVAILABLE

        update = agent.UpdateWorkerStatus(load=self._worker_load, status=status, job_count=job_cnt)

        # only log if status has changed
        if self._previous_status != status:
            self._previous_status = status
            extra = {
                "load": self._worker_load,
                # log the resolved threshold that the load was actually compared against
                "threshold": load_threshold,
            }
            if is_full:
                logger.info(
                    "worker is at full capacity, marking as unavailable",
                    extra=extra,
                )
            else:
                logger.info(
                    "worker is below capacity, marking as available",
                    extra=extra,
                )

        msg = agent.WorkerMessage(update_worker=update)
        # the periodic update may race with shutdown; a closed channel is not an error here
        with contextlib.suppress(utils.aio.ChanClosed):
            await self._queue_msg(msg)

    async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
        """Queue an ``update_job`` message for the job running on *proc*, if it has one."""
        running = proc.running_job
        if running is None:
            return

        executor_status = proc.status
        if executor_status == ipc.job_executor.JobStatus.FAILED:
            reported = agent.JobStatus.JS_FAILED
        elif executor_status == ipc.job_executor.JobStatus.SUCCESS:
            reported = agent.JobStatus.JS_SUCCESS
        else:
            # RUNNING, like any other executor status, is reported as running
            reported = agent.JobStatus.JS_RUNNING

        job_update = agent.UpdateJobStatus(job_id=running.job.id, status=reported, error="")
        await self._queue_msg(agent.WorkerMessage(update_job=job_update))

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Instance variables

prop active_jobs : list[RunningJobInfo]
Expand source code
@property
def active_jobs(self) -> list[RunningJobInfo]:
    """Return the running-job info of every pooled process that has a job."""
    jobs: list[RunningJobInfo] = []
    for executor in self._proc_pool.processes:
        if executor.running_job:
            jobs.append(executor.running_job)
    return jobs
prop id : str
Expand source code
@property
def id(self) -> str:
    """Return the identifier stored on this worker (``self._id``)."""
    return self._id
prop worker_info : WorkerInfo
Expand source code
@property
def worker_info(self) -> WorkerInfo:
    """Return a ``WorkerInfo`` carrying the port of the worker's HTTP server."""
    return WorkerInfo(http_port=self._http_server.port)

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut the worker down and wait until ``run`` has finished.

    If the worker is already closed (or closing), this only waits for the
    existing close future, when there is one, and returns. Otherwise the
    background tasks are cancelled, the process pool, inference executor,
    HTTP session/server and API client are closed, pending tracked tasks
    are awaited, and the message channel is closed.
    """
    if self._closed:
        # already closed/closing: just wait for the in-flight shutdown, if any
        if self._close_future is not None:
            await self._close_future
        return

    logger.info("shutting down worker", extra={"id": self.id})

    # these are created in run(); closing a running worker requires them
    assert self._close_future is not None
    assert self._http_session is not None
    assert self._api is not None

    # flag first so concurrent aclose() calls take the early-return path above
    self._closed = True

    if self._conn_task is not None:
        await utils.aio.cancel_and_wait(self._conn_task)

    if self._load_task is not None:
        await utils.aio.cancel_and_wait(self._load_task)

    await self._proc_pool.aclose()

    if self._inference_executor is not None:
        await self._inference_executor.aclose()

    await self._http_session.close()
    await self._http_server.aclose()
    await self._api.aclose()

    # wait for tracked tasks; exceptions are collected instead of raised
    await asyncio.gather(*self._tasks, return_exceptions=True)

    # await asyncio.sleep(0.25)  # see https://github.com/aio-libs/aiohttp/issues/1925
    self._msg_chan.close()
    # resolved by run() in its finally block
    await self._close_future
async def drain(self, timeout: int | None = None) ‑> None
Expand source code
async def drain(self, timeout: int | None = None) -> None:
    """Mark the worker as draining and wait for its running jobs to finish.

    Calling this while a drain is already in progress is a no-op.

    Args:
        timeout: Maximum time to wait for the running jobs. A falsy value
            (``None`` or ``0``) waits without a time limit.

    Raises:
        asyncio.TimeoutError: If a timeout was given and the processes
            didn't finish in time.
    """
    if self._draining:
        return

    logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
    self._draining = True
    await self._update_worker_status()

    async def _wait_for_running_jobs():
        # join only the processes that currently run a job
        for executor in self._proc_pool.processes:
            if not executor.running_job:
                continue
            await executor.join()

    if not timeout:
        await _wait_for_running_jobs()
        return

    # raises asyncio.TimeoutError on timeout
    await asyncio.wait_for(_wait_for_running_jobs(), timeout)

When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time.

async def run(self)
Expand source code
async def run(self):
    """Start the worker and run until its background tasks finish.

    Starts the inference executor (when configured), the HTTP server and
    the process pool, creates the HTTP session and API client, then runs
    the periodic load task and, when registration is enabled, the
    connection task. Emits ``worker_started`` once everything is set up.

    Raises:
        Exception: If the worker is already running.
    """
    # _closed is False only while a previous run() is active
    if not self._closed:
        raise Exception("worker is already running")

    logger.info(
        "starting worker",
        extra={"version": __version__, "rtc-version": rtc.__version__},
    )

    if self._inference_executor is not None:
        logger.info("starting inference executor")
        await self._inference_executor.start()
        await self._inference_executor.initialize()

    self._closed = False

    # sync callback for pool events: schedules the async method of the same
    # name and tracks the task in self._tasks until it completes
    def _update_job_status(proc: ipc.job_executor.JobExecutor) -> None:
        t = self._loop.create_task(self._update_job_status(proc))
        self._tasks.add(t)
        t.add_done_callback(self._tasks.discard)

    await self._http_server.start()

    self._proc_pool.on("process_started", _update_job_status)
    self._proc_pool.on("process_closed", _update_job_status)
    self._proc_pool.on("process_job_launched", _update_job_status)
    await self._proc_pool.start()

    self._http_session = aiohttp.ClientSession(proxy=self._opts.http_proxy or None)
    self._api = api.LiveKitAPI(
        self._opts.ws_url, self._opts.api_key, self._opts.api_secret, session=self._http_session
    )
    # awaited by aclose(); resolved in the finally block below
    self._close_future = asyncio.Future(loop=self._loop)

    @utils.log_exceptions(logger=logger)
    async def _load_task():
        """periodically check load"""
        interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
        while True:
            await interval.tick()

            # the configured load function may take no argument or the worker
            def load_fnc():
                signature = inspect.signature(self._opts.load_fnc)
                parameters = list(signature.parameters.values())
                if len(parameters) == 0:
                    return self._opts.load_fnc()  # type: ignore

                return self._opts.load_fnc(self)  # type: ignore

            # run in the loop's default executor (executor argument is None)
            self._worker_load = await asyncio.get_event_loop().run_in_executor(None, load_fnc)

            load_threshold = _WorkerEnvOption.getvalue(self._opts.load_threshold, self._devmode)
            default_num_idle_processes = _WorkerEnvOption.getvalue(
                self._opts.num_idle_processes, self._devmode
            )

            # an infinite threshold leaves the idle-process target untouched
            if not math.isinf(load_threshold):
                active_jobs = len(self.active_jobs)
                if active_jobs > 0:
                    # average load per active job
                    job_load = self._worker_load / len(self.active_jobs)
                    if job_load > 0.0:
                        # how many more jobs fit under the threshold, capped
                        # at the configured number of idle processes
                        available_load = max(load_threshold - self._worker_load, 0.0)
                        available_job = min(
                            math.ceil(available_load / job_load), default_num_idle_processes
                        )
                        self._proc_pool.set_target_idle_processes(available_job)
                else:
                    # no active job: fall back to the configured default
                    self._proc_pool.set_target_idle_processes(default_num_idle_processes)

            self._num_idle_target_graph.plot(time.time(), self._proc_pool.target_idle_processes)
            self._num_idle_process_graph.plot(
                time.time(), self._proc_pool._warmed_proc_queue.qsize()
            )
            self._worker_load_graph.plot(time.time(), self._worker_load)

    tasks = []
    self._load_task = asyncio.create_task(_load_task(), name="load_task")
    tasks.append(self._load_task)

    # the connection task is only started when registration is enabled
    if self._register:
        self._conn_task = asyncio.create_task(self._connection_task(), name="worker_conn_task")
        tasks.append(self._conn_task)

    self.emit("worker_started")

    try:
        await asyncio.gather(*tasks)
    finally:
        # stop whatever is still running and unblock aclose()
        await utils.aio.cancel_and_wait(*tasks)
        if not self._close_future.done():
            self._close_future.set_result(None)
async def simulate_job(self,
info: SimulateJobInfo | str) ‑> None
Expand source code
async def simulate_job(
    self,
    info: SimulateJobInfo | str,
) -> None:
    """
    Simulate a job by creating a room and participant.

    With a ``SimulateJobInfo`` an agent token is minted for the requested
    room; unless the CLI runs in console mode the room is also created
    through the API and the participant (when an identity is given) is
    looked up. With a string, the value is used as the join token as-is
    and placeholder room information is attached to the job.

    Args:
        info: SimulateJobInfo or a join token for an existing room
    """
    assert self._api is not None
    # TODO(theomonnom): some fake information can still be found in the token

    from livekit.protocol.models import Room

    agent_id = utils.shortuuid("simulated-agent-")
    participant_info = None

    if isinstance(info, SimulateJobInfo):
        from .cli import cli

        room = info.room
        participant_identity = info.participant_identity
        room_info = Room(sid=utils.shortuuid("RM_"), name=room)

        cli_args = cli.CLI_ARGUMENTS
        console_mode = cli_args is not None and cli_args.console
        if not console_mode:
            # outside console mode, back the simulation with a real room
            room_info = await self._api.room.create_room(api.CreateRoomRequest(name=room))
            if participant_identity:
                participant_info = await self._api.room.get_participant(
                    api.RoomParticipantIdentity(room=room, identity=participant_identity)
                )

        grants = api.VideoGrants(room_join=True, room=room, agent=True)
        token = (
            api.AccessToken(self._opts.api_key, self._opts.api_secret)
            .with_identity(agent_id)
            .with_kind("agent")
            .with_grants(grants)
            .to_jwt()
        )
    else:
        # a raw join token: nothing is known about the room it targets
        room_info = Room(sid=utils.shortuuid("RM_"), name="unknown-room")
        token = info

    simulated_job = agent.Job(
        id=utils.shortuuid("simulated-job-"),
        room=room_info,
        type=agent.JobType.JT_ROOM,
        participant=participant_info,
    )

    await self._proc_pool.launch_job(
        RunningJobInfo(
            worker_id=self._id,
            accept_arguments=JobAcceptArguments(identity=agent_id, name="", metadata=""),
            job=simulated_job,
            url=self._opts.ws_url,
            token=token,
        )
    )

Simulate a job by creating a room and participant.

Args

info
SimulateJobInfo or a join token for an existing room

Inherited members

class WorkerOptions (entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
request_fnc: Callable[[JobRequest], Awaitable[None]] = <function _default_request_fnc>,
prewarm_fnc: Callable[[JobProcess], Any] = <function _default_initialize_process_fnc>,
load_fnc: Callable[[Worker], float] | Callable[[], float] = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>,
job_executor_type: JobExecutorType = JobExecutorType.PROCESS,
load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(dev_default=inf, prod_default=0.75),
job_memory_warn_mb: float = 500,
job_memory_limit_mb: float = 0,
num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=4),
shutdown_process_timeout: float = 60.0,
initialize_process_timeout: float = 10.0,
permissions: WorkerPermissions = <factory>,
agent_name: str = '',
worker_type: WorkerType = WorkerType.ROOM,
max_retry: int = 16,
ws_url: str = 'ws://localhost:7880',
api_key: str | None = None,
api_secret: str | None = None,
host: str = '',
port: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=8081),
http_proxy: NotGivenOr[str | None] = NOT_GIVEN)
Expand source code
@dataclass
class WorkerOptions:
    """Configuration for a worker.

    Each field is documented by the string literal that follows it.
    ``validate_config`` logs a warning for a questionable production
    ``load_threshold``.
    """

    entrypoint_fnc: Callable[[JobContext], Awaitable[None]]
    """Entrypoint function that will be called when a job is assigned to this worker."""
    request_fnc: Callable[[JobRequest], Awaitable[None]] = _default_request_fnc
    """Inspect the request and decide if the current worker should handle it.

    When left empty, all jobs are accepted."""
    prewarm_fnc: Callable[[JobProcess], Any] = _default_initialize_process_fnc
    """A function to perform any necessary initialization before the job starts."""
    load_fnc: Callable[[Worker], float] | Callable[[], float] = _DefaultLoadCalc.get_load
    """Called to determine the current load of the worker. Should return a value between 0 and 1."""
    job_executor_type: JobExecutorType = _default_job_executor_type
    """Which executor to use to run jobs. (currently thread or process are supported)"""
    load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(
        dev_default=math.inf, prod_default=0.75
    )
    """When the load exceeds this threshold, the worker will be marked as unavailable.

    Defaults to 0.75 on "production" mode, and is disabled in "development" mode.
    """

    job_memory_warn_mb: float = 500
    """Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged."""  # noqa: E501
    job_memory_limit_mb: float = 0
    """Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit.
    Defaults to 0 (disabled).
    """  # noqa: E501

    num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(
        dev_default=0, prod_default=math.ceil(get_cpu_monitor().cpu_count())
    )
    """Number of idle processes to keep warm."""
    shutdown_process_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""
    initialize_process_timeout: float = 10.0
    """Maximum amount of time to wait for a process to initialize/prewarm"""
    permissions: WorkerPermissions = field(default_factory=WorkerPermissions)
    """Permissions that the agent should join the room with."""
    agent_name: str = ""
    """Set agent_name to enable explicit dispatch. When explicit dispatch is enabled, jobs will not be dispatched to rooms automatically. Instead, you can either specify the agent(s) to be dispatched in the end-user's token, or use the AgentDispatch.createDispatch API"""  # noqa: E501
    worker_type: WorkerType = WorkerType.ROOM
    """Whether to spin up an agent for each room or publisher."""
    max_retry: int = 16
    """Maximum number of times to retry connecting to LiveKit."""
    ws_url: str = "ws://localhost:7880"
    """URL to connect to the LiveKit server.

    By default it uses ``LIVEKIT_URL`` from environment"""
    api_key: str | None = None
    """API key to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_KEY`` from environment"""
    api_secret: str | None = None
    """API secret to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_SECRET`` from environment"""
    host: str = ""  # default to all interfaces
    """Host to listen on. The empty default means all interfaces."""
    port: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=8081)
    """Port for local HTTP server to listen on.

    The HTTP server is used as a health check endpoint.
    """

    http_proxy: NotGivenOr[str | None] = NOT_GIVEN
    """HTTP proxy used to connect to the LiveKit server.

    By default it uses ``HTTP_PROXY`` or ``HTTPS_PROXY`` from environment
    """

    def validate_config(self, devmode: bool):
        """Log a warning when the production ``load_threshold`` is above 1.

        Args:
            devmode: Whether the worker runs in development mode; selects
                which default of a ``_WorkerEnvOption`` applies and disables
                the warning when true.
        """
        load_threshold = _WorkerEnvOption.getvalue(self.load_threshold, devmode)
        if load_threshold > 1 and not devmode:
            logger.warning(
                f"load_threshold in prod env must be less than 1, current value: {load_threshold}"
            )

WorkerOptions(entrypoint_fnc: 'Callable[[JobContext], Awaitable[None]]', request_fnc: 'Callable[[JobRequest], Awaitable[None]]' = <function _default_request_fnc>, prewarm_fnc: 'Callable[[JobProcess], Any]' = <function _default_initialize_process_fnc>, load_fnc: 'Callable[[Worker], float] | Callable[[], float]' = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>, job_executor_type: 'JobExecutorType' = JobExecutorType.PROCESS, load_threshold: 'float | _WorkerEnvOption[float]' = _WorkerEnvOption(dev_default=inf, prod_default=0.75), job_memory_warn_mb: 'float' = 500, job_memory_limit_mb: 'float' = 0, num_idle_processes: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=4), shutdown_process_timeout: 'float' = 60.0, initialize_process_timeout: 'float' = 10.0, permissions: 'WorkerPermissions' = <factory>, agent_name: 'str' = '', worker_type: 'WorkerType' = WorkerType.ROOM, max_retry: 'int' = 16, ws_url: 'str' = 'ws://localhost:7880', api_key: 'str | None' = None, api_secret: 'str | None' = None, host: 'str' = '', port: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=8081), http_proxy: 'NotGivenOr[str | None]' = NOT_GIVEN)

Static methods

def load_fnc(worker: Worker) ‑> float

Instance variables

var agent_name : str

Set agent_name to enable explicit dispatch. When explicit dispatch is enabled, jobs will not be dispatched to rooms automatically. Instead, you can either specify the agent(s) to be dispatched in the end-user's token, or use the AgentDispatch.createDispatch API

var api_key : str | None

API key to authenticate with LiveKit.

By default it uses LIVEKIT_API_KEY from environment

var api_secret : str | None

API secret to authenticate with LiveKit.

By default it uses LIVEKIT_API_SECRET from environment

var entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Awaitable[None]]

Entrypoint function that will be called when a job is assigned to this worker.

var host : str
var http_proxy : str | livekit.agents.types.NotGiven | None

HTTP proxy used to connect to the LiveKit server.

By default it uses HTTP_PROXY or HTTPS_PROXY from environment

var initialize_process_timeout : float

Maximum amount of time to wait for a process to initialize/prewarm

var job_executor_type : livekit.agents.job.JobExecutorType

Which executor to use to run jobs. (currently thread or process are supported)

var job_memory_limit_mb : float

Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit. Defaults to 0 (disabled).

var job_memory_warn_mb : float

Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged.

var load_threshold : float | livekit.agents.worker._WorkerEnvOption[float]

When the load exceeds this threshold, the worker will be marked as unavailable.

Defaults to 0.75 on "production" mode, and is disabled in "development" mode.

var max_retry : int

Maximum number of times to retry connecting to LiveKit.

var num_idle_processes : int | livekit.agents.worker._WorkerEnvOption[int]

Number of idle processes to keep warm.

var permissions : livekit.agents.worker.WorkerPermissions

Permissions that the agent should join the room with.

var port : int | livekit.agents.worker._WorkerEnvOption[int]

Port for local HTTP server to listen on.

The HTTP server is used as a health check endpoint.

var shutdown_process_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var worker_type : livekit.agents.worker.WorkerType

Whether to spin up an agent for each room or publisher.

var ws_url : str

URL to connect to the LiveKit server.

By default it uses LIVEKIT_URL from environment

Methods

def prewarm_fnc(proc: JobProcess) ‑> Any
Expand source code
def _default_initialize_process_fnc(proc: JobProcess) -> Any:
    """Default ``prewarm_fnc``: do nothing and return ``None``."""
    return
async def request_fnc(ctx: JobRequest) ‑> None
Expand source code
async def _default_request_fnc(ctx: JobRequest) -> None:
    """Default ``request_fnc``: accept every job request."""
    await ctx.accept()
def validate_config(self, devmode: bool)
Expand source code
def validate_config(self, devmode: bool):
    """Log a warning when the production ``load_threshold`` is above 1.

    Nothing is logged in development mode.
    """
    load_threshold = _WorkerEnvOption.getvalue(self.load_threshold, devmode)
    if devmode or not load_threshold > 1:
        return

    logger.warning(
        f"load_threshold in prod env must be less than 1, current value: {load_threshold}"
    )
class WorkerPermissions (can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_update_metadata: bool = True,
can_publish_sources: list[models.TrackSource] = <factory>,
hidden: bool = False)
Expand source code
@dataclass
class WorkerPermissions:
    """Permission flags for the agents of a worker.

    Used as the type of ``WorkerOptions.permissions``, which is documented
    as the permissions the agent should join the room with.
    """

    # NOTE(review): per-flag meanings below are inferred from the field
    # names; confirm against the server-side participant permissions.
    # presumably: may publish tracks
    can_publish: bool = True
    # presumably: may subscribe to tracks
    can_subscribe: bool = True
    # presumably: may publish data messages
    can_publish_data: bool = True
    # presumably: may update metadata
    can_update_metadata: bool = True
    # defaults to a fresh empty list per instance; presumably restricts
    # which track sources may be published — confirm what empty means
    can_publish_sources: list[models.TrackSource] = field(default_factory=list)
    # presumably: join the room without being visible to others
    hidden: bool = False

WorkerPermissions(can_publish: 'bool' = True, can_subscribe: 'bool' = True, can_publish_data: 'bool' = True, can_update_metadata: 'bool' = True, can_publish_sources: 'list[models.TrackSource]' = <factory>, hidden: 'bool' = False)

Instance variables

var can_publish : bool
var can_publish_data : bool
var can_publish_sources : list[models.TrackSource]
var can_subscribe : bool
var can_update_metadata : bool
var hidden : bool
class WorkerType (*args, **kwds)
Expand source code
class WorkerType(Enum):
    """Kind of job a worker handles.

    Each member's value is the matching ``agent.JobType`` constant.
    ``WorkerOptions.worker_type`` describes the choice as whether to spin
    up an agent for each room or publisher.
    """

    ROOM = agent.JobType.JT_ROOM
    PUBLISHER = agent.JobType.JT_PUBLISHER

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var PUBLISHER
var ROOM