Module livekit.rtc.audio_stream

Classes

class AudioFrameEvent (frame: AudioFrame)
@dataclass
class AudioFrameEvent:
    """An event representing a received audio frame.

    Attributes:
        frame (AudioFrame): The received audio frame.
    """

    frame: AudioFrame

An event representing a received audio frame.

Attributes

frame : AudioFrame
The received audio frame.

Class variables

var frame : AudioFrame
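
`AudioFrameEvent` is what an `AudioStream` yields on each iteration; the received frame is available through its `frame` attribute. A minimal sketch of inspecting a frame, assuming `AudioFrameEvent` is re-exported from `livekit.rtc` and that the `AudioFrame` attribute names used here (`sample_rate`, `num_channels`, `samples_per_channel`) match the installed SDK version:

```python
from livekit import rtc


async def log_frame(event: rtc.AudioFrameEvent) -> None:
    # The event wraps a single received AudioFrame.
    frame = event.frame
    # Attribute names are assumed from the livekit.rtc AudioFrame API;
    # verify them against the SDK version you have installed.
    print(
        f"{frame.sample_rate} Hz, {frame.num_channels} channel(s), "
        f"{frame.samples_per_channel} samples per channel"
    )
```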
class AudioStream (track: Track,
                   loop: Optional[asyncio.AbstractEventLoop] = None,
                   capacity: int = 0,
                   sample_rate: int = 48000,
                   num_channels: int = 1,
                   **kwargs)
class AudioStream:
    """An asynchronous audio stream for receiving audio frames from a participant or track.

    The `AudioStream` class provides an asynchronous iterator over audio frames received from
    a specific track or participant. It allows you to receive audio frames in real-time with
    customizable sample rates and channel configurations.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        **kwargs,
    ) -> None:
        """Initialize an `AudioStream` instance.

        Args:
            track (Optional[Track]): The audio track from which to receive audio. If not provided,
                you must specify `participant` and `track_source` in `kwargs`.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.
        Example:
            ```python
            audio_stream = AudioStream(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )

            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        self._track: Track | None = track
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()
        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from a participant's audio track.

        Args:
            participant (Participant): The participant from whom to receive audio.
            track_source (TrackSource.ValueType): The source of the audio track (e.g., microphone, screen share).
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_participant(
                participant=participant,
                track_source=TrackSource.MICROPHONE,
                sample_rate=24000,
                num_channels=1,
            )
            ```
        """
        return AudioStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            track=None,  # type: ignore
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from an existing audio track.

        Args:
            track (Track): The audio track from which to receive audio.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        return AudioStream(
            track=track,
            loop=loop,
            capacity=capacity,
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    def __del__(self) -> None:
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    def _create_owned_stream(self) -> Any:
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_audio_stream = req.new_audio_stream
        new_audio_stream.track_handle = self._track._ffi_handle.handle
        new_audio_stream.sample_rate = self._sample_rate
        new_audio_stream.num_channels = self._num_channels
        new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        resp = FfiClient.instance.request(req)
        return resp.new_audio_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        req = proto_ffi.FfiRequest()
        audio_stream_from_participant = req.audio_stream_from_participant
        audio_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        audio_stream_from_participant.sample_rate = self._sample_rate
        audio_stream_from_participant.num_channels = self._num_channels
        audio_stream_from_participant.type = (
            proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        )
        audio_stream_from_participant.track_source = track_source
        resp = FfiClient.instance.request(req)
        return resp.audio_stream_from_participant.stream

    async def _run(self):
        while True:
            event = await self._ffi_queue.wait_for(self._is_event)
            audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event

            if audio_event.HasField("frame_received"):
                owned_buffer_info = audio_event.frame_received.frame
                frame = AudioFrame._from_owned_info(owned_buffer_info)
                event = AudioFrameEvent(frame)
                self._queue.put(event)
            elif audio_event.HasField("eos"):
                self._queue.put(None)
                break

        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Asynchronously close the audio stream.

        This method cleans up resources associated with the audio stream and waits for
        any pending operations to complete.
        """
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        return e.audio_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
        return self

    async def __anext__(self) -> AudioFrameEvent:
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item

An asynchronous audio stream for receiving audio frames from a participant or track.

The AudioStream class provides an asynchronous iterator over audio frames received from a specific track or participant. It allows you to receive audio frames in real-time with customizable sample rates and channel configurations.

Initialize an AudioStream instance.

Args

track : Optional[Track]
The audio track from which to receive audio. If not provided, you must specify participant and track_source in kwargs.
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Example

audio_stream = AudioStream(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)

audio_stream = AudioStream.from_track(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)
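
Because `AudioStream` is an asynchronous iterator, received frames are consumed with `async for`; each item is an `AudioFrameEvent`. A minimal consumption sketch, where `audio_track` is a placeholder for a subscribed remote audio track and `AudioStream` is assumed to be re-exported from `livekit.rtc`:

```python
from livekit import rtc


async def consume(audio_track: rtc.Track) -> None:
    stream = rtc.AudioStream.from_track(
        track=audio_track,
        sample_rate=48000,
        num_channels=1,
    )
    async for event in stream:
        frame = event.frame
        ...  # process the frame here (e.g. feed it to an encoder or VAD)
    # The loop exits when the stream reports end-of-stream.
```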

Static methods

def from_participant(*,
                     participant: Participant,
                     track_source: TrackSource.ValueType,
                     loop: Optional[asyncio.AbstractEventLoop] = None,
                     capacity: int = 0,
                     sample_rate: int = 48000,
                     num_channels: int = 1) -> AudioStream

Create an AudioStream from a participant's audio track.

Args

participant : Participant
The participant from whom to receive audio.
track_source : TrackSource.ValueType
The source of the audio track (e.g., microphone, screen share).
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Returns

AudioStream
An instance of AudioStream that can be used to receive audio frames.

Example

audio_stream = AudioStream.from_participant(
    participant=participant,
    track_source=TrackSource.MICROPHONE,
    sample_rate=24000,
    num_channels=1,
)

def from_track(*,
               track: Track,
               loop: Optional[asyncio.AbstractEventLoop] = None,
               capacity: int = 0,
               sample_rate: int = 48000,
               num_channels: int = 1) -> AudioStream

Create an AudioStream from an existing audio track.

Args

track : Track
The audio track from which to receive audio.
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Returns

AudioStream
An instance of AudioStream that can be used to receive audio frames.

Example

audio_stream = AudioStream.from_track(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)
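
In practice, `from_track` is typically called once a remote audio track becomes available, for example from a room's `track_subscribed` event. The sketch below assumes the `rtc.Room` event name, callback signature, and `TrackKind` enum match the installed `livekit.rtc` version:

```python
import asyncio

from livekit import rtc


def attach_audio_handler(room: rtc.Room) -> None:
    @room.on("track_subscribed")
    def on_track_subscribed(
        track: rtc.Track,
        publication: rtc.RemoteTrackPublication,
        participant: rtc.RemoteParticipant,
    ) -> None:
        # Only audio tracks can back an AudioStream.
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            stream = rtc.AudioStream.from_track(track=track)
            # Consume frames in the background; the callback itself must not block.
            asyncio.ensure_future(read_frames(stream))


async def read_frames(stream: rtc.AudioStream) -> None:
    async for event in stream:
        ...  # handle event.frame
```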

Methods

async def aclose(self) -> None
async def aclose(self) -> None:
    """Asynchronously close the audio stream.

    This method cleans up resources associated with the audio stream and waits for
    any pending operations to complete.
    """
    self._ffi_handle.dispose()
    await self._task

Asynchronously close the audio stream.

This method cleans up resources associated with the audio stream and waits for any pending operations to complete.
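
A common pattern is to pair iteration with `aclose()` so the stream's FFI handle is released even if the consumer stops early. A minimal sketch:

```python
async def consume_until_closed(stream: AudioStream) -> None:
    try:
        async for event in stream:
            ...  # process event.frame
    finally:
        # Dispose of the underlying FFI handle and wait for the
        # background receive task to finish.
        await stream.aclose()
```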