Module livekit.agents.voice.room_io

Classes

class AudioInputOptions (sample_rate: int = 24000,
num_channels: int = 1,
frame_size_ms: int = 50,
noise_cancellation: rtc.NoiseCancellationOptions | NoiseCancellationSelector | None = None,
pre_connect_audio: bool = True,
pre_connect_audio_timeout: float = 3.0)
Expand source code
@dataclass
class AudioInputOptions:
    # Settings read by RoomIO.start() when it builds the participant audio input
    # stream and, if enabled, the pre-connect audio handler.

    # Forwarded as `sample_rate` to the participant audio input stream
    # (presumably in Hz -- confirm against the stream implementation).
    sample_rate: int = 24000
    # Forwarded as `num_channels` to the participant audio input stream.
    num_channels: int = 1
    frame_size_ms: int = 50
    """The frame size in milliseconds for the audio input."""
    # Either fixed noise-cancellation options, a selector, or None; forwarded
    # unchanged to the participant audio input stream.
    noise_cancellation: rtc.NoiseCancellationOptions | NoiseCancellationSelector | None = None
    # When true, RoomIO.start() creates and registers a PreConnectAudioHandler.
    pre_connect_audio: bool = True
    """Pre-connect audio enabled or not."""
    # Passed as `timeout` to the PreConnectAudioHandler.
    pre_connect_audio_timeout: float = 3.0
    """The pre-connect audio will be ignored if it doesn't arrive within this time."""

AudioInputOptions(sample_rate: 'int' = 24000, num_channels: 'int' = 1, frame_size_ms: 'int' = 50, noise_cancellation: 'rtc.NoiseCancellationOptions | NoiseCancellationSelector | None' = None, pre_connect_audio: 'bool' = True, pre_connect_audio_timeout: 'float' = 3.0)

Instance variables

var frame_size_ms : int

The frame size in milliseconds for the audio input.

var noise_cancellation : NoiseCancellationOptions | Callable[[livekit.agents.voice.room_io.types.NoiseCancellationParams], NoiseCancellationOptions | None] | None
var num_channels : int
var pre_connect_audio : bool

Pre-connect audio enabled or not.

var pre_connect_audio_timeout : float

The pre-connect audio will be ignored if it doesn't arrive within this time.

var sample_rate : int
class AudioOutputOptions (sample_rate: int = 24000,
num_channels: int = 1,
track_publish_options: rtc.TrackPublishOptions = <factory>,
track_name: NotGivenOr[str] = NOT_GIVEN)
Expand source code
@dataclass
class AudioOutputOptions:
    # Settings read by RoomIO.start() when it builds the participant audio output.

    # Forwarded as `sample_rate` to the participant audio output
    # (presumably in Hz -- confirm against the output implementation).
    sample_rate: int = 24000
    # Forwarded as `num_channels` to the participant audio output.
    num_channels: int = 1
    # A fresh TrackPublishOptions per instance (default_factory), with the track
    # source set to SOURCE_MICROPHONE.
    track_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )
    # RoomIO.start() substitutes "roomio_audio" when this is left as NOT_GIVEN.
    track_name: NotGivenOr[str] = NOT_GIVEN
    """The name of the audio track to publish. If not provided, default to "roomio_audio"."""

AudioOutputOptions(sample_rate: 'int' = 24000, num_channels: 'int' = 1, track_publish_options: 'rtc.TrackPublishOptions' = <factory>, track_name: 'NotGivenOr[str]' = NOT_GIVEN)

Instance variables

var num_channels : int
var sample_rate : int
var track_name : str | livekit.agents.types.NotGiven

The name of the audio track to publish. If not provided, default to "roomio_audio".

