Module livekit.plugins.openai.realtime.realtime_model
Classes
class InputTranscriptionCompleted (item_id: str, transcript: str)
-
Expand source code
@dataclass
class InputTranscriptionCompleted:
    item_id: str
    """id of the item"""
    transcript: str
    """transcript of the input audio"""
Class variables
var item_id : str
-
id of the item
var transcript : str
-
transcript of the input audio
class InputTranscriptionFailed (item_id: str, message: str)
-
Expand source code
@dataclass
class InputTranscriptionFailed:
    item_id: str
    """id of the item"""
    message: str
    """error message"""
Class variables
var item_id : str
-
id of the item
var message : str
-
error message
class InputTranscriptionOptions (model: api_proto.InputTranscriptionModel | str)
-
Expand source code
@dataclass
class InputTranscriptionOptions:
    model: api_proto.InputTranscriptionModel | str
Class variables
var model : Literal['whisper-1'] | str
class RealtimeContent (response_id: str,
item_id: str,
output_index: int,
content_index: int,
text: str,
audio: list[rtc.AudioFrame],
text_stream: AsyncIterable[str],
audio_stream: AsyncIterable[rtc.AudioFrame],
tool_calls: list[RealtimeToolCall],
content_type: api_proto.Modality)
-
Expand source code
@dataclass
class RealtimeContent:
    response_id: str
    """id of the response"""
    item_id: str
    """id of the item"""
    output_index: int
    """index of the output"""
    content_index: int
    """index of the content"""
    text: str
    """accumulated text content"""
    audio: list[rtc.AudioFrame]
    """accumulated audio content"""
    text_stream: AsyncIterable[str]
    """stream of text content"""
    audio_stream: AsyncIterable[rtc.AudioFrame]
    """stream of audio content"""
    tool_calls: list[RealtimeToolCall]
    """pending tool calls"""
    content_type: api_proto.Modality
    """type of the content"""
Class variables
var audio : list[AudioFrame]
-
accumulated audio content
var audio_stream : AsyncIterable[AudioFrame]
-
stream of audio content
var content_index : int
-
index of the content
var content_type : Literal['text', 'audio']
-
type of the content
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var text : str
-
accumulated text content
var text_stream : AsyncIterable[str]
-
stream of text content
var tool_calls : list[RealtimeToolCall]
-
pending tool calls
class RealtimeError (event_id: str, type: str, message: str, code: Optional[str], param: Optional[str])
-
Expand source code
@dataclass
class RealtimeError:
    event_id: str
    type: str
    message: str
    code: Optional[str]
    param: Optional[str]
Class variables
var code : str | None
var event_id : str
var message : str
var param : str | None
var type : str
class RealtimeModel (*,
instructions: str = '',
modalities: list[api_proto.Modality] = ['text', 'audio'],
model: api_proto.OpenAIModel | str = 'gpt-4o-realtime-preview',
voice: api_proto.Voice = 'alloy',
input_audio_format: api_proto.AudioFormat = 'pcm16',
output_audio_format: api_proto.AudioFormat = 'pcm16',
input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'),
turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500),
tool_choice: api_proto.ToolChoice = 'auto',
temperature: float = 0.8,
max_response_output_tokens: "int | Literal['inf']" = 'inf',
base_url: str | None = None,
http_session: aiohttp.ClientSession | None = None,
loop: asyncio.AbstractEventLoop | None = None,
azure_deployment: str | None = None,
entra_token: str | None = None,
api_key: str | None = None,
api_version: str | None = None)
-
Expand source code
class RealtimeModel: @overload def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: api_proto.OpenAIModel | str = api_proto.DefaultOpenAIModel, voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", api_key: str | None = None, base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... @overload def __init__( self, *, azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: api_proto.OpenAIModel | str = api_proto.DefaultOpenAIModel, voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, # azure specific parameters azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, ) -> None: """ Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API. Args: instructions (str, optional): Initial system instructions for the model. Defaults to "". api_key (str or None, optional): OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. model (str or None, optional): The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01". voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. 
turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". base_url (str or None, optional): Base URL for the API endpoint. If None, defaults to OpenAI's default API URL. http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Raises: ValueError: If the API key is not provided and cannot be found in environment variables. """ super().__init__() self._base_url = base_url is_azure = ( api_version is not None or entra_token is not None or azure_deployment is not None ) api_key = api_key or os.environ.get("OPENAI_API_KEY") if api_key is None and not is_azure: raise ValueError( "OpenAI API key is required, either using the argument or by setting the OPENAI_API_KEY environmental variable" ) if not base_url: base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") self._default_opts = _ModelOptions( model=model, modalities=modalities, instructions=instructions, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, temperature=temperature, tool_choice=tool_choice, max_response_output_tokens=max_response_output_tokens, api_key=api_key, base_url=base_url, azure_deployment=azure_deployment, entra_token=entra_token, is_azure=is_azure, api_version=api_version, ) self._loop = loop or asyncio.get_event_loop() self._rt_sessions: list[RealtimeSession] = [] self._http_session = http_session @classmethod def with_azure( cls, *, azure_deployment: str, azure_endpoint: str | None = None, api_version: str | None = None, api_key: str | None = None, entra_token: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ): """ Create a RealtimeClient instance configured for Azure OpenAI Service. Args: azure_deployment (str): The name of your Azure OpenAI deployment. azure_endpoint (str or None, optional): The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT. api_version (str or None, optional): API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION. api_key (str or None, optional): Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY. 
entra_token (str or None, optional): Azure Entra authentication token. Required if not using API key authentication. base_url (str or None, optional): Base URL for the API endpoint. If None, constructed from the azure_endpoint. instructions (str, optional): Initial system instructions for the model. Defaults to "". modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Returns: RealtimeClient: An instance of RealtimeClient configured for Azure OpenAI Service. Raises: ValueError: If required Azure parameters are missing or invalid. """ api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") if api_key is None and entra_token is None: raise ValueError( "Missing credentials. Please pass one of `api_key`, `entra_token`, or the `AZURE_OPENAI_API_KEY` environment variable." ) api_version = api_version or os.getenv("OPENAI_API_VERSION") if api_version is None: raise ValueError( "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable" ) if base_url is None: azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") if azure_endpoint is None: raise ValueError( "Missing Azure endpoint. Please pass the `azure_endpoint` parameter or set the `AZURE_OPENAI_ENDPOINT` environment variable." 
) base_url = f"{azure_endpoint.rstrip('/')}/openai" elif azure_endpoint is not None: raise ValueError("base_url and azure_endpoint are mutually exclusive") return cls( instructions=instructions, modalities=modalities, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, tool_choice=tool_choice, temperature=temperature, max_response_output_tokens=max_response_output_tokens, api_key=api_key, http_session=http_session, loop=loop, azure_deployment=azure_deployment, api_version=api_version, entra_token=entra_token, base_url=base_url, ) def _ensure_session(self) -> aiohttp.ClientSession: if not self._http_session: self._http_session = utils.http_context.http_session() return self._http_session @property def sessions(self) -> list[RealtimeSession]: return self._rt_sessions def session( self, *, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, tool_choice: api_proto.ToolChoice | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> RealtimeSession: opts = deepcopy(self._default_opts) if modalities is not None: opts.modalities = modalities if instructions is not None: opts.instructions = instructions if voice is not None: opts.voice = voice if input_audio_format is not None: opts.input_audio_format = input_audio_format if output_audio_format is not None: opts.output_audio_format = output_audio_format if tool_choice is not None: opts.tool_choice = tool_choice if input_audio_transcription is not None: opts.input_audio_transcription if turn_detection is not None: opts.turn_detection = turn_detection if temperature is not None: opts.temperature = temperature if max_response_output_tokens is not None: opts.max_response_output_tokens = max_response_output_tokens new_session = RealtimeSession( chat_ctx=chat_ctx or llm.ChatContext(), fnc_ctx=fnc_ctx, opts=opts, http_session=self._ensure_session(), loop=self._loop, ) self._rt_sessions.append(new_session) return new_session async def aclose(self) -> None: for session in self._rt_sessions: await session.aclose()
Initializes a RealtimeModel instance for interacting with OpenAI's Realtime API.
Args
instructions : str, optional
    Initial system instructions for the model. Defaults to "".
api_key : str or None, optional
    OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY.
modalities : list[api_proto.Modality], optional
    Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
model : str or None, optional
    The name of the model to use. Defaults to "gpt-4o-realtime-preview".
voice : api_proto.Voice, optional
    Voice setting for audio outputs. Defaults to "alloy".
input_audio_format : api_proto.AudioFormat, optional
    Format of input audio data. Defaults to "pcm16".
output_audio_format : api_proto.AudioFormat, optional
    Format of output audio data. Defaults to "pcm16".
input_audio_transcription : InputTranscriptionOptions, optional
    Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection : ServerVadOptions, optional
    Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice : api_proto.ToolChoice, optional
    Tool choice for the model, such as "auto". Defaults to "auto".
temperature : float, optional
    Sampling temperature for response generation. Defaults to 0.8.
max_response_output_tokens : int or Literal["inf"], optional
    Maximum number of tokens in the response. Defaults to "inf".
base_url : str or None, optional
    Base URL for the API endpoint. If None, defaults to OpenAI's default API URL.
http_session : aiohttp.ClientSession or None, optional
    Async HTTP session to use for requests. If None, a new session will be created.
loop : asyncio.AbstractEventLoop or None, optional
    Event loop to use for async operations. If None, the current event loop is used.
Raises
ValueError
- If the API key is not provided and cannot be found in environment variables.
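For orientation, a minimal construction sketch (the import path is assumed from the module path above; the API key is read from OPENAI_API_KEY when not passed explicitly):

from livekit.plugins.openai import realtime

# Relies on OPENAI_API_KEY being set; pass api_key=... to override.
model = realtime.RealtimeModel(
    instructions="You are a concise voice assistant.",
    modalities=["text", "audio"],
    voice="alloy",
    temperature=0.8,
)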
Static methods
def with_azure(*,
azure_deployment: str,
azure_endpoint: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
entra_token: str | None = None,
base_url: str | None = None,
instructions: str = '',
modalities: list[api_proto.Modality] = ['text', 'audio'],
voice: api_proto.Voice = 'alloy',
input_audio_format: api_proto.AudioFormat = 'pcm16',
output_audio_format: api_proto.AudioFormat = 'pcm16',
input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'),
turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500),
tool_choice: api_proto.ToolChoice = 'auto',
temperature: float = 0.8,
max_response_output_tokens: "int | Literal['inf']" = 'inf',
http_session: aiohttp.ClientSession | None = None,
loop: asyncio.AbstractEventLoop | None = None)
-
Create a RealtimeModel instance configured for Azure OpenAI Service.
Args
azure_deployment : str
    The name of your Azure OpenAI deployment.
azure_endpoint : str or None, optional
    The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
api_version : str or None, optional
    API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
api_key : str or None, optional
    Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
entra_token : str or None, optional
    Azure Entra authentication token. Required if not using API key authentication.
base_url : str or None, optional
    Base URL for the API endpoint. If None, constructed from the azure_endpoint.
instructions : str, optional
    Initial system instructions for the model. Defaults to "".
modalities : list[api_proto.Modality], optional
    Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
voice : api_proto.Voice, optional
    Voice setting for audio outputs. Defaults to "alloy".
input_audio_format : api_proto.AudioFormat, optional
    Format of input audio data. Defaults to "pcm16".
output_audio_format : api_proto.AudioFormat, optional
    Format of output audio data. Defaults to "pcm16".
input_audio_transcription : InputTranscriptionOptions, optional
    Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection : ServerVadOptions, optional
    Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice : api_proto.ToolChoice, optional
    Tool choice for the model, such as "auto". Defaults to "auto".
temperature : float, optional
    Sampling temperature for response generation. Defaults to 0.8.
max_response_output_tokens : int or Literal["inf"], optional
    Maximum number of tokens in the response. Defaults to "inf".
http_session : aiohttp.ClientSession or None, optional
    Async HTTP session to use for requests. If None, a new session will be created.
loop : asyncio.AbstractEventLoop or None, optional
    Event loop to use for async operations. If None, the current event loop is used.
Returns
RealtimeModel
- An instance of RealtimeModel configured for Azure OpenAI Service.
Raises
ValueError
- If required Azure parameters are missing or invalid.
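A hedged sketch of the Azure path (the deployment name, endpoint, and API version below are placeholders; the same values can instead come from AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, and OPENAI_API_VERSION):

from livekit.plugins.openai import realtime

model = realtime.RealtimeModel.with_azure(
    azure_deployment="my-realtime-deployment",              # placeholder deployment name
    azure_endpoint="https://my-resource.openai.azure.com",  # placeholder endpoint
    api_version="2024-10-01-preview",                       # example API version string
    voice="alloy",
)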
Instance variables
prop sessions : list[RealtimeSession]
-
Expand source code
@property
def sessions(self) -> list[RealtimeSession]:
    return self._rt_sessions
Methods
async def aclose(self) -> None
-
Expand source code
async def aclose(self) -> None:
    for session in self._rt_sessions:
        await session.aclose()
def session(self,
*,
chat_ctx: llm.ChatContext | None = None,
fnc_ctx: llm.FunctionContext | None = None,
modalities: list[api_proto.Modality] | None = None,
instructions: str | None = None,
voice: api_proto.Voice | None = None,
input_audio_format: api_proto.AudioFormat | None = None,
output_audio_format: api_proto.AudioFormat | None = None,
tool_choice: api_proto.ToolChoice | None = None,
input_audio_transcription: InputTranscriptionOptions | None = None,
turn_detection: ServerVadOptions | None = None,
temperature: float | None = None,
max_response_output_tokens: "int | Literal['inf'] | None" = None) -> RealtimeSession
-
Expand source code
def session(
    self,
    *,
    chat_ctx: llm.ChatContext | None = None,
    fnc_ctx: llm.FunctionContext | None = None,
    modalities: list[api_proto.Modality] | None = None,
    instructions: str | None = None,
    voice: api_proto.Voice | None = None,
    input_audio_format: api_proto.AudioFormat | None = None,
    output_audio_format: api_proto.AudioFormat | None = None,
    tool_choice: api_proto.ToolChoice | None = None,
    input_audio_transcription: InputTranscriptionOptions | None = None,
    turn_detection: ServerVadOptions | None = None,
    temperature: float | None = None,
    max_response_output_tokens: int | Literal["inf"] | None = None,
) -> RealtimeSession:
    # start from the model-level defaults and apply only the per-session overrides
    opts = deepcopy(self._default_opts)
    if modalities is not None:
        opts.modalities = modalities
    if instructions is not None:
        opts.instructions = instructions
    if voice is not None:
        opts.voice = voice
    if input_audio_format is not None:
        opts.input_audio_format = input_audio_format
    if output_audio_format is not None:
        opts.output_audio_format = output_audio_format
    if tool_choice is not None:
        opts.tool_choice = tool_choice
    if input_audio_transcription is not None:
        opts.input_audio_transcription = input_audio_transcription
    if turn_detection is not None:
        opts.turn_detection = turn_detection
    if temperature is not None:
        opts.temperature = temperature
    if max_response_output_tokens is not None:
        opts.max_response_output_tokens = max_response_output_tokens

    new_session = RealtimeSession(
        chat_ctx=chat_ctx or llm.ChatContext(),
        fnc_ctx=fnc_ctx,
        opts=opts,
        http_session=self._ensure_session(),
        loop=self._loop,
    )
    self._rt_sessions.append(new_session)
    return new_session
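session() deep-copies the model's default options and applies only the overrides passed in, so per-session tweaks never mutate the defaults. A short usage sketch, continuing from the model constructed earlier (llm comes from livekit.agents):

from livekit.agents import llm

# Per-session overrides; anything not passed keeps the model's defaults.
session = model.session(
    chat_ctx=llm.ChatContext(),
    temperature=0.6,
    max_response_output_tokens=1024,
)
assert session in model.sessions   # the model tracks its sessions and closes them in aclose()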
class RealtimeOutput (response_id: str,
item_id: str,
output_index: int,
role: api_proto.Role,
type: "Literal['message', 'function_call']",
content: list[RealtimeContent],
done_fut: asyncio.Future[None])
-
Expand source code
@dataclass
class RealtimeOutput:
    response_id: str
    """id of the response"""
    item_id: str
    """id of the item"""
    output_index: int
    """index of the output"""
    role: api_proto.Role
    """role of the message"""
    type: Literal["message", "function_call"]
    """type of the output"""
    content: list[RealtimeContent]
    """list of content"""
    done_fut: asyncio.Future[None]
    """future that will be set when the output is completed"""
Class variables
var content : list[RealtimeContent]
-
list of content
var done_fut : asyncio.Future[None]
-
future that will be set when the output is completed
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var role : Literal['system', 'assistant', 'user', 'tool']
-
role of the message
var type : Literal['message', 'function_call']
-
type of the output
class RealtimeResponse (id: str,
status: api_proto.ResponseStatus,
status_details: api_proto.ResponseStatusDetails | None,
output: list[RealtimeOutput],
usage: api_proto.Usage | None,
done_fut: asyncio.Future[None],
_created_timestamp: float,
_first_token_timestamp: float | None = None)
-
Expand source code
@dataclass
class RealtimeResponse:
    id: str
    """id of the message"""
    status: api_proto.ResponseStatus
    """status of the response"""
    status_details: api_proto.ResponseStatusDetails | None
    """details of the status (only with "incomplete, cancelled and failed")"""
    output: list[RealtimeOutput]
    """list of outputs"""
    usage: api_proto.Usage | None
    """usage of the response"""
    done_fut: asyncio.Future[None]
    """future that will be set when the response is completed"""
    _created_timestamp: float
    """timestamp when the response was created"""
    _first_token_timestamp: float | None = None
    """timestamp when the first token was received"""
Class variables
var done_fut : asyncio.Future[None]
-
future that will be set when the response is completed
var id : str
-
id of the message
var output : list[RealtimeOutput]
-
list of outputs
var status : Literal['in_progress', 'completed', 'incomplete', 'cancelled', 'failed']
-
status of the response
var status_details : CancelledStatusDetails | IncompleteStatusDetails | FailedStatusDetails | None
-
details of the status (only present when the status is "incomplete", "cancelled", or "failed")
var usage : Usage | None
-
usage of the response
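Because done_fut resolves only once the server reports the response as finished, a handler that receives a RealtimeResponse (for example from the response_created event) can await it before reading the final status and usage. A hedged sketch:

async def wait_for_response(response: "RealtimeResponse") -> None:
    await response.done_fut                    # resolved by the session on "response.done"
    print("status:", response.status)          # e.g. "completed", "cancelled", "failed"
    if response.usage is not None:
        print("output tokens:", response.usage.get("output_tokens", 0))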
class RealtimeSession (*,
opts: _ModelOptions,
http_session: aiohttp.ClientSession,
chat_ctx: llm.ChatContext,
fnc_ctx: llm.FunctionContext | None,
loop: asyncio.AbstractEventLoop)
-
Expand source code
class RealtimeSession(utils.EventEmitter[EventTypes]): class InputAudioBuffer: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def append(self, frame: rtc.AudioFrame) -> None: self._sess._queue_msg( { "type": "input_audio_buffer.append", "audio": base64.b64encode(frame.data).decode("utf-8"), } ) def clear(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.clear"}) def commit(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.commit"}) class ConversationItem: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create( self, message: llm.ChatMessage, previous_item_id: str | None = None ) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() message_content = message.content tool_call_id = message.tool_call_id event: api_proto.ClientEvent.ConversationItemCreate | None = None if tool_call_id: if message.role == "tool": # function_call_output assert isinstance(message_content, str) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "function_call_output", "call_id": tool_call_id, "output": message_content, }, } else: # function_call if not message.tool_calls or message.name is None: logger.warning( "function call message has no name or tool calls: %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut if len(message.tool_calls) > 1: logger.warning( "function call message has multiple tool calls, " "only the first one will be used", extra=self._sess.logging_extra(), ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "function_call", "call_id": tool_call_id, "name": message.name, "arguments": message.tool_calls[0].raw_arguments, }, } else: if message_content is None: logger.warning( "message content is None, skipping: %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut if not isinstance(message_content, list): message_content = [message_content] if message.role == "user": user_contents: list[ api_proto.InputTextContent | api_proto.InputAudioContent ] = [] for cnt in message_content: if isinstance(cnt, str): user_contents.append( { "type": "input_text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): user_contents.append( { "type": "input_audio", "audio": base64.b64encode( utils.merge_frames(cnt.frame).data ).decode("utf-8"), } ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", "role": "user", "content": user_contents, }, } elif message.role == "assistant": assistant_contents: list[api_proto.TextContent] = [] for cnt in message_content: if isinstance(cnt, str): assistant_contents.append( { "type": "text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in assistant message is not supported" ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", "role": "assistant", "content": assistant_contents, }, } elif message.role == "system": system_contents: list[api_proto.InputTextContent] = [] for cnt in message_content: if isinstance(cnt, str): system_contents.append({"type": "input_text", "text": cnt}) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in system message is not supported" ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "id": message.id, "type": "message", 
"role": "system", "content": system_contents, }, } if event is None: logger.warning( "chat message is not supported inside the realtime API %s", message, extra=self._sess.logging_extra(), ) fut.set_result(False) return fut self._sess._item_created_futs[message.id] = fut self._sess._queue_msg(event) return fut def truncate( self, *, item_id: str, content_index: int, audio_end_ms: int ) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() self._sess._item_truncated_futs[item_id] = fut self._sess._queue_msg( { "type": "conversation.item.truncate", "item_id": item_id, "content_index": content_index, "audio_end_ms": audio_end_ms, } ) return fut def delete(self, *, item_id: str) -> asyncio.Future[bool]: fut = asyncio.Future[bool]() self._sess._item_deleted_futs[item_id] = fut self._sess._queue_msg( { "type": "conversation.item.delete", "item_id": item_id, } ) return fut class Conversation: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess @property def item(self) -> RealtimeSession.ConversationItem: return RealtimeSession.ConversationItem(self._sess) class Response: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create( self, *, on_duplicate: Literal[ "cancel_existing", "cancel_new", "keep_both" ] = "keep_both", ) -> asyncio.Future[bool]: """Creates a new response. Args: on_duplicate: How to handle when there is an existing response in progress: - "cancel_existing": Cancel the existing response before creating new one - "cancel_new": Skip creating new response if one is in progress - "keep_both": Wait for the existing response to be done and then create a new one Returns: Future that resolves when the response create request is queued """ if on_duplicate not in ("cancel_existing", "cancel_new", "keep_both"): raise ValueError( "invalid on_duplicate value, must be one of: " "cancel_existing, cancel_new, keep_both" ) # check if there is a pending response creation request sent pending_create_fut = self._sess._response_create_fut if pending_create_fut is not None: if on_duplicate == "cancel_new": logger.warning( "skip new response creation due to previous pending response creation", extra=self._sess.logging_extra(), ) _fut = asyncio.Future[bool]() _fut.set_result(False) return _fut active_resp_id = self._sess._active_response_id _logging_extra = { "existing_response_id": active_resp_id, **self._sess.logging_extra(), } if ( not active_resp_id or self._sess._pending_responses[active_resp_id].done_fut.done() ): # no active response in progress, create a new one self._sess._queue_msg({"type": "response.create"}) _fut = asyncio.Future[bool]() _fut.set_result(True) return _fut # there is an active response in progress if on_duplicate == "cancel_new": logger.warning( "skip new response creation due to active response in progress", extra=_logging_extra, ) _fut = asyncio.Future[bool]() _fut.set_result(False) return _fut if on_duplicate == "cancel_existing": self.cancel() logger.warning( "cancelling in-progress response to create a new one", extra=_logging_extra, ) elif on_duplicate == "keep_both": logger.warning( "waiting for in-progress response to be done " "before creating a new one", extra=_logging_extra, ) # create a task to wait for the previous response and then create new one async def wait_and_create() -> bool: await self._sess._pending_responses[active_resp_id].done_fut logger.info( "in-progress response is done, creating a new one", extra=_logging_extra, ) new_create_fut = asyncio.Future[None]() self._sess._response_create_fut = new_create_fut 
self._sess._queue_msg({"type": "response.create"}) return True return asyncio.create_task(wait_and_create()) def cancel(self) -> None: self._sess._queue_msg({"type": "response.cancel"}) def __init__( self, *, opts: _ModelOptions, http_session: aiohttp.ClientSession, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, loop: asyncio.AbstractEventLoop, ) -> None: super().__init__() self._label = f"{type(self).__module__}.{type(self).__name__}" self._main_atask = asyncio.create_task( self._main_task(), name="openai-realtime-session" ) # manage conversation items internally self._remote_conversation_items = remote_items._RemoteConversationItems() # wait for the item to be created or deleted self._item_created_futs: dict[str, asyncio.Future[bool]] = {} self._item_deleted_futs: dict[str, asyncio.Future[bool]] = {} self._item_truncated_futs: dict[str, asyncio.Future[bool]] = {} self._fnc_ctx = fnc_ctx self._loop = loop self._opts = opts self._send_ch = utils.aio.Chan[api_proto.ClientEvents]() self._http_session = http_session self._pending_responses: dict[str, RealtimeResponse] = {} self._active_response_id: str | None = None self._response_create_fut: asyncio.Future[None] | None = None self._session_id = "not-connected" self.session_update() # initial session init # sync the chat context to the session self._init_sync_task = asyncio.create_task(self.set_chat_ctx(chat_ctx)) self._fnc_tasks = utils.aio.TaskSet() async def aclose(self) -> None: if self._send_ch.closed: return self._send_ch.close() await self._main_atask @property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx @fnc_ctx.setter def fnc_ctx(self, fnc_ctx: llm.FunctionContext | None) -> None: self._fnc_ctx = fnc_ctx @property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self) @property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self) @property def response(self) -> Response: return RealtimeSession.Response(self) def session_update( self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> None: self._opts = deepcopy(self._opts) if modalities is not None: self._opts.modalities = modalities if instructions is not None: self._opts.instructions = instructions if voice is not None: self._opts.voice = voice if input_audio_format is not None: self._opts.input_audio_format = input_audio_format if output_audio_format is not None: self._opts.output_audio_format = output_audio_format if input_audio_transcription is not None: self._opts.input_audio_transcription = input_audio_transcription if turn_detection is not None: self._opts.turn_detection = turn_detection if tool_choice is not None: self._opts.tool_choice = tool_choice if temperature is not None: self._opts.temperature = temperature if max_response_output_tokens is not None: self._opts.max_response_output_tokens = max_response_output_tokens tools = [] if self._fnc_ctx is not None: for fnc in self._fnc_ctx.ai_functions.values(): # the realtime API is using internally-tagged polymorphism. 
# build_oai_function_description was built for the ChatCompletion API function_data = build_oai_function_description(fnc)["function"] function_data["type"] = "function" tools.append(function_data) server_vad_opts: api_proto.ServerVad | None = None if self._opts.turn_detection is not None: server_vad_opts = { "type": "server_vad", "threshold": self._opts.turn_detection.threshold, "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms, "silence_duration_ms": self._opts.turn_detection.silence_duration_ms, } input_audio_transcription_opts: api_proto.InputAudioTranscription | None = None if self._opts.input_audio_transcription is not None: input_audio_transcription_opts = { "model": self._opts.input_audio_transcription.model, } session_data: api_proto.ClientEvent.SessionUpdateData = { "modalities": self._opts.modalities, "instructions": self._opts.instructions, "voice": self._opts.voice, "input_audio_format": self._opts.input_audio_format, "output_audio_format": self._opts.output_audio_format, "input_audio_transcription": input_audio_transcription_opts, "turn_detection": server_vad_opts, "tools": tools, "tool_choice": self._opts.tool_choice, "temperature": self._opts.temperature, "max_response_output_tokens": None, } # azure doesn't support inf for max_response_output_tokens if not self._opts.is_azure or isinstance( self._opts.max_response_output_tokens, int ): session_data["max_response_output_tokens"] = ( self._opts.max_response_output_tokens ) else: del session_data["max_response_output_tokens"] # type: ignore self._queue_msg( { "type": "session.update", "session": session_data, } ) def chat_ctx_copy(self) -> llm.ChatContext: return self._remote_conversation_items.to_chat_context() async def set_chat_ctx(self, new_ctx: llm.ChatContext) -> None: """Sync the chat context with the agent's chat context. Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages. 
""" original_ctx = self._remote_conversation_items.to_chat_context() # filter out messages that are not function calls and content is None filtered_messages = [ msg for msg in new_ctx.messages if msg.tool_call_id or msg.content is not None ] changes = utils._compute_changes( original_ctx.messages, filtered_messages, key_fnc=lambda x: x.id ) logger.debug( "sync chat context", extra={ "to_delete": [msg.id for msg in changes.to_delete], "to_add": [ (prev.id if prev else None, msg.id) for prev, msg in changes.to_add ], }, ) # append an empty audio message if all new messages are text if changes.to_add and not any( isinstance(msg.content, llm.ChatAudio) for _, msg in changes.to_add ): # Patch: append an empty audio message to set the API in audio mode changes.to_add.append((None, self._create_empty_user_audio_message(1.0))) _futs = [ self.conversation.item.delete(item_id=msg.id) for msg in changes.to_delete ] + [ self.conversation.item.create(msg, prev.id if prev else None) for prev, msg in changes.to_add ] # wait for all the futures to complete await asyncio.gather(*_futs) def _create_empty_user_audio_message(self, duration: float) -> llm.ChatMessage: """Create an empty audio message with the given duration.""" samples = int(duration * api_proto.SAMPLE_RATE) return llm.ChatMessage( role="user", content=llm.ChatAudio( frame=rtc.AudioFrame( data=b"\x00\x00" * (samples * api_proto.NUM_CHANNELS), sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=samples, ) ), ) def _recover_from_text_response(self, item_id: str | None = None) -> None: """Try to recover from a text response to audio mode. Sometimes the OpenAI Realtime API returns text instead of audio responses. This method tries to recover from this by requesting a new response after deleting the text response and creating an empty user audio message. 
""" if item_id: # remove the text response if needed self.conversation.item.delete(item_id=item_id) self.conversation.item.create(self._create_empty_user_audio_message(1.0)) self.response.create(on_duplicate="keep_both") def _update_conversation_item_content( self, item_id: str, content: llm.ChatContent | list[llm.ChatContent] | None ) -> None: item = self._remote_conversation_items.get(item_id) if item is None: logger.warning( "conversation item not found, skipping update", extra={"item_id": item_id}, ) return item.content = content def _queue_msg(self, msg: api_proto.ClientEvents) -> None: self._send_ch.send_nowait(msg) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: try: headers = {"User-Agent": "LiveKit Agents"} query_params: dict[str, str] = {} base_url = self._opts.base_url if self._opts.is_azure: if self._opts.entra_token: headers["Authorization"] = f"Bearer {self._opts.entra_token}" if self._opts.api_key: headers["api-key"] = self._opts.api_key if self._opts.api_version: query_params["api-version"] = self._opts.api_version if self._opts.azure_deployment: query_params["deployment"] = self._opts.azure_deployment else: # OAI endpoint headers["Authorization"] = f"Bearer {self._opts.api_key}" headers["OpenAI-Beta"] = "realtime=v1" if self._opts.model: query_params["model"] = self._opts.model url = f"{base_url.rstrip('/')}/realtime?{urlencode(query_params)}" if url.startswith("http"): url = url.replace("http", "ws", 1) ws_conn = await self._http_session.ws_connect( url, headers=headers, ) except Exception: logger.exception("failed to connect to OpenAI API S2S") return closing = False @utils.log_exceptions(logger=logger) async def _send_task(): nonlocal closing async for msg in self._send_ch: await ws_conn.send_json(msg) closing = True await ws_conn.close() @utils.log_exceptions(logger=logger) async def _recv_task(): while True: msg = await ws_conn.receive() if msg.type in ( aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, ): if closing: return raise Exception("OpenAI S2S connection closed unexpectedly") if msg.type != aiohttp.WSMsgType.TEXT: logger.warning( "unexpected OpenAI S2S message type %s", msg.type, extra=self.logging_extra(), ) continue try: data = msg.json() event: api_proto.ServerEventType = data["type"] if event == "session.created": self._handle_session_created(data) if event == "session.updated": self._handle_session_updated(data) elif event == "error": self._handle_error(data) elif event == "input_audio_buffer.speech_started": self._handle_input_audio_buffer_speech_started(data) elif event == "input_audio_buffer.speech_stopped": self._handle_input_audio_buffer_speech_stopped(data) elif event == "input_audio_buffer.committed": self._handle_input_audio_buffer_speech_committed(data) elif ( event == "conversation.item.input_audio_transcription.completed" ): self._handle_conversation_item_input_audio_transcription_completed( data ) elif event == "conversation.item.input_audio_transcription.failed": self._handle_conversation_item_input_audio_transcription_failed( data ) elif event == "conversation.item.created": self._handle_conversation_item_created(data) elif event == "conversation.item.deleted": self._handle_conversation_item_deleted(data) elif event == "conversation.item.truncated": self._handle_conversation_item_truncated(data) elif event == "response.created": self._handle_response_created(data) elif event == "response.output_item.added": self._handle_response_output_item_added(data) elif event == 
"response.content_part.added": self._handle_response_content_part_added(data) elif event == "response.audio.delta": self._handle_response_audio_delta(data) elif event == "response.audio_transcript.delta": self._handle_response_audio_transcript_delta(data) elif event == "response.audio.done": self._handle_response_audio_done(data) elif event == "response.text.done": self._handle_response_text_done(data) elif event == "response.audio_transcript.done": self._handle_response_audio_transcript_done(data) elif event == "response.content_part.done": self._handle_response_content_part_done(data) elif event == "response.output_item.done": self._handle_response_output_item_done(data) elif event == "response.done": self._handle_response_done(data) except Exception: logger.exception( "failed to handle OpenAI S2S message", extra={"websocket_message": msg, **self.logging_extra()}, ) tasks = [ asyncio.create_task(_send_task(), name="openai-realtime-send"), asyncio.create_task(_recv_task(), name="openai-realtime-recv"), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks) def _handle_session_created( self, session_created: api_proto.ServerEvent.SessionCreated ): self._session_id = session_created["session"]["id"] def _handle_session_updated( self, session_updated: api_proto.ServerEvent.SessionUpdated ): session = session_updated["session"] if session["turn_detection"] is None: turn_detection = None else: turn_detection = ServerVadOptions( threshold=session["turn_detection"]["threshold"], prefix_padding_ms=session["turn_detection"]["prefix_padding_ms"], silence_duration_ms=session["turn_detection"]["silence_duration_ms"], ) if session["input_audio_transcription"] is None: input_audio_transcription = None else: input_audio_transcription = InputTranscriptionOptions( model=session["input_audio_transcription"]["model"], ) self.emit( "session_updated", RealtimeSessionOptions( model=session["model"], modalities=session["modalities"], instructions=session["instructions"], voice=session["voice"], input_audio_format=session["input_audio_format"], output_audio_format=session["output_audio_format"], input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, tool_choice=session["tool_choice"], temperature=session["temperature"], max_response_output_tokens=session["max_response_output_tokens"], ), ) def _handle_error(self, error: api_proto.ServerEvent.Error): logger.error( "OpenAI S2S error %s", error, extra=self.logging_extra(), ) error_content = error["error"] self.emit( "error", RealtimeError( event_id=error["event_id"], type=error_content["type"], message=error_content["message"], code=error_content.get("code"), param=error_content.get("param"), ), ) def _handle_input_audio_buffer_speech_started( self, speech_started: api_proto.ServerEvent.InputAudioBufferSpeechStarted ): self.emit("input_speech_started") def _handle_input_audio_buffer_speech_stopped( self, speech_stopped: api_proto.ServerEvent.InputAudioBufferSpeechStopped ): self.emit("input_speech_stopped") def _handle_input_audio_buffer_speech_committed( self, speech_committed: api_proto.ServerEvent.InputAudioBufferCommitted ): self.emit("input_speech_committed") def _handle_conversation_item_input_audio_transcription_completed( self, transcription_completed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionCompleted, ): transcript = transcription_completed["transcript"] self.emit( "input_speech_transcription_completed", InputTranscriptionCompleted( item_id=transcription_completed["item_id"], 
transcript=transcript, ), ) def _handle_conversation_item_input_audio_transcription_failed( self, transcription_failed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionFailed, ): error = transcription_failed["error"] logger.error( "OAI S2S failed to transcribe input audio: %s", error["message"], extra=self.logging_extra(), ) self.emit( "input_speech_transcription_failed", InputTranscriptionFailed( item_id=transcription_failed["item_id"], message=error["message"], ), ) def _handle_conversation_item_created( self, item_created: api_proto.ServerEvent.ConversationItemCreated ): previous_item_id = item_created["previous_item_id"] item = item_created["item"] item_type = item["type"] item_id = item["id"] # Create message based on item type # Leave the content empty and fill it in later from the content parts if item_type == "message": # Handle message items (system/user/assistant) item = cast(Union[api_proto.SystemItem, api_proto.UserItem], item) role = item["role"] message = llm.ChatMessage(id=item_id, role=role) if item.get("content"): content = item["content"][0] if content["type"] in ("text", "input_text"): content = cast(api_proto.InputTextContent, content) message.content = content["text"] elif content["type"] == "input_audio" and content.get("audio"): audio_data = base64.b64decode(content["audio"]) message.content = llm.ChatAudio( frame=rtc.AudioFrame( data=audio_data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(audio_data) // 2, ) ) elif item_type == "function_call": # Handle function call items item = cast(api_proto.FunctionCallItem, item) message = llm.ChatMessage( id=item_id, role="assistant", name=item["name"], tool_call_id=item["call_id"], ) elif item_type == "function_call_output": # Handle function call output items item = cast(api_proto.FunctionCallOutputItem, item) message = llm.ChatMessage( id=item_id, role="tool", tool_call_id=item["call_id"], content=item["output"], ) else: logger.error( f"unknown conversation item type {item_type}", extra=self.logging_extra(), ) return # Insert into conversation items self._remote_conversation_items.insert_after(previous_item_id, message) if item_id in self._item_created_futs: self._item_created_futs[item_id].set_result(True) del self._item_created_futs[item_id] logger.debug("conversation item created", extra=item_created) def _handle_conversation_item_deleted( self, item_deleted: api_proto.ServerEvent.ConversationItemDeleted ): # Delete from conversation items item_id = item_deleted["item_id"] self._remote_conversation_items.delete(item_id) if item_id in self._item_deleted_futs: self._item_deleted_futs[item_id].set_result(True) del self._item_deleted_futs[item_id] logger.debug("conversation item deleted", extra=item_deleted) def _handle_conversation_item_truncated( self, item_truncated: api_proto.ServerEvent.ConversationItemTruncated ): item_id = item_truncated["item_id"] if item_id in self._item_truncated_futs: self._item_truncated_futs[item_id].set_result(True) del self._item_truncated_futs[item_id] def _handle_response_created( self, response_created: api_proto.ServerEvent.ResponseCreated ): response = response_created["response"] done_fut = self._loop.create_future() status_details = response.get("status_details") new_response = RealtimeResponse( id=response["id"], status=response["status"], status_details=status_details, output=[], usage=response.get("usage"), done_fut=done_fut, _created_timestamp=time.time(), ) self._pending_responses[new_response.id] = new_response 
self._active_response_id = new_response.id # complete the create future if it exists if self._response_create_fut is not None: self._response_create_fut.set_result(None) self._response_create_fut = None self.emit("response_created", new_response) def _handle_response_output_item_added( self, response_output_added: api_proto.ServerEvent.ResponseOutputItemAdded ): response_id = response_output_added["response_id"] response = self._pending_responses[response_id] done_fut = self._loop.create_future() item_data = response_output_added["item"] item_type: Literal["message", "function_call"] = item_data["type"] # type: ignore assert item_type in ("message", "function_call") # function_call doesn't have a role field, defaulting it to assistant item_role: api_proto.Role = item_data.get("role") or "assistant" # type: ignore new_output = RealtimeOutput( response_id=response_id, item_id=item_data["id"], output_index=response_output_added["output_index"], type=item_type, role=item_role, content=[], done_fut=done_fut, ) response.output.append(new_output) self.emit("response_output_added", new_output) def _handle_response_content_part_added( self, response_content_added: api_proto.ServerEvent.ResponseContentPartAdded ): response_id = response_content_added["response_id"] response = self._pending_responses[response_id] output_index = response_content_added["output_index"] output = response.output[output_index] content_type = response_content_added["part"]["type"] text_ch = utils.aio.Chan[str]() audio_ch = utils.aio.Chan[rtc.AudioFrame]() new_content = RealtimeContent( response_id=response_id, item_id=response_content_added["item_id"], output_index=output_index, content_index=response_content_added["content_index"], text="", audio=[], text_stream=text_ch, audio_stream=audio_ch, tool_calls=[], content_type=content_type, ) output.content.append(new_content) response._first_token_timestamp = time.time() self.emit("response_content_added", new_content) def _handle_response_audio_delta( self, response_audio_delta: api_proto.ServerEvent.ResponseAudioDelta ): content = self._get_content(response_audio_delta) data = base64.b64decode(response_audio_delta["delta"]) audio = rtc.AudioFrame( data=data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(data) // 2, ) content.audio.append(audio) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.send_nowait(audio) def _handle_response_audio_transcript_delta( self, response_audio_transcript_delta: api_proto.ServerEvent.ResponseAudioTranscriptDelta, ): content = self._get_content(response_audio_transcript_delta) transcript = response_audio_transcript_delta["delta"] content.text += transcript assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.send_nowait(transcript) def _handle_response_audio_done( self, response_audio_done: api_proto.ServerEvent.ResponseAudioDone ): content = self._get_content(response_audio_done) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.close() def _handle_response_text_done( self, response_text_done: api_proto.ServerEvent.ResponseTextDone ): content = self._get_content(response_text_done) content.text = response_text_done["text"] def _handle_response_audio_transcript_done( self, response_audio_transcript_done: api_proto.ServerEvent.ResponseAudioTranscriptDone, ): content = self._get_content(response_audio_transcript_done) assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.close() def 
_handle_response_content_part_done( self, response_content_done: api_proto.ServerEvent.ResponseContentPartDone ): content = self._get_content(response_content_done) self.emit("response_content_done", content) def _handle_response_output_item_done( self, response_output_done: api_proto.ServerEvent.ResponseOutputItemDone ): response_id = response_output_done["response_id"] response = self._pending_responses[response_id] output_index = response_output_done["output_index"] output = response.output[output_index] if output.type == "function_call": if self._fnc_ctx is None: logger.error( "function call received but no fnc_ctx is available", extra=self.logging_extra(), ) return # parse the arguments and call the function inside the fnc_ctx item = response_output_done["item"] assert item["type"] == "function_call" fnc_call_info = _create_ai_function_info( self._fnc_ctx, item["call_id"], item["name"], item["arguments"], ) msg = self._remote_conversation_items.get(output.item_id) if msg is not None: # update the content of the message assert msg.tool_call_id == item["call_id"] assert msg.role == "assistant" msg.name = item["name"] msg.tool_calls = [fnc_call_info] self.emit("function_calls_collected", [fnc_call_info]) self._fnc_tasks.create_task( self._run_fnc_task(fnc_call_info, output.item_id) ) output.done_fut.set_result(None) self.emit("response_output_done", output) def _handle_response_done(self, response_done: api_proto.ServerEvent.ResponseDone): response_data = response_done["response"] response_id = response_data["id"] response = self._pending_responses[response_id] self._active_response_id = None response.done_fut.set_result(None) response.status = response_data["status"] response.status_details = response_data.get("status_details") response.usage = response_data.get("usage") metrics_error = None cancelled = False if response.status == "failed": assert response.status_details is not None error = response.status_details.get("error", {}) code: str | None = error.get("code") # type: ignore message: str | None = error.get("message") # type: ignore metrics_error = MultimodalLLMError( type=response.status_details.get("type"), code=code, message=message, ) logger.error( "response generation failed", extra={"code": code, "error": message, **self.logging_extra()}, ) elif response.status == "incomplete": assert response.status_details is not None reason = response.status_details.get("reason") metrics_error = MultimodalLLMError( type=response.status_details.get("type"), reason=reason, # type: ignore ) logger.warning( "response generation incomplete", extra={"reason": reason, **self.logging_extra()}, ) elif response.status == "cancelled": cancelled = True self.emit("response_done", response) # calculate metrics ttft = -1.0 if response._first_token_timestamp is not None: ttft = response._first_token_timestamp - response._created_timestamp duration = time.time() - response._created_timestamp usage = response.usage or {} # type: ignore input_token_details = usage.get("input_token_details", {}) metrics = MultimodalLLMMetrics( timestamp=response._created_timestamp, request_id=response.id, ttft=ttft, duration=duration, cancelled=cancelled, label=self._label, completion_tokens=usage.get("output_tokens", 0), prompt_tokens=usage.get("input_tokens", 0), total_tokens=usage.get("total_tokens", 0), tokens_per_second=usage.get("output_tokens", 0) / duration, error=metrics_error, input_token_details=MultimodalLLMMetrics.InputTokenDetails( cached_tokens=input_token_details.get("cached_tokens", 0), 
text_tokens=usage.get("input_token_details", {}).get("text_tokens", 0), audio_tokens=usage.get("input_token_details", {}).get( "audio_tokens", 0 ), cached_tokens_details=MultimodalLLMMetrics.CachedTokenDetails( text_tokens=input_token_details.get( "cached_tokens_details", {} ).get("text_tokens", 0), audio_tokens=input_token_details.get( "cached_tokens_details", {} ).get("audio_tokens", 0), ), ), output_token_details=MultimodalLLMMetrics.OutputTokenDetails( text_tokens=usage.get("output_token_details", {}).get("text_tokens", 0), audio_tokens=usage.get("output_token_details", {}).get( "audio_tokens", 0 ), ), ) self.emit("metrics_collected", metrics) def _get_content(self, ptr: _ContentPtr) -> RealtimeContent: response = self._pending_responses[ptr["response_id"]] output = response.output[ptr["output_index"]] content = output.content[ptr["content_index"]] return content @utils.log_exceptions(logger=logger) async def _run_fnc_task(self, fnc_call_info: llm.FunctionCallInfo, item_id: str): logger.debug( "executing ai function", extra={ "function": fnc_call_info.function_info.name, }, ) called_fnc = fnc_call_info.execute() await called_fnc.task tool_call = llm.ChatMessage.create_tool_from_called_function(called_fnc) logger.info( "creating response for tool call", extra={ "function": fnc_call_info.function_info.name, }, ) if called_fnc.result is not None: create_fut = self.conversation.item.create( tool_call, previous_item_id=item_id, ) await self.response.create(on_duplicate="keep_both") await create_fut # update the message with the tool call result msg = self._remote_conversation_items.get(tool_call.id) if msg is not None: assert msg.tool_call_id == tool_call.tool_call_id assert msg.role == "tool" msg.name = tool_call.name msg.content = tool_call.content msg.tool_exception = tool_call.tool_exception self.emit("function_calls_finished", [called_fnc]) def logging_extra(self) -> dict: return {"session_id": self._session_id}
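The handlers above convert raw server events into response/output/content objects and emit session-level events ("response_created", "response_output_added", "response_content_added", "response_done", "metrics_collected", "function_calls_collected", "function_calls_finished"). A minimal, hypothetical listener sketch follows; `session` is assumed to be an already-connected RealtimeSession (an EventEmitter), and the callback names are illustrative:

# Hypothetical sketch: subscribing to the events emitted by the handlers above.
# `session` is assumed to be a connected RealtimeSession instance.
def on_response_created(response) -> None:
    print("response started:", response.id)

def on_response_done(response) -> None:
    # status, status_details and usage are filled in by _handle_response_done
    print("response finished:", response.status)

def on_metrics_collected(metrics) -> None:
    # MultimodalLLMMetrics built at the end of _handle_response_done
    print(f"ttft={metrics.ttft:.3f}s duration={metrics.duration:.3f}s")

session.on("response_created", on_response_created)
session.on("response_done", on_response_done)
session.on("metrics_collected", on_metrics_collected)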
Ancestors
- EventEmitter
- typing.Generic
Class variables
var Conversation
var ConversationItem
var InputAudioBuffer
var Response
Instance variables
prop conversation : Conversation
-
Expand source code
@property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self)
prop fnc_ctx : llm.FunctionContext | None
-
Expand source code
@property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx
prop input_audio_buffer : InputAudioBuffer
-
Expand source code
@property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self)
prop response : Response
-
Expand source code
@property def response(self) -> Response: return RealtimeSession.Response(self)
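The conversation, input_audio_buffer and response properties construct lightweight helper namespaces on access. A hedged usage sketch, assuming `session` is a live RealtimeSession and `msg` is an llm.ChatMessage built elsewhere (neither is defined in this module):

# Hypothetical sketch: queue a conversation item, then request a response.
# item.create(...) returns a future, mirroring its use in _run_fnc_task above.
create_fut = session.conversation.item.create(msg, previous_item_id=None)
await session.response.create(on_duplicate="keep_both")
await create_fut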
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: if self._send_ch.closed: return self._send_ch.close() await self._main_atask
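aclose() closes the outbound send channel and waits for the main task to finish, so it should be awaited exactly once during teardown. A minimal, hypothetical shutdown sketch (run_agent is a placeholder for application logic):

# Hypothetical sketch: ensure the session is always closed, even on errors.
try:
    await run_agent(session)  # placeholder, not part of this module
finally:
    await session.aclose()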
def chat_ctx_copy(self) ‑> ChatContext
-
Expand source code
def chat_ctx_copy(self) -> llm.ChatContext: return self._remote_conversation_items.to_chat_context()
def logging_extra(self) ‑> dict
-
Expand source code
def logging_extra(self) -> dict: return {"session_id": self._session_id}
def session_update(self,
*,
modalities: list[api_proto.Modality] | None = None,
instructions: str | None = None,
voice: api_proto.Voice | None = None,
input_audio_format: api_proto.AudioFormat | None = None,
output_audio_format: api_proto.AudioFormat | None = None,
input_audio_transcription: InputTranscriptionOptions | None = None,
turn_detection: ServerVadOptions | None = None,
tool_choice: api_proto.ToolChoice | None = None,
temperature: float | None = None,
max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> None-
Expand source code
def session_update( self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> None: self._opts = deepcopy(self._opts) if modalities is not None: self._opts.modalities = modalities if instructions is not None: self._opts.instructions = instructions if voice is not None: self._opts.voice = voice if input_audio_format is not None: self._opts.input_audio_format = input_audio_format if output_audio_format is not None: self._opts.output_audio_format = output_audio_format if input_audio_transcription is not None: self._opts.input_audio_transcription = input_audio_transcription if turn_detection is not None: self._opts.turn_detection = turn_detection if tool_choice is not None: self._opts.tool_choice = tool_choice if temperature is not None: self._opts.temperature = temperature if max_response_output_tokens is not None: self._opts.max_response_output_tokens = max_response_output_tokens tools = [] if self._fnc_ctx is not None: for fnc in self._fnc_ctx.ai_functions.values(): # the realtime API is using internally-tagged polymorphism. # build_oai_function_description was built for the ChatCompletion API function_data = build_oai_function_description(fnc)["function"] function_data["type"] = "function" tools.append(function_data) server_vad_opts: api_proto.ServerVad | None = None if self._opts.turn_detection is not None: server_vad_opts = { "type": "server_vad", "threshold": self._opts.turn_detection.threshold, "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms, "silence_duration_ms": self._opts.turn_detection.silence_duration_ms, } input_audio_transcription_opts: api_proto.InputAudioTranscription | None = None if self._opts.input_audio_transcription is not None: input_audio_transcription_opts = { "model": self._opts.input_audio_transcription.model, } session_data: api_proto.ClientEvent.SessionUpdateData = { "modalities": self._opts.modalities, "instructions": self._opts.instructions, "voice": self._opts.voice, "input_audio_format": self._opts.input_audio_format, "output_audio_format": self._opts.output_audio_format, "input_audio_transcription": input_audio_transcription_opts, "turn_detection": server_vad_opts, "tools": tools, "tool_choice": self._opts.tool_choice, "temperature": self._opts.temperature, "max_response_output_tokens": None, } # azure doesn't support inf for max_response_output_tokens if not self._opts.is_azure or isinstance( self._opts.max_response_output_tokens, int ): session_data["max_response_output_tokens"] = ( self._opts.max_response_output_tokens ) else: del session_data["max_response_output_tokens"] # type: ignore self._queue_msg( { "type": "session.update", "session": session_data, } )
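session_update applies the given overrides to a copy of the current options and queues a single session.update message; any argument left as None keeps its previous value. A hedged sketch with illustrative values (none of these are defaults of this module):

# Hypothetical sketch: adjust voice, temperature and server-side VAD mid-session.
session.session_update(
    voice="alloy",
    temperature=0.6,
    turn_detection=ServerVadOptions(
        threshold=0.5,
        prefix_padding_ms=300,
        silence_duration_ms=500,
    ),
    input_audio_transcription=InputTranscriptionOptions(model="whisper-1"),
)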
async def set_chat_ctx(self, new_ctx: llm.ChatContext) ‑> None
-
Expand source code
async def set_chat_ctx(self, new_ctx: llm.ChatContext) -> None: """Sync the chat context with the agent's chat context. Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages. """ original_ctx = self._remote_conversation_items.to_chat_context() # filter out messages that are not function calls and content is None filtered_messages = [ msg for msg in new_ctx.messages if msg.tool_call_id or msg.content is not None ] changes = utils._compute_changes( original_ctx.messages, filtered_messages, key_fnc=lambda x: x.id ) logger.debug( "sync chat context", extra={ "to_delete": [msg.id for msg in changes.to_delete], "to_add": [ (prev.id if prev else None, msg.id) for prev, msg in changes.to_add ], }, ) # append an empty audio message if all new messages are text if changes.to_add and not any( isinstance(msg.content, llm.ChatAudio) for _, msg in changes.to_add ): # Patch: append an empty audio message to set the API in audio mode changes.to_add.append((None, self._create_empty_user_audio_message(1.0))) _futs = [ self.conversation.item.delete(item_id=msg.id) for msg in changes.to_delete ] + [ self.conversation.item.create(msg, prev.id if prev else None) for prev, msg in changes.to_add ] # wait for all the futures to complete await asyncio.gather(*_futs)
Sync the chat context with the agent's chat context.
Compute the minimum number of insertions and deletions to transform the old chat context messages to the new chat context messages.
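A hedged sketch of the intended round trip; ChatContext.append(...) comes from livekit.agents.llm and is assumed here rather than documented in this module:

# Hypothetical sketch: copy the remote conversation, extend it, sync it back.
chat_ctx = session.chat_ctx_copy()
chat_ctx.append(role="user", text="Please summarize the conversation so far.")
await session.set_chat_ctx(chat_ctx)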
Inherited members
class RealtimeSessionOptions (model: api_proto.OpenAIModel | str,
modalities: list[api_proto.Modality],
instructions: str,
voice: api_proto.Voice,
input_audio_format: api_proto.AudioFormat,
output_audio_format: api_proto.AudioFormat,
input_audio_transcription: InputTranscriptionOptions | None,
turn_detection: ServerVadOptions | None,
tool_choice: api_proto.ToolChoice,
temperature: float,
max_response_output_tokens: "int | Literal['inf']")-
Expand source code
@dataclass class RealtimeSessionOptions: model: api_proto.OpenAIModel | str modalities: list[api_proto.Modality] instructions: str voice: api_proto.Voice input_audio_format: api_proto.AudioFormat output_audio_format: api_proto.AudioFormat input_audio_transcription: InputTranscriptionOptions | None turn_detection: ServerVadOptions | None tool_choice: api_proto.ToolChoice temperature: float max_response_output_tokens: int | Literal["inf"]
RealtimeSessionOptions(model: 'api_proto.OpenAIModel | str', modalities: 'list[api_proto.Modality]', instructions: 'str', voice: 'api_proto.Voice', input_audio_format: 'api_proto.AudioFormat', output_audio_format: 'api_proto.AudioFormat', input_audio_transcription: 'InputTranscriptionOptions | None', turn_detection: 'ServerVadOptions | None', tool_choice: 'api_proto.ToolChoice', temperature: 'float', max_response_output_tokens: "int | Literal['inf']")
Subclasses
- livekit.plugins.openai.realtime.realtime_model._ModelOptions
Class variables
var input_audio_format : Literal['pcm16', 'g711_ulaw', 'g711_alaw']
var input_audio_transcription : InputTranscriptionOptions | None
var instructions : str
var max_response_output_tokens : int | Literal['inf']
var modalities : list[typing.Literal['text', 'audio']]
var model : Literal['gpt-4o-realtime-preview', 'gpt-4o-realtime-preview-2024-10-01', 'gpt-4o-realtime-preview-2024-12-17', 'gpt-4o-mini-realtime-preview', 'gpt-4o-mini-realtime-preview-2024-12-17'] | str
var output_audio_format : Literal['pcm16', 'g711_ulaw', 'g711_alaw']
var temperature : float
var tool_choice : Literal['auto', 'none', 'required'] | FunctionToolChoice
var turn_detection : ServerVadOptions | None
var voice : Literal['alloy', 'echo', 'shimmer', 'ash', 'ballad', 'coral', 'sage', 'verse']
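RealtimeSessionOptions is a plain dataclass with no defaults, so every field must be supplied when constructing it directly; it is normally built internally (see the _ModelOptions subclass above). A hypothetical, fully-specified example with illustrative values:

# Hypothetical sketch: values are illustrative, not the plugin's defaults.
opts = RealtimeSessionOptions(
    model="gpt-4o-realtime-preview",
    modalities=["text", "audio"],
    instructions="You are a helpful voice assistant.",
    voice="alloy",
    input_audio_format="pcm16",
    output_audio_format="pcm16",
    input_audio_transcription=None,
    turn_detection=None,
    tool_choice="auto",
    temperature=0.8,
    max_response_output_tokens="inf",
)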
class RealtimeToolCall (name: str, arguments: str, tool_call_id: str)
-
Expand source code
@dataclass class RealtimeToolCall: name: str """name of the function""" arguments: str """accumulated arguments""" tool_call_id: str """id of the tool call"""
RealtimeToolCall(name: 'str', arguments: 'str', tool_call_id: 'str')
Class variables
var arguments : str
-
accumulated arguments
var name : str
-
name of the function
var tool_call_id : str
-
id of the tool call
class ServerVadOptions (threshold: float, prefix_padding_ms: int, silence_duration_ms: int)
-
Expand source code
@dataclass class ServerVadOptions: threshold: float prefix_padding_ms: int silence_duration_ms: int
ServerVadOptions(threshold: 'float', prefix_padding_ms: 'int', silence_duration_ms: 'int')
Class variables
var prefix_padding_ms : int
var silence_duration_ms : int
var threshold : float