Module livekit.plugins.openai.realtime
Sub-modules
livekit.plugins.openai.realtime.api_proto
livekit.plugins.openai.realtime.log
livekit.plugins.openai.realtime.realtime_model
Classes
class InputTranscriptionCompleted (item_id: str, transcript: str)
-
InputTranscriptionCompleted(item_id: 'str', transcript: 'str')
Expand source code
@dataclass class InputTranscriptionCompleted: item_id: str """id of the item""" transcript: str """transcript of the input audio"""
Class variables
var item_id : str
-
id of the item
var transcript : str
-
transcript of the input audio
class InputTranscriptionFailed (item_id: str, message: str)
-
InputTranscriptionFailed(item_id: 'str', message: 'str')
Expand source code
@dataclass class InputTranscriptionFailed: item_id: str """id of the item""" message: str """error message"""
Class variables
var item_id : str
-
id of the item
var message : str
-
error message
class InputTranscriptionOptions (model: api_proto.InputTranscriptionModel | str)
-
InputTranscriptionOptions(model: 'api_proto.InputTranscriptionModel | str')
Expand source code
@dataclass class InputTranscriptionOptions: model: api_proto.InputTranscriptionModel | str
Class variables
var model : Union[Literal['whisper-1'], str]
class RealtimeContent (response_id: str, item_id: str, output_index: int, content_index: int, text: str, audio: list[rtc.AudioFrame], text_stream: AsyncIterable[str], audio_stream: AsyncIterable[rtc.AudioFrame], tool_calls: list[RealtimeToolCall])
-
RealtimeContent(response_id: 'str', item_id: 'str', output_index: 'int', content_index: 'int', text: 'str', audio: 'list[rtc.AudioFrame]', text_stream: 'AsyncIterable[str]', audio_stream: 'AsyncIterable[rtc.AudioFrame]', tool_calls: 'list[RealtimeToolCall]')
Expand source code
@dataclass class RealtimeContent: response_id: str """id of the response""" item_id: str """id of the item""" output_index: int """index of the output""" content_index: int """index of the content""" text: str """accumulated text content""" audio: list[rtc.AudioFrame] """accumulated audio content""" text_stream: AsyncIterable[str] """stream of text content""" audio_stream: AsyncIterable[rtc.AudioFrame] """stream of audio content""" tool_calls: list[RealtimeToolCall] """pending tool calls"""
Class variables
var audio : list[AudioFrame]
-
accumulated audio content
var audio_stream : AsyncIterable[AudioFrame]
-
stream of audio content
var content_index : int
-
index of the content
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var text : str
-
accumulated text content
var text_stream : AsyncIterable[str]
-
stream of text content
var tool_calls : list[RealtimeToolCall]
-
pending tool calls
class RealtimeModel (*, instructions: str = '', modalities: list[api_proto.Modality] = ['text', 'audio'], model: str | None = 'gpt-4o-realtime-preview-2024-10-01', voice: api_proto.Voice = 'alloy', input_audio_format: api_proto.AudioFormat = 'pcm16', output_audio_format: api_proto.AudioFormat = 'pcm16', input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'), turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500), tool_choice: api_proto.ToolChoice = 'auto', temperature: float = 0.8, max_response_output_tokens: "int | Literal['inf']" = 'inf', base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None)
-
Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API.
Args
instructions
:str
, optional- Initial system instructions for the model. Defaults to "".
api_key
:str
orNone
, optional- OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY
modalities
:list[api_proto.Modality]
, optional- Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
model
:str
orNone
, optional- The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01".
voice
:api_proto.Voice
, optional- Voice setting for audio outputs. Defaults to "alloy".
input_audio_format
:api_proto.AudioFormat
, optional- Format of input audio data. Defaults to "pcm16".
output_audio_format
:api_proto.AudioFormat
, optional- Format of output audio data. Defaults to "pcm16".
input_audio_transcription
:InputTranscriptionOptions
, optional- Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection
:ServerVadOptions
, optional- Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice
:api_proto.ToolChoice
, optional- Tool choice for the model, such as "auto". Defaults to "auto".
temperature
:float
, optional- Sampling temperature for response generation. Defaults to 0.8.
- max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
base_url
:str
orNone
, optional- Base URL for the API endpoint. If None, defaults to OpenAI's default API URL.
http_session
:aiohttp.ClientSession
orNone
, optional- Async HTTP session to use for requests. If None, a new session will be created.
loop
:asyncio.AbstractEventLoop
orNone
, optional- Event loop to use for async operations. If None, the current event loop is used.
Raises
ValueError
- If the API key is not provided and cannot be found in environment variables.
Expand source code
class RealtimeModel: @overload def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: str = "gpt-4o-realtime-preview-2024-10-01", voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", api_key: str | None = None, base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... @overload def __init__( self, *, azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ) -> None: ... def __init__( self, *, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], model: str | None = "gpt-4o-realtime-preview-2024-10-01", voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", base_url: str | None = None, http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, # azure specific parameters azure_deployment: str | None = None, entra_token: str | None = None, api_key: str | None = None, api_version: str | None = None, ) -> None: """ Initializes a RealtimeClient instance for interacting with OpenAI's Realtime API. Args: instructions (str, optional): Initial system instructions for the model. Defaults to "". api_key (str or None, optional): OpenAI API key. If None, will attempt to read from the environment variable OPENAI_API_KEY modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. model (str or None, optional): The name of the model to use. Defaults to "gpt-4o-realtime-preview-2024-10-01". voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". base_url (str or None, optional): Base URL for the API endpoint. If None, defaults to OpenAI's default API URL. http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Raises: ValueError: If the API key is not provided and cannot be found in environment variables. """ super().__init__() self._base_url = base_url is_azure = ( api_version is not None or entra_token is not None or azure_deployment is not None ) api_key = api_key or os.environ.get("OPENAI_API_KEY") if api_key is None and not is_azure: raise ValueError( "OpenAI API key is required, either using the argument or by setting the OPENAI_API_KEY environmental variable" ) if not base_url: base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1") self._default_opts = _ModelOptions( model=model, modalities=modalities, instructions=instructions, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, temperature=temperature, tool_choice=tool_choice, max_response_output_tokens=max_response_output_tokens, api_key=api_key, base_url=base_url, azure_deployment=azure_deployment, entra_token=entra_token, is_azure=is_azure, api_version=api_version, ) self._loop = loop or asyncio.get_event_loop() self._rt_sessions: list[RealtimeSession] = [] self._http_session = http_session @classmethod def with_azure( cls, *, azure_deployment: str, azure_endpoint: str | None = None, api_version: str | None = None, api_key: str | None = None, entra_token: str | None = None, base_url: str | None = None, instructions: str = "", modalities: list[api_proto.Modality] = ["text", "audio"], voice: api_proto.Voice = "alloy", input_audio_format: api_proto.AudioFormat = "pcm16", output_audio_format: api_proto.AudioFormat = "pcm16", input_audio_transcription: InputTranscriptionOptions = DEFAULT_INPUT_AUDIO_TRANSCRIPTION, turn_detection: ServerVadOptions = DEFAULT_SERVER_VAD_OPTIONS, tool_choice: api_proto.ToolChoice = "auto", temperature: float = 0.8, max_response_output_tokens: int | Literal["inf"] = "inf", http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None, ): """ Create a RealtimeClient instance configured for Azure OpenAI Service. Args: azure_deployment (str): The name of your Azure OpenAI deployment. azure_endpoint (str or None, optional): The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT. api_version (str or None, optional): API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION. api_key (str or None, optional): Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY. entra_token (str or None, optional): Azure Entra authentication token. Required if not using API key authentication. base_url (str or None, optional): Base URL for the API endpoint. If None, constructed from the azure_endpoint. instructions (str, optional): Initial system instructions for the model. Defaults to "". modalities (list[api_proto.Modality], optional): Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"]. voice (api_proto.Voice, optional): Voice setting for audio outputs. Defaults to "alloy". input_audio_format (api_proto.AudioFormat, optional): Format of input audio data. Defaults to "pcm16". output_audio_format (api_proto.AudioFormat, optional): Format of output audio data. Defaults to "pcm16". input_audio_transcription (InputTranscriptionOptions, optional): Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION. turn_detection (ServerVadOptions, optional): Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS. tool_choice (api_proto.ToolChoice, optional): Tool choice for the model, such as "auto". Defaults to "auto". temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8. max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf". http_session (aiohttp.ClientSession or None, optional): Async HTTP session to use for requests. If None, a new session will be created. loop (asyncio.AbstractEventLoop or None, optional): Event loop to use for async operations. If None, the current event loop is used. Returns: RealtimeClient: An instance of RealtimeClient configured for Azure OpenAI Service. Raises: ValueError: If required Azure parameters are missing or invalid. """ api_key = api_key or os.getenv("AZURE_OPENAI_API_KEY") if api_key is None and entra_token is None: raise ValueError( "Missing credentials. Please pass one of `api_key`, `entra_token`, or the `AZURE_OPENAI_API_KEY` environment variable." ) api_version = api_version or os.getenv("OPENAI_API_VERSION") if api_version is None: raise ValueError( "Must provide either the `api_version` argument or the `OPENAI_API_VERSION` environment variable" ) if base_url is None: azure_endpoint = azure_endpoint or os.getenv("AZURE_OPENAI_ENDPOINT") if azure_endpoint is None: raise ValueError( "Missing Azure endpoint. Please pass the `azure_endpoint` parameter or set the `AZURE_OPENAI_ENDPOINT` environment variable." ) base_url = f"{azure_endpoint.rstrip('/')}/openai" elif azure_endpoint is not None: raise ValueError("base_url and azure_endpoint are mutually exclusive") return cls( instructions=instructions, modalities=modalities, voice=voice, input_audio_format=input_audio_format, output_audio_format=output_audio_format, input_audio_transcription=input_audio_transcription, turn_detection=turn_detection, tool_choice=tool_choice, temperature=temperature, max_response_output_tokens=max_response_output_tokens, api_key=api_key, http_session=http_session, loop=loop, azure_deployment=azure_deployment, api_version=api_version, entra_token=entra_token, base_url=base_url, ) def _ensure_session(self) -> aiohttp.ClientSession: if not self._http_session: self._http_session = utils.http_context.http_session() return self._http_session @property def sessions(self) -> list[RealtimeSession]: return self._rt_sessions def session( self, *, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, tool_choice: api_proto.ToolChoice | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> RealtimeSession: opts = deepcopy(self._default_opts) if modalities is not None: opts.modalities = modalities if instructions is not None: opts.instructions = instructions if voice is not None: opts.voice = voice if input_audio_format is not None: opts.input_audio_format = input_audio_format if output_audio_format is not None: opts.output_audio_format = output_audio_format if tool_choice is not None: opts.tool_choice = tool_choice if input_audio_transcription is not None: opts.input_audio_transcription if turn_detection is not None: opts.turn_detection = turn_detection if temperature is not None: opts.temperature = temperature if max_response_output_tokens is not None: opts.max_response_output_tokens = max_response_output_tokens new_session = RealtimeSession( chat_ctx=chat_ctx or llm.ChatContext(), fnc_ctx=fnc_ctx, opts=opts, http_session=self._ensure_session(), loop=self._loop, ) self._rt_sessions.append(new_session) return new_session async def aclose(self) -> None: for session in self._rt_sessions: await session.aclose()
Static methods
def with_azure(*, azure_deployment: str, azure_endpoint: str | None = None, api_version: str | None = None, api_key: str | None = None, entra_token: str | None = None, base_url: str | None = None, instructions: str = '', modalities: list[api_proto.Modality] = ['text', 'audio'], voice: api_proto.Voice = 'alloy', input_audio_format: api_proto.AudioFormat = 'pcm16', output_audio_format: api_proto.AudioFormat = 'pcm16', input_audio_transcription: InputTranscriptionOptions = InputTranscriptionOptions(model='whisper-1'), turn_detection: ServerVadOptions = ServerVadOptions(threshold=0.5, prefix_padding_ms=300, silence_duration_ms=500), tool_choice: api_proto.ToolChoice = 'auto', temperature: float = 0.8, max_response_output_tokens: "int | Literal['inf']" = 'inf', http_session: aiohttp.ClientSession | None = None, loop: asyncio.AbstractEventLoop | None = None)
-
Create a RealtimeClient instance configured for Azure OpenAI Service.
Args
azure_deployment
:str
- The name of your Azure OpenAI deployment.
azure_endpoint
:str
orNone
, optional- The endpoint URL for your Azure OpenAI resource. If None, will attempt to read from the environment variable AZURE_OPENAI_ENDPOINT.
api_version
:str
orNone
, optional- API version to use with Azure OpenAI Service. If None, will attempt to read from the environment variable OPENAI_API_VERSION.
api_key
:str
orNone
, optional- Azure OpenAI API key. If None, will attempt to read from the environment variable AZURE_OPENAI_API_KEY.
entra_token
:str
orNone
, optional- Azure Entra authentication token. Required if not using API key authentication.
base_url
:str
orNone
, optional- Base URL for the API endpoint. If None, constructed from the azure_endpoint.
instructions
:str
, optional- Initial system instructions for the model. Defaults to "".
modalities
:list[api_proto.Modality]
, optional- Modalities to use, such as ["text", "audio"]. Defaults to ["text", "audio"].
voice
:api_proto.Voice
, optional- Voice setting for audio outputs. Defaults to "alloy".
input_audio_format
:api_proto.AudioFormat
, optional- Format of input audio data. Defaults to "pcm16".
output_audio_format
:api_proto.AudioFormat
, optional- Format of output audio data. Defaults to "pcm16".
input_audio_transcription
:InputTranscriptionOptions
, optional- Options for transcribing input audio. Defaults to DEFAULT_INPUT_AUDIO_TRANSCRIPTION.
turn_detection
:ServerVadOptions
, optional- Options for server-based voice activity detection (VAD). Defaults to DEFAULT_SERVER_VAD_OPTIONS.
tool_choice
:api_proto.ToolChoice
, optional- Tool choice for the model, such as "auto". Defaults to "auto".
temperature
:float
, optional- Sampling temperature for response generation. Defaults to 0.8.
- max_response_output_tokens (int or Literal["inf"], optional): Maximum number of tokens in the response. Defaults to "inf".
http_session
:aiohttp.ClientSession
orNone
, optional- Async HTTP session to use for requests. If None, a new session will be created.
loop
:asyncio.AbstractEventLoop
orNone
, optional- Event loop to use for async operations. If None, the current event loop is used.
Returns
RealtimeClient
- An instance of RealtimeClient configured for Azure OpenAI Service.
Raises
ValueError
- If required Azure parameters are missing or invalid.
Instance variables
prop sessions : list[RealtimeSession]
-
Expand source code
@property def sessions(self) -> list[RealtimeSession]: return self._rt_sessions
Methods
async def aclose(self) ‑> None
def session(self, *, chat_ctx: llm.ChatContext | None = None, fnc_ctx: llm.FunctionContext | None = None, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, tool_choice: api_proto.ToolChoice | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, temperature: float | None = None, max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> RealtimeSession
class RealtimeOutput (response_id: str, item_id: str, output_index: int, role: api_proto.Role, type: "Literal['message', 'function_call']", content: list[RealtimeContent], done_fut: asyncio.Future[None])
-
RealtimeOutput(response_id: 'str', item_id: 'str', output_index: 'int', role: 'api_proto.Role', type: "Literal['message', 'function_call']", content: 'list[RealtimeContent]', done_fut: 'asyncio.Future[None]')
Expand source code
@dataclass class RealtimeOutput: response_id: str """id of the response""" item_id: str """id of the item""" output_index: int """index of the output""" role: api_proto.Role """role of the message""" type: Literal["message", "function_call"] """type of the output""" content: list[RealtimeContent] """list of content""" done_fut: asyncio.Future[None] """future that will be set when the output is completed"""
Class variables
var content : list[RealtimeContent]
-
list of content
var done_fut : _asyncio.Future[None]
-
future that will be set when the output is completed
var item_id : str
-
id of the item
var output_index : int
-
index of the output
var response_id : str
-
id of the response
var role : Literal['system', 'assistant', 'user', 'tool']
-
role of the message
var type : Literal['message', 'function_call']
-
type of the output
class RealtimeResponse (id: str, status: api_proto.ResponseStatus, status_details: api_proto.ResponseStatusDetails | None, output: list[RealtimeOutput], done_fut: asyncio.Future[None])
-
RealtimeResponse(id: 'str', status: 'api_proto.ResponseStatus', status_details: 'api_proto.ResponseStatusDetails | None', output: 'list[RealtimeOutput]', done_fut: 'asyncio.Future[None]')
Expand source code
@dataclass class RealtimeResponse: id: str """id of the message""" status: api_proto.ResponseStatus """status of the response""" status_details: api_proto.ResponseStatusDetails | None """details of the status (only with "incomplete, cancelled and failed")""" output: list[RealtimeOutput] """list of outputs""" done_fut: asyncio.Future[None] """future that will be set when the response is completed"""
Class variables
var done_fut : _asyncio.Future[None]
-
future that will be set when the response is completed
var id : str
-
id of the message
var output : list[RealtimeOutput]
-
list of outputs
var status : Literal['in_progress', 'completed', 'incomplete', 'cancelled', 'failed']
-
status of the response
var status_details : Union[CancelledStatusDetails, IncompleteStatusDetails, FailedStatusDetails, ForwardRef(None)]
-
details of the status (only with "incomplete, cancelled and failed")
class RealtimeSession (*, opts: _ModelOptions, http_session: aiohttp.ClientSession, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, loop: asyncio.AbstractEventLoop)
-
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Expand source code
class RealtimeSession(utils.EventEmitter[EventTypes]): class InputAudioBuffer: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def append(self, frame: rtc.AudioFrame) -> None: self._sess._queue_msg( { "type": "input_audio_buffer.append", "audio": base64.b64encode(frame.data).decode("utf-8"), } ) def clear(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.clear"}) def commit(self) -> None: self._sess._queue_msg({"type": "input_audio_buffer.commit"}) class ConversationItem: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create( self, message: llm.ChatMessage, previous_item_id: str | None = None ) -> None: message_content = message.content if message_content is None: return tool_call_id = message.tool_call_id event: api_proto.ClientEvent.ConversationItemCreate | None = None if tool_call_id: assert isinstance(message_content, str) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "type": "function_call_output", "call_id": tool_call_id, "output": message_content, }, } else: if not isinstance(message_content, list): message_content = [message_content] if message.role == "user": user_contents: list[ api_proto.InputTextContent | api_proto.InputAudioContent ] = [] for cnt in message_content: if isinstance(cnt, str): user_contents.append( { "type": "input_text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): user_contents.append( { "type": "input_audio", "audio": base64.b64encode( utils.merge_frames(cnt.frame).data ).decode("utf-8"), } ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "type": "message", "role": "user", "content": user_contents, }, } elif message.role == "assistant": assistant_contents: list[api_proto.TextContent] = [] for cnt in message_content: if isinstance(cnt, str): assistant_contents.append( { "type": "text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in assistant message is not supported" ) event = { "type": "conversation.item.create", "previous_item_id": previous_item_id, "item": { "type": "message", "role": "assistant", "content": assistant_contents, }, } elif message.role == "system": system_contents: list[api_proto.InputTextContent] = [] for cnt in message_content: if isinstance(cnt, str): system_contents.append( { "type": "input_text", "text": cnt, } ) elif isinstance(cnt, llm.ChatAudio): logger.warning( "audio content in system message is not supported" ) if event is None: logger.warning( "chat message is not supported inside the realtime API %s", message, extra=self._sess.logging_extra(), ) return self._sess._queue_msg(event) def truncate( self, *, item_id: str, content_index: int, audio_end_ms: int ) -> None: self._sess._queue_msg( { "type": "conversation.item.truncate", "item_id": item_id, "content_index": content_index, "audio_end_ms": audio_end_ms, } ) def delete(self, *, item_id: str) -> None: self._sess._queue_msg( { "type": "conversation.item.delete", "item_id": item_id, } ) class Conversation: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess @property def item(self) -> RealtimeSession.ConversationItem: return RealtimeSession.ConversationItem(self._sess) class Response: def __init__(self, sess: RealtimeSession) -> None: self._sess = sess def create(self) -> None: self._sess._queue_msg({"type": "response.create"}) def cancel(self) -> None: self._sess._queue_msg({"type": "response.cancel"}) def __init__( self, *, opts: _ModelOptions, http_session: aiohttp.ClientSession, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, loop: asyncio.AbstractEventLoop, ) -> None: super().__init__() self._main_atask = asyncio.create_task( self._main_task(), name="openai-realtime-session" ) self._chat_ctx = chat_ctx self._fnc_ctx = fnc_ctx self._loop = loop self._opts = opts self._send_ch = utils.aio.Chan[api_proto.ClientEvents]() self._http_session = http_session self._pending_responses: dict[str, RealtimeResponse] = {} self._session_id = "not-connected" self.session_update() # initial session init self._fnc_tasks = utils.aio.TaskSet() async def aclose(self) -> None: if self._send_ch.closed: return self._send_ch.close() await self._main_atask @property def chat_ctx(self) -> llm.ChatContext: return self._chat_ctx @property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx @fnc_ctx.setter def fnc_ctx(self, fnc_ctx: llm.FunctionContext | None) -> None: self._fnc_ctx = fnc_ctx @property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self) @property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self) @property def response(self) -> Response: return RealtimeSession.Response(self) def session_update( self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: int | Literal["inf"] | None = None, ) -> None: self._opts = deepcopy(self._opts) if modalities is not None: self._opts.modalities = modalities if instructions is not None: self._opts.instructions = instructions if voice is not None: self._opts.voice = voice if input_audio_format is not None: self._opts.input_audio_format = input_audio_format if output_audio_format is not None: self._opts.output_audio_format = output_audio_format if input_audio_transcription is not None: self._opts.input_audio_transcription = input_audio_transcription if turn_detection is not None: self._opts.turn_detection = turn_detection if tool_choice is not None: self._opts.tool_choice = tool_choice if temperature is not None: self._opts.temperature = temperature if max_response_output_tokens is not None: self._opts.max_response_output_tokens = max_response_output_tokens tools = [] if self._fnc_ctx is not None: for fnc in self._fnc_ctx.ai_functions.values(): # the realtime API is using internally-tagged polymorphism. # build_oai_function_description was built for the ChatCompletion API function_data = llm._oai_api.build_oai_function_description(fnc)[ "function" ] function_data["type"] = "function" tools.append(function_data) server_vad_opts: api_proto.ServerVad = { "type": "server_vad", "threshold": self._opts.turn_detection.threshold, "prefix_padding_ms": self._opts.turn_detection.prefix_padding_ms, "silence_duration_ms": self._opts.turn_detection.silence_duration_ms, } session_data: api_proto.ClientEvent.SessionUpdateData = { "modalities": self._opts.modalities, "instructions": self._opts.instructions, "voice": self._opts.voice, "input_audio_format": self._opts.input_audio_format, "output_audio_format": self._opts.output_audio_format, "input_audio_transcription": { "model": self._opts.input_audio_transcription.model, }, "turn_detection": server_vad_opts, "tools": tools, "tool_choice": self._opts.tool_choice, "temperature": self._opts.temperature, "max_response_output_tokens": None, } # azure doesn't support inf for max_response_output_tokens if not self._opts.is_azure or isinstance( self._opts.max_response_output_tokens, int ): session_data["max_response_output_tokens"] = ( self._opts.max_response_output_tokens ) self._queue_msg( { "type": "session.update", "session": session_data, } ) def _queue_msg(self, msg: api_proto.ClientEvents) -> None: self._send_ch.send_nowait(msg) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: try: headers = {"User-Agent": "LiveKit Agents"} query_params: dict[str, str] = {} base_url = self._opts.base_url if self._opts.is_azure: if self._opts.entra_token: headers["Authorization"] = f"Bearer {self._opts.entra_token}" if self._opts.api_key: headers["api-key"] = self._opts.api_key if self._opts.api_version: query_params["api-version"] = self._opts.api_version if self._opts.azure_deployment: query_params["deployment"] = self._opts.azure_deployment else: # OAI endpoint headers["Authorization"] = f"Bearer {self._opts.api_key}" headers["OpenAI-Beta"] = "realtime=v1" if self._opts.model: query_params["model"] = self._opts.model url = f"{base_url.rstrip('/')}/realtime?{urlencode(query_params)}" if url.startswith("http"): url = url.replace("http", "ws", 1) ws_conn = await self._http_session.ws_connect( url, headers=headers, ) except Exception: logger.exception("failed to connect to OpenAI API S2S") return closing = False @utils.log_exceptions(logger=logger) async def _send_task(): nonlocal closing async for msg in self._send_ch: await ws_conn.send_json(msg) closing = True await ws_conn.close() @utils.log_exceptions(logger=logger) async def _recv_task(): while True: msg = await ws_conn.receive() if msg.type in ( aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, ): if closing: return raise Exception("OpenAI S2S connection closed unexpectedly") if msg.type != aiohttp.WSMsgType.TEXT: logger.warning( "unexpected OpenAI S2S message type %s", msg.type, extra=self.logging_extra(), ) continue try: data = msg.json() event: api_proto.ServerEventType = data["type"] if event == "session.created": self._handle_session_created(data) elif event == "error": self._handle_error(data) elif event == "input_audio_buffer.speech_started": self._handle_input_audio_buffer_speech_started(data) elif event == "input_audio_buffer.speech_stopped": self._handle_input_audio_buffer_speech_stopped(data) elif event == "input_audio_buffer.committed": self._handle_input_audio_buffer_speech_committed(data) elif ( event == "conversation.item.input_audio_transcription.completed" ): self._handle_conversation_item_input_audio_transcription_completed( data ) elif event == "conversation.item.input_audio_transcription.failed": self._handle_conversation_item_input_audio_transcription_failed( data ) elif event == "response.created": self._handle_response_created(data) elif event == "response.output_item.added": self._handle_response_output_item_added(data) elif event == "response.content_part.added": self._handle_response_content_part_added(data) elif event == "response.audio.delta": self._handle_response_audio_delta(data) elif event == "response.audio_transcript.delta": self._handle_response_audio_transcript_delta(data) elif event == "response.audio.done": self._handle_response_audio_done(data) elif event == "response.audio_transcript.done": self._handle_response_audio_transcript_done(data) elif event == "response.content_part.done": self._handle_response_content_part_done(data) elif event == "response.output_item.done": self._handle_response_output_item_done(data) elif event == "response.done": self._handle_response_done(data) except Exception: logger.exception( "failed to handle OpenAI S2S message", extra={"websocket_message": msg, **self.logging_extra()}, ) tasks = [ asyncio.create_task(_send_task(), name="openai-realtime-send"), asyncio.create_task(_recv_task(), name="openai-realtime-recv"), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks) def _handle_session_created( self, session_created: api_proto.ServerEvent.SessionCreated ): self._session_id = session_created["session"]["id"] def _handle_error(self, error: api_proto.ServerEvent.Error): logger.error( "OpenAI S2S error %s", error, extra=self.logging_extra(), ) def _handle_input_audio_buffer_speech_started( self, speech_started: api_proto.ServerEvent.InputAudioBufferSpeechStarted ): self.emit("input_speech_started") def _handle_input_audio_buffer_speech_stopped( self, speech_stopped: api_proto.ServerEvent.InputAudioBufferSpeechStopped ): self.emit("input_speech_stopped") def _handle_input_audio_buffer_speech_committed( self, speech_committed: api_proto.ServerEvent.InputAudioBufferCommitted ): self.emit("input_speech_committed") def _handle_conversation_item_input_audio_transcription_completed( self, transcription_completed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionCompleted, ): transcript = transcription_completed["transcript"] self.emit( "input_speech_transcription_completed", InputTranscriptionCompleted( item_id=transcription_completed["item_id"], transcript=transcript, ), ) def _handle_conversation_item_input_audio_transcription_failed( self, transcription_failed: api_proto.ServerEvent.ConversationItemInputAudioTranscriptionFailed, ): error = transcription_failed["error"] logger.error( "OAI S2S failed to transcribe input audio: %s", error["message"], extra=self.logging_extra(), ) self.emit( "input_speech_transcription_failed", InputTranscriptionFailed( item_id=transcription_failed["item_id"], message=error["message"], ), ) def _handle_response_created( self, response_created: api_proto.ServerEvent.ResponseCreated ): response = response_created["response"] done_fut = self._loop.create_future() status_details = response.get("status_details") new_response = RealtimeResponse( id=response["id"], status=response["status"], status_details=status_details, output=[], done_fut=done_fut, ) self._pending_responses[new_response.id] = new_response self.emit("response_created", new_response) def _handle_response_output_item_added( self, response_output_added: api_proto.ServerEvent.ResponseOutputItemAdded ): response_id = response_output_added["response_id"] response = self._pending_responses[response_id] done_fut = self._loop.create_future() item_data = response_output_added["item"] item_type: Literal["message", "function_call"] = item_data["type"] # type: ignore assert item_type in ("message", "function_call") # function_call doesn't have a role field, defaulting it to assistant item_role: api_proto.Role = item_data.get("role") or "assistant" # type: ignore new_output = RealtimeOutput( response_id=response_id, item_id=item_data["id"], output_index=response_output_added["output_index"], type=item_type, role=item_role, content=[], done_fut=done_fut, ) response.output.append(new_output) self.emit("response_output_added", new_output) def _handle_response_content_part_added( self, response_content_added: api_proto.ServerEvent.ResponseContentPartAdded ): response_id = response_content_added["response_id"] response = self._pending_responses[response_id] output_index = response_content_added["output_index"] output = response.output[output_index] text_ch = utils.aio.Chan[str]() audio_ch = utils.aio.Chan[rtc.AudioFrame]() new_content = RealtimeContent( response_id=response_id, item_id=response_content_added["item_id"], output_index=output_index, content_index=response_content_added["content_index"], text="", audio=[], text_stream=text_ch, audio_stream=audio_ch, tool_calls=[], ) output.content.append(new_content) self.emit("response_content_added", new_content) def _handle_response_audio_delta( self, response_audio_delta: api_proto.ServerEvent.ResponseAudioDelta ): content = self._get_content(response_audio_delta) data = base64.b64decode(response_audio_delta["delta"]) audio = rtc.AudioFrame( data=data, sample_rate=api_proto.SAMPLE_RATE, num_channels=api_proto.NUM_CHANNELS, samples_per_channel=len(data) // 2, ) content.audio.append(audio) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.send_nowait(audio) def _handle_response_audio_transcript_delta( self, response_audio_transcript_delta: api_proto.ServerEvent.ResponseAudioTranscriptDelta, ): content = self._get_content(response_audio_transcript_delta) transcript = response_audio_transcript_delta["delta"] content.text += transcript assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.send_nowait(transcript) def _handle_response_audio_done( self, response_audio_done: api_proto.ServerEvent.ResponseAudioDone ): content = self._get_content(response_audio_done) assert isinstance(content.audio_stream, utils.aio.Chan) content.audio_stream.close() def _handle_response_audio_transcript_done( self, response_audio_transcript_done: api_proto.ServerEvent.ResponseAudioTranscriptDone, ): content = self._get_content(response_audio_transcript_done) assert isinstance(content.text_stream, utils.aio.Chan) content.text_stream.close() def _handle_response_content_part_done( self, response_content_done: api_proto.ServerEvent.ResponseContentPartDone ): content = self._get_content(response_content_done) self.emit("response_content_done", content) def _handle_response_output_item_done( self, response_output_done: api_proto.ServerEvent.ResponseOutputItemDone ): response_id = response_output_done["response_id"] response = self._pending_responses[response_id] output_index = response_output_done["output_index"] output = response.output[output_index] if output.type == "function_call": if self._fnc_ctx is None: logger.error( "function call received but no fnc_ctx is available", extra=self.logging_extra(), ) return # parse the arguments and call the function inside the fnc_ctx item = response_output_done["item"] assert item["type"] == "function_call" fnc_call_info = _oai_api.create_ai_function_info( self._fnc_ctx, item["call_id"], item["name"], item["arguments"], ) self._fnc_tasks.create_task( self._run_fnc_task(fnc_call_info, output.item_id) ) output.done_fut.set_result(None) self.emit("response_output_done", output) def _handle_response_done(self, response_done: api_proto.ServerEvent.ResponseDone): response_data = response_done["response"] response_id = response_data["id"] response = self._pending_responses[response_id] response.done_fut.set_result(None) response.status = response_data["status"] response.status_details = response_data.get("status_details") if response.status == "failed": assert response.status_details is not None error = response.status_details.get("error") code: str | None = None message: str | None = None if error is not None: code = error.get("code") # type: ignore message = error.get("message") # type: ignore logger.error( "response generation failed", extra={"code": code, "error": message, **self.logging_extra()}, ) elif response.status == "incomplete": assert response.status_details is not None reason = response.status_details.get("reason") logger.warning( "response generation incomplete", extra={"reason": reason, **self.logging_extra()}, ) self.emit("response_done", response) def _get_content(self, ptr: _ContentPtr) -> RealtimeContent: response = self._pending_responses[ptr["response_id"]] output = response.output[ptr["output_index"]] content = output.content[ptr["content_index"]] return content @utils.log_exceptions(logger=logger) async def _run_fnc_task(self, fnc_call_info: llm.FunctionCallInfo, item_id: str): logger.debug( "executing ai function", extra={ "function": fnc_call_info.function_info.name, }, ) called_fnc = fnc_call_info.execute() await called_fnc.task tool_call = llm.ChatMessage.create_tool_from_called_function(called_fnc) if called_fnc.result is not None: self.conversation.item.create(tool_call, item_id) self.response.create() def logging_extra(self) -> dict: return {"session_id": self._session_id}
Ancestors
- EventEmitter
- typing.Generic
Class variables
var Conversation
var ConversationItem
var InputAudioBuffer
var Response
Instance variables
prop chat_ctx : llm.ChatContext
-
Expand source code
@property def chat_ctx(self) -> llm.ChatContext: return self._chat_ctx
prop conversation : Conversation
-
Expand source code
@property def conversation(self) -> Conversation: return RealtimeSession.Conversation(self)
prop fnc_ctx : llm.FunctionContext | None
-
Expand source code
@property def fnc_ctx(self) -> llm.FunctionContext | None: return self._fnc_ctx
prop input_audio_buffer : InputAudioBuffer
-
Expand source code
@property def input_audio_buffer(self) -> InputAudioBuffer: return RealtimeSession.InputAudioBuffer(self)
prop response : Response
-
Expand source code
@property def response(self) -> Response: return RealtimeSession.Response(self)
Methods
async def aclose(self) ‑> None
def logging_extra(self) ‑> dict
def session_update(self, *, modalities: list[api_proto.Modality] | None = None, instructions: str | None = None, voice: api_proto.Voice | None = None, input_audio_format: api_proto.AudioFormat | None = None, output_audio_format: api_proto.AudioFormat | None = None, input_audio_transcription: InputTranscriptionOptions | None = None, turn_detection: ServerVadOptions | None = None, tool_choice: api_proto.ToolChoice | None = None, temperature: float | None = None, max_response_output_tokens: "int | Literal['inf'] | None" = None) ‑> None
Inherited members
class RealtimeToolCall (name: str, arguments: str, tool_call_id: str)
-
RealtimeToolCall(name: 'str', arguments: 'str', tool_call_id: 'str')
Expand source code
@dataclass class RealtimeToolCall: name: str """name of the function""" arguments: str """accumulated arguments""" tool_call_id: str """id of the tool call"""
Class variables
var arguments : str
-
accumulated arguments
var name : str
-
name of the function
var tool_call_id : str
-
id of the tool call
class ServerVadOptions (threshold: float, prefix_padding_ms: int, silence_duration_ms: int)
-
ServerVadOptions(threshold: 'float', prefix_padding_ms: 'int', silence_duration_ms: 'int')
Expand source code
@dataclass class ServerVadOptions: threshold: float prefix_padding_ms: int silence_duration_ms: int
Class variables
var prefix_padding_ms : int
var silence_duration_ms : int
var threshold : float