var track_publish_options : room_pb2.TrackPublishOptions
class RoomIO (agent_session: AgentSession,
room: rtc.Room,
*,
participant: rtc.RemoteParticipant | str | None = None,
options: NotGivenOr[RoomOptions] = NOT_GIVEN,
input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN)
Expand source code
class RoomIO:
    """Connect an ``AgentSession`` to an ``rtc.Room``.

    ``start()`` builds the inputs and outputs enabled in the resolved
    ``RoomOptions``, attaches them to the agent session and subscribes to room
    and session events. ``aclose()`` removes those subscriptions and shuts the
    streams and background tasks down.
    """

    def __init__(
        self,
        agent_session: AgentSession,
        room: rtc.Room,
        *,
        participant: rtc.RemoteParticipant | str | None = None,
        options: NotGivenOr[RoomOptions] = NOT_GIVEN,
        # deprecated
        input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
        output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN,
    ) -> None:
        """Store the session/room pair and resolve the effective options.

        Args:
            agent_session: Session whose inputs/outputs are wired by ``start()``.
            room: Room to read participant media from and publish to.
            participant: Participant (or its identity) to link to. When omitted,
                ``options.participant_identity`` is used if it was given.
            options: Room options; merged with the deprecated arguments below
                by ``RoomOptions._ensure_options``.
            input_options: Deprecated; use ``options``.
            output_options: Deprecated; use ``options``.
        """
        self._options = RoomOptions._ensure_options(
            options, room_input_options=input_options, room_output_options=output_options
        )
        self._text_input_cb: TextInputCallback | None = None

        self._agent_session, self._room = agent_session, room
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.RemoteParticipant) else participant
        )
        # the explicit `participant` argument takes precedence over the options
        if self._participant_identity is None and utils.is_given(
            self._options.participant_identity
        ):
            self._participant_identity = self._options.participant_identity

        self._audio_input: _ParticipantAudioInputStream | None = None
        self._video_input: _ParticipantVideoInputStream | None = None
        self._audio_output: _ParticipantAudioOutput | None = None
        self._user_tr_output: _ParticipantTranscriptionOutput | None = None
        self._agent_tr_output: _ParticipantTranscriptionOutput | None = None
        self._tr_synchronizer: TranscriptSynchronizer | None = None

        # resolved with the linked participant / once the room is connected
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        self._room_connected_fut = asyncio.Future[None]()

        self._init_atask: asyncio.Task[None] | None = None
        self._user_transcript_ch: utils.aio.Chan[UserInputTranscribedEvent] | None = None
        self._user_transcript_atask: asyncio.Task[None] | None = None
        self._tasks: set[asyncio.Task[Any]] = set()
        self._update_state_atask: asyncio.Task[None] | None = None
        self._close_session_atask: asyncio.Task[None] | None = None
        self._delete_room_task: asyncio.Future[api.DeleteRoomResponse] | None = None

        self._pre_connect_audio_handler: PreConnectAudioHandler | None = None
        self._text_stream_handler_registered = False

    async def start(self) -> None:
        """Create the enabled inputs/outputs and hook up all event handlers.

        Also schedules ``_init_task`` so the outputs are started once the room
        is connected and a participant is linked.
        """
        # -- create inputs --
        input_audio_options = self._options.get_audio_input_options()
        if input_audio_options and input_audio_options.pre_connect_audio:
            self._pre_connect_audio_handler = PreConnectAudioHandler(
                room=self._room,
                timeout=input_audio_options.pre_connect_audio_timeout,
            )
            self._pre_connect_audio_handler.register()

        input_text_options = self._options.get_text_input_options()
        if input_text_options:
            self._text_input_cb = input_text_options.text_input_cb
            try:
                self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
                self._text_stream_handler_registered = True
            except ValueError:
                # only warn when the user explicitly asked for text input
                if utils.is_given(self._options.text_input):
                    logger.warning(
                        f"text stream handler for topic '{TOPIC_CHAT}' already set, ignoring"
                    )
        else:
            self._text_input_cb = None

        input_video_options = self._options.get_video_input_options()
        if input_video_options:
            self._video_input = _ParticipantVideoInputStream(self._room)

        if input_audio_options:
            self._audio_input = _ParticipantAudioInputStream(
                self._room,
                sample_rate=input_audio_options.sample_rate,
                num_channels=input_audio_options.num_channels,
                frame_size_ms=input_audio_options.frame_size_ms,
                noise_cancellation=input_audio_options.noise_cancellation,
                pre_connect_audio_handler=self._pre_connect_audio_handler,
            )

        # -- create outputs --
        output_audio_options = self._options.get_audio_output_options()
        if output_audio_options:
            self._audio_output = _ParticipantAudioOutput(
                self._room,
                sample_rate=output_audio_options.sample_rate,
                num_channels=output_audio_options.num_channels,
                track_publish_options=output_audio_options.track_publish_options,
                track_name=(
                    output_audio_options.track_name
                    if utils.is_given(output_audio_options.track_name)
                    else "roomio_audio"
                ),
            )

        output_text_options = self._options.get_text_output_options()
        if output_text_options:
            self._user_tr_output = _ParticipantTranscriptionOutput(
                room=self._room, is_delta_stream=False, participant=self._participant_identity
            )
            self._user_transcript_ch = utils.aio.Chan[UserInputTranscribedEvent]()
            self._user_transcript_atask = asyncio.create_task(
                self._forward_user_transcript(self._user_transcript_ch)
            )

            # TODO(long): add next in the chain for session.output.transcription
            self._agent_tr_output = _ParticipantTranscriptionOutput(
                room=self._room, is_delta_stream=True, participant=None
            )

            # use the RoomIO's audio output if available, otherwise use the agent's audio output
            # (e.g the audio output isn't using RoomIO with our avatar datastream impl)
            if output_text_options.sync_transcription is not False and (
                audio_output := self._audio_output or self._agent_session.output.audio
            ):
                self._tr_synchronizer = TranscriptSynchronizer(
                    next_in_chain_audio=audio_output,
                    next_in_chain_text=self._agent_tr_output,
                    speed=output_text_options.transcription_speed_factor,
                )

        # -- set the room event handlers --
        self._room.on("participant_connected", self._on_participant_connected)
        self._room.on("connection_state_changed", self._on_connection_state_changed)
        self._room.on("participant_disconnected", self._on_participant_disconnected)
        if self._room.isconnected():
            # already connected: no state-change event will arrive, so fire it manually
            self._on_connection_state_changed(rtc.ConnectionState.CONN_CONNECTED)

        self._init_atask = asyncio.create_task(self._init_task())

        # -- attach to the agent session --
        if self.audio_input:
            self._agent_session.input.audio = self.audio_input

        if self.video_input:
            self._agent_session.input.video = self.video_input

        if self.audio_output:
            self._agent_session.output.audio = self.audio_output

        if self.transcription_output:
            self._agent_session.output.transcription = self.transcription_output

        self._agent_session.on("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.on("user_input_transcribed", self._on_user_input_transcribed)
        self._agent_session.on("close", self._on_agent_session_close)

    @property
    def room(self) -> rtc.Room:
        """The room this instance was constructed with."""
        return self._room

    async def aclose(self) -> None:
        """Detach every handler registered by ``start()`` and release resources.

        Cancels the background tasks, closes the input streams, the transcript
        synchronizer and the audio output, then cancels any pending text-input
        tasks.
        """
        self._room.off("participant_connected", self._on_participant_connected)
        self._room.off("connection_state_changed", self._on_connection_state_changed)
        # start() also subscribes to this event; without removing it the room keeps
        # a reference to this instance and may still close the session after aclose()
        self._room.off("participant_disconnected", self._on_participant_disconnected)
        self._agent_session.off("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.off("user_input_transcribed", self._on_user_input_transcribed)
        self._agent_session.off("close", self._on_agent_session_close)

        if self._text_stream_handler_registered:
            self._room.unregister_text_stream_handler(TOPIC_CHAT)
            self._text_stream_handler_registered = False

        if self._init_atask:
            await utils.aio.cancel_and_wait(self._init_atask)

        if self._user_transcript_ch:
            self._user_transcript_ch.close()
        if self._user_transcript_atask:
            await utils.aio.cancel_and_wait(self._user_transcript_atask)

        if self._update_state_atask:
            await utils.aio.cancel_and_wait(self._update_state_atask)

        if self._pre_connect_audio_handler:
            await self._pre_connect_audio_handler.aclose()

        if self._audio_input:
            await self._audio_input.aclose()
        if self._video_input:
            await self._video_input.aclose()

        if self._tr_synchronizer:
            await self._tr_synchronizer.aclose()

        if self._audio_output:
            await self._audio_output.aclose()

        # cancel and wait for all pending tasks
        await utils.aio.cancel_and_wait(*self._tasks)
        self._tasks.clear()

    @property
    def audio_output(self) -> AudioOutput | None:
        """The synchronizer's audio output if one exists, else the room audio output."""
        if self._tr_synchronizer:
            return self._tr_synchronizer.audio_output

        return self._audio_output

    @property
    def transcription_output(self) -> TextOutput | None:
        """The synchronizer's text output if one exists, else the agent transcription output."""
        if self._tr_synchronizer:
            return self._tr_synchronizer.text_output

        return self._agent_tr_output

    @property
    def audio_input(self) -> AudioInput | None:
        """The participant audio input stream, or ``None`` if not created."""
        return self._audio_input

    @property
    def video_input(self) -> VideoInput | None:
        """The participant video input stream, or ``None`` if not created."""
        return self._video_input

    @property
    def linked_participant(self) -> rtc.RemoteParticipant | None:
        """The participant currently linked, or ``None`` while none is available."""
        if not self._participant_available_fut.done():
            return None

        return self._participant_available_fut.result()

    @property
    def subscribed_fut(self) -> asyncio.Future[None] | None:
        """The audio output's ``subscribed`` future, or ``None`` without audio output."""
        if self._audio_output:
            return self._audio_output.subscribed
        return None

    def set_participant(self, participant_identity: str | None) -> None:
        """Switch audio and video streams to specified participant"""
        if participant_identity is None:
            self.unset_participant()
            return

        if (
            self._participant_identity is not None
            and self._participant_identity != participant_identity
        ):
            # reset future if switching to a different participant
            self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

            # check if new participant is already connected
            for participant in self._room.remote_participants.values():
                if participant.identity == participant_identity:
                    self._participant_available_fut.set_result(participant)
                    break

        # update participant identity and handlers
        self._participant_identity = participant_identity
        if self._audio_input:
            self._audio_input.set_participant(participant_identity)
        if self._video_input:
            self._video_input.set_participant(participant_identity)

        if self._user_tr_output:
            self._user_tr_output.set_participant(participant_identity)

    def unset_participant(self) -> None:
        """Unlink the current participant and reset the availability future."""
        self._participant_identity = None
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        if self._audio_input:
            self._audio_input.set_participant(None)
        if self._video_input:
            self._video_input.set_participant(None)

        if self._user_tr_output:
            self._user_tr_output.set_participant(None)

    @utils.log_exceptions(logger=logger)
    async def _init_task(self) -> None:
        """Wait for the room and a participant, then link it and start the outputs."""
        await self._room_connected_fut

        # check existing participants
        for participant in self._room.remote_participants.values():
            self._on_participant_connected(participant)

        participant = await self._participant_available_fut
        self.set_participant(participant.identity)

        # init outputs
        if self._agent_tr_output:
            self._agent_tr_output.set_participant(self._room.local_participant.identity)

        if self._audio_output:
            await self._audio_output.start()

    @utils.log_exceptions(logger=logger)
    async def _forward_user_transcript(
        self, event_ch: utils.aio.Chan[UserInputTranscribedEvent]
    ) -> None:
        """Forward queued user transcripts to the user transcription output."""
        async for ev in event_ch:
            if self._user_tr_output is None:
                continue

            await self._user_tr_output.capture_text(ev.transcript)
            if ev.is_final:
                self._user_tr_output.flush()

    def _on_connection_state_changed(self, state: rtc.ConnectionState.ValueType) -> None:
        """Resolve the room-connected future the first time the room is connected."""
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None:
        """Link ``participant`` if none is linked yet and it passes the filters."""
        if self._participant_available_fut.done():
            return

        if self._participant_identity is not None:
            if participant.identity != self._participant_identity:
                return
        # otherwise, skip participants that are marked as publishing for this agent
        elif (
            participant.attributes.get(ATTRIBUTE_PUBLISH_ON_BEHALF)
            == self._room.local_participant.identity
        ):
            return

        accepted_kinds = self._options.participant_kinds or DEFAULT_PARTICIPANT_KINDS
        if participant.kind not in accepted_kinds:
            # not an accepted participant kind, skip
            return

        self._participant_available_fut.set_result(participant)

    def _on_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        """Reset the link when the linked participant leaves; optionally close the session."""
        if not (linked := self.linked_participant) or participant.identity != linked.identity:
            return
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

        if (
            self._options.close_on_disconnect
            and participant.disconnect_reason in DEFAULT_CLOSE_ON_DISCONNECT_REASONS
            and not self._close_session_atask
            and not self._delete_room_task
        ):
            logger.info(
                "closing agent session due to participant disconnect "
                "(disable via `RoomInputOptions.close_on_disconnect=False`)",
                extra={
                    "room": self._room.name,
                    "participant": participant.identity,
                    "reason": rtc.DisconnectReason.Name(
                        participant.disconnect_reason or rtc.DisconnectReason.UNKNOWN_REASON
                    ),
                },
            )
            self._agent_session._close_soon(reason=CloseReason.PARTICIPANT_DISCONNECTED)

    def _on_user_input_transcribed(self, ev: UserInputTranscribedEvent) -> None:
        """Queue a user transcript event for ``_forward_user_transcript``."""
        if self._user_transcript_ch:
            self._user_transcript_ch.send_nowait(ev)

    def _on_user_text_input(self, reader: rtc.TextStreamReader, participant_identity: str) -> None:
        """Read a text stream from the linked participant and invoke the text-input callback."""
        # ignore text coming from anyone but the linked participant
        if participant_identity != self._participant_identity:
            return

        participant = self._room.remote_participants.get(participant_identity)
        if not participant:
            logger.warning("participant not found, ignoring text input")
            return

        async def _read_text(text_input_cb: TextInputCallback) -> None:
            text = await reader.read_all()

            text_input_result = text_input_cb(
                self._agent_session,
                TextInputEvent(text=text, info=reader.info, participant=participant),
            )
            # the callback may be sync or async
            if asyncio.iscoroutine(text_input_result):
                await text_input_result

        if self._text_input_cb is None:
            logger.error("text input callback is not set, ignoring text input")
            return

        # keep a reference so the task isn't garbage collected and aclose() can cancel it
        task = asyncio.create_task(_read_text(self._text_input_cb))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _on_agent_state_changed(self, ev: AgentStateChangedEvent) -> None:
        """Publish the new agent state as a local participant attribute."""

        @utils.log_exceptions(logger=logger)
        async def _set_state() -> None:
            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: ev.new_state}
                )

        # only the latest state matters: drop any update still in flight
        if self._update_state_atask is not None:
            self._update_state_atask.cancel()

        self._update_state_atask = asyncio.create_task(_set_state())

    def _on_agent_session_close(self, ev: CloseEvent) -> None:
        """Delete the room on session close when ``delete_room_on_close`` is set."""

        def _on_delete_room_task_done(task: asyncio.Future[api.DeleteRoomResponse]) -> None:
            self._delete_room_task = None

        if self._options.delete_room_on_close and self._delete_room_task is None:
            job_ctx = get_job_context()
            logger.info(
                "deleting room on agent session close (disable via `RoomInputOptions.delete_room_on_close=False`)",
                extra={"room": self._room.name},
            )
            self._delete_room_task = job_ctx.delete_room(room_name=self._room.name)
            self._delete_room_task.add_done_callback(_on_delete_room_task_done)

