Module livekit.agents.voice.avatar

Classes

class AudioReceiver
class AudioReceiver(ABC, rtc.EventEmitter[Literal["clear_buffer"]]):
    async def start(self) -> None:
        pass

    @abstractmethod
    def notify_playback_finished(
        self, playback_position: float, interrupted: bool
    ) -> None | Coroutine[None, None, None]:
        """Notify the sender that playback has finished"""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        """Continuously stream out audio frames or AudioSegmentEnd when the stream ends"""

    async def aclose(self) -> None:
        pass

Abstract interface for receiving audio on the avatar worker: it streams out audio frames sent by the agent and notifies the sender when playback has finished.

Initialize a new instance of EventEmitter.
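
Concrete receivers must implement notify_playback_finished and the async iterator; start and aclose are optional hooks. Below is a minimal sketch of a custom receiver that is fed frames locally; the class name LocalQueueReceiver and the push helper are illustrative and not part of the library.

from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator

from livekit import rtc
from livekit.agents.voice.avatar import AudioReceiver, AudioSegmentEnd


class LocalQueueReceiver(AudioReceiver):
    """Illustrative receiver that is fed audio locally instead of over the network."""

    def __init__(self) -> None:
        super().__init__()
        # holds rtc.AudioFrame items and AudioSegmentEnd markers
        self._queue: asyncio.Queue = asyncio.Queue()

    def push(self, item: rtc.AudioFrame | AudioSegmentEnd) -> None:
        # hypothetical helper used by a local producer to enqueue audio
        self._queue.put_nowait(item)

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        print(f"played {playback_position:.2f}s, interrupted={interrupted}")

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self

    async def __anext__(self) -> rtc.AudioFrame | AudioSegmentEnd:
        return await self._queue.get()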

Ancestors

  • abc.ABC
  • EventEmitter
  • typing.Generic

Subclasses

  • livekit.agents.voice.avatar._datastream_io.DataStreamAudioReceiver
  • livekit.agents.voice.avatar._queue_io.QueueAudioOutput

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    pass
def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None | collections.abc.Coroutine[None, None, None]
@abstractmethod
def notify_playback_finished(
    self, playback_position: float, interrupted: bool
) -> None | Coroutine[None, None, None]:
    """Notify the sender that playback has finished"""

Notify the sender that playback has finished

async def start(self) ‑> None
async def start(self) -> None:
    pass

Inherited members

class AudioSegmentEnd
class AudioSegmentEnd:
    pass
class AvatarOptions (video_width: int,
                     video_height: int,
                     video_fps: float,
                     audio_sample_rate: int,
                     audio_channels: int)
@dataclass
class AvatarOptions:
    video_width: int
    video_height: int
    video_fps: float
    audio_sample_rate: int
    audio_channels: int

Configuration for the avatar's published media: video resolution and frame rate, plus the audio sample rate and channel count used by the runner's audio source.
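
A construction sketch; the specific values are illustrative and should match your video generator's output.

from livekit.agents.voice.avatar import AvatarOptions

options = AvatarOptions(
    video_width=1280,
    video_height=720,
    video_fps=30.0,
    audio_sample_rate=24000,  # the runner's AudioSource is created with this rate
    audio_channels=1,
)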

Instance variables

var audio_channels : int
var audio_sample_rate : int
var video_fps : float
var video_height : int
var video_width : int
class AvatarRunner (room: rtc.Room,
                    *,
                    audio_recv: AudioReceiver,
                    video_gen: VideoGenerator,
                    options: AvatarOptions)
class AvatarRunner:
    """Worker that generates synchronized avatar video based on received audio"""

    def __init__(
        self,
        room: rtc.Room,
        *,
        audio_recv: AudioReceiver,
        video_gen: VideoGenerator,
        options: AvatarOptions,
        _queue_size_ms: int = 100,
        # queue size of the AV synchronizer
        _lazy_publish: bool = True,
        # delay publishing the video and audio tracks until the first frame is pushed
    ) -> None:
        self._room = room
        self._video_gen = video_gen
        self._options = options
        self._queue_size_ms = _queue_size_ms

        self._audio_recv = audio_recv
        self._playback_position = 0.0
        self._audio_playing = False
        self._tasks: set[asyncio.Task[Any]] = set()

        self._lock = asyncio.Lock()
        self._audio_publication: rtc.LocalTrackPublication | None = None
        self._video_publication: rtc.LocalTrackPublication | None = None
        self._republish_atask: asyncio.Task[None] | None = None
        self._lazy_publish = _lazy_publish

        # Audio/video sources
        self._audio_source = rtc.AudioSource(
            sample_rate=options.audio_sample_rate,
            num_channels=options.audio_channels,
            queue_size_ms=self._queue_size_ms,
        )
        self._video_source = rtc.VideoSource(width=options.video_width, height=options.video_height)
        # AV synchronizer
        self._av_sync = rtc.AVSynchronizer(
            audio_source=self._audio_source,
            video_source=self._video_source,
            video_fps=options.video_fps,
            video_queue_size_ms=self._queue_size_ms,
        )
        self._read_audio_atask: asyncio.Task[None] | None = None
        self._forward_video_atask: asyncio.Task[None] | None = None
        self._room_connected_fut = asyncio.Future[None]()

    @property
    def av_sync(self) -> rtc.AVSynchronizer:
        return self._av_sync

    async def start(self) -> None:
        """Start the worker"""

        # start audio receiver
        await self._audio_recv.start()
        self._audio_recv.on("clear_buffer", self._on_clear_buffer)

        self._room.on("reconnected", self._on_reconnected)
        self._room.on("connection_state_changed", self._on_connection_state_changed)
        if self._room.isconnected():
            self._room_connected_fut.set_result(None)

        if not self._lazy_publish:
            await self._publish_track()

        # start processing
        self._read_audio_atask = asyncio.create_task(self._read_audio())
        self._forward_video_atask = asyncio.create_task(self._forward_video())

    async def wait_for_complete(self) -> None:
        if not self._read_audio_atask or not self._forward_video_atask:
            raise RuntimeError("AvatarRunner not started")

        await asyncio.gather(
            self._read_audio_atask,
            self._forward_video_atask,
        )

    async def _publish_track(self) -> None:
        async with self._lock:
            await self._room_connected_fut

            audio_track = rtc.LocalAudioTrack.create_audio_track("avatar_audio", self._audio_source)
            audio_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
            self._audio_publication = await self._room.local_participant.publish_track(
                audio_track, audio_options
            )
            await self._audio_publication.wait_for_subscription()

            video_track = rtc.LocalVideoTrack.create_video_track("avatar_video", self._video_source)
            video_options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_CAMERA)
            self._video_publication = await self._room.local_participant.publish_track(
                video_track, video_options
            )

    @log_exceptions(logger=logger)
    async def _read_audio(self) -> None:
        async for frame in self._audio_recv:
            if not self._audio_playing and isinstance(frame, rtc.AudioFrame):
                self._audio_playing = True
            await self._video_gen.push_audio(frame)

    @log_exceptions(logger=logger)
    async def _forward_video(self) -> None:
        """Forward video to the room through the AV synchronizer"""

        async for frame in self._video_gen:
            if isinstance(frame, AudioSegmentEnd):
                # notify the agent that the audio has finished playing
                if self._audio_playing:
                    notify_task = self._audio_recv.notify_playback_finished(
                        playback_position=self._playback_position,
                        interrupted=False,
                    )
                    self._audio_playing = False
                    self._playback_position = 0.0
                    if asyncio.iscoroutine(notify_task):
                        # avoid blocking the video forwarding
                        task = asyncio.create_task(notify_task)
                        self._tasks.add(task)
                        task.add_done_callback(self._tasks.discard)
                continue

            if not self._video_publication:
                await self._publish_track()

            await self._av_sync.push(frame)
            if isinstance(frame, rtc.AudioFrame):
                self._playback_position += frame.duration

    def _on_clear_buffer(self) -> None:
        """Handle clearing the buffer and notify about interrupted playback"""

        @log_exceptions(logger=logger)
        async def _handle_clear_buffer(audio_playing: bool) -> None:
            clear_task = self._video_gen.clear_buffer()
            if asyncio.iscoroutine(clear_task):
                await clear_task

            if audio_playing:
                notify_task = self._audio_recv.notify_playback_finished(
                    playback_position=self._playback_position,
                    interrupted=True,
                )
                self._playback_position = 0.0
                if asyncio.iscoroutine(notify_task):
                    await notify_task

        task = asyncio.create_task(_handle_clear_buffer(self._audio_playing))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        self._audio_playing = False

    def _on_reconnected(self) -> None:
        if self._lazy_publish and not self._video_publication:
            return

        if self._republish_atask:
            self._republish_atask.cancel()
        self._republish_atask = asyncio.create_task(self._publish_track())

    def _on_connection_state_changed(self, _: rtc.ConnectionState) -> None:
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    async def aclose(self) -> None:
        self._room.off("reconnected", self._on_reconnected)
        self._room.off("connection_state_changed", self._on_connection_state_changed)

        await self._audio_recv.aclose()
        if self._forward_video_atask:
            await aio.cancel_and_wait(self._forward_video_atask)
        if self._read_audio_atask:
            await aio.cancel_and_wait(self._read_audio_atask)
        await aio.cancel_and_wait(*self._tasks)

        if self._republish_atask:
            await aio.cancel_and_wait(self._republish_atask)

        await self._av_sync.aclose()
        await self._audio_source.aclose()
        await self._video_source.aclose()

Worker that generates synchronized avatar video based on received audio
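
A sketch of wiring a runner inside an avatar worker process, assuming the room is already connected and video_gen is some VideoGenerator implementation; run_avatar_worker is an illustrative helper, not a library function.

from livekit import rtc
from livekit.agents.voice.avatar import (
    AvatarOptions,
    AvatarRunner,
    DataStreamAudioReceiver,
    VideoGenerator,
)


async def run_avatar_worker(room: rtc.Room, video_gen: VideoGenerator) -> None:
    options = AvatarOptions(
        video_width=1280,
        video_height=720,
        video_fps=30.0,
        audio_sample_rate=24000,
        audio_channels=1,
    )
    runner = AvatarRunner(
        room,
        audio_recv=DataStreamAudioReceiver(room),  # audio streamed from the agent
        video_gen=video_gen,
        options=options,
    )
    await runner.start()
    try:
        # returns once the audio and video forwarding tasks complete
        await runner.wait_for_complete()
    finally:
        await runner.aclose()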

Instance variables

prop av_sync : rtc.AVSynchronizer
@property
def av_sync(self) -> rtc.AVSynchronizer:
    return self._av_sync

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    self._room.off("reconnected", self._on_reconnected)
    self._room.off("connection_state_changed", self._on_connection_state_changed)

    await self._audio_recv.aclose()
    if self._forward_video_atask:
        await aio.cancel_and_wait(self._forward_video_atask)
    if self._read_audio_atask:
        await aio.cancel_and_wait(self._read_audio_atask)
    await aio.cancel_and_wait(*self._tasks)

    if self._republish_atask:
        await aio.cancel_and_wait(self._republish_atask)

    await self._av_sync.aclose()
    await self._audio_source.aclose()
    await self._video_source.aclose()
async def start(self) ‑> None
async def start(self) -> None:
    """Start the worker"""

    # start audio receiver
    await self._audio_recv.start()
    self._audio_recv.on("clear_buffer", self._on_clear_buffer)

    self._room.on("reconnected", self._on_reconnected)
    self._room.on("connection_state_changed", self._on_connection_state_changed)
    if self._room.isconnected():
        self._room_connected_fut.set_result(None)

    if not self._lazy_publish:
        await self._publish_track()

    # start processing
    self._read_audio_atask = asyncio.create_task(self._read_audio())
    self._forward_video_atask = asyncio.create_task(self._forward_video())

Start the worker

async def wait_for_complete(self) ‑> None
async def wait_for_complete(self) -> None:
    if not self._read_audio_atask or not self._forward_video_atask:
        raise RuntimeError("AvatarRunner not started")

    await asyncio.gather(
        self._read_audio_atask,
        self._forward_video_atask,
    )
class DataStreamAudioOutput (room: rtc.Room,
                             *,
                             destination_identity: str,
                             sample_rate: int | None = None,
                             wait_remote_track: rtc.TrackKind.ValueType | None = None)
class DataStreamAudioOutput(AudioOutput):
    """
    AudioOutput implementation that streams audio to a remote avatar worker using LiveKit DataStream.
    """  # noqa: E501

    _playback_finished_handlers: dict[str, Callable[[rtc.RpcInvocationData], str]] = {}
    _playback_finished_rpc_registered: bool = False

    def __init__(
        self,
        room: rtc.Room,
        *,
        destination_identity: str,
        sample_rate: int | None = None,
        wait_remote_track: rtc.TrackKind.ValueType | None = None,
    ):
        super().__init__(label="DataStreamIO", next_in_chain=None, sample_rate=sample_rate)
        self._room = room
        self._destination_identity = destination_identity
        self._wait_remote_track = wait_remote_track
        self._stream_writer: rtc.ByteStreamWriter | None = None
        self._pushed_duration: float = 0.0
        self._tasks: set[asyncio.Task[Any]] = set()

        self._room_connected_fut = asyncio.Future[None]()
        self._room.on("connection_state_changed", self._handle_connection_state_changed)
        if self._room.isconnected():
            self._room_connected_fut.set_result(None)
        self._started = False
        self._lock = asyncio.Lock()

        self._start_atask: asyncio.Task | None = None

    @utils.log_exceptions(logger=logger)
    async def _start_task(self) -> None:
        async with self._lock:
            if self._started:
                return

            await self._room_connected_fut

            self._register_playback_finished_rpc(
                self._room,
                caller_identity=self._destination_identity,
                handler=self._handle_playback_finished,
            )
            logger.debug(
                "waiting for the remote participant",
                extra={"identity": self._destination_identity},
            )
            await utils.wait_for_participant(room=self._room, identity=self._destination_identity)
            if self._wait_remote_track:
                logger.debug(
                    "waiting for the remote track",
                    extra={
                        "identity": self._destination_identity,
                        "kind": rtc.TrackKind.Name(self._wait_remote_track),
                    },
                )
                await utils.wait_for_track_publication(
                    room=self._room,
                    identity=self._destination_identity,
                    kind=self._wait_remote_track,
                )
            logger.debug("remote participant ready", extra={"identity": self._destination_identity})

            self._started = True

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Capture and stream audio frame to remote worker"""
        # TODO(theomonnom): this class should be encapsulated somewhere else
        # to allow for a clean close
        if self._start_atask is None:
            self._start_atask = asyncio.create_task(self._start_task())

        # TODO(theomonnom): what to do if start takes a while?
        # we want to avoid OOM & outdated speech?
        await asyncio.shield(self._start_atask)

        await super().capture_frame(frame)

        if not self._stream_writer:
            self._stream_writer = await self._room.local_participant.stream_bytes(
                name=utils.shortuuid("AUDIO_"),
                topic=AUDIO_STREAM_TOPIC,
                destination_identities=[self._destination_identity],
                attributes={
                    "sample_rate": str(frame.sample_rate),
                    "num_channels": str(frame.num_channels),
                },
            )
            self._pushed_duration = 0.0
        await self._stream_writer.write(bytes(frame.data))
        self._pushed_duration += frame.duration

    def flush(self) -> None:
        """Mark end of current audio segment"""
        super().flush()
        if self._stream_writer is None or not self._started:
            return

        # close the stream marking the end of the segment
        task = asyncio.create_task(self._stream_writer.aclose())
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

        self._stream_writer = None

    def clear_buffer(self) -> None:
        if not self._started:
            return

        task = asyncio.create_task(
            self._room.local_participant.perform_rpc(
                destination_identity=self._destination_identity,
                method=RPC_CLEAR_BUFFER,
                payload="",
            )
        )
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _handle_playback_finished(self, data: rtc.RpcInvocationData) -> str:
        if data.caller_identity != self._destination_identity:
            logger.warning(
                "playback finished event received from unexpected participant",
                extra={
                    "caller_identity": data.caller_identity,
                    "expected_identity": self._destination_identity,
                },
            )
            return "reject"

        logger.info(
            "playback finished event received",
            extra={"caller_identity": data.caller_identity},
        )

        event = PlaybackFinishedEvent(**json.loads(data.payload))
        self.on_playback_finished(
            playback_position=event.playback_position,
            interrupted=event.interrupted,
        )
        return "ok"

    def _handle_connection_state_changed(self, state: rtc.ConnectionState) -> None:
        if self._room.isconnected() and not self._room_connected_fut.done():
            self._room_connected_fut.set_result(None)

    @classmethod
    def _register_playback_finished_rpc(
        cls,
        room: rtc.Room,
        *,
        caller_identity: str,
        handler: Callable[[rtc.RpcInvocationData], str],
    ) -> None:
        cls._playback_finished_handlers[caller_identity] = handler

        if cls._playback_finished_rpc_registered:
            return

        def _handler(data: rtc.RpcInvocationData) -> str:
            if data.caller_identity not in cls._playback_finished_handlers:
                logger.warning(
                    "playback finished event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identities": list(cls._playback_finished_handlers.keys()),
                    },
                )
                return "reject"
            return cls._playback_finished_handlers[data.caller_identity](data)

        room.local_participant.register_rpc_method(RPC_PLAYBACK_FINISHED, _handler)
        cls._playback_finished_rpc_registered = True

AudioOutput implementation that streams audio to a remote avatar worker using LiveKit DataStream.

Args

sample_rate
The sample rate required by the audio sink. If None, any sample rate is accepted.
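
A usage sketch on the agent side, assuming the avatar worker joins the room with identity "avatar-worker" and frames is an async iterable of rtc.AudioFrame (for example a TTS stream). In practice this output is typically installed as the agent session's audio output so speech is routed to the avatar worker instead of being published directly.

from livekit import rtc
from livekit.agents.voice.avatar import DataStreamAudioOutput


async def stream_to_avatar(room: rtc.Room, frames) -> None:
    output = DataStreamAudioOutput(
        room,
        destination_identity="avatar-worker",       # identity of the remote avatar participant
        wait_remote_track=rtc.TrackKind.KIND_VIDEO,  # optional: wait until the avatar publishes video
    )
    async for frame in frames:
        await output.capture_frame(frame)  # lazily connects, then streams bytes over DataStream
    output.flush()  # closes the byte stream, marking the end of the segment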

Ancestors

  • AudioOutput

Methods

async def capture_frame(self, frame: rtc.AudioFrame) ‑> None
async def capture_frame(self, frame: rtc.AudioFrame) -> None:
    """Capture and stream audio frame to remote worker"""
    # TODO(theomonnom): this class should be encapsulated somewhere else
    # to allow for a clean close
    if self._start_atask is None:
        self._start_atask = asyncio.create_task(self._start_task())

    # TODO(theomonnom): what to do if start takes a while?
    # we want to avoid OOM & outdated speech?
    await asyncio.shield(self._start_atask)

    await super().capture_frame(frame)

    if not self._stream_writer:
        self._stream_writer = await self._room.local_participant.stream_bytes(
            name=utils.shortuuid("AUDIO_"),
            topic=AUDIO_STREAM_TOPIC,
            destination_identities=[self._destination_identity],
            attributes={
                "sample_rate": str(frame.sample_rate),
                "num_channels": str(frame.num_channels),
            },
        )
        self._pushed_duration = 0.0
    await self._stream_writer.write(bytes(frame.data))
    self._pushed_duration += frame.duration

Capture and stream audio frame to remote worker

def flush(self) ‑> None
def flush(self) -> None:
    """Mark end of current audio segment"""
    super().flush()
    if self._stream_writer is None or not self._started:
        return

    # close the stream marking the end of the segment
    task = asyncio.create_task(self._stream_writer.aclose())
    self._tasks.add(task)
    task.add_done_callback(self._tasks.discard)

    self._stream_writer = None

Mark end of current audio segment

Inherited members

class DataStreamAudioReceiver (room: rtc.Room,
                               *,
                               sender_identity: str | None = None,
                               frame_size_ms: NotGivenOr[int] = NOT_GIVEN,
                               rpc_max_retries: int = 3)
class DataStreamAudioReceiver(AudioReceiver):
    """
    Audio receiver that receives streamed audio from a sender participant using LiveKit DataStream.
    If the sender_identity is provided, subscribe to the specified participant. If not provided,
    subscribe to the first agent participant in the room.
    """

    _clear_buffer_rpc_registered: bool = False
    _clear_buffer_handlers: dict[str, Callable[[rtc.RpcInvocationData], str]] = {}

    def __init__(
        self,
        room: rtc.Room,
        *,
        sender_identity: str | None = None,
        frame_size_ms: NotGivenOr[int] = NOT_GIVEN,
        rpc_max_retries: int = 3,
    ):
        super().__init__()
        self._room = room
        self._sender_identity = sender_identity
        self._remote_participant: rtc.RemoteParticipant | None = None
        self._frame_size_ms = frame_size_ms or 100

        self._stream_readers: list[rtc.ByteStreamReader] = []
        self._stream_reader_changed: asyncio.Event = asyncio.Event()
        self._data_ch = utils.aio.Chan[Union[rtc.AudioFrame, AudioSegmentEnd]]()

        self._current_reader: rtc.ByteStreamReader | None = None
        self._current_reader_cleared: bool = False

        self._playback_finished_ch = utils.aio.Chan[PlaybackFinishedEvent]()
        self._rpc_max_retries = rpc_max_retries

        self._main_atask: asyncio.Task | None = None
        self._exception: Exception | None = None
        self._closing: bool = False

    async def start(self) -> None:
        # wait for the target participant or first agent participant to join
        self._remote_participant = await utils.wait_for_participant(
            room=self._room,
            identity=self._sender_identity,
            kind=rtc.ParticipantKind.PARTICIPANT_KIND_AGENT if not self._sender_identity else None,
        )
        self._main_atask = asyncio.create_task(self._main_task())

        def _handle_clear_buffer(data: rtc.RpcInvocationData) -> str:
            assert self._remote_participant is not None
            if data.caller_identity != self._remote_participant.identity:
                logger.warning(
                    "clear buffer event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identity": self._remote_participant.identity,
                    },
                )
                return "reject"

            if self._current_reader:
                self._current_reader_cleared = True
            self.emit("clear_buffer")
            return "ok"

        def _handle_stream_received(
            reader: rtc.ByteStreamReader, remote_participant_id: str
        ) -> None:
            if (
                not self._remote_participant
                or remote_participant_id != self._remote_participant.identity
            ):
                return

            self._stream_readers.append(reader)
            self._stream_reader_changed.set()

        self._register_clear_buffer_rpc(
            self._room,
            caller_identity=self._remote_participant.identity,
            handler=_handle_clear_buffer,
        )
        self._room.register_byte_stream_handler(AUDIO_STREAM_TOPIC, _handle_stream_received)

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        self._playback_finished_ch.send_nowait(
            PlaybackFinishedEvent(playback_position=playback_position, interrupted=interrupted)
        )

    async def _main_task(self) -> None:
        tasks = [
            asyncio.create_task(self._recv_task()),
            asyncio.create_task(self._send_task()),
        ]
        try:
            await asyncio.gather(*tasks)
        except Exception as error:
            self._exception = error
        finally:
            self._playback_finished_ch.close()
            self._data_ch.close()
            await utils.aio.cancel_and_wait(*tasks)

    @utils.log_exceptions(logger=logger)
    async def _send_task(self) -> None:
        async for event in self._playback_finished_ch:
            assert self._remote_participant is not None

            retry_count = 0  # TODO: use retry logic in rust
            while retry_count < self._rpc_max_retries:
                logger.debug(
                    f"notifying playback finished: {event.playback_position:.3f}s, "
                    f"interrupted: {event.interrupted}"
                )
                try:
                    await self._room.local_participant.perform_rpc(
                        destination_identity=self._remote_participant.identity,
                        method=RPC_PLAYBACK_FINISHED,
                        payload=json.dumps(asdict(event)),
                    )
                    break
                except rtc.RpcError as e:
                    if retry_count == self._rpc_max_retries - 1:
                        logger.error(
                            f"failed to notify playback finished after {retry_count + 1} retries",
                            exc_info=e,
                        )
                        raise
                    retry_count += 1
                    logger.warning("failed to notify the agent playback finished, retrying...")
                    await asyncio.sleep(0.1)

    @utils.log_exceptions(logger=logger)
    async def _recv_task(self) -> None:
        while not self._data_ch.closed:
            await self._stream_reader_changed.wait()

            while self._stream_readers:
                self._current_reader = self._stream_readers.pop(0)

                if (
                    not (attrs := self._current_reader.info.attributes)
                    or "sample_rate" not in attrs
                    or "num_channels" not in attrs
                ):
                    raise ValueError("sample_rate or num_channels not found in byte stream")

                sample_rate = int(attrs["sample_rate"])
                num_channels = int(attrs["num_channels"])
                bstream = utils.audio.AudioByteStream(
                    sample_rate=sample_rate,
                    num_channels=num_channels,
                    samples_per_channel=int(math.ceil(sample_rate * self._frame_size_ms / 1000)),
                )

                try:
                    async for data in self._current_reader:
                        if self._current_reader_cleared:
                            # ignore the rest of the data from the current reader if clear_buffer was called
                            break
                        for frame in bstream.push(data):
                            self._data_ch.send_nowait(frame)

                    if not self._current_reader_cleared:
                        for frame in bstream.flush():
                            self._data_ch.send_nowait(frame)

                    self._current_reader = None
                    self._current_reader_cleared = False
                    self._data_ch.send_nowait(AudioSegmentEnd())

                except utils.aio.ChanClosed:
                    if self._closing:
                        return
                    raise

            self._stream_reader_changed.clear()

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self

    async def __anext__(self) -> rtc.AudioFrame | AudioSegmentEnd:
        try:
            return await self._data_ch.recv()
        except utils.aio.ChanClosed as e:
            if self._exception:
                raise self._exception from e

            raise StopAsyncIteration from None

    async def aclose(self) -> None:
        self._closing = True
        self._playback_finished_ch.close()
        self._data_ch.close()
        self._stream_reader_changed.set()
        if self._main_atask:
            await utils.aio.cancel_and_wait(self._main_atask)

    @classmethod
    def _register_clear_buffer_rpc(
        cls,
        room: rtc.Room,
        *,
        caller_identity: str,
        handler: Callable[[rtc.RpcInvocationData], str],
    ) -> None:
        cls._clear_buffer_handlers[caller_identity] = handler

        if cls._clear_buffer_rpc_registered:
            return

        def _handler(data: rtc.RpcInvocationData) -> str:
            if data.caller_identity not in cls._clear_buffer_handlers:
                logger.warning(
                    "clear buffer event received from unexpected participant",
                    extra={
                        "caller_identity": data.caller_identity,
                        "expected_identities": list(cls._clear_buffer_handlers.keys()),
                    },
                )
                return "reject"
            return cls._clear_buffer_handlers[data.caller_identity](data)

        room.local_participant.register_rpc_method(RPC_CLEAR_BUFFER, _handler)
        cls._clear_buffer_rpc_registered = True

Audio receiver that receives streamed audio from a sender participant using LiveKit DataStream. If the sender_identity is provided, subscribe to the specified participant. If not provided, subscribe to the first agent participant in the room.

Initialize a new instance of EventEmitter.
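
A consumption sketch for the avatar worker side; consume_agent_audio is an illustrative helper. AvatarRunner already implements this loop, so iterating the receiver directly is only needed for custom pipelines.

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, DataStreamAudioReceiver


async def consume_agent_audio(room: rtc.Room) -> None:
    receiver = DataStreamAudioReceiver(room)  # subscribes to the first agent participant
    await receiver.start()

    position = 0.0
    try:
        async for item in receiver:
            if isinstance(item, AudioSegmentEnd):
                # report how much of the segment was actually played back
                receiver.notify_playback_finished(playback_position=position, interrupted=False)
                position = 0.0
            else:
                position += item.duration
                # ... render or play the rtc.AudioFrame here ...
    finally:
        await receiver.aclose()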

Ancestors

  • livekit.agents.voice.avatar._types.AudioReceiver
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    self._closing = True
    self._playback_finished_ch.close()
    self._data_ch.close()
    self._stream_reader_changed.set()
    if self._main_atask:
        await utils.aio.cancel_and_wait(self._main_atask)
def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None
def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
    self._playback_finished_ch.send_nowait(
        PlaybackFinishedEvent(playback_position=playback_position, interrupted=interrupted)
    )

Notify the sender that playback has finished

async def start(self) ‑> None
async def start(self) -> None:
    # wait for the target participant or first agent participant to join
    self._remote_participant = await utils.wait_for_participant(
        room=self._room,
        identity=self._sender_identity,
        kind=rtc.ParticipantKind.PARTICIPANT_KIND_AGENT if not self._sender_identity else None,
    )
    self._main_atask = asyncio.create_task(self._main_task())

    def _handle_clear_buffer(data: rtc.RpcInvocationData) -> str:
        assert self._remote_participant is not None
        if data.caller_identity != self._remote_participant.identity:
            logger.warning(
                "clear buffer event received from unexpected participant",
                extra={
                    "caller_identity": data.caller_identity,
                    "expected_identity": self._remote_participant.identity,
                },
            )
            return "reject"

        if self._current_reader:
            self._current_reader_cleared = True
        self.emit("clear_buffer")
        return "ok"

    def _handle_stream_received(
        reader: rtc.ByteStreamReader, remote_participant_id: str
    ) -> None:
        if (
            not self._remote_participant
            or remote_participant_id != self._remote_participant.identity
        ):
            return

        self._stream_readers.append(reader)
        self._stream_reader_changed.set()

    self._register_clear_buffer_rpc(
        self._room,
        caller_identity=self._remote_participant.identity,
        handler=_handle_clear_buffer,
    )
    self._room.register_byte_stream_handler(AUDIO_STREAM_TOPIC, _handle_stream_received)

Inherited members

class QueueAudioOutput (*, sample_rate: int | None = None)
class QueueAudioOutput(
    AudioOutput,
    AudioReceiver,
    rtc.EventEmitter[Literal["playback_finished", "clear_buffer"]],
):
    """
    AudioOutput implementation that sends audio frames through a queue.
    """

    def __init__(self, *, sample_rate: int | None = None):
        super().__init__(label="DebugQueueIO", next_in_chain=None, sample_rate=sample_rate)
        self._data_ch = utils.aio.Chan[Union[rtc.AudioFrame, AudioSegmentEnd]]()
        self._capturing = False

    async def capture_frame(self, frame: rtc.AudioFrame) -> None:
        """Capture and queue audio frame"""
        await super().capture_frame(frame)
        if not self._capturing:
            self._capturing = True

        await self._data_ch.send(frame)

    def flush(self) -> None:
        """Mark end of current audio segment"""
        super().flush()
        if not self._capturing:
            return
        self._capturing = False
        self._data_ch.send_nowait(AudioSegmentEnd())

    # as AudioReceiver for AvatarRunner

    def clear_buffer(self) -> None:
        """Clear the audio buffer"""
        while True:
            try:
                self._data_ch.recv_nowait()
            except utils.aio.channel.ChanEmpty:
                break
        self.emit("clear_buffer")  # type: ignore

    def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
        self.on_playback_finished(playback_position=playback_position, interrupted=interrupted)

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame | AudioSegmentEnd]:
        return self._data_ch

AudioOutput implementation that sends audio frames through a queue.

Args

sample_rate
The sample rate required by the audio sink. If None, any sample rate is accepted.
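
A self-contained sketch showing both roles: frames are captured as an AudioOutput and read back as an AudioReceiver. The silent frames are only for illustration; a real producer would push TTS audio.

import asyncio

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, QueueAudioOutput


async def main() -> None:
    queue_io = QueueAudioOutput(sample_rate=24000)

    async def producer() -> None:
        for _ in range(10):
            # 10 ms of silence per frame
            frame = rtc.AudioFrame.create(
                sample_rate=24000, num_channels=1, samples_per_channel=240
            )
            await queue_io.capture_frame(frame)
        queue_io.flush()  # enqueues an AudioSegmentEnd marker

    async def consumer() -> None:
        async for item in queue_io:
            if isinstance(item, AudioSegmentEnd):
                break
            # ... hand the rtc.AudioFrame to an AvatarRunner or play it locally ...

    await asyncio.gather(producer(), consumer())


asyncio.run(main())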

Ancestors

  • AudioOutput
  • livekit.agents.voice.avatar._types.AudioReceiver
  • EventEmitter

Methods

async def capture_frame(self, frame: rtc.AudioFrame) ‑> None
async def capture_frame(self, frame: rtc.AudioFrame) -> None:
    """Capture and queue audio frame"""
    await super().capture_frame(frame)
    if not self._capturing:
        self._capturing = True

    await self._data_ch.send(frame)

Capture and queue audio frame

def clear_buffer(self) ‑> None
def clear_buffer(self) -> None:
    """Clear the audio buffer"""
    while True:
        try:
            self._data_ch.recv_nowait()
        except utils.aio.channel.ChanEmpty:
            break
    self.emit("clear_buffer")  # type: ignore

Clear the audio buffer

def flush(self) ‑> None
def flush(self) -> None:
    """Mark end of current audio segment"""
    super().flush()
    if not self._capturing:
        return
    self._capturing = False
    self._data_ch.send_nowait(AudioSegmentEnd())

Mark end of current audio segment

def notify_playback_finished(self, playback_position: float, interrupted: bool) ‑> None
def notify_playback_finished(self, playback_position: float, interrupted: bool) -> None:
    self.on_playback_finished(playback_position=playback_position, interrupted=interrupted)

Notify the sender that playback has finished

Inherited members

class VideoGenerator
class VideoGenerator(ABC):
    @abstractmethod
    async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
        """Push an audio frame to the video generator"""

    @abstractmethod
    def clear_buffer(self) -> None | Coroutine[None, None, None]:
        """Clear the audio buffer, stopping audio playback immediately"""

    @abstractmethod
    def __aiter__(
        self,
    ) -> AsyncIterator[rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd]:
        """Continuously stream out video and audio frames, or AudioSegmentEnd when the audio segment ends"""  # noqa: E501

Abstract interface for generating avatar video: audio frames are pushed in, and synchronized video and audio frames are streamed back out.
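
A minimal sketch of a generator that emits one blank RGBA video frame per audio frame. SolidColorGenerator is an illustrative name, the rtc.VideoFrame construction assumes an RGBA buffer layout, and a real implementation would pace video at the configured fps and render the avatar.

from __future__ import annotations

import asyncio
from collections.abc import AsyncIterator

from livekit import rtc
from livekit.agents.voice.avatar import AudioSegmentEnd, VideoGenerator


class SolidColorGenerator(VideoGenerator):
    def __init__(self, width: int, height: int) -> None:
        self._width = width
        self._height = height
        self._queue: asyncio.Queue = asyncio.Queue()

    async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
        if isinstance(frame, rtc.AudioFrame):
            # emit a blank frame alongside the audio; a real generator renders the avatar here
            blank = rtc.VideoFrame(
                self._width,
                self._height,
                rtc.VideoBufferType.RGBA,
                bytearray(self._width * self._height * 4),
            )
            await self._queue.put(blank)
        await self._queue.put(frame)

    def clear_buffer(self) -> None:
        while not self._queue.empty():
            self._queue.get_nowait()

    def __aiter__(self) -> AsyncIterator[rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd]:
        return self

    async def __anext__(self) -> rtc.VideoFrame | rtc.AudioFrame | AudioSegmentEnd:
        return await self._queue.get()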

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.bithuman.avatar.BithumanGenerator

Methods

def clear_buffer(self) ‑> None | collections.abc.Coroutine[None, None, None]
@abstractmethod
def clear_buffer(self) -> None | Coroutine[None, None, None]:
    """Clear the audio buffer, stopping audio playback immediately"""

Clear the audio buffer, stopping audio playback immediately

async def push_audio(self,
frame: rtc.AudioFrame | AudioSegmentEnd) ‑> None
@abstractmethod
async def push_audio(self, frame: rtc.AudioFrame | AudioSegmentEnd) -> None:
    """Push an audio frame to the video generator"""

Push an audio frame to the video generator