Module livekit.plugins.openai.realtime
Sub-modules
livekit.plugins.openai.realtime.api_proto
livekit.plugins.openai.realtime.log
livekit.plugins.openai.realtime.realtime_model
livekit.plugins.openai.realtime.remote_items
Classes
class InputTranscriptionCompleted (item_id: str, transcript: str)
-
InputTranscriptionCompleted(item_id: 'str', transcript: 'str')
Expand source code
@dataclass class InputTranscriptionCompleted: item_id: str """id of the item""" transcript: str """transcript of the input audio"""
Class variables
var item_id : str
-
id of the item
var transcript : str
-
transcript of the input audio
class InputTranscriptionFailed (item_id: str, message: str)
-
InputTranscriptionFailed(item_id: 'str', message: 'str')
Expand source code
@dataclass class InputTranscriptionFailed: item_id: str """id of the item""" message: str """error message"""
Class variables
var item_id : str
-
id of the item
var message : str
-
error message
class InputTranscriptionOptions (model: api_proto.InputTranscriptionModel | str)
-
InputTranscriptionOptions(model: 'api_proto.InputTranscriptionModel | str')
Expand source code
@dataclass class InputTranscriptionOptions: model: api_proto.InputTranscriptionModel | str
Class variables
var model : Union[Literal['whisper-1'], str]
class RealtimeContent (response_id: str, item_id: str, output_index: int, content_index: int, text: str, audio: list[rtc.AudioFrame], text_stream: AsyncIterable[str], audio_stream: AsyncIterable[rtc.AudioFrame], tool_calls: list[RealtimeToolCall], content_type: api_proto.Modality)
-
RealtimeContent(response_id: 'str', item_id: 'str', output_index: 'int', content_index: 'int', text: 'str', audio: 'list[rtc.AudioFrame]', text_stream: 'AsyncIterable[str]', audio_stream: 'AsyncIterable[rtc.AudioFrame]', tool_calls: 'list[RealtimeToolCall]', content_type: 'api_proto.Modality')
Expand source code
@dataclass class RealtimeContent: response_id: str """id of the response""" item_id: str """id of the item""" output_index: int """index of the output""" content_index: int """index of the content""" text: str """accumulated text content""" audio: list[rtc.AudioFrame] """accumulated audio content""" text_stream: AsyncIterable[str] """stream of text content""" audio_stream: AsyncIterable[rtc.AudioFrame] """stream of audio content""" tool_calls: list[RealtimeToolCall] """pending tool calls""" content_type: api_proto.Modality """type of the content"""
Class variables
var audio : list[AudioFrame]
-
accumulated audio content
var audio_stream : AsyncIterable[AudioFrame]
-
stream of audio content
var content_index : int
-
index of the content
var content_type : Literal['text', 'audio']
-
type of the content
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var text : str
-
accumulated text content
var text_stream : AsyncIterable[str]
-
stream of text content
var tool_calls : list[RealtimeToolCall]
-
pending tool calls
class RealtimeModel (*, instructions: str = '', modalities: list[api_proto.Modality] = ['text', 'audio'], model: str | None = 'gpt-4o-realtime-preview-2024-10-01', voice: api_proto.Voice = 'alloy', input_audio_format: api_proto.AudioFormat = 'pcm16', output_audio_format: api_proto.AudioFormat = 'pcm16', input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'), turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500), tool_choice: api_proto.ToolChoice = 'auto', temperature: float = 0.8, max_response_output_tokens: "int | Literal['inf']" = 'inf', base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None)
-
Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API.
Args
instructions
:str
, optional- Initial system instructions for the model. Defaults to "".
api_key
:str
orNone
, optional- OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY
modalities
:list[api_proto.Modality]
, optional- Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
model
:str
orNone
, optional- The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01".
voice
:api_proto.Voice
, optional- Voice setting for audio outputs. Defaults to "alloy".
input_audio_format
:api_proto.AudioFormat
, optional- Format of input audio data. Defaults to "pcm16".
output_audio_format
:api_proto.AudioFormat
, optional- Format of output audio data. Defaults to "pcm16".
input_audio_transcription
:InputTranscriptionOptions
, optional- Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection
:ServerVadOptions
, optional- Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice
:api_proto.ToolChoice
, optional- Tool choice for the model, such as "auto". Defaults to "auto".
temperature
:float
, optional- Sampling temperature for response generation. Defaults to 0.8.
- max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
base_url
:str
orNone
, optional- Base URL for the API endpoint. If None, defaults to OpenAI's default API URL.
http_session
:aiohttp.ClientSession
orNone
, optional- Async HTTP session to use for requests. If None, a new session will be created.
loop
:asyncio.AbstractEventLoop
orNone
, optional- Event loop to use for async operations. If None, the current event loop is used.
Raises
ValueError
- If the API key is not provided and cannot be found in environment variables.
Expand source code
class RealtimeModel: @overload def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: str = "gpt-4o-realtime-preview-2024-10-01", voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", api_key: str | None = None, base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... @overload def __init__( self, *, azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: str | None = "gpt-4o-realtime-preview-2024-10-01", voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, # azure specific parameters azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, ) -> None: """ Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API. Args: instructions (str, optional): Initial system instructions for the model. Defaults to "". api_key (str or None, optional): OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. model (str or None, optional): The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01". voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". base_url (str or None, optional): Base URL for the API endpoint. If None, defaults to OpenAI's default API URL. http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Raises: ValueError: If the API key is not provided and cannot be found in environment variables. """ super().__init__() self._base_url = base_url is_azure = ( api_version is not None or entra_token is not None or azure_deployment is not None ) api_key = api_key or os.environ.get("OPENAI_API_KEY") if api_key is None and not is_azure: raise ValueError( "OpenAI API key is required, either using the argument or by setting the OPENAI_API_KEY environmental variable" ) if not base_url: base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") self._default_opts = _ModelOptions( model=model, modalities=modalities, instructions=instructions, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, temperature=temperature, tool_choice=tool_choice, max_response_output_tokens=max_response_output_tokens, api_key=api_key, base_url=base_url, azure_deployment=azure_deployment, entra_token=entra_token, is_azure=is_azure, api_version=api_version, ) self._loop = loop or asyncio.get_event_loop() self._rt_sessions: list[RealtimeSession] = [] self._http_session = http_session @classmethod def with_azure( cls, *, azure_deployment: str, azure_endpoint: str | None = None, api_version: str | None = None, api_key: str | None = None, entra_token: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ): """ Create a RealtimeClient instance configured for Azure OpenAI Service. Args: azure_deployment (str): The name of your Azure OpenAI deployment. azure_endpoint (str or None, optional): The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT. api_version (str or None, optional): API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION. api_key (str or None, optional): Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY. entra_token (str or None, optional): Azure Entra authentication token. Required if not using API key authentication. base_url (str or None, optional): Base URL for the API endpoint. If None, constructed from the azure_endpoint. instructions (str, optional): Initial system instructions for the model. Defaults to "". modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Returns: RealtimeClient: An instance of RealtimeClient configured for Azure OpenAI Service. Raises: ValueError: If required Azure parameters are missing or invalid. """ api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") if api_key is None and entra_token is None: raise ValueError( "Missing credentials. Please pass one of `api_key`, `entra_token`, or the `AZURE_OPENAI_API_KEY` environment variable." ) api_version = api_version or os.getenv("OPENAI_API_VERSION") if api_version is None: raise ValueError( "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable" ) if base_url is None: azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") if azure_endpoint is None: raise ValueError( "Missing Azure endpoint. Please pass the `azure_endpoint` parameter or set the `AZURE_OPENAI_ENDPOINT` environment variable." ) base_url = f"{azure_endpoint.rstrip('/')}/openai" elif azure_endpoint is not None: raise ValueError("base_url and azure_endpoint are mutually exclusive") return cls( instructions=instructions, modalities=modalities, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, tool_choice=tool_choice, temperature=temperature, max_response_output_tokens=max_response_output_tokens, api_key=api_key, http_session=http_session, loop=loop, azure_deployment=azure_deployment, api_version=api_version, entra_token=entra_token, base_url=base_url, ) def _ensure_session(self) -> aiohttp.ClientSession: if not self._http_session: self._http_session = utils.http_context.http_session() return self._http_session @property def sessions(self) -> list[RealtimeSession]: return self._rt_sessions def session( self, *, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, tool_choice: api_proto.ToolChoice | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> RealtimeSession: opts = deepcopy(self._default_opts) if modalities is not None: opts.modalities = modalities if instructions is not None: opts.instructions = instructions if voice is not None: opts.voice = voice if input_audio_format is not None: opts.input_audio_format = input_audio_format if output_audio_format is not None: opts.output_audio_format = output_audio_format if tool_choice is not None: opts.tool_choice = tool_choice if input_audio_transcription is not None: opts.input_audio_transcription if turn_detection is not None: opts.turn_detection = turn_detection if temperature is not None: opts.temperature = temperature if max_response_output_tokens is not None: opts.max_response_output_tokens = max_response_output_tokens new_session = RealtimeSession( chat_ctx=chat_ctx or llm.ChatContext(), fnc_ctx=fnc_ctx, opts=opts, http_session=self._ensure_session(), loop=self._loop, ) self._rt_sessions.append(new_session) return new_session async def aclose(self) -> None: for session in self._rt_sessions: await session.aclose()
Static methods
def with_azure(*, azure_deployment: str, azure_endpoint: str | None = None, api_version: str | None = None, api_key: str | None = None, entra_token: str | None = None, base_url: str | None = None, instructions: str = '', modalities: list[api_proto.Modality] = ['text', 'audio'], voice: api_proto.Voice = 'alloy', input_audio_format: api_proto.AudioFormat = 'pcm16', output_audio_format: api_proto.AudioFormat = 'pcm16', input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'), turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500), tool_choice: api_proto.ToolChoice = 'auto', temperature: float = 0.8, max_response_output_tokens: "int | Literal['inf']" = 'inf', http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None)
-
Create a RealtimeClient instance configured for Azure OpenAI Service.
Args
azure_deployment
:str
- The name of your Azure OpenAI deployment.
azure_endpoint
:str
orNone
, optional- The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
api_version
:str
orNone
, optional- API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
api_key
:str
orNone
, optional- Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
entra_token
:str
orNone
, optional- Azure Entra authentication token. Required if not using API key authentication.
base_url
:str
orNone
, optional- Base URL for the API endpoint. If None, constructed from the azure_endpoint.
instructions
:str
, optional- Initial system instructions for the model. Defaults to "".
modalities
:list[api_proto.Modality]
, optional- Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
voice
:api_proto.Voice
, optional- Voice setting for audio outputs. Defaults to "alloy".
input_audio_format
:api_proto.AudioFormat
, optional- Format of input audio data. Defaults to "pcm16".
output_audio_format
:api_proto.AudioFormat
, optional- Format of output audio data. Defaults to "pcm16".
input_audio_transcription
:InputTranscriptionOptions
, optional- Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection
:ServerVadOptions
, optional- Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice
:api_proto.ToolChoice
, optional- Tool choice for the model, such as "auto". Defaults to "auto".
temperature
:float
, optional- Sampling temperature for response generation. Defaults to 0.8.
- max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
http_session
:aiohttp.ClientSession
orNone
, optional- Async HTTP session to use for requests. If None, a new session will be created.
loop
:asyncio.AbstractEventLoop
orNone
, optional- Event loop to use for async operations. If None, the current event loop is used.
Returns
RealtimeClient
- An instance of RealtimeClient configured for Azure OpenAI Service.
Raises
ValueError
- If required Azure parameters are missing or invalid.
Instance variables
prop sessions : list[RealtimeSession]
-
Expand source code
@property def sessions(self) -> list[RealtimeSession]: return self._rt_sessions
Methods
async def aclose(self) ‑> None
def session(self, *, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, tool_choice: api_proto.ToolChoice | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, temperature: float | None = None, max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> RealtimeSession
class RealtimeOutput (response_id: str, item_id: str, output_index: int, role: api_proto.Role, type: "Literal['message', 'function_call']", content: list[RealtimeContent], done_fut: asyncio.Future[None])
-
RealtimeOutput(response_id: 'str', item_id: 'str', output_index: 'int', role: 'api_proto.Role', type: "Literal['message', 'function_call']", content: 'list[RealtimeContent]', done_fut: 'asyncio.Future[None]')
Expand source code
@dataclass class RealtimeOutput: response_id: str """id of the response""" item_id: str """id of the item""" output_index: int """index of the output""" role: api_proto.Role """role of the message""" type: Literal["message", "function_call"] """type of the output""" content: list[RealtimeContent] """list of content""" done_fut: asyncio.Future[None] """future that will be set when the output is completed"""
Class variables
var content : list[RealtimeContent]
-
list of content
var done_fut : _asyncio.Future[None]
-
future that will be set when the output is completed
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var role : Literal['system', 'assistant', 'user', 'tool']
-
role of the message
var type : Literal['message', 'function_call']
-
type of the output
class RealtimeResponse (id: str, status: api_proto.ResponseStatus, status_details: api_proto.ResponseStatusDetails | None, output: list[RealtimeOutput], usage: Usage | None, done_fut: asyncio.Future[None])
-
RealtimeResponse(id: 'str', status: 'api_proto.ResponseStatus', status_details: 'api_proto.ResponseStatusDetails | None', output: 'list[RealtimeOutput]', usage: 'api_proto.Usage | None', done_fut: 'asyncio.Future[None]')
Expand source code
@dataclass class RealtimeResponse: id: str """id of the message""" status: api_proto.ResponseStatus """status of the response""" status_details: api_proto.ResponseStatusDetails | None """details of the status (only with "incomplete, cancelled and failed")""" output: list[RealtimeOutput] """list of outputs""" usage: api_proto.Usage | None """usage of the response""" done_fut: asyncio.Future[None] """future that will be set when the response is completed"""
Class variables
var done_fut : _asyncio.Future[None]
-
future that will be set when the response is completed
var id : str
-
id of the message
var output : list[RealtimeOutput]
-
list of outputs
var status : Literal['in_progress', 'completed', 'incomplete', 'cancelled', 'failed']
-
status of the response
var status_details : Union[CancelledStatusDetails, IncompleteStatusDetails, FailedStatusDetails, ForwardRef(None)]
-
details of the status (only with "incomplete, cancelled and failed")
var usage : Usage | None
-
usage of the response
class RealtimeSession (*, opts: _ModelOptions, http_session: aiohttp.ClientSession, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, loop: asyncio.AbstractEventLoop)
-
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Expand source code
class RealtimeSession(utils.EventEmitter[EventTypes]): class InputAudioBuffer: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def append(self, frame: rtc.AudioFrame) -> None: self._sess._queue_msg( { "type": "input_audio_buffer.append", "audio": base64.b64encode(frame.data).decode("utf-8"), } ) def clear(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.clear"}) def commit(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.commit"}) class ConversationItem: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create( self, message: llm.ChatMessage, previous_item_id: str | None = None ) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() message_content = message.content tool_call_id = message.tool_call_id if not tool_call_id and message_content is None: # not a function call while the message content is None fut.set_result(False) return fut event: api_proto.ClientEvent.ConversationItemCreate | None = None if tool_call_id: if message.role == "tool": # function_call_output assert isinstance(message_content, str) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "function_call_output", "call_id": tool_call_id, "output": message_content, }, } else: # function_call if not message.tool_calls or message.name is None: logger.warning( "function call message has no name or tool calls: %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut if len(message.tool_calls) > 1: logger.warning( "function call message has multiple tool calls, " "only the first one will be used", extra=self._sess.logging_extra(), ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "function_call", "call_id": tool_call_id, "name": message.name, "arguments": message.tool_calls[0].raw_arguments, }, } else: if message_content is None: logger.warning( "message content is None, skipping: %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut if not isinstance(message_content, list): message_content = [message_content] if message.role == "user": user_contents: list[ api_proto.InputTextContent | api_proto.InputAudioContent ] = [] for cnt in message_content: if isinstance(cnt, str): user_contents.append( { "type": "input_text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): user_contents.append( { "type": "input_audio", "audio": base64.b64encode( utils.merge_frames(cnt.frame).data ).decode("utf-8"), } ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", "role": "user", "content": user_contents, }, } elif message.role == "assistant": assistant_contents: list[api_proto.TextContent] = [] for cnt in message_content: if isinstance(cnt, str): assistant_contents.append( { "type": "text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in assistant message is not supported" ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", "role": "assistant", "content": assistant_contents, }, } elif message.role == "system": system_contents: list[api_proto.InputTextContent] = [] for cnt in message_content: if isinstance(cnt, str): system_contents.append({"type": "input_text", "text": cnt}) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in system message is not supported" ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", "role": "system", "content": system_contents, }, } if event is None: logger.warning( "chat message is not supported inside the realtime API %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut self._sess._item_created_futs[message.id] = fut self._sess._queue_msg(event) return fut def truncate( self, *, item_id: str, content_index: int, audio_end_ms: int ) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() self._sess._item_truncated_futs[item_id] = fut self._sess._queue_msg( { "type": "conversation.item.truncate", "item_id": item_id, "content_index": content_index, "audio_end_ms": audio_end_ms, } ) return fut def delete(self, *, item_id: str) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() self._sess._item_deleted_futs[item_id] = fut self._sess._queue_msg( { "type": "conversation.item.delete", "item_id": item_id, } ) return fut class Conversation: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess @property def item(self) -> RealtimeSession.ConversationItem: return RealtimeSession.ConversationItem(self._sess) class Response: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create(self) -> None: self._sess._queue_msg({"type": "response.create"}) def cancel(self) -> None: self._sess._queue_msg({"type": "response.cancel"}) def __init__( self, *, opts: _ModelOptions, http_session: aiohttp.ClientSession, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, loop: asyncio.AbstractEventLoop, ) -> None: super().__init__() self._main_atask = asyncio.create_task( self._main_task(), name="openai-realtime-session" ) # manage conversation items internally self._remote_converstation_items = remote_items._RemoteConversationItems() # wait for the item to be created or deleted self._item_created_futs: dict[str, asyncio.Future[bool]] = {} self._item_deleted_futs: dict[str, asyncio.Future[bool]] = {} self._item_truncated_futs: dict[str, asyncio.Future[bool]] = {} self._fnc_ctx = fnc_ctx self._loop = loop self._opts = opts self._send_ch = utils.aio.Chan[api_proto.ClientEvents]() self._http_session = http_session self._pending_responses: dict[str, RealtimeResponse] = {} self._session_id = "not-connected" self.session_update() # initial session init # sync the chat context to the session self._init_sync_task = asyncio.create_task(self.set_chat_ctx(chat_ctx)) self._fnc_tasks = utils.aio.TaskSet() async def aclose(self) -> None: if self._send_ch.closed: return self._send_ch.close() await self._main_atask @property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx @fnc_ctx.setter def fnc_ctx(self, fnc_ctx: llm.FunctionContext | None) -> None: self._fnc_ctx = fnc_ctx @property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self) @property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self) @property def response(self) -> Response: return RealtimeSession.Response(self) def session_update( self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> None: self._opts = deepcopy(self._opts) if modalities is not None: self._opts.modalities = modalities if instructions is not None: self._opts.instructions = instructions if voice is not None: self._opts.voice = voice if input_audio_format is not None: self._opts.input_audio_format = input_audio_format if output_audio_format is not None: self._opts.output_audio_format = output_audio_format if input_audio_transcription is not None: self._opts.input_audio_transcription = input_audio_transcription if turn_detection is not None: self._opts.turn_detection = turn_detection if tool_choice is not None: self._opts.tool_choice = tool_choice if temperature is not None: self._opts.temperature = temperature if max_response_output_tokens is not None: self._opts.max_response_output_tokens = max_response_output_tokens tools = [] if self._fnc_ctx is not None: for fnc in self._fnc_ctx.ai_functions.values(): # the realtime API is using internally-tagged polymorphism. # build_oai_function_description was built for the ChatCompletion API function_data = llm._oai_api.build_oai_function_description(fnc)[ "function" ] function_data["type"] = "function" tools.append(function_data) server_vad_opts: api_proto.ServerVad = { "type": "server_vad", "threshold": self._opts.turn_detection.threshold, "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms, "silence_duration_ms": self._opts.turn_detection.silence_duration_ms, } session_data: api_proto.ClientEvent.SessionUpdateData = { "modalities": self._opts.modalities, "instructions": self._opts.instructions, "voice": self._opts.voice, "input_audio_format": self._opts.input_audio_format, "output_audio_format": self._opts.output_audio_format, "input_audio_transcription": { "model": self._opts.input_audio_transcription.model, }, "turn_detection": server_vad_opts, "tools": tools, "tool_choice": self._opts.tool_choice, "temperature": self._opts.temperature, "max_response_output_tokens": None, } # azure doesn't support inf for max_response_output_tokens if not self._opts.is_azure or isinstance( self._opts.max_response_output_tokens, int ): session_data["max_response_output_tokens"] = ( self._opts.max_response_output_tokens ) else: del session_data["max_response_output_tokens"] # type: ignore self._queue_msg( { "type": "session.update", "session": session_data, } ) def chat_ctx_copy(self) -> llm.ChatContext: return self._remote_converstation_items.to_chat_context() async def set_chat_ctx(self, new_ctx: llm.ChatContext) -> None: """Sync the chat context with the agent's chat context. Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages. """ original_ctx = self._remote_converstation_items.to_chat_context() changes = utils._compute_changes( original_ctx.messages, new_ctx.messages, key_fnc=lambda x: x.id ) logger.debug( "sync chat context", extra={ "to_delete": [msg.id for msg in changes.to_delete], "to_add": [ (prev.id if prev else None, msg.id) for prev, msg in changes.to_add ], }, ) # append an empty audio message if all new messages are text if changes.to_add and not any( isinstance(msg.content, llm.ChatAudio) for _, msg in changes.to_add ): # Patch: add an empty audio message to the chat context # to set the API in audio mode data = b"\x00\x00" * api_proto.SAMPLE_RATE _empty_audio = rtc.AudioFrame( data=data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(data) // 2, ) changes.to_add.append( ( None, llm.ChatMessage( role="user", content=llm.ChatAudio(frame=_empty_audio) ), ) ) logger.debug("added empty audio message to the chat context") _futs = [ self.conversation.item.delete(item_id=msg.id) for msg in changes.to_delete ] + [ self.conversation.item.create(msg, prev.id if prev else None) for prev, msg in changes.to_add ] # wait for all the futures to complete await asyncio.gather(*_futs) def _update_converstation_item_content( self, item_id: str, content: llm.ChatContent | list[llm.ChatContent] | None ) -> None: item = self._remote_converstation_items.get(item_id) if item is None: logger.warning( "conversation item not found, skipping update", extra={"item_id": item_id}, ) return item.content = content def _queue_msg(self, msg: api_proto.ClientEvents) -> None: self._send_ch.send_nowait(msg) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: try: headers = {"User-Agent": "LiveKit Agents"} query_params: dict[str, str] = {} base_url = self._opts.base_url if self._opts.is_azure: if self._opts.entra_token: headers["Authorization"] = f"Bearer {self._opts.entra_token}" if self._opts.api_key: headers["api-key"] = self._opts.api_key if self._opts.api_version: query_params["api-version"] = self._opts.api_version if self._opts.azure_deployment: query_params["deployment"] = self._opts.azure_deployment else: # OAI endpoint headers["Authorization"] = f"Bearer {self._opts.api_key}" headers["OpenAI-Beta"] = "realtime=v1" if self._opts.model: query_params["model"] = self._opts.model url = f"{base_url.rstrip('/')}/realtime?{urlencode(query_params)}" if url.startswith("http"): url = url.replace("http", "ws", 1) ws_conn = await self._http_session.ws_connect( url, headers=headers, ) except Exception: logger.exception("failed to connect to OpenAI API S2S") return closing = False @utils.log_exceptions(logger=logger) async def _send_task(): nonlocal closing async for msg in self._send_ch: await ws_conn.send_json(msg) closing = True await ws_conn.close() @utils.log_exceptions(logger=logger) async def _recv_task(): while True: msg = await ws_conn.receive() if msg.type in ( aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, ): if closing: return raise Exception("OpenAI S2S connection closed unexpectedly") if msg.type != aiohttp.WSMsgType.TEXT: logger.warning( "unexpected OpenAI S2S message type %s", msg.type, extra=self.logging_extra(), ) continue try: data = msg.json() event: api_proto.ServerEventType = data["type"] if event == "session.created": self._handle_session_created(data) elif event == "error": self._handle_error(data) elif event == "input_audio_buffer.speech_started": self._handle_input_audio_buffer_speech_started(data) elif event == "input_audio_buffer.speech_stopped": self._handle_input_audio_buffer_speech_stopped(data) elif event == "input_audio_buffer.committed": self._handle_input_audio_buffer_speech_committed(data) elif ( event == "conversation.item.input_audio_transcription.completed" ): self._handle_conversation_item_input_audio_transcription_completed( data ) elif event == "conversation.item.input_audio_transcription.failed": self._handle_conversation_item_input_audio_transcription_failed( data ) elif event == "conversation.item.created": self._handle_conversation_item_created(data) elif event == "conversation.item.deleted": self._handle_conversation_item_deleted(data) elif event == "conversation.item.truncated": self._handle_conversation_item_truncated(data) elif event == "response.created": self._handle_response_created(data) elif event == "response.output_item.added": self._handle_response_output_item_added(data) elif event == "response.content_part.added": self._handle_response_content_part_added(data) elif event == "response.audio.delta": self._handle_response_audio_delta(data) elif event == "response.audio_transcript.delta": self._handle_response_audio_transcript_delta(data) elif event == "response.audio.done": self._handle_response_audio_done(data) elif event == "response.audio_transcript.done": self._handle_response_audio_transcript_done(data) elif event == "response.content_part.done": self._handle_response_content_part_done(data) elif event == "response.output_item.done": self._handle_response_output_item_done(data) elif event == "response.done": self._handle_response_done(data) except Exception: logger.exception( "failed to handle OpenAI S2S message", extra={"websocket_message": msg, **self.logging_extra()}, ) tasks = [ asyncio.create_task(_send_task(), name="openai-realtime-send"), asyncio.create_task(_recv_task(), name="openai-realtime-recv"), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks) def _handle_session_created( self, session_created: api_proto.ServerEvent.SessionCreated ): self._session_id = session_created["session"]["id"] def _handle_error(self, error: api_proto.ServerEvent.Error): logger.error( "OpenAI S2S error %s", error, extra=self.logging_extra(), ) def _handle_input_audio_buffer_speech_started( self, speech_started: api_proto.ServerEvent.InputAudioBufferSpeechStarted ): self.emit("input_speech_started") def _handle_input_audio_buffer_speech_stopped( self, speech_stopped: api_proto.ServerEvent.InputAudioBufferSpeechStopped ): self.emit("input_speech_stopped") def _handle_input_audio_buffer_speech_committed( self, speech_committed: api_proto.ServerEvent.InputAudioBufferCommitted ): self.emit("input_speech_committed") def _handle_conversation_item_input_audio_transcription_completed( self, transcription_completed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionCompleted, ): transcript = transcription_completed["transcript"] self.emit( "input_speech_transcription_completed", InputTranscriptionCompleted( item_id=transcription_completed["item_id"], transcript=transcript, ), ) def _handle_conversation_item_input_audio_transcription_failed( self, transcription_failed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionFailed, ): error = transcription_failed["error"] logger.error( "OAI S2S failed to transcribe input audio: %s", error["message"], extra=self.logging_extra(), ) self.emit( "input_speech_transcription_failed", InputTranscriptionFailed( item_id=transcription_failed["item_id"], message=error["message"], ), ) def _handle_conversation_item_created( self, item_created: api_proto.ServerEvent.ConversationItemCreated ): previous_item_id = item_created["previous_item_id"] item = item_created["item"] item_type = item["type"] item_id = item["id"] # Create message based on item type # Leave the content empty and fill it in later from the content parts if item_type == "message": # Handle message items (system/user/assistant) item = cast(Union[api_proto.SystemItem, api_proto.UserItem], item) role = item["role"] message = llm.ChatMessage(id=item_id, role=role) if item.get("content"): content = item["content"][0] if content["type"] in ("text", "input_text"): content = cast(api_proto.InputTextContent, content) message.content = content["text"] elif content["type"] == "input_audio" and content.get("audio"): audio_data = base64.b64decode(content["audio"]) message.content = llm.ChatAudio( frame=rtc.AudioFrame( data=audio_data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(audio_data) // 2, ) ) elif item_type == "function_call": # Handle function call items item = cast(api_proto.FunctionCallItem, item) message = llm.ChatMessage( id=item_id, role="assistant", name=item["name"], tool_call_id=item["call_id"], ) elif item_type == "function_call_output": # Handle function call output items item = cast(api_proto.FunctionCallOutputItem, item) message = llm.ChatMessage( id=item_id, role="tool", tool_call_id=item["call_id"], content=item["output"], ) else: logger.error( f"unknown conversation item type {item_type}", extra=self.logging_extra(), ) return # Insert into conversation items self._remote_converstation_items.insert_after(previous_item_id, message) if item_id in self._item_created_futs: self._item_created_futs[item_id].set_result(True) del self._item_created_futs[item_id] logger.debug("conversation item created", extra=item_created) def _handle_conversation_item_deleted( self, item_deleted: api_proto.ServerEvent.ConversationItemDeleted ): # Delete from conversation items item_id = item_deleted["item_id"] self._remote_converstation_items.delete(item_id) if item_id in self._item_deleted_futs: self._item_deleted_futs[item_id].set_result(True) del self._item_deleted_futs[item_id] logger.debug("conversation item deleted", extra=item_deleted) def _handle_conversation_item_truncated( self, item_truncated: api_proto.ServerEvent.ConversationItemTruncated ): item_id = item_truncated["item_id"] if item_id in self._item_truncated_futs: self._item_truncated_futs[item_id].set_result(True) del self._item_truncated_futs[item_id] def _handle_response_created( self, response_created: api_proto.ServerEvent.ResponseCreated ): response = response_created["response"] done_fut = self._loop.create_future() status_details = response.get("status_details") new_response = RealtimeResponse( id=response["id"], status=response["status"], status_details=status_details, output=[], usage=response.get("usage"), done_fut=done_fut, ) self._pending_responses[new_response.id] = new_response self.emit("response_created", new_response) def _handle_response_output_item_added( self, response_output_added: api_proto.ServerEvent.ResponseOutputItemAdded ): response_id = response_output_added["response_id"] response = self._pending_responses[response_id] done_fut = self._loop.create_future() item_data = response_output_added["item"] item_type: Literal["message", "function_call"] = item_data["type"] # type: ignore assert item_type in ("message", "function_call") # function_call doesn't have a role field, defaulting it to assistant item_role: api_proto.Role = item_data.get("role") or "assistant" # type: ignore new_output = RealtimeOutput( response_id=response_id, item_id=item_data["id"], output_index=response_output_added["output_index"], type=item_type, role=item_role, content=[], done_fut=done_fut, ) response.output.append(new_output) self.emit("response_output_added", new_output) def _handle_response_content_part_added( self, response_content_added: api_proto.ServerEvent.ResponseContentPartAdded ): response_id = response_content_added["response_id"] response = self._pending_responses[response_id] output_index = response_content_added["output_index"] output = response.output[output_index] content_type = response_content_added["part"]["type"] text_ch = utils.aio.Chan[str]() audio_ch = utils.aio.Chan[rtc.AudioFrame]() new_content = RealtimeContent( response_id=response_id, item_id=response_content_added["item_id"], output_index=output_index, content_index=response_content_added["content_index"], text="", audio=[], text_stream=text_ch, audio_stream=audio_ch, tool_calls=[], content_type=content_type, ) output.content.append(new_content) self.emit("response_content_added", new_content) def _handle_response_audio_delta( self, response_audio_delta: api_proto.ServerEvent.ResponseAudioDelta ): content = self._get_content(response_audio_delta) data = base64.b64decode(response_audio_delta["delta"]) audio = rtc.AudioFrame( data=data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(data) // 2, ) content.audio.append(audio) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.send_nowait(audio) def _handle_response_audio_transcript_delta( self, response_audio_transcript_delta: api_proto.ServerEvent.ResponseAudioTranscriptDelta, ): content = self._get_content(response_audio_transcript_delta) transcript = response_audio_transcript_delta["delta"] content.text += transcript assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.send_nowait(transcript) def _handle_response_audio_done( self, response_audio_done: api_proto.ServerEvent.ResponseAudioDone ): content = self._get_content(response_audio_done) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.close() def _handle_response_audio_transcript_done( self, response_audio_transcript_done: api_proto.ServerEvent.ResponseAudioTranscriptDone, ): content = self._get_content(response_audio_transcript_done) assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.close() def _handle_response_content_part_done( self, response_content_done: api_proto.ServerEvent.ResponseContentPartDone ): content = self._get_content(response_content_done) self.emit("response_content_done", content) def _handle_response_output_item_done( self, response_output_done: api_proto.ServerEvent.ResponseOutputItemDone ): response_id = response_output_done["response_id"] response = self._pending_responses[response_id] output_index = response_output_done["output_index"] output = response.output[output_index] if output.type == "function_call": if self._fnc_ctx is None: logger.error( "function call received but no fnc_ctx is available", extra=self.logging_extra(), ) return # parse the arguments and call the function inside the fnc_ctx item = response_output_done["item"] assert item["type"] == "function_call" fnc_call_info = _oai_api.create_ai_function_info( self._fnc_ctx, item["call_id"], item["name"], item["arguments"], ) msg = self._remote_converstation_items.get(output.item_id) if msg is not None: # update the content of the message assert msg.tool_call_id == item["call_id"] assert msg.role == "assistant" msg.name = item["name"] msg.tool_calls = [fnc_call_info] self.emit("function_calls_collected", [fnc_call_info]) self._fnc_tasks.create_task( self._run_fnc_task(fnc_call_info, output.item_id) ) output.done_fut.set_result(None) self.emit("response_output_done", output) def _handle_response_done(self, response_done: api_proto.ServerEvent.ResponseDone): response_data = response_done["response"] response_id = response_data["id"] response = self._pending_responses[response_id] response.done_fut.set_result(None) response.status = response_data["status"] response.status_details = response_data.get("status_details") response.usage = response_data.get("usage") if response.status == "failed": assert response.status_details is not None error = response.status_details.get("error") code: str | None = None message: str | None = None if error is not None: code = error.get("code") # type: ignore message = error.get("message") # type: ignore logger.error( "response generation failed", extra={"code": code, "error": message, **self.logging_extra()}, ) elif response.status == "incomplete": assert response.status_details is not None reason = response.status_details.get("reason") logger.warning( "response generation incomplete", extra={"reason": reason, **self.logging_extra()}, ) self.emit("response_done", response) def _get_content(self, ptr: _ContentPtr) -> RealtimeContent: response = self._pending_responses[ptr["response_id"]] output = response.output[ptr["output_index"]] content = output.content[ptr["content_index"]] return content @utils.log_exceptions(logger=logger) async def _run_fnc_task(self, fnc_call_info: llm.FunctionCallInfo, item_id: str): logger.debug( "executing ai function", extra={ "function": fnc_call_info.function_info.name, }, ) called_fnc = fnc_call_info.execute() await called_fnc.task tool_call = llm.ChatMessage.create_tool_from_called_function(called_fnc) if called_fnc.result is not None: create_fut = self.conversation.item.create( tool_call, previous_item_id=item_id, ) self.response.create() await create_fut # update the message with the tool call result msg = self._remote_converstation_items.get(tool_call.id) if msg is not None: assert msg.tool_call_id == tool_call.tool_call_id assert msg.role == "tool" msg.name = tool_call.name msg.content = tool_call.content msg.tool_exception = tool_call.tool_exception self.emit("function_calls_finished", [called_fnc]) def logging_extra(self) -> dict: return {"session_id": self._session_id}
Ancestors
- EventEmitter
- typing.Generic
Class variables
var Conversation
var ConversationItem
var InputAudioBuffer
var Response
Instance variables
prop conversation : Conversation
-
Expand source code
@property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self)
prop fnc_ctx : llm.FunctionContext | None
-
Expand source code
@property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx
prop input_audio_buffer : InputAudioBuffer
-
Expand source code
@property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self)
prop response : Response
-
Expand source code
@property def response(self) -> Response: return RealtimeSession.Response(self)
Methods
async def aclose(self) ‑> None
def chat_ctx_copy(self) ‑> ChatContext
def logging_extra(self) ‑> dict
def session_update(self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> None
async def set_chat_ctx(self, new_ctx: llm.ChatContext) ‑> None
-
Sync the chat context with the agent's chat context.
Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages.
Inherited members
class RealtimeToolCall (name: str, arguments: str, tool_call_id: str)
-
RealtimeToolCall(name: 'str', arguments: 'str', tool_call_id: 'str')
Expand source code
@dataclass class RealtimeToolCall: name: str """name of the function""" arguments: str """accumulated arguments""" tool_call_id: str """id of the tool call"""
Class variables
var arguments : str
-
accumulated arguments
var name : str
-
name of the function
var tool_call_id : str
-
id of the tool call
class ServerVadOptions (threshold: float, prefix_padding_ms: int, silence_duration_ms: int)
-
ServerVadOptions(threshold: 'float', prefix_padding_ms: 'int', silence_duration_ms: 'int')
Expand source code
@dataclass class ServerVadOptions: threshold: float prefix_padding_ms: int silence_duration_ms: int
Class variables
var prefix_padding_ms : int
var silence_duration_ms : int
var threshold : float