Instance variables

prop audio_input : AudioInput | None
Expand source code
@property
def audio_input(self) -> AudioInput | None:
    """Return the stored audio input (``self._audio_input``), which may be ``None``."""
    return self._audio_input
prop audio_output : AudioOutput | None
Expand source code
@property
def audio_output(self) -> AudioOutput | None:
    """Audio output to expose to the agent session.

    Prefers the transcript synchronizer's audio output when a synchronizer is
    set; otherwise falls back to the stored audio output (possibly ``None``).
    """
    synchronizer = self._tr_synchronizer
    return synchronizer.audio_output if synchronizer else self._audio_output
prop linked_participant : rtc.RemoteParticipant | None
Expand source code
@property
def linked_participant(self) -> rtc.RemoteParticipant | None:
    """Participant held by the availability future, or ``None`` while it is pending."""
    available = self._participant_available_fut
    return available.result() if available.done() else None
prop room : rtc.Room
Expand source code
@property
def room(self) -> rtc.Room:
    """Return the stored room (``self._room``)."""
    return self._room
prop subscribed_fut : asyncio.Future[None] | None
Expand source code
@property
def subscribed_fut(self) -> asyncio.Future[None] | None:
    """The audio output's ``subscribed`` future, or ``None`` when there is no audio output."""
    output = self._audio_output
    return output.subscribed if output else None
prop transcription_output : TextOutput | None
Expand source code
@property
def transcription_output(self) -> TextOutput | None:
    """Text output to expose to the agent session.

    Prefers the transcript synchronizer's text output when a synchronizer is
    set; otherwise falls back to the agent transcription output (possibly
    ``None``).
    """
    synchronizer = self._tr_synchronizer
    return synchronizer.text_output if synchronizer else self._agent_tr_output
prop video_input : VideoInput | None
Expand source code
@property
def video_input(self) -> VideoInput | None:
    """Return the stored video input (``self._video_input``), which may be ``None``."""
    return self._video_input

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Detach every handler registered by ``start()`` and release resources.

    Cancels the background tasks, closes the input streams, the transcript
    synchronizer and the audio output, then cancels any pending text-input
    tasks.
    """
    self._room.off("participant_connected", self._on_participant_connected)
    self._room.off("connection_state_changed", self._on_connection_state_changed)
    # start() also subscribes to this event; without removing it the room keeps
    # a reference to this instance and may still close the session after aclose()
    self._room.off("participant_disconnected", self._on_participant_disconnected)
    self._agent_session.off("agent_state_changed", self._on_agent_state_changed)
    self._agent_session.off("user_input_transcribed", self._on_user_input_transcribed)
    self._agent_session.off("close", self._on_agent_session_close)

    if self._text_stream_handler_registered:
        self._room.unregister_text_stream_handler(TOPIC_CHAT)
        self._text_stream_handler_registered = False

    if self._init_atask:
        await utils.aio.cancel_and_wait(self._init_atask)

    # close the channel first so the forwarding task can finish, then cancel it
    if self._user_transcript_ch:
        self._user_transcript_ch.close()
    if self._user_transcript_atask:
        await utils.aio.cancel_and_wait(self._user_transcript_atask)

    if self._update_state_atask:
        await utils.aio.cancel_and_wait(self._update_state_atask)

    if self._pre_connect_audio_handler:
        await self._pre_connect_audio_handler.aclose()

    if self._audio_input:
        await self._audio_input.aclose()
    if self._video_input:
        await self._video_input.aclose()

    if self._tr_synchronizer:
        await self._tr_synchronizer.aclose()

    if self._audio_output:
        await self._audio_output.aclose()

    # cancel and wait for all pending tasks
    await utils.aio.cancel_and_wait(*self._tasks)
    self._tasks.clear()
def set_participant(self, participant_identity: str | None) ‑> None
Expand source code
def set_participant(self, participant_identity: str | None) -> None:
    """Switch audio and video streams to specified participant.

    Passing ``None`` delegates to ``unset_participant()``. When the identity
    differs from a previously set one, the availability future is replaced and
    resolved immediately if that participant is already in the room.
    """
    if participant_identity is None:
        self.unset_participant()
        return

    previous_identity = self._participant_identity
    if previous_identity is not None and previous_identity != participant_identity:
        # a different participant was linked before: start from a fresh future
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

        # resolve right away when the new participant is already connected
        already_connected = next(
            (
                candidate
                for candidate in self._room.remote_participants.values()
                if candidate.identity == participant_identity
            ),
            None,
        )
        if already_connected is not None:
            self._participant_available_fut.set_result(already_connected)

    # record the identity, then point every existing stream/output at it
    self._participant_identity = participant_identity
    for target in (self._audio_input, self._video_input, self._user_tr_output):
        if target:
            target.set_participant(participant_identity)

Switch audio and video streams to specified participant

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Create the enabled inputs/outputs and hook up all event handlers.

    Builds whatever the resolved options enable (pre-connect audio handler,
    text/video/audio inputs, audio and transcription outputs), subscribes to
    room events, schedules ``_init_task`` and finally attaches the created
    inputs/outputs to the agent session.
    """
    # -- create inputs --
    input_audio_options = self._options.get_audio_input_options()
    if input_audio_options and input_audio_options.pre_connect_audio:
        self._pre_connect_audio_handler = PreConnectAudioHandler(
            room=self._room,
            timeout=input_audio_options.pre_connect_audio_timeout,
        )
        self._pre_connect_audio_handler.register()

    input_text_options = self._options.get_text_input_options()
    if input_text_options:
        self._text_input_cb = input_text_options.text_input_cb
        try:
            self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
            # remembered so aclose() only unregisters a handler this instance installed
            self._text_stream_handler_registered = True
        except ValueError:
            # only warn when text input was explicitly configured
            if utils.is_given(self._options.text_input):
                logger.warning(
                    f"text stream handler for topic '{TOPIC_CHAT}' already set, ignoring"
                )
    else:
        self._text_input_cb = None

    input_video_options = self._options.get_video_input_options()
    if input_video_options:
        self._video_input = _ParticipantVideoInputStream(self._room)

    if input_audio_options:
        # the pre-connect handler (if any) is handed to the audio input stream
        self._audio_input = _ParticipantAudioInputStream(
            self._room,
            sample_rate=input_audio_options.sample_rate,
            num_channels=input_audio_options.num_channels,
            frame_size_ms=input_audio_options.frame_size_ms,
            noise_cancellation=input_audio_options.noise_cancellation,
            pre_connect_audio_handler=self._pre_connect_audio_handler,
        )

    # -- create outputs --
    output_audio_options = self._options.get_audio_output_options()
    if output_audio_options:
        self._audio_output = _ParticipantAudioOutput(
            self._room,
            sample_rate=output_audio_options.sample_rate,
            num_channels=output_audio_options.num_channels,
            track_publish_options=output_audio_options.track_publish_options,
            track_name=(
                output_audio_options.track_name
                if utils.is_given(output_audio_options.track_name)
                else "roomio_audio"
            ),
        )

    output_text_options = self._options.get_text_output_options()
    if output_text_options:
        # user transcripts: non-delta stream addressed to the configured participant
        self._user_tr_output = _ParticipantTranscriptionOutput(
            room=self._room, is_delta_stream=False, participant=self._participant_identity
        )
        # events are queued on a channel and forwarded by a background task
        self._user_transcript_ch = utils.aio.Chan[UserInputTranscribedEvent]()
        self._user_transcript_atask = asyncio.create_task(
            self._forward_user_transcript(self._user_transcript_ch)
        )

        # TODO(long): add next in the chain for session.output.transcription
        # agent transcripts: delta stream; its participant is set later in _init_task
        self._agent_tr_output = _ParticipantTranscriptionOutput(
            room=self._room, is_delta_stream=True, participant=None
        )

        # use the RoomIO's audio output if available, otherwise use the agent's audio output
        # (e.g the audio output isn't using RoomIO with our avatar datastream impl)
        if output_text_options.sync_transcription is not False and (
            audio_output := self._audio_output or self._agent_session.output.audio
        ):
            self._tr_synchronizer = TranscriptSynchronizer(
                next_in_chain_audio=audio_output,
                next_in_chain_text=self._agent_tr_output,
                speed=output_text_options.transcription_speed_factor,
            )

    # -- set the room event handlers --
    self._room.on("participant_connected", self._on_participant_connected)
    self._room.on("connection_state_changed", self._on_connection_state_changed)
    self._room.on("participant_disconnected", self._on_participant_disconnected)
    if self._room.isconnected():
        # already connected: invoke the handler directly so the connected future resolves
        self._on_connection_state_changed(rtc.ConnectionState.CONN_CONNECTED)

    self._init_atask = asyncio.create_task(self._init_task())

    # -- attach to the agent session --
    # the properties are used (not the private fields) so the transcript
    # synchronizer's outputs are attached when a synchronizer was created
    if self.audio_input:
        self._agent_session.input.audio = self.audio_input

    if self.video_input:
        self._agent_session.input.video = self.video_input

    if self.audio_output:
        self._agent_session.output.audio = self.audio_output

    if self.transcription_output:
        self._agent_session.output.transcription = self.transcription_output

    self._agent_session.on("agent_state_changed", self._on_agent_state_changed)
    self._agent_session.on("user_input_transcribed", self._on_user_input_transcribed)
    self._agent_session.on("close", self._on_agent_session_close)
