Module livekit.agents.voice.room_io
Classes
class AudioInputOptions (sample_rate: int = 24000,
num_channels: int = 1,
frame_size_ms: int = 50,
noise_cancellation: rtc.NoiseCancellationOptions | NoiseCancellationSelector | None = None,
pre_connect_audio: bool = True,
pre_connect_audio_timeout: float = 3.0)
@dataclass
class AudioInputOptions:
    sample_rate: int = 24000
    num_channels: int = 1
    frame_size_ms: int = 50
    """The frame size in milliseconds for the audio input."""
    noise_cancellation: rtc.NoiseCancellationOptions | NoiseCancellationSelector | None = None
    pre_connect_audio: bool = True
    """Pre-connect audio enabled or not."""
    pre_connect_audio_timeout: float = 3.0
    """The pre-connect audio will be ignored if it doesn't arrive within this time."""
Instance variables
var frame_size_ms : int
The frame size in milliseconds for the audio input.
var noise_cancellation : NoiseCancellationOptions | Callable[[livekit.agents.voice.room_io.types.NoiseCancellationParams], NoiseCancellationOptions | None] | None
var num_channels : int
var pre_connect_audio : bool
Pre-connect audio enabled or not.
var pre_connect_audio_timeout : float
The pre-connect audio will be ignored if it doesn't arrive within this time.
var sample_rate : int
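The snippet below is a minimal usage sketch, not taken from the source: it builds an AudioInputOptions with illustrative values and passes it to RoomOptions, which RoomIO consumes.

from livekit.agents.voice.room_io import AudioInputOptions, RoomOptions

# illustrative values: 10 ms frames and a longer pre-connect window
audio_in = AudioInputOptions(
    sample_rate=48000,
    frame_size_ms=10,
    pre_connect_audio=True,
    pre_connect_audio_timeout=5.0,
)
options = RoomOptions(audio_input=audio_in)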
class AudioOutputOptions (sample_rate: int = 24000,
num_channels: int = 1,
track_publish_options: rtc.TrackPublishOptions = <factory>,
track_name: NotGivenOr[str] = NOT_GIVEN)
@dataclass
class AudioOutputOptions:
    sample_rate: int = 24000
    num_channels: int = 1
    track_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )
    track_name: NotGivenOr[str] = NOT_GIVEN
    """The name of the audio track to publish. If not provided, default to "roomio_audio"."""
Instance variables
var num_channels : int
var sample_rate : int
var track_name : str | livekit.agents.types.NotGiven
The name of the audio track to publish. If not provided, default to "roomio_audio".
var track_publish_options : room_pb2.TrackPublishOptions
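A hedged sketch of publishing the agent's audio under a custom track name; "agent_voice" is a placeholder, and the publish options mirror the dataclass default shown above.

from livekit import rtc
from livekit.agents.voice.room_io import AudioOutputOptions, RoomOptions

audio_out = AudioOutputOptions(
    sample_rate=24000,
    track_publish_options=rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE),
    track_name="agent_voice",  # placeholder; defaults to "roomio_audio" when not given
)
options = RoomOptions(audio_output=audio_out)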
class RoomIO (agent_session: AgentSession,
room: rtc.Room,
*,
participant: rtc.RemoteParticipant | str | None = None,
options: NotGivenOr[RoomOptions] = NOT_GIVEN,
input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN)
class RoomIO:
    def __init__(
        self,
        agent_session: AgentSession,
        room: rtc.Room,
        *,
        participant: rtc.RemoteParticipant | str | None = None,
        options: NotGivenOr[RoomOptions] = NOT_GIVEN,
        # deprecated
        input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
        output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN,
    ) -> None:
        self._options = RoomOptions._ensure_options(
            options, room_input_options=input_options, room_output_options=output_options
        )
        self._text_input_cb: TextInputCallback | None = None
        self._agent_session, self._room = agent_session, room
        # self._input_options = input_options
        # self._output_options = output_options
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.RemoteParticipant) else participant
        )
        if self._participant_identity is None and utils.is_given(
            self._options.participant_identity
        ):
            self._participant_identity = self._options.participant_identity

        self._audio_input: _ParticipantAudioInputStream | None = None
        self._video_input: _ParticipantVideoInputStream | None = None
        self._audio_output: _ParticipantAudioOutput | None = None
        self._user_tr_output: _ParticipantTranscriptionOutput | None = None
        self._agent_tr_output: _ParticipantTranscriptionOutput | None = None
        self._tr_synchronizer: TranscriptSynchronizer | None = None

        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        self._room_connected_fut = asyncio.Future[None]()

        self._init_atask: asyncio.Task[None] | None = None
        self._user_transcript_ch: utils.aio.Chan[UserInputTranscribedEvent] | None = None
        self._user_transcript_atask: asyncio.Task[None] | None = None
        self._tasks: set[asyncio.Task[Any]] = set()
        self._update_state_atask: asyncio.Task[None] | None = None
        self._close_session_atask: asyncio.Task[None] | None = None
        self._delete_room_task: asyncio.Future[api.DeleteRoomResponse] | None = None

        self._pre_connect_audio_handler: PreConnectAudioHandler | None = None
        self._text_stream_handler_registered = False

    async def start(self) -> None:
        # -- create inputs --
        input_audio_options = self._options.get_audio_input_options()
        if input_audio_options and input_audio_options.pre_connect_audio:
            self._pre_connect_audio_handler = PreConnectAudioHandler(
                room=self._room,
                timeout=input_audio_options.pre_connect_audio_timeout,
            )
            self._pre_connect_audio_handler.register()

        input_text_options = self._options.get_text_input_options()
        if input_text_options:
            self._text_input_cb = input_text_options.text_input_cb
            try:
                self._room.register_text_stream_handler(TOPIC_CHAT, self._on_user_text_input)
                self._text_stream_handler_registered = True
            except ValueError:
                if utils.is_given(self._options.text_input):
                    logger.warning(
                        f"text stream handler for topic '{TOPIC_CHAT}' already set, ignoring"
                    )
                else:
                    self._text_input_cb = None

        input_video_options = self._options.get_video_input_options()
        if input_video_options:
            self._video_input = _ParticipantVideoInputStream(self._room)

        if input_audio_options:
            self._audio_input = _ParticipantAudioInputStream(
                self._room,
                sample_rate=input_audio_options.sample_rate,
                num_channels=input_audio_options.num_channels,
                frame_size_ms=input_audio_options.frame_size_ms,
                noise_cancellation=input_audio_options.noise_cancellation,
                pre_connect_audio_handler=self._pre_connect_audio_handler,
            )

        # -- create outputs --
        output_audio_options = self._options.get_audio_output_options()
        if output_audio_options:
            self._audio_output = _ParticipantAudioOutput(
                self._room,
                sample_rate=output_audio_options.sample_rate,
                num_channels=output_audio_options.num_channels,
                track_publish_options=output_audio_options.track_publish_options,
                track_name=(
                    output_audio_options.track_name
                    if utils.is_given(output_audio_options.track_name)
                    else "roomio_audio"
                ),
            )

        output_text_options = self._options.get_text_output_options()
        if output_text_options:
            self._user_tr_output = _ParticipantTranscriptionOutput(
                room=self._room, is_delta_stream=False, participant=self._participant_identity
            )
            self._user_transcript_ch = utils.aio.Chan[UserInputTranscribedEvent]()
            self._user_transcript_atask = asyncio.create_task(
                self._forward_user_transcript(self._user_transcript_ch)
            )

            # TODO(long): add next in the chain for session.output.transcription
            self._agent_tr_output = _ParticipantTranscriptionOutput(
                room=self._room, is_delta_stream=True, participant=None
            )

            # use the RoomIO's audio output if available, otherwise use the agent's audio output
            # (e.g the audio output isn't using RoomIO with our avatar datastream impl)
            if output_text_options.sync_transcription is not False and (
                audio_output := self._audio_output or self._agent_session.output.audio
            ):
                self._tr_synchronizer = TranscriptSynchronizer(
                    next_in_chain_audio=audio_output,
                    next_in_chain_text=self._agent_tr_output,
                    speed=output_text_options.transcription_speed_factor,
                )

        # -- set the room event handlers --
        self._room.on("participant_connected", self._on_participant_connected)
        self._room.on("connection_state_changed", self._on_connection_state_changed)
        self._room.on("participant_disconnected", self._on_participant_disconnected)
        if self._room.isconnected():
            self._on_connection_state_changed(rtc.ConnectionState.CONN_CONNECTED)

        self._init_atask = asyncio.create_task(self._init_task())

        # -- attach to the agent session --
        if self.audio_input:
            self._agent_session.input.audio = self.audio_input
        if self.video_input:
            self._agent_session.input.video = self.video_input
        if self.audio_output:
            self._agent_session.output.audio = self.audio_output
        if self.transcription_output:
            self._agent_session.output.transcription = self.transcription_output

        self._agent_session.on("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.on("user_input_transcribed", self._on_user_input_transcribed)
        self._agent_session.on("close", self._on_agent_session_close)

    @property
    def room(self) -> rtc.Room:
        return self._room

    async def aclose(self) -> None:
        self._room.off("participant_connected", self._on_participant_connected)
        self._room.off("connection_state_changed", self._on_connection_state_changed)
        self._agent_session.off("agent_state_changed", self._on_agent_state_changed)
        self._agent_session.off("user_input_transcribed", self._on_user_input_transcribed)
        self._agent_session.off("close", self._on_agent_session_close)

        if self._text_stream_handler_registered:
            self._room.unregister_text_stream_handler(TOPIC_CHAT)
            self._text_stream_handler_registered = False

        if self._init_atask:
            await utils.aio.cancel_and_wait(self._init_atask)

        if self._user_transcript_ch:
            self._user_transcript_ch.close()

        if self._user_transcript_atask:
            await utils.aio.cancel_and_wait(self._user_transcript_atask)

        if self._update_state_atask:
            await utils.aio.cancel_and_wait(self._update_state_atask)

        if self._pre_connect_audio_handler:
            await self._pre_connect_audio_handler.aclose()
        if self._audio_input:
            await self._audio_input.aclose()
        if self._video_input:
            await self._video_input.aclose()
        if self._tr_synchronizer:
            await self._tr_synchronizer.aclose()
        if self._audio_output:
            await self._audio_output.aclose()

        # cancel and wait for all pending tasks
        await utils.aio.cancel_and_wait(*self._tasks)
        self._tasks.clear()

    @property
    def audio_output(self) -> AudioOutput | None:
        if self._tr_synchronizer:
            return self._tr_synchronizer.audio_output
        return self._audio_output

    @property
    def transcription_output(self) -> TextOutput | None:
        if self._tr_synchronizer:
            return self._tr_synchronizer.text_output
        return self._agent_tr_output

    @property
    def audio_input(self) -> AudioInput | None:
        return self._audio_input

    @property
    def video_input(self) -> VideoInput | None:
        return self._video_input

    @property
    def linked_participant(self) -> rtc.RemoteParticipant | None:
        if not self._participant_available_fut.done():
            return None
        return self._participant_available_fut.result()

    @property
    def subscribed_fut(self) -> asyncio.Future[None] | None:
        if self._audio_output:
            return self._audio_output.subscribed
        return None

    def set_participant(self, participant_identity: str | None) -> None:
        """Switch audio and video streams to specified participant"""
        if participant_identity is None:
            self.unset_participant()
            return

        if (
            self._participant_identity is not None
            and self._participant_identity != participant_identity
        ):
            # reset future if switching to a different participant
            self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()

        # check if new participant is already connected
        for participant in self._room.remote_participants.values():
            if participant.identity == participant_identity:
                self._participant_available_fut.set_result(participant)
                break

        # update participant identity and handlers
        self._participant_identity = participant_identity
        if self._audio_input:
            self._audio_input.set_participant(participant_identity)
        if self._video_input:
            self._video_input.set_participant(participant_identity)

        if self._user_tr_output:
            self._user_tr_output.set_participant(participant_identity)

    def unset_participant(self) -> None:
        self._participant_identity = None
        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        if self._audio_input:
            self._audio_input.set_participant(None)
        if self._video_input:
            self._video_input.set_participant(None)
        if self._user_tr_output:
            self._user_tr_output.set_participant(None)

    @utils.log_exceptions(logger=logger)
    async def _init_task(self) -> None:
        await self._room_connected_fut

        # check existing participants
        for participant in self._room.remote_participants.values():
            self._on_participant_connected(participant)

        participant = await self._participant_available_fut
        self.set_participant(participant.identity)

        # init outputs
        if self._agent_tr_output:
            self._agent_tr_output.set_participant(self._room.local_participant.identity)

        if self._audio_output:
            await self._audio_output.start()

    @utils.log_exceptions(logger=logger)
    async def _forward_user_transcript(
        self, event_ch: utils.aio.Chan[UserInputTranscribedEvent]
    ) -> None:
        async for ev in event_ch:
            if self._user_tr_output is None:
                continue

            await self._user_tr_output.capture_text(ev.transcript)
            if ev.is_final:
                self._user_tr_output.flush()

    def _on_connection_state_changed(self, state: rtc.ConnectionState.ValueType) -> None:
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    def _on_participant_connected(self, participant: rtc.RemoteParticipant) -> None:
        if self._participant_available_fut.done():
            return

        if self._participant_identity is not None:
            if participant.identity != self._participant_identity:
                return
        # otherwise, skip participants that are marked as publishing for this agent
        elif (
            participant.attributes.get(ATTRIBUTE_PUBLISH_ON_BEHALF)
            == self._room.local_participant.identity
        ):
            return

        accepted_kinds = self._options.participant_kinds or DEFAULT_PARTICIPANT_KINDS
        if participant.kind not in accepted_kinds:
            # not an accepted participant kind, skip
            return

        self._participant_available_fut.set_result(participant)

    def _on_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        if not (linked := self.linked_participant) or participant.identity != linked.identity:
            return

        self._participant_available_fut = asyncio.Future[rtc.RemoteParticipant]()
        if (
            self._options.close_on_disconnect
            and participant.disconnect_reason in DEFAULT_CLOSE_ON_DISCONNECT_REASONS
            and not self._close_session_atask
            and not self._delete_room_task
        ):
            logger.info(
                "closing agent session due to participant disconnect "
                "(disable via `RoomInputOptions.close_on_disconnect=False`)",
                extra={
                    "room": self._room.name,
                    "participant": participant.identity,
                    "reason": rtc.DisconnectReason.Name(
                        participant.disconnect_reason or rtc.DisconnectReason.UNKNOWN_REASON
                    ),
                },
            )
            self._agent_session._close_soon(reason=CloseReason.PARTICIPANT_DISCONNECTED)

    def _on_user_input_transcribed(self, ev: UserInputTranscribedEvent) -> None:
        if self._user_transcript_ch:
            self._user_transcript_ch.send_nowait(ev)

    def _on_user_text_input(self, reader: rtc.TextStreamReader, participant_identity: str) -> None:
        if participant_identity != self._participant_identity:
            return

        participant = self._room.remote_participants.get(participant_identity)
        if not participant:
            logger.warning("participant not found, ignoring text input")
            return

        async def _read_text(text_input_cb: TextInputCallback) -> None:
            text = await reader.read_all()
            text_input_result = text_input_cb(
                self._agent_session,
                TextInputEvent(text=text, info=reader.info, participant=participant),
            )
            if asyncio.iscoroutine(text_input_result):
                await text_input_result

        if self._text_input_cb is None:
            logger.error("text input callback is not set, ignoring text input")
            return

        task = asyncio.create_task(_read_text(self._text_input_cb))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _on_agent_state_changed(self, ev: AgentStateChangedEvent) -> None:
        @utils.log_exceptions(logger=logger)
        async def _set_state() -> None:
            if self._room.isconnected():
                await self._room.local_participant.set_attributes(
                    {ATTRIBUTE_AGENT_STATE: ev.new_state}
                )

        if self._update_state_atask is not None:
            self._update_state_atask.cancel()

        self._update_state_atask = asyncio.create_task(_set_state())

    def _on_agent_session_close(self, ev: CloseEvent) -> None:
        def _on_delete_room_task_done(task: asyncio.Future[api.DeleteRoomResponse]) -> None:
            self._delete_room_task = None

        if self._options.delete_room_on_close and self._delete_room_task is None:
            job_ctx = get_job_context()
            logger.info(
                "deleting room on agent session close (disable via `RoomInputOptions.delete_room_on_close=False`)",
                extra={"room": self._room.name},
            )
            self._delete_room_task = job_ctx.delete_room(room_name=self._room.name)
            self._delete_room_task.add_done_callback(_on_delete_room_task_done)
Instance variables
prop audio_input : AudioInput | None
prop audio_output : AudioOutput | None
prop linked_participant : rtc.RemoteParticipant | None
prop room : rtc.Room
prop subscribed_fut : asyncio.Future[None] | None
prop transcription_output : TextOutput | None
prop video_input : VideoInput | None
Methods
async def aclose(self) ‑> None
def set_participant(self, participant_identity: str | None) ‑> None
Switch audio and video streams to the specified participant.
async def start(self) ‑> None
def unset_participant(self) ‑> None
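A sketch of driving RoomIO by hand, assuming a configured AgentSession and a connected rtc.Room already exist; the participant identity "user-1234" is a placeholder. RoomIO is typically created for you when an AgentSession is started with a room, so wiring like this is only needed for custom setups.

from livekit import rtc
from livekit.agents import AgentSession
from livekit.agents.voice.room_io import RoomIO, RoomOptions

async def attach_room_io(session: AgentSession, room: rtc.Room) -> RoomIO:
    room_io = RoomIO(
        session,
        room,
        participant="user-1234",  # placeholder; omit to link the first participant
        options=RoomOptions(video_input=True),
    )
    # creates the input/output streams and attaches them to the session
    await room_io.start()
    return room_io

# later: room_io.set_participant("user-5678") switches the linked streams,
# and await room_io.aclose() tears everything down on shutdown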
class RoomInputOptions (text_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_enabled: NotGivenOr[bool] = NOT_GIVEN,
video_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
audio_frame_size_ms: int = 50,
noise_cancellation: rtc.NoiseCancellationOptions | None = None,
text_input_cb: TextInputCallback = <function _default_text_input_cb>,
participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN,
participant_identity: NotGivenOr[str] = NOT_GIVEN,
pre_connect_audio: bool = True,
pre_connect_audio_timeout: float = 3.0,
close_on_disconnect: bool = True,
delete_room_on_close: bool = False)
@dataclass
class RoomInputOptions:
    text_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    video_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to False."""
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    audio_frame_size_ms: int = 50
    """The frame size in milliseconds for the audio input."""
    noise_cancellation: rtc.NoiseCancellationOptions | None = None
    text_input_cb: TextInputCallback = _default_text_input_cb
    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    """Participant kinds accepted for auto subscription. If not provided,
    accept `DEFAULT_PARTICIPANT_KINDS`."""
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    """The participant to link to. If not provided, link to the first participant.
    Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
    pre_connect_audio: bool = True
    """Pre-connect audio enabled or not."""
    pre_connect_audio_timeout: float = 3.0
    """The pre-connect audio will be ignored if it doesn't arrive within this time."""
    close_on_disconnect: bool = True
    """Close the AgentSession if the linked participant disconnects with reasons in
    CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED."""
    delete_room_on_close: bool = False
    """Delete the room when the AgentSession is closed, default to False"""
Instance variables
var audio_enabled : NotGivenOr[bool]
If not given, default to True.
var audio_frame_size_ms : int
The frame size in milliseconds for the audio input.
var audio_num_channels : int
var audio_sample_rate : int
var close_on_disconnect : bool
Close the AgentSession if the linked participant disconnects with reasons in CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED.
var delete_room_on_close : bool
Delete the room when the AgentSession is closed, default to False.
var noise_cancellation : rtc.NoiseCancellationOptions | None
var participant_identity : NotGivenOr[str]
The participant to link to. If not provided, link to the first participant. Can be overridden by the `participant` argument of the RoomIO constructor or `set_participant`.
var participant_kinds : NotGivenOr[list[rtc.ParticipantKind.ValueType]]
Participant kinds accepted for auto subscription. If not provided, accept `DEFAULT_PARTICIPANT_KINDS`.
var pre_connect_audio : bool
Pre-connect audio enabled or not.
var pre_connect_audio_timeout : float
The pre-connect audio will be ignored if it doesn't arrive within this time.
var text_enabled : NotGivenOr[bool]
If not given, default to True.
var video_enabled : NotGivenOr[bool]
If not given, default to False.
Methods
def text_input_cb(sess: AgentSession,
ev: TextInputEvent) ‑> None
def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)
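Since RoomOptions._ensure_options logs a deprecation warning and converts these legacy options internally, new code should prefer RoomOptions; the sketch below shows the legacy path for completeness, assuming a session and room are already set up.

from livekit import rtc
from livekit.agents import AgentSession
from livekit.agents.voice.room_io import RoomIO, RoomInputOptions

def make_legacy_room_io(session: AgentSession, room: rtc.Room) -> RoomIO:
    legacy_input = RoomInputOptions(
        video_enabled=True,
        close_on_disconnect=False,
    )
    # RoomIO converts this to RoomOptions internally via _create_from_legacy
    return RoomIO(session, room, input_options=legacy_input)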
class RoomOptions (text_input: NotGivenOr[TextInputOptions | bool] = NOT_GIVEN,
audio_input: NotGivenOr[AudioInputOptions | bool] = NOT_GIVEN,
video_input: NotGivenOr[VideoInputOptions | bool] = NOT_GIVEN,
audio_output: NotGivenOr[AudioOutputOptions | bool] = NOT_GIVEN,
text_output: NotGivenOr[TextOutputOptions | bool] = NOT_GIVEN,
participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN,
participant_identity: NotGivenOr[str] = NOT_GIVEN,
close_on_disconnect: bool = True,
delete_room_on_close: bool = False)
@dataclass
class RoomOptions:
    text_input: NotGivenOr[TextInputOptions | bool] = NOT_GIVEN
    """The text input options. If not provided, default to True."""
    audio_input: NotGivenOr[AudioInputOptions | bool] = NOT_GIVEN
    """The audio input options. If not provided, default to True."""
    video_input: NotGivenOr[VideoInputOptions | bool] = NOT_GIVEN
    """The video input options. If not provided, default to False."""
    audio_output: NotGivenOr[AudioOutputOptions | bool] = NOT_GIVEN
    """The audio output options. If not provided, default to True."""
    text_output: NotGivenOr[TextOutputOptions | bool] = NOT_GIVEN
    """The transcription output options. If not provided, default to True."""
    participant_kinds: NotGivenOr[list[rtc.ParticipantKind.ValueType]] = NOT_GIVEN
    """Participant kinds accepted for auto subscription. If not provided,
    accept `DEFAULT_PARTICIPANT_KINDS`."""
    participant_identity: NotGivenOr[str] = NOT_GIVEN
    """The participant to link to. If not provided, link to the first participant.
    Can be overridden by the `participant` argument of RoomIO constructor or `set_participant`."""
    close_on_disconnect: bool = True
    """Close the AgentSession if the linked participant disconnects with reasons in
    CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED."""
    delete_room_on_close: bool = False
    """Delete the room when the AgentSession is closed, default to False"""

    def get_text_input_options(self) -> TextInputOptions | None:
        if isinstance(self.text_input, TextInputOptions):
            return self.text_input
        # if text_input is not given, default to enabled
        return TextInputOptions() if self.text_input is not False else None

    def get_audio_input_options(self) -> AudioInputOptions | None:
        if isinstance(self.audio_input, AudioInputOptions):
            return self.audio_input
        # if audio_input is not given, default to enabled
        return AudioInputOptions() if self.audio_input is not False else None

    def get_video_input_options(self) -> VideoInputOptions | None:
        if isinstance(self.video_input, VideoInputOptions):
            return self.video_input
        # if video_input is not given, default to disabled
        return VideoInputOptions() if self.video_input is True else None

    def get_audio_output_options(self) -> AudioOutputOptions | None:
        if isinstance(self.audio_output, AudioOutputOptions):
            return self.audio_output
        return AudioOutputOptions() if self.audio_output is not False else None

    def get_text_output_options(self) -> TextOutputOptions | None:
        if isinstance(self.text_output, TextOutputOptions):
            return self.text_output
        return TextOutputOptions() if self.text_output is not False else None

    @classmethod
    def _ensure_options(
        cls,
        options: NotGivenOr[RoomOptions],
        *,
        room_input_options: NotGivenOr[RoomInputOptions] = NOT_GIVEN,
        room_output_options: NotGivenOr[RoomOutputOptions] = NOT_GIVEN,
    ) -> RoomOptions:
        if is_given(room_input_options) or is_given(room_output_options):
            logger.warning(
                "RoomInputOptions and RoomOutputOptions are deprecated, use RoomOptions instead"
            )
            if not is_given(options):
                return cls._create_from_legacy(room_input_options, room_output_options)

        if isinstance(options, RoomOptions):
            return options
        elif is_given(options):
            raise ValueError(f"expected RoomOptions, got {type(options)}")
        else:
            return cls()

    @classmethod
    def _create_from_legacy(
        cls,
        input_options: NotGivenOr[RoomInputOptions],
        output_options: NotGivenOr[RoomOutputOptions],
    ) -> RoomOptions:
        opts = cls()
        if input_options:
            opts.text_input = (
                TextInputOptions(text_input_cb=input_options.text_input_cb)
                if input_options.text_enabled is not False
                else False
            )
            opts.audio_input = (
                AudioInputOptions(
                    sample_rate=input_options.audio_sample_rate,
                    num_channels=input_options.audio_num_channels,
                    frame_size_ms=input_options.audio_frame_size_ms,
                    noise_cancellation=input_options.noise_cancellation,
                    pre_connect_audio=input_options.pre_connect_audio,
                    pre_connect_audio_timeout=input_options.pre_connect_audio_timeout,
                )
                if input_options.audio_enabled is not False
                else False
            )
            opts.video_input = input_options.video_enabled
            opts.participant_kinds = input_options.participant_kinds
            opts.participant_identity = input_options.participant_identity
            opts.close_on_disconnect = input_options.close_on_disconnect
            opts.delete_room_on_close = input_options.delete_room_on_close
        if output_options:
            opts.audio_output = (
                AudioOutputOptions(
                    sample_rate=output_options.audio_sample_rate,
                    num_channels=output_options.audio_num_channels,
                    track_publish_options=output_options.audio_publish_options,
                    track_name=output_options.audio_track_name,
                )
                if output_options.audio_enabled is not False
                else False
            )
            opts.text_output = (
                TextOutputOptions(
                    sync_transcription=output_options.sync_transcription,
                    transcription_speed_factor=output_options.transcription_speed_factor,
                )
                if output_options.transcription_enabled is not False
                else False
            )
        return opts
Instance variables
var audio_input : livekit.agents.voice.room_io.types.AudioInputOptions | bool | livekit.agents.types.NotGiven
The audio input options. If not provided, default to True.
var audio_output : livekit.agents.voice.room_io.types.AudioOutputOptions | bool | livekit.agents.types.NotGiven
The audio output options. If not provided, default to True.
var close_on_disconnect : bool
Close the AgentSession if the linked participant disconnects with reasons in CLIENT_INITIATED, ROOM_DELETED, or USER_REJECTED.
var delete_room_on_close : bool
Delete the room when the AgentSession is closed, default to False.
var participant_identity : str | livekit.agents.types.NotGiven
The participant to link to. If not provided, link to the first participant. Can be overridden by the `participant` argument of the RoomIO constructor or `set_participant`.
var participant_kinds : list[int] | livekit.agents.types.NotGiven
Participant kinds accepted for auto subscription. If not provided, accept `DEFAULT_PARTICIPANT_KINDS`.
var text_input : livekit.agents.voice.room_io.types.TextInputOptions | bool | livekit.agents.types.NotGiven
The text input options. If not provided, default to True.
var text_output : livekit.agents.voice.room_io.types.TextOutputOptions | bool | livekit.agents.types.NotGiven
The transcription output options. If not provided, default to True.
var video_input : livekit.agents.voice.room_io.types.VideoInputOptions | bool | livekit.agents.types.NotGiven
The video input options. If not provided, default to False.
Methods
def get_audio_input_options(self) ‑> livekit.agents.voice.room_io.types.AudioInputOptions | None
def get_audio_output_options(self) ‑> livekit.agents.voice.room_io.types.AudioOutputOptions | None
def get_text_input_options(self) ‑> livekit.agents.voice.room_io.types.TextInputOptions | None
def get_text_output_options(self) ‑> livekit.agents.voice.room_io.types.TextOutputOptions | None
def get_video_input_options(self) ‑> livekit.agents.voice.room_io.types.VideoInputOptions | None
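A usage sketch with illustrative field values: each modality accepts True/False or a dedicated options object, and the get_*_options helpers normalize bool/NOT_GIVEN into concrete option objects or None.

from livekit.agents.voice.room_io import (
    AudioInputOptions,
    RoomOptions,
    TextOutputOptions,
)

options = RoomOptions(
    audio_input=AudioInputOptions(frame_size_ms=20),  # illustrative frame size
    video_input=True,  # enabled with default VideoInputOptions
    text_output=TextOutputOptions(sync_transcription=False),
    close_on_disconnect=False,
)

assert options.get_video_input_options() is not None  # True -> VideoInputOptions()
assert options.get_audio_output_options() is not None  # NOT_GIVEN defaults to enabled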
class RoomOutputOptions (transcription_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_enabled: NotGivenOr[bool] = NOT_GIVEN,
audio_sample_rate: int = 24000,
audio_num_channels: int = 1,
audio_publish_options: rtc.TrackPublishOptions = <factory>,
audio_track_name: NotGivenOr[str] = NOT_GIVEN,
sync_transcription: NotGivenOr[bool] = NOT_GIVEN,
transcription_speed_factor: float = 1.0)
@dataclass
class RoomOutputOptions:
    transcription_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_enabled: NotGivenOr[bool] = NOT_GIVEN
    """If not given, default to True."""
    audio_sample_rate: int = 24000
    audio_num_channels: int = 1
    audio_publish_options: rtc.TrackPublishOptions = field(
        default_factory=lambda: rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    )
    audio_track_name: NotGivenOr[str] = NOT_GIVEN
    """The name of the audio track to publish. If not provided, default to "roomio_audio"."""
    sync_transcription: NotGivenOr[bool] = NOT_GIVEN
    """False to disable transcription synchronization with audio output.
    Otherwise, transcription is emitted as quickly as available."""
    transcription_speed_factor: float = 1.0
    """Speed factor of transcription synchronization with audio output.
    Only effective if `sync_transcription` is True."""
Instance variables
var audio_enabled : bool | livekit.agents.types.NotGiven
If not given, default to True.
var audio_num_channels : int
var audio_publish_options : room_pb2.TrackPublishOptions
var audio_sample_rate : int
var audio_track_name : str | livekit.agents.types.NotGiven
The name of the audio track to publish. If not provided, default to "roomio_audio".
var sync_transcription : bool | livekit.agents.types.NotGiven
False to disable transcription synchronization with audio output. Otherwise, transcription is emitted as quickly as available.
var transcription_enabled : bool | livekit.agents.types.NotGiven
If not given, default to True.
var transcription_speed_factor : float
Speed factor of transcription synchronization with audio output. Only effective if `sync_transcription` is True.
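Like RoomInputOptions, this class is deprecated in favor of RoomOptions; below is a sketch of the legacy path, assuming a session and room already exist (the track name is a placeholder).

from livekit import rtc
from livekit.agents import AgentSession
from livekit.agents.voice.room_io import RoomIO, RoomOutputOptions

def make_room_io_legacy_output(session: AgentSession, room: rtc.Room) -> RoomIO:
    legacy_output = RoomOutputOptions(
        audio_track_name="agent_voice",  # placeholder track name
        sync_transcription=True,
    )
    # converted internally to RoomOptions(audio_output=..., text_output=...)
    return RoomIO(session, room, output_options=legacy_output)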
class TextInputEvent (text: str, info: rtc.TextStreamInfo, participant: rtc.RemoteParticipant)
@dataclass
class TextInputEvent:
    text: str
    info: rtc.TextStreamInfo
    participant: rtc.RemoteParticipant
Instance variables
var info : TextStreamInfo
var participant : RemoteParticipant
var text : str
class TextInputOptions (text_input_cb: TextInputCallback = <function _default_text_input_cb>)
@dataclass
class TextInputOptions:
    text_input_cb: TextInputCallback = _default_text_input_cb
Methods
def text_input_cb(sess: AgentSession,
ev: TextInputEvent) ‑> None
def _default_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)
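A sketch of a custom callback; my_text_input_cb is a hypothetical variant of the default shown above that skips blank messages before interrupting and replying.

from livekit.agents import AgentSession
from livekit.agents.voice.room_io import RoomOptions, TextInputEvent, TextInputOptions

def my_text_input_cb(sess: AgentSession, ev: TextInputEvent) -> None:
    if not ev.text.strip():
        return  # ignore empty or whitespace-only messages
    sess.interrupt()
    sess.generate_reply(user_input=ev.text)

options = RoomOptions(text_input=TextInputOptions(text_input_cb=my_text_input_cb))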
class TextOutputOptions (sync_transcription: NotGivenOr[bool] = NOT_GIVEN,
transcription_speed_factor: float = 1.0)
@dataclass
class TextOutputOptions:
    sync_transcription: NotGivenOr[bool] = NOT_GIVEN
    """False to disable transcription synchronization with audio output.
    Otherwise, transcription is emitted as quickly as available."""
    transcription_speed_factor: float = 1.0
    """Speed factor of transcription synchronization with audio output.
    Only effective if `sync_transcription` is True."""
Instance variables
var sync_transcription : bool | livekit.agents.types.NotGiven
False to disable transcription synchronization with audio output. Otherwise, transcription is emitted as quickly as available.
var transcription_speed_factor : float
Speed factor of transcription synchronization with audio output. Only effective if `sync_transcription` is True.
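A small sketch with an illustrative speed value: pace synchronized transcription slightly faster than the audio, or pass sync_transcription=False to emit text as soon as it is available.

from livekit.agents.voice.room_io import RoomOptions, TextOutputOptions

options = RoomOptions(
    text_output=TextOutputOptions(sync_transcription=True, transcription_speed_factor=1.1)
)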
class VideoInputOptions
@dataclass
class VideoInputOptions:
    pass
class _ParticipantAudioOutput (room: rtc.Room,
*,
sample_rate: int,
num_channels: int,
track_publish_options: rtc.TrackPublishOptions,
track_name: str = 'roomio_audio')
class _ParticipantAudioOutput(io.AudioOutput):
    def __init__(
        self,
        room: rtc.Room,
        *,
        sample_rate: int,
        num_channels: int,
        track_publish_options: rtc.TrackPublishOptions,
        track_name: str = "roomio_audio",
    ) -> None:
        super().__init__(
            label="RoomIO",
            next_in_chain=None,
            sample_rate=sample_rate,
            capabilities=io.AudioOutputCapabilities(pause=True),
        )
        self._room = room
        self._track_name = track_name
        self._lock = asyncio.Lock()
        self._audio_source = rtc.AudioSource(sample_rate, num_channels, queue_size_ms=200)
        self._publish_options = track_publish_options
        self._publication: rtc.LocalTrackPublication | None = None
        self._subscribed_fut = asyncio.Future[None]()

        self._audio_buf = utils.aio.Chan[rtc.AudioFrame]()
        self._audio_bstream = utils.audio.AudioByteStream(
            sample_rate, num_channels, samples_per_channel=sample_rate // 20
        )  # chunk the frame into a small, fixed size

        # used to republish track on reconnection
        self._republish_task: asyncio.Task[None] | None = None
        self._flush_task: asyncio.Task[None] | None = None
        self._interrupted_event = asyncio.Event()
        self._forwarding_task: asyncio.Task[None] | None = None
        self._pushed_duration: float = 0.0

        self._playback_enabled = asyncio.Event()
        self._playback_enabled.set()

    async def _publish_track(self) -> None:
        async with self._lock:
            track = rtc.LocalAudioTrack.create_audio_track(self._track_name, self._audio_source)
            self._publication = await self._room.local_participant.publish_track(
                track, self._publish_options
            )
            await self._publication.wait_for_subscription()
            if not self._subscribed_fut.done():
                self._subscribed_fut.set_result(None)

    @property
    def subscribed(self) -> asyncio.Future[None]:
        return self._subscribed_fut

    async def start(self) -> None:
        self._forwarding_task = asyncio.create_task(self._forward_audio())
        await self._publish_track()
        self._room.on("reconnected", self._on_reconnected)

    async def aclose(self) -> None:
        self._room.off("reconnected", self._on_reconnected)
        if self._republish_task:
            await utils.aio.cancel_and_wait(self._republish_task)
        if self._flush_task:
            await utils.aio.cancel_and_wait(self._flush_task)
        if self._forwarding_task:
            await utils.aio.cancel_and_wait(self._forwarding_task)
        await self._audio_source.aclose()

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        await self._subscribed_fut

        await super().capture_frame(frame)

        if self._flush_task and not self._flush_task.done():
            logger.error("capture_frame called while flush is in progress")
            await self._flush_task

        for f in self._audio_bstream.push(frame.data):
            await self._audio_buf.send(f)
            self._pushed_duration += f.duration

    def flush(self) -> None:
        super().flush()
        for f in self._audio_bstream.flush():
            self._audio_buf.send_nowait(f)
            self._pushed_duration += f.duration

        if not self._pushed_duration:
            return

        if self._flush_task and not self._flush_task.done():
            # shouldn't happen if only one active speech handle at a time
            logger.error("flush called while playback is in progress")
            self._flush_task.cancel()

        self._flush_task = asyncio.create_task(self._wait_for_playout())

    def clear_buffer(self) -> None:
        self._audio_bstream.clear()
        if not self._pushed_duration:
            return

        self._interrupted_event.set()

    def pause(self) -> None:
        super().pause()
        self._playback_enabled.clear()
        # self._audio_source.clear_queue()

    def resume(self) -> None:
        super().resume()
        self._playback_enabled.set()

    async def _wait_for_playout(self) -> None:
        wait_for_interruption = asyncio.create_task(self._interrupted_event.wait())

        async def _wait_buffered_audio() -> None:
            while not self._audio_buf.empty():
                if not self._playback_enabled.is_set():
                    await self._playback_enabled.wait()
                await self._audio_source.wait_for_playout()
                # avoid deadlock when clear_buffer called before capture_frame
                await asyncio.sleep(0)

        wait_for_playout = asyncio.create_task(_wait_buffered_audio())
        await asyncio.wait(
            [wait_for_playout, wait_for_interruption],
            return_when=asyncio.FIRST_COMPLETED,
        )

        interrupted = wait_for_interruption.done()
        pushed_duration = self._pushed_duration

        if interrupted:
            queued_duration = self._audio_source.queued_duration
            while not self._audio_buf.empty():
                queued_duration += self._audio_buf.recv_nowait().duration
            pushed_duration = max(pushed_duration - queued_duration, 0)
            self._audio_source.clear_queue()
            wait_for_playout.cancel()
        else:
            wait_for_interruption.cancel()

        self._pushed_duration = 0
        self._interrupted_event.clear()
        self.on_playback_finished(playback_position=pushed_duration, interrupted=interrupted)

    async def _forward_audio(self) -> None:
        async for frame in self._audio_buf:
            if not self._playback_enabled.is_set():
                self._audio_source.clear_queue()
                await self._playback_enabled.wait()
                # TODO(long): save the frames in the queue and play them later
                # TODO(long): ignore frames from previous syllable

            if self._interrupted_event.is_set() or self._pushed_duration == 0:
                if self._interrupted_event.is_set() and self._flush_task:
                    await self._flush_task
                # ignore frames if interrupted
                continue

            await self._audio_source.capture_frame(frame)

    def _on_reconnected(self) -> None:
        if self._republish_task:
            self._republish_task.cancel()
        self._republish_task = asyncio.create_task(self._publish_track())
Helper class that provides a standard way to create an ABC using inheritance.
Args
sample_rate: The sample rate required by the audio sink. If None, any sample rate is accepted.
Ancestors
- AudioOutput
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop subscribed : asyncio.Future[None]
Methods
async def aclose(self) ‑> None
async def start(self) ‑> None
Inherited members
class _ParticipantStreamTranscriptionOutput (room: rtc.Room,
*,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
attributes: dict[str, str] | None = None)
class _ParticipantStreamTranscriptionOutput:
    def __init__(
        self,
        room: rtc.Room,
        *,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        attributes: dict[str, str] | None = None,
    ):
        self._room, self._is_delta_stream = room, is_delta_stream
        self._track_id: str | None = None
        self._participant_identity: str | None = None
        self._additional_attributes = attributes or {}

        self._writer: rtc.TextStreamWriter | None = None

        self._room.on("track_published", self._on_track_published)
        self._room.on("local_track_published", self._on_local_track_published)

        self._flush_atask: asyncio.Task[None] | None = None
        self._reset_state()
        self.set_participant(participant)

    def set_participant(
        self,
        participant: rtc.Participant | str | None,
    ) -> None:
        self._participant_identity = (
            participant.identity if isinstance(participant, rtc.Participant) else participant
        )
        if self._participant_identity is None:
            return

        try:
            self._track_id = find_micro_track_id(self._room, self._participant_identity)
        except ValueError:
            # track id is optional for TextStream when audio is not published
            self._track_id = None

        self.flush()
        self._reset_state()

    def _reset_state(self) -> None:
        self._current_id = utils.shortuuid("SG_")
        self._capturing = False
        self._latest_text = ""

    async def _create_text_writer(
        self, attributes: dict[str, str] | None = None
    ) -> rtc.TextStreamWriter:
        assert self._participant_identity is not None, "participant_identity is not set"

        if not attributes:
            attributes = {
                ATTRIBUTE_TRANSCRIPTION_FINAL: "false",
            }
            if self._track_id:
                attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id
        attributes[ATTRIBUTE_TRANSCRIPTION_SEGMENT_ID] = self._current_id

        for key, val in self._additional_attributes.items():
            if key not in attributes:
                attributes[key] = val

        return await self._room.local_participant.stream_text(
            topic=TOPIC_TRANSCRIPTION,
            sender_identity=self._participant_identity,
            attributes=attributes,
        )

    @utils.log_exceptions(logger=logger)
    async def capture_text(self, text: str) -> None:
        if self._participant_identity is None:
            return

        if self._flush_atask and not self._flush_atask.done():
            await self._flush_atask

        if not self._capturing:
            self._reset_state()
            self._capturing = True

        self._latest_text = text
        try:
            if self._room.isconnected():
                if self._is_delta_stream:
                    # reuse the existing writer
                    if self._writer is None:
                        self._writer = await self._create_text_writer()
                    await self._writer.write(text)
                else:
                    # always create a new writer
                    tmp_writer = await self._create_text_writer()
                    await tmp_writer.write(text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    async def _flush_task(self, writer: rtc.TextStreamWriter | None) -> None:
        attributes = {ATTRIBUTE_TRANSCRIPTION_FINAL: "true"}
        if self._track_id:
            attributes[ATTRIBUTE_TRANSCRIPTION_TRACK_ID] = self._track_id

        try:
            if self._room.isconnected():
                if self._is_delta_stream:
                    if writer:
                        await writer.aclose(attributes=attributes)
                else:
                    tmp_writer = await self._create_text_writer(attributes=attributes)
                    await tmp_writer.write(self._latest_text)
                    await tmp_writer.aclose()
        except Exception as e:
            logger.warning("failed to publish transcription", exc_info=e)

    def flush(self) -> None:
        if self._participant_identity is None or not self._capturing:
            return

        self._capturing = False
        curr_writer = self._writer
        self._writer = None
        self._flush_atask = asyncio.create_task(self._flush_task(curr_writer))

    def _on_track_published(
        self, track: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant
    ) -> None:
        if (
            self._participant_identity is None
            or participant.identity != self._participant_identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return
        self._track_id = track.sid

    def _on_local_track_published(self, track: rtc.LocalTrackPublication, _: rtc.Track) -> None:
        if (
            self._participant_identity is None
            or self._participant_identity != self._room.local_participant.identity
            or track.source != rtc.TrackSource.SOURCE_MICROPHONE
        ):
            return
        self._track_id = track.sid
Methods
async def capture_text(self, text: str) ‑> None
def flush(self) ‑> None
def set_participant(self, participant: rtc.Participant | str | None) ‑> None
class _ParticipantTranscriptionOutput (*,
room: rtc.Room,
is_delta_stream: bool = True,
participant: rtc.Participant | str | None = None,
next_in_chain: io.TextOutput | None = None)
class _ParticipantTranscriptionOutput(io.TextOutput):
    def __init__(
        self,
        *,
        room: rtc.Room,
        is_delta_stream: bool = True,
        participant: rtc.Participant | str | None = None,
        next_in_chain: io.TextOutput | None = None,
    ) -> None:
        super().__init__(label="RoomIO", next_in_chain=next_in_chain)
        self.__outputs: list[
            _ParticipantLegacyTranscriptionOutput | _ParticipantStreamTranscriptionOutput
        ] = [
            _ParticipantLegacyTranscriptionOutput(
                room=room,
                is_delta_stream=is_delta_stream,
                participant=participant,
            ),
            _ParticipantStreamTranscriptionOutput(
                room=room,
                is_delta_stream=is_delta_stream,
                participant=participant,
            ),
        ]

    def set_participant(self, participant: rtc.Participant | str | None) -> None:
        for source in self.__outputs:
            source.set_participant(participant)

    async def capture_text(self, text: str) -> None:
        await asyncio.gather(*[sink.capture_text(text) for sink in self.__outputs])

        if self.next_in_chain:
            await self.next_in_chain.capture_text(text)

    def flush(self) -> None:
        for source in self.__outputs:
            source.flush()

        if self.next_in_chain:
            self.next_in_chain.flush()
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- TextOutput
- abc.ABC
Methods
def set_participant(self, participant: rtc.Participant | str | None) ‑> None
Inherited members