Module livekit.plugins.aws.experimental.realtime

Classes

class RealtimeModel (*,
voice: NotGivenOr[VOICE_ID] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
region: NotGivenOr[str] = NOT_GIVEN)
class RealtimeModel(llm.RealtimeModel):
    """High-level entry point that conforms to the LiveKit RealtimeModel interface.

    The object is very lightweight; it mainly stores default inference options and
    spawns a RealtimeSession when session() is invoked.
    """

    def __init__(
        self,
        *,
        voice: NotGivenOr[VOICE_ID] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN,
        region: NotGivenOr[str] = NOT_GIVEN,
    ):
        """Instantiate a new RealtimeModel.

        Args:
            voice (VOICE_ID | NotGiven): Preferred voice id for Sonic TTS output. Falls back to "tiffany".
            temperature (float | NotGiven): Sampling temperature (0-1). Defaults to DEFAULT_TEMPERATURE.
            top_p (float | NotGiven): Nucleus sampling probability mass. Defaults to DEFAULT_TOP_P.
            max_tokens (int | NotGiven): Upper bound for tokens emitted by the model. Defaults to DEFAULT_MAX_TOKENS.
            tool_choice (llm.ToolChoice | None | NotGiven): Strategy for tool invocation ("auto", "required", or explicit function).
            region (str | NotGiven): AWS region of the Bedrock runtime endpoint.
        """  # noqa: E501
        super().__init__(
            capabilities=llm.RealtimeCapabilities(
                message_truncation=False,
                turn_detection=True,
                user_transcription=True,
                auto_tool_reply_generation=True,
                audio_output=True,
            )
        )
        self.model_id = "amazon.nova-sonic-v1:0"
        # note: temperature and top_p do not follow industry standards and are defined slightly differently for Sonic  # noqa: E501
        # temperature ranges from 0.0 to 1.0, where 0.0 is the most random and 1.0 is the most deterministic  # noqa: E501
        # top_p ranges from 0.0 to 1.0, where 0.0 is the most random and 1.0 is the most deterministic  # noqa: E501
        self.temperature = temperature
        self.top_p = top_p
        self._opts = _RealtimeOptions(
            voice=cast(VOICE_ID, voice) if is_given(voice) else "tiffany",
            temperature=temperature if is_given(temperature) else DEFAULT_TEMPERATURE,
            top_p=top_p if is_given(top_p) else DEFAULT_TOP_P,
            max_tokens=max_tokens if is_given(max_tokens) else DEFAULT_MAX_TOKENS,
            tool_choice=tool_choice or None,
            region=region if is_given(region) else "us-east-1",
        )
        self._sessions = weakref.WeakSet[RealtimeSession]()

    def session(self) -> RealtimeSession:
        """Return a new RealtimeSession bound to this model instance."""
        sess = RealtimeSession(self)

        # note: this is a hack to get the session to initialize itself
        # TODO: change how RealtimeSession is initialized by creating a single task main_atask that spawns subtasks  # noqa: E501
        asyncio.create_task(sess.initialize_streams())
        self._sessions.add(sess)
        return sess

    # stub b/c RealtimeSession.aclose() is invoked directly
    async def aclose(self) -> None:
        pass

High-level entry point that conforms to the LiveKit RealtimeModel interface.

The object is very lightweight; it mainly stores default inference options and spawns a RealtimeSession when session() is invoked.

Instantiate a new RealtimeModel.

Args

voice : VOICE_ID | NotGiven
Preferred voice id for Sonic TTS output. Falls back to "tiffany".
temperature : float | NotGiven
Sampling temperature (0-1). Defaults to DEFAULT_TEMPERATURE.
top_p : float | NotGiven
Nucleus sampling probability mass. Defaults to DEFAULT_TOP_P.
max_tokens : int | NotGiven
Upper bound for tokens emitted by the model. Defaults to DEFAULT_MAX_TOKENS.
tool_choice : llm.ToolChoice | None | NotGiven
Strategy for tool invocation ("auto", "required", or explicit function).
region : str | NotGiven
AWS region of the Bedrock runtime endpoint.
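
A minimal construction sketch (illustrative, not taken from the plugin source): every argument is optional and falls back to the defaults documented above. It assumes AWS credentials are resolvable through the standard boto3 chain.

from livekit.plugins.aws.experimental.realtime import RealtimeModel

model = RealtimeModel(
    voice="tiffany",      # the default voice when omitted
    max_tokens=1024,
    region="us-east-1",   # the default region when omitted
)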

Ancestors

  • livekit.agents.llm.realtime.RealtimeModel

Methods

def session(self) ‑> livekit.plugins.aws.experimental.realtime.realtime_model.RealtimeSession
def session(self) -> RealtimeSession:
    """Return a new RealtimeSession bound to this model instance."""
    sess = RealtimeSession(self)

    # note: this is a hack to get the session to initialize itself
    # TODO: change how RealtimeSession is initialized by creating a single task main_atask that spawns subtasks  # noqa: E501
    asyncio.create_task(sess.initialize_streams())
    self._sessions.add(sess)
    return sess

Return a new RealtimeSession bound to this model instance.
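
Note that session() schedules initialize_streams() with asyncio.create_task(), so it must be called from a running event loop. A hedged sketch:

import asyncio

async def main() -> None:
    model = RealtimeModel()
    sess = model.session()  # requires a running loop: initialization is
                            # scheduled via asyncio.create_task()
    ...                     # hand the session to an agent here
    await sess.aclose()

asyncio.run(main())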

class RealtimeSession (realtime_model: RealtimeModel)
class RealtimeSession(  # noqa: F811
    llm.RealtimeSession[Literal["bedrock_server_event_received", "bedrock_client_event_queued"]]
):
    """Bidirectional streaming session against the Nova Sonic Bedrock runtime.

    The session owns two asynchronous tasks:

    1. _process_audio_input – pushes user mic audio and tool results to Bedrock.
    2. _process_responses – receives server events from Bedrock and converts them into
       LiveKit abstractions such as llm.MessageGeneration.

    A set of helper handlers (_handle_*) transform the low-level Bedrock
    JSON payloads into higher-level application events and keep
    _ResponseGeneration state in sync.
    """

    def __init__(self, realtime_model: RealtimeModel) -> None:
        """Create and wire-up a new realtime session.

        Args:
            realtime_model (RealtimeModel): Parent model instance that stores static
                inference options and the Smithy Bedrock client configuration.
        """
        super().__init__(realtime_model)
        self._realtime_model: RealtimeModel = realtime_model
        self._event_builder = seb(
            prompt_name=str(uuid.uuid4()),
            audio_content_name=str(uuid.uuid4()),
        )
        self._input_resampler: rtc.AudioResampler | None = None
        self._bstream = utils.audio.AudioByteStream(
            DEFAULT_INPUT_SAMPLE_RATE, DEFAULT_CHANNELS, samples_per_channel=DEFAULT_CHUNK_SIZE
        )

        self._response_task = None
        self._audio_input_task = None
        self._stream_response = None
        self._bedrock_client = None
        self._pending_tools: set[str] = set()
        self._is_sess_active = asyncio.Event()
        self._chat_ctx = llm.ChatContext.empty()
        self._tools = llm.ToolContext.empty()
        self._tool_results_ch = utils.aio.Chan[dict[str, str]]()
        self._tools_ready = asyncio.get_running_loop().create_future()
        self._instructions_ready = asyncio.get_running_loop().create_future()
        self._chat_ctx_ready = asyncio.get_running_loop().create_future()
        self._instructions = DEFAULT_SYSTEM_PROMPT
        self._audio_input_chan = utils.aio.Chan[bytes]()
        self._current_generation: _ResponseGeneration | None = None

        # note: currently tracks session restart attempts across all sessions
        # TODO: track restart attempts per turn
        self._session_restart_attempts = 0

        self._event_handlers = {
            "completion_start": self._handle_completion_start_event,
            "audio_output_content_start": self._handle_audio_output_content_start_event,
            "audio_output_content": self._handle_audio_output_content_event,
            "audio_output_content_end": self._handle_audio_output_content_end_event,
            "text_output_content_start": self._handle_text_output_content_start_event,
            "text_output_content": self._handle_text_output_content_event,
            "text_output_content_end": self._handle_text_output_content_end_event,
            "tool_output_content_start": self._handle_tool_output_content_start_event,
            "tool_output_content": self._handle_tool_output_content_event,
            "tool_output_content_end": self._handle_tool_output_content_end_event,
            "completion_end": self._handle_completion_end_event,
            "usage": self._handle_usage_event,
            "other_event": self._handle_other_event,
        }
        self._turn_tracker = _TurnTracker(
            cast(Callable[[str, Any], None], self.emit),
            cast(Callable[[], None], self.emit_generation_event),
        )

    @utils.log_exceptions(logger=logger)
    def _initialize_client(self) -> None:
        """Instantiate the Bedrock runtime client"""
        config = Config(
            endpoint_uri=f"https://bedrock-runtime.{self._realtime_model._opts.region}.amazonaws.com",
            region=self._realtime_model._opts.region,
            aws_credentials_identity_resolver=Boto3CredentialsResolver(),
            http_auth_scheme_resolver=HTTPAuthSchemeResolver(),
            http_auth_schemes={"aws.auth#sigv4": SigV4AuthScheme()},
            user_agent_extra="x-client-framework:livekit-plugins-aws[realtime]",
        )
        self._bedrock_client = BedrockRuntimeClient(config=config)

    @utils.log_exceptions(logger=logger)
    async def _send_raw_event(self, event_json: str) -> None:
        """Low-level helper that serialises event_json and forwards it to the bidirectional stream.

        Args:
            event_json (str): The JSON payload (already in Bedrock wire format) to queue.

        Raises:
            Exception: Propagates any failures returned by the Bedrock runtime client.
        """
        if not self._stream_response:
            logger.warning("stream not initialized; dropping event (this should never occur)")
            return

        event = InvokeModelWithBidirectionalStreamInputChunk(
            value=BidirectionalInputPayloadPart(bytes_=event_json.encode("utf-8"))
        )

        try:
            await self._stream_response.input_stream.send(event)
        except Exception as e:
            logger.exception("Error sending event")
            err_msg = getattr(e, "message", str(e))
            request_id = None
            try:
                request_id = err_msg.split(" ")[0].split("=")[1]
            except Exception:
                pass

            self.emit(
                "error",
                llm.RealtimeModelError(
                    timestamp=time.monotonic(),
                    label=self._realtime_model._label,
                    error=APIStatusError(
                        message=err_msg,
                        status_code=500,
                        request_id=request_id,
                        body=e,
                        retryable=False,
                    ),
                    recoverable=False,
                ),
            )
            raise

    def _serialize_tool_config(self) -> ToolConfiguration | None:
        """Convert self.tools into the JSON structure expected by Sonic.

        If any tools are registered, the method also harmonises temperature and
        top_p defaults to Sonic's recommended greedy values (1.0).

        Returns:
            ToolConfiguration | None: None when no tools are present, otherwise a complete config block.
        """  # noqa: E501
        tool_cfg = None
        if self.tools.function_tools:
            tools = []
            for name, f in self.tools.function_tools.items():
                if llm.tool_context.is_function_tool(f):
                    description = llm.tool_context.get_function_info(f).description
                    input_schema = llm.utils.build_legacy_openai_schema(f, internally_tagged=True)[
                        "parameters"
                    ]
                elif llm.tool_context.is_raw_function_tool(f):
                    description = llm.tool_context.get_raw_function_info(f).raw_schema.get(
                        "description"
                    )
                    input_schema = llm.tool_context.get_raw_function_info(f).raw_schema[
                        "parameters"
                    ]
                else:
                    continue

                tool = Tool(
                    toolSpec=ToolSpec(
                        name=name,
                        description=description or "No description provided",
                        inputSchema=ToolInputSchema(json_=json.dumps(input_schema)),  # type: ignore
                    )
                )
                tools.append(tool)
            tool_choice = self._tool_choice_adapter(self._realtime_model._opts.tool_choice)
            logger.debug(f"TOOL CHOICE: {tool_choice}")
            tool_cfg = ToolConfiguration(tools=tools, toolChoice=tool_choice)

            # recommended to set greedy inference configs for tool calls
            if not is_given(self._realtime_model.top_p):
                self._realtime_model._opts.top_p = 1.0
            if not is_given(self._realtime_model.temperature):
                self._realtime_model._opts.temperature = 1.0
        return tool_cfg

    @utils.log_exceptions(logger=logger)
    async def initialize_streams(self, is_restart: bool = False) -> None:
        """Open the Bedrock bidirectional stream and spawn background worker tasks.

        This coroutine is idempotent and can be invoked again when recoverable
        errors (e.g. timeout, throttling) require a fresh session.

        Args:
            is_restart (bool, optional): Marks whether we are re-initialising an
                existing session after an error. Defaults to False.
        """
        try:
            if not self._bedrock_client:
                logger.info("Creating Bedrock client")
                self._initialize_client()
            assert self._bedrock_client is not None, "bedrock_client is None"

            logger.info("Initializing Bedrock stream")
            self._stream_response = (
                await self._bedrock_client.invoke_model_with_bidirectional_stream(
                    InvokeModelWithBidirectionalStreamOperationInput(
                        model_id=self._realtime_model.model_id
                    )
                )
            )

            if not is_restart:
                pending_events: list[asyncio.Future] = []
                if not self.tools.function_tools:
                    pending_events.append(self._tools_ready)
                if not self._instructions_ready.done():
                    pending_events.append(self._instructions_ready)
                if not self._chat_ctx_ready.done():
                    pending_events.append(self._chat_ctx_ready)

                # note: can't know during sess init whether tools were not added
                # or if they were added haven't yet been updated
                # therefore in the case there are no tools, we wait the entire timeout
                try:
                    if pending_events:
                        await asyncio.wait_for(asyncio.gather(*pending_events), timeout=0.5)
                except asyncio.TimeoutError:
                    if not self._tools_ready.done():
                        logger.warning("Tools not ready after 500ms, continuing without them")

                    if not self._instructions_ready.done():
                        logger.warning(
                            "Instructions not received after 500ms, proceeding with default instructions"  # noqa: E501
                        )
                    if not self._chat_ctx_ready.done():
                        logger.warning(
                            "Chat context not received after 500ms, proceeding with empty chat context"  # noqa: E501
                        )

            logger.info(
                f"Initializing Bedrock session with realtime options: {self._realtime_model._opts}"
            )
            # there is a 40-message limit on the chat context
            if len(self._chat_ctx.items) > MAX_MESSAGES:
                logger.warning(
                    f"Chat context has {len(self._chat_ctx.items)} messages, truncating to {MAX_MESSAGES}"  # noqa: E501
                )
                self._chat_ctx.truncate(max_items=MAX_MESSAGES)
            init_events = self._event_builder.create_prompt_start_block(
                voice_id=self._realtime_model._opts.voice,
                sample_rate=DEFAULT_OUTPUT_SAMPLE_RATE,  # type: ignore
                system_content=self._instructions,
                chat_ctx=self.chat_ctx,
                tool_configuration=self._serialize_tool_config(),
                max_tokens=self._realtime_model._opts.max_tokens,
                top_p=self._realtime_model._opts.top_p,
                temperature=self._realtime_model._opts.temperature,
            )

            for event in init_events:
                await self._send_raw_event(event)
                logger.debug(f"Sent event: {event}")

            if not is_restart:
                self._audio_input_task = asyncio.create_task(
                    self._process_audio_input(), name="RealtimeSession._process_audio_input"
                )

            self._response_task = asyncio.create_task(
                self._process_responses(), name="RealtimeSession._process_responses"
            )
            self._is_sess_active.set()
            logger.debug("Stream initialized successfully")
        except Exception as e:
            logger.debug(f"Failed to initialize stream: {str(e)}")
            raise

    @utils.log_exceptions(logger=logger)
    def emit_generation_event(self) -> None:
        """Publish a llm.GenerationCreatedEvent to external subscribers."""
        logger.debug("Emitting generation event")
        assert self._current_generation is not None, "current_generation is None"

        generation_ev = llm.GenerationCreatedEvent(
            message_stream=self._current_generation.message_ch,
            function_stream=self._current_generation.function_ch,
            user_initiated=False,
        )
        self.emit("generation_created", generation_ev)

    @utils.log_exceptions(logger=logger)
    async def _handle_event(self, event_data: dict) -> None:
        """Dispatch a raw Bedrock event to the corresponding _handle_* method."""
        event_type = self._event_builder.get_event_type(event_data)
        event_handler = self._event_handlers.get(event_type)
        if event_handler:
            await event_handler(event_data)
            self._turn_tracker.feed(event_data)
        else:
            logger.warning(f"No event handler found for event type: {event_type}")

    async def _handle_completion_start_event(self, event_data: dict) -> None:
        log_event_data(event_data)
        self._create_response_generation()

    def _create_response_generation(self) -> None:
        """Instantiate _ResponseGeneration and emit the GenerationCreated event."""
        if self._current_generation is None:
            self._current_generation = _ResponseGeneration(
                message_ch=utils.aio.Chan(),
                function_ch=utils.aio.Chan(),
                input_id=str(uuid.uuid4()),
                response_id=str(uuid.uuid4()),
                messages={},
                user_messages={},
                speculative_messages={},
                _created_timestamp=datetime.now().isoformat(),
            )
            msg_gen = _MessageGeneration(
                message_id=self._current_generation.response_id,
                text_ch=utils.aio.Chan(),
                audio_ch=utils.aio.Chan(),
            )
            msg_modalities = asyncio.Future[list[Literal["text", "audio"]]]()
            msg_modalities.set_result(
                ["audio", "text"] if self._realtime_model.capabilities.audio_output else ["text"]
            )
            self._current_generation.message_ch.send_nowait(
                llm.MessageGeneration(
                    message_id=msg_gen.message_id,
                    text_stream=msg_gen.text_ch,
                    audio_stream=msg_gen.audio_ch,
                    modalities=msg_modalities,
                )
            )
            self._current_generation.messages[self._current_generation.response_id] = msg_gen

    # will be completely ignoring post-ASR text events
    async def _handle_text_output_content_start_event(self, event_data: dict) -> None:
        """Handle text_output_content_start for both user and assistant roles."""
        log_event_data(event_data)
        role = event_data["event"]["contentStart"]["role"]
        self._create_response_generation()

        # note: does not work if you emit llm.GCE too early (for some reason)
        if role == "USER":
            assert self._current_generation is not None, "current_generation is None"

            content_id = event_data["event"]["contentStart"]["contentId"]
            self._current_generation.user_messages[content_id] = self._current_generation.input_id

        elif (
            role == "ASSISTANT"
            and "SPECULATIVE" in event_data["event"]["contentStart"]["additionalModelFields"]
        ):
            assert self._current_generation is not None, "current_generation is None"

            text_content_id = event_data["event"]["contentStart"]["contentId"]
            self._current_generation.speculative_messages[text_content_id] = (
                self._current_generation.response_id
            )

    async def _handle_text_output_content_event(self, event_data: dict) -> None:
        """Stream partial text tokens into the current _MessageGeneration."""
        log_event_data(event_data)
        text_content_id = event_data["event"]["textOutput"]["contentId"]
        text_content = f"{event_data['event']['textOutput']['content']}\n"

        # currently only agent can be interrupted
        if text_content == '{ "interrupted" : true }\n':
            # the interrupted flag is not being set correctly in chat_ctx
            # this is b/c audio playback is desynced from text transcription
            # TODO: fix this; possibly via a playback timer
            idx = self._chat_ctx.find_insertion_index(created_at=time.time()) - 1
            if idx < 0:
                logger.warning("Barge-in DETECTED but no previous message found")
                return

            logger.debug(
                f"BARGE-IN DETECTED using idx: {idx} and chat_msg: {self._chat_ctx.items[idx]}"
            )
            if (item := self._chat_ctx.items[idx]).type == "message":
                item.interrupted = True
            self._close_current_generation()
            return

        # ignore events until turn starts
        if self._current_generation is not None:
            # TODO: rename event to llm.InputTranscriptionUpdated
            if (
                self._current_generation.user_messages.get(text_content_id)
                == self._current_generation.input_id
            ):
                logger.debug(f"INPUT TRANSCRIPTION UPDATED: {text_content}")
                # note: user ASR text is slightly different than what is sent to LiveKit (newline vs whitespace)  # noqa: E501
                # TODO: fix this
                self._update_chat_ctx(role="user", text_content=text_content)

            elif (
                self._current_generation.speculative_messages.get(text_content_id)
                == self._current_generation.response_id
            ):
                curr_gen = self._current_generation.messages[self._current_generation.response_id]
                curr_gen.text_ch.send_nowait(text_content)
                # note: this update is per utterance, not per turn
                self._update_chat_ctx(role="assistant", text_content=text_content)

    def _update_chat_ctx(self, role: llm.ChatRole, text_content: str) -> None:
        """
        Update the chat context with the latest ASR text while guarding against model limitations:
            a) 40 total messages limit
            b) 1kB message size limit
        """
        logger.debug(f"Updating chat context with role: {role} and text_content: {text_content}")
        if len(self._chat_ctx.items) == 0:
            self._chat_ctx.add_message(role=role, content=text_content)
        else:
            prev_utterance = self._chat_ctx.items[-1]
            if prev_utterance.type == "message" and prev_utterance.role == role:
                if isinstance(prev_content := prev_utterance.content[0], str) and (
                    len(prev_content.encode("utf-8")) + len(text_content.encode("utf-8"))
                    < MAX_MESSAGE_SIZE
                ):
                    prev_utterance.content[0] = "\n".join([prev_content, text_content])
                else:
                    self._chat_ctx.add_message(role=role, content=text_content)
                    if len(self._chat_ctx.items) > MAX_MESSAGES:
                        self._chat_ctx.truncate(max_items=MAX_MESSAGES)
            else:
                self._chat_ctx.add_message(role=role, content=text_content)
                if len(self._chat_ctx.items) > MAX_MESSAGES:
                    self._chat_ctx.truncate(max_items=MAX_MESSAGES)

    # cannot rely on this event for user b/c stopReason=PARTIAL_TURN always for user
    async def _handle_text_output_content_end_event(self, event_data: dict) -> None:
        """Mark the assistant message closed when Bedrock signals END_TURN."""
        stop_reason = event_data["event"]["contentEnd"]["stopReason"]
        text_content_id = event_data["event"]["contentEnd"]["contentId"]
        if (
            self._current_generation
            is not None  # means that first utterance in the turn was an interrupt
            and self._current_generation.speculative_messages.get(text_content_id)
            == self._current_generation.response_id
            and stop_reason == "END_TURN"
        ):
            log_event_data(event_data)
            self._close_current_generation()

    async def _handle_tool_output_content_start_event(self, event_data: dict) -> None:
        """Track mapping content_id -> response_id for upcoming tool use."""
        log_event_data(event_data)
        assert self._current_generation is not None, "current_generation is None"

        tool_use_content_id = event_data["event"]["contentStart"]["contentId"]
        self._current_generation.tool_messages[tool_use_content_id] = (
            self._current_generation.response_id
        )

    # note: tool calls are synchronous for now
    async def _handle_tool_output_content_event(self, event_data: dict) -> None:
        """Execute the referenced tool locally and forward results back to Bedrock."""
        log_event_data(event_data)
        assert self._current_generation is not None, "current_generation is None"

        tool_use_content_id = event_data["event"]["toolUse"]["contentId"]
        tool_use_id = event_data["event"]["toolUse"]["toolUseId"]
        tool_name = event_data["event"]["toolUse"]["toolName"]
        if (
            self._current_generation.tool_messages.get(tool_use_content_id)
            == self._current_generation.response_id
        ):
            args = event_data["event"]["toolUse"]["content"]
            self._current_generation.function_ch.send_nowait(
                llm.FunctionCall(call_id=tool_use_id, name=tool_name, arguments=args)
            )
            self._pending_tools.add(tool_use_id)

            # performing these acrobatics in order to release the deadlock
            # LiveKit will not accept a new generation until the previous one is closed
            # the issue is that audio data cannot be generated until toolResult is received
            # however, toolResults only arrive after update_chat_ctx() is invoked
            # which will only occur after agent speech has completed
            # therefore we introduce an artificial turn to trigger update_chat_ctx()
            # TODO: this is messy-- investigate if there is a better way to handle this
            curr_gen = self._current_generation.messages[self._current_generation.response_id]
            curr_gen.audio_ch.close()
            curr_gen.text_ch.close()
            self._current_generation.message_ch.close()
            self._current_generation.message_ch = utils.aio.Chan()
            self._current_generation.function_ch.close()
            self._current_generation.function_ch = utils.aio.Chan()
            msg_gen = _MessageGeneration(
                message_id=self._current_generation.response_id,
                text_ch=utils.aio.Chan(),
                audio_ch=utils.aio.Chan(),
            )
            self._current_generation.messages[self._current_generation.response_id] = msg_gen
            msg_modalities = asyncio.Future[list[Literal["text", "audio"]]]()
            msg_modalities.set_result(
                ["audio", "text"] if self._realtime_model.capabilities.audio_output else ["text"]
            )
            self._current_generation.message_ch.send_nowait(
                llm.MessageGeneration(
                    message_id=msg_gen.message_id,
                    text_stream=msg_gen.text_ch,
                    audio_stream=msg_gen.audio_ch,
                    modalities=msg_modalities,
                )
            )
            self.emit_generation_event()

    async def _handle_tool_output_content_end_event(self, event_data: dict) -> None:
        log_event_data(event_data)

    async def _handle_audio_output_content_start_event(self, event_data: dict) -> None:
        """Associate the upcoming audio chunk with the active assistant message."""
        if self._current_generation is not None:
            log_event_data(event_data)
            audio_content_id = event_data["event"]["contentStart"]["contentId"]
            self._current_generation.speculative_messages[audio_content_id] = (
                self._current_generation.response_id
            )

    async def _handle_audio_output_content_event(self, event_data: dict) -> None:
        """Decode base64 audio from Bedrock and forward it to the audio stream."""
        if (
            self._current_generation is not None
            and self._current_generation.speculative_messages.get(
                event_data["event"]["audioOutput"]["contentId"]
            )
            == self._current_generation.response_id
        ):
            audio_content = event_data["event"]["audioOutput"]["content"]
            audio_bytes = base64.b64decode(audio_content)
            curr_gen = self._current_generation.messages[self._current_generation.response_id]
            curr_gen.audio_ch.send_nowait(
                rtc.AudioFrame(
                    data=audio_bytes,
                    sample_rate=DEFAULT_OUTPUT_SAMPLE_RATE,
                    num_channels=DEFAULT_CHANNELS,
                    samples_per_channel=len(audio_bytes) // 2,
                )
            )

    async def _handle_audio_output_content_end_event(self, event_data: dict) -> None:
        """Close the assistant message streams once Bedrock finishes audio for the turn."""
        if (
            self._current_generation is not None
            and event_data["event"]["contentEnd"]["stopReason"] == "END_TURN"
            and self._current_generation.speculative_messages.get(
                event_data["event"]["contentEnd"]["contentId"]
            )
            == self._current_generation.response_id
        ):
            log_event_data(event_data)
            self._close_current_generation()

    def _close_current_generation(self) -> None:
        """Helper that closes all channels of the active _ResponseGeneration."""
        if self._current_generation is not None:
            if self._current_generation.response_id in self._current_generation.messages:
                curr_gen = self._current_generation.messages[self._current_generation.response_id]
                if not curr_gen.audio_ch.closed:
                    curr_gen.audio_ch.close()
                if not curr_gen.text_ch.closed:
                    curr_gen.text_ch.close()

            # TODO: seems not needed, tool_messages[id] is a str, function_ch is closed below?
            # if self._current_generation.response_id in self._current_generation.tool_messages:
            #     curr_gen = self._current_generation.tool_messages[
            #         self._current_generation.response_id
            #     ]
            #     if not curr_gen.function_ch.closed:
            #         curr_gen.function_ch.close()

            if not self._current_generation.message_ch.closed:
                self._current_generation.message_ch.close()
            if not self._current_generation.function_ch.closed:
                self._current_generation.function_ch.close()

            self._current_generation = None

    async def _handle_completion_end_event(self, event_data: dict) -> None:
        log_event_data(event_data)

    async def _handle_other_event(self, event_data: dict) -> None:
        log_event_data(event_data)

    async def _handle_usage_event(self, event_data: dict) -> None:
        # log_event_data(event_data)
        # TODO: implement duration and ttft
        input_tokens = event_data["event"]["usageEvent"]["details"]["delta"]["input"]
        output_tokens = event_data["event"]["usageEvent"]["details"]["delta"]["output"]
        # Q: should we be counting per turn or utterance?
        metrics = RealtimeModelMetrics(
            label=self._realtime_model._label,
            # TODO: pass in the correct request_id
            request_id=event_data["event"]["usageEvent"]["completionId"],
            timestamp=time.monotonic(),
            duration=0,
            ttft=0,
            cancelled=False,
            input_tokens=input_tokens["speechTokens"] + input_tokens["textTokens"],
            output_tokens=output_tokens["speechTokens"] + output_tokens["textTokens"],
            total_tokens=input_tokens["speechTokens"]
            + input_tokens["textTokens"]
            + output_tokens["speechTokens"]
            + output_tokens["textTokens"],
            # need duration to calculate this
            tokens_per_second=0,
            input_token_details=RealtimeModelMetrics.InputTokenDetails(
                text_tokens=input_tokens["textTokens"],
                audio_tokens=input_tokens["speechTokens"],
                image_tokens=0,
                cached_tokens=0,
                cached_tokens_details=None,
            ),
            output_token_details=RealtimeModelMetrics.OutputTokenDetails(
                text_tokens=output_tokens["textTokens"],
                audio_tokens=output_tokens["speechTokens"],
                image_tokens=0,
            ),
        )
        self.emit("metrics_collected", metrics)

    @utils.log_exceptions(logger=logger)
    async def _process_responses(self) -> None:
        """Background task that drains Bedrock's output stream and feeds the event handlers."""
        try:
            await self._is_sess_active.wait()
            assert self._stream_response is not None, "stream_response is None"

            # note: may need another signal here to block input task until bedrock is ready
            # TODO: save this as a field so we're not re-awaiting it every time
            _, output_stream = await self._stream_response.await_output()
            while self._is_sess_active.is_set():
                # and not self.stream_response.output_stream.closed:
                try:
                    result = await output_stream.receive()
                    if result.value and result.value.bytes_:
                        try:
                            response_data = result.value.bytes_.decode("utf-8")
                            json_data = json.loads(response_data)
                            # logger.debug(f"Received event: {json_data}")
                            await self._handle_event(json_data)
                        except json.JSONDecodeError:
                            logger.warning(f"JSON decode error: {response_data}")
                    else:
                        logger.warning("No response received")
                except asyncio.CancelledError:
                    logger.info("Response processing task cancelled")
                    self._close_current_generation()
                    raise
                except ValidationException as ve:
                    # there is a 3min no-activity (e.g. silence) timeout on the stream, after which the stream is closed  # noqa: E501
                    if (
                        "InternalErrorCode=531::RST_STREAM closed stream. HTTP/2 error code: NO_ERROR"  # noqa: E501
                        in ve.message
                    ):
                        logger.warning(f"Validation error: {ve}\nAttempting to recover...")
                        await self._restart_session(ve)

                    else:
                        logger.error(f"Validation error: {ve}")
                        self.emit(
                            "error",
                            llm.RealtimeModelError(
                                timestamp=time.monotonic(),
                                label=self._realtime_model._label,
                                error=APIStatusError(
                                    message=ve.message,
                                    status_code=400,
                                    request_id="",
                                    body=ve,
                                    retryable=False,
                                ),
                                recoverable=False,
                            ),
                        )
                        raise
                except (ThrottlingException, ModelNotReadyException, ModelErrorException) as re:
                    logger.warning(f"Retryable error: {re}\nAttempting to recover...")
                    await self._restart_session(re)
                    break
                except ModelTimeoutException as mte:
                    logger.warning(f"Model timeout error: {mte}\nAttempting to recover...")
                    await self._restart_session(mte)
                    break
                except ValueError as val_err:
                    if "I/O operation on closed file." == val_err.args[0]:
                        logger.info("initiating graceful shutdown of session")
                        break
                    raise
                except OSError:
                    logger.info("stream already closed, exiting")
                    break
                except Exception as e:
                    err_msg = getattr(e, "message", str(e))
                    logger.error(f"Response processing error: {err_msg} (type: {type(e)})")
                    request_id = None
                    try:
                        request_id = err_msg.split(" ")[0].split("=")[1]
                    except Exception:
                        pass

                    self.emit(
                        "error",
                        llm.RealtimeModelError(
                            timestamp=time.monotonic(),
                            label=self._realtime_model._label,
                            error=APIStatusError(
                                message=err_msg,
                                status_code=500,
                                request_id=request_id,
                                body=e,
                                retryable=False,
                            ),
                            recoverable=False,
                        ),
                    )
                    raise

        finally:
            logger.info("main output response stream processing task exiting")
            self._is_sess_active.clear()

    async def _restart_session(self, ex: Exception) -> None:
        if self._session_restart_attempts >= DEFAULT_MAX_SESSION_RESTART_ATTEMPTS:
            logger.error("Max session restart attempts reached, exiting")
            err_msg = getattr(ex, "message", str(ex))
            request_id = None
            try:
                request_id = err_msg.split(" ")[0].split("=")[1]
            except Exception:
                pass
            self.emit(
                "error",
                llm.RealtimeModelError(
                    timestamp=time.monotonic(),
                    label=self._realtime_model._label,
                    error=APIStatusError(
                        message=f"Max restart attempts exceeded: {err_msg}",
                        status_code=500,
                        request_id=request_id,
                        body=ex,
                        retryable=False,
                    ),
                    recoverable=False,
                ),
            )
            self._is_sess_active.clear()
            return
        self._session_restart_attempts += 1
        self._is_sess_active.clear()
        delay = 2 ** (self._session_restart_attempts - 1) - 1
        await asyncio.sleep(min(delay, DEFAULT_MAX_SESSION_RESTART_DELAY))
        await self.initialize_streams(is_restart=True)
        logger.info(
            f"Session restarted successfully ({self._session_restart_attempts}/{DEFAULT_MAX_SESSION_RESTART_ATTEMPTS})"  # noqa: E501
        )

    @property
    def chat_ctx(self) -> llm.ChatContext:
        return self._chat_ctx.copy()

    @property
    def tools(self) -> llm.ToolContext:
        return self._tools.copy()

    async def update_instructions(self, instructions: str) -> None:
        """Injects the system prompt at the start of the session."""
        self._instructions = instructions
        self._instructions_ready.set_result(True)
        logger.debug(f"Instructions updated: {instructions}")

    async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
        """Inject an initial chat history once during the very first session startup."""
        # sometimes fires randomly
        # add a guard here to only allow chat_ctx to be updated on
        # the very first session initialization
        if not self._chat_ctx_ready.done():
            self._chat_ctx = chat_ctx.copy()
            logger.debug(f"Chat context updated: {self._chat_ctx.items}")
            self._chat_ctx_ready.set_result(True)

        # for each function tool, send the result to aws
        for item in chat_ctx.items:
            if item.type != "function_call_output":
                continue

            if item.call_id not in self._pending_tools:
                continue

            logger.debug(f"function call output: {item}")
            self._pending_tools.discard(item.call_id)
            self._tool_results_ch.send_nowait(
                {
                    "tool_use_id": item.call_id,
                    "tool_result": item.output
                    if not item.is_error
                    else f"{{'error': '{item.output}'}}",
                }
            )

    async def _send_tool_events(self, tool_use_id: str, tool_result: str) -> None:
        """Send tool_result back to Bedrock, grouped under tool_use_id."""
        tool_content_name = str(uuid.uuid4())
        tool_events = self._event_builder.create_tool_content_block(
            content_name=tool_content_name,
            tool_use_id=tool_use_id,
            content=tool_result,
        )
        for event in tool_events:
            await self._send_raw_event(event)
            # logger.debug(f"Sent tool event: {event}")

    def _tool_choice_adapter(
        self, tool_choice: llm.ToolChoice | None
    ) -> dict[str, dict[str, str]] | None:
        """Translate the LiveKit ToolChoice enum into Sonic's JSON schema."""
        if tool_choice == "auto":
            return {"auto": {}}
        elif tool_choice == "required":
            return {"any": {}}
        elif isinstance(tool_choice, dict) and tool_choice["type"] == "function":
            return {"tool": {"name": tool_choice["function"]["name"]}}
        else:
            return None

    # note: return value from tool functions registered to Sonic must be Structured Output (a dict that is JSON serializable)  # noqa: E501
    async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool | Any]) -> None:
        """Replace the active tool set with tools and notify Sonic if necessary."""
        logger.debug(f"Updating tools: {tools}")
        retained_tools: list[llm.FunctionTool | llm.RawFunctionTool] = []

        for tool in tools:
            retained_tools.append(tool)
        self._tools = llm.ToolContext(retained_tools)
        if retained_tools:
            self._tools_ready.set_result(True)
            logger.debug("Tool list has been injected")

    def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) -> None:
        """Live update of inference options is not supported by Sonic yet."""
        logger.warning(
            "updating inference configuration options is not yet supported by Nova Sonic's Realtime API"  # noqa: E501
        )

    @utils.log_exceptions(logger=logger)
    def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]:
        """Ensure mic audio matches Sonic's required sample rate & channels."""
        if self._input_resampler:
            if frame.sample_rate != self._input_resampler._input_rate:
                self._input_resampler = None

        if self._input_resampler is None and (
            frame.sample_rate != DEFAULT_INPUT_SAMPLE_RATE or frame.num_channels != DEFAULT_CHANNELS
        ):
            self._input_resampler = rtc.AudioResampler(
                input_rate=frame.sample_rate,
                output_rate=DEFAULT_INPUT_SAMPLE_RATE,
                num_channels=DEFAULT_CHANNELS,
            )

        if self._input_resampler:
            # flush the resampler when the input source is changed
            yield from self._input_resampler.push(frame)
        else:
            yield frame

    @utils.log_exceptions(logger=logger)
    async def _process_audio_input(self) -> None:
        """Background task that feeds audio and tool results into the Bedrock stream."""
        await self._send_raw_event(self._event_builder.create_audio_content_start_event())
        logger.info("Starting audio input processing loop")
        while self._is_sess_active.is_set():
            try:
                # note: could potentially pull this out into a separate task
                try:
                    val = self._tool_results_ch.recv_nowait()
                    tool_result = val["tool_result"]
                    tool_use_id = val["tool_use_id"]
                    if not isinstance(tool_result, str):
                        tool_result = json.dumps(tool_result)
                    else:
                        try:
                            json.loads(tool_result)
                        except json.JSONDecodeError:
                            try:
                                tool_result = json.dumps(ast.literal_eval(tool_result))
                            except Exception:
                                # return the original value
                                pass

                    logger.debug(f"Sending tool result: {tool_result}")
                    await self._send_tool_events(tool_use_id, tool_result)

                except utils.aio.channel.ChanEmpty:
                    pass
                except utils.aio.channel.ChanClosed:
                    logger.warning(
                        "tool results channel closed, exiting audio input processing loop"
                    )
                    break

                try:
                    audio_bytes = await self._audio_input_chan.recv()
                    blob = base64.b64encode(audio_bytes)
                    audio_event = self._event_builder.create_audio_input_event(
                        audio_content=blob.decode("utf-8"),
                    )

                    await self._send_raw_event(audio_event)
                except utils.aio.channel.ChanEmpty:
                    pass
                except utils.aio.channel.ChanClosed:
                    logger.warning(
                        "audio input channel closed, exiting audio input processing loop"
                    )
                    break

            except asyncio.CancelledError:
                logger.info("Audio processing loop cancelled")
                self._audio_input_chan.close()
                self._tool_results_ch.close()
                raise
            except Exception:
                logger.exception("Error processing audio")

    # for debugging purposes only
    def _log_significant_audio(self, audio_bytes: bytes) -> None:
        """Utility that prints a debug message when the audio chunk has non-trivial RMS energy."""
        squared_sum = sum(sample**2 for sample in audio_bytes)
        if (squared_sum / len(audio_bytes)) ** 0.5 > 200:
            if lk_bedrock_debug:
                log_message("Enqueuing significant audio chunk", AnsiColors.BLUE)

    @utils.log_exceptions(logger=logger)
    def push_audio(self, frame: rtc.AudioFrame) -> None:
        """Enqueue an incoming mic rtc.AudioFrame for transcription."""
        if not self._audio_input_chan.closed:
            # logger.debug(f"Raw audio received: samples={len(frame.data)} rate={frame.sample_rate} channels={frame.num_channels}")  # noqa: E501
            for f in self._resample_audio(frame):
                # logger.debug(f"Resampled audio: samples={len(frame.data)} rate={frame.sample_rate} channels={frame.num_channels}")  # noqa: E501

                for nf in self._bstream.write(f.data.tobytes()):
                    self._log_significant_audio(nf.data)
                    self._audio_input_chan.send_nowait(nf.data)
        else:
            logger.warning("audio input channel closed, skipping audio")

    def generate_reply(
        self,
        *,
        instructions: NotGivenOr[str] = NOT_GIVEN,
    ) -> asyncio.Future[llm.GenerationCreatedEvent]:
        logger.warning("unprompted generation is not supported by Nova Sonic's Realtime API")
        fut = asyncio.Future[llm.GenerationCreatedEvent]()
        fut.set_exception(
            llm.RealtimeError("unprompted generation is not supported by Nova Sonic's Realtime API")
        )
        return fut

    def commit_audio(self) -> None:
        logger.warning("commit_audio is not supported by Nova Sonic's Realtime API")

    def clear_audio(self) -> None:
        logger.warning("clear_audio is not supported by Nova Sonic's Realtime API")

    def push_video(self, frame: rtc.VideoFrame) -> None:
        logger.warning("video is not supported by Nova Sonic's Realtime API")

    def interrupt(self) -> None:
        logger.warning("interrupt is not supported by Nova Sonic's Realtime API")

    def truncate(
        self,
        *,
        message_id: str,
        modalities: list[Literal["text", "audio"]],
        audio_end_ms: int,
        audio_transcript: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        logger.warning("truncate is not supported by Nova Sonic's Realtime API")

    @utils.log_exceptions(logger=logger)
    async def aclose(self) -> None:
        """Gracefully shut down the realtime session and release network resources."""
        logger.info("attempting to shutdown agent session")
        if not self._is_sess_active.is_set():
            logger.info("agent session already inactive")
            return

        for event in self._event_builder.create_prompt_end_block():
            await self._send_raw_event(event)
        # allow event loops to fall out naturally
        # otherwise, the smithy layer will raise an InvalidStateError during cancellation
        self._is_sess_active.clear()

        if self._stream_response and not self._stream_response.output_stream.closed:
            await self._stream_response.output_stream.close()

        # note: even after the self.is_active flag is flipped and the output stream is closed,
        # there is a future inside output_stream.receive() at the AWS-CRT C layer that blocks
        # resulting in an error after cancellation
        # however, it's mostly cosmetic-- the event loop will still exit
        # TODO: fix this nit
        tasks: list[asyncio.Task[Any]] = []
        if self._response_task:
            try:
                await asyncio.wait_for(self._response_task, timeout=1.0)
            except asyncio.TimeoutError:
                logger.warning("shutdown of output event loop timed out-- cancelling")
                self._response_task.cancel()
            tasks.append(self._response_task)

        # must cancel the audio input task before closing the input stream
        if self._audio_input_task and not self._audio_input_task.done():
            self._audio_input_task.cancel()
            tasks.append(self._audio_input_task)
        if self._stream_response and not self._stream_response.input_stream.closed:
            await self._stream_response.input_stream.close()

        await asyncio.gather(*tasks, return_exceptions=True)
        logger.debug(f"CHAT CONTEXT: {self._chat_ctx.items}")
        logger.info("Session end")

Bidirectional streaming session against the Nova Sonic Bedrock runtime.

The session owns two asynchronous tasks:

  1. _process_audio_input – pushes user mic audio and tool results to Bedrock.
  2. _process_responses – receives server events from Bedrock and converts them into LiveKit abstractions such as llm.MessageGeneration.

A set of helper handlers (_handle_*) transform the low-level Bedrock JSON payloads into higher-level application events and keep _ResponseGeneration state in sync.

Create and wire up a new realtime session.

Args

realtime_model : RealtimeModel
Parent model instance that stores static inference options and the Smithy Bedrock client configuration.
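
A hedged wiring sketch: the session inherits EventEmitter (see Ancestors below), so callers can subscribe to the events it emits, such as "generation_created" from emit_generation_event(). The handler body is illustrative; llm refers to livekit.agents.llm as in the source above.

sess = model.session()

@sess.on("generation_created")
def _on_generation(ev: llm.GenerationCreatedEvent) -> None:
    # ev.message_stream yields llm.MessageGeneration items whose
    # text_stream/audio_stream channels close when the turn ends
    ...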

Ancestors

  • livekit.agents.llm.realtime.RealtimeSession
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop chat_ctx : llm.ChatContext
@property
def chat_ctx(self) -> llm.ChatContext:
    return self._chat_ctx.copy()
prop tools : llm.ToolContext
@property
def tools(self) -> llm.ToolContext:
    return self._tools.copy()
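
Both properties return defensive copies, so mutating the returned objects does not affect the live session; changes must go through the update_* coroutines instead. For example (illustrative; new_ctx and my_tool are hypothetical placeholders):

ctx = sess.chat_ctx                  # a copy; edits here are not seen by Sonic
await sess.update_chat_ctx(new_ctx)  # also forwards pending tool results
await sess.update_tools([my_tool])   # replaces the active tool set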

Methods

async def aclose(self) ‑> None
@utils.log_exceptions(logger=logger)
async def aclose(self) -> None:
    """Gracefully shut down the realtime session and release network resources."""
    logger.info("attempting to shutdown agent session")
    if not self._is_sess_active.is_set():
        logger.info("agent session already inactive")
        return

    for event in self._event_builder.create_prompt_end_block():
        await self._send_raw_event(event)
    # allow event loops to fall out naturally
    # otherwise, the smithy layer will raise an InvalidStateError during cancellation
    self._is_sess_active.clear()

    if self._stream_response and not self._stream_response.output_stream.closed:
        await self._stream_response.output_stream.close()

    # note: even after the self.is_active flag is flipped and the output stream is closed,
    # there is a future inside output_stream.receive() at the AWS-CRT C layer that blocks
    # resulting in an error after cancellation
    # however, it's mostly cosmetic-- the event loop will still exit
    # TODO: fix this nit
    tasks: list[asyncio.Task[Any]] = []
    if self._response_task:
        try:
            await asyncio.wait_for(self._response_task, timeout=1.0)
        except asyncio.TimeoutError:
            logger.warning("shutdown of output event loop timed out-- cancelling")
            self._response_task.cancel()
        tasks.append(self._response_task)

    # must cancel the audio input task before closing the input stream
    if self._audio_input_task and not self._audio_input_task.done():
        self._audio_input_task.cancel()
        tasks.append(self._audio_input_task)
    if self._stream_response and not self._stream_response.input_stream.closed:
        await self._stream_response.input_stream.close()

    await asyncio.gather(*tasks, return_exceptions=True)
    logger.debug(f"CHAT CONTEXT: {self._chat_ctx.items}")
    logger.info("Session end")

Gracefully shut down the realtime session and release network resources.
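
An illustrative shutdown pattern: because aclose() returns early when the session is already inactive, wrapping the dialog in try/finally is a safe way to guarantee the Bedrock streams are released.

sess = model.session()
try:
    ...  # run the conversation
finally:
    await sess.aclose()  # returns early if the session is already inactive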

def clear_audio(self) ‑> None
def clear_audio(self) -> None:
    logger.warning("clear_audio is not supported by Nova Sonic's Realtime API")
def commit_audio(self) ‑> None
def commit_audio(self) -> None:
    logger.warning("commit_audio is not supported by Nova Sonic's Realtime API")
def emit_generation_event(self) ‑> None
Expand source code
@utils.log_exceptions(logger=logger)
def emit_generation_event(self) -> None:
    """Publish a llm.GenerationCreatedEvent to external subscribers."""
    logger.debug("Emitting generation event")
    assert self._current_generation is not None, "current_generation is None"

    generation_ev = llm.GenerationCreatedEvent(
        message_stream=self._current_generation.message_ch,
        function_stream=self._current_generation.function_ch,
        user_initiated=False,
    )
    self.emit("generation_created", generation_ev)

Publish a llm.GenerationCreatedEvent to external subscribers.
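
Since the session is an EventEmitter (see Ancestors above), external code can subscribe to this event by name; a sketch, assuming the synchronous EventEmitter.on callback registration:

from livekit.agents import llm

def on_generation(ev: llm.GenerationCreatedEvent) -> None:
    # ev.message_stream / ev.function_stream carry the streamed output
    print("generation created, user_initiated =", ev.user_initiated)

session.on("generation_created", on_generation)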

def generate_reply(self, *, instructions: NotGivenOr[str] = NOT_GIVEN) ‑> asyncio.Future[livekit.agents.llm.realtime.GenerationCreatedEvent]
Expand source code
def generate_reply(
    self,
    *,
    instructions: NotGivenOr[str] = NOT_GIVEN,
) -> asyncio.Future[llm.GenerationCreatedEvent]:
    logger.warning("unprompted generation is not supported by Nova Sonic's Realtime API")
    fut = asyncio.Future[llm.GenerationCreatedEvent]()
    fut.set_exception(
        llm.RealtimeError("unprompted generation is not supported by Nova Sonic's Realtime API")
    )
    return fut
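
The returned future is already resolved with an exception, so awaiting it raises llm.RealtimeError; callers probing for this capability should catch it:

try:
    await session.generate_reply(instructions="greet the user")
except llm.RealtimeError:
    # Nova Sonic only responds to user audio; let the next user turn drive the reply
    pass
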
async def initialize_streams(self, is_restart: bool = False) ‑> None
Expand source code
@utils.log_exceptions(logger=logger)
async def initialize_streams(self, is_restart: bool = False) -> None:
    """Open the Bedrock bidirectional stream and spawn background worker tasks.

    This coroutine is idempotent and can be invoked again when recoverable
    errors (e.g. timeout, throttling) require a fresh session.

    Args:
        is_restart (bool, optional): Marks whether we are re-initializing an
            existing session after an error. Defaults to False.
    """
    try:
        if not self._bedrock_client:
            logger.info("Creating Bedrock client")
            self._initialize_client()
        assert self._bedrock_client is not None, "bedrock_client is None"

        logger.info("Initializing Bedrock stream")
        self._stream_response = (
            await self._bedrock_client.invoke_model_with_bidirectional_stream(
                InvokeModelWithBidirectionalStreamOperationInput(
                    model_id=self._realtime_model.model_id
                )
            )
        )

        if not is_restart:
            pending_events: list[asyncio.Future] = []
            if not self.tools.function_tools:
                pending_events.append(self._tools_ready)
            if not self._instructions_ready.done():
                pending_events.append(self._instructions_ready)
            if not self._chat_ctx_ready.done():
                pending_events.append(self._chat_ctx_ready)

            # note: during session init we can't tell whether no tools were added
            # or whether they were added but haven't propagated yet,
            # so when there are no tools we wait out the entire timeout
            try:
                if pending_events:
                    await asyncio.wait_for(asyncio.gather(*pending_events), timeout=0.5)
            except asyncio.TimeoutError:
                if not self._tools_ready.done():
                    logger.warning("Tools not ready after 500ms, continuing without them")

                if not self._instructions_ready.done():
                    logger.warning(
                        "Instructions not received after 500ms, proceeding with default instructions"  # noqa: E501
                    )
                if not self._chat_ctx_ready.done():
                    logger.warning(
                        "Chat context not received after 500ms, proceeding with empty chat context"  # noqa: E501
                    )

        logger.info(
            f"Initializing Bedrock session with realtime options: {self._realtime_model._opts}"
        )
        # there is a 40-message limit on the chat context
        if len(self._chat_ctx.items) > MAX_MESSAGES:
            logger.warning(
                f"Chat context has {len(self._chat_ctx.items)} messages, truncating to {MAX_MESSAGES}"  # noqa: E501
            )
            self._chat_ctx.truncate(max_items=MAX_MESSAGES)
        init_events = self._event_builder.create_prompt_start_block(
            voice_id=self._realtime_model._opts.voice,
            sample_rate=DEFAULT_OUTPUT_SAMPLE_RATE,  # type: ignore
            system_content=self._instructions,
            chat_ctx=self.chat_ctx,
            tool_configuration=self._serialize_tool_config(),
            max_tokens=self._realtime_model._opts.max_tokens,
            top_p=self._realtime_model._opts.top_p,
            temperature=self._realtime_model._opts.temperature,
        )

        for event in init_events:
            await self._send_raw_event(event)
            logger.debug(f"Sent event: {event}")

        if not is_restart:
            self._audio_input_task = asyncio.create_task(
                self._process_audio_input(), name="RealtimeSession._process_audio_input"
            )

        self._response_task = asyncio.create_task(
            self._process_responses(), name="RealtimeSession._process_responses"
        )
        self._is_sess_active.set()
        logger.debug("Stream initialized successfully")
    except Exception as e:
        logger.debug(f"Failed to initialize stream: {str(e)}")
        raise

Open the Bedrock bidirectional stream and spawn background worker tasks.

This coroutine is idempotent and can be invoked again when recoverable errors (e.g. timeout, throttling) require a fresh session.

Args

is_restart : bool, optional
Marks whether we are re-initializing an existing session after an error. Defaults to False.
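
A restart sketch for recoverable failures (the retry policy shown is illustrative and lives in the caller):

import asyncio

async def restart_with_backoff(session, attempts: int = 3) -> None:
    for attempt in range(attempts):
        try:
            # is_restart=True reuses the existing audio-input task and
            # skips the tools/instructions/chat-context readiness wait
            await session.initialize_streams(is_restart=True)
            return
        except Exception:  # e.g. Bedrock timeout or throttling
            await asyncio.sleep(2 ** attempt)
    raise RuntimeError("could not re-establish the Bedrock stream")
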
def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    logger.warning("interrupt is not supported by Nova Sonic's Realtime API")
def push_audio(self, frame: rtc.AudioFrame) ‑> None
Expand source code
@utils.log_exceptions(logger=logger)
def push_audio(self, frame: rtc.AudioFrame) -> None:
    """Enqueue an incoming mic rtc.AudioFrame for transcription."""
    if not self._audio_input_chan.closed:
        # logger.debug(f"Raw audio received: samples={len(frame.data)} rate={frame.sample_rate} channels={frame.num_channels}")  # noqa: E501
        for f in self._resample_audio(frame):
            # logger.debug(f"Resampled audio: samples={len(frame.data)} rate={frame.sample_rate} channels={frame.num_channels}")  # noqa: E501

            for nf in self._bstream.write(f.data.tobytes()):
                self._log_significant_audio(nf.data)
                self._audio_input_chan.send_nowait(nf.data)
    else:
        logger.warning("audio input channel closed, skipping audio")

Enqueue an incoming mic rtc.AudioFrame for transcription.
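
A typical producer feeds this method from an rtc.AudioStream; a sketch, assuming a subscribed audio track:

from livekit import rtc

async def forward_mic(track: rtc.Track, session) -> None:
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        session.push_audio(event.frame)  # resampling and chunking happen internally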

def push_video(self, frame: rtc.VideoFrame) ‑> None
Expand source code
def push_video(self, frame: rtc.VideoFrame) -> None:
    logger.warning("video is not supported by Nova Sonic's Realtime API")
def truncate(self,
*,
message_id: str,
modalities: "list[Literal['text', 'audio']]",
audio_end_ms: int,
audio_transcript: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
def truncate(
    self,
    *,
    message_id: str,
    modalities: list[Literal["text", "audio"]],
    audio_end_ms: int,
    audio_transcript: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    logger.warning("truncate is not supported by Nova Sonic's Realtime API")
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) ‑> None
Expand source code
async def update_chat_ctx(self, chat_ctx: llm.ChatContext) -> None:
    """Inject an initial chat history once during the very first session startup."""
    # this callback sometimes fires more than once, so guard it:
    # chat_ctx may only be updated during the very first session initialization
    if not self._chat_ctx_ready.done():
        self._chat_ctx = chat_ctx.copy()
        logger.debug(f"Chat context updated: {self._chat_ctx.items}")
        self._chat_ctx_ready.set_result(True)

    # for each function tool, send the result to aws
    for item in chat_ctx.items:
        if item.type != "function_call_output":
            continue

        if item.call_id not in self._pending_tools:
            continue

        logger.debug(f"function call output: {item}")
        self._pending_tools.discard(item.call_id)
        self._tool_results_ch.send_nowait(
            {
                "tool_use_id": item.call_id,
                "tool_result": item.output
                if not item.is_error
                else f"{{'error': '{item.output}'}}",
            }
        )

Inject an initial chat history once during the very first session startup.
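
After the first call, subsequent calls only forward pending tool results. A sketch of returning one through this path (identifiers are illustrative, and it assumes llm.FunctionCallOutput accepts the same fields the loop above reads):

from livekit.agents import llm

ctx = session.chat_ctx
ctx.items.append(
    llm.FunctionCallOutput(
        call_id="tooluse_abc123",  # must match a pending tool call id
        name="get_weather",
        output='{"temp_f": 72}',
        is_error=False,
    )
)
await session.update_chat_ctx(ctx)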

async def update_instructions(self, instructions: str) ‑> None
Expand source code
async def update_instructions(self, instructions: str) -> None:
    """Injects the system prompt at the start of the session."""
    self._instructions = instructions
    self._instructions_ready.set_result(True)
    logger.debug(f"Instructions updated: {instructions}")

Inject the system prompt at the start of the session.
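
The readiness future is resolved exactly once, so call this a single time, before the 500 ms startup window in initialize_streams() elapses:

await session.update_instructions(
    "You are a concise voice assistant. Answer in one short sentence."
)

A second call would hit set_result() on an already-resolved future and raise asyncio.InvalidStateError.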

def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) ‑> None
Expand source code
def update_options(self, *, tool_choice: NotGivenOr[llm.ToolChoice | None] = NOT_GIVEN) -> None:
    """Live update of inference options is not supported by Sonic yet."""
    logger.warning(
        "updating inference configuration options is not yet supported by Nova Sonic's Realtime API"  # noqa: E501
    )

Live update of inference options is not supported by Sonic yet.

async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool | Any]) ‑> None
Expand source code
async def update_tools(self, tools: list[llm.FunctionTool | llm.RawFunctionTool | Any]) -> None:
    """Replace the active tool set with tools and notify Sonic if necessary."""
    logger.debug(f"Updating tools: {tools}")
    retained_tools: list[llm.FunctionTool | llm.RawFunctionTool] = list(tools)
    self._tools = llm.ToolContext(retained_tools)
    if retained_tools:
        self._tools_ready.set_result(True)
        logger.debug("Tool list has been injected")

Replace the active tool set with tools and notify Sonic if necessary.
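
A sketch registering a single function tool (assuming the livekit-agents function_tool decorator):

from livekit.agents import llm

@llm.function_tool
async def get_weather(city: str) -> str:
    """Return a short weather summary for the given city."""
    return f"It is sunny in {city}."

await session.update_tools([get_weather])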

Inherited members