def unset_participant(self) ‑> None
Expand source code
def unset_participant(self) -> None:
    """Unlink the current participant.

    Clears the stored identity, replaces the availability future with a fresh
    pending one and points every existing stream/output at no participant.
    """
    self._participant_identity = None
    self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

    for target in (self._audio_input, self._video_input, self._user_tr_output):
        if target:
            target.set_participant(None)
class RoomInputOptions (text_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_enabled: NotGivenOr[bool] = NOT_GIVEN,
video_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
audio_frame_size_ms: int = 50,
noise_cancellation: rtc.NoiseCancellationOptions | None = None,
text_input_cb: TextInputCallback = <function _default_text_input_cb>,
participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN,
participant_identity: NotGivenOr[str] = NOT_GIVEN,
pre_connect_audio: bool = True,
pre_connect_audio_timeout: float = 3.0,
close_on_disconnect: bool = True,
delete_room_on_close: bool = False)
Expand source code
@dataclass
class RoomInputOptions:
    # Input-side room options. RoomIO.__init__ accepts this type through its
    # `input_options` parameter, which it marks as deprecated, and hands it to
    # RoomOptions._ensure_options as `room_input_options`.

    text_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    video_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to False."""
    # NOTE(review): presumably the counterparts of AudioInputOptions.sample_rate /
    # num_channels -- the mapping is not visible here, confirm in RoomOptions.
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    audio_frame_size_ms: int = 50
    """The frame size in milliseconds for the audio input."""
    noise_cancellation: rtc.NoiseCancellationOptions | None = None
    # Callback for incoming text input; defaults to `_default_text_input_cb`.
    text_input_cb: TextInputCallback = _default_text_input_cb
    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    """Participant kinds accepted for auto subscription. If not provided,
    accept `DEFAULT_PARTICIPANT_KINDS`."""
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    """The participant to link to. If not provided, link to the first participant.
    Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
    pre_connect_audio: bool = True
    """Pre-connect audio enabled or not."""
    pre_connect_audio_timeout: float = 3.0
    """The pre-connect audio will be ignored if it doesn't arrive within this time."""
    close_on_disconnect: bool = True
    """Close the AgentSession if the linked participant disconnects with reasons in
    CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED."""
    delete_room_on_close: bool = False
    """Delete the room when the AgentSession is closed, default to False"""

RoomInputOptions(text_enabled: 'NotGivenOr[bool]' = NOT_GIVEN, audio_enabled: 'NotGivenOr[bool]' = NOT_GIVEN, video_enabled: 'NotGivenOr[bool]' = NOT_GIVEN, audio_sample_rate: 'int' = 24000, audio_num_channels: 'int' = 1, audio_frame_size_ms: 'int' = 50, noise_cancellation: 'rtc.NoiseCancellationOptions | None' = None, text_input_cb: 'TextInputCallback' = <function _default_text_input_cb>, participant_kinds: 'NotGivenOr[list[rtc.ParticipantKind.ValueType]]' = NOT_GIVEN, participant_identity: 'NotGivenOr[str]' = NOT_GIVEN, pre_connect_audio: 'bool' = True, pre_connect_audio_timeout: 'float' = 3.0, close_on_disconnect: 'bool' = True, delete_room_on_close: 'bool' = False)

Instance variables

var audio_enabled : NotGivenOr[bool]

If not given, default to True.

var audio_frame_size_ms : int

The frame size in milliseconds for the audio input.

var audio_num_channels : int
var audio_sample_rate : int
var close_on_disconnect : bool

Close the AgentSession if the linked participant disconnects with reasons in CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED.

var delete_room_on_close : bool

Delete the room when the AgentSession is closed, default to False

var noise_cancellation : rtc.NoiseCancellationOptions | None
var participant_identity : NotGivenOr[str]

The participant to link to. If not provided, link to the first participant. Can be overridden by the participant argument of RoomIO constructor or set_participant.

var participant_kinds : NotGivenOr[list[rtc.ParticipantKind.ValueType]]

Participant kinds accepted for auto subscription. If not provided, accept DEFAULT_PARTICIPANT_KINDS.

var pre_connect_audio : bool

Pre-connect audio enabled or not.

var pre_connect_audio_timeout : float

The pre-connect audio will be ignored if it doesn't arrive within this time.

var text_enabled : NotGivenOr[bool]

If not given, default to True.

var video_enabled : NotGivenOr[bool]

If not given, default to False.

Methods

