Module livekit.plugins.phonic.realtime
Sub-modules
livekit.plugins.phonic.realtime.realtime_model
Classes
class RealtimeModel (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
phonic_agent: NotGivenOr[str] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN,
welcome_message: NotGivenOr[str | None] = NOT_GIVEN,
generate_welcome_message: NotGivenOr[bool] = NOT_GIVEN,
project: NotGivenOr[str | None] = NOT_GIVEN,
languages: NotGivenOr[list[str]] = NOT_GIVEN,
audio_speed: NotGivenOr[float] = NOT_GIVEN,
phonic_tools: NotGivenOr[list[str]] = NOT_GIVEN,
boosted_keywords: NotGivenOr[list[str]] = NOT_GIVEN,
generate_no_input_poke_text: NotGivenOr[bool] = NOT_GIVEN,
no_input_poke_sec: NotGivenOr[float] = NOT_GIVEN,
no_input_poke_text: NotGivenOr[str] = NOT_GIVEN,
no_input_end_conversation_sec: NotGivenOr[float] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))-
Expand source code
class RealtimeModel(llm.RealtimeModel):
    """LiveKit Agents realtime-model adapter for Phonic's Realtime API."""

    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        phonic_agent: NotGivenOr[str] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
        welcome_message: NotGivenOr[str | None] = NOT_GIVEN,
        generate_welcome_message: NotGivenOr[bool] = NOT_GIVEN,
        project: NotGivenOr[str | None] = NOT_GIVEN,
        languages: NotGivenOr[list[str]] = NOT_GIVEN,
        audio_speed: NotGivenOr[float] = NOT_GIVEN,
        phonic_tools: NotGivenOr[list[str]] = NOT_GIVEN,
        boosted_keywords: NotGivenOr[list[str]] = NOT_GIVEN,
        generate_no_input_poke_text: NotGivenOr[bool] = NOT_GIVEN,
        no_input_poke_sec: NotGivenOr[float] = NOT_GIVEN,
        no_input_poke_text: NotGivenOr[str] = NOT_GIVEN,
        no_input_end_conversation_sec: NotGivenOr[float] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        """
        Initialize a RealtimeModel for Phonic's Realtime API.

        Args:
            api_key: Phonic API key. If not provided, reads from PHONIC_API_KEY
                environment variable.
            phonic_agent: Phonic agent to use for the conversation. Options explicitly
                set here will override the agent's default settings.
            voice: Voice ID for agent audio output.
            welcome_message: Message for the agent to say when the conversation starts.
                Ignored when ``generate_welcome_message`` is True.
            generate_welcome_message: When True, the welcome message is automatically
                generated and ``welcome_message`` is ignored.
            project: Project name to use for the conversation.
            languages: ISO 639-1 language codes the agent should recognize and speak.
            audio_speed: Audio playback speed multiplier.
            phonic_tools: Phonic tool names available to the assistant.
            boosted_keywords: Keywords to boost in speech recognition.
            generate_no_input_poke_text: When True, auto-generate poke text when the
                user is silent.
            no_input_poke_sec: Seconds of silence before sending a poke message.
            no_input_poke_text: Custom poke message text. Ignored when
                ``generate_no_input_poke_text`` is True.
            no_input_end_conversation_sec: Seconds of silence before ending the
                conversation.
            conn_options: Retry/backoff and connection settings.
        """
        super().__init__(
            capabilities=llm.RealtimeCapabilities(
                message_truncation=False,
                turn_detection=True,
                user_transcription=True,
                auto_tool_reply_generation=True,
                audio_output=True,
                manual_function_calls=False,
            )
        )

        # Fall back to the environment variable; NOT_GIVEN is the sentinel when
        # neither the argument nor the variable is set.
        api_key = api_key or os.environ.get("PHONIC_API_KEY", NOT_GIVEN)
        if not is_given(api_key):
            raise ValueError(
                "Phonic API key is required. Provide `api_key` or "
                "set PHONIC_API_KEY environment variable."
            )

        self._opts = _RealtimeOptions(
            api_key=api_key,
            phonic_agent=phonic_agent,
            voice=voice,
            welcome_message=welcome_message,
            generate_welcome_message=generate_welcome_message,
            project=project,
            languages=languages,
            audio_speed=audio_speed,
            phonic_tools=phonic_tools,
            boosted_keywords=boosted_keywords,
            generate_no_input_poke_text=generate_no_input_poke_text,
            no_input_poke_sec=no_input_poke_sec,
            no_input_poke_text=no_input_poke_text,
            no_input_end_conversation_sec=no_input_end_conversation_sec,
            conn_options=conn_options,
        )
        # Weak references so closed/garbage-collected sessions don't accumulate.
        self._sessions = weakref.WeakSet[RealtimeSession]()

    @property
    def model(self) -> str:
        return "phonic"

    @property
    def provider(self) -> str:
        return "phonic"

    def session(self) -> RealtimeSession:
        """Create and track a new realtime session bound to this model."""
        sess = RealtimeSession(self)
        self._sessions.add(sess)
        return sess

    def update_options(
        self,
    ) -> None:
        # Phonic does not allow changing model options after construction.
        logger.warning("update_options is not supported by the Phonic realtime model.")

    async def aclose(self) -> None:
        # No model-level resources to release; sessions close themselves.
        pass
