Module livekit.agents.voice.avatar
Classes
class AudioReceiver
class AudioReceiver(ABC, rtc.EventEmitter[Literal["clear_buffer"]]):
    async def start(self) -> None:
        pass

    @abstractmethod
    def notify_playback_finished(
        self, playback_position: float, interrupted: bool
    ) -> None | Coroutine[None, None, None]:
        """Notify the sender that playback has finished"""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        """Continuously stream out audio frames or AudioSegmentEnd when the stream ends"""

    async def aclose(self) -> None:
        pass
Abstract base class for audio receivers that feed an AvatarRunner: it streams out audio frames (or AudioSegmentEnd markers) and emits a "clear_buffer" event when playback should be interrupted.
Initialize a new instance of EventEmitter.
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- livekit.agents.voice.avatar._datastream_io.DataStreamAudioReceiver
- livekit.agents.voice.avatar._queue_io.QueueAudioOutput
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    pass
def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None | collections.abc.Coroutine[None, None, None]
@abstractmethod
def notify_playback_finished(
    self, playback_position: float, interrupted: bool
) -> None | Coroutine[None, None, None]:
    """Notify the sender that playback has finished"""
Notify the sender that playback has finished
async def start(self) ‑> None
async def start(self) -> None:
    pass
Inherited members
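A custom receiver only needs to implement the abstract methods above. The following minimal sketch feeds frames from an in-process asyncio.Queue; the class name and the push helper are illustrative, not part of the library.

import asyncio
from collections.abc import AsyncIterator

from livekit import rtc
from livekit.agents.voice.avatar import AudioReceiver, AudioSegmentEnd


class LocalQueueAudioReceiver(AudioReceiver):
    """Illustrative receiver that reads frames from an in-process asyncio.Queue."""

    def __init__(self) -> None:
        super().__init__()
        self._queue: asyncio.Queue[rtc.AudioFrame | AudioSegmentEnd] = asyncio.Queue()

    def push(self, item: rtc.AudioFrame | AudioSegmentEnd) -> None:
        # hypothetical helper used by the producer side of the application
        self._queue.put_nowait(item)

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        print(f"playback finished at {playback_position:.2f}s (interrupted={interrupted})")

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self._stream()

    async def _stream(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        while True:
            yield await self._queue.get()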
class AudioSegmentEnd
class AudioSegmentEnd:
    pass
class AvatarOptions (video_width: int,
video_height: int,
video_fps: float,
audio_sample_rate: int,
audio_channels: int)

@dataclass
class AvatarOptions:
    video_width: int
    video_height: int
    video_fps: float
    audio_sample_rate: int
    audio_channels: int
AvatarOptions(video_width: 'int', video_height: 'int', video_fps: 'float', audio_sample_rate: 'int', audio_channels: 'int')
Instance variables
var audio_channels : int
var audio_sample_rate : int
var video_fps : float
var video_height : int
var video_width : int
class AvatarRunner (room: rtc.Room,
*,
audio_recv: AudioReceiver,
video_gen: VideoGenerator,
options: AvatarOptions)

class AvatarRunner:
    """Worker that generates synchronized avatar video based on received audio"""

    def __init__(
        self,
        room: rtc.Room,
        *,
        audio_recv: AudioReceiver,
        video_gen: VideoGenerator,
        options: AvatarOptions,
        _queue_size_ms: int = 100,  # queue size of the AV synchronizer
        _lazy_publish: bool = True,  # publish video and audio tracks until the first frame pushed
    ) -> None:
        self._room = room
        self._video_gen = video_gen
        self._options = options
        self._queue_size_ms = _queue_size_ms
        self._audio_recv = audio_recv
        self._playback_position = 0.0
        self._audio_playing = False
        self._tasks: set[asyncio.Task[Any]] = set()
        self._lock = asyncio.Lock()

        self._audio_publication: rtc.LocalTrackPublication | None = None
        self._video_publication: rtc.LocalTrackPublication | None = None
        self._republish_atask: asyncio.Task[None] | None = None
        self._lazy_publish = _lazy_publish

        # Audio/video sources
        self._audio_source = rtc.AudioSource(
            sample_rate=options.audio_sample_rate,
            num_channels=options.audio_channels,
            queue_size_ms=self._queue_size_ms,
        )
        self._video_source = rtc.VideoSource(width=options.video_width, height=options.video_height)

        # AV synchronizer
        self._av_sync = rtc.AVSynchronizer(
            audio_source=self._audio_source,
            video_source=self._video_source,
            video_fps=options.video_fps,
            video_queue_size_ms=self._queue_size_ms,
        )

        self._forward_video_atask: asyncio.Task[None] | None = None
        self._room_connected_fut = asyncio.Future[None]()

    @property
    def av_sync(self) -> rtc.AVSynchronizer:
        return self._av_sync

    async def start(self) -> None:
        """Start the worker"""
        # start audio receiver
        await self._audio_recv.start()
        self._audio_recv.on("clear_buffer", self._on_clear_buffer)

        self._room.on("reconnected", self._on_reconnected)
        self._room.on("connection_state_changed", self._on_connection_state_changed)
        if self._room.isconnected():
            self._room_connected_fut.set_result(None)

        if not self._lazy_publish:
            await self._publish_track()

        # start processing
        self._read_audio_atask = asyncio.create_task(self._read_audio())
        self._forward_video_atask = asyncio.create_task(self._forward_video())

    async def wait_for_complete(self) -> None:
        if not self._read_audio_atask or not self._forward_video_atask:
            raise RuntimeError("AvatarRunner not started")

        await asyncio.gather(
            self._read_audio_atask,
            self._forward_video_atask,
        )

    async def _publish_track(self) -> None:
        async with self._lock:
            await self._room_connected_fut

            audio_track = rtc.LocalAudioTrack.create_audio_track("avatar_audio", self._audio_source)
            audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
            self._audio_publication = await self._room.local_participant.publish_track(
                audio_track, audio_options
            )
            await self._audio_publication.wait_for_subscription()

            video_track = rtc.LocalVideoTrack.create_video_track("avatar_video", self._video_source)
            video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
            self._video_publication = await self._room.local_participant.publish_track(
                video_track, video_options
            )

    @log_exceptions(logger=logger)
    async def _read_audio(self) -> None:
        async for frame in self._audio_recv:
            if not self._audio_playing and isinstance(frame, rtc.AudioFrame):
                self._audio_playing = True
            await self._video_gen.push_audio(frame)

    @log_exceptions(logger=logger)
    async def _forward_video(self) -> None:
        """Forward video to the room through the AV synchronizer"""
        async for frame in self._video_gen:
            if isinstance(frame, AudioSegmentEnd):
                # notify the agent that the audio has finished playing
                if self._audio_playing:
                    notify_task = self._audio_recv.notify_playback_finished(
                        playback_position=self._playback_position,
                        interrupted=False,
                    )
                    self._audio_playing = False
                    self._playback_position = 0.0
                    if asyncio.iscoroutine(notify_task):
                        # avoid blocking the video forwarding
                        task = asyncio.create_task(notify_task)
                        self._tasks.add(task)
                        task.add_done_callback(self._tasks.discard)
                continue

            if not self._video_publication:
                await self._publish_track()

            await self._av_sync.push(frame)
            if isinstance(frame, rtc.AudioFrame):
                self._playback_position += frame.duration

    def _on_clear_buffer(self) -> None:
        """Handle clearing the buffer and notify about interrupted playback"""

        @log_exceptions(logger=logger)
        async def _handle_clear_buffer(audio_playing: bool) -> None:
            clear_task = self._video_gen.clear_buffer()
            if asyncio.iscoroutine(clear_task):
                await clear_task

            if audio_playing:
                notify_task = self._audio_recv.notify_playback_finished(
                    playback_position=self._playback_position,
                    interrupted=True,
                )
                self._playback_position = 0.0
                if asyncio.iscoroutine(notify_task):
                    await notify_task

        task = asyncio.create_task(_handle_clear_buffer(self._audio_playing))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        self._audio_playing = False

    def _on_reconnected(self) -> None:
        if self._lazy_publish and not self._video_publication:
            return
        if self._republish_atask:
            self._republish_atask.cancel()
        self._republish_atask = asyncio.create_task(self._publish_track())

    def _on_connection_state_changed(self, _: rtc.ConnectionState) -> None:
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    async def aclose(self) -> None:
        self._room.off("reconnected", self._on_reconnected)
        self._room.off("connection_state_changed", self._on_connection_state_changed)
        await self._audio_recv.aclose()
        if self._forward_video_atask:
            await aio.cancel_and_wait(self._forward_video_atask)
        if self._read_audio_atask:
            await aio.cancel_and_wait(self._read_audio_atask)
        await aio.cancel_and_wait(*self._tasks)

        if self._republish_atask:
            await aio.cancel_and_wait(self._republish_atask)

        await self._av_sync.aclose()
        await self._audio_source.aclose()
        await self._video_source.aclose()
Worker that generates synchronized avatar video based on received audio
Instance variables
prop av_sync : rtc.AVSynchronizer
@property
def av_sync(self) -> rtc.AVSynchronizer:
    return self._av_sync
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    self._room.off("reconnected", self._on_reconnected)
    self._room.off("connection_state_changed", self._on_connection_state_changed)
    await self._audio_recv.aclose()
    if self._forward_video_atask:
        await aio.cancel_and_wait(self._forward_video_atask)
    if self._read_audio_atask:
        await aio.cancel_and_wait(self._read_audio_atask)
    await aio.cancel_and_wait(*self._tasks)

    if self._republish_atask:
        await aio.cancel_and_wait(self._republish_atask)

    await self._av_sync.aclose()
    await self._audio_source.aclose()
    await self._video_source.aclose()
async def start(self) ‑> None
async def start(self) -> None:
    """Start the worker"""
    # start audio receiver
    await self._audio_recv.start()
    self._audio_recv.on("clear_buffer", self._on_clear_buffer)

    self._room.on("reconnected", self._on_reconnected)
    self._room.on("connection_state_changed", self._on_connection_state_changed)
    if self._room.isconnected():
        self._room_connected_fut.set_result(None)

    if not self._lazy_publish:
        await self._publish_track()

    # start processing
    self._read_audio_atask = asyncio.create_task(self._read_audio())
    self._forward_video_atask = asyncio.create_task(self._forward_video())
Start the worker
async def wait_for_complete(self) ‑> None
async def wait_for_complete(self) -> None:
    if not self._read_audio_atask or not self._forward_video_atask:
        raise RuntimeError("AvatarRunner not started")

    await asyncio.gather(
        self._read_audio_atask,
        self._forward_video_atask,
    )
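Typical wiring in an avatar worker process, assuming an already-connected rtc.Room and a concrete VideoGenerator; run_avatar_worker and the option values are illustrative.

from livekit import rtc
from livekit.agents.voice.avatar import (
    AvatarOptions,
    AvatarRunner,
    DataStreamAudioReceiver,
    VideoGenerator,
)


async def run_avatar_worker(room: rtc.Room, video_gen: VideoGenerator) -> None:
    # option values are illustrative; match them to what the video generator produces
    options = AvatarOptions(
        video_width=1280,
        video_height=720,
        video_fps=30.0,
        audio_sample_rate=24000,
        audio_channels=1,
    )
    runner = AvatarRunner(
        room,
        audio_recv=DataStreamAudioReceiver(room),  # audio streamed from the agent
        video_gen=video_gen,
        options=options,
    )
    await runner.start()
    try:
        # returns once the audio-reading and video-forwarding tasks complete
        await runner.wait_for_complete()
    finally:
        await runner.aclose()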
class DataStreamAudioOutput (room: rtc.Room,
*,
destination_identity: str,
sample_rate: int | None = None,
wait_remote_track: rtc.TrackKind.ValueType | None = None)

class DataStreamAudioOutput(AudioOutput):
    """
    AudioOutput implementation that streams audio to a remote avatar worker using LiveKit DataStream.
    """  # noqa: E501

    _playback_finished_handlers: dict[str, Callable[[rtc.RpcInvocationData], str]] = {}
    _playback_finished_rpc_registered: bool = False

    def __init__(
        self,
        room: rtc.Room,
        *,
        destination_identity: str,
        sample_rate: int | None = None,
        wait_remote_track: rtc.TrackKind.ValueType | None = None,
    ):
        super().__init__(label="DataStreamIO", next_in_chain=None, sample_rate=sample_rate)
        self._room = room
        self._destination_identity = destination_identity
        self._wait_remote_track = wait_remote_track
        self._stream_writer: rtc.ByteStreamWriter | None = None
        self._pushed_duration: float = 0.0
        self._tasks: set[asyncio.Task[Any]] = set()

        self._room_connected_fut = asyncio.Future[None]()
        self._room.on("connection_state_changed", self._handle_connection_state_changed)
        if self._room.isconnected():
            self._room_connected_fut.set_result(None)

        self._started = False
        self._lock = asyncio.Lock()
        self._start_atask: asyncio.Task | None = None

    @utils.log_exceptions(logger=logger)
    async def _start_task(self) -> None:
        async with self._lock:
            if self._started:
                return

            await self._room_connected_fut

            self._register_playback_finished_rpc(
                self._room,
                caller_identity=self._destination_identity,
                handler=self._handle_playback_finished,
            )

            logger.debug(
                "waiting for the remote participant",
                extra={"identity": self._destination_identity},
            )
            await utils.wait_for_participant(room=self._room, identity=self._destination_identity)
            if self._wait_remote_track:
                logger.debug(
                    "waiting for the remote track",
                    extra={
                        "identity": self._destination_identity,
                        "kind": rtc.TrackKind.Name(self._wait_remote_track),
                    },
                )
                await utils.wait_for_track_publication(
                    room=self._room,
                    identity=self._destination_identity,
                    kind=self._wait_remote_track,
                )
            logger.debug("remote participant ready", extra={"identity": self._destination_identity})
            self._started = True

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Capture and stream audio frame to remote worker"""
        # TODO(theomonnom): this class should be encapsuled somewhere else
        # to allow for a clean close
        if self._start_atask is None:
            self._start_atask = asyncio.create_task(self._start_task())

        # TODO(theomonnom): what to do if start takes a while?
        # we want to avoid OOM & outdated speech?
        await asyncio.shield(self._start_atask)

        await super().capture_frame(frame)

        if not self._stream_writer:
            self._stream_writer = await self._room.local_participant.stream_bytes(
                name=utils.shortuuid("AUDIO_"),
                topic=AUDIO_STREAM_TOPIC,
                destination_identities=[self._destination_identity],
                attributes={
                    "sample_rate": str(frame.sample_rate),
                    "num_channels": str(frame.num_channels),
                },
            )
            self._pushed_duration = 0.0
        await self._stream_writer.write(bytes(frame.data))
        self._pushed_duration += frame.duration

    def flush(self) -> None:
        """Mark end of current audio segment"""
        super().flush()
        if self._stream_writer is None or not self._started:
            return
        # close the stream marking the end of the segment
        task = asyncio.create_task(self._stream_writer.aclose())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        self._stream_writer = None

    def clear_buffer(self) -> None:
        if not self._started:
            return

        task = asyncio.create_task(
            self._room.local_participant.perform_rpc(
                destination_identity=self._destination_identity,
                method=RPC_CLEAR_BUFFER,
                payload="",
            )
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _handle_playback_finished(self, data: rtc.RpcInvocationData) -> str:
        if data.caller_identity != self._destination_identity:
            logger.warning(
                "playback finished event received from unexpected participant",
                extra={
                    "caller_identity": data.caller_identity,
                    "expected_identity": self._destination_identity,
                },
            )
            return "reject"

        logger.info(
            "playback finished event received",
            extra={"caller_identity": data.caller_identity},
        )
        event = PlaybackFinishedEvent(**json.loads(data.payload))
        self.on_playback_finished(
            playback_position=event.playback_position,
            interrupted=event.interrupted,
        )
        return "ok"

    def _handle_connection_state_changed(self, state: rtc.ConnectionState) -> None:
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    @classmethod
    def _register_playback_finished_rpc(
        cls,
        room: rtc.Room,
        *,
        caller_identity: str,
        handler: Callable[[rtc.RpcInvocationData], str],
    ) -> None:
        cls._playback_finished_handlers[caller_identity] = handler

        if cls._playback_finished_rpc_registered:
            return

        def _handler(data: rtc.RpcInvocationData) -> str:
            if data.caller_identity not in cls._playback_finished_handlers:
                logger.warning(
                    "playback finished event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identities": list(cls._playback_finished_handlers.keys()),
                    },
                )
                return "reject"

            return cls._playback_finished_handlers[data.caller_identity](data)

        room.local_participant.register_rpc_method(RPC_PLAYBACK_FINISHED, _handler)
        cls._playback_finished_rpc_registered = True
AudioOutput implementation that streams audio to a remote avatar worker using LiveKit DataStream.
Args
sample_rate
- The sample rate required by the audio sink; if None, any sample rate is accepted
Ancestors
- AudioOutput
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def capture_frame(self, frame: rtc.AudioFrame) ‑> None
async def capture_frame(self, frame: rtc.AudioFrame) -> None:
    """Capture and stream audio frame to remote worker"""
    # TODO(theomonnom): this class should be encapsuled somewhere else
    # to allow for a clean close
    if self._start_atask is None:
        self._start_atask = asyncio.create_task(self._start_task())

    # TODO(theomonnom): what to do if start takes a while?
    # we want to avoid OOM & outdated speech?
    await asyncio.shield(self._start_atask)

    await super().capture_frame(frame)

    if not self._stream_writer:
        self._stream_writer = await self._room.local_participant.stream_bytes(
            name=utils.shortuuid("AUDIO_"),
            topic=AUDIO_STREAM_TOPIC,
            destination_identities=[self._destination_identity],
            attributes={
                "sample_rate": str(frame.sample_rate),
                "num_channels": str(frame.num_channels),
            },
        )
        self._pushed_duration = 0.0
    await self._stream_writer.write(bytes(frame.data))
    self._pushed_duration += frame.duration
Capture and stream audio frame to remote worker
def flush(self) ‑> None
def flush(self) -> None:
    """Mark end of current audio segment"""
    super().flush()
    if self._stream_writer is None or not self._started:
        return
    # close the stream marking the end of the segment
    task = asyncio.create_task(self._stream_writer.aclose())
    self._tasks.add(task)
    task.add_done_callback(self._tasks.discard)
    self._stream_writer = None
Mark end of current audio segment
Inherited members
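Minimal sketch of the sending (agent) side, assuming the avatar worker joins the room with the identity "avatar-worker"; the identity, the frames argument, and the rtc.TrackKind.KIND_VIDEO enum member are assumptions for illustration.

from livekit import rtc
from livekit.agents.voice.avatar import DataStreamAudioOutput


async def stream_agent_audio(room: rtc.Room, frames: list[rtc.AudioFrame]) -> None:
    output = DataStreamAudioOutput(
        room,
        destination_identity="avatar-worker",       # assumed worker identity
        wait_remote_track=rtc.TrackKind.KIND_VIDEO,  # assumed enum member name
    )
    for frame in frames:
        await output.capture_frame(frame)  # frames are written to a LiveKit byte stream
    output.flush()  # closes the stream, marking the end of this audio segment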
class DataStreamAudioReceiver (room: rtc.Room,
*,
sender_identity: str | None = None,
frame_size_ms: NotGivenOr[int] = NOT_GIVEN,
rpc_max_retries: int = 3)

class DataStreamAudioReceiver(AudioReceiver):
    """
    Audio receiver that receives streamed audio from a sender participant using LiveKit DataStream.
    If the sender_identity is provided, subscribe to the specified participant. If not provided,
    subscribe to the first agent participant in the room.
    """

    _clear_buffer_rpc_registered: bool = False
    _clear_buffer_handlers: dict[str, Callable[[rtc.RpcInvocationData], str]] = {}

    def __init__(
        self,
        room: rtc.Room,
        *,
        sender_identity: str | None = None,
        frame_size_ms: NotGivenOr[int] = NOT_GIVEN,
        rpc_max_retries: int = 3,
    ):
        super().__init__()
        self._room = room
        self._sender_identity = sender_identity
        self._remote_participant: rtc.RemoteParticipant | None = None
        self._frame_size_ms = frame_size_ms or 100

        self._stream_readers: list[rtc.ByteStreamReader] = []
        self._stream_reader_changed: asyncio.Event = asyncio.Event()
        self._data_ch = utils.aio.Chan[Union[rtc.AudioFrame, AudioSegmentEnd]]()
        self._current_reader: rtc.ByteStreamReader | None = None
        self._current_reader_cleared: bool = False

        self._playback_finished_ch = utils.aio.Chan[PlaybackFinishedEvent]()
        self._rpc_max_retries = rpc_max_retries

        self._main_atask: asyncio.Task | None = None
        self._exception: Exception | None = None
        self._closing: bool = False

    async def start(self) -> None:
        # wait for the target participant or first agent participant to join
        self._remote_participant = await utils.wait_for_participant(
            room=self._room,
            identity=self._sender_identity,
            kind=rtc.ParticipantKind.PARTICIPANT_KIND_AGENT if not self._sender_identity else None,
        )
        self._main_atask = asyncio.create_task(self._main_task())

        def _handle_clear_buffer(data: rtc.RpcInvocationData) -> str:
            assert self._remote_participant is not None
            if data.caller_identity != self._remote_participant.identity:
                logger.warning(
                    "clear buffer event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identity": self._remote_participant.identity,
                    },
                )
                return "reject"

            if self._current_reader:
                self._current_reader_cleared = True
            self.emit("clear_buffer")
            return "ok"

        def _handle_stream_received(
            reader: rtc.ByteStreamReader, remote_participant_id: str
        ) -> None:
            if (
                not self._remote_participant
                or remote_participant_id != self._remote_participant.identity
            ):
                return

            self._stream_readers.append(reader)
            self._stream_reader_changed.set()

        self._register_clear_buffer_rpc(
            self._room,
            caller_identity=self._remote_participant.identity,
            handler=_handle_clear_buffer,
        )
        self._room.register_byte_stream_handler(AUDIO_STREAM_TOPIC, _handle_stream_received)

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        self._playback_finished_ch.send_nowait(
            PlaybackFinishedEvent(playback_position=playback_position, interrupted=interrupted)
        )

    async def _main_task(self) -> None:
        tasks = [
            asyncio.create_task(self._recv_task()),
            asyncio.create_task(self._send_task()),
        ]
        try:
            await asyncio.gather(*tasks)
        except Exception as error:
            self._exception = error
        finally:
            self._playback_finished_ch.close()
            self._data_ch.close()
            await utils.aio.cancel_and_wait(*tasks)

    @utils.log_exceptions(logger=logger)
    async def _send_task(self) -> None:
        async for event in self._playback_finished_ch:
            assert self._remote_participant is not None

            retry_count = 0
            # TODO: use retry logic in rust
            while retry_count < self._rpc_max_retries:
                logger.debug(
                    f"notifying playback finished: {event.playback_position:.3f}s, "
                    f"interrupted: {event.interrupted}"
                )
                try:
                    await self._room.local_participant.perform_rpc(
                        destination_identity=self._remote_participant.identity,
                        method=RPC_PLAYBACK_FINISHED,
                        payload=json.dumps(asdict(event)),
                    )
                    break
                except rtc.RpcError as e:
                    if retry_count == self._rpc_max_retries - 1:
                        logger.error(
                            f"failed to notify playback finished after {retry_count + 1} retries",
                            exc_info=e,
                        )
                        raise

                    retry_count += 1
                    logger.warning("failed to notify the agent playback finished, retrying...")
                    await asyncio.sleep(0.1)

    @utils.log_exceptions(logger=logger)
    async def _recv_task(self) -> None:
        while not self._data_ch.closed:
            await self._stream_reader_changed.wait()

            while self._stream_readers:
                self._current_reader = self._stream_readers.pop(0)

                if (
                    not (attrs := self._current_reader.info.attributes)
                    or "sample_rate" not in attrs
                    or "num_channels" not in attrs
                ):
                    raise ValueError("sample_rate or num_channels not found in byte stream")

                sample_rate = int(attrs["sample_rate"])
                num_channels = int(attrs["num_channels"])
                bstream = utils.audio.AudioByteStream(
                    sample_rate=sample_rate,
                    num_channels=num_channels,
                    samples_per_channel=int(math.ceil(sample_rate * self._frame_size_ms / 1000)),
                )

                try:
                    async for data in self._current_reader:
                        if self._current_reader_cleared:
                            # ignore the rest data of the current reader if clear_buffer was called
                            break
                        for frame in bstream.push(data):
                            self._data_ch.send_nowait(frame)

                    if not self._current_reader_cleared:
                        for frame in bstream.flush():
                            self._data_ch.send_nowait(frame)
                    self._current_reader = None
                    self._current_reader_cleared = False

                    self._data_ch.send_nowait(AudioSegmentEnd())
                except utils.aio.ChanClosed:
                    if self._closing:
                        return
                    raise

            self._stream_reader_changed.clear()

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self

    async def __anext__(self) -> rtc.AudioFrame | AudioSegmentEnd:
        try:
            return await self._data_ch.recv()
        except utils.aio.ChanClosed as e:
            if self._exception:
                raise self._exception from e
            raise StopAsyncIteration from None

    async def aclose(self) -> None:
        self._closing = True
        self._playback_finished_ch.close()
        self._data_ch.close()
        self._stream_reader_changed.set()

        if self._main_atask:
            await utils.aio.cancel_and_wait(self._main_atask)

    @classmethod
    def _register_clear_buffer_rpc(
        cls,
        room: rtc.Room,
        *,
        caller_identity: str,
        handler: Callable[[rtc.RpcInvocationData], str],
    ) -> None:
        cls._clear_buffer_handlers[caller_identity] = handler

        if cls._clear_buffer_rpc_registered:
            return

        def _handler(data: rtc.RpcInvocationData) -> str:
            if data.caller_identity not in cls._clear_buffer_handlers:
                logger.warning(
                    "clear buffer event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identities": list(cls._clear_buffer_handlers.keys()),
                    },
                )
                return "reject"

            return cls._clear_buffer_handlers[data.caller_identity](data)

        room.local_participant.register_rpc_method(RPC_CLEAR_BUFFER, _handler)
        cls._clear_buffer_rpc_registered = True
Audio receiver that receives streamed audio from a sender participant over LiveKit DataStream. If sender_identity is provided, it subscribes to that participant; otherwise it subscribes to the first agent participant in the room.
Initialize a new instance of EventEmitter.
Ancestors
- livekit.agents.voice.avatar._types.AudioReceiver
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    self._closing = True
    self._playback_finished_ch.close()
    self._data_ch.close()
    self._stream_reader_changed.set()

    if self._main_atask:
        await utils.aio.cancel_and_wait(self._main_atask)
def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None
def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
    self._playback_finished_ch.send_nowait(
        PlaybackFinishedEvent(playback_position=playback_position, interrupted=interrupted)
    )
Notify the sender that playback has finished
async def start(self) ‑> None
async def start(self) -> None:
    # wait for the target participant or first agent participant to join
    self._remote_participant = await utils.wait_for_participant(
        room=self._room,
        identity=self._sender_identity,
        kind=rtc.ParticipantKind.PARTICIPANT_KIND_AGENT if not self._sender_identity else None,
    )
    self._main_atask = asyncio.create_task(self._main_task())

    def _handle_clear_buffer(data: rtc.RpcInvocationData) -> str:
        assert self._remote_participant is not None
        if data.caller_identity != self._remote_participant.identity:
            logger.warning(
                "clear buffer event received from unexpected participant",
                extra={
                    "caller_identity": data.caller_identity,
                    "expected_identity": self._remote_participant.identity,
                },
            )
            return "reject"

        if self._current_reader:
            self._current_reader_cleared = True
        self.emit("clear_buffer")
        return "ok"

    def _handle_stream_received(
        reader: rtc.ByteStreamReader, remote_participant_id: str
    ) -> None:
        if (
            not self._remote_participant
            or remote_participant_id != self._remote_participant.identity
        ):
            return

        self._stream_readers.append(reader)
        self._stream_reader_changed.set()

    self._register_clear_buffer_rpc(
        self._room,
        caller_identity=self._remote_participant.identity,
        handler=_handle_clear_buffer,
    )
    self._room.register_byte_stream_handler(AUDIO_STREAM_TOPIC, _handle_stream_received)
Inherited members
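A minimal consumer sketch that bypasses AvatarRunner and drains the received audio directly; the consume_audio helper is illustrative.

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, DataStreamAudioReceiver


async def consume_audio(room: rtc.Room) -> None:
    receiver = DataStreamAudioReceiver(room)  # no sender_identity: use the first agent participant
    await receiver.start()
    position = 0.0
    try:
        async for item in receiver:
            if isinstance(item, AudioSegmentEnd):
                # report the played duration back to the sender over RPC
                receiver.notify_playback_finished(playback_position=position, interrupted=False)
                position = 0.0
            else:
                position += item.duration  # rtc.AudioFrame duration in seconds
    finally:
        await receiver.aclose()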
class QueueAudioOutput (*, sample_rate: int | None = None)
class QueueAudioOutput(
    AudioOutput,
    AudioReceiver,
    rtc.EventEmitter[Literal["playback_finished", "clear_buffer"]],
):
    """
    AudioOutput implementation that sends audio frames through a queue.
    """

    def __init__(self, *, sample_rate: int | None = None):
        super().__init__(label="DebugQueueIO", next_in_chain=None, sample_rate=sample_rate)
        self._data_ch = utils.aio.Chan[Union[rtc.AudioFrame, AudioSegmentEnd]]()
        self._capturing = False

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Capture and queue audio frame"""
        await super().capture_frame(frame)
        if not self._capturing:
            self._capturing = True
        await self._data_ch.send(frame)

    def flush(self) -> None:
        """Mark end of current audio segment"""
        super().flush()
        if not self._capturing:
            return
        self._capturing = False
        self._data_ch.send_nowait(AudioSegmentEnd())

    # as AudioReceiver for AvatarRunner

    def clear_buffer(self) -> None:
        """Clear the audio buffer"""
        while True:
            try:
                self._data_ch.recv_nowait()
            except utils.aio.channel.ChanEmpty:
                break
        self.emit("clear_buffer")  # type: ignore

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        self.on_playback_finished(playback_position=playback_position, interrupted=interrupted)

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self._data_ch
AudioOutput implementation that sends audio frames through a queue.
Args
sample_rate
- The sample rate required by the audio sink; if None, any sample rate is accepted
Ancestors
- AudioOutput
- livekit.agents.voice.avatar._types.AudioReceiver
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def capture_frame(self, frame: rtc.AudioFrame) ‑> None
async def capture_frame(self, frame: rtc.AudioFrame) -> None:
    """Capture and queue audio frame"""
    await super().capture_frame(frame)
    if not self._capturing:
        self._capturing = True
    await self._data_ch.send(frame)
Capture and queue audio frame
def clear_buffer(self) ‑> None
def clear_buffer(self) -> None:
    """Clear the audio buffer"""
    while True:
        try:
            self._data_ch.recv_nowait()
        except utils.aio.channel.ChanEmpty:
            break
    self.emit("clear_buffer")  # type: ignore
Clear the audio buffer
def flush(self) ‑> None
def flush(self) -> None:
    """Mark end of current audio segment"""
    super().flush()
    if not self._capturing:
        return
    self._capturing = False
    self._data_ch.send_nowait(AudioSegmentEnd())
Mark end of current audio segment
def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None
def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
    self.on_playback_finished(playback_position=playback_position, interrupted=interrupted)
Notify the sender that playback has finished
Inherited members
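Because the same object implements both AudioOutput and AudioReceiver, it can bridge an in-process producer and consumer, which is useful for testing. A small sketch; the silent test frame and its parameters are illustrative, and the rtc.AudioFrame constructor arguments are assumed from the LiveKit Python SDK.

import asyncio

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, QueueAudioOutput


async def demo_queue_io() -> None:
    queue_io = QueueAudioOutput()

    # 10 ms of silence at 24 kHz mono, 16-bit samples (240 samples * 2 bytes)
    silent = rtc.AudioFrame(
        data=bytes(240 * 2), sample_rate=24000, num_channels=1, samples_per_channel=240
    )

    async def producer() -> None:
        for _ in range(5):
            await queue_io.capture_frame(silent)  # AudioOutput side
        queue_io.flush()  # emits AudioSegmentEnd on the receiver side

    async def consumer() -> None:
        async for item in queue_io:  # AudioReceiver side (what AvatarRunner iterates)
            if isinstance(item, AudioSegmentEnd):
                break

    await asyncio.gather(producer(), consumer())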
class VideoGenerator
class VideoGenerator(ABC):
    @abstractmethod
    async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
        """Push an audio frame to the video generator"""

    @abstractmethod
    def clear_buffer(self) -> None | Coroutine[None, None, None]:
        """Clear the audio buffer, stopping audio playback immediately"""

    @abstractmethod
    def __aiter__(
        self,
    ) -> AsyncIterator[rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd]:
        """Continuously stream out video and audio frames, or AudioSegmentEnd when the audio segment ends"""  # noqa: E501
Abstract base class for video generators: audio frames are pushed in via push_audio, and the generator streams back video frames, audio frames, and AudioSegmentEnd markers for the AvatarRunner to publish.
Ancestors
- abc.ABC
Subclasses
- livekit.plugins.bithuman.avatar.BithumanGenerator
Methods
def clear_buffer(self) ‑> None | collections.abc.Coroutine[None, None, None]
@abstractmethod
def clear_buffer(self) -> None | Coroutine[None, None, None]:
    """Clear the audio buffer, stopping audio playback immediately"""
Clear the audio buffer, stopping audio playback immediately
async def push_audio(self,
frame: rtc.AudioFrame | AudioSegmentEnd) ‑> None

@abstractmethod
async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
    """Push an audio frame to the video generator"""
Push an audio frame to the video generator
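A skeletal implementation sketch that emits one static video frame per received audio frame; a real generator would render lip-synced frames paced at video_fps. The SolidColorVideoGenerator class is illustrative, and the rtc.VideoFrame / rtc.VideoBufferType usage is assumed to match the LiveKit Python SDK.

import asyncio
from collections.abc import AsyncIterator
from typing import Union

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, AvatarOptions, VideoGenerator


class SolidColorVideoGenerator(VideoGenerator):
    """Illustrative generator: one static video frame per received audio frame."""

    def __init__(self, options: AvatarOptions) -> None:
        self._options = options
        self._queue: asyncio.Queue[Union[rtc.AudioFrame, AudioSegmentEnd]] = asyncio.Queue()
        # a single reusable RGBA buffer (solid black image)
        self._rgba = bytes(options.video_width * options.video_height * 4)

    async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
        await self._queue.put(frame)

    def clear_buffer(self) -> None:
        while not self._queue.empty():
            self._queue.get_nowait()

    def __aiter__(self) -> AsyncIterator[rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd]:
        return self._stream()

    async def _stream(self) -> AsyncIterator[rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd]:
        while True:
            item = await self._queue.get()
            if isinstance(item, AudioSegmentEnd):
                yield item
                continue
            # emit a video frame alongside each audio frame; AvatarRunner's AV synchronizer
            # handles pacing when the frames are pushed to the room
            yield rtc.VideoFrame(
                self._options.video_width,
                self._options.video_height,
                rtc.VideoBufferType.RGBA,
                self._rgba,
            )
            yield item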