def text_input_cb(sess: AgentSession,
ev: TextInputEvent) ‑> None
Expand source code
def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)
class RoomOptions (text_input: NotGivenOr[TextInputOptions | bool] = NOT_GIVEN,
audio_input: NotGivenOr[AudioInputOptions | bool] = NOT_GIVEN,
video_input: NotGivenOr[VideoInputOptions | bool] = NOT_GIVEN,
audio_output: NotGivenOr[AudioOutputOptions | bool] = NOT_GIVEN,
text_output: NotGivenOr[TextOutputOptions | bool] = NOT_GIVEN,
participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN,
participant_identity: NotGivenOr[str] = NOT_GIVEN,
close_on_disconnect: bool = True,
delete_room_on_close: bool = False)
Expand source code
@dataclass
class RoomOptions:
    """Room I/O configuration that supersedes the deprecated
    `RoomInputOptions` / `RoomOutputOptions` pair.

    Every input/output field accepts an options object, a bool, or `NOT_GIVEN`.
    The `get_*_options` helpers resolve a field to an options instance, or to
    None when that input/output is disabled.
    """

    text_input: NotGivenOr[TextInputOptions | bool] = NOT_GIVEN
    """The text input options. If not provided, default to True."""
    audio_input: NotGivenOr[AudioInputOptions | bool] = NOT_GIVEN
    """The audio input options. If not provided, default to True."""
    video_input: NotGivenOr[VideoInputOptions | bool] = NOT_GIVEN
    """The video input options. If not provided, default to False."""
    audio_output: NotGivenOr[AudioOutputOptions | bool] = NOT_GIVEN
    """The audio output options. If not provided, default to True."""
    text_output: NotGivenOr[TextOutputOptions | bool] = NOT_GIVEN
    """The transcription output options. If not provided, default to True."""

    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    """Participant kinds accepted for auto subscription. If not provided,
    accept `DEFAULT_PARTICIPANT_KINDS`."""
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    """The participant to link to. If not provided, link to the first participant.
    Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
    close_on_disconnect: bool = True
    """Close the AgentSession if the linked participant disconnects with reasons in
    CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED."""
    delete_room_on_close: bool = False
    """Delete the room when the AgentSession is closed, default to False"""

    def get_text_input_options(self) -> TextInputOptions | None:
        """Resolve `text_input` to concrete options, or None when it is False."""
        configured = self.text_input
        if isinstance(configured, TextInputOptions):
            return configured
        if configured is False:
            return None
        # Anything else (not given, True): text input is enabled with defaults.
        return TextInputOptions()

    def get_audio_input_options(self) -> AudioInputOptions | None:
        """Resolve `audio_input` to concrete options, or None when it is False."""
        configured = self.audio_input
        if isinstance(configured, AudioInputOptions):
            return configured
        if configured is False:
            return None
        # Anything else (not given, True): audio input is enabled with defaults.
        return AudioInputOptions()

    def get_video_input_options(self) -> VideoInputOptions | None:
        """Resolve `video_input` to concrete options, or None unless enabled."""
        configured = self.video_input
        if isinstance(configured, VideoInputOptions):
            return configured
        if configured is True:
            return VideoInputOptions()
        # Video is opt-in: not given and False both mean disabled.
        return None

    def get_audio_output_options(self) -> AudioOutputOptions | None:
        """Resolve `audio_output` to concrete options, or None when it is False."""
        configured = self.audio_output
        if isinstance(configured, AudioOutputOptions):
            return configured
        if configured is False:
            return None
        return AudioOutputOptions()

    def get_text_output_options(self) -> TextOutputOptions | None:
        """Resolve `text_output` to concrete options, or None when it is False."""
        configured = self.text_output
        if isinstance(configured, TextOutputOptions):
            return configured
        if configured is False:
            return None
        return TextOutputOptions()

    @classmethod
    def _ensure_options(
        cls,
        options: NotGivenOr[RoomOptions],
        *,
        room_input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
        room_output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN,
    ) -> RoomOptions:
        """Return a `RoomOptions`, converting legacy options when needed.

        A deprecation warning is logged whenever a legacy argument is given.
        The legacy values are only converted when `options` itself is not
        given; otherwise `options` is used.

        Raises:
            ValueError: If `options` is given but is not a `RoomOptions`.
        """
        legacy_given = is_given(room_input_options) or is_given(room_output_options)
        if legacy_given:
            logger.warning(
                "RoomInputOptions and RoomOutputOptions are deprecated, use RoomOptions instead"
            )
            if not is_given(options):
                return cls._create_from_legacy(room_input_options, room_output_options)

        if isinstance(options, RoomOptions):
            return options
        if is_given(options):
            raise ValueError(f"expected RoomOptions, got {type(options)}")
        return cls()

    @classmethod
    def _create_from_legacy(
        cls,
        input_options: NotGivenOr[RoomInputOptions],
        output_options: NotGivenOr[RoomOutputOptions],
    ) -> RoomOptions:
        """Build a `RoomOptions` from the deprecated input/output option objects.

        Fields belonging to a legacy object that is not provided keep the
        `RoomOptions` defaults.
        """
        converted = cls()

        if input_options:
            if input_options.text_enabled is not False:
                converted.text_input = TextInputOptions(
                    text_input_cb=input_options.text_input_cb
                )
            else:
                converted.text_input = False

            if input_options.audio_enabled is not False:
                converted.audio_input = AudioInputOptions(
                    sample_rate=input_options.audio_sample_rate,
                    num_channels=input_options.audio_num_channels,
                    frame_size_ms=input_options.audio_frame_size_ms,
                    noise_cancellation=input_options.noise_cancellation,
                    pre_connect_audio=input_options.pre_connect_audio,
                    pre_connect_audio_timeout=input_options.pre_connect_audio_timeout,
                )
            else:
                converted.audio_input = False

            # The legacy video flag is copied as-is (bool or NOT_GIVEN).
            converted.video_input = input_options.video_enabled

            # Participant and lifecycle settings lived on the input options.
            converted.participant_kinds = input_options.participant_kinds
            converted.participant_identity = input_options.participant_identity
            converted.close_on_disconnect = input_options.close_on_disconnect
            converted.delete_room_on_close = input_options.delete_room_on_close

        if output_options:
            if output_options.audio_enabled is not False:
                converted.audio_output = AudioOutputOptions(
                    sample_rate=output_options.audio_sample_rate,
                    num_channels=output_options.audio_num_channels,
                    track_publish_options=output_options.audio_publish_options,
                    track_name=output_options.audio_track_name,
                )
            else:
                converted.audio_output = False

            if output_options.transcription_enabled is not False:
                converted.text_output = TextOutputOptions(
                    sync_transcription=output_options.sync_transcription,
                    transcription_speed_factor=output_options.transcription_speed_factor,
                )
            else:
                converted.text_output = False

        return converted

RoomOptions(text_input: 'NotGivenOr[TextInputOptions | bool]' = NOT_GIVEN, audio_input: 'NotGivenOr[AudioInputOptions | bool]' = NOT_GIVEN, video_input: 'NotGivenOr[VideoInputOptions | bool]' = NOT_GIVEN, audio_output: 'NotGivenOr[AudioOutputOptions | bool]' = NOT_GIVEN, text_output: 'NotGivenOr[TextOutputOptions | bool]' = NOT_GIVEN, participant_kinds: 'NotGivenOr[list[rtc.ParticipantKind.ValueType]]' = NOT_GIVEN, participant_identity: 'NotGivenOr[str]' = NOT_GIVEN, close_on_disconnect: 'bool' = True, delete_room_on_close: 'bool' = False)

Instance variables

var audio_input : livekit.agents.voice.room_io.types.AudioInputOptions | bool | livekit.agents.types.NotGiven

The audio input options. If not provided, default to True.

var audio_output : livekit.agents.voice.room_io.types.AudioOutputOptions | bool | livekit.agents.types.NotGiven

The audio output options. If not provided, default to True.

var close_on_disconnect : bool

Close the AgentSession if the linked participant disconnects with reasons in CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED.

var delete_room_on_close : bool

Delete the room when the AgentSession is closed, default to False

var participant_identity : str | livekit.agents.types.NotGiven

The participant to link to. If not provided, link to the first participant. Can be overridden by the participant argument of RoomIO constructor or set_participant.

var participant_kinds : list[int] | livekit.agents.types.NotGiven

Participant kinds accepted for auto subscription. If not provided, accept DEFAULT_PARTICIPANT_KINDS.

var text_input : livekit.agents.voice.room_io.types.TextInputOptions | bool | livekit.agents.types.NotGiven

The text input options. If not provided, default to True.

var text_output : livekit.agents.voice.room_io.types.TextOutputOptions | bool | livekit.agents.types.NotGiven

The transcription output options. If not provided, default to True.

var video_input : livekit.agents.voice.room_io.types.VideoInputOptions | bool | livekit.agents.types.NotGiven

The video input options. If not provided, default to False.

Methods

def get_audio_input_options(self) ‑> livekit.agents.voice.room_io.types.AudioInputOptions | None
Expand source code
def get_audio_input_options(self) -> AudioInputOptions | None:
    """Resolve `audio_input` to concrete options, or None when it is False."""
    configured = self.audio_input
    if isinstance(configured, AudioInputOptions):
        return configured
    if configured is False:
        return None
    # Anything else (not given, True): audio input is enabled with defaults.
    return AudioInputOptions()
def get_audio_output_options(self) ‑> livekit.agents.voice.room_io.types.AudioOutputOptions | None
Expand source code
def get_audio_output_options(self) -> AudioOutputOptions | None:
    """Resolve `audio_output` to concrete options, or None when it is False."""
    configured = self.audio_output
    if isinstance(configured, AudioOutputOptions):
        return configured
    if configured is False:
        return None
    return AudioOutputOptions()
def get_text_input_options(self) ‑> livekit.agents.voice.room_io.types.TextInputOptions | None
Expand source code
def get_text_input_options(self) -> TextInputOptions | None:
    """Resolve `text_input` to concrete options, or None when it is False."""
    configured = self.text_input
    if isinstance(configured, TextInputOptions):
        return configured
    if configured is False:
        return None
    # Anything else (not given, True): text input is enabled with defaults.
    return TextInputOptions()
def get_text_output_options(self) ‑> livekit.agents.voice.room_io.types.TextOutputOptions | None
Expand source code
def get_text_output_options(self) -> TextOutputOptions | None:
    """Resolve `text_output` to concrete options, or None when it is False."""
    configured = self.text_output
    if isinstance(configured, TextOutputOptions):
        return configured
    if configured is False:
        return None
    return TextOutputOptions()
def get_video_input_options(self) ‑> livekit.agents.voice.room_io.types.VideoInputOptions | None
Expand source code
def get_video_input_options(self) -> VideoInputOptions | None:
    """Resolve `video_input` to concrete options, or None unless enabled."""
    configured = self.video_input
    if isinstance(configured, VideoInputOptions):
        return configured
    if configured is True:
        return VideoInputOptions()
    # Video is opt-in: not given and False both mean disabled.
    return None
class RoomOutputOptions (transcription_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
audio_publish_options: rtc.TrackPublishOptions = <factory>,
audio_track_name: NotGivenOr[str] = NOT_GIVEN,
sync_transcription: NotGivenOr[bool] = NOT_GIVEN,
transcription_speed_factor: float = 1.0)
Expand source code
@dataclass
class RoomOutputOptions:
    """Legacy output options for room I/O.

    Deprecated in favor of `RoomOptions`; `RoomOptions._create_from_legacy`
    maps these fields onto `AudioOutputOptions` and `TextOutputOptions`.
    """

    transcription_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    # Default publish options mark the track source as SOURCE_MICROPHONE.
    audio_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )
    audio_track_name: NotGivenOr[str] = NOT_GIVEN
    """The name of the audio track to publish. If not provided, default to "roomio_audio"."""
    sync_transcription: NotGivenOr[bool] = NOT_GIVEN
    """Set to False to disable transcription synchronization with audio output.
    When disabled, transcription is emitted as soon as it is available."""
    transcription_speed_factor: float = 1.0
    """Speed factor of transcription synchronization with audio output.
    Only effective if `sync_transcription` is True."""

RoomOutputOptions(transcription_enabled: 'NotGivenOr[bool]' = NOT_GIVEN, audio_enabled: 'NotGivenOr[bool]' = NOT_GIVEN, audio_sample_rate: 'int' = 24000, audio_num_channels: 'int' = 1, audio_publish_options: 'rtc.TrackPublishOptions' = <factory>, audio_track_name: 'NotGivenOr[str]' = NOT_GIVEN, sync_transcription: 'NotGivenOr[bool]' = NOT_GIVEN, transcription_speed_factor: 'float' = 1.0)

Instance variables

var audio_enabled : bool | livekit.agents.types.NotGiven

If not given, default to True.

var audio_num_channels : int
var audio_publish_options : room_pb2.TrackPublishOptions
var audio_sample_rate : int
var audio_track_name : str | livekit.agents.types.NotGiven

The name of the audio track to publish. If not provided, default to "roomio_audio".

var sync_transcription : bool | livekit.agents.types.NotGiven

Set to False to disable transcription synchronization with audio output. When disabled, transcription is emitted as soon as it is available.

var transcription_enabled : bool | livekit.agents.types.NotGiven

If not given, default to True.

var transcription_speed_factor : float

Speed factor of transcription synchronization with audio output. Only effective if sync_transcription is True.

class TextInputEvent (text: str, info: rtc.TextStreamInfo, participant: rtc.RemoteParticipant)
Expand source code
@dataclass
class TextInputEvent:
    """Event object handed to a text input callback alongside the session."""

    # The received text; the default callback forwards it as `user_input`.
    text: str
    # Metadata of the text stream (an `rtc.TextStreamInfo`).
    info: rtc.TextStreamInfo
    # presumably the remote participant that sent the text — confirm against the caller
    participant: rtc.RemoteParticipant

TextInputEvent(text: 'str', info: 'rtc.TextStreamInfo', participant: 'rtc.RemoteParticipant')

Instance variables

var info : TextStreamInfo
var participant : RemoteParticipant
var text : str
class TextInputOptions (text_input_cb: TextInputCallback = <function _default_text_input_cb>)
Expand source code
@dataclass
class TextInputOptions:
    """Options for the text input of room I/O."""

    # Callback for incoming text; defaults to `_default_text_input_cb`.
    text_input_cb: TextInputCallback = _default_text_input_cb

TextInputOptions(text_input_cb: 'TextInputCallback' = <function _default_text_input_cb>)

Methods

def text_input_cb(sess: AgentSession,
ev: TextInputEvent) ‑> None
Expand source code
def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)
class TextOutputOptions (sync_transcription: NotGivenOr[bool] = NOT_GIVEN,
transcription_speed_factor: float = 1.0)
Expand source code
@dataclass
class TextOutputOptions:
    """Options for the transcription (text) output of room I/O."""

    sync_transcription: NotGivenOr[bool] = NOT_GIVEN
    """Set to False to disable transcription synchronization with audio output.
    When disabled, transcription is emitted as soon as it is available."""
    transcription_speed_factor: float = 1.0
    """Speed factor of transcription synchronization with audio output.
    Only effective if `sync_transcription` is True."""

TextOutputOptions(sync_transcription: 'NotGivenOr[bool]' = NOT_GIVEN, transcription_speed_factor: 'float' = 1.0)

Instance variables

var sync_transcription : bool | livekit.agents.types.NotGiven

Set to False to disable transcription synchronization with audio output. When disabled, transcription is emitted as soon as it is available.

var transcription_speed_factor : float

Speed factor of transcription synchronization with audio output. Only effective if sync_transcription is True.

class VideoInputOptions
Expand source code
@dataclass
class VideoInputOptions:
    """Options for the video input; this dataclass declares no fields."""

VideoInputOptions()

class _ParticipantAudioOutput (room: rtc.Room,
*,
sample_rate: int,
num_channels: int,
track_publish_options: rtc.TrackPublishOptions,
track_name: str = 'roomio_audio')
Expand source code
class _ParticipantAudioOutput(io.AudioOutput):
    """Audio output that publishes captured frames to the room.

    Frames are re-chunked, queued in `_audio_buf`, and forwarded by a background
    task into an `rtc.AudioSource` that backs a published local audio track.
    """

    def __init__(
        self,
        room: rtc.Room,
        *,
        sample_rate: int,
        num_channels: int,
        track_publish_options: rtc.TrackPublishOptions,
        track_name: str = "roomio_audio",
    ) -> None:
        """Set up the audio source and buffers; nothing is published until `start()`.

        Args:
            room: Room whose local participant publishes the track.
            sample_rate: Sample rate for the audio source and the byte stream.
            num_channels: Channel count for the audio source and the byte stream.
            track_publish_options: Options passed to `publish_track`.
            track_name: Name of the created local audio track.
        """
        super().__init__(
            label="RoomIO",
            next_in_chain=None,
            sample_rate=sample_rate,
            capabilities=io.AudioOutputCapabilities(pause=True),
        )
        self._room = room
        self._track_name = track_name
        self._lock = asyncio.Lock()
        self._audio_source = rtc.AudioSource(sample_rate, num_channels, queue_size_ms=200)
        self._publish_options = track_publish_options
        self._publication: rtc.LocalTrackPublication | None = None
        self._subscribed_fut = asyncio.Future[None]()

        self._audio_buf = utils.aio.Chan[rtc.AudioFrame]()
        self._audio_bstream = utils.audio.AudioByteStream(
            sample_rate, num_channels, samples_per_channel=sample_rate // 20
        )  # chunk the frame into a small, fixed size

        # used to republish track on reconnection
        self._republish_task: asyncio.Task[None] | None = None
        self._flush_task: asyncio.Task[None] | None = None
        self._interrupted_event = asyncio.Event()
        self._forwarding_task: asyncio.Task[None] | None = None

        # total duration pushed into `_audio_buf` for the current segment
        self._pushed_duration: float = 0.0

        # set while playback is allowed; cleared by `pause()`
        self._playback_enabled = asyncio.Event()
        self._playback_enabled.set()

    async def _publish_track(self) -> None:
        """Create and publish the local audio track, then wait for a subscription.

        Resolves `_subscribed_fut` after the first successful subscription.
        """
        async with self._lock:
            track = rtc.LocalAudioTrack.create_audio_track(self._track_name, self._audio_source)
            self._publication = await self._room.local_participant.publish_track(
                track, self._publish_options
            )
            await self._publication.wait_for_subscription()
            # the future may already be resolved when republishing after a reconnect
            if not self._subscribed_fut.done():
                self._subscribed_fut.set_result(None)

    @property
    def subscribed(self) -> asyncio.Future[None]:
        """Future resolved by `_publish_track` once the track has been subscribed."""
        return self._subscribed_fut

    async def start(self) -> None:
        """Start the forwarding task, publish the track, and watch for reconnects."""
        self._forwarding_task = asyncio.create_task(self._forward_audio())
        await self._publish_track()
        self._room.on("reconnected", self._on_reconnected)

    async def aclose(self) -> None:
        """Unregister the reconnect handler, cancel background tasks, close the source."""
        self._room.off("reconnected", self._on_reconnected)
        if self._republish_task:
            await utils.aio.cancel_and_wait(self._republish_task)
        if self._flush_task:
            await utils.aio.cancel_and_wait(self._flush_task)
        if self._forwarding_task:
            await utils.aio.cancel_and_wait(self._forwarding_task)

        await self._audio_source.aclose()

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Queue a frame for playback, re-chunked by the byte stream.

        Waits until the track is subscribed, and for any flush still in progress.
        """
        await self._subscribed_fut

        await super().capture_frame(frame)

        if self._flush_task and not self._flush_task.done():
            logger.error("capture_frame called while flush is in progress")
            await self._flush_task

        for f in self._audio_bstream.push(frame.data):
            await self._audio_buf.send(f)
            self._pushed_duration += f.duration

    def flush(self) -> None:
        """Queue the byte stream's remaining audio and start waiting for playout.

        Does nothing further when no audio has been pushed for the current segment.
        """
        super().flush()

        for f in self._audio_bstream.flush():
            self._audio_buf.send_nowait(f)
            self._pushed_duration += f.duration

        if not self._pushed_duration:
            return

        if self._flush_task and not self._flush_task.done():
            # shouldn't happen if only one active speech handle at a time
            logger.error("flush called while playback is in progress")
            self._flush_task.cancel()

        self._flush_task = asyncio.create_task(self._wait_for_playout())

    def clear_buffer(self) -> None:
        """Drop partially buffered audio and signal an interruption if audio was pushed."""
        self._audio_bstream.clear()

        if not self._pushed_duration:
            return
        self._interrupted_event.set()

    def pause(self) -> None:
        """Pause playback; the forwarding task blocks until `resume()`."""
        super().pause()
        self._playback_enabled.clear()
        # self._audio_source.clear_queue()

    def resume(self) -> None:
        """Resume playback after `pause()`."""
        super().resume()
        self._playback_enabled.set()

    async def _wait_for_playout(self) -> None:
        """Wait until buffered audio is played out or playback is interrupted.

        Afterwards resets the per-segment state and reports the result through
        `on_playback_finished`. On interruption, the reported position excludes
        audio that was still queued and the queues are cleared.
        """
        wait_for_interruption = asyncio.create_task(self._interrupted_event.wait())

        async def _wait_buffered_audio() -> None:
            while not self._audio_buf.empty():
                if not self._playback_enabled.is_set():
                    await self._playback_enabled.wait()

                await self._audio_source.wait_for_playout()
                # avoid deadlock when clear_buffer called before capture_frame
                await asyncio.sleep(0)

        wait_for_playout = asyncio.create_task(_wait_buffered_audio())
        try:
            await asyncio.wait(
                [wait_for_playout, wait_for_interruption],
                return_when=asyncio.FIRST_COMPLETED,
            )
        except asyncio.CancelledError:
            # asyncio.wait() does not cancel the tasks it waits on; when this
            # coroutine is cancelled (by flush() or aclose()), cancel both
            # helpers so they are not left pending.
            wait_for_playout.cancel()
            wait_for_interruption.cancel()
            raise

        interrupted = wait_for_interruption.done()
        pushed_duration = self._pushed_duration

        if interrupted:
            # subtract audio that never played: still in the source queue or the buffer
            queued_duration = self._audio_source.queued_duration
            while not self._audio_buf.empty():
                queued_duration += self._audio_buf.recv_nowait().duration

            pushed_duration = max(pushed_duration - queued_duration, 0)
            self._audio_source.clear_queue()
            wait_for_playout.cancel()
        else:
            wait_for_interruption.cancel()

        self._pushed_duration = 0
        self._interrupted_event.clear()
        self.on_playback_finished(playback_position=pushed_duration, interrupted=interrupted)

    async def _forward_audio(self) -> None:
        """Forward buffered frames to the audio source, honoring pause and interruption."""
        async for frame in self._audio_buf:
            if not self._playback_enabled.is_set():
                self._audio_source.clear_queue()
                await self._playback_enabled.wait()
                # TODO(long): save the frames in the queue and play them later
                # TODO(long): ignore frames from previous syllable

            if self._interrupted_event.is_set() or self._pushed_duration == 0:
                if self._interrupted_event.is_set() and self._flush_task:
                    await self._flush_task

                # ignore frames if interrupted
                continue

            await self._audio_source.capture_frame(frame)

    def _on_reconnected(self) -> None:
        """Republish the track after a reconnect, replacing any pending republish."""
        if self._republish_task:
            self._republish_task.cancel()
        self._republish_task = asyncio.create_task(self._publish_track())

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate
The sample rate required by the audio sink, if None, any sample rate is accepted

