Module livekit.rtc.audio_source

Classes

class AudioSource (sample_rate: int,
num_channels: int,
queue_size_ms: int = 1000,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class AudioSource:
    """
    Represents a real-time audio source with an internal audio queue.

    The `AudioSource` class allows you to push audio frames into a real-time audio
    source, managing an internal queue of audio data up to a maximum duration defined
    by `queue_size_ms`. It supports asynchronous operations to capture audio frames
    and to wait for the playback of all queued audio data.
    """

    def __init__(
        self,
        sample_rate: int,
        num_channels: int,
        queue_size_ms: int = 1000,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """
        Initializes a new instance of the audio source.

        Args:
            sample_rate (int): The sample rate of the audio source in Hz.
            num_channels (int): The number of audio channels.
            queue_size_ms (int, optional): The buffer size of the audio queue in milliseconds.
                Defaults to 1000 ms.
            loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to
                `asyncio.get_event_loop()`.
        """
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()

        req = proto_ffi.FfiRequest()
        req.new_audio_source.type = (
            proto_audio_frame.AudioSourceType.AUDIO_SOURCE_NATIVE
        )
        req.new_audio_source.sample_rate = sample_rate
        req.new_audio_source.num_channels = num_channels
        req.new_audio_source.queue_size_ms = queue_size_ms

        resp = FfiClient.instance.request(req)
        self._info = resp.new_audio_source.source
        self._ffi_handle = FfiHandle(self._info.handle.id)

        self._last_capture = 0.0
        self._q_size = 0.0
        self._join_handle: asyncio.TimerHandle | None = None
        self._join_fut: asyncio.Future[None] | None = None

    @property
    def sample_rate(self) -> int:
        """The sample rate of the audio source in Hz."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """The number of audio channels."""
        return self._num_channels

    @property
    def queued_duration(self) -> float:
        """The current duration (in seconds) of audio data queued for playback."""
        return max(self._q_size - time.monotonic() + self._last_capture, 0.0)

    def clear_queue(self) -> None:
        """
        Clears the internal audio queue, discarding all buffered audio data.

        This method immediately removes all audio data currently queued for playback,
        effectively resetting the audio source's buffer. Any audio frames that have been
        captured but not yet played will be discarded. This is useful in scenarios where
        you need to stop playback abruptly or prevent outdated audio data from being played.
        """
        req = proto_ffi.FfiRequest()
        req.clear_audio_buffer.source_handle = self._ffi_handle.handle
        _ = FfiClient.instance.request(req)
        self._release_waiter()

    async def capture_frame(self, frame: AudioFrame) -> None:
        """
        Captures an `AudioFrame` and queues it for playback.

        This method is used to push new audio data into the audio source. The audio data
        will be processed and queued. If the size of the audio frame exceeds the internal
        queue size, the method will wait until there is enough space in the queue to
        accommodate the frame. The method returns only when all of the data in the buffer
        has been pushed.

        Args:
            frame (AudioFrame): The audio frame to capture and queue.

        Raises:
            Exception: If there is an error during frame capture.
        """

        if frame.samples_per_channel == 0:
            return

        now = time.monotonic()
        elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
        self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
        self._last_capture = now

        if self._join_handle:
            self._join_handle.cancel()

        if self._join_fut is None:
            self._join_fut = self._loop.create_future()

        self._join_handle = self._loop.call_later(self._q_size, self._release_waiter)

        req = proto_ffi.FfiRequest()
        req.capture_audio_frame.source_handle = self._ffi_handle.handle
        req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

        queue = FfiClient.instance.queue.subscribe(loop=self._loop)
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.capture_audio_frame.async_id
                == resp.capture_audio_frame.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.capture_audio_frame.error:
            raise Exception(cb.capture_audio_frame.error)

    async def wait_for_playout(self) -> None:
        """
        Waits for the audio source to finish playing out all audio data.

        This method ensures that all queued audio data has been played out before returning.
        It can be used to synchronize events after audio playback or to ensure that the
        audio queue is empty.
        """

        if self._join_fut is None:
            return

        await asyncio.shield(self._join_fut)

    def _release_waiter(self) -> None:
        if self._join_fut is None:
            return  # could be None when clear_queue is called

        if not self._join_fut.done():
            self._join_fut.set_result(None)

        self._last_capture = 0.0
        self._q_size = 0.0
        self._join_fut = None

Represents a real-time audio source with an internal audio queue.

The AudioSource class allows you to push audio frames into a real-time audio source, managing an internal queue of audio data up to a maximum duration defined by queue_size_ms. It supports asynchronous operations to capture audio frames and to wait for the playback of all queued audio data.

Initializes a new instance of the audio source.

Args

sample_rate : int
The sample rate of the audio source in Hz.
num_channels : int
The number of audio channels.
queue_size_ms : int, optional
The buffer size of the audio queue in milliseconds. Defaults to 1000 ms.
loop : asyncio.AbstractEventLoop, optional
The event loop to use. Defaults to asyncio.get_event_loop().

Instance variables

prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    """The number of audio channels."""
    return self._num_channels

The number of audio channels.

prop queued_duration : float
Expand source code
@property
def queued_duration(self) -> float:
    """The current duration (in seconds) of audio data queued for playback."""
    return max(self._q_size - time.monotonic() + self._last_capture, 0.0)

The current duration (in seconds) of audio data queued for playback.

prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    """The sample rate of the audio source in Hz."""
    return self._sample_rate

The sample rate of the audio source in Hz.

Methods

async def capture_frame(self, frame: AudioFrame) ‑> None
Expand source code
async def capture_frame(self, frame: AudioFrame) -> None:
    """
    Captures an `AudioFrame` and queues it for playback.

    This method is used to push new audio data into the audio source. The audio data
    will be processed and queued. If the size of the audio frame exceeds the internal
    queue size, the method will wait until there is enough space in the queue to
    accommodate the frame. The method returns only when all of the data in the buffer
    has been pushed.

    Args:
        frame (AudioFrame): The audio frame to capture and queue.

    Raises:
        Exception: If there is an error during frame capture.
    """

    if frame.samples_per_channel == 0:
        return

    now = time.monotonic()
    elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
    self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
    self._last_capture = now

    if self._join_handle:
        self._join_handle.cancel()

    if self._join_fut is None:
        self._join_fut = self._loop.create_future()

    self._join_handle = self._loop.call_later(self._q_size, self._release_waiter)

    req = proto_ffi.FfiRequest()
    req.capture_audio_frame.source_handle = self._ffi_handle.handle
    req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

    queue = FfiClient.instance.queue.subscribe(loop=self._loop)
    try:
        resp = FfiClient.instance.request(req)
        cb: proto_ffi.FfiEvent = await queue.wait_for(
            lambda e: e.capture_audio_frame.async_id
            == resp.capture_audio_frame.async_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(queue)

    if cb.capture_audio_frame.error:
        raise Exception(cb.capture_audio_frame.error)

Captures an AudioFrame and queues it for playback.

This method is used to push new audio data into the audio source. The audio data will be processed and queued. If the size of the audio frame exceeds the internal queue size, the method will wait until there is enough space in the queue to accommodate the frame. The method returns only when all of the data in the buffer has been pushed.

Args

frame : AudioFrame
The audio frame to capture and queue.

Raises

Exception
If there is an error during frame capture.
def clear_queue(self) ‑> None
Expand source code
def clear_queue(self) -> None:
    """
    Clears the internal audio queue, discarding all buffered audio data.

    This method immediately removes all audio data currently queued for playback,
    effectively resetting the audio source's buffer. Any audio frames that have been
    captured but not yet played will be discarded. This is useful in scenarios where
    you need to stop playback abruptly or prevent outdated audio data from being played.
    """
    req = proto_ffi.FfiRequest()
    req.clear_audio_buffer.source_handle = self._ffi_handle.handle
    _ = FfiClient.instance.request(req)
    self._release_waiter()

Clears the internal audio queue, discarding all buffered audio data.

This method immediately removes all audio data currently queued for playback, effectively resetting the audio source's buffer. Any audio frames that have been captured but not yet played will be discarded. This is useful in scenarios where you need to stop playback abruptly or prevent outdated audio data from being played.

async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """
    Waits for the audio source to finish playing out all audio data.

    This method ensures that all queued audio data has been played out before returning.
    It can be used to synchronize events after audio playback or to ensure that the
    audio queue is empty.
    """

    if self._join_fut is None:
        return

    await asyncio.shield(self._join_fut)

Waits for the audio source to finish playing out all audio data.

This method ensures that all queued audio data has been played out before returning. It can be used to synchronize events after audio playback or to ensure that the audio queue is empty.