Initialize a RealtimeModel for Phonic's Realtime API.
Args
api_key- Phonic API key. If not provided, reads from PHONIC_API_KEY environment variable.
phonic_agent- Phonic agent to use for the conversation. Options explicitly set here will override the agent's default settings.
voice- Voice ID for agent audio output.
welcome_message- Message for the agent to say when the conversation starts. Ignored when `generate_welcome_message` is True.
generate_welcome_message- When True, the welcome message is automatically generated and `welcome_message` is ignored.
project- Project name to use for the conversation.
languages- ISO 639-1 language codes the agent should recognize and speak.
audio_speed- Audio playback speed multiplier.
phonic_tools- Phonic tool names available to the assistant.
boosted_keywords- Keywords to boost in speech recognition.
generate_no_input_poke_text- When True, auto-generate poke text when the user is silent.
no_input_poke_sec- Seconds of silence before sending a poke message.
no_input_poke_text- Custom poke message text. Ignored when `generate_no_input_poke_text` is True.
no_input_end_conversation_sec- Seconds of silence before ending the conversation.
conn_options- Retry/backoff and connection settings.
Ancestors
- livekit.agents.llm.realtime.RealtimeModel
Instance variables
prop model : str-
Expand source code
@property def model(self) -> str: return "phonic" prop provider : str-
Expand source code
@property def provider(self) -> str: return "phonic"
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: pass def session(self) ‑> RealtimeSession-
Expand source code
def session(self) -> RealtimeSession: sess = RealtimeSession(self) self._sessions.add(sess) return sess def update_options(self) ‑> None-
Expand source code
def update_options( self, ) -> None: logger.warning("update_options is not supported by the Phonic realtime model.")
class RealtimeSession (realtime_model: RealtimeModel)-
Expand source code
class RealtimeSession(llm.RealtimeSession):
    """A single realtime conversation with Phonic over its WebSocket API.

    Bridges the LiveKit Agents realtime-session contract (generations, audio
    push, tool calls) onto Phonic's conversation socket protocol.
    """

    def __init__(self, realtime_model: RealtimeModel) -> None:
        super().__init__(realtime_model)
        self._opts = realtime_model._opts
        self._tools = llm.ToolContext.empty()
        self._chat_ctx = llm.ChatContext.empty()
        # Rechunks incoming audio into fixed-size frames before base64 encoding.
        self._bstream = audio_utils.AudioByteStream(
            sample_rate=PHONIC_INPUT_SAMPLE_RATE,
            num_channels=PHONIC_NUM_CHANNELS,
            samples_per_channel=PHONIC_INPUT_SAMPLE_RATE * PHONIC_INPUT_FRAME_MS // 1000,
        )
        self._input_resampler: rtc.AudioResampler | None = None
        self._input_resampler_rate: int | None = None
        self._client = AsyncPhonic(
            api_key=self._opts.api_key,
        )
        self._socket: AsyncConversationsSocketClient | None = None
        self._socket_ctx: typing.AsyncContextManager[AsyncConversationsSocketClient] | None = None
        # Outbound audio queue consumed by _send_task.
        self._send_ch = utils.aio.Chan[AudioChunkPayload]()
        self._main_atask = asyncio.create_task(self._main_task(), name="phonic-realtime-session")
        self._current_generation: _ResponseGeneration | None = None
        self._conversation_id: str | None = None
        self._session_should_close = asyncio.Event()
        self._session_lock = asyncio.Lock()
        self._generate_reply_task: asyncio.Task[None] | None = None
        # Config is only sent once both instructions and tools are available.
        self._instructions_ready = asyncio.Event()
        self._tools_ready = asyncio.Event()
        self._ready_to_start = asyncio.Event()
        self._config_sent = False
        self._pending_tool_call_ids: set[str] = set()
        self._tool_definitions: list[dict] = []
        self._system_prompt_postfix: str = ""

    async def _close_active_session(self) -> None:
        """Tear down the Phonic socket context manager, if one is open."""
        async with self._session_lock:
            if self._socket_ctx:
                try:
                    await self._socket_ctx.__aexit__(None, None, None)
                except Exception as e:
                    logger.warning(f"Error closing Phonic socket: {e}")
                finally:
                    self._socket = None
                    self._socket_ctx = None

    @property
    def chat_ctx(self) -> llm.ChatContext:
        return self._chat_ctx.copy()

    @property
    def tools(self) -> llm.ToolContext:
        return self._tools.copy()

    async def update_instructions(self, instructions: str) -> None:
        """Set the system prompt; only effective before the config is sent."""
        if self._config_sent:
            logger.warning(
                "update_instructions called after config was already sent. "
                "Phonic does not support updating instructions mid-session."
            )
            return
        self._opts.instructions = instructions
        self._instructions_ready.set()

    async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
        """Sync the chat context.

        Before config is sent, prior messages are folded into the system prompt.
        Afterwards, only tool-call outputs and new system messages can be sent.
        """
        if not self._config_sent:
            messages = [
                item
                for item in chat_ctx.items
                if isinstance(item, llm.ChatMessage)
                and item.text_content
                and item.text_content.strip()
            ]
            if messages:
                turn_history = "\n".join(f"{m.role}: {m.text_content}" for m in messages)
                if turn_history.strip():
                    logger.debug(
                        "update_chat_ctx called with messages prior to config being sent to "
                        "Phonic. Including conversation state in system instructions."
                    )
                    self._system_prompt_postfix = (
                        "\n\nThis conversation is being continued from an existing "
                        "conversation. You are the assistant speaking to the user. "
                        "The following is the conversation history:\n" + turn_history
                    )
            self._chat_ctx = chat_ctx.copy()
            return

        diff_ops = llm.utils.compute_chat_ctx_diff(self._chat_ctx, chat_ctx)
        sent_tool_call_output = False
        sent_system_message = False
        for _, item_id in diff_ops.to_create:
            item = chat_ctx.get_by_id(item_id)
            if item is None:
                continue
            if (
                isinstance(item, llm.FunctionCallOutput)
                and item.call_id in self._pending_tool_call_ids
            ):
                self._pending_tool_call_ids.remove(item.call_id)
                logger.info(f"Sending tool call output for {item.name} (call_id: {item.call_id})")
                if self._socket:
                    await self._socket.send_tool_call_output(
                        ToolCallOutputPayload(
                            tool_call_id=item.call_id,
                            output=str(item.output),
                        )
                    )
                    # NOTE(review): flag placement reconstructed from collapsed source;
                    # assumed set only when the payload was actually sent — confirm.
                    sent_tool_call_output = True
            if isinstance(item, llm.ChatMessage) and item.role in ("system", "developer"):
                text = item.text_content
                if text:
                    logger.debug(f"Sending add system message: {text}")
                    if self._socket:
                        await self._socket.send_add_system_message(
                            AddSystemMessagePayload(system_message=text)
                        )
                        sent_system_message = True
        self._chat_ctx = chat_ctx.copy()
        if not sent_tool_call_output and not sent_system_message:
            logger.warning(
                "update_chat_ctx called but no new tool call outputs to send. "
                "Phonic does not support general chat context updates."
            )
        if sent_tool_call_output:
            # Phonic speaks after receiving the tool output; open a generation for it.
            self._start_new_assistant_turn()

    async def update_tools(self, tools: list[llm.Tool]) -> None:
        """Register LiveKit tools; only effective before the config is sent."""
        if self._config_sent:
            logger.warning(
                "update_tools called after config was already sent. "
                "Phonic does not support updating tools mid-session."
            )
            return
        self._tools = llm.ToolContext(tools)
        self._tool_definitions = []
        for tool_schema in self._tools.parse_function_tools("openai", strict=True):
            # We disallow tool chaining and tool calls during agent speech to reduce complexity
            # of managing state while operating within the LiveKit Realtime generations framework
            self._tool_definitions.append(
                {
                    "type": "custom_websocket",
                    "tool_schema": tool_schema,
                    "tool_call_output_timeout_ms": TOOL_CALL_OUTPUT_TIMEOUT_MS,
                    "wait_for_speech_before_tool_call": True,
                    "allow_tool_chaining": False,
                }
            )
        self._tools_ready.set()

    def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) -> None:
        logger.warning("update_options is not supported by the Phonic realtime model.")

    def push_audio(self, frame: rtc.AudioFrame) -> None:
        """Queue user audio for Phonic: resample, rechunk, base64-encode."""
        if (
            self._session_should_close.is_set()
            or not self._ready_to_start.is_set()
            or not self._socket
        ):
            return
        for f in self._resample_audio(frame):
            for nf in self._bstream.write(f.data.tobytes()):
                b64_audio = base64.b64encode(nf.data.tobytes()).decode("utf-8")
                self._send_ch.send_nowait(AudioChunkPayload(audio=b64_audio))

    def push_video(self, frame: rtc.VideoFrame) -> None:
        logger.warning("push_video is not supported by the Phonic realtime model.")

    def generate_reply(
        self, *, instructions: NotGivenOr[str] = NOT_GIVEN
    ) -> asyncio.Future[llm.GenerationCreatedEvent]:
        """Ask Phonic to produce a reply, optionally with one-off instructions.

        The returned future resolves immediately with the new generation event;
        the send itself is deferred until the socket is ready.
        """
        payload = GenerateReplyPayload(
            system_message=instructions if is_given(instructions) else None,
        )
        # Only the most recent generate-reply request is kept in flight.
        if self._generate_reply_task and not self._generate_reply_task.done():
            self._generate_reply_task.cancel()
        self._generate_reply_task = asyncio.create_task(self._send_generate_reply(payload))
        self._close_current_generation(interrupted=False)
        generation_ev = self._start_new_assistant_turn(user_initiated=True)
        fut = asyncio.Future[llm.GenerationCreatedEvent]()
        fut.set_result(generation_ev)
        return fut

    async def _send_generate_reply(self, payload: GenerateReplyPayload) -> None:
        # Wait for Phonic's ready signal before asking for a reply.
        await self._ready_to_start.wait()
        if self._session_should_close.is_set():
            return
        if self._socket:
            await self._socket.send_generate_reply(payload)

    def commit_audio(self) -> None:
        logger.warning("commit_audio is not supported by the Phonic realtime model.")

    def clear_audio(self) -> None:
        logger.warning("clear_audio is not supported by the Phonic realtime model.")

    def interrupt(self) -> None:
        if self._current_generation:
            logger.warning(
                "interrupt() is not supported by Phonic realtime model. "
                "User interruptions are automatically handled by Phonic."
            )

    def truncate(
        self,
        *,
        message_id: str,
        modalities: list[Literal["text", "audio"]],
        audio_end_ms: int,
        audio_transcript: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        logger.warning(
            "truncate is not supported by the Phonic realtime model. "
            "User interruptions are automatically handled by Phonic."
        )

    async def aclose(self) -> None:
        """Shut down the session: unblock waiters, cancel tasks, close the socket."""
        self._session_should_close.set()
        self._send_ch.close()
        # Set all gates so _main_task can't stay parked on them during shutdown.
        self._instructions_ready.set()
        self._tools_ready.set()
        self._ready_to_start.set()
        self._close_current_generation(interrupted=False)
        if self._generate_reply_task and not self._generate_reply_task.done():
            await utils.aio.cancel_and_wait(self._generate_reply_task)
        if self._main_atask:
            await utils.aio.cancel_and_wait(self._main_atask)
        await self._close_active_session()

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Connect, send the config, then run the send/recv loops until shutdown."""
        try:
            logger.debug("Connecting to Phonic Realtime API...")
            # The Phonic Python SDK uses an async context manager for connect()
            self._socket_ctx = self._client.conversations.connect()
            self._socket = await self._socket_ctx.__aenter__()
            # Need to wait for instructions and tools before sending config
            await self._instructions_ready.wait()
            await self._tools_ready.wait()
            if self._session_should_close.is_set():
                return
            self._config_sent = True
            tools_payload: list[dict | str] = []
            if self._opts.phonic_tools is not NOT_GIVEN and self._opts.phonic_tools:
                tools_payload.extend(self._opts.phonic_tools)
            tools_payload.extend(self._tool_definitions)
            if not is_given(self._opts.instructions):
                logger.warning("Instructions are not set. Phonic will not start a conversation.")
                return
            config = {
                "type": "config",
                "agent": self._opts.phonic_agent,
                "project": self._opts.project,
                "welcome_message": self._opts.welcome_message,
                "generate_welcome_message": self._opts.generate_welcome_message,
                "system_prompt": self._opts.instructions + self._system_prompt_postfix,
                "voice_id": self._opts.voice,
                "input_format": "pcm_44100",
                "output_format": "pcm_44100",
                "recognized_languages": self._opts.languages,
                "audio_speed": self._opts.audio_speed,
                "tools": tools_payload if len(tools_payload) > 0 else NOT_GIVEN,
                "boosted_keywords": self._opts.boosted_keywords,
                "generate_no_input_poke_text": self._opts.generate_no_input_poke_text,
                "no_input_poke_sec": self._opts.no_input_poke_sec,
                "no_input_poke_text": self._opts.no_input_poke_text,
                "no_input_end_conversation_sec": self._opts.no_input_end_conversation_sec,
            }
            # Filter out NOT_GIVEN values
            config_filtered = typing.cast(
                dict[str, typing.Any],
                {k: v for k, v in config.items() if v is not NOT_GIVEN},
            )
            await self._socket.send_config(ConfigPayload(**config_filtered))
            recv_task = asyncio.create_task(self._recv_task(self._socket), name="phonic-recv")
            send_task = asyncio.create_task(self._send_task(self._socket), name="phonic-send")
            shutdown_wait_task = asyncio.create_task(
                self._session_should_close.wait(), name="phonic-shutdown-wait"
            )
            done, pending = await asyncio.wait(
                [recv_task, send_task, shutdown_wait_task],
                return_when=asyncio.FIRST_COMPLETED,
            )
            for task in done:
                exception = task.exception()
                if task is not shutdown_wait_task and exception:
                    logger.error(f"Error in Phonic task: {exception}")
                    raise exception
            for task in pending:
                await utils.aio.cancel_and_wait(task)
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logger.error(f"Phonic Realtime API error: {e}", exc_info=e)
            self._emit_error(e, recoverable=False)
        finally:
            await self._close_active_session()
            self._close_current_generation(interrupted=False)

    @utils.log_exceptions(logger=logger)
    async def _send_task(self, socket: AsyncConversationsSocketClient) -> None:
        # Drain the outbound audio queue until it is closed.
        async for payload in self._send_ch:
            await socket.send_audio_chunk(payload)

    @utils.log_exceptions(logger=logger)
    async def _recv_task(self, socket: AsyncConversationsSocketClient) -> None:
        """Dispatch incoming Phonic messages by their `type` field."""
        try:
            async for message in socket:
                if self._session_should_close.is_set():
                    break
                msg_type = message.type
                if msg_type == "assistant_started_speaking":
                    self._start_new_assistant_turn()
                elif msg_type == "assistant_finished_speaking":
                    self._close_current_generation(interrupted=False)
                elif msg_type == "audio_chunk":
                    self._handle_audio_chunk(message)
                elif msg_type == "input_text":
                    self._handle_input_text(message)
                elif msg_type == "user_started_speaking":
                    self._handle_input_speech_started()
                elif msg_type == "user_finished_speaking":
                    self._handle_input_speech_stopped()
                elif msg_type == "tool_call":
                    self._handle_tool_call(message)
                elif msg_type == "warning":
                    logger.warning(f"Phonic warning: {message.warning.message}")
                elif msg_type == "error":
                    self._emit_error(Exception(message.error.message), recoverable=False)
                elif msg_type == "assistant_ended_conversation":
                    self._emit_error(
                        Exception(
                            "assistant_ended_conversation is not supported by "
                            "the Phonic realtime model with LiveKit Agents."
                        ),
                        recoverable=False,
                    )
                elif msg_type == "conversation_created":
                    self._conversation_id = message.conversation_id
                    logger.info(f"Phonic Conversation began with ID: {self._conversation_id}")
                elif msg_type == "tool_call_interrupted":
                    self._handle_tool_call_interrupted(message)
                elif msg_type == "ready_to_start_conversation":
                    self._ready_to_start.set()
        except Exception as e:
            if not self._session_should_close.is_set():
                logger.error(f"Error in Phonic receive loop: {e}", exc_info=e)
                self._emit_error(e, recoverable=True)
            # NOTE(review): re-raise reconstructed as unconditional so _main_task's
            # task.exception() check observes the failure — confirm against upstream.
            raise e

    def _start_new_assistant_turn(self, user_initiated: bool = False) -> llm.GenerationCreatedEvent:
        """Open a new generation (closing any active one) and emit the event."""
        if self._current_generation:
            self._close_current_generation(interrupted=True)
        response_id = utils.shortuuid("PS_")
        self._current_generation = _ResponseGeneration(
            message_ch=utils.aio.Chan[llm.MessageGeneration](),
            function_ch=utils.aio.Chan[llm.FunctionCall](),
            text_ch=utils.aio.Chan[str](),
            audio_ch=utils.aio.Chan[rtc.AudioFrame](),
            response_id=response_id,
            input_id=utils.shortuuid("PI_"),
        )
        msg_modalities = asyncio.Future[list[Literal["text", "audio"]]]()
        msg_modalities.set_result(["audio", "text"])
        self._current_generation.message_ch.send_nowait(
            llm.MessageGeneration(
                message_id=response_id,
                text_stream=self._current_generation.text_ch,
                audio_stream=self._current_generation.audio_ch,
                modalities=msg_modalities,
            )
        )
        generation_ev = llm.GenerationCreatedEvent(
            message_stream=self._current_generation.message_ch,
            function_stream=self._current_generation.function_ch,
            user_initiated=user_initiated,
            response_id=response_id,
        )
        self.emit("generation_created", generation_ev)
        return generation_ev

    def _close_current_generation(self, interrupted: bool) -> None:
        """Flush accumulated text into chat history and close all channels."""
        gen = self._current_generation
        if not gen or gen._done:
            return
        if gen.output_text:
            self._chat_ctx.add_message(
                role="assistant",
                content=gen.output_text,
                id=gen.response_id,
                interrupted=interrupted,
            )
        if not gen.text_ch.closed:
            gen.text_ch.send_nowait("")
            gen.text_ch.close()
        if not gen.audio_ch.closed:
            gen.audio_ch.close()
        gen.function_ch.close()
        gen.message_ch.close()
        gen._done = True
        self._current_generation = None

    def _handle_audio_chunk(self, message: AudioChunkResponsePayload) -> None:
        # In Phonic, audio chunks can come in when assistant isn't explicitly active.
        # We start a generation if text is present to align with the framework pattern.
        if self._current_generation is None and message.text:
            logger.debug("Starting new generation due to text in audio chunk")
            self._start_new_assistant_turn()
        gen = self._current_generation
        if gen is None:
            return
        if message.text:
            gen.push_text(message.text)
        if message.audio:
            try:
                audio_bytes = base64.b64decode(message.audio)
                sample_count = len(audio_bytes) // 2  # 16-bit PCM = 2 bytes per sample
                if sample_count > 0:
                    frame = rtc.AudioFrame(
                        data=audio_bytes,
                        sample_rate=PHONIC_OUTPUT_SAMPLE_RATE,
                        num_channels=PHONIC_NUM_CHANNELS,
                        samples_per_channel=sample_count // PHONIC_NUM_CHANNELS,
                    )
                    gen.audio_ch.send_nowait(frame)
            except Exception as e:
                logger.error(f"Failed to decode Phonic audio chunk: {e}")

    def _handle_input_text(self, message: InputTextPayload) -> None:
        """Emit a final user transcription and record it in chat history."""
        item_id = utils.shortuuid("PI_")
        transcript = message.text
        self.emit(
            "input_audio_transcription_completed",
            llm.InputTranscriptionCompleted(
                item_id=item_id,
                transcript=transcript,
                is_final=True,
            ),
        )
        self._chat_ctx.add_message(
            role="user",
            content=transcript,
            id=item_id,
        )

    def _handle_tool_call(self, message: ToolCallPayload) -> None:
        """Forward a Phonic tool call into the current generation's function stream."""
        tool_call_id = message.tool_call_id
        tool_name = message.tool_name
        parameters = message.parameters
        self._pending_tool_call_ids.add(tool_call_id)
        if self._current_generation is None:
            logger.warning("Encountered tool call but no active generation. Starting new turn.")
            self._start_new_assistant_turn()
        assert self._current_generation is not None, (
            "current_generation should not be None when handling tool call"
        )
        self._current_generation.function_ch.send_nowait(
            llm.FunctionCall(
                call_id=tool_call_id,
                name=tool_name,
                arguments=json.dumps(parameters),
            )
        )
        # At most 1 tool call is supported per turn due to `allow_tool_chaining: False`,
        # allowing us to close the generation.
        self._close_current_generation(interrupted=False)

    def _handle_tool_call_interrupted(self, message: ToolCallInterruptedPayload) -> None:
        tool_call_id = message.tool_call_id
        tool_name = message.tool_name
        if tool_call_id in self._pending_tool_call_ids:
            self._pending_tool_call_ids.remove(tool_call_id)
            # NOTE(review): warning placement reconstructed as guarded by the
            # pending check (collapsed source is ambiguous) — confirm upstream.
            logger.warning(
                f"Tool call for {tool_name} (call_id: {tool_call_id}) "
                "was cancelled due to user interruption."
            )

    def _handle_input_speech_started(self) -> None:
        self.emit("input_speech_started", llm.InputSpeechStartedEvent())
        # Phonic auto-interrupts the assistant when the user starts speaking.
        self._close_current_generation(interrupted=True)

    def _handle_input_speech_stopped(self) -> None:
        self.emit(
            "input_speech_stopped",
            llm.InputSpeechStoppedEvent(user_transcription_enabled=True),
        )

    def _resample_audio(self, frame: rtc.AudioFrame) -> typing.Iterator[rtc.AudioFrame]:
        """Yield frames at Phonic's input rate, rebuilding the resampler on rate change."""
        if self._input_resampler is not None:
            if frame.sample_rate != self._input_resampler_rate:
                # Input rate changed; drop the stale resampler.
                self._input_resampler = None
                self._input_resampler_rate = None
        if self._input_resampler is None and (
            frame.sample_rate != PHONIC_INPUT_SAMPLE_RATE
            or frame.num_channels != PHONIC_NUM_CHANNELS
        ):
            self._input_resampler = rtc.AudioResampler(
                input_rate=frame.sample_rate,
                output_rate=PHONIC_INPUT_SAMPLE_RATE,
                num_channels=PHONIC_NUM_CHANNELS,
            )
            self._input_resampler_rate = frame.sample_rate
        if self._input_resampler is not None:
            yield from self._input_resampler.push(frame)
        else:
            yield frame

    def _emit_error(self, error: Exception, recoverable: bool) -> None:
        self.emit(
            "error",
            llm.RealtimeModelError(
                timestamp=time.time(),
                label=self._realtime_model._label,
                error=error,
                recoverable=recoverable,
            ),
        )
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.llm.realtime.RealtimeSession
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop chat_ctx : llm.ChatContext-
Expand source code
@property def chat_ctx(self) -> llm.ChatContext: return self._chat_ctx.copy() prop tools : llm.ToolContext-
Expand source code
@property def tools(self) -> llm.ToolContext: return self._tools.copy()
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: self._session_should_close.set() self._send_ch.close() self._instructions_ready.set() self._tools_ready.set() self._ready_to_start.set() self._close_current_generation(interrupted=False) if self._generate_reply_task and not self._generate_reply_task.done(): await utils.aio.cancel_and_wait(self._generate_reply_task) if self._main_atask: await utils.aio.cancel_and_wait(self._main_atask) await self._close_active_session() def clear_audio(self) ‑> None-
Expand source code
def clear_audio(self) -> None: logger.warning("clear_audio is not supported by the Phonic realtime model.") def commit_audio(self) ‑> None-
Expand source code
def commit_audio(self) -> None: logger.warning("commit_audio is not supported by the Phonic realtime model.") def generate_reply(self, *, instructions: NotGivenOr[str] = NOT_GIVEN) ‑> _asyncio.Future[livekit.agents.llm.realtime.GenerationCreatedEvent]-
Expand source code
def generate_reply( self, *, instructions: NotGivenOr[str] = NOT_GIVEN ) -> asyncio.Future[llm.GenerationCreatedEvent]: payload = GenerateReplyPayload( system_message=instructions if is_given(instructions) else None, ) if self._generate_reply_task and not self._generate_reply_task.done(): self._generate_reply_task.cancel() self._generate_reply_task = asyncio.create_task(self._send_generate_reply(payload)) self._close_current_generation(interrupted=False) generation_ev = self._start_new_assistant_turn(user_initiated=True) fut = asyncio.Future[llm.GenerationCreatedEvent]() fut.set_result(generation_ev) return fut def interrupt(self) ‑> None-
Expand source code
def interrupt(self) -> None: if self._current_generation: logger.warning( "interrupt() is not supported by Phonic realtime model. " "User interruptions are automatically handled by Phonic." ) def push_audio(self, frame: rtc.AudioFrame) ‑> None-
Expand source code
def push_audio(self, frame: rtc.AudioFrame) -> None: if ( self._session_should_close.is_set() or not self._ready_to_start.is_set() or not self._socket ): return for f in self._resample_audio(frame): for nf in self._bstream.write(f.data.tobytes()): b64_audio = base64.b64encode(nf.data.tobytes()).decode("utf-8") self._send_ch.send_nowait(AudioChunkPayload(audio=b64_audio)) def push_video(self, frame: rtc.VideoFrame) ‑> None-
Expand source code
def push_video(self, frame: rtc.VideoFrame) -> None: logger.warning("push_video is not supported by the Phonic realtime model.") def truncate(self,
*,
message_id: str,
modalities: "list[Literal['text', 'audio']]",
audio_end_ms: int,
audio_transcript: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
def truncate( self, *, message_id: str, modalities: list[Literal["text", "audio"]], audio_end_ms: int, audio_transcript: NotGivenOr[str] = NOT_GIVEN, ) -> None: logger.warning( "truncate is not supported by the Phonic realtime model. " "User interruptions are automatically handled by Phonic." ) async def update_chat_ctx(self, chat_ctx: llm.ChatContext) ‑> None-
Expand source code
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None: if not self._config_sent: messages = [ item for item in chat_ctx.items if isinstance(item, llm.ChatMessage) and item.text_content and item.text_content.strip() ] if messages: turn_history = "\n".join(f"{m.role}: {m.text_content}" for m in messages) if turn_history.strip(): logger.debug( "update_chat_ctx called with messages prior to config being sent to " "Phonic. Including conversation state in system instructions." ) self._system_prompt_postfix = ( "\n\nThis conversation is being continued from an existing " "conversation. You are the assistant speaking to the user. " "The following is the conversation history:\n" + turn_history ) self._chat_ctx = chat_ctx.copy() return diff_ops = llm.utils.compute_chat_ctx_diff(self._chat_ctx, chat_ctx) sent_tool_call_output = False sent_system_message = False for _, item_id in diff_ops.to_create: item = chat_ctx.get_by_id(item_id) if item is None: continue if ( isinstance(item, llm.FunctionCallOutput) and item.call_id in self._pending_tool_call_ids ): self._pending_tool_call_ids.remove(item.call_id) logger.info(f"Sending tool call output for {item.name} (call_id: {item.call_id})") if self._socket: await self._socket.send_tool_call_output( ToolCallOutputPayload( tool_call_id=item.call_id, output=str(item.output), ) ) sent_tool_call_output = True if isinstance(item, llm.ChatMessage) and item.role in ("system", "developer"): text = item.text_content if text: logger.debug(f"Sending add system message: {text}") if self._socket: await self._socket.send_add_system_message( AddSystemMessagePayload(system_message=text) ) sent_system_message = True self._chat_ctx = chat_ctx.copy() if not sent_tool_call_output and not sent_system_message: logger.warning( "update_chat_ctx called but no new tool call outputs to send. " "Phonic does not support general chat context updates." 
) if sent_tool_call_output: self._start_new_assistant_turn() async def update_instructions(self, instructions: str) ‑> None-
Expand source code
async def update_instructions(self, instructions: str) -> None: if self._config_sent: logger.warning( "update_instructions called after config was already sent. " "Phonic does not support updating instructions mid-session." ) return self._opts.instructions = instructions self._instructions_ready.set() def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) -> None: logger.warning("update_options is not supported by the Phonic realtime model.") async def update_tools(self, tools: list[llm.Tool]) ‑> None-
Expand source code
async def update_tools(self, tools: list[llm.Tool]) -> None: if self._config_sent: logger.warning( "update_tools called after config was already sent. " "Phonic does not support updating tools mid-session." ) return self._tools = llm.ToolContext(tools) self._tool_definitions = [] for tool_schema in self._tools.parse_function_tools("openai", strict=True): # We disallow tool chaining and tool calls during agent speech to reduce complexity # of managing state while operating within the LiveKit Realtime generations framework self._tool_definitions.append( { "type": "custom_websocket", "tool_schema": tool_schema, "tool_call_output_timeout_ms": TOOL_CALL_OUTPUT_TIMEOUT_MS, "wait_for_speech_before_tool_call": True, "allow_tool_chaining": False, } ) self._tools_ready.set()
Inherited members