Ancestors

Instance variables

prop subscribed : asyncio.Future[None]
Expand source code
@property
def subscribed(self) -> asyncio.Future[None]:
    """Future stored in `_subscribed_fut`, exposed read-only."""
    subscription_future = self._subscribed_fut
    return subscription_future

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Unregister the reconnect handler, cancel background tasks, close the source."""
    # Stop reacting to reconnects before tearing anything down.
    self._room.off("reconnected", self._on_reconnected)
    # Each task is optional; cancel and wait only for those that were created.
    if self._republish_task:
        await utils.aio.cancel_and_wait(self._republish_task)
    if self._flush_task:
        await utils.aio.cancel_and_wait(self._flush_task)
    if self._forwarding_task:
        await utils.aio.cancel_and_wait(self._forwarding_task)

    # Close the audio source only after the tasks above have been cancelled.
    await self._audio_source.aclose()
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Start the forwarding task, publish the track, and watch for reconnects."""
    self._forwarding_task = asyncio.create_task(self._forward_audio())
    await self._publish_track()
    # The handler is registered only after the initial publish has completed.
    self._room.on("reconnected", self._on_reconnected)

Inherited members

class _ParticipantStreamTranscriptionOutput (room: rtc.Room,
*,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
attributes: dict[str, str] | None = None)
Expand source code
class _ParticipantStreamTranscriptionOutput:
    """Publishes transcription text for a participant over room text streams.

    Text is sent on `TOPIC_TRANSCRIPTION`. In delta mode a single writer is
    reused per segment; otherwise a new writer is opened for every capture.
    """

    def __init__(
        self,
        room: rtc.Room,
        *,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        attributes: dict[str, str] | None = None,
    ):
        """Register room track handlers and link to the optional participant.

        Args:
            room: Room used to publish text streams and look up tracks.
            is_delta_stream: Whether captures reuse one writer per segment.
            participant: Participant (or identity) to publish on behalf of.
            attributes: Extra stream attributes; they never override keys that
                are already set on a stream.
        """
        self._room, self._is_delta_stream = room, is_delta_stream
        self._track_id: str | None = None
        self._participant_identity: str | None = None
        self._additional_attributes = attributes or {}

        self._writer: rtc.TextStreamWriter | None = None

        self._room.on("track_published", self._on_track_published)
        self._room.on("local_track_published", self._on_local_track_published)
        self._flush_atask: asyncio.Task[None] | None = None

        self._reset_state()
        self.set_participant(participant)

    def set_participant(
        self,
        participant: rtc.Participant | str | None,
    ) -> None:
        """Link to a participant given as an object, an identity string, or None.

        With None, only the stored identity is cleared. Otherwise the
        microphone track id is looked up, the current segment is flushed, and
        the segment state is reset.
        """
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.Participant) else participant
        )
        if self._participant_identity is None:
            return

        try:
            self._track_id = find_micro_track_id(self._room, self._participant_identity)
        except ValueError:
            # track id is optional for TextStream when audio is not published
            self._track_id = None

        self.flush()
        self._reset_state()

    def _reset_state(self) -> None:
        """Begin a new segment: new segment id, not capturing, no latest text."""
        self._current_id = utils.shortuuid("SG_")
        self._capturing = False
        self._latest_text = ""

    async def _create_text_writer(
        self, attributes: dict[str, str] | None = None
    ) -> rtc.TextStreamWriter:
        """Open a text stream writer on `TOPIC_TRANSCRIPTION` for the participant.

        Args:
            attributes: Stream attributes. When falsy, a non-final default is
                built (including the track id when known). A dict passed in is
                modified in place.

        Returns:
            The writer returned by `local_participant.stream_text`.
        """
        assert self._participant_identity is not None, "participant_identity is not set"

        if not attributes:
            attributes = {
                ATTRIBUTE_TRANSCRIPTION_FINAL: "false",
            }
            if self._track_id:
                attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id
        # the segment id is always set, overriding any caller-supplied value
        attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID] = self._current_id

        # additional attributes only fill in keys that are not set yet
        for key, val in self._additional_attributes.items():
            if key not in attributes:
                attributes[key] = val

        return await self._room.local_participant.stream_text(
            topic=TOPIC_TRANSCRIPTION,
            sender_identity=self._participant_identity,
            attributes=attributes,
        )

    @utils.log_exceptions(logger=logger)
    async def capture_text(self, text: str) -> None:
        """Publish `text` for the current segment.

        Does nothing when no participant is linked. Waits for a pending flush
        and starts a new segment if none is being captured. Publishing is
        skipped while the room is disconnected, and publish errors are logged
        as warnings instead of raised.
        """
        if self._participant_identity is None:
            return

        # let the previous segment's flush finish before starting a new one
        if self._flush_atask and not self._flush_atask.done():
            await self._flush_atask

        if not self._capturing:
            self._reset_state()
            self._capturing = True

        # remembered so a non-delta flush can resend it as the final text
        self._latest_text = text

        try:
            if self._room.isconnected():
                if self._is_delta_stream:  # reuse the existing writer
                    if self._writer is None:
                        self._writer = await self._create_text_writer()

                    await self._writer.write(text)
                else:  # always create a new writer
                    tmp_writer = await self._create_text_writer()
                    await tmp_writer.write(text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    async def _flush_task(self, writer: rtc.TextStreamWriter | None) -> None:
        """Publish the final marker for a segment.

        In delta mode the given writer (if any) is closed with the final
        attributes. Otherwise the latest text is sent through a new writer
        created with the final attributes. Errors are logged as warnings.
        """
        attributes = {ATTRIBUTE_TRANSCRIPTION_FINAL: "true"}
        if self._track_id:
            attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id

        try:
            if self._room.isconnected():
                if self._is_delta_stream:
                    if writer:
                        await writer.aclose(attributes=attributes)
                else:
                    tmp_writer = await self._create_text_writer(attributes=attributes)
                    await tmp_writer.write(self._latest_text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    def flush(self) -> None:
        """Schedule finalization of the current segment in a background task.

        Does nothing when no participant is linked or nothing is being captured.
        """
        if self._participant_identity is None or not self._capturing:
            return

        self._capturing = False
        # detach the writer so the next capture opens a fresh one
        curr_writer = self._writer
        self._writer = None
        self._flush_atask = asyncio.create_task(self._flush_task(curr_writer))

    def _on_track_published(
        self, track: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
    ) -> None:
        """Record the track sid when the linked remote participant publishes a microphone track."""
        if (
            self._participant_identity is None
            or participant.identity != self._participant_identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return

        self._track_id = track.sid

    def _on_local_track_published(self, track: rtc.LocalTrackPublication, _: rtc.Track) -> None:
        """Record the track sid when the linked participant is the local one and publishes a microphone track."""
        if (
            self._participant_identity is None
            or self._participant_identity != self._room.local_participant.identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return

        self._track_id = track.sid

Methods

async def capture_text(self, text: str) ‑> None
Expand source code
@utils.log_exceptions(logger=logger)
async def capture_text(self, text: str) -> None:
    """Publish `text` for the current segment.

    Does nothing when no participant is linked. Waits for a pending flush
    and starts a new segment if none is being captured. Publishing is
    skipped while the room is disconnected, and publish errors are logged
    as warnings instead of raised.
    """
    if self._participant_identity is None:
        return

    # let the previous segment's flush finish before starting a new one
    if self._flush_atask and not self._flush_atask.done():
        await self._flush_atask

    if not self._capturing:
        self._reset_state()
        self._capturing = True

    # remembered so a non-delta flush can resend it as the final text
    self._latest_text = text

    try:
        if self._room.isconnected():
            if self._is_delta_stream:  # reuse the existing writer
                if self._writer is None:
                    self._writer = await self._create_text_writer()

                await self._writer.write(text)
            else:  # always create a new writer
                tmp_writer = await self._create_text_writer()
                await tmp_writer.write(text)
                await tmp_writer.aclose()
    except Exception as e:
        logger.warning("failed to publish transcription", exc_info=e)
def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Schedule finalization of the current segment in a background task.

    Does nothing when no participant is linked or nothing is being captured.
    """
    if self._participant_identity is None or not self._capturing:
        return

    self._capturing = False
    # detach the writer so the next capture opens a fresh one
    curr_writer = self._writer
    self._writer = None
    self._flush_atask = asyncio.create_task(self._flush_task(curr_writer))
def set_participant(self, participant: rtc.Participant | str | None) ‑> None
Expand source code
def set_participant(
    self,
    participant: rtc.Participant | str | None,
) -> None:
    """Link to a participant given as an object, an identity string, or None.

    With None, only the stored identity is cleared. Otherwise the
    microphone track id is looked up, the current segment is flushed, and
    the segment state is reset.
    """
    self._participant_identity = (
        participant.identity if isinstance(participant, rtc.Participant) else participant
    )
    if self._participant_identity is None:
        return

    try:
        self._track_id = find_micro_track_id(self._room, self._participant_identity)
    except ValueError:
        # track id is optional for TextStream when audio is not published
        self._track_id = None

    self.flush()
    self._reset_state()
class _ParticipantTranscriptionOutput (*,
room: rtc.Room,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
next_in_chain: io.TextOutput | None = None)
Expand source code
class _ParticipantTranscriptionOutput(io.TextOutput):
    """Text output that fans out to a legacy and a stream transcription output.

    Every call is applied to both wrapped outputs, then passed on to
    `next_in_chain` when one is set.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        next_in_chain: io.TextOutput | None = None,
    ) -> None:
        """Create both wrapped outputs with the same room, mode, and participant."""
        super().__init__(label="RoomIO", next_in_chain=next_in_chain)

        legacy_output = _ParticipantLegacyTranscriptionOutput(
            room=room,
            is_delta_stream=is_delta_stream,
            participant=participant,
        )
        stream_output = _ParticipantStreamTranscriptionOutput(
            room=room,
            is_delta_stream=is_delta_stream,
            participant=participant,
        )
        self.__outputs: list[
            _ParticipantLegacyTranscriptionOutput | _ParticipantStreamTranscriptionOutput
        ] = [legacy_output, stream_output]

    def set_participant(self, participant: rtc.Participant | str | None) -> None:
        """Forward the new participant to each wrapped transcription output."""
        for output in self.__outputs:
            output.set_participant(participant)

    async def capture_text(self, text: str) -> None:
        """Send `text` to all wrapped outputs concurrently, then down the chain."""
        pending = [output.capture_text(text) for output in self.__outputs]
        await asyncio.gather(*pending)

        if self.next_in_chain:
            await self.next_in_chain.capture_text(text)

    def flush(self) -> None:
        """Flush every wrapped output, then the next output in the chain."""
        for output in self.__outputs:
            output.flush()

        if self.next_in_chain:
            self.next_in_chain.flush()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Methods

def set_participant(self, participant: rtc.Participant | str | None) ‑> None
Expand source code
def set_participant(self, participant: rtc.Participant | str | None) -> None:
    """Forward the new participant to each wrapped transcription output."""
    for output in self.__outputs:
        output.set_participant(participant)

Inherited members