Module livekit.plugins.openai.realtime

Sub-modules

livekit.plugins.openai.realtime.api_proto
livekit.plugins.openai.realtime.log
livekit.plugins.openai.realtime.realtime_model
livekit.plugins.openai.realtime.remote_items

Classes

class InputTranscriptionCompleted (item_id: str, transcript: str)
Expand source code
@dataclass
class InputTranscriptionCompleted:
    item_id: str
    """id of the item"""
    transcript: str
    """transcript of the input audio"""

InputTranscriptionCompleted(item_id: 'str', transcript: 'str')

Class variables

var item_id : str

id of the item

var transcript : str

transcript of the input audio

class InputTranscriptionFailed (item_id: str, message: str)
Expand source code
@dataclass
class InputTranscriptionFailed:
    item_id: str
    """id of the item"""
    message: str
    """error message"""

InputTranscriptionFailed(item_id: 'str', message: 'str')

Class variables

var item_id : str

id of the item

var message : str

error message

class InputTranscriptionOptions (model: api_proto.InputTranscriptionModel | str)
Expand source code
@dataclass
class InputTranscriptionOptions:
    model: api_proto.InputTranscriptionModel | str

InputTranscriptionOptions(model: 'api_proto.InputTranscriptionModel | str')

Class variables

var model : Literal['whisper-1'] | str
class RealtimeContent (response_id: str,
item_id: str,
output_index: int,
content_index: int,
text: str,
audio: list[rtc.AudioFrame],
text_stream: AsyncIterable[str],
audio_stream: AsyncIterable[rtc.AudioFrame],
tool_calls: list[RealtimeToolCall],
content_type: api_proto.Modality)
Expand source code
@dataclass
class RealtimeContent:
    response_id: str
    """id of the response"""
    item_id: str
    """id of the item"""
    output_index: int
    """index of the output"""
    content_index: int
    """index of the content"""
    text: str
    """accumulated text content"""
    audio: list[rtc.AudioFrame]
    """accumulated audio content"""
    text_stream: AsyncIterable[str]
    """stream of text content"""
    audio_stream: AsyncIterable[rtc.AudioFrame]
    """stream of audio content"""
    tool_calls: list[RealtimeToolCall]
    """pending tool calls"""
    content_type: api_proto.Modality
    """type of the content"""

RealtimeContent(response_id: 'str', item_id: 'str', output_index: 'int', content_index: 'int', text: 'str', audio: 'list[rtc.AudioFrame]', text_stream: 'AsyncIterable[str]', audio_stream: 'AsyncIterable[rtc.AudioFrame]', tool_calls: 'list[RealtimeToolCall]', content_type: 'api_proto.Modality')

Class variables

var audio : list[AudioFrame]

accumulated audio content

var audio_stream : AsyncIterable[AudioFrame]

stream of audio content

var content_index : int

index of the content

var content_type : Literal['text', 'audio']

type of the content

var item_id : str

id of the item

var output_index : int

index of the output

var response_id : str

id of the response

var text : str

accumulated text content

var text_stream : AsyncIterable[str]

stream of text content

var tool_calls : list[RealtimeToolCall]

pending tool calls

class RealtimeError (event_id: str, type: str, message: str, code: Optional[str], param: Optional[str])
Expand source code
@dataclass
class RealtimeError:
    event_id: str
    type: str
    message: str
    code: Optional[str]
    param: Optional[str]

RealtimeError(event_id: 'str', type: 'str', message: 'str', code: 'Optional[str]', param: 'Optional[str]')

Class variables

