Module livekit.plugins.openai.realtime

Classes

class RealtimeModel (*,
model: str = 'gpt-4o-realtime-preview',
voice: str = 'alloy',
temperature: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
azure_deployment: str | None = None,
entra_token: str | None = None,
api_version: str | None = None)
class RealtimeModel(llm.RealtimeModel):
    @overload
    def __init__(
        self,
        *,
        model: str = "gpt-4o-realtime-preview",
        voice: str = "alloy",
        input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
        api_key: str | None = None,
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        azure_deployment: str | None = None,
        entra_token: str | None = None,
        api_key: str | None = None,
        api_version: str | None = None,
        base_url: str | None = None,
        voice: str = "alloy",
        input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None: ...

    def __init__(
        self,
        *,
        model: str = "gpt-4o-realtime-preview",
        voice: str = "alloy",
        temperature: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        azure_deployment: str | None = None,
        entra_token: str | None = None,
        api_version: str | None = None,
    ) -> None:
        super().__init__(
            capabilities=llm.RealtimeCapabilities(
                message_truncation=True,
                turn_detection=turn_detection is not None,
                user_transcription=input_audio_transcription is not None,
                auto_tool_reply_generation=False,
            )
        )

        is_azure = (
            api_version is not None or entra_token is not None or azure_deployment is not None
        )

        api_key = api_key or os.environ.get("OPENAI_API_KEY")
        if api_key is None and not is_azure:
            raise ValueError(
                "The api_key client option must be set either by passing api_key "
                "to the client or by setting the OPENAI_API_KEY environment variable"
            )

        if is_given(base_url):
            base_url_val = base_url
        else:
            if is_azure:
                azure_endpoint = os.getenv("AZURE_OPENAI_ENDPOINT")
                if azure_endpoint is None:
                    raise ValueError(
                        "Missing Azure endpoint. Please pass base_url "
                        "or set AZURE_OPENAI_ENDPOINT environment variable."
                    )
                base_url_val = f"{azure_endpoint.rstrip('/')}/openai"
            else:
                base_url_val = OPENAI_BASE_URL

        self._opts = _RealtimeOptions(
            model=model,
            voice=voice,
            temperature=temperature if is_given(temperature) else DEFAULT_TEMPERATURE,
            tool_choice=tool_choice or None,
            input_audio_transcription=input_audio_transcription
            if is_given(input_audio_transcription)
            else DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
            turn_detection=turn_detection if is_given(turn_detection) else DEFAULT_TURN_DETECTION,
            api_key=api_key,
            base_url=base_url_val,
            is_azure=is_azure,
            azure_deployment=azure_deployment,
            entra_token=entra_token,
            api_version=api_version,
        )
        self._http_session = http_session
        self._sessions = weakref.WeakSet[RealtimeSession]()

    @classmethod
    def with_azure(
        cls,
        *,
        azure_deployment: str,
        azure_endpoint: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        entra_token: str | None = None,
        base_url: str | None = None,
        voice: str = "alloy",
        input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
        temperature: float = 0.8,
        http_session: aiohttp.ClientSession | None = None,
    ):
        """
        Create a RealtimeModel instance configured for Azure OpenAI Service.

        Args:
            azure_deployment (str): The name of your Azure OpenAI deployment.
            azure_endpoint (str or None, optional): The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
            api_version (str or None, optional): API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
            api_key (str or None, optional): Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
            entra_token (str or None, optional): Azure Entra authentication token. Required if not using API key authentication.
            base_url (str or None, optional): Base URL for the API endpoint. If None, constructed from the azure_endpoint.
            voice (str, optional): Voice setting for audio outputs. Defaults to "alloy".
            input_audio_transcription (InputAudioTranscription or None, optional): Options for transcribing input audio. Defaults to AZURE_DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
            turn_detection (TurnDetection or None, optional): Options for server-based voice activity detection (VAD). Defaults to AZURE_DEFAULT_TURN_DETECTION.
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a session is obtained from the shared HTTP context.

        Returns:
            RealtimeModel: An instance of RealtimeModel configured for Azure OpenAI Service.

        Raises:
            ValueError: If required Azure parameters are missing or invalid.
        """  # noqa: E501
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        if api_key is None and entra_token is None:
            raise ValueError(
                "Missing credentials. Please pass one of `api_key`, `entra_token`, "
                "or the `AZURE_OPENAI_API_KEY` environment variable."
            )

        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        if api_version is None:
            raise ValueError(
                "Must provide either the `api_version` argument or the "
                "`OPENAI_API_VERSION` environment variable"
            )

        if base_url is None:
            azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
            if azure_endpoint is None:
                raise ValueError(
                    "Missing Azure endpoint. Please pass the `azure_endpoint` "
                    "parameter or set the `AZURE_OPENAI_ENDPOINT` environment variable."
                )

            base_url = f"{azure_endpoint.rstrip('/')}/openai"
        elif azure_endpoint is not None:
            raise ValueError("base_url and azure_endpoint are mutually exclusive")

        if not is_given(input_audio_transcription):
            input_audio_transcription = AZURE_DEFAULT_INPUT_AUDIO_TRANSCRIPTION

        if not is_given(turn_detection):
            turn_detection = AZURE_DEFAULT_TURN_DETECTION

        return cls(
            voice=voice,
            input_audio_transcription=input_audio_transcription,
            turn_detection=turn_detection,
            temperature=temperature,
            api_key=api_key,
            http_session=http_session,
            azure_deployment=azure_deployment,
            api_version=api_version,
            entra_token=entra_token,
            base_url=base_url,
        )

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
    ) -> None:
        if is_given(voice):
            self._opts.voice = voice

        if is_given(temperature):
            self._opts.temperature = temperature

        if is_given(turn_detection):
            self._opts.turn_detection = turn_detection

        if is_given(tool_choice):
            self._opts.tool_choice = tool_choice

        for sess in self._sessions:
            sess.update_options(
                voice=voice,
                temperature=temperature,
                turn_detection=turn_detection,
                tool_choice=tool_choice,
            )

    def _ensure_http_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()

        return self._http_session

    def session(self) -> RealtimeSession:
        sess = RealtimeSession(self)
        self._sessions.add(sess)
        return sess

    async def aclose(self) -> None: ...
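
A minimal construction sketch (not part of the source above; the values shown are the documented defaults, and the API key falls back to the OPENAI_API_KEY environment variable):

from livekit.plugins.openai import realtime

model = realtime.RealtimeModel(
    model="gpt-4o-realtime-preview",  # default model
    voice="alloy",                    # default voice
    temperature=0.8,                  # example value; DEFAULT_TEMPERATURE applies when omitted
)
sess = model.session()  # opens a RealtimeSession tracked by this model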

Ancestors

  • livekit.agents.llm.realtime.RealtimeModel

Static methods

def with_azure(*,
azure_deployment: str,
azure_endpoint: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
entra_token: str | None = None,
base_url: str | None = None,
voice: str = 'alloy',
input_audio_transcription: NotGivenOr[InputAudioTranscription | None] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
temperature: float = 0.8,
http_session: aiohttp.ClientSession | None = None)

Create a RealtimeModel instance configured for Azure OpenAI Service.

Args

azure_deployment : str
The name of your Azure OpenAI deployment.
azure_endpoint : str or None, optional
The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
api_version : str or None, optional
API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
api_key : str or None, optional
Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
entra_token : str or None, optional
Azure Entra authentication token. Required if not using API key authentication.
base_url : str or None, optional
Base URL for the API endpoint. If None, constructed from the azure_endpoint.
voice : str, optional
Voice setting for audio outputs. Defaults to "alloy".
input_audio_transcription : InputAudioTranscription or None, optional
Options for transcribing input audio. Defaults to AZURE_DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection : TurnDetection or None, optional
Options for server-based voice activity detection (VAD). Defaults to AZURE_DEFAULT_TURN_DETECTION.
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
http_session : aiohttp.ClientSession or None, optional
Async HTTP session to use for requests. If None, a session is obtained from the shared HTTP context.

Returns

RealtimeModel
An instance of RealtimeModel configured for Azure OpenAI Service.

Raises

ValueError
If required Azure parameters are missing or invalid.
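
A hedged configuration sketch; the deployment name, endpoint, and API version below are placeholders, and each may instead come from its environment variable:

from livekit.plugins.openai import realtime

model = realtime.RealtimeModel.with_azure(
    azure_deployment="my-realtime-deployment",          # placeholder name
    azure_endpoint="https://example.openai.azure.com",  # or AZURE_OPENAI_ENDPOINT
    api_version="2024-10-01-preview",                   # placeholder; or OPENAI_API_VERSION
    # api_key falls back to AZURE_OPENAI_API_KEY; entra_token is the alternative credential
)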

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None: ...
def session(self) ‑> livekit.plugins.openai.realtime.realtime_model.RealtimeSession
def session(self) -> RealtimeSession:
    sess = RealtimeSession(self)
    self._sessions.add(sess)
    return sess
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
    tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
) -> None:
    if is_given(voice):
        self._opts.voice = voice

    if is_given(temperature):
        self._opts.temperature = temperature

    if is_given(turn_detection):
        self._opts.turn_detection = turn_detection

    if is_given(tool_choice):
        self._opts.tool_choice = tool_choice

    for sess in self._sessions:
        sess.update_options(
            voice=voice,
            temperature=temperature,
            turn_detection=turn_detection,
            tool_choice=tool_choice,
        )
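
For example, adjusting a live model (the voice name here is illustrative); the new values are stored on the model options and forwarded to every open session:

model.update_options(voice="echo", temperature=0.6)
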
class RealtimeSession (realtime_model: RealtimeModel)
class RealtimeSession(
    llm.RealtimeSession[Literal["openai_server_event_received", "openai_client_event_queued"]]
):
    """
    A session for the OpenAI Realtime API.

    This class is used to interact with the OpenAI Realtime API.
    It is responsible for sending events to the OpenAI Realtime API and receiving events from it.

    It exposes two additional events:
    - openai_server_event_received: the raw server events received from the OpenAI Realtime API
    - openai_client_event_queued: the raw client events queued to be sent to the OpenAI Realtime API
    """

    def __init__(self, realtime_model: RealtimeModel) -> None:
        super().__init__(realtime_model)
        self._realtime_model = realtime_model
        self._tools = llm.ToolContext.empty()
        self._msg_ch = utils.aio.Chan[Union[RealtimeClientEvent, dict]]()
        self._input_resampler: rtc.AudioResampler | None = None

        self._main_atask = asyncio.create_task(self._main_task(), name="RealtimeSession._main_task")
        self._initial_session_update()

        self._response_created_futures: dict[str, _CreateResponseHandle] = {}
        self._text_mode_recovery_atask: asyncio.Task | None = None
        self._text_mode_recovery_retries: int = 0

        self._item_delete_future: dict[str, asyncio.Future] = {}
        self._item_create_future: dict[str, asyncio.Future] = {}

        self._current_generation: _ResponseGeneration | None = None
        self._remote_chat_ctx = llm.remote_chat_context.RemoteChatContext()

        self._update_chat_ctx_lock = asyncio.Lock()
        self._update_fnc_ctx_lock = asyncio.Lock()

        # 100ms chunks
        self._bstream = utils.audio.AudioByteStream(
            SAMPLE_RATE, NUM_CHANNELS, samples_per_channel=SAMPLE_RATE // 10
        )
        self._pushed_duration_s = 0  # duration of audio pushed to the OpenAI Realtime API

    def send_event(self, event: RealtimeClientEvent | dict) -> None:
        with contextlib.suppress(utils.aio.channel.ChanClosed):
            self._msg_ch.send_nowait(event)

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        headers = {"User-Agent": "LiveKit Agents"}
        if self._realtime_model._opts.is_azure:
            if self._realtime_model._opts.entra_token:
                headers["Authorization"] = f"Bearer {self._realtime_model._opts.entra_token}"

            if self._realtime_model._opts.api_key:
                headers["api-key"] = self._realtime_model._opts.api_key
        else:
            headers["Authorization"] = f"Bearer {self._realtime_model._opts.api_key}"
            headers["OpenAI-Beta"] = "realtime=v1"

        url = process_base_url(
            self._realtime_model._opts.base_url,
            self._realtime_model._opts.model,
            is_azure=self._realtime_model._opts.is_azure,
            api_version=self._realtime_model._opts.api_version,
            azure_deployment=self._realtime_model._opts.azure_deployment,
        )

        if lk_oai_debug:
            logger.debug(f"connecting to Realtime API: {url}")

        ws_conn = await self._realtime_model._ensure_http_session().ws_connect(
            url=url, headers=headers
        )

        closing = False

        @utils.log_exceptions(logger=logger)
        async def _send_task() -> None:
            nonlocal closing
            async for msg in self._msg_ch:
                try:
                    if isinstance(msg, BaseModel):
                        msg = msg.model_dump(
                            by_alias=True, exclude_unset=True, exclude_defaults=False
                        )

                    self.emit("openai_client_event_queued", msg)
                    await ws_conn.send_str(json.dumps(msg))

                    if lk_oai_debug:
                        msg_copy = msg.copy()
                        if msg_copy["type"] == "input_audio_buffer.append":
                            msg_copy = {**msg_copy, "audio": "..."}

                        logger.debug(f">>> {msg_copy}")
                except Exception:
                    break

            closing = True
            await ws_conn.close()

        @utils.log_exceptions(logger=logger)
        async def _recv_task() -> None:
            while True:
                msg = await ws_conn.receive()
                if msg.type == aiohttp.WSMsgType.CLOSED:
                    if not closing:
                        error = Exception("OpenAI S2S connection closed unexpectedly")
                        self.emit(
                            "error",
                            llm.RealtimeModelError(
                                timestamp=time.time(),
                                label=self._realtime_model._label,
                                error=APIConnectionError(
                                    message="OpenAI S2S connection closed unexpectedly",
                                ),
                                recoverable=False,
                            ),
                        )
                        raise error

                    return
                elif msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                event = json.loads(msg.data)

                # emit the raw json dictionary instead of the BaseModel because different
                # providers can have different event types that are not part of the OpenAI Realtime API  # noqa: E501
                self.emit("openai_server_event_received", event)

                try:
                    if lk_oai_debug:
                        event_copy = event.copy()
                        if event_copy["type"] == "response.audio.delta":
                            event_copy = {**event_copy, "delta": "..."}

                        logger.debug(f"<<< {event_copy}")

                    if event["type"] == "input_audio_buffer.speech_started":
                        self._handle_input_audio_buffer_speech_started(
                            InputAudioBufferSpeechStartedEvent.construct(**event)
                        )
                    elif event["type"] == "input_audio_buffer.speech_stopped":
                        self._handle_input_audio_buffer_speech_stopped(
                            InputAudioBufferSpeechStoppedEvent.construct(**event)
                        )
                    elif event["type"] == "response.created":
                        self._handle_response_created(ResponseCreatedEvent.construct(**event))
                    elif event["type"] == "response.output_item.added":
                        self._handle_response_output_item_added(
                            ResponseOutputItemAddedEvent.construct(**event)
                        )
                    elif event["type"] == "conversation.item.created":
                        self._handle_conversion_item_created(
                            ConversationItemCreatedEvent.construct(**event)
                        )
                    elif event["type"] == "conversation.item.deleted":
                        self._handle_conversion_item_deleted(
                            ConversationItemDeletedEvent.construct(**event)
                        )
                    elif event["type"] == "conversation.item.input_audio_transcription.completed":
                        self._handle_conversion_item_input_audio_transcription_completed(
                            ConversationItemInputAudioTranscriptionCompletedEvent.construct(**event)
                        )
                    elif event["type"] == "conversation.item.input_audio_transcription.failed":
                        self._handle_conversion_item_input_audio_transcription_failed(
                            ConversationItemInputAudioTranscriptionFailedEvent.construct(**event)
                        )
                    elif event["type"] == "response.content_part.added":
                        self._handle_response_content_part_added(
                            ResponseContentPartAddedEvent.construct(**event)
                        )
                    elif event["type"] == "response.content_part.done":
                        self._handle_response_content_part_done(
                            ResponseContentPartDoneEvent.construct(**event)
                        )
                    elif event["type"] == "response.audio_transcript.delta":
                        self._handle_response_audio_transcript_delta(event)
                    elif event["type"] == "response.audio.delta":
                        self._handle_response_audio_delta(
                            ResponseAudioDeltaEvent.construct(**event)
                        )
                    elif event["type"] == "response.audio_transcript.done":
                        self._handle_response_audio_transcript_done(
                            ResponseAudioTranscriptDoneEvent.construct(**event)
                        )
                    elif event["type"] == "response.audio.done":
                        self._handle_response_audio_done(ResponseAudioDoneEvent.construct(**event))
                    elif event["type"] == "response.output_item.done":
                        self._handle_response_output_item_done(
                            ResponseOutputItemDoneEvent.construct(**event)
                        )
                    elif event["type"] == "response.done":
                        self._handle_response_done(ResponseDoneEvent.construct(**event))
                    elif event["type"] == "error":
                        self._handle_error(ErrorEvent.construct(**event))
                except Exception:
                    if event["type"] == "response.audio.delta":
                        event["delta"] = event["delta"][:10] + "..."
                    logger.exception("failed to handle event", extra={"event": event})

        tasks = [
            asyncio.create_task(_recv_task(), name="_recv_task"),
            asyncio.create_task(_send_task(), name="_send_task"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.cancel_and_wait(*tasks)
            await ws_conn.close()

    def _initial_session_update(self) -> None:
        input_audio_transcription = self._realtime_model._opts.input_audio_transcription
        input_audio_transcription = (
            session_update_event.SessionInputAudioTranscription.model_validate(
                input_audio_transcription.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
            )
            if input_audio_transcription
            else None
        )

        turn_detection = self._realtime_model._opts.turn_detection
        turn_detection = (
            session_update_event.SessionTurnDetection.model_validate(
                turn_detection.model_dump(
                    by_alias=True,
                    exclude_unset=True,
                    exclude_defaults=True,
                )
            )
            if turn_detection
            else None
        )

        # initial session update
        self.send_event(
            SessionUpdateEvent(
                type="session.update",
                # Using model_construct since OpenAI restricts voices to those defined in the BaseModel.  # noqa: E501
                # Other providers support different voices, so we need to accommodate that.
                session=session_update_event.Session.model_construct(
                    model=self._realtime_model._opts.model,
                    voice=self._realtime_model._opts.voice,
                    input_audio_format="pcm16",
                    output_audio_format="pcm16",
                    modalities=["text", "audio"],
                    turn_detection=turn_detection,
                    input_audio_transcription=input_audio_transcription,
                    temperature=self._realtime_model._opts.temperature,
                ),
                event_id=utils.shortuuid("session_update_"),
            )
        )

    @property
    def chat_ctx(self) -> llm.ChatContext:
        return self._remote_chat_ctx.to_chat_ctx()

    @property
    def tools(self) -> llm.ToolContext:
        return self._tools.copy()

    def update_options(
        self,
        *,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
    ) -> None:
        kwargs = {}

        if is_given(tool_choice):
            oai_tool_choice = tool_choice
            if isinstance(tool_choice, dict) and tool_choice["type"] == "function":
                oai_tool_choice = tool_choice["function"]
            if oai_tool_choice is None:
                oai_tool_choice = DEFAULT_TOOL_CHOICE

            kwargs["tool_choice"] = oai_tool_choice

        if is_given(voice):
            kwargs["voice"] = voice

        if is_given(temperature):
            kwargs["temperature"] = temperature

        if is_given(turn_detection):
            kwargs["turn_detection"] = turn_detection

        if kwargs:
            self.send_event(
                SessionUpdateEvent(
                    type="session.update",
                    session=session_update_event.Session.model_construct(**kwargs),
                    event_id=utils.shortuuid("options_update_"),
                )
            )

    async def update_chat_ctx(
        self, chat_ctx: llm.ChatContext, *, _add_mock_audio: bool = False
    ) -> None:
        chat_ctx = chat_ctx.copy()
        if _add_mock_audio:
            chat_ctx.items.append(_create_mock_audio_item())
        else:
            # clean up existing mock audio items
            chat_ctx.items[:] = [
                item for item in chat_ctx.items if not item.id.startswith(_MOCK_AUDIO_ID_PREFIX)
            ]

        async with self._update_chat_ctx_lock:
            diff_ops = llm.utils.compute_chat_ctx_diff(
                self._remote_chat_ctx.to_chat_ctx(), chat_ctx
            )

            futs = []

            for msg_id in diff_ops.to_remove:
                event_id = utils.shortuuid("chat_ctx_delete_")
                self.send_event(
                    ConversationItemDeleteEvent(
                        type="conversation.item.delete",
                        item_id=msg_id,
                        event_id=event_id,
                    )
                )
                futs.append(f := asyncio.Future())
                self._item_delete_future[msg_id] = f

            for previous_msg_id, msg_id in diff_ops.to_create:
                event_id = utils.shortuuid("chat_ctx_create_")
                chat_item = chat_ctx.get_by_id(msg_id)
                assert chat_item is not None

                self.send_event(
                    ConversationItemCreateEvent(
                        type="conversation.item.create",
                        item=_livekit_item_to_openai_item(chat_item),
                        previous_item_id=("root" if previous_msg_id is None else previous_msg_id),
                        event_id=event_id,
                    )
                )
                futs.append(f := asyncio.Future())
                self._item_create_future[msg_id] = f

            try:
                await asyncio.wait_for(asyncio.gather(*futs, return_exceptions=True), timeout=5.0)
            except asyncio.TimeoutError:
                raise llm.RealtimeError("update_chat_ctx timed out.") from None

    async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) -> None:
        async with self._update_fnc_ctx_lock:
            oai_tools: list[session_update_event.SessionTool] = []
            retained_tools: list[llm.FunctionTool | llm.RawFunctionTool] = []

            for tool in tools:
                if is_function_tool(tool):
                    tool_desc = llm.utils.build_legacy_openai_schema(tool, internally_tagged=True)
                elif is_raw_function_tool(tool):
                    tool_info = get_raw_function_info(tool)
                    tool_desc = tool_info.raw_schema
                    tool_desc["type"] = "function"  # internally tagged
                else:
                    logger.error(
                        "OpenAI Realtime API doesn't support this tool type", extra={"tool": tool}
                    )
                    continue

                try:
                    session_tool = session_update_event.SessionTool.model_validate(tool_desc)
                    oai_tools.append(session_tool)
                    retained_tools.append(tool)
                except ValidationError:
                    logger.error(
                        "OpenAI Realtime API doesn't support this tool",
                        extra={"tool": tool_desc},
                    )
                    continue

            event_id = utils.shortuuid("tools_update_")
            # f = asyncio.Future()
            # self._response_futures[event_id] = f
            self.send_event(
                SessionUpdateEvent(
                    type="session.update",
                    session=session_update_event.Session.model_construct(
                        model=self._realtime_model._opts.model,
                        tools=oai_tools,
                    ),
                    event_id=event_id,
                )
            )

            self._tools = llm.ToolContext(retained_tools)

    async def update_instructions(self, instructions: str) -> None:
        event_id = utils.shortuuid("instructions_update_")
        # f = asyncio.Future()
        # self._response_futures[event_id] = f
        self.send_event(
            SessionUpdateEvent(
                type="session.update",
                session=session_update_event.Session.model_construct(instructions=instructions),
                event_id=event_id,
            )
        )

    def push_audio(self, frame: rtc.AudioFrame) -> None:
        for f in self._resample_audio(frame):
            data = f.data.tobytes()
            for nf in self._bstream.write(data):
                self.send_event(
                    InputAudioBufferAppendEvent(
                        type="input_audio_buffer.append",
                        audio=base64.b64encode(nf.data).decode("utf-8"),
                    )
                )
                self._pushed_duration_s += nf.duration

    def push_video(self, frame: rtc.VideoFrame) -> None:
        pass

    def commit_audio(self) -> None:
        if self._pushed_duration_s > 0.1:  # OpenAI requires at least 100ms of audio
            self.send_event(InputAudioBufferCommitEvent(type="input_audio_buffer.commit"))
            self._pushed_duration_s = 0

    def clear_audio(self) -> None:
        self.send_event(InputAudioBufferClearEvent(type="input_audio_buffer.clear"))
        self._pushed_duration_s = 0

    def generate_reply(
        self, *, instructions: NotGivenOr[str] = NOT_GIVEN
    ) -> asyncio.Future[llm.GenerationCreatedEvent]:
        handle = self._create_response(instructions=instructions, user_initiated=True)
        self._text_mode_recovery_retries = 0  # reset the counter
        return handle.done_fut

    def interrupt(self) -> None:
        self.send_event(ResponseCancelEvent(type="response.cancel"))

    def truncate(self, *, message_id: str, audio_end_ms: int) -> None:
        self.send_event(
            ConversationItemTruncateEvent(
                type="conversation.item.truncate",
                content_index=0,
                item_id=message_id,
                audio_end_ms=audio_end_ms,
            )
        )

    async def aclose(self) -> None:
        self._msg_ch.close()
        await self._main_atask

    def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]:
        if self._input_resampler:
            if frame.sample_rate != self._input_resampler._input_rate:
                # input audio changed to a different sample rate
                self._input_resampler = None

        if self._input_resampler is None and (
            frame.sample_rate != SAMPLE_RATE or frame.num_channels != NUM_CHANNELS
        ):
            self._input_resampler = rtc.AudioResampler(
                input_rate=frame.sample_rate,
                output_rate=SAMPLE_RATE,
                num_channels=NUM_CHANNELS,
            )

        if self._input_resampler:
            # TODO(long): flush the resampler when the input source is changed
            yield from self._input_resampler.push(frame)
        else:
            yield frame

    def _create_response(
        self,
        *,
        user_initiated: bool,
        instructions: NotGivenOr[str] = NOT_GIVEN,
        old_handle: _CreateResponseHandle | None = None,
    ) -> _CreateResponseHandle:
        handle = old_handle or _CreateResponseHandle(
            instructions=instructions, done_fut=asyncio.Future()
        )
        if old_handle and utils.is_given(instructions):
            handle.instructions = instructions

        event_id = utils.shortuuid("response_create_")
        if user_initiated:
            self._response_created_futures[event_id] = handle

        self.send_event(
            ResponseCreateEvent(
                type="response.create",
                event_id=event_id,
                response=Response(
                    instructions=handle.instructions or None,
                    metadata={"client_event_id": event_id} if user_initiated else None,
                ),
            )
        )
        if user_initiated:
            handle.timeout_start()
        return handle

    def _emit_generation_event(self, response_id: str) -> None:
        # called when the generation is a function call or an audio message
        generation_ev = llm.GenerationCreatedEvent(
            message_stream=self._current_generation.message_ch,
            function_stream=self._current_generation.function_ch,
            user_initiated=False,
        )

        if handle := self._response_created_futures.pop(response_id, None):
            generation_ev.user_initiated = True
            try:
                handle.done_fut.set_result(generation_ev)
            except asyncio.InvalidStateError:
                # in case the generation comes after the reply timeout
                logger.warning(
                    "response received after timeout", extra={"response_id": response_id}
                )

        self.emit("generation_created", generation_ev)

    def _handle_input_audio_buffer_speech_started(
        self, _: InputAudioBufferSpeechStartedEvent
    ) -> None:
        self.emit("input_speech_started", llm.InputSpeechStartedEvent())

    def _handle_input_audio_buffer_speech_stopped(
        self, _: InputAudioBufferSpeechStoppedEvent
    ) -> None:
        user_transcription_enabled = (
            self._realtime_model._opts.input_audio_transcription is not None
        )
        self.emit(
            "input_speech_stopped",
            llm.InputSpeechStoppedEvent(user_transcription_enabled=user_transcription_enabled),
        )

    def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
        assert event.response.id is not None, "response.id is None"

        self._current_generation = _ResponseGeneration(
            message_ch=utils.aio.Chan(),
            function_ch=utils.aio.Chan(),
            messages={},
            _created_timestamp=time.time(),
        )

        if (
            isinstance(event.response.metadata, dict)
            and (client_event_id := event.response.metadata.get("client_event_id"))
            and (handle := self._response_created_futures.pop(client_event_id, None))
        ):
            # set key to the response id
            self._response_created_futures[event.response.id] = handle

        # the generation_created event is emitted when
        # 1. the response is not a message on response.output_item.added event
        # 2. the content is audio on response.content_part.added event
        # will try to recover from text response on response.content_part.done event

    def _handle_response_output_item_added(self, event: ResponseOutputItemAddedEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"
        assert (item_type := event.item.type) is not None, "item.type is None"
        assert (response_id := event.response_id) is not None, "response_id is None"

        if item_type != "message":
            # emit immediately if it's not a message, otherwise wait response.content_part.added
            self._emit_generation_event(response_id)
            self._text_mode_recovery_retries = 0

    def _handle_response_content_part_added(self, event: ResponseContentPartAddedEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"
        assert (item_id := event.item_id) is not None, "item_id is None"
        assert (item_type := event.part.type) is not None, "part.type is None"
        assert (response_id := event.response_id) is not None, "response_id is None"

        if item_type == "audio":
            self._emit_generation_event(response_id)
            if self._text_mode_recovery_retries > 0:
                logger.info(
                    "recovered from text-only response",
                    extra={"retried_times": self._text_mode_recovery_retries},
                )
                self._text_mode_recovery_retries = 0

            item_generation = _MessageGeneration(
                message_id=item_id,
                text_ch=utils.aio.Chan(),
                audio_ch=utils.aio.Chan(),
            )
            self._current_generation.message_ch.send_nowait(
                llm.MessageGeneration(
                    message_id=item_id,
                    text_stream=item_generation.text_ch,
                    audio_stream=item_generation.audio_ch,
                )
            )
            self._current_generation.messages[item_id] = item_generation
            self._current_generation._first_token_timestamp = time.time()
        else:
            self.interrupt()
            if self._text_mode_recovery_retries == 0:
                logger.warning("received text-only response from realtime API")

    def _handle_response_content_part_done(self, event: ResponseContentPartDoneEvent) -> None:
        if event.part.type != "text":
            return

        # try to recover from text-only response on response.content_part_done event
        assert self._current_generation is not None, "current_generation is None"
        assert (item_id := event.item_id) is not None, "item_id is None"
        assert (response_id := event.response_id) is not None, "response_id is None"

        async def _retry_generation(
            item_id: str, response_handle: _CreateResponseHandle | None
        ) -> None:
            """Recover from text-only response to audio mode.

            When chat history is loaded, OpenAI Realtime API may respond with text only.
            This method recovers by:
            1. Deleting the text response
            2. Creating an empty user audio message
            3. Requesting a new response to trigger audio mode
            """

            # remove the text item
            chat_ctx = self.chat_ctx
            idx = chat_ctx.index_by_id(item_id)
            if idx is not None:
                chat_ctx.items.pop(idx)
            await self.update_chat_ctx(chat_ctx, _add_mock_audio=True)

            if response_handle and response_handle.done_fut.done():
                if response_handle.done_fut.exception() is not None:
                    logger.error("generate_reply timed out, cancel recovery")
                return

            self._create_response(
                old_handle=response_handle,
                user_initiated=response_handle is not None,
            )

        if self._text_mode_recovery_retries >= 5:
            logger.error(
                "failed to recover from text-only response",
                extra={"retried_times": self._text_mode_recovery_retries},
            )
            self._text_mode_recovery_retries = 0
            return

        handle = self._response_created_futures.pop(response_id, None)
        if handle and handle.done_fut.done():
            if handle.done_fut.exception() is not None:
                logger.error("generate_reply timed out, cancel recovery")
            self._text_mode_recovery_retries = 0
            return

        self._text_mode_recovery_retries += 1
        logger.warning(
            "trying to recover from text-only response",
            extra={"retries": self._text_mode_recovery_retries},
        )

        if self._text_mode_recovery_atask and not self._text_mode_recovery_atask.done():
            self._text_mode_recovery_atask.cancel()
        self._text_mode_recovery_atask = asyncio.create_task(
            _retry_generation(item_id=item_id, response_handle=handle)
        )

    def _handle_conversion_item_created(self, event: ConversationItemCreatedEvent) -> None:
        assert event.item.id is not None, "item.id is None"

        try:
            self._remote_chat_ctx.insert(
                event.previous_item_id, _openai_item_to_livekit_item(event.item)
            )
        except ValueError as e:
            logger.warning(
                f"failed to insert item `{event.item.id}`: {str(e)}",
            )

        if fut := self._item_create_future.pop(event.item.id, None):
            fut.set_result(None)

    def _handle_conversion_item_deleted(self, event: ConversationItemDeletedEvent) -> None:
        assert event.item_id is not None, "item_id is None"

        try:
            self._remote_chat_ctx.delete(event.item_id)
        except ValueError as e:
            logger.warning(
                f"failed to delete item `{event.item_id}`: {str(e)}",
            )

        if fut := self._item_delete_future.pop(event.item_id, None):
            fut.set_result(None)

    def _handle_conversion_item_input_audio_transcription_completed(
        self, event: ConversationItemInputAudioTranscriptionCompletedEvent
    ) -> None:
        if remote_item := self._remote_chat_ctx.get(event.item_id):
            assert isinstance(remote_item.item, llm.ChatMessage)
            remote_item.item.content.append(event.transcript)

        self.emit(
            "input_audio_transcription_completed",
            llm.InputTranscriptionCompleted(
                item_id=event.item_id,
                transcript=event.transcript,
                is_final=True,
            ),
        )

    def _handle_conversion_item_input_audio_transcription_failed(
        self, event: ConversationItemInputAudioTranscriptionFailedEvent
    ) -> None:
        logger.error(
            "OpenAI Realtime API failed to transcribe input audio",
            extra={"error": event.error},
        )

    def _handle_response_audio_transcript_delta(self, event: dict) -> None:
        assert self._current_generation is not None, "current_generation is None"

        item_id = event["item_id"]
        delta = event["delta"]

        if (start_time := event.get("start_time")) is not None:
            delta = io.TimedString(delta, start_time=start_time)

        item_generation = self._current_generation.messages[item_id]
        item_generation.text_ch.send_nowait(delta)

    def _handle_response_audio_delta(self, event: ResponseAudioDeltaEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"
        item_generation = self._current_generation.messages[event.item_id]

        data = base64.b64decode(event.delta)
        item_generation.audio_ch.send_nowait(
            rtc.AudioFrame(
                data=data,
                sample_rate=SAMPLE_RATE,
                num_channels=NUM_CHANNELS,
                samples_per_channel=len(data) // 2,
            )
        )

    def _handle_response_audio_transcript_done(self, _: ResponseAudioTranscriptDoneEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"

    def _handle_response_audio_done(self, _: ResponseAudioDoneEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"

    def _handle_response_output_item_done(self, event: ResponseOutputItemDoneEvent) -> None:
        assert self._current_generation is not None, "current_generation is None"
        assert (item_id := event.item.id) is not None, "item.id is None"
        assert (item_type := event.item.type) is not None, "item.type is None"

        if item_type == "function_call":
            item = event.item
            assert item.call_id is not None, "call_id is None"
            assert item.name is not None, "name is None"
            assert item.arguments is not None, "arguments is None"

            self._current_generation.function_ch.send_nowait(
                llm.FunctionCall(
                    call_id=item.call_id,
                    name=item.name,
                    arguments=item.arguments,
                )
            )
        elif item_type == "message" and (
            item_generation := self._current_generation.messages.get(item_id)
        ):
            # text response doesn't have item_generation
            item_generation.text_ch.close()
            item_generation.audio_ch.close()

    def _handle_response_done(self, event: ResponseDoneEvent) -> None:
        if self._current_generation is None:
            # OpenAI has a race condition where response.done can arrive without
            # a preceding response.created (generally during an interruption)
            return

        created_timestamp = self._current_generation._created_timestamp
        first_token_timestamp = self._current_generation._first_token_timestamp

        for generation in self._current_generation.messages.values():
            # close all messages that haven't been closed yet
            if not generation.text_ch.closed:
                generation.text_ch.close()
            if not generation.audio_ch.closed:
                generation.audio_ch.close()

        self._current_generation.function_ch.close()
        self._current_generation.message_ch.close()
        self._current_generation = None

        # calculate metrics
        usage = (
            event.response.usage.model_dump(exclude_defaults=True) if event.response.usage else {}
        )
        ttft = first_token_timestamp - created_timestamp if first_token_timestamp else -1
        duration = time.time() - created_timestamp
        metrics = RealtimeModelMetrics(
            timestamp=created_timestamp,
            request_id=event.response.id,
            ttft=ttft,
            duration=duration,
            cancelled=event.response.status == "cancelled",
            label=self._realtime_model._label,
            input_tokens=usage.get("input_tokens", 0),
            output_tokens=usage.get("output_tokens", 0),
            total_tokens=usage.get("total_tokens", 0),
            tokens_per_second=usage.get("output_tokens", 0) / duration,
            input_token_details=RealtimeModelMetrics.InputTokenDetails(
                audio_tokens=usage.get("input_token_details", {}).get("audio_tokens", 0),
                cached_tokens=usage.get("input_token_details", {}).get("cached_tokens", 0),
                text_tokens=usage.get("input_token_details", {}).get("text_tokens", 0),
                cached_tokens_details=None,
                image_tokens=0,
            ),
            output_token_details=RealtimeModelMetrics.OutputTokenDetails(
                text_tokens=usage.get("output_token_details", {}).get("text_tokens", 0),
                audio_tokens=usage.get("output_token_details", {}).get("audio_tokens", 0),
                image_tokens=0,
            ),
        )
        self.emit("metrics_collected", metrics)

    def _handle_error(self, event: ErrorEvent) -> None:
        if event.error.message.startswith("Cancellation failed"):
            return

        logger.error(
            "OpenAI Realtime API returned an error",
            extra={"error": event.error},
        )
        self.emit(
            "error",
            llm.RealtimeModelError(
                timestamp=time.time(),
                label=self._realtime_model._label,
                error=APIError(
                    message="OpenAI Realtime API returned an error",
                    body=event.error,
                    retryable=True,
                ),
                recoverable=True,
            ),
        )

        # if event.error.event_id:
        #     fut = self._response_futures.pop(event.error.event_id, None)
        #     if fut is not None and not fut.done():
        #         fut.set_exception(multimodal.RealtimeError(event.error.message))

A session for the OpenAI Realtime API.

This class is used to interact with the OpenAI Realtime API. It is responsible for sending events to the OpenAI Realtime API and receiving events from it.

It exposes two additional events:

  • openai_server_event_received: the raw server events received from the OpenAI Realtime API
  • openai_client_event_queued: the raw client events queued to be sent to the OpenAI Realtime API
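
For example, tapping the raw event streams (a sketch assuming the EventEmitter-style on() registration inherited from livekit.agents, with sess obtained from RealtimeModel.session()):

@sess.on("openai_server_event_received")
def _on_server_event(event: dict) -> None:
    # raw JSON dictionary, exactly as received from the server
    print(event["type"])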

Ancestors

  • livekit.agents.llm.realtime.RealtimeSession
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop chat_ctx : llm.ChatContext
@property
def chat_ctx(self) -> llm.ChatContext:
    return self._remote_chat_ctx.to_chat_ctx()
prop tools : llm.ToolContext
@property
def tools(self) -> llm.ToolContext:
    return self._tools.copy()

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    self._msg_ch.close()
    await self._main_atask
def clear_audio(self) ‑> None
def clear_audio(self) -> None:
    self.send_event(InputAudioBufferClearEvent(type="input_audio_buffer.clear"))
    self._pushed_duration_s = 0
def commit_audio(self) ‑> None
def commit_audio(self) -> None:
    if self._pushed_duration_s > 0.1:  # OpenAI requires at least 100ms of audio
        self.send_event(InputAudioBufferCommitEvent(type="input_audio_buffer.commit"))
        self._pushed_duration_s = 0
def generate_reply(self, *, instructions: NotGivenOr[str] = NOT_GIVEN) ‑> _asyncio.Future[livekit.agents.llm.realtime.GenerationCreatedEvent]
def generate_reply(
    self, *, instructions: NotGivenOr[str] = NOT_GIVEN
) -> asyncio.Future[llm.GenerationCreatedEvent]:
    handle = self._create_response(instructions=instructions, user_initiated=True)
    self._text_mode_recovery_retries = 0  # reset the counter
    return handle.done_fut
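
A consumption sketch (inside an async function, with sess a RealtimeSession): awaiting the future yields the GenerationCreatedEvent once the server creates the response, whose message stream can then be iterated:

gen = await sess.generate_reply(instructions="Greet the user warmly.")
async for msg in gen.message_stream:
    async for delta in msg.text_stream:  # transcript text as it arrives
        print(delta, end="")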
def interrupt(self) ‑> None
def interrupt(self) -> None:
    self.send_event(ResponseCancelEvent(type="response.cancel"))
def push_audio(self, frame: rtc.AudioFrame) ‑> None
def push_audio(self, frame: rtc.AudioFrame) -> None:
    for f in self._resample_audio(frame):
        data = f.data.tobytes()
        for nf in self._bstream.write(data):
            self.send_event(
                InputAudioBufferAppendEvent(
                    type="input_audio_buffer.append",
                    audio=base64.b64encode(nf.data).decode("utf-8"),
                )
            )
            self._pushed_duration_s += nf.duration
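
For illustration, pushing one second of 16-bit mono silence (the 48 kHz rate is arbitrary; input is resampled and re-chunked into 100 ms frames before being base64-encoded and sent):

from livekit import rtc

samples = 48000  # 1 s of mono audio at 48 kHz
frame = rtc.AudioFrame(
    data=b"\x00\x00" * samples,  # 16-bit PCM silence
    sample_rate=48000,
    num_channels=1,
    samples_per_channel=samples,
)
sess.push_audio(frame)
sess.commit_audio()  # only commits if at least ~100 ms has been pushed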
def push_video(self, frame: rtc.VideoFrame) ‑> None
def push_video(self, frame: rtc.VideoFrame) -> None:
    pass
def send_event(self, event: RealtimeClientEvent | dict) ‑> None
def send_event(self, event: RealtimeClientEvent | dict) -> None:
    with contextlib.suppress(utils.aio.channel.ChanClosed):
        self._msg_ch.send_nowait(event)
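
Raw dictionaries are forwarded as-is (useful for provider-specific events not modeled by the OpenAI types), while pydantic events are serialized first. A minimal sketch:

sess.send_event({"type": "input_audio_buffer.clear"})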
def truncate(self, *, message_id: str, audio_end_ms: int) ‑> None
def truncate(self, *, message_id: str, audio_end_ms: int) -> None:
    self.send_event(
        ConversationItemTruncateEvent(
            type="conversation.item.truncate",
            content_index=0,
            item_id=message_id,
            audio_end_ms=audio_end_ms,
        )
    )
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) ‑> None
async def update_chat_ctx(
    self, chat_ctx: llm.ChatContext, *, _add_mock_audio: bool = False
) -> None:
    chat_ctx = chat_ctx.copy()
    if _add_mock_audio:
        chat_ctx.items.append(_create_mock_audio_item())
    else:
        # clean up existing mock audio items
        chat_ctx.items[:] = [
            item for item in chat_ctx.items if not item.id.startswith(_MOCK_AUDIO_ID_PREFIX)
        ]

    async with self._update_chat_ctx_lock:
        diff_ops = llm.utils.compute_chat_ctx_diff(
            self._remote_chat_ctx.to_chat_ctx(), chat_ctx
        )

        futs = []

        for msg_id in diff_ops.to_remove:
            event_id = utils.shortuuid("chat_ctx_delete_")
            self.send_event(
                ConversationItemDeleteEvent(
                    type="conversation.item.delete",
                    item_id=msg_id,
                    event_id=event_id,
                )
            )
            futs.append(f := asyncio.Future())
            self._item_delete_future[msg_id] = f

        for previous_msg_id, msg_id in diff_ops.to_create:
            event_id = utils.shortuuid("chat_ctx_create_")
            chat_item = chat_ctx.get_by_id(msg_id)
            assert chat_item is not None

            self.send_event(
                ConversationItemCreateEvent(
                    type="conversation.item.create",
                    item=_livekit_item_to_openai_item(chat_item),
                    previous_item_id=("root" if previous_msg_id is None else previous_msg_id),
                    event_id=event_id,
                )
            )
            futs.append(f := asyncio.Future())
            self._item_create_future[msg_id] = f

        try:
            await asyncio.wait_for(asyncio.gather(*futs, return_exceptions=True), timeout=5.0)
        except asyncio.TimeoutError:
            raise llm.RealtimeError("update_chat_ctx timed out.") from None
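
A sketch of editing the remote history (assuming ChatContext.add_message from livekit.agents; run inside an async function). Only the diff against the server-side context is sent:

chat_ctx = sess.chat_ctx  # copy of the remote chat context
chat_ctx.add_message(role="user", content="Summarize our conversation so far.")
await sess.update_chat_ctx(chat_ctx)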
async def update_instructions(self, instructions: str) ‑> None
async def update_instructions(self, instructions: str) -> None:
    event_id = utils.shortuuid("instructions_update_")
    # f = asyncio.Future()
    # self._response_futures[event_id] = f
    self.send_event(
        SessionUpdateEvent(
            type="session.update",
            session=session_update_event.Session.model_construct(instructions=instructions),
            event_id=event_id,
        )
    )
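
For example (inside an async function); this queues a session.update event and returns without waiting for server acknowledgement:

await sess.update_instructions("You are a concise, friendly voice assistant.")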
def update_options(self,
*,
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
    voice: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    turn_detection: NotGivenOr[TurnDetection | None] = NOT_GIVEN,
) -> None:
    kwargs = {}

    if is_given(tool_choice):
        oai_tool_choice = tool_choice
        if isinstance(tool_choice, dict) and tool_choice["type"] == "function":
            oai_tool_choice = tool_choice["function"]
        if oai_tool_choice is None:
            oai_tool_choice = DEFAULT_TOOL_CHOICE

        kwargs["tool_choice"] = oai_tool_choice

    if is_given(voice):
        kwargs["voice"] = voice

    if is_given(temperature):
        kwargs["temperature"] = temperature

    if is_given(turn_detection):
        kwargs["turn_detection"] = turn_detection

    if kwargs:
        self.send_event(
            SessionUpdateEvent(
                type="session.update",
                session=session_update_event.Session.model_construct(**kwargs),
                event_id=utils.shortuuid("options_update_"),
            )
        )
async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) ‑> None
async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool]) -> None:
    async with self._update_fnc_ctx_lock:
        oai_tools: list[session_update_event.SessionTool] = []
        retained_tools: list[llm.FunctionTool | llm.RawFunctionTool] = []

        for tool in tools:
            if is_function_tool(tool):
                tool_desc = llm.utils.build_legacy_openai_schema(tool, internally_tagged=True)
            elif is_raw_function_tool(tool):
                tool_info = get_raw_function_info(tool)
                tool_desc = tool_info.raw_schema
                tool_desc["type"] = "function"  # internally tagged
            else:
                logger.error(
                    "OpenAI Realtime API doesn't support this tool type", extra={"tool": tool}
                )
                continue

            try:
                session_tool = session_update_event.SessionTool.model_validate(tool_desc)
                oai_tools.append(session_tool)
                retained_tools.append(tool)
            except ValidationError:
                logger.error(
                    "OpenAI Realtime API doesn't support this tool",
                    extra={"tool": tool_desc},
                )
                continue

        event_id = utils.shortuuid("tools_update_")
        # f = asyncio.Future()
        # self._response_futures[event_id] = f
        self.send_event(
            SessionUpdateEvent(
                type="session.update",
                session=session_update_event.Session.model_construct(
                    model=self._realtime_model._opts.model,
                    tools=oai_tools,
                ),
                event_id=event_id,
            )
        )

        self._tools = llm.ToolContext(retained_tools)
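
A registration sketch (inside an async function), assuming the function_tool decorator from livekit.agents.llm; tools the Realtime API rejects are logged and dropped rather than raising:

from livekit.agents.llm import function_tool

@function_tool
async def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"It is sunny in {city}."

await sess.update_tools([get_weather])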

Inherited members