var code : str | None
var event_id : str
var message : str
var param : str | None
var type : str
class RealtimeModel (*,
instructions: str = '',
modalities: list[api_proto.Modality] = ['text', 'audio'],
model: api_proto.OpenAIModel | str = 'gpt-4o-realtime-preview',
voice: api_proto.Voice = 'alloy',
input_audio_format: api_proto.AudioFormat = 'pcm16',
output_audio_format: api_proto.AudioFormat = 'pcm16',
input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'),
turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500),
tool_choice: api_proto.ToolChoice = 'auto',
temperature: float = 0.8,
max_response_output_tokens: "int | Literal['inf']" = 'inf',
base_url: str | None = None,
http_session: aiohttp.ClientSession | None = None,
loop: asyncio.AbstractEventLoop | None = None,
azure_deployment: str | None = None,
entra_token: str | None = None,
api_key: str | None = None,
api_version: str | None = None)
Expand source code
class RealtimeModel:
    @overload
    def __init__(
        self,
        *,
        instructions: str = "",
        modalities: list[api_proto.Modality] = ["text", "audio"],
        model: api_proto.OpenAIModel | str = api_proto.DefaultOpenAIModel,
        voice: api_proto.Voice = "alloy",
        input_audio_format: api_proto.AudioFormat = "pcm16",
        output_audio_format: api_proto.AudioFormat = "pcm16",
        input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
        turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS,
        tool_choice: api_proto.ToolChoice = "auto",
        temperature: float = 0.8,
        max_response_output_tokens: int | Literal["inf"] = "inf",
        api_key: str | None = None,
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None: ...

    @overload
    def __init__(
        self,
        *,
        azure_deployment: str | None = None,
        entra_token: str | None = None,
        api_key: str | None = None,
        api_version: str | None = None,
        base_url: str | None = None,
        instructions: str = "",
        modalities: list[api_proto.Modality] = ["text", "audio"],
        voice: api_proto.Voice = "alloy",
        input_audio_format: api_proto.AudioFormat = "pcm16",
        output_audio_format: api_proto.AudioFormat = "pcm16",
        input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
        turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS,
        tool_choice: api_proto.ToolChoice = "auto",
        temperature: float = 0.8,
        max_response_output_tokens: int | Literal["inf"] = "inf",
        http_session: aiohttp.ClientSession | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None: ...

    def __init__(
        self,
        *,
        instructions: str = "",
        modalities: list[api_proto.Modality] = ["text", "audio"],
        model: api_proto.OpenAIModel | str = api_proto.DefaultOpenAIModel,
        voice: api_proto.Voice = "alloy",
        input_audio_format: api_proto.AudioFormat = "pcm16",
        output_audio_format: api_proto.AudioFormat = "pcm16",
        input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
        turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS,
        tool_choice: api_proto.ToolChoice = "auto",
        temperature: float = 0.8,
        max_response_output_tokens: int | Literal["inf"] = "inf",
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
        # azure specific parameters
        azure_deployment: str | None = None,
        entra_token: str | None = None,
        api_key: str | None = None,
        api_version: str | None = None,
    ) -> None:
        """
        Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API.

        Args:
            instructions (str, optional): Initial system instructions for the model. Defaults to "".
            api_key (str or None, optional): OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY
            modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
            model (str or None, optional): The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01".
            voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy".
            input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16".
            output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16".
            input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
            turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
            tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
            base_url (str or None, optional): Base URL for the API endpoint. If None, defaults to OpenAI's default API URL.
            http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created.
            loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used.

        Raises:
            ValueError: If the API key is not provided and cannot be found in environment variables.
        """
        super().__init__()
        self._base_url = base_url

        is_azure = (
            api_version is not None
            or entra_token is not None
            or azure_deployment is not None
        )

        api_key = api_key or os.environ.get("OPENAI_API_KEY")
        if api_key is None and not is_azure:
            raise ValueError(
                "OpenAI API key is required, either using the argument or by setting the OPENAI_API_KEY environmental variable"
            )

        if not base_url:
            base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")

        self._default_opts = _ModelOptions(
            model=model,
            modalities=modalities,
            instructions=instructions,
            voice=voice,
            input_audio_format=input_audio_format,
            output_audio_format=output_audio_format,
            input_audio_transcription=input_audio_transcription,
            turn_detection=turn_detection,
            temperature=temperature,
            tool_choice=tool_choice,
            max_response_output_tokens=max_response_output_tokens,
            api_key=api_key,
            base_url=base_url,
            azure_deployment=azure_deployment,
            entra_token=entra_token,
            is_azure=is_azure,
            api_version=api_version,
        )

        self._loop = loop or asyncio.get_event_loop()
        self._rt_sessions: list[RealtimeSession] = []
        self._http_session = http_session

    @classmethod
    def with_azure(
        cls,
        *,
        azure_deployment: str,
        azure_endpoint: str | None = None,
        api_version: str | None = None,
        api_key: str | None = None,
        entra_token: str | None = None,
        base_url: str | None = None,
        instructions: str = "",
        modalities: list[api_proto.Modality] = ["text", "audio"],
        voice: api_proto.Voice = "alloy",
        input_audio_format: api_proto.AudioFormat = "pcm16",
        output_audio_format: api_proto.AudioFormat = "pcm16",
        input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION,
        turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS,
        tool_choice: api_proto.ToolChoice = "auto",
        temperature: float = 0.8,
        max_response_output_tokens: int | Literal["inf"] = "inf",
        http_session: aiohttp.ClientSession | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """
        Create a RealtimeClient instance configured for Azure OpenAI Service.

        Args:
            azure_deployment (str): The name of your Azure OpenAI deployment.
            azure_endpoint (str or None, optional): The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
            api_version (str or None, optional): API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
            api_key (str or None, optional): Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
            entra_token (str or None, optional): Azure Entra authentication token. Required if not using API key authentication.
            base_url (str or None, optional): Base URL for the API endpoint. If None, constructed from the azure_endpoint.
            instructions (str, optional): Initial system instructions for the model. Defaults to "".
            modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
            voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy".
            input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16".
            output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16".
            input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
            turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
            tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
            http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created.
            loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used.

        Returns:
            RealtimeClient: An instance of RealtimeClient configured for Azure OpenAI Service.

        Raises:
            ValueError: If required Azure parameters are missing or invalid.
        """
        api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY")
        if api_key is None and entra_token is None:
            raise ValueError(
                "Missing credentials. Please pass one of `api_key`, `entra_token`, or the `AZURE_OPENAI_API_KEY` environment variable."
            )

        api_version = api_version or os.getenv("OPENAI_API_VERSION")
        if api_version is None:
            raise ValueError(
                "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable"
            )

        if base_url is None:
            azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT")
            if azure_endpoint is None:
                raise ValueError(
                    "Missing Azure endpoint. Please pass the `azure_endpoint` parameter or set the `AZURE_OPENAI_ENDPOINT` environment variable."
                )

            base_url = f"{azure_endpoint.rstrip('/')}/openai"
        elif azure_endpoint is not None:
            raise ValueError("base_url and azure_endpoint are mutually exclusive")

        return cls(
            instructions=instructions,
            modalities=modalities,
            voice=voice,
            input_audio_format=input_audio_format,
            output_audio_format=output_audio_format,
            input_audio_transcription=input_audio_transcription,
            turn_detection=turn_detection,
            tool_choice=tool_choice,
            temperature=temperature,
            max_response_output_tokens=max_response_output_tokens,
            api_key=api_key,
            http_session=http_session,
            loop=loop,
            azure_deployment=azure_deployment,
            api_version=api_version,
            entra_token=entra_token,
            base_url=base_url,
        )

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()

        return self._http_session

    @property
    def sessions(self) -> list[RealtimeSession]:
        return self._rt_sessions

    def session(
        self,
        *,
        chat_ctx: llm.ChatContext | None = None,
        fnc_ctx: llm.FunctionContext | None = None,
        modalities: list[api_proto.Modality] | None = None,
        instructions: str | None = None,
        voice: api_proto.Voice | None = None,
        input_audio_format: api_proto.AudioFormat | None = None,
        output_audio_format: api_proto.AudioFormat | None = None,
        tool_choice: api_proto.ToolChoice | None = None,
        input_audio_transcription: InputTranscriptionOptions | None = None,
        turn_detection: ServerVadOptions | None = None,
        temperature: float | None = None,
        max_response_output_tokens: int | Literal["inf"] | None = None,
    ) -> RealtimeSession:
        opts = deepcopy(self._default_opts)
        if modalities is not None:
            opts.modalities = modalities
        if instructions is not None:
            opts.instructions = instructions
        if voice is not None:
            opts.voice = voice
        if input_audio_format is not None:
            opts.input_audio_format = input_audio_format
        if output_audio_format is not None:
            opts.output_audio_format = output_audio_format
        if tool_choice is not None:
            opts.tool_choice = tool_choice
        if input_audio_transcription is not None:
            opts.input_audio_transcription
        if turn_detection is not None:
            opts.turn_detection = turn_detection
        if temperature is not None:
            opts.temperature = temperature
        if max_response_output_tokens is not None:
            opts.max_response_output_tokens = max_response_output_tokens

        new_session = RealtimeSession(
            chat_ctx=chat_ctx or llm.ChatContext(),
            fnc_ctx=fnc_ctx,
            opts=opts,
            http_session=self._ensure_session(),
            loop=self._loop,
        )
        self._rt_sessions.append(new_session)
        return new_session

    async def aclose(self) -> None:
        for session in self._rt_sessions:
            await session.aclose()

Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API.

Args

instructions : str, optional
Initial system instructions for the model. Defaults to "".
api_key : str or None, optional
OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY
modalities : list[api_proto.Modality], optional
Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
model : str or None, optional
The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01".
voice : api_proto.Voice, optional
Voice setting for audio outputs. Defaults to "alloy".
input_audio_format : api_proto.AudioFormat, optional
Format of input audio data. Defaults to "pcm16".
output_audio_format : api_proto.AudioFormat, optional
Format of output audio data. Defaults to "pcm16".
input_audio_transcription : InputTranscriptionOptions, optional
Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection : ServerVadOptions, optional
Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice : api_proto.ToolChoice, optional
Tool choice for the model, such as "auto". Defaults to "auto".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
base_url : str or None, optional
Base URL for the API endpoint. If None, defaults to OpenAI's default API URL.
http_session : aiohttp.ClientSession or None, optional
Async HTTP session to use for requests. If None, a new session will be created.
loop : asyncio.AbstractEventLoop or None, optional
Event loop to use for async operations. If None, the current event loop is used.

Raises

ValueError
If the API key is not provided and cannot be found in environment variables.

Static methods

def with_azure(*,
azure_deployment: str,
azure_endpoint: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
entra_token: str | None = None,
base_url: str | None = None,
instructions: str = '',
modalities: list[api_proto.Modality] = ['text', 'audio'],
voice: api_proto.Voice = 'alloy',
input_audio_format: api_proto.AudioFormat = 'pcm16',
output_audio_format: api_proto.AudioFormat = 'pcm16',
input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'),
turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500),
tool_choice: api_proto.ToolChoice = 'auto',
temperature: float = 0.8,
max_response_output_tokens: "int | Literal['inf']" = 'inf',
http_session: aiohttp.ClientSession | None = None,
loop: asyncio.AbstractEventLoop | None = None)

Create a RealtimeClient instance configured for Azure OpenAI Service.

Args

azure_deployment : str
The name of your Azure OpenAI deployment.
azure_endpoint : str or None, optional
The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
api_version : str or None, optional
API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
api_key : str or None, optional
Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
entra_token : str or None, optional
Azure Entra authentication token. Required if not using API key authentication.
base_url : str or None, optional
Base URL for the API endpoint. If None, constructed from the azure_endpoint.
instructions : str, optional
Initial system instructions for the model. Defaults to "".
modalities : list[api_proto.Modality], optional
Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
voice : api_proto.Voice, optional
Voice setting for audio outputs. Defaults to "alloy".
input_audio_format : api_proto.AudioFormat, optional
Format of input audio data. Defaults to "pcm16".
output_audio_format : api_proto.AudioFormat, optional
Format of output audio data. Defaults to "pcm16".
input_audio_transcription : InputTranscriptionOptions, optional
Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection : ServerVadOptions, optional
Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice : api_proto.ToolChoice, optional
Tool choice for the model, such as "auto". Defaults to "auto".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
http_session : aiohttp.ClientSession or None, optional
Async HTTP session to use for requests. If None, a new session will be created.
loop : asyncio.AbstractEventLoop or None, optional
Event loop to use for async operations. If None, the current event loop is used.

Returns

RealtimeClient
An instance of RealtimeClient configured for Azure OpenAI Service.

Raises

ValueError
If required Azure parameters are missing or invalid.

Instance variables

prop sessions : list[RealtimeSession]
Expand source code
@property
def sessions(self) -> list[RealtimeSession]:
    return self._rt_sessions

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for session in self._rt_sessions:
        await session.aclose()
def session(self,
*,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
modalities: list[api_proto.Modality] | None = None,
instructions: str | None = None,
voice: api_proto.Voice | None = None,
input_audio_format: api_proto.AudioFormat | None = None,
output_audio_format: api_proto.AudioFormat | None = None,
tool_choice: api_proto.ToolChoice | None = None,
input_audio_transcription: InputTranscriptionOptions | None = None,
turn_detection: ServerVadOptions | None = None,
temperature: float | None = None,
max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> RealtimeSession
Expand source code
def session(
    self,
    *,
    chat_ctx: llm.ChatContext | None = None,
    fnc_ctx: llm.FunctionContext | None = None,
    modalities: list[api_proto.Modality] | None = None,
    instructions: str | None = None,
    voice: api_proto.Voice | None = None,
    input_audio_format: api_proto.AudioFormat | None = None,
    output_audio_format: api_proto.AudioFormat | None = None,
    tool_choice: api_proto.ToolChoice | None = None,
    input_audio_transcription: InputTranscriptionOptions | None = None,
    turn_detection: ServerVadOptions | None = None,
    temperature: float | None = None,
    max_response_output_tokens: int | Literal["inf"] | None = None,
) -> RealtimeSession:
    opts = deepcopy(self._default_opts)
    if modalities is not None:
        opts.modalities = modalities
    if instructions is not None:
        opts.instructions = instructions
    if voice is not None:
        opts.voice = voice
    if input_audio_format is not None:
        opts.input_audio_format = input_audio_format
    if output_audio_format is not None:
        opts.output_audio_format = output_audio_format
    if tool_choice is not None:
        opts.tool_choice = tool_choice
    if input_audio_transcription is not None:
        opts.input_audio_transcription
    if turn_detection is not None:
        opts.turn_detection = turn_detection
    if temperature is not None:
        opts.temperature = temperature
    if max_response_output_tokens is not None:
        opts.max_response_output_tokens = max_response_output_tokens

    new_session = RealtimeSession(
        chat_ctx=chat_ctx or llm.ChatContext(),
        fnc_ctx=fnc_ctx,
        opts=opts,
        http_session=self._ensure_session(),
        loop=self._loop,
    )
    self._rt_sessions.append(new_session)
    return new_session
class RealtimeOutput (response_id: str,
item_id: str,
output_index: int,
role: api_proto.Role,
type: "Literal['message', 'function_call']",
content: list[RealtimeContent],
done_fut: asyncio.Future[None])
Expand source code
@dataclass
class RealtimeOutput:
    response_id: str
    """id of the response"""
    item_id: str
    """id of the item"""
    output_index: int
    """index of the output"""
    role: api_proto.Role
    """role of the message"""
    type: Literal["message", "function_call"]
    """type of the output"""
    content: list[RealtimeContent]
    """list of content"""
    done_fut: asyncio.Future[None]
    """future that will be set when the output is completed"""

RealtimeOutput(response_id: 'str', item_id: 'str', output_index: 'int', role: 'api_proto.Role', type: "Literal['message', 'function_call']", content: 'list[RealtimeContent]', done_fut: 'asyncio.Future[None]')

Class variables

var content : list[RealtimeContent]

list of content

var done_fut : _asyncio.Future[None]

future that will be set when the output is completed

var item_id : str

id of the item

var output_index : int

index of the output

var response_id : str

id of the response

var role : Literal['system', 'assistant', 'user', 'tool']

role of the message

var type : Literal['message', 'function_call']

type of the output

class RealtimeResponse (id: str,
status: api_proto.ResponseStatus,
status_details: api_proto.ResponseStatusDetails | None,
output: list[RealtimeOutput],
usage: Usage | None,
done_fut: asyncio.Future[None],
_created_timestamp: float)
Expand source code
@dataclass
class RealtimeResponse:
    id: str
    """id of the message"""
    status: api_proto.ResponseStatus
    """status of the response"""
    status_details: api_proto.ResponseStatusDetails | None
    """details of the status (only with "incomplete, cancelled and failed")"""
    output: list[RealtimeOutput]
    """list of outputs"""
    usage: api_proto.Usage | None
    """usage of the response"""
    done_fut: asyncio.Future[None]
    """future that will be set when the response is completed"""
    _created_timestamp: float
    """timestamp when the response was created"""
    _first_token_timestamp: float | None = None
    """timestamp when the first token was received"""

RealtimeResponse(id: 'str', status: 'api_proto.ResponseStatus', status_details: 'api_proto.ResponseStatusDetails | None', output: 'list[RealtimeOutput]', usage: 'api_proto.Usage | None', done_fut: 'asyncio.Future[None]', _created_timestamp: 'float', _first_token_timestamp: 'float | None' = None)

Class variables

var done_fut : _asyncio.Future[None]

future that will be set when the response is completed

var id : str

id of the message

var output : list[RealtimeOutput]

list of outputs

var status : Literal['in_progress', 'completed', 'incomplete', 'cancelled', 'failed']

status of the response

var status_detailsCancelledStatusDetails | IncompleteStatusDetails | FailedStatusDetails | None

details of the status (only with "incomplete, cancelled and failed")

var usageUsage | None

usage of the response

class RealtimeSession (*,
opts: _ModelOptions,
http_session: aiohttp.ClientSession,
chat_ctx: llm.ChatContext,
fnc_ctx: llm.FunctionContext | None,
loop: asyncio.AbstractEventLoop)
Expand source code
class RealtimeSession(utils.EventEmitter[EventTypes]):
    class InputAudioBuffer:
        def __init__(self, sess: RealtimeSession) -> None:
            self._sess = sess

        def append(self, frame: rtc.AudioFrame) -> None:
            self._sess._queue_msg(
                {
                    "type": "input_audio_buffer.append",
                    "audio": base64.b64encode(frame.data).decode("utf-8"),
                }
            )

        def clear(self) -> None:
            self._sess._queue_msg({"type": "input_audio_buffer.clear"})

        def commit(self) -> None:
            self._sess._queue_msg({"type": "input_audio_buffer.commit"})

    class ConversationItem:
        def __init__(self, sess: RealtimeSession) -> None:
            self._sess = sess

        def create(
            self, message: llm.ChatMessage, previous_item_id: str | None = None
        ) -> asyncio.Future[bool]:
            fut = asyncio.Future[bool]()

            message_content = message.content
            tool_call_id = message.tool_call_id
            event: api_proto.ClientEvent.ConversationItemCreate | None = None
            if tool_call_id:
                if message.role == "tool":
                    # function_call_output
                    assert isinstance(message_content, str)
                    event = {
                        "type": "conversation.item.create",
                        "previous_item_id": previous_item_id,
                        "item": {
                            "id": message.id,
                            "type": "function_call_output",
                            "call_id": tool_call_id,
                            "output": message_content,
                        },
                    }
                else:
                    # function_call
                    if not message.tool_calls or message.name is None:
                        logger.warning(
                            "function call message has no name or tool calls: %s",
                            message,
                            extra=self._sess.logging_extra(),
                        )
                        fut.set_result(False)
                        return fut
                    if len(message.tool_calls) > 1:
                        logger.warning(
                            "function call message has multiple tool calls, "
                            "only the first one will be used",
                            extra=self._sess.logging_extra(),
                        )

                    event = {
                        "type": "conversation.item.create",
                        "previous_item_id": previous_item_id,
                        "item": {
                            "id": message.id,
                            "type": "function_call",
                            "call_id": tool_call_id,
                            "name": message.name,
                            "arguments": message.tool_calls[0].raw_arguments,
                        },
                    }
            else:
                if message_content is None:
                    logger.warning(
                        "message content is None, skipping: %s",
                        message,
                        extra=self._sess.logging_extra(),
                    )
                    fut.set_result(False)
                    return fut
                if not isinstance(message_content, list):
                    message_content = [message_content]

                if message.role == "user":
                    user_contents: list[
                        api_proto.InputTextContent | api_proto.InputAudioContent
                    ] = []
                    for cnt in message_content:
                        if isinstance(cnt, str):
                            user_contents.append(
                                {
                                    "type": "input_text",
                                    "text": cnt,
                                }
                            )
                        elif isinstance(cnt, llm.ChatAudio):
                            user_contents.append(
                                {
                                    "type": "input_audio",
                                    "audio": base64.b64encode(
                                        utils.merge_frames(cnt.frame).data
                                    ).decode("utf-8"),
                                }
                            )

                    event = {
                        "type": "conversation.item.create",
                        "previous_item_id": previous_item_id,
                        "item": {
                            "id": message.id,
                            "type": "message",
                            "role": "user",
                            "content": user_contents,
                        },
                    }

                elif message.role == "assistant":
                    assistant_contents: list[api_proto.TextContent] = []
                    for cnt in message_content:
                        if isinstance(cnt, str):
                            assistant_contents.append(
                                {
                                    "type": "text",
                                    "text": cnt,
                                }
                            )
                        elif isinstance(cnt, llm.ChatAudio):
                            logger.warning(
                                "audio content in assistant message is not supported"
                            )

                    event = {
                        "type": "conversation.item.create",
                        "previous_item_id": previous_item_id,
                        "item": {
                            "id": message.id,
                            "type": "message",
                            "role": "assistant",
                            "content": assistant_contents,
                        },
                    }
                elif message.role == "system":
                    system_contents: list[api_proto.InputTextContent] = []
                    for cnt in message_content:
                        if isinstance(cnt, str):
                            system_contents.append({"type": "input_text", "text": cnt})
                        elif isinstance(cnt, llm.ChatAudio):
                            logger.warning(
                                "audio content in system message is not supported"
                            )

                    event = {
                        "type": "conversation.item.create",
                        "previous_item_id": previous_item_id,
                        "item": {
                            "id": message.id,
                            "type": "message",
                            "role": "system",
                            "content": system_contents,
                        },
                    }

            if event is None:
                logger.warning(
                    "chat message is not supported inside the realtime API %s",
                    message,
                    extra=self._sess.logging_extra(),
                )
                fut.set_result(False)
                return fut

            self._sess._item_created_futs[message.id] = fut
            self._sess._queue_msg(event)
            return fut

        def truncate(
            self, *, item_id: str, content_index: int, audio_end_ms: int
        ) -> asyncio.Future[bool]:
            fut = asyncio.Future[bool]()
            self._sess._item_truncated_futs[item_id] = fut
            self._sess._queue_msg(
                {
                    "type": "conversation.item.truncate",
                    "item_id": item_id,
                    "content_index": content_index,
                    "audio_end_ms": audio_end_ms,
                }
            )
            return fut

        def delete(self, *, item_id: str) -> asyncio.Future[bool]:
            fut = asyncio.Future[bool]()
            self._sess._item_deleted_futs[item_id] = fut
            self._sess._queue_msg(
                {
                    "type": "conversation.item.delete",
                    "item_id": item_id,
                }
            )
            return fut

    class Conversation:
        def __init__(self, sess: RealtimeSession) -> None:
            self._sess = sess

        @property
        def item(self) -> RealtimeSession.ConversationItem:
            return RealtimeSession.ConversationItem(self._sess)

    class Response:
        def __init__(self, sess: RealtimeSession) -> None:
            self._sess = sess

        def create(
            self,
            *,
            on_duplicate: Literal[
                "cancel_existing", "cancel_new", "keep_both"
            ] = "keep_both",
        ) -> asyncio.Future[bool]:
            """Creates a new response.

            Args:
                on_duplicate: How to handle when there is an existing response in progress:
                    - "cancel_existing": Cancel the existing response before creating new one
                    - "cancel_new": Skip creating new response if one is in progress
                    - "keep_both": Wait for the existing response to be done and then create a new one

            Returns:
                Future that resolves when the response create request is queued
            """
            if on_duplicate not in ("cancel_existing", "cancel_new", "keep_both"):
                raise ValueError(
                    "invalid on_duplicate value, must be one of: "
                    "cancel_existing, cancel_new, keep_both"
                )

            # check if there is a pending response creation request sent
            pending_create_fut = self._sess._response_create_fut
            if pending_create_fut is not None:
                if on_duplicate == "cancel_new":
                    logger.warning(
                        "skip new response creation due to previous pending response creation",
                        extra=self._sess.logging_extra(),
                    )
                    _fut = asyncio.Future[bool]()
                    _fut.set_result(False)
                    return _fut

            active_resp_id = self._sess._active_response_id
            _logging_extra = {
                "existing_response_id": active_resp_id,
                **self._sess.logging_extra(),
            }

            if (
                not active_resp_id
                or self._sess._pending_responses[active_resp_id].done_fut.done()
            ):
                # no active response in progress, create a new one
                self._sess._queue_msg({"type": "response.create"})
                _fut = asyncio.Future[bool]()
                _fut.set_result(True)
                return _fut

            # there is an active response in progress
            if on_duplicate == "cancel_new":
                logger.warning(
                    "skip new response creation due to active response in progress",
                    extra=_logging_extra,
                )
                _fut = asyncio.Future[bool]()
                _fut.set_result(False)
                return _fut

            if on_duplicate == "cancel_existing":
                self.cancel()
                logger.warning(
                    "cancelling in-progress response to create a new one",
                    extra=_logging_extra,
                )
            elif on_duplicate == "keep_both":
                logger.warning(
                    "waiting for in-progress response to be done "
                    "before creating a new one",
                    extra=_logging_extra,
                )

            # create a task to wait for the previous response and then create new one
            async def wait_and_create() -> bool:
                await self._sess._pending_responses[active_resp_id].done_fut
                logger.info(
                    "in-progress response is done, creating a new one",
                    extra=_logging_extra,
                )
                new_create_fut = asyncio.Future[None]()
                self._sess._response_create_fut = new_create_fut
                self._sess._queue_msg({"type": "response.create"})
                return True

            return asyncio.create_task(wait_and_create())

        def cancel(self) -> None:
            self._sess._queue_msg({"type": "response.cancel"})

    def __init__(
        self,
        *,
        opts: _ModelOptions,
        http_session: aiohttp.ClientSession,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        super().__init__()
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._main_atask = asyncio.create_task(
            self._main_task(), name="openai-realtime-session"
        )
        # manage conversation items internally
        self._remote_conversation_items = remote_items._RemoteConversationItems()

        # wait for the item to be created or deleted
        self._item_created_futs: dict[str, asyncio.Future[bool]] = {}
        self._item_deleted_futs: dict[str, asyncio.Future[bool]] = {}
        self._item_truncated_futs: dict[str, asyncio.Future[bool]] = {}

        self._fnc_ctx = fnc_ctx
        self._loop = loop

        self._opts = opts
        self._send_ch = utils.aio.Chan[api_proto.ClientEvents]()
        self._http_session = http_session

        self._pending_responses: dict[str, RealtimeResponse] = {}
        self._active_response_id: str | None = None
        self._response_create_fut: asyncio.Future[None] | None = None

        self._session_id = "not-connected"
        self.session_update()  # initial session init

        # sync the chat context to the session
        self._init_sync_task = asyncio.create_task(self.set_chat_ctx(chat_ctx))

        self._fnc_tasks = utils.aio.TaskSet()

    async def aclose(self) -> None:
        if self._send_ch.closed:
            return

        self._send_ch.close()
        await self._main_atask

    @property
    def fnc_ctx(self) -> llm.FunctionContext | None:
        return self._fnc_ctx

    @fnc_ctx.setter
    def fnc_ctx(self, fnc_ctx: llm.FunctionContext | None) -> None:
        self._fnc_ctx = fnc_ctx

    @property
    def conversation(self) -> Conversation:
        return RealtimeSession.Conversation(self)

    @property
    def input_audio_buffer(self) -> InputAudioBuffer:
        return RealtimeSession.InputAudioBuffer(self)

    @property
    def response(self) -> Response:
        return RealtimeSession.Response(self)

    def session_update(
        self,
        *,
        modalities: list[api_proto.Modality] | None = None,
        instructions: str | None = None,
        voice: api_proto.Voice | None = None,
        input_audio_format: api_proto.AudioFormat | None = None,
        output_audio_format: api_proto.AudioFormat | None = None,
        input_audio_transcription: InputTranscriptionOptions | None = None,
        turn_detection: ServerVadOptions | None = None,
        tool_choice: api_proto.ToolChoice | None = None,
        temperature: float | None = None,
        max_response_output_tokens: int | Literal["inf"] | None = None,
    ) -> None:
        self._opts = deepcopy(self._opts)
        if modalities is not None:
            self._opts.modalities = modalities
        if instructions is not None:
            self._opts.instructions = instructions
        if voice is not None:
            self._opts.voice = voice
        if input_audio_format is not None:
            self._opts.input_audio_format = input_audio_format
        if output_audio_format is not None:
            self._opts.output_audio_format = output_audio_format
        if input_audio_transcription is not None:
            self._opts.input_audio_transcription = input_audio_transcription
        if turn_detection is not None:
            self._opts.turn_detection = turn_detection
        if tool_choice is not None:
            self._opts.tool_choice = tool_choice
        if temperature is not None:
            self._opts.temperature = temperature
        if max_response_output_tokens is not None:
            self._opts.max_response_output_tokens = max_response_output_tokens

        tools = []
        if self._fnc_ctx is not None:
            for fnc in self._fnc_ctx.ai_functions.values():
                # the realtime API is using internally-tagged polymorphism.
                # build_oai_function_description was built for the ChatCompletion API
                function_data = build_oai_function_description(fnc)["function"]
                function_data["type"] = "function"
                tools.append(function_data)

        server_vad_opts: api_proto.ServerVad | None = None
        if self._opts.turn_detection is not None:
            server_vad_opts = {
                "type": "server_vad",
                "threshold": self._opts.turn_detection.threshold,
                "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms,
                "silence_duration_ms": self._opts.turn_detection.silence_duration_ms,
            }
        input_audio_transcription_opts: api_proto.InputAudioTranscription | None = None
        if self._opts.input_audio_transcription is not None:
            input_audio_transcription_opts = {
                "model": self._opts.input_audio_transcription.model,
            }

        session_data: api_proto.ClientEvent.SessionUpdateData = {
            "modalities": self._opts.modalities,
            "instructions": self._opts.instructions,
            "voice": self._opts.voice,
            "input_audio_format": self._opts.input_audio_format,
            "output_audio_format": self._opts.output_audio_format,
            "input_audio_transcription": input_audio_transcription_opts,
            "turn_detection": server_vad_opts,
            "tools": tools,
            "tool_choice": self._opts.tool_choice,
            "temperature": self._opts.temperature,
            "max_response_output_tokens": None,
        }

        # azure doesn't support inf for max_response_output_tokens
        if not self._opts.is_azure or isinstance(
            self._opts.max_response_output_tokens, int
        ):
            session_data["max_response_output_tokens"] = (
                self._opts.max_response_output_tokens
            )
        else:
            del session_data["max_response_output_tokens"]  # type: ignore

        self._queue_msg(
            {
                "type": "session.update",
                "session": session_data,
            }
        )

    def chat_ctx_copy(self) -> llm.ChatContext:
        return self._remote_conversation_items.to_chat_context()

    async def set_chat_ctx(self, new_ctx: llm.ChatContext) -> None:
        """Sync the chat context with the agent's chat context.

        Compute the minimum number of insertions and deletions to transform the old
        chat context messages to the new chat context messages.
        """
        original_ctx = self._remote_conversation_items.to_chat_context()

        # filter out messages that are not function calls and content is None
        filtered_messages = [
            msg
            for msg in new_ctx.messages
            if msg.tool_call_id or msg.content is not None
        ]
        changes = utils._compute_changes(
            original_ctx.messages, filtered_messages, key_fnc=lambda x: x.id
        )
        logger.debug(
            "sync chat context",
            extra={
                "to_delete": [msg.id for msg in changes.to_delete],
                "to_add": [
                    (prev.id if prev else None, msg.id) for prev, msg in changes.to_add
                ],
            },
        )

        # append an empty audio message if all new messages are text
        if changes.to_add and not any(
            isinstance(msg.content, llm.ChatAudio) for _, msg in changes.to_add
        ):
            # Patch: append an empty audio message to set the API in audio mode
            changes.to_add.append((None, self._create_empty_user_audio_message(1.0)))

        _futs = [
            self.conversation.item.delete(item_id=msg.id) for msg in changes.to_delete
        ] + [
            self.conversation.item.create(msg, prev.id if prev else None)
            for prev, msg in changes.to_add
        ]

        # wait for all the futures to complete
        await asyncio.gather(*_futs)

    def _create_empty_user_audio_message(self, duration: float) -> llm.ChatMessage:
        """Create an empty audio message with the given duration."""
        samples = int(duration * api_proto.SAMPLE_RATE)
        return llm.ChatMessage(
            role="user",
            content=llm.ChatAudio(
                frame=rtc.AudioFrame(
                    data=b"\x00\x00" * (samples * api_proto.NUM_CHANNELS),
                    sample_rate=api_proto.SAMPLE_RATE,
                    num_channels=api_proto.NUM_CHANNELS,
                    samples_per_channel=samples,
                )
            ),
        )

    def _recover_from_text_response(self, item_id: str | None = None) -> None:
        """Try to recover from a text response to audio mode.

        Sometimes the OpenAI Realtime API returns text instead of audio responses.
        This method tries to recover from this by requesting a new response after
        deleting the text response and creating an empty user audio message.
        """
        if item_id:
            # remove the text response if needed
            self.conversation.item.delete(item_id=item_id)
        self.conversation.item.create(self._create_empty_user_audio_message(1.0))
        self.response.create(on_duplicate="keep_both")

    def _update_conversation_item_content(
        self, item_id: str, content: llm.ChatContent | list[llm.ChatContent] | None
    ) -> None:
        item = self._remote_conversation_items.get(item_id)
        if item is None:
            logger.warning(
                "conversation item not found, skipping update",
                extra={"item_id": item_id},
            )
            return
        item.content = content

    def _queue_msg(self, msg: api_proto.ClientEvents) -> None:
        self._send_ch.send_nowait(msg)

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        try:
            headers = {"User-Agent": "LiveKit Agents"}
            query_params: dict[str, str] = {}

            base_url = self._opts.base_url
            if self._opts.is_azure:
                if self._opts.entra_token:
                    headers["Authorization"] = f"Bearer {self._opts.entra_token}"

                if self._opts.api_key:
                    headers["api-key"] = self._opts.api_key

                if self._opts.api_version:
                    query_params["api-version"] = self._opts.api_version

                if self._opts.azure_deployment:
                    query_params["deployment"] = self._opts.azure_deployment
            else:
                # OAI endpoint
                headers["Authorization"] = f"Bearer {self._opts.api_key}"
                headers["OpenAI-Beta"] = "realtime=v1"

                if self._opts.model:
                    query_params["model"] = self._opts.model

            url = f"{base_url.rstrip('/')}/realtime?{urlencode(query_params)}"
            if url.startswith("http"):
                url = url.replace("http", "ws", 1)

            ws_conn = await self._http_session.ws_connect(
                url,
                headers=headers,
            )
        except Exception:
            logger.exception("failed to connect to OpenAI API S2S")
            return

        closing = False

        @utils.log_exceptions(logger=logger)
        async def _send_task():
            nonlocal closing
            async for msg in self._send_ch:
                await ws_conn.send_json(msg)

            closing = True
            await ws_conn.close()

        @utils.log_exceptions(logger=logger)
        async def _recv_task():
            while True:
                msg = await ws_conn.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing:
                        return

                    raise Exception("OpenAI S2S connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning(
                        "unexpected OpenAI S2S message type %s",
                        msg.type,
                        extra=self.logging_extra(),
                    )
                    continue

                try:
                    data = msg.json()
                    event: api_proto.ServerEventType = data["type"]

                    if event == "session.created":
                        self._handle_session_created(data)
                    if event == "session.updated":
                        self._handle_session_updated(data)
                    elif event == "error":
                        self._handle_error(data)
                    elif event == "input_audio_buffer.speech_started":
                        self._handle_input_audio_buffer_speech_started(data)
                    elif event == "input_audio_buffer.speech_stopped":
                        self._handle_input_audio_buffer_speech_stopped(data)
                    elif event == "input_audio_buffer.committed":
                        self._handle_input_audio_buffer_speech_committed(data)
                    elif (
                        event == "conversation.item.input_audio_transcription.completed"
                    ):
                        self._handle_conversation_item_input_audio_transcription_completed(
                            data
                        )
                    elif event == "conversation.item.input_audio_transcription.failed":
                        self._handle_conversation_item_input_audio_transcription_failed(
                            data
                        )
                    elif event == "conversation.item.created":
                        self._handle_conversation_item_created(data)
                    elif event == "conversation.item.deleted":
                        self._handle_conversation_item_deleted(data)
                    elif event == "conversation.item.truncated":
                        self._handle_conversation_item_truncated(data)
                    elif event == "response.created":
                        self._handle_response_created(data)
                    elif event == "response.output_item.added":
                        self._handle_response_output_item_added(data)
                    elif event == "response.content_part.added":
                        self._handle_response_content_part_added(data)
                    elif event == "response.audio.delta":
                        self._handle_response_audio_delta(data)
                    elif event == "response.audio_transcript.delta":
                        self._handle_response_audio_transcript_delta(data)
                    elif event == "response.audio.done":
                        self._handle_response_audio_done(data)
                    elif event == "response.text.done":
                        self._handle_response_text_done(data)
                    elif event == "response.audio_transcript.done":
                        self._handle_response_audio_transcript_done(data)
                    elif event == "response.content_part.done":
                        self._handle_response_content_part_done(data)
                    elif event == "response.output_item.done":
                        self._handle_response_output_item_done(data)
                    elif event == "response.done":
                        self._handle_response_done(data)

                except Exception:
                    logger.exception(
                        "failed to handle OpenAI S2S message",
                        extra={"websocket_message": msg, **self.logging_extra()},
                    )

        tasks = [
            asyncio.create_task(_send_task(), name="openai-realtime-send"),
            asyncio.create_task(_recv_task(), name="openai-realtime-recv"),
        ]

        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    def _handle_session_created(
        self, session_created: api_proto.ServerEvent.SessionCreated
    ):
        self._session_id = session_created["session"]["id"]

    def _handle_session_updated(
        self, session_updated: api_proto.ServerEvent.SessionUpdated
    ):
        session = session_updated["session"]
        if session["turn_detection"] is None:
            turn_detection = None
        else:
            turn_detection = ServerVadOptions(
                threshold=session["turn_detection"]["threshold"],
                prefix_padding_ms=session["turn_detection"]["prefix_padding_ms"],
                silence_duration_ms=session["turn_detection"]["silence_duration_ms"],
            )
        if session["input_audio_transcription"] is None:
            input_audio_transcription = None
        else:
            input_audio_transcription = InputTranscriptionOptions(
                model=session["input_audio_transcription"]["model"],
            )

        self.emit(
            "session_updated",
            RealtimeSessionOptions(
                model=session["model"],
                modalities=session["modalities"],
                instructions=session["instructions"],
                voice=session["voice"],
                input_audio_format=session["input_audio_format"],
                output_audio_format=session["output_audio_format"],
                input_audio_transcription=input_audio_transcription,
                turn_detection=turn_detection,
                tool_choice=session["tool_choice"],
                temperature=session["temperature"],
                max_response_output_tokens=session["max_response_output_tokens"],
            ),
        )

    def _handle_error(self, error: api_proto.ServerEvent.Error):
        logger.error(
            "OpenAI S2S error %s",
            error,
            extra=self.logging_extra(),
        )
        error_content = error["error"]
        self.emit(
            "error",
            RealtimeError(
                event_id=error["event_id"],
                type=error_content["type"],
                message=error_content["message"],
                code=error_content.get("code"),
                param=error_content.get("param"),
            ),
        )

    def _handle_input_audio_buffer_speech_started(
        self, speech_started: api_proto.ServerEvent.InputAudioBufferSpeechStarted
    ):
        self.emit("input_speech_started")

    def _handle_input_audio_buffer_speech_stopped(
        self, speech_stopped: api_proto.ServerEvent.InputAudioBufferSpeechStopped
    ):
        self.emit("input_speech_stopped")

    def _handle_input_audio_buffer_speech_committed(
        self, speech_committed: api_proto.ServerEvent.InputAudioBufferCommitted
    ):
        self.emit("input_speech_committed")

    def _handle_conversation_item_input_audio_transcription_completed(
        self,
        transcription_completed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionCompleted,
    ):
        transcript = transcription_completed["transcript"]
        self.emit(
            "input_speech_transcription_completed",
            InputTranscriptionCompleted(
                item_id=transcription_completed["item_id"],
                transcript=transcript,
            ),
        )

    def _handle_conversation_item_input_audio_transcription_failed(
        self,
        transcription_failed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionFailed,
    ):
        error = transcription_failed["error"]
        logger.error(
            "OAI S2S failed to transcribe input audio: %s",
            error["message"],
            extra=self.logging_extra(),
        )
        self.emit(
            "input_speech_transcription_failed",
            InputTranscriptionFailed(
                item_id=transcription_failed["item_id"],
                message=error["message"],
            ),
        )

    def _handle_conversation_item_created(
        self, item_created: api_proto.ServerEvent.ConversationItemCreated
    ):
        previous_item_id = item_created["previous_item_id"]
        item = item_created["item"]
        item_type = item["type"]
        item_id = item["id"]

        # Create message based on item type
        # Leave the content empty and fill it in later from the content parts
        if item_type == "message":
            # Handle message items (system/user/assistant)
            item = cast(Union[api_proto.SystemItem, api_proto.UserItem], item)
            role = item["role"]
            message = llm.ChatMessage(id=item_id, role=role)
            if item.get("content"):
                content = item["content"][0]
                if content["type"] in ("text", "input_text"):
                    content = cast(api_proto.InputTextContent, content)
                    message.content = content["text"]
                elif content["type"] == "input_audio" and content.get("audio"):
                    audio_data = base64.b64decode(content["audio"])
                    message.content = llm.ChatAudio(
                        frame=rtc.AudioFrame(
                            data=audio_data,
                            sample_rate=api_proto.SAMPLE_RATE,
                            num_channels=api_proto.NUM_CHANNELS,
                            samples_per_channel=len(audio_data) // 2,
                        )
                    )

        elif item_type == "function_call":
            # Handle function call items
            item = cast(api_proto.FunctionCallItem, item)
            message = llm.ChatMessage(
                id=item_id,
                role="assistant",
                name=item["name"],
                tool_call_id=item["call_id"],
            )

        elif item_type == "function_call_output":
            # Handle function call output items
            item = cast(api_proto.FunctionCallOutputItem, item)
            message = llm.ChatMessage(
                id=item_id,
                role="tool",
                tool_call_id=item["call_id"],
                content=item["output"],
            )

        else:
            logger.error(
                f"unknown conversation item type {item_type}",
                extra=self.logging_extra(),
            )
            return

        # Insert into conversation items
        self._remote_conversation_items.insert_after(previous_item_id, message)
        if item_id in self._item_created_futs:
            self._item_created_futs[item_id].set_result(True)
            del self._item_created_futs[item_id]
        logger.debug("conversation item created", extra=item_created)

    def _handle_conversation_item_deleted(
        self, item_deleted: api_proto.ServerEvent.ConversationItemDeleted
    ):
        # Delete from conversation items
        item_id = item_deleted["item_id"]
        self._remote_conversation_items.delete(item_id)
        if item_id in self._item_deleted_futs:
            self._item_deleted_futs[item_id].set_result(True)
            del self._item_deleted_futs[item_id]
        logger.debug("conversation item deleted", extra=item_deleted)

    def _handle_conversation_item_truncated(
        self, item_truncated: api_proto.ServerEvent.ConversationItemTruncated
    ):
        item_id = item_truncated["item_id"]
        if item_id in self._item_truncated_futs:
            self._item_truncated_futs[item_id].set_result(True)
            del self._item_truncated_futs[item_id]

    def _handle_response_created(
        self, response_created: api_proto.ServerEvent.ResponseCreated
    ):
        response = response_created["response"]
        done_fut = self._loop.create_future()
        status_details = response.get("status_details")
        new_response = RealtimeResponse(
            id=response["id"],
            status=response["status"],
            status_details=status_details,
            output=[],
            usage=response.get("usage"),
            done_fut=done_fut,
            _created_timestamp=time.time(),
        )
        self._pending_responses[new_response.id] = new_response
        self._active_response_id = new_response.id

        # complete the create future if it exists
        if self._response_create_fut is not None:
            self._response_create_fut.set_result(None)
            self._response_create_fut = None

        self.emit("response_created", new_response)

    def _handle_response_output_item_added(
        self, response_output_added: api_proto.ServerEvent.ResponseOutputItemAdded
    ):
        response_id = response_output_added["response_id"]
        response = self._pending_responses[response_id]
        done_fut = self._loop.create_future()
        item_data = response_output_added["item"]

        item_type: Literal["message", "function_call"] = item_data["type"]  # type: ignore
        assert item_type in ("message", "function_call")
        # function_call doesn't have a role field, defaulting it to assistant
        item_role: api_proto.Role = item_data.get("role") or "assistant"  # type: ignore

        new_output = RealtimeOutput(
            response_id=response_id,
            item_id=item_data["id"],
            output_index=response_output_added["output_index"],
            type=item_type,
            role=item_role,
            content=[],
            done_fut=done_fut,
        )
        response.output.append(new_output)
        self.emit("response_output_added", new_output)

    def _handle_response_content_part_added(
        self, response_content_added: api_proto.ServerEvent.ResponseContentPartAdded
    ):
        response_id = response_content_added["response_id"]
        response = self._pending_responses[response_id]
        output_index = response_content_added["output_index"]
        output = response.output[output_index]
        content_type = response_content_added["part"]["type"]

        text_ch = utils.aio.Chan[str]()
        audio_ch = utils.aio.Chan[rtc.AudioFrame]()

        new_content = RealtimeContent(
            response_id=response_id,
            item_id=response_content_added["item_id"],
            output_index=output_index,
            content_index=response_content_added["content_index"],
            text="",
            audio=[],
            text_stream=text_ch,
            audio_stream=audio_ch,
            tool_calls=[],
            content_type=content_type,
        )
        output.content.append(new_content)
        response._first_token_timestamp = time.time()
        self.emit("response_content_added", new_content)

    def _handle_response_audio_delta(
        self, response_audio_delta: api_proto.ServerEvent.ResponseAudioDelta
    ):
        content = self._get_content(response_audio_delta)
        data = base64.b64decode(response_audio_delta["delta"])
        audio = rtc.AudioFrame(
            data=data,
            sample_rate=api_proto.SAMPLE_RATE,
            num_channels=api_proto.NUM_CHANNELS,
            samples_per_channel=len(data) // 2,
        )
        content.audio.append(audio)

        assert isinstance(content.audio_stream, utils.aio.Chan)
        content.audio_stream.send_nowait(audio)

    def _handle_response_audio_transcript_delta(
        self,
        response_audio_transcript_delta: api_proto.ServerEvent.ResponseAudioTranscriptDelta,
    ):
        content = self._get_content(response_audio_transcript_delta)
        transcript = response_audio_transcript_delta["delta"]
        content.text += transcript

        assert isinstance(content.text_stream, utils.aio.Chan)
        content.text_stream.send_nowait(transcript)

    def _handle_response_audio_done(
        self, response_audio_done: api_proto.ServerEvent.ResponseAudioDone
    ):
        content = self._get_content(response_audio_done)
        assert isinstance(content.audio_stream, utils.aio.Chan)
        content.audio_stream.close()

    def _handle_response_text_done(
        self, response_text_done: api_proto.ServerEvent.ResponseTextDone
    ):
        content = self._get_content(response_text_done)
        content.text = response_text_done["text"]

    def _handle_response_audio_transcript_done(
        self,
        response_audio_transcript_done: api_proto.ServerEvent.ResponseAudioTranscriptDone,
    ):
        content = self._get_content(response_audio_transcript_done)
        assert isinstance(content.text_stream, utils.aio.Chan)
        content.text_stream.close()

    def _handle_response_content_part_done(
        self, response_content_done: api_proto.ServerEvent.ResponseContentPartDone
    ):
        content = self._get_content(response_content_done)
        self.emit("response_content_done", content)

    def _handle_response_output_item_done(
        self, response_output_done: api_proto.ServerEvent.ResponseOutputItemDone
    ):
        response_id = response_output_done["response_id"]
        response = self._pending_responses[response_id]
        output_index = response_output_done["output_index"]
        output = response.output[output_index]

        if output.type == "function_call":
            if self._fnc_ctx is None:
                logger.error(
                    "function call received but no fnc_ctx is available",
                    extra=self.logging_extra(),
                )
                return

            # parse the arguments and call the function inside the fnc_ctx
            item = response_output_done["item"]
            assert item["type"] == "function_call"

            fnc_call_info = _create_ai_function_info(
                self._fnc_ctx,
                item["call_id"],
                item["name"],
                item["arguments"],
            )

            msg = self._remote_conversation_items.get(output.item_id)
            if msg is not None:
                # update the content of the message
                assert msg.tool_call_id == item["call_id"]
                assert msg.role == "assistant"
                msg.name = item["name"]
                msg.tool_calls = [fnc_call_info]

            self.emit("function_calls_collected", [fnc_call_info])

            self._fnc_tasks.create_task(
                self._run_fnc_task(fnc_call_info, output.item_id)
            )

        output.done_fut.set_result(None)
        self.emit("response_output_done", output)

    def _handle_response_done(self, response_done: api_proto.ServerEvent.ResponseDone):
        response_data = response_done["response"]
        response_id = response_data["id"]
        response = self._pending_responses[response_id]
        self._active_response_id = None
        response.done_fut.set_result(None)

        response.status = response_data["status"]
        response.status_details = response_data.get("status_details")
        response.usage = response_data.get("usage")

        metrics_error = None
        cancelled = False
        if response.status == "failed":
            assert response.status_details is not None

            error = response.status_details.get("error", {})
            code: str | None = error.get("code")  # type: ignore
            message: str | None = error.get("message")  # type: ignore
            metrics_error = MultimodalLLMError(
                type=response.status_details.get("type"),
                code=code,
                message=message,
            )

            logger.error(
                "response generation failed",
                extra={"code": code, "error": message, **self.logging_extra()},
            )
        elif response.status == "incomplete":
            assert response.status_details is not None
            reason = response.status_details.get("reason")

            metrics_error = MultimodalLLMError(
                type=response.status_details.get("type"),
                reason=reason,  # type: ignore
            )

            logger.warning(
                "response generation incomplete",
                extra={"reason": reason, **self.logging_extra()},
            )
        elif response.status == "cancelled":
            cancelled = True

        self.emit("response_done", response)

        # calculate metrics
        ttft = -1.0
        if response._first_token_timestamp is not None:
            ttft = response._first_token_timestamp - response._created_timestamp
        duration = time.time() - response._created_timestamp

        usage = response.usage or {}  # type: ignore
        input_token_details = usage.get("input_token_details", {})
        metrics = MultimodalLLMMetrics(
            timestamp=response._created_timestamp,
            request_id=response.id,
            ttft=ttft,
            duration=duration,
            cancelled=cancelled,
            label=self._label,
            completion_tokens=usage.get("output_tokens", 0),
            prompt_tokens=usage.get("input_tokens", 0),
            total_tokens=usage.get("total_tokens", 0),
            tokens_per_second=usage.get("output_tokens", 0) / duration,
            error=metrics_error,
            input_token_details=MultimodalLLMMetrics.InputTokenDetails(
                cached_tokens=input_token_details.get("cached_tokens", 0),
                text_tokens=usage.get("input_token_details", {}).get("text_tokens", 0),
                audio_tokens=usage.get("input_token_details", {}).get(
                    "audio_tokens", 0
                ),
                cached_tokens_details=MultimodalLLMMetrics.CachedTokenDetails(
                    text_tokens=input_token_details.get(
                        "cached_tokens_details", {}
                    ).get("text_tokens", 0),
                    audio_tokens=input_token_details.get(
                        "cached_tokens_details", {}
                    ).get("audio_tokens", 0),
                ),
            ),
            output_token_details=MultimodalLLMMetrics.OutputTokenDetails(
                text_tokens=usage.get("output_token_details", {}).get("text_tokens", 0),
                audio_tokens=usage.get("output_token_details", {}).get(
                    "audio_tokens", 0
                ),
            ),
        )
        self.emit("metrics_collected", metrics)

    def _get_content(self, ptr: _ContentPtr) -> RealtimeContent:
        response = self._pending_responses[ptr["response_id"]]
        output = response.output[ptr["output_index"]]
        content = output.content[ptr["content_index"]]
        return content

    @utils.log_exceptions(logger=logger)
    async def _run_fnc_task(self, fnc_call_info: llm.FunctionCallInfo, item_id: str):
        logger.debug(
            "executing ai function",
            extra={
                "function": fnc_call_info.function_info.name,
            },
        )

        called_fnc = fnc_call_info.execute()
        await called_fnc.task

        tool_call = llm.ChatMessage.create_tool_from_called_function(called_fnc)
        logger.info(
            "creating response for tool call",
            extra={
                "function": fnc_call_info.function_info.name,
            },
        )
        if called_fnc.result is not None:
            create_fut = self.conversation.item.create(
                tool_call,
                previous_item_id=item_id,
            )
            await self.response.create(on_duplicate="keep_both")
            await create_fut

        # update the message with the tool call result
        msg = self._remote_conversation_items.get(tool_call.id)
        if msg is not None:
            assert msg.tool_call_id == tool_call.tool_call_id
            assert msg.role == "tool"
            msg.name = tool_call.name
            msg.content = tool_call.content
            msg.tool_exception = tool_call.tool_exception

        self.emit("function_calls_finished", [called_fnc])

    def logging_extra(self) -> dict:
        return {"session_id": self._session_id}

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Class variables

var Conversation
var ConversationItem
var InputAudioBuffer
var Response

Instance variables

prop conversation : Conversation
Expand source code
@property
def conversation(self) -> Conversation:
    return RealtimeSession.Conversation(self)
prop fnc_ctx : llm.FunctionContext | None
Expand source code
@property
def fnc_ctx(self) -> llm.FunctionContext | None:
    return self._fnc_ctx
prop input_audio_buffer : InputAudioBuffer
Expand source code
@property
def input_audio_buffer(self) -> InputAudioBuffer:
    return RealtimeSession.InputAudioBuffer(self)
prop response : Response
Expand source code
@property
def response(self) -> Response:
    return RealtimeSession.Response(self)

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    if self._send_ch.closed:
        return

    self._send_ch.close()
    await self._main_atask
def chat_ctx_copy(self) ‑> ChatContext
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext:
    return self._remote_conversation_items.to_chat_context()
def logging_extra(self) ‑> dict
Expand source code
def logging_extra(self) -> dict:
    return {"session_id": self._session_id}
def session_update(self,
*,
modalities: list[api_proto.Modality] | None = None,
instructions: str | None = None,
voice: api_proto.Voice | None = None,
input_audio_format: api_proto.AudioFormat | None = None,
output_audio_format: api_proto.AudioFormat | None = None,
input_audio_transcription: InputTranscriptionOptions | None = None,
turn_detection: ServerVadOptions | None = None,
tool_choice: api_proto.ToolChoice | None = None,
temperature: float | None = None,
max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> None
Expand source code
def session_update(
    self,
    *,
    modalities: list[api_proto.Modality] | None = None,
    instructions: str | None = None,
    voice: api_proto.Voice | None = None,
    input_audio_format: api_proto.AudioFormat | None = None,
    output_audio_format: api_proto.AudioFormat | None = None,
    input_audio_transcription: InputTranscriptionOptions | None = None,
    turn_detection: ServerVadOptions | None = None,
    tool_choice: api_proto.ToolChoice | None = None,
    temperature: float | None = None,
    max_response_output_tokens: int | Literal["inf"] | None = None,
) -> None:
    self._opts = deepcopy(self._opts)
    if modalities is not None:
        self._opts.modalities = modalities
    if instructions is not None:
        self._opts.instructions = instructions
    if voice is not None:
        self._opts.voice = voice
    if input_audio_format is not None:
        self._opts.input_audio_format = input_audio_format
    if output_audio_format is not None:
        self._opts.output_audio_format = output_audio_format
    if input_audio_transcription is not None:
        self._opts.input_audio_transcription = input_audio_transcription
    if turn_detection is not None:
        self._opts.turn_detection = turn_detection
    if tool_choice is not None:
        self._opts.tool_choice = tool_choice
    if temperature is not None:
        self._opts.temperature = temperature
    if max_response_output_tokens is not None:
        self._opts.max_response_output_tokens = max_response_output_tokens

    tools = []
    if self._fnc_ctx is not None:
        for fnc in self._fnc_ctx.ai_functions.values():
            # the realtime API is using internally-tagged polymorphism.
            # build_oai_function_description was built for the ChatCompletion API
            function_data = build_oai_function_description(fnc)["function"]
            function_data["type"] = "function"
            tools.append(function_data)

    server_vad_opts: api_proto.ServerVad | None = None
    if self._opts.turn_detection is not None:
        server_vad_opts = {
            "type": "server_vad",
            "threshold": self._opts.turn_detection.threshold,
            "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms,
            "silence_duration_ms": self._opts.turn_detection.silence_duration_ms,
        }
    input_audio_transcription_opts: api_proto.InputAudioTranscription | None = None
    if self._opts.input_audio_transcription is not None:
        input_audio_transcription_opts = {
            "model": self._opts.input_audio_transcription.model,
        }

    session_data: api_proto.ClientEvent.SessionUpdateData = {
        "modalities": self._opts.modalities,
        "instructions": self._opts.instructions,
        "voice": self._opts.voice,
        "input_audio_format": self._opts.input_audio_format,
        "output_audio_format": self._opts.output_audio_format,
        "input_audio_transcription": input_audio_transcription_opts,
        "turn_detection": server_vad_opts,
        "tools": tools,
        "tool_choice": self._opts.tool_choice,
        "temperature": self._opts.temperature,
        "max_response_output_tokens": None,
    }

    # azure doesn't support inf for max_response_output_tokens
    if not self._opts.is_azure or isinstance(
        self._opts.max_response_output_tokens, int
    ):
        session_data["max_response_output_tokens"] = (
            self._opts.max_response_output_tokens
        )
    else:
        del session_data["max_response_output_tokens"]  # type: ignore

    self._queue_msg(
        {
            "type": "session.update",
            "session": session_data,
        }
    )
async def set_chat_ctx(self, new_ctx: llm.ChatContext) ‑> None
Expand source code
async def set_chat_ctx(self, new_ctx: llm.ChatContext) -> None:
    """Sync the chat context with the agent's chat context.

    Compute the minimum number of insertions and deletions to transform the old
    chat context messages to the new chat context messages.
    """
    original_ctx = self._remote_conversation_items.to_chat_context()

    # filter out messages that are not function calls and content is None
    filtered_messages = [
        msg
        for msg in new_ctx.messages
        if msg.tool_call_id or msg.content is not None
    ]
    changes = utils._compute_changes(
        original_ctx.messages, filtered_messages, key_fnc=lambda x: x.id
    )
    logger.debug(
        "sync chat context",
        extra={
            "to_delete": [msg.id for msg in changes.to_delete],
            "to_add": [
                (prev.id if prev else None, msg.id) for prev, msg in changes.to_add
            ],
        },
    )

    # append an empty audio message if all new messages are text
    if changes.to_add and not any(
        isinstance(msg.content, llm.ChatAudio) for _, msg in changes.to_add
    ):
        # Patch: append an empty audio message to set the API in audio mode
        changes.to_add.append((None, self._create_empty_user_audio_message(1.0)))

    _futs = [
        self.conversation.item.delete(item_id=msg.id) for msg in changes.to_delete
    ] + [
        self.conversation.item.create(msg, prev.id if prev else None)
        for prev, msg in changes.to_add
    ]

    # wait for all the futures to complete
    await asyncio.gather(*_futs)

Sync the chat context with the agent's chat context.

Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages.

Inherited members

class RealtimeSessionOptions (model: api_proto.OpenAIModel | str,
modalities: list[api_proto.Modality],
instructions: str,
voice: api_proto.Voice,
input_audio_format: api_proto.AudioFormat,
output_audio_format: api_proto.AudioFormat,
input_audio_transcription: InputTranscriptionOptions | None,
turn_detection: ServerVadOptions | None,
tool_choice: api_proto.ToolChoice,
temperature: float,
max_response_output_tokens: "int | Literal['inf']")
Expand source code
@dataclass
class RealtimeSessionOptions:
    model: api_proto.OpenAIModel | str
    modalities: list[api_proto.Modality]
    instructions: str
    voice: api_proto.Voice
    input_audio_format: api_proto.AudioFormat
    output_audio_format: api_proto.AudioFormat
    input_audio_transcription: InputTranscriptionOptions | None
    turn_detection: ServerVadOptions | None
    tool_choice: api_proto.ToolChoice
    temperature: float
    max_response_output_tokens: int | Literal["inf"]

RealtimeSessionOptions(model: 'api_proto.OpenAIModel | str', modalities: 'list[api_proto.Modality]', instructions: 'str', voice: 'api_proto.Voice', input_audio_format: 'api_proto.AudioFormat', output_audio_format: 'api_proto.AudioFormat', input_audio_transcription: 'InputTranscriptionOptions | None', turn_detection: 'ServerVadOptions | None', tool_choice: 'api_proto.ToolChoice', temperature: 'float', max_response_output_tokens: "int | Literal['inf']")

Subclasses

  • livekit.plugins.openai.realtime.realtime_model._ModelOptions

Class variables

var input_audio_format : Literal['pcm16', 'g711_ulaw', 'g711_alaw']
var input_audio_transcriptionInputTranscriptionOptions | None
var instructions : str
var max_response_output_tokens : int | Literal['inf']
var modalities : list[typing.Literal['text', 'audio']]
var model : Literal['gpt-4o-realtime-preview', 'gpt-4o-realtime-preview-2024-10-01', 'gpt-4o-realtime-preview-2024-12-17', 'gpt-4o-mini-realtime-preview', 'gpt-4o-mini-realtime-preview-2024-12-17'] | str
var output_audio_format : Literal['pcm16', 'g711_ulaw', 'g711_alaw']
var temperature : float
var tool_choice : Literal['auto', 'none', 'required'] | FunctionToolChoice
var turn_detectionServerVadOptions | None
var voice : Literal['alloy', 'echo', 'shimmer', 'ash', 'ballad', 'coral', 'sage', 'verse']
class RealtimeToolCall (name: str, arguments: str, tool_call_id: str)
Expand source code
@dataclass
class RealtimeToolCall:
    name: str
    """name of the function"""
    arguments: str
    """accumulated arguments"""
    tool_call_id: str
    """id of the tool call"""

RealtimeToolCall(name: 'str', arguments: 'str', tool_call_id: 'str')

Class variables

var arguments : str

accumulated arguments

var name : str

name of the function

var tool_call_id : str

id of the tool call

class ServerVadOptions (threshold: float, prefix_padding_ms: int, silence_duration_ms: int)
Expand source code
@dataclass
class ServerVadOptions:
    threshold: float
    prefix_padding_ms: int
    silence_duration_ms: int

ServerVadOptions(threshold: 'float', prefix_padding_ms: 'int', silence_duration_ms: 'int')

Class variables

var prefix_padding_ms : int
var silence_duration_ms : int
var threshold : float