Module livekit.rtc

LiveKit RTC SDK

Sub-modules

livekit.rtc.audio_frame
livekit.rtc.audio_resampler
livekit.rtc.audio_source
livekit.rtc.audio_stream
livekit.rtc.chat
livekit.rtc.e2ee
livekit.rtc.event_emitter
livekit.rtc.log
livekit.rtc.participant
livekit.rtc.resources

Used by importlib.resources and setuptools

livekit.rtc.room
livekit.rtc.rpc
livekit.rtc.track
livekit.rtc.track_publication
livekit.rtc.transcription
livekit.rtc.utils
livekit.rtc.version
livekit.rtc.video_frame
livekit.rtc.video_source
livekit.rtc.video_stream

Functions

def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame

Combines one or more rtc.AudioFrame objects into a single rtc.AudioFrame.

This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.

Args

buffer
A single rtc.AudioFrame or a list of rtc.AudioFrame objects to be combined.

Returns

rtc.AudioFrame
A new rtc.AudioFrame containing the combined audio data.

Raises

ValueError
If the buffer is empty.
ValueError
If frames have differing sample rates.
ValueError
If frames have differing numbers of channels.

Example

>>> frame1 = rtc.AudioFrame(
...     data=b"\x01\x00\x02\x00", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"\x03\x00\x04\x00", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> bytes(combined_frame.data)
b'\x01\x00\x02\x00\x03\x00\x04\x00'
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2

Classes

class AudioFrame (data: Union[bytes, bytearray, memoryview], sample_rate: int, num_channels: int, samples_per_channel: int)

A class that represents a frame of audio data with specific properties such as sample rate, number of channels, and samples per channel.

The format of the audio data is 16-bit signed integers (int16) interleaved by channel.

Initialize an AudioFrame instance.

Args

data : Union[bytes, bytearray, memoryview]
The raw audio data, which must be at least num_channels * samples_per_channel * sizeof(int16) bytes long.
sample_rate : int
The sample rate of the audio in Hz.
num_channels : int
The number of audio channels (e.g., 1 for mono, 2 for stereo).
samples_per_channel : int
The number of samples per channel.

Raises

ValueError
If the length of data is smaller than the required size.
Expand source code
class AudioFrame:
    """
    A frame of audio data together with its sample rate, number of channels, and
    number of samples per channel.

    The format of the audio data is 16-bit signed integers (int16) interleaved by channel.
    """

    def __init__(
        self,
        data: Union[bytes, bytearray, memoryview],
        sample_rate: int,
        num_channels: int,
        samples_per_channel: int,
    ) -> None:
        """
        Initialize an AudioFrame instance.

        The input is copied into an internal `bytearray`, so the frame does not share
        storage with `data`.

        Args:
            data (Union[bytes, bytearray, memoryview]): The raw audio data, which must be at least
                `num_channels * samples_per_channel * sizeof(int16)` bytes long.
            sample_rate (int): The sample rate of the audio in Hz.
            num_channels (int): The number of audio channels (e.g., 1 for mono, 2 for stereo).
            samples_per_channel (int): The number of samples per channel.

        Raises:
            ValueError: If `data` holds fewer bytes than the required size.
        """
        required_bytes = (
            num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16)
        )

        # Measure the payload in bytes. len() on a memoryview counts *elements*, so an
        # int16 view (such as the one returned by the `data` property) would be
        # reported as half its real size and wrongly rejected.
        try:
            with memoryview(data) as view:
                available_bytes = view.nbytes
        except TypeError:
            # Not a buffer object: keep the historical len()-based check.
            available_bytes = len(data)

        if available_bytes < required_bytes:
            raise ValueError(
                "data length must be >= num_channels * samples_per_channel * sizeof(int16)"
            )

        self._data = bytearray(data)
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._samples_per_channel = samples_per_channel

    @staticmethod
    def create(
        sample_rate: int, num_channels: int, samples_per_channel: int
    ) -> "AudioFrame":
        """
        Create a new silent AudioFrame instance with specified sample rate, number of channels,
        and samples per channel.

        Args:
            sample_rate (int): The sample rate of the audio in Hz.
            num_channels (int): The number of audio channels (e.g., 1 for mono, 2 for stereo).
            samples_per_channel (int): The number of samples per channel.

        Returns:
            AudioFrame: A new AudioFrame instance whose data buffer is zero-filled.
        """
        size = num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16)
        data = bytearray(size)  # bytearray(n) is filled with zero bytes
        return AudioFrame(data, sample_rate, num_channels, samples_per_channel)

    @staticmethod
    def _from_owned_info(owned_info: proto_audio.OwnedAudioFrameBuffer) -> "AudioFrame":
        """
        Build an AudioFrame by copying the samples referenced by an FFI buffer description.

        Args:
            owned_info (proto_audio.OwnedAudioFrameBuffer): Buffer description whose `info`
                carries the data pointer, sample rate, channel count and samples per channel.

        Returns:
            AudioFrame: A frame holding its own copy of the samples.
        """
        info = owned_info.info
        size = info.num_channels * info.samples_per_channel  # number of int16 samples
        cdata = (ctypes.c_int16 * size).from_address(info.data_ptr)
        data = bytearray(cdata)  # copy out of the memory at data_ptr
        # NOTE(review): the handle wrapper is created and discarded immediately,
        # presumably so the native buffer is released now that it has been copied --
        # confirm against FfiHandle's disposal semantics.
        FfiHandle(owned_info.handle.id)
        return AudioFrame(
            data, info.sample_rate, info.num_channels, info.samples_per_channel
        )

    def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame":
        """Resample the audio frame to the given sample rate and number of channels.

        .. warning::
            This method is deprecated and will be removed in a future release.
            Please use the `rtc.AudioResampler` class instead.
        """
        # A new resampler is requested on every call.
        req = proto_ffi.FfiRequest()
        req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())

        resp = FfiClient.instance.request(req)
        resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id)

        resample_req = proto_ffi.FfiRequest()
        resample_req.remix_and_resample.resampler_handle = resampler_handle.handle
        resample_req.remix_and_resample.buffer.CopyFrom(self._proto_info())
        resample_req.remix_and_resample.sample_rate = sample_rate
        resample_req.remix_and_resample.num_channels = num_channels

        resp = FfiClient.instance.request(resample_req)
        return AudioFrame._from_owned_info(resp.remix_and_resample.buffer)

    def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:
        """Describe this frame (buffer address and audio parameters) for an FFI request."""
        audio_info = proto_audio.AudioFrameBufferInfo()
        audio_info.data_ptr = get_address(memoryview(self._data))
        audio_info.sample_rate = self.sample_rate
        audio_info.num_channels = self.num_channels
        audio_info.samples_per_channel = self.samples_per_channel
        return audio_info

    @property
    def data(self) -> memoryview:
        """
        Returns a memory view of the audio data as 16-bit signed integers.

        Returns:
            memoryview: A memory view of the audio data.
        """
        return memoryview(self._data).cast("h")

    @property
    def sample_rate(self) -> int:
        """
        Returns the sample rate of the audio frame.

        Returns:
            int: The sample rate in Hz.
        """
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """
        Returns the number of channels in the audio frame.

        Returns:
            int: The number of audio channels (e.g., 1 for mono, 2 for stereo).
        """
        return self._num_channels

    @property
    def samples_per_channel(self) -> int:
        """
        Returns the number of samples per channel.

        Returns:
            int: The number of samples per channel.
        """
        return self._samples_per_channel

    @property
    def duration(self) -> float:
        """
        Returns the duration of the audio frame in seconds.

        Returns:
            float: The duration in seconds.
        """
        return self.samples_per_channel / self.sample_rate

    def to_wav_bytes(self) -> bytes:
        """
        Convert the audio frame data to a WAV-formatted byte stream.

        Returns:
            bytes: The audio data encoded in WAV format.
        """
        import wave
        import io

        with io.BytesIO() as wav_file:
            with wave.open(wav_file, "wb") as wav:
                wav.setnchannels(self.num_channels)
                wav.setsampwidth(2)  # int16 samples -> 2 bytes each
                wav.setframerate(self.sample_rate)
                wav.writeframes(self._data)

            # Read the buffer only after the wave writer has been closed.
            return wav_file.getvalue()

    def __repr__(self) -> str:
        return (
            f"rtc.AudioFrame(sample_rate={self.sample_rate}, "
            f"num_channels={self.num_channels}, "
            f"samples_per_channel={self.samples_per_channel}, "
            f"duration={self.duration:.3f})"
        )

Static methods

def create(sample_rate: int, num_channels: int, samples_per_channel: int) ‑> AudioFrame

Create a new empty AudioFrame instance with specified sample rate, number of channels, and samples per channel.

Args

sample_rate : int
The sample rate of the audio in Hz.
num_channels : int
The number of audio channels (e.g., 1 for mono, 2 for stereo).
samples_per_channel : int
The number of samples per channel.

Returns

AudioFrame
A new AudioFrame instance whose data buffer is zero-filled.

Instance variables

prop data : memoryview

Returns a memory view of the audio data as 16-bit signed integers.

Returns

memoryview
A memory view of the audio data.
Expand source code
@property
def data(self) -> memoryview:
    """
    Expose the frame's audio buffer as a view of 16-bit signed integers.

    Returns:
        memoryview: A view over the audio buffer, cast to format ``"h"``.
    """
    raw_view = memoryview(self._data)
    return raw_view.cast("h")
prop duration : float

Returns the duration of the audio frame in seconds.

Returns

float
The duration in seconds.
Expand source code
@property
def duration(self) -> float:
    """
    Length of the audio frame in seconds.

    Returns:
        float: Samples per channel divided by the sample rate.
    """
    sample_count = self.samples_per_channel
    rate = self.sample_rate
    return sample_count / rate
prop num_channels : int

Returns the number of channels in the audio frame.

Returns

int
The number of audio channels (e.g., 1 for mono, 2 for stereo).
Expand source code
@property
def num_channels(self) -> int:
    """
    Channel count of the audio frame.

    Returns:
        int: The number of audio channels (e.g., 1 for mono, 2 for stereo).
    """
    channel_count = self._num_channels
    return channel_count
prop sample_rate : int

Returns the sample rate of the audio frame.

Returns

int
The sample rate in Hz.
Expand source code
@property
def sample_rate(self) -> int:
    """
    Sample rate of the audio frame.

    Returns:
        int: The sample rate in Hz.
    """
    rate = self._sample_rate
    return rate
prop samples_per_channel : int

Returns the number of samples per channel.

Returns

int
The number of samples per channel.
Expand source code
@property
def samples_per_channel(self) -> int:
    """
    Per-channel sample count of the audio frame.

    Returns:
        int: The number of samples per channel.
    """
    sample_count = self._samples_per_channel
    return sample_count

Methods

def remix_and_resample(self, sample_rate: int, num_channels: int) ‑> AudioFrame

Resample the audio frame to the given sample rate and number of channels.

Warning

This method is deprecated and will be removed in a future release. Please use the rtc.AudioResampler class instead.

def to_wav_bytes(self) ‑> bytes

Convert the audio frame data to a WAV-formatted byte stream.

Returns

bytes
The audio data encoded in WAV format.
class AudioFrameEvent (frame: AudioFrame)

An event representing a received audio frame.

Attributes

frame : AudioFrame
The received audio frame.
Expand source code
@dataclass
class AudioFrameEvent:
    """An event representing a received audio frame.

    The event is a plain dataclass with a single field.

    Attributes:
        frame (AudioFrame): The received audio frame.
    """

    # The audio frame carried by this event.
    frame: AudioFrame

Class variables

var frame : AudioFrame
class AudioResampler (input_rate: int, output_rate: int, *, num_channels: int = 1, quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM)

A class for resampling audio data from one sample rate to another.

AudioResampler provides functionality to resample audio data from an input sample rate to an output sample rate using the Sox resampling library. It supports multiple channels and configurable resampling quality.

Initialize an AudioResampler instance for resampling audio data.

Args

input_rate : int
The sample rate of the input audio data (in Hz).
output_rate : int
The desired sample rate of the output audio data (in Hz).
num_channels : int, optional
The number of audio channels (e.g., 1 for mono, 2 for stereo). Defaults to 1.
quality : AudioResamplerQuality, optional
The quality setting for the resampler. Can be one of the AudioResamplerQuality enum values: QUICK, LOW, MEDIUM, HIGH, VERY_HIGH. Higher quality settings result in better audio quality but require more processing power. Defaults to AudioResamplerQuality.MEDIUM.

Raises

Exception
If there is an error creating the resampler.
Expand source code
class AudioResampler:
    """
    A class for resampling audio data from one sample rate to another.

    `AudioResampler` provides functionality to resample audio data from an input sample rate to an output
    sample rate using the Sox resampling library. It supports multiple channels and configurable resampling quality.
    """

    def __init__(
        self,
        input_rate: int,
        output_rate: int,
        *,
        num_channels: int = 1,
        quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM,
    ) -> None:
        """
        Initialize an `AudioResampler` instance for resampling audio data.

        Args:
            input_rate (int): The sample rate of the input audio data (in Hz).
            output_rate (int): The desired sample rate of the output audio data (in Hz).
            num_channels (int, optional): The number of audio channels (e.g., 1 for mono, 2 for stereo). Defaults to 1.
            quality (AudioResamplerQuality, optional): The quality setting for the resampler. Can be one of the
                `AudioResamplerQuality` enum values: `QUICK`, `LOW`, `MEDIUM`, `HIGH`, `VERY_HIGH`. Higher quality settings
                result in better audio quality but require more processing power. Defaults to `AudioResamplerQuality.MEDIUM`.

        Raises:
            Exception: If there is an error creating the resampler.
        """
        self._input_rate = input_rate
        self._output_rate = output_rate
        self._num_channels = num_channels

        req = proto_ffi.FfiRequest()
        req.new_sox_resampler.input_rate = input_rate
        req.new_sox_resampler.output_rate = output_rate
        req.new_sox_resampler.num_channels = num_channels
        req.new_sox_resampler.quality_recipe = _to_proto_quality(quality)

        # not exposed for now
        req.new_sox_resampler.input_data_type = (
            proto_audio_frame.SoxResamplerDataType.SOXR_DATATYPE_INT16I
        )
        req.new_sox_resampler.output_data_type = (
            proto_audio_frame.SoxResamplerDataType.SOXR_DATATYPE_INT16I
        )
        req.new_sox_resampler.flags = 0  # default

        resp = FfiClient.instance.request(req)

        if resp.new_sox_resampler.error:
            raise Exception(resp.new_sox_resampler.error)

        self._ffi_handle = FfiHandle(resp.new_sox_resampler.resampler.handle.id)

    def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]:
        """
        Push audio data into the resampler and retrieve any available resampled data.

        This method accepts audio data, resamples it according to the configured input and output rates,
        and returns any resampled data that is available after processing the input.

        Args:
            data (bytearray | AudioFrame): The audio data to resample. This can be a `bytearray` containing
                raw audio bytes in int16le format or an `AudioFrame` object.

        Returns:
            list[AudioFrame]: A list of `AudioFrame` objects containing the resampled audio data.
                The list may be empty if no output data is available yet.

        Raises:
            Exception: If there is an error during resampling.
        """
        # For an AudioFrame, view its int16 samples as raw bytes so that the
        # size sent below is a byte count in both cases.
        bdata = data if isinstance(data, bytearray) else data.data.cast("b")

        req = proto_ffi.FfiRequest()
        req.push_sox_resampler.resampler_handle = self._ffi_handle.handle
        req.push_sox_resampler.data_ptr = get_address(memoryview(bdata))
        req.push_sox_resampler.size = len(bdata)

        resp = FfiClient.instance.request(req)
        result = resp.push_sox_resampler

        if result.error:
            raise Exception(result.error)

        return self._frames_from_output(result.output_ptr, result.size)

    def flush(self) -> list[AudioFrame]:
        """
        Flush any remaining audio data through the resampler and retrieve the resampled data.

        This method should be called when no more input data will be provided to ensure that all internal
        buffers are processed and all resampled data is output.

        Returns:
            list[AudioFrame]: A list of `AudioFrame` objects containing the remaining resampled audio data after flushing.
                The list may be empty if no output data remains.

        Raises:
            Exception: If there is an error during flushing.
        """
        req = proto_ffi.FfiRequest()
        req.flush_sox_resampler.resampler_handle = self._ffi_handle.handle

        resp = FfiClient.instance.request(req)
        result = resp.flush_sox_resampler

        # Surface flush failures the same way `push` does, as documented above.
        # NOTE(review): assumes the flush response carries an optional `error`
        # field like the push response -- confirm against the FFI proto. getattr
        # keeps this a no-op if the field does not exist.
        error = getattr(result, "error", None)
        if error:
            raise Exception(error)

        return self._frames_from_output(result.output_ptr, result.size)

    def _frames_from_output(self, output_ptr: int, size: int) -> list[AudioFrame]:
        """Copy `size` bytes of resampler output at `output_ptr` into a single-frame list.

        Returns an empty list when `output_ptr` is zero (no output available).
        """
        if not output_ptr:
            return []

        cdata = (ctypes.c_int8 * size).from_address(output_ptr)
        output_data = bytearray(cdata)  # copy out of the memory at output_ptr
        return [
            AudioFrame(
                output_data,
                self._output_rate,
                self._num_channels,
                # bytes -> samples per channel (int16, interleaved)
                len(output_data)
                // (self._num_channels * ctypes.sizeof(ctypes.c_int16)),
            )
        ]

Methods

def flush(self) ‑> list[AudioFrame]

Flush any remaining audio data through the resampler and retrieve the resampled data.

This method should be called when no more input data will be provided to ensure that all internal buffers are processed and all resampled data is output.

Returns

list[AudioFrame]
A list of AudioFrame objects containing the remaining resampled audio data after flushing. The list may be empty if no output data remains.

Raises

Exception
If there is an error during flushing.
def push(self, data: bytearray | AudioFrame) ‑> list[AudioFrame]

Push audio data into the resampler and retrieve any available resampled data.

This method accepts audio data, resamples it according to the configured input and output rates, and returns any resampled data that is available after processing the input.

Args

data : bytearray | AudioFrame
The audio data to resample. This can be a bytearray containing raw audio bytes in int16le format or an AudioFrame object.

Returns

list[AudioFrame]
A list of AudioFrame objects containing the resampled audio data. The list may be empty if no output data is available yet.

Raises

Exception
If there is an error during resampling.
class AudioResamplerQuality (*args, **kwds)

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Expand source code
@unique
class AudioResamplerQuality(str, Enum):
    """Named quality settings for audio resampling.

    Every member is also a `str`; its value is the lowercase setting name.
    `@unique` rejects two members sharing the same value.
    """

    QUICK = "quick"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var HIGH
var LOW
var MEDIUM
var QUICK
var VERY_HIGH
class AudioSource (sample_rate: int, num_channels: int, queue_size_ms: int = 1000, loop: asyncio.AbstractEventLoop | None = None)

Represents a real-time audio source with an internal audio queue.

The AudioSource class allows you to push audio frames into a real-time audio source, managing an internal queue of audio data up to a maximum duration defined by queue_size_ms. It supports asynchronous operations to capture audio frames and to wait for the playback of all queued audio data.

Initializes a new instance of the audio source.

Args

sample_rate : int
The sample rate of the audio source in Hz.
num_channels : int
The number of audio channels.
queue_size_ms : int, optional
The buffer size of the audio queue in milliseconds. Defaults to 1000 ms.
loop : asyncio.AbstractEventLoop, optional
The event loop to use. Defaults to asyncio.get_event_loop().
Expand source code
class AudioSource:
    """
    Represents a real-time audio source with an internal audio queue.

    The `AudioSource` class allows you to push audio frames into a real-time audio
    source, managing an internal queue of audio data up to a maximum duration defined
    by `queue_size_ms`. It supports asynchronous operations to capture audio frames
    and to wait for the playback of all queued audio data.
    """

    def __init__(
        self,
        sample_rate: int,
        num_channels: int,
        queue_size_ms: int = 1000,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """
        Initializes a new instance of the audio source.

        Args:
            sample_rate (int): The sample rate of the audio source in Hz.
            num_channels (int): The number of audio channels.
            queue_size_ms (int, optional): The buffer size of the audio queue in milliseconds.
                Defaults to 1000 ms.
            loop (asyncio.AbstractEventLoop, optional): The event loop to use. Defaults to
                `asyncio.get_event_loop()`.
        """
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()

        # Ask the FFI layer to create the source.
        req = proto_ffi.FfiRequest()
        req.new_audio_source.type = (
            proto_audio_frame.AudioSourceType.AUDIO_SOURCE_NATIVE
        )
        req.new_audio_source.sample_rate = sample_rate
        req.new_audio_source.num_channels = num_channels
        req.new_audio_source.queue_size_ms = queue_size_ms

        resp = FfiClient.instance.request(req)
        self._info = resp.new_audio_source.source
        self._ffi_handle = FfiHandle(self._info.handle.id)

        # Local estimate of the playout queue, maintained by capture_frame():
        # _last_capture is the time.monotonic() timestamp of the latest capture
        # (0.0 means "none since the last reset"); _q_size is the estimated number
        # of seconds of audio queued as of that timestamp.
        self._last_capture = 0.0
        self._q_size = 0.0
        # Timer that calls _release_waiter() once the estimated queue has drained.
        self._join_handle: asyncio.TimerHandle | None = None
        # Future awaited by wait_for_playout(); None when nothing is pending.
        self._join_fut: asyncio.Future[None] | None = None

    @property
    def sample_rate(self) -> int:
        """The sample rate of the audio source in Hz."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """The number of audio channels."""
        return self._num_channels

    @property
    def queued_duration(self) -> float:
        """The current duration (in seconds) of audio data queued for playback."""
        # Estimate at last capture minus the time elapsed since then, floored at 0.
        return max(self._q_size - time.monotonic() + self._last_capture, 0.0)

    def clear_queue(self) -> None:
        """
        Clears the internal audio queue, discarding all buffered audio data.

        This method immediately removes all audio data currently queued for playback,
        effectively resetting the audio source's buffer. Any audio frames that have been
        captured but not yet played will be discarded. This is useful in scenarios where
        you need to stop playback abruptly or prevent outdated audio data from being played.
        """
        req = proto_ffi.FfiRequest()
        req.clear_audio_buffer.source_handle = self._ffi_handle.handle
        _ = FfiClient.instance.request(req)
        # Resolve any pending wait_for_playout() and reset the local queue estimate.
        self._release_waiter()

    async def capture_frame(self, frame: AudioFrame) -> None:
        """
        Captures an `AudioFrame` and queues it for playback.

        This method is used to push new audio data into the audio source. The audio data
        will be processed and queued. If the size of the audio frame exceeds the internal
        queue size, the method will wait until there is enough space in the queue to
        accommodate the frame. The method returns only when all of the data in the buffer
        has been pushed.

        Frames with zero samples per channel are ignored.

        Args:
            frame (AudioFrame): The audio frame to capture and queue.

        Raises:
            Exception: If there is an error during frame capture.
        """

        if frame.samples_per_channel == 0:
            return

        # Update the queue estimate: add this frame's duration and subtract the
        # time that has passed since the previous capture.
        now = time.monotonic()
        elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
        self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
        self._last_capture = now

        # Re-arm the drain timer for the new estimate.
        if self._join_handle:
            self._join_handle.cancel()

        if self._join_fut is None:
            self._join_fut = self._loop.create_future()

        self._join_handle = self._loop.call_later(self._q_size, self._release_waiter)

        req = proto_ffi.FfiRequest()
        req.capture_audio_frame.source_handle = self._ffi_handle.handle
        req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

        # The subscription is created before the request is issued -- presumably
        # so the completion event cannot be missed; keep this ordering.
        queue = FfiClient.instance.queue.subscribe(loop=self._loop)
        try:
            resp = FfiClient.instance.request(req)
            # Wait for the event whose async_id matches this request.
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.capture_audio_frame.async_id
                == resp.capture_audio_frame.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.capture_audio_frame.error:
            raise Exception(cb.capture_audio_frame.error)

    async def wait_for_playout(self) -> None:
        """
        Waits for the audio source to finish playing out all audio data.

        This method ensures that all queued audio data has been played out before returning.
        It can be used to synchronize events after audio playback or to ensure that the
        audio queue is empty.

        Returns immediately when no playout is pending.
        """

        if self._join_fut is None:
            return

        # shield() keeps the shared future from being cancelled if this waiter is.
        await asyncio.shield(self._join_fut)

    def _release_waiter(self) -> None:
        """Resolve the pending playout future, if any, and reset the queue estimate."""
        if self._join_fut is None:
            return  # could be None when clear_queue is called

        if not self._join_fut.done():
            self._join_fut.set_result(None)

        self._last_capture = 0.0
        self._q_size = 0.0
        self._join_fut = None

Instance variables

prop num_channels : int

The number of audio channels.

Expand source code
@property
def num_channels(self) -> int:
    """Channel count this audio source was configured with."""
    channel_count = self._num_channels
    return channel_count
prop queued_duration : float

The current duration (in seconds) of audio data queued for playback.

Expand source code
@property
def queued_duration(self) -> float:
    """Seconds of audio currently estimated to be queued for playback (never negative)."""
    # Estimate at the last capture, minus the time elapsed since then.
    remaining = self._q_size - time.monotonic() + self._last_capture
    return max(remaining, 0.0)
prop sample_rate : int

The sample rate of the audio source in Hz.

Expand source code
@property
def sample_rate(self) -> int:
    """Sample rate, in Hz, this audio source was configured with."""
    rate = self._sample_rate
    return rate

Methods

async def capture_frame(self, frame: AudioFrame) ‑> None

Captures an AudioFrame and queues it for playback.

This method is used to push new audio data into the audio source. The audio data will be processed and queued. If the size of the audio frame exceeds the internal queue size, the method will wait until there is enough space in the queue to accommodate the frame. The method returns only when all of the data in the buffer has been pushed.

Args

frame : AudioFrame
The audio frame to capture and queue.

Raises

Exception
If there is an error during frame capture.
def clear_queue(self) ‑> None

Clears the internal audio queue, discarding all buffered audio data.

This method immediately removes all audio data currently queued for playback, effectively resetting the audio source's buffer. Any audio frames that have been captured but not yet played will be discarded. This is useful in scenarios where you need to stop playback abruptly or prevent outdated audio data from being played.

async def wait_for_playout(self) ‑> None

Waits for the audio source to finish playing out all audio data.

This method ensures that all queued audio data has been played out before returning. It can be used to synchronize events after audio playback or to ensure that the audio queue is empty.

class AudioStream (track: Track, loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, sample_rate: int = 48000, num_channels: int = 1, **kwargs)

An asynchronous audio stream for receiving audio frames from a participant or track.

The AudioStream class provides an asynchronous iterator over audio frames received from a specific track or participant. It allows you to receive audio frames in real-time with customizable sample rates and channel configurations.

Initialize an AudioStream instance.

Args

track : Optional[Track]
The audio track from which to receive audio. If not provided, you must specify participant and track_source in kwargs.
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Example

audio_stream = AudioStream(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)

audio_stream = AudioStream.from_track(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)
Expand source code
class AudioStream:
    """An asynchronous audio stream for receiving audio frames from a participant or track.

    The `AudioStream` class provides an asynchronous iterator over audio frames received from
    a specific track or participant. It allows you to receive audio frames in real-time with
    customizable sample rates and channel configurations.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        **kwargs,
    ) -> None:
        """Initialize an `AudioStream` instance.

        Args:
            track (Optional[Track]): The audio track from which to receive audio. If not provided,
                you must specify `participant` and `track_source` in `kwargs`.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.
            **kwargs: When `participant` is present, the stream is created from that
                participant and `track_source` must be present as well; `track` is then
                not used.

        Raises:
            KeyError: If `participant` is given in `kwargs` without `track_source`.

        Example:
            ```python
            audio_stream = AudioStream(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )

            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        self._track: Track | None = track
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

        # The reader task is only scheduled here; it first runs once the loop gets control,
        # i.e. after `_ffi_handle` (read by `_is_event`) has been assigned below.
        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()
        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from a participant's audio track.

        Args:
            participant (Participant): The participant from whom to receive audio.
            track_source (TrackSource.ValueType): The source of the audio track (e.g., microphone, screen share).
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_participant(
                participant=participant,
                track_source=TrackSource.MICROPHONE,
                sample_rate=24000,
                num_channels=1,
            )
            ```
        """
        return AudioStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            track=None,  # type: ignore
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from an existing audio track.

        Args:
            track (Track): The audio track from which to receive audio.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of `AudioStream` that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        return AudioStream(
            track=track,
            loop=loop,
            capacity=capacity,
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    def __del__(self) -> None:
        # The finalizer also runs on an object whose `__init__` raised before
        # `_ffi_queue` was assigned, so only unsubscribe when the subscription exists.
        ffi_queue = getattr(self, "_ffi_queue", None)
        if ffi_queue is not None:
            FfiClient.instance.queue.unsubscribe(ffi_queue)

    def _create_owned_stream(self) -> Any:
        """Request a native audio stream for `self._track` and return the owned stream."""
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_audio_stream = req.new_audio_stream
        new_audio_stream.track_handle = self._track._ffi_handle.handle
        new_audio_stream.sample_rate = self._sample_rate
        new_audio_stream.num_channels = self._num_channels
        new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        resp = FfiClient.instance.request(req)
        return resp.new_audio_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        """Request a native audio stream for a participant's track source and return it."""
        req = proto_ffi.FfiRequest()
        audio_stream_from_participant = req.audio_stream_from_participant
        audio_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        audio_stream_from_participant.sample_rate = self._sample_rate
        audio_stream_from_participant.num_channels = self._num_channels
        audio_stream_from_participant.type = (
            proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        )
        audio_stream_from_participant.track_source = track_source
        resp = FfiClient.instance.request(req)
        return resp.audio_stream_from_participant.stream

    async def _run(self):
        """Forward FFI audio events for this stream into the frame queue until end of stream."""
        try:
            while True:
                event = await self._ffi_queue.wait_for(self._is_event)
                audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event

                if audio_event.HasField("frame_received"):
                    owned_buffer_info = audio_event.frame_received.frame
                    frame = AudioFrame._from_owned_info(owned_buffer_info)
                    self._queue.put(AudioFrameEvent(frame))
                elif audio_event.HasField("eos"):
                    # `None` is the end-of-stream sentinel checked by `__anext__`.
                    self._queue.put(None)
                    break
        finally:
            # Release the subscription on every exit path (end of stream, cancellation,
            # or an error while handling an event), not only on a clean end of stream.
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Asynchronously close the audio stream.

        This method cleans up resources associated with the audio stream and waits for
        any pending operations to complete.
        """
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        """Return True when `e` is an audio stream event addressed to this stream's handle."""
        return e.audio_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
        return self

    async def __anext__(self) -> AudioFrameEvent:
        # Iteration ends as soon as the reader task has finished, or when the
        # end-of-stream sentinel is taken from the queue.
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item

Static methods

def from_participant(*, participant: Participant, track_source: TrackSource.ValueType, loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, sample_rate: int = 48000, num_channels: int = 1) ‑> AudioStream

Create an AudioStream from a participant's audio track.

Args

participant : Participant
The participant from whom to receive audio.
track_source : TrackSource.ValueType
The source of the audio track (e.g., microphone, screen share).
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Returns

AudioStream
An instance of AudioStream that can be used to receive audio frames.

Example

audio_stream = AudioStream.from_participant(
    participant=participant,
    track_source=TrackSource.MICROPHONE,
    sample_rate=24000,
    num_channels=1,
)
def from_track(*, track: Track, loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, sample_rate: int = 48000, num_channels: int = 1) ‑> AudioStream

Create an AudioStream from an existing audio track.

Args

track : Track
The audio track from which to receive audio.
loop : Optional[asyncio.AbstractEventLoop], optional
The event loop to use. Defaults to the current event loop.
capacity : int, optional
The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate : int, optional
The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels : int, optional
The number of audio channels. Defaults to 1.

Returns

AudioStream
An instance of AudioStream that can be used to receive audio frames.

Example

audio_stream = AudioStream.from_track(
    track=audio_track,
    sample_rate=44100,
    num_channels=2,
)

Methods

async def aclose(self) ‑> None

Asynchronously close the audio stream.

This method cleans up resources associated with the audio stream and waits for any pending operations to complete.

class ChatManager (room: Room)

A utility class that sends and receives chat messages in the active session.

It implements LiveKit Chat Protocol, and serializes data to/from JSON data packets.

Initialize a new instance of EventEmitter.

Expand source code
class ChatManager(EventEmitter[EventTypes]):
    """A utility class that sends and receives chat messages in the active session.

    It implements LiveKit Chat Protocol, and serializes data to/from JSON data packets.
    """

    def __init__(self, room: Room):
        super().__init__()
        self._room = room
        self._lp = room.local_participant

        # All data packets of the room are routed here; filtering by topic happens
        # inside the handler.
        room.on("data_received", self._on_data_received)

    def close(self):
        """Stop listening for data packets on the room."""
        self._room.off("data_received", self._on_data_received)

    async def send_message(self, message: str) -> "ChatMessage":
        """Send a chat message to the end user using LiveKit Chat Protocol.

        Args:
            message (str): the message to send

        Returns:
            ChatMessage: the message that was sent
        """
        outgoing = ChatMessage(message=message, is_local=True, participant=self._lp)
        encoded = json.dumps(outgoing.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
        return outgoing

    async def update_message(self, message: "ChatMessage"):
        """Update a chat message that was previously sent.

        If message.deleted is set to True, we'll signal to remote participants that the message
        should be deleted.
        """
        encoded = json.dumps(message.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)

    def _on_data_received(self, dp: DataPacket):
        """Decode chat packets and emit them as `message_received` events."""
        # New messages and updates are handled the same way: as long as the ID is
        # present, the user decides how to replace the previous message.
        if dp.topic not in (_CHAT_TOPIC, _CHAT_UPDATE_TOPIC):
            return

        try:
            chat_msg = ChatMessage.from_jsondict(json.loads(dp.data))
            sender = dp.participant
            if sender:
                chat_msg.participant = sender
            self.emit("message_received", chat_msg)
        except Exception as err:
            logging.warning("failed to parse chat message: %s", err, exc_info=err)

Ancestors

Methods

def close(self)
async def send_message(self, message: str) ‑> ChatMessage

Send a chat message to the end user using LiveKit Chat Protocol.

Args

message : str
the message to send

Returns

ChatMessage
the message that was sent
async def update_message(self, message: ChatMessage)

Update a chat message that was previously sent.

If message.deleted is set to True, we'll signal to remote participants that the message should be deleted.

Inherited members

class ChatMessage (message: str | None = None, id: str = <factory>, timestamp: datetime.datetime = <factory>, deleted: bool = False, participant: Optional[Participant] = None, is_local: bool = False)

ChatMessage(message: Optional[str] = None, id: str = <factory>, timestamp: datetime.datetime = <factory>, deleted: bool = False, participant: Optional[livekit.rtc.participant.Participant] = None, is_local: bool = False)

Expand source code
@dataclass
class ChatMessage:
    # Wire-protocol fields.
    message: Optional[str] = None
    id: str = field(default_factory=generate_random_base62)
    timestamp: datetime = field(default_factory=datetime.now)
    deleted: bool = False

    # These fields are not part of the wire protocol. They are here to provide
    # context for the application.
    participant: Optional[Participant] = None
    is_local: bool = False

    @classmethod
    def from_jsondict(cls, d: Dict[str, Any]) -> "ChatMessage":
        """Build a message from a decoded JSON dictionary."""
        # Older versions of the protocol carried no message ID, so one is generated.
        msg_id = d.get("id") or generate_random_base62()
        # The wire timestamp is in milliseconds; fall back to "now" when it is absent.
        raw_ts = d.get("timestamp")
        when = datetime.fromtimestamp(raw_ts / 1000.0) if raw_ts else datetime.now()

        parsed = cls(id=msg_id, timestamp=when)
        parsed.update_from_jsondict(d)
        return parsed

    def update_from_jsondict(self, d: Dict[str, Any]) -> None:
        """Overwrite `message` and `deleted` from a decoded JSON dictionary."""
        self.deleted = d.get("deleted", False)
        self.message = d.get("message")

    def asjsondict(self):
        """Returns a JSON serializable dictionary representation of the message."""
        millis = int(self.timestamp.timestamp() * 1000)
        payload = {
            "id": self.id,
            "message": self.message,
            "timestamp": millis,
        }
        # The "deleted" key is only written for deleted messages.
        if self.deleted:
            payload["deleted"] = True
        return payload

Class variables

var deleted : bool
var id : str
var is_local : bool
var message : str | None
var participant : Optional[Participant]
var timestamp : datetime.datetime

Static methods

def from_jsondict(d: Dict[str, Any]) ‑> ChatMessage

Methods

def asjsondict(self)

Returns a JSON serializable dictionary representation of the message.

def update_from_jsondict(self, d: Dict[str, Any]) ‑> None
class ConnectError (message: str)

Common base class for all non-exit exceptions.

Expand source code
class ConnectError(Exception):
    """Exception that carries a human-readable `message`.

    NOTE(review): presumably raised when connecting fails; the raise sites are not
    part of this excerpt.

    Args:
        message (str): Description of the failure. Also available as `str(error)` and
            `error.args[0]`.
    """

    def __init__(self, message: str):
        # Forward the message to Exception so `args` and `str()` are populated even
        # when the message is passed by keyword (`ConnectError(message=...)`).
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class DataPacket (data: bytes, kind: proto_room.DataPacketKind.ValueType, participant: RemoteParticipant | None, topic: str | None = None)

DataPacket(data: 'bytes', kind: 'proto_room.DataPacketKind.ValueType', participant: 'RemoteParticipant | None', topic: 'str | None' = None)

Expand source code
@dataclass
class DataPacket:
    """A data payload together with its delivery kind, its sender and an optional topic."""

    data: bytes
    """The payload of the data packet."""
    kind: proto_room.DataPacketKind.ValueType
    """Type of the data packet (e.g., RELIABLE, LOSSY)."""
    participant: RemoteParticipant | None
    """Participant who sent the data. None when sent by a server SDK."""
    topic: str | None = None
    """Topic associated with the data packet."""

Class variables

var data : bytes

The payload of the data packet.

var kind : int

Type of the data packet (e.g., RELIABLE, LOSSY).

var participant : RemoteParticipant | None

Participant who sent the data. None when sent by a server SDK.

var topic : str | None

Topic associated with the data packet.

class E2EEManager (room_handle: int, options: Optional[E2EEOptions])
Expand source code
class E2EEManager:
    """Manage end-to-end encryption (E2EE) settings for one room.

    Changes are sent to the native layer as `FfiRequest` messages addressed by the
    room handle given at construction.
    """

    def __init__(self, room_handle: int, options: Optional[E2EEOptions]):
        """Initialize the manager.

        Args:
            room_handle (int): Handle of the room the requests are addressed to.
            options (Optional[E2EEOptions]): E2EE options. When None, the manager
                starts disabled and has no key provider.
        """
        self.options = options
        self._room_handle = room_handle
        self._enabled = options is not None
        # Always define the attribute so `key_provider` returns None instead of
        # raising AttributeError when no options were supplied.
        self._key_provider: Optional[KeyProvider] = None

        if options is not None:
            self._key_provider = KeyProvider(
                self._room_handle, options.key_provider_options
            )

    @property
    def key_provider(self) -> Optional[KeyProvider]:
        """The key provider built from the options, or None when no options were given."""
        return self._key_provider

    @property
    def enabled(self) -> bool:
        """Whether E2EE is enabled, as last recorded by this object."""
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        """Enables or disables end-to-end encryption.

        Parameters:
            enabled (bool): True to enable, False to disable.

        Example:
            ```python
            e2ee_manager.set_enabled(True)
            ```
        """
        self._enabled = enabled
        req = proto_ffi.FfiRequest()
        req.e2ee.room_handle = self._room_handle
        req.e2ee.manager_set_enabled.enabled = enabled
        FfiClient.instance.request(req)

    def frame_cryptors(self) -> List[FrameCryptor]:
        """Retrieves the list of frame cryptors for participants.

        Returns:
            List[FrameCryptor]: A list of FrameCryptor instances.

        Example:
            ```python
            cryptors = e2ee_manager.frame_cryptors()
            for cryptor in cryptors:
                print(cryptor.participant_identity)
            ```
        """
        req = proto_ffi.FfiRequest()
        req.e2ee.room_handle = self._room_handle
        # NOTE(review): only `room_handle` is set on the request; no
        # `manager_get_frame_cryptors` member is selected although the response is
        # read from that field. Confirm against the FFI protocol definition.

        resp = FfiClient.instance.request(req)
        return [
            FrameCryptor(
                self._room_handle,
                info.participant_identity,
                info.key_index,
                info.enabled,
            )
            for info in resp.e2ee.manager_get_frame_cryptors.frame_cryptors
        ]

Instance variables

prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    """Return the enabled flag stored on this object (`self._enabled`)."""
    return self._enabled
prop key_provider : Optional[KeyProvider]
Expand source code
@property
def key_provider(self) -> Optional[KeyProvider]:
    """Return the key provider stored on this object (`self._key_provider`)."""
    return self._key_provider

Methods

def frame_cryptors(self) ‑> List[FrameCryptor]

Retrieves the list of frame cryptors for participants.

Returns

List[FrameCryptor]
A list of FrameCryptor instances.

Example

cryptors = e2ee_manager.frame_cryptors()
for cryptor in cryptors:
    print(cryptor.participant_identity)
def set_enabled(self, enabled: bool) ‑> None

Enables or disables end-to-end encryption.

Parameters

enabled (bool): True to enable, False to disable.

Example

e2ee_manager.set_enabled(True)
class E2EEOptions (key_provider_options: KeyProviderOptions = <factory>, encryption_type: int = 1)

E2EEOptions(key_provider_options: livekit.rtc.e2ee.KeyProviderOptions = <factory>, encryption_type: int = 1)

Expand source code
@dataclass
class E2EEOptions:
    """Options used to configure end-to-end encryption."""

    # Settings for the key provider; a fresh KeyProviderOptions is created per instance.
    key_provider_options: KeyProviderOptions = field(default_factory=KeyProviderOptions)
    # Encryption type; defaults to GCM.
    encryption_type: proto_e2ee.EncryptionType.ValueType = proto_e2ee.EncryptionType.GCM

Class variables

var encryption_type : int
var key_provider_options : KeyProviderOptions
class EventEmitter

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Expand source code
class EventEmitter(Generic[T_contra]):
    """A minimal synchronous event emitter.

    Callbacks are registered per event with `on` / `once`, removed with `off`, and
    invoked by `emit`. A callback receives only as many positional arguments as its
    signature accepts.
    """

    def __init__(self) -> None:
        """
        Initialize a new instance of EventEmitter.
        """
        self._events: Dict[T_contra, Set[Callable]] = dict()

    @staticmethod
    def _supported_args(callback: Callable, args: tuple) -> tuple:
        """Return the leading part of `args` that `callback` accepts positionally."""
        params = inspect.signature(callback).parameters.values()

        # A `*args` parameter accepts everything.
        if any(p.kind == p.VAR_POSITIONAL for p in params):
            return args

        num_positional = sum(
            1 for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
        )
        return args[:num_positional]

    def emit(self, event: T_contra, *args) -> None:
        """
        Trigger all callbacks associated with the given event.

        Each callback is called with at most as many positional arguments as it
        declares. A `TypeError` raised while calling a callback propagates to the
        caller; any other exception is logged and the remaining callbacks still run.

        Args:
            event (T): The event to emit.
            *args: Positional arguments to pass to the callbacks.

        Example:
            Basic usage of emit:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
            ```
        """
        if event not in self._events:
            return

        # Iterate over a snapshot so callbacks may register/unregister while emitting.
        for callback in self._events[event].copy():
            try:
                callback(*self._supported_args(callback, args))
            except TypeError:
                raise
            except Exception:
                logger.exception(f"failed to emit event {event}")

    def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called only once when the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered one-shot wrapper around the callback, or a
            decorator if callback is None. Pass the returned wrapper to `off` to
            unregister before the event fires.

        Example:
            Using once with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.once('greet', greet_once)
            emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')    # No output, callback was removed after first call
            ```

            Using once as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.once('greet')
            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')    # No output
            ```
        """
        if callback is not None:

            def once_callback(*args, **kwargs):
                self.off(event, once_callback)
                # This wrapper is variadic, so `emit` hands it every argument. Trim
                # them here so a one-shot callback gets the same treatment as one
                # registered with `on`.
                try:
                    call_args = self._supported_args(callback, args)
                except ValueError:
                    # No introspectable signature: pass everything through.
                    call_args = args
                callback(*call_args, **kwargs)

            return self.on(event, once_callback)
        else:

            def decorator(callback: Callable) -> Callable:
                self.once(event, callback)
                return callback

            return decorator

    def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called whenever the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered callback or a decorator if callback is None.

        Example:
            Using on with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```

            Using on as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.on('greet')
            def greet(name):
                print(f"Hello, {name}!")

            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```
        """
        if callback is not None:
            if event not in self._events:
                self._events[event] = set()
            self._events[event].add(callback)
            return callback
        else:

            def decorator(callback: Callable) -> Callable:
                self.on(event, callback)
                return callback

            return decorator

    def off(self, event: T_contra, callback: Callable) -> None:
        """
        Unregister a callback from an event.

        Unknown events and callbacks that are not registered are ignored.

        Args:
            event (T): The event to stop listening to.
            callback (Callable): The callback to remove.

        Example:
            Removing a callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.off('greet', greet)
            emitter.emit('greet', 'Dave')  # No output, callback was removed
            ```
        """
        if event in self._events:
            # `discard` keeps removal idempotent, matching the silent handling of
            # unknown events above.
            self._events[event].discard(callback)

Ancestors

  • typing.Generic

Subclasses

Methods

def emit(self, event: -T_contra, *args) ‑> None

Trigger all callbacks associated with the given event.

Args

event : T
The event to emit.
*args
Positional arguments to pass to the callbacks.

Example

Basic usage of emit:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
def off(self, event: -T_contra, callback: Callable) ‑> None

Unregister a callback from an event.

Args

event : T
The event to stop listening to.
callback : Callable
The callback to remove.

Example

Removing a callback:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.off('greet', greet)
emitter.emit('greet', 'Dave')  # No output, callback was removed
def on(self, event: -T_contra, callback: Optional[Callable] = None) ‑> Callable

Register a callback to be called whenever the event is emitted.

If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.

Args

event : T
The event to listen for.
callback : Callable, optional
The callback to register. Defaults to None.

Returns

Callable
The registered callback or a decorator if callback is None.

Example

Using on with a direct callback:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!

Using on as a decorator:

emitter = EventEmitter[str]()

@emitter.on('greet')
def greet(name):
    print(f"Hello, {name}!")

emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
def once(self, event: -T_contra, callback: Optional[Callable] = None) ‑> Callable

Register a callback to be called only once when the event is emitted.

If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.

Args

event : T
The event to listen for.
callback : Callable, optional
The callback to register. Defaults to None.

Returns

Callable
The registered callback or a decorator if callback is None.

Example

Using once with a direct callback:

emitter = EventEmitter[str]()

def greet_once(name):
    print(f"Hello once, {name}!")

emitter.once('greet', greet_once)
emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
emitter.emit('greet', 'Bob')    # No output, callback was removed after first call

Using once as a decorator:

emitter = EventEmitter[str]()

@emitter.once('greet')
def greet_once(name):
    print(f"Hello once, {name}!")

emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
emitter.emit('greet', 'Bob')    # No output
class FrameCryptor (room_handle: int, participant_identity: str, key_index: int, enabled: bool)
Expand source code
class FrameCryptor:
    """Per-participant frame encryption settings, addressed by room handle and identity."""

    def __init__(
        self, room_handle: int, participant_identity: str, key_index: int, enabled: bool
    ):
        self._room_handle = room_handle
        self._participant_identity = participant_identity
        self._key_index = key_index
        self._enabled = enabled

    @property
    def participant_identity(self) -> str:
        """Identity of the participant this cryptor belongs to."""
        return self._participant_identity

    @property
    def key_index(self) -> int:
        """Key index as last recorded by this object."""
        return self._key_index

    @property
    def enabled(self) -> bool:
        """Enabled flag as last recorded by this object."""
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        """Enables or disables frame encryption.

        Parameters:
            enabled (bool): True to enable, False to disable.

        Example:
            ```python
            frame_cryptor.set_enabled(True)
            ```
        """
        # The local flag is updated first, then the change is sent over FFI.
        self._enabled = enabled

        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle
        toggle = e2ee.cryptor_set_enabled
        toggle.participant_identity = self._participant_identity
        toggle.enabled = enabled
        FfiClient.instance.request(request)

    def set_key_index(self, key_index: int) -> None:
        """Sets the key index for encryption/decryption.

        Parameters:
            key_index (int): The new key index.

        Example:
            ```python
            frame_cryptor.set_key_index(3)
            ```
        """
        # The local value is updated first, then the change is sent over FFI.
        self._key_index = key_index

        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle
        selection = e2ee.cryptor_set_key_index
        selection.participant_identity = self._participant_identity
        selection.key_index = key_index
        FfiClient.instance.request(request)

Instance variables

prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    """Return the enabled flag stored on this object (`self._enabled`)."""
    return self._enabled
prop key_index : int
Expand source code
@property
def key_index(self) -> int:
    """Return the key index stored on this object (`self._key_index`)."""
    return self._key_index
prop participant_identity : str
Expand source code
@property
def participant_identity(self) -> str:
    """Return the participant identity stored on this object (`self._participant_identity`)."""
    return self._participant_identity

Methods

def set_enabled(self, enabled: bool) ‑> None

Enables or disables frame encryption.

Parameters

enabled (bool): True to enable, False to disable.

Example

frame_cryptor.set_enabled(True)
def set_key_index(self, key_index: int) ‑> None

Sets the key index for encryption/decryption.

Parameters

key_index (int): The new key index.

Example

frame_cryptor.set_key_index(3)
class IceServer (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class KeyProvider (room_handle: int, options: KeyProviderOptions)
Expand source code
class KeyProvider:
    """Sets, exports and ratchets E2EE keys for one room through FFI requests.

    Every method builds an `FfiRequest` addressed to the room handle given at
    construction time and sends it synchronously via `FfiClient`.
    """

    def __init__(self, room_handle: int, options: KeyProviderOptions):
        self._room_handle = room_handle
        self._options = options

    @property
    def options(self) -> KeyProviderOptions:
        """The options object this provider was constructed with."""
        return self._options

    def set_shared_key(self, key: bytes, key_index: int) -> None:
        """Install the shared encryption key at the given index.

        Parameters:
            key (bytes): The new shared key.
            key_index (int): The index of the key.

        Example:
            ```python
            key_provider.set_shared_key(b"my_shared_key", key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle
        e2ee.set_shared_key.key_index = key_index
        e2ee.set_shared_key.shared_key = key
        FfiClient.instance.request(request)

    def export_shared_key(self, key_index: int) -> bytes:
        """Fetch the shared encryption key stored at the given index.

        Parameters:
            key_index (int): The index of the key to export.

        Returns:
            bytes: The exported shared key.

        Example:
            ```python
            key = key_provider.export_shared_key(key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle
        e2ee.get_shared_key.key_index = key_index

        response = FfiClient.instance.request(request)
        return response.e2ee.get_shared_key.key

    def ratchet_shared_key(self, key_index: int) -> bytes:
        """Ratchet the shared encryption key at the given index to a new key.

        Parameters:
            key_index (int): The index of the key to ratchet.

        Returns:
            bytes: The new ratcheted shared key.

        Example:
            ```python
            new_key = key_provider.ratchet_shared_key(key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle
        e2ee.ratchet_shared_key.key_index = key_index

        response = FfiClient.instance.request(request)
        return response.e2ee.ratchet_shared_key.new_key

    def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None:
        """Install an encryption key for one participant.

        Parameters:
            participant_identity (str): The identity of the participant.
            key (bytes): The encryption key to set.
            key_index (int): The index of the key.

        Example:
            ```python
            key_provider.set_key("participant123", b"participant_key", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle

        assignment = e2ee.set_key
        assignment.participant_identity = participant_identity
        assignment.key_index = key_index
        assignment.key = key

        # NOTE(review): this also records the index as a public attribute on
        # the provider; nothing in this class reads it back. Kept as-is so any
        # external reader of `key_index` keeps working.
        self.key_index = key_index
        FfiClient.instance.request(request)

    def export_key(self, participant_identity: str, key_index: int) -> bytes:
        """Fetch the encryption key stored for one participant.

        Parameters:
            participant_identity (str): The identity of the participant.
            key_index (int): The index of the key to export.

        Returns:
            bytes: The exported key.

        Example:
            ```python
            key = key_provider.export_key("participant123", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle

        lookup = e2ee.get_key
        lookup.participant_identity = participant_identity
        lookup.key_index = key_index

        response = FfiClient.instance.request(request)
        return response.e2ee.get_key.key

    def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
        """Ratchet one participant's encryption key to a new key.

        Parameters:
            participant_identity (str): The identity of the participant.
            key_index (int): The index of the key to ratchet.

        Returns:
            bytes: The new ratcheted key.

        Example:
            ```python
            new_key = key_provider.ratchet_key("participant123", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        e2ee = request.e2ee
        e2ee.room_handle = self._room_handle

        ratchet = e2ee.ratchet_key
        ratchet.participant_identity = participant_identity
        ratchet.key_index = key_index

        response = FfiClient.instance.request(request)
        return response.e2ee.ratchet_key.new_key

Instance variables

prop options : KeyProviderOptions
Expand source code
@property
def options(self) -> KeyProviderOptions:
    """The `KeyProviderOptions` object stored on this provider."""
    return self._options

Methods

def export_key(self, participant_identity: str, key_index: int) ‑> bytes

Exports the encryption key for a specific participant.

Parameters

participant_identity (str): The identity of the participant.
key_index (int): The index of the key to export.

Returns

bytes
The exported key.

Example

key = key_provider.export_key("participant123", key_index=2)
def export_shared_key(self, key_index: int) ‑> bytes

Exports the shared encryption key.

Parameters

key_index (int): The index of the key to export.

Returns

bytes
The exported shared key.

Example

key = key_provider.export_shared_key(key_index=1)
def ratchet_key(self, participant_identity: str, key_index: int) ‑> bytes

Ratchets the encryption key for a specific participant to a new key.

Parameters

participant_identity (str): The identity of the participant.
key_index (int): The index of the key to ratchet.

Returns

bytes
The new ratcheted key.

Example

new_key = key_provider.ratchet_key("participant123", key_index=2)
def ratchet_shared_key(self, key_index: int) ‑> bytes

Ratchets the shared encryption key to a new key.

Parameters

key_index (int): The index of the key to ratchet.

Returns

bytes
The new ratcheted shared key.

Example

new_key = key_provider.ratchet_shared_key(key_index=1)
def set_key(self, participant_identity: str, key: bytes, key_index: int) ‑> None

Sets the encryption key for a specific participant.

Parameters

participant_identity (str): The identity of the participant.
key (bytes): The encryption key to set.
key_index (int): The index of the key.

Example

key_provider.set_key("participant123", b"participant_key", key_index=2)
def set_shared_key(self, key: bytes, key_index: int) ‑> None

Sets the shared encryption key.

Parameters

key (bytes): The new shared key.
key_index (int): The index of the key.

Example

key_provider.set_shared_key(b"my_shared_key", key_index=1)
class KeyProviderOptions (shared_key: Optional[bytes] = None, ratchet_salt: bytes = b'LKFrameEncryptionKey', ratchet_window_size: int = 16, failure_tolerance: int = -1)

KeyProviderOptions(shared_key: Optional[bytes] = None, ratchet_salt: bytes = b'LKFrameEncryptionKey', ratchet_window_size: int = 16, failure_tolerance: int = -1)

Expand source code
@dataclass
class KeyProviderOptions:
    """Option values handed to `KeyProvider` at construction time.

    Every field has a default, so `KeyProviderOptions()` is a valid instance.
    """

    # Optional shared key; defaults to None (no shared key supplied here).
    shared_key: Optional[bytes] = None
    # presumably the salt applied when ratcheting keys — confirm against the FFI layer
    ratchet_salt: bytes = DEFAULT_RATCHET_SALT
    # presumably how many ratchet steps are tolerated — confirm against the FFI layer
    ratchet_window_size: int = DEFAULT_RATCHET_WINDOW_SIZE
    # NOTE(review): meaning of the default (rendered as -1 in the docs) is not
    # visible here — verify before relying on it
    failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE

Class variables

var failure_tolerance : int
var ratchet_salt : bytes
var ratchet_window_size : int
var shared_key : Optional[bytes]
class LocalAudioTrack (info: track_pb2.OwnedTrack)
Expand source code
class LocalAudioTrack(Track):
    """A `Track` wrapping an audio track created on the local side."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    @staticmethod
    def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack":
        """Create a new local audio track fed by `source`.

        Args:
            name (str): Name given to the new track.
            source (AudioSource): Audio source whose FFI handle backs the track.

        Returns:
            LocalAudioTrack: Wrapper around the track returned by the FFI call.
        """
        request = proto_ffi.FfiRequest()
        creation = request.create_audio_track
        creation.name = name
        creation.source_handle = source._ffi_handle.handle

        response = FfiClient.instance.request(request)
        owned_track = response.create_audio_track.track
        return LocalAudioTrack(owned_track)

    def __repr__(self) -> str:
        return f"rtc.LocalAudioTrack(sid={self.sid}, name={self.name})"

Ancestors

Static methods

def create_audio_track(name: str, source: AudioSource)
class LocalParticipant (room_queue: BroadcastQueue[proto_ffi.FfiEvent], owned_info: proto_participant.OwnedParticipant)

Represents the local participant in a room.

Expand source code
class LocalParticipant(Participant):
    """Represents the local participant in a room."""

    def __init__(
        self,
        room_queue: BroadcastQueue[proto_ffi.FfiEvent],
        owned_info: proto_participant.OwnedParticipant,
    ) -> None:
        """
        Args:
            room_queue (BroadcastQueue[proto_ffi.FfiEvent]): Queue on which the
                callbacks of `publish_track` / `unpublish_track` are awaited.
            owned_info (proto_participant.OwnedParticipant): Forwarded to `Participant`.
        """
        super().__init__(owned_info)
        self._room_queue = room_queue
        self._track_publications: dict[str, LocalTrackPublication] = {}  # type: ignore
        # RPC method name -> handler registered through register_rpc_method().
        self._rpc_handlers: Dict[
            str, Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = {}

    @property
    def track_publications(self) -> Mapping[str, LocalTrackPublication]:
        """
        A dictionary of track publications associated with the participant.
        """
        return self._track_publications

    async def publish_data(
        self,
        payload: Union[bytes, str],
        *,
        reliable: bool = True,
        destination_identities: List[str] = [],
        topic: str = "",
    ) -> None:
        """
        Publish arbitrary data to the room.

        Args:
            payload (Union[bytes, str]): The data to publish. Strings are encoded as UTF-8.
            reliable (bool, optional): Whether to send reliably or not. Defaults to True.
            destination_identities (List[str], optional): List of participant identities to send to. Defaults to [].
            topic (str, optional): The topic under which to publish the data. Defaults to "".

        Raises:
            PublishDataError: If there is an error in publishing data.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")

        data_len = len(payload)
        # The request only carries the buffer's address, so `cdata` has to stay
        # referenced at least until the request has been issued.
        cdata = (ctypes.c_byte * data_len)(*payload)

        req = proto_ffi.FfiRequest()
        req.publish_data.local_participant_handle = self._ffi_handle.handle
        req.publish_data.data_ptr = ctypes.addressof(cdata)
        req.publish_data.data_len = data_len
        req.publish_data.reliable = reliable
        req.publish_data.topic = topic
        # The default list is shared between calls but only ever read here.
        req.publish_data.destination_identities.extend(destination_identities)

        # Subscribe before sending so the callback cannot be missed.
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_data.async_id == resp.publish_data.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_data.error:
            raise PublishDataError(cb.publish_data.error)

    async def publish_dtmf(self, *, code: int, digit: str) -> None:
        """
        Publish SIP DTMF message.

        Args:
            code (int): DTMF code.
            digit (str): DTMF digit.

        Raises:
            PublishDTMFError: If there is an error in publishing SIP DTMF message.
        """
        req = proto_ffi.FfiRequest()
        req.publish_sip_dtmf.local_participant_handle = self._ffi_handle.handle
        req.publish_sip_dtmf.code = code
        req.publish_sip_dtmf.digit = digit

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_sip_dtmf.async_id == resp.publish_sip_dtmf.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_sip_dtmf.error:
            raise PublishDTMFError(cb.publish_sip_dtmf.error)

    async def publish_transcription(self, transcription: Transcription) -> None:
        """
        Publish transcription data to the room.

        Args:
            transcription (Transcription): The transcription data to publish.

        Raises:
            PublishTranscriptionError: If there is an error in publishing transcription.
        """
        req = proto_ffi.FfiRequest()
        # Convert each segment to its protobuf counterpart, field by field.
        proto_segments = [
            ProtoTranscriptionSegment(
                id=s.id,
                text=s.text,
                start_time=s.start_time,
                end_time=s.end_time,
                final=s.final,
                language=s.language,
            )
            for s in transcription.segments
        ]
        # fmt: off
        req.publish_transcription.local_participant_handle = self._ffi_handle.handle
        req.publish_transcription.participant_identity = transcription.participant_identity
        req.publish_transcription.segments.extend(proto_segments)
        req.publish_transcription.track_id = transcription.track_sid
        # fmt: on
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_transcription.async_id
                == resp.publish_transcription.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_transcription.error:
            raise PublishTranscriptionError(cb.publish_transcription.error)

    async def perform_rpc(
        self,
        *,
        destination_identity: str,
        method: str,
        payload: str,
        response_timeout: Optional[float] = None,
    ) -> str:
        """
        Initiate an RPC call to a remote participant.

        Args:
            destination_identity (str): The `identity` of the destination participant
            method (str): The method name to call
            payload (str): The method payload
            response_timeout (Optional[float]): Timeout for receiving a response after initial connection.
                Given in seconds; it is converted to whole milliseconds for the request.
                When omitted, no timeout is set on the request.

        Returns:
            str: The response payload

        Raises:
            RpcError: On failure. Details in `message`.
        """
        req = proto_ffi.FfiRequest()
        req.perform_rpc.local_participant_handle = self._ffi_handle.handle
        req.perform_rpc.destination_identity = destination_identity
        req.perform_rpc.method = method
        req.perform_rpc.payload = payload
        if response_timeout is not None:
            req.perform_rpc.response_timeout_ms = int(response_timeout * 1000)

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb = await queue.wait_for(
                lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id)
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.perform_rpc.HasField("error"):
            raise RpcError._from_proto(cb.perform_rpc.error)

        return cb.perform_rpc.payload

    def register_rpc_method(
        self,
        method_name: str,
        handler: Optional[
            Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = None,
    ) -> Union[None, Callable]:
        """
        Establishes the participant as a receiver for calls of the specified RPC method.
        Can be used either as a decorator or a regular method.

        The handler will receive one argument of type `RpcInvocationData` and should return a string response which will be forwarded back to the caller.

        The handler may be synchronous or asynchronous.

        If unable to respond within `response_timeout`, the caller will hang up and receive an error on their side.

        You may raise errors of type `RpcError` in the handler, and they will be forwarded to the caller.

        Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".

        Args:
            method_name (str): The name of the indicated RPC method.
            handler (Optional[Callable]): Handler to be invoked whenever an RPC request for this method is received.  Omit this argument to use the decorator syntax.

        Returns:
            None when `handler` is given. Otherwise a decorator that registers
            the decorated function and returns it unchanged, so the decorated
            name stays bound to the handler.

        Example:
            # As a decorator:
            @room.local_participant.register_rpc_method("greet")
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            # As a regular method:
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            room.local_participant.register_rpc_method('greet', greet_handler)
        """

        def register(handler_func):
            self._rpc_handlers[method_name] = handler_func
            req = proto_ffi.FfiRequest()
            req.register_rpc_method.local_participant_handle = self._ffi_handle.handle
            req.register_rpc_method.method = method_name
            FfiClient.instance.request(req)
            # Hand the function back: a decorator's return value replaces the
            # decorated name, so returning nothing would rebind it to None.
            return handler_func

        if handler is not None:
            register(handler)
            return None
        else:
            # Called as a decorator
            return register

    def unregister_rpc_method(self, method: str) -> None:
        """
        Unregisters a previously registered RPC method.

        Unknown method names are ignored locally; the unregister request is
        sent either way.

        Args:
            method (str): The name of the RPC method to unregister
        """
        self._rpc_handlers.pop(method, None)

        req = proto_ffi.FfiRequest()
        req.unregister_rpc_method.local_participant_handle = self._ffi_handle.handle
        req.unregister_rpc_method.method = method

        FfiClient.instance.request(req)

    async def _handle_rpc_method_invocation(
        self,
        invocation_id: int,
        method: str,
        request_id: str,
        caller_identity: str,
        payload: str,
        response_timeout: float,
    ) -> None:
        """
        Run the handler registered for `method` and send its result back.

        Exactly one of an error or a payload is produced:

        - no handler registered: built-in UNSUPPORTED_METHOD error
        - async handler exceeds `response_timeout`: built-in RESPONSE_TIMEOUT error
        - async handler is cancelled: built-in RECIPIENT_DISCONNECTED error
        - handler raises `RpcError`: that error is forwarded as-is
        - handler raises anything else: logged, then built-in APPLICATION_ERROR

        Synchronous handlers are called inline and are not subject to the timeout.
        """
        response_error: Optional[RpcError] = None
        response_payload: Optional[str] = None

        params = RpcInvocationData(
            request_id, caller_identity, payload, response_timeout
        )

        handler = self._rpc_handlers.get(method)

        if not handler:
            response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD)
        else:
            try:
                if asyncio.iscoroutinefunction(handler):
                    async_handler = cast(
                        Callable[[RpcInvocationData], Awaitable[str]], handler
                    )

                    try:
                        response_payload = await asyncio.wait_for(
                            async_handler(params), timeout=response_timeout
                        )
                    except asyncio.TimeoutError:
                        raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT)
                    except asyncio.CancelledError:
                        raise RpcError._built_in(
                            RpcError.ErrorCode.RECIPIENT_DISCONNECTED
                        )
                else:
                    sync_handler = cast(Callable[[RpcInvocationData], str], handler)
                    response_payload = sync_handler(params)
            except RpcError as error:
                response_error = error
            except Exception as error:
                logger.exception(
                    f"Uncaught error returned by RPC handler for {method}. Returning APPLICATION_ERROR instead.  Original error: {error}",
                )
                response_error = RpcError._built_in(
                    RpcError.ErrorCode.APPLICATION_ERROR
                )

        req = proto_ffi.FfiRequest(
            rpc_method_invocation_response=RpcMethodInvocationResponseRequest(
                local_participant_handle=self._ffi_handle.handle,
                invocation_id=invocation_id,
                error=response_error._to_proto() if response_error else None,
                payload=response_payload,
            )
        )

        res = FfiClient.instance.request(req)

        if res.rpc_method_invocation_response.error:
            # No exception is being handled at this point, so use error()
            # rather than exception(), which would append an empty traceback.
            logger.error(
                f"error sending rpc method invocation response: {res.rpc_method_invocation_response.error}"
            )

    async def set_metadata(self, metadata: str) -> None:
        """
        Set the metadata for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            metadata (str): The new metadata.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_metadata.local_participant_handle = self._ffi_handle.handle
        req.set_local_metadata.metadata = metadata

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            # The callback is awaited for completion only; its content is not inspected.
            await queue.wait_for(
                lambda e: e.set_local_metadata.async_id
                == resp.set_local_metadata.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def set_name(self, name: str) -> None:
        """
        Set the name for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            name (str): The new name.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_name.local_participant_handle = self._ffi_handle.handle
        req.set_local_name.name = name

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            # The callback is awaited for completion only; its content is not inspected.
            await queue.wait_for(
                lambda e: e.set_local_name.async_id == resp.set_local_name.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def set_attributes(self, attributes: dict[str, str]) -> None:
        """
        Set custom attributes for the local participant.

        Only the entries passed in are sent; they are not merged locally with
        the participant's current attributes.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            attributes (dict[str, str]): A dictionary of attributes to set.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_attributes.local_participant_handle = self._ffi_handle.handle

        # dict() keeps accepting any mapping or iterable of pairs, and
        # de-duplicates keys, exactly as the previous implementation did.
        for key, value in dict(attributes).items():
            entry = req.set_local_attributes.attributes.add()
            entry.key = key
            entry.value = value

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            # The callback is awaited for completion only; its content is not inspected.
            await queue.wait_for(
                lambda e: e.set_local_attributes.async_id
                == resp.set_local_attributes.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def publish_track(
        self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
    ) -> LocalTrackPublication:
        """
        Publish a local track to the room.

        Args:
            track (LocalTrack): The track to publish.
            options (TrackPublishOptions, optional): Options for publishing the track.
                They are copied into the request, so the argument itself is not modified.

        Returns:
            LocalTrackPublication: The publication of the published track.

        Raises:
            PublishTrackError: If there is an error in publishing the track.
        """
        req = proto_ffi.FfiRequest()
        req.publish_track.track_handle = track._ffi_handle.handle
        req.publish_track.local_participant_handle = self._ffi_handle.handle
        req.publish_track.options.CopyFrom(options)

        # This callback is awaited on the room queue, not the global FFI queue.
        queue = self._room_queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_track.async_id == resp.publish_track.async_id
            )

            if cb.publish_track.error:
                raise PublishTrackError(cb.publish_track.error)

            track_publication = LocalTrackPublication(cb.publish_track.publication)
            track_publication.track = track
            # Propagate the publication's sid onto the track's own info record.
            track._info.sid = track_publication.sid
            self._track_publications[track_publication.sid] = track_publication

            queue.task_done()
            return track_publication
        finally:
            self._room_queue.unsubscribe(queue)

    async def unpublish_track(self, track_sid: str) -> None:
        """
        Unpublish a track from the room.

        Args:
            track_sid (str): The SID of the track to unpublish.

        Raises:
            UnpublishTrackError: If there is an error in unpublishing the track.
            KeyError: If the request succeeds but `track_sid` is not among this
                participant's publications.
        """
        req = proto_ffi.FfiRequest()
        req.unpublish_track.local_participant_handle = self._ffi_handle.handle
        req.unpublish_track.track_sid = track_sid

        # This callback is awaited on the room queue, not the global FFI queue.
        queue = self._room_queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id
            )

            if cb.unpublish_track.error:
                raise UnpublishTrackError(cb.unpublish_track.error)

            publication = self._track_publications.pop(track_sid)
            publication.track = None
            queue.task_done()
        finally:
            self._room_queue.unsubscribe(queue)

    def __repr__(self) -> str:
        return f"rtc.LocalParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"

Ancestors

Methods

async def perform_rpc(self, *, destination_identity: str, method: str, payload: str, response_timeout: Optional[float] = None) ‑> str

Initiate an RPC call to a remote participant.

Args

destination_identity : str
The identity of the destination participant
method : str
The method name to call
payload : str
The method payload
response_timeout : Optional[float]
Timeout for receiving a response after initial connection

Returns

str
The response payload

Raises

RpcError
On failure. Details in message.
async def publish_data(self, payload: Union[bytes, str], *, reliable: bool = True, destination_identities: List[str] = [], topic: str = '') ‑> None

Publish arbitrary data to the room.

Args

payload : Union[bytes, str]
The data to publish.
reliable : bool, optional
Whether to send reliably or not. Defaults to True.
destination_identities : List[str], optional
List of participant identities to send to. Defaults to [].
topic : str, optional
The topic under which to publish the data. Defaults to "".

Raises

PublishDataError
If there is an error in publishing data.
async def publish_dtmf(self, *, code: int, digit: str) ‑> None

Publish SIP DTMF message.

Args

code : int
DTMF code.
digit : str
DTMF digit.

Raises

PublishDTMFError
If there is an error in publishing SIP DTMF message.
async def publish_track(self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()) ‑> LocalTrackPublication

Publish a local track to the room.

Args

track : LocalTrack
The track to publish.
options : TrackPublishOptions, optional
Options for publishing the track.

Returns

LocalTrackPublication
The publication of the published track.

Raises

PublishTrackError
If there is an error in publishing the track.
async def publish_transcription(self, transcription: Transcription) ‑> None

Publish transcription data to the room.

Args

transcription : Transcription
The transcription data to publish.

Raises

PublishTranscriptionError
If there is an error in publishing transcription.
def register_rpc_method(self, method_name: str, handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None) ‑> Optional[Callable]

Establishes the participant as a receiver for calls of the specified RPC method. Can be used either as a decorator or a regular method.

The handler will receive one argument of type RpcInvocationData and should return a string response which will be forwarded back to the caller.

The handler may be synchronous or asynchronous.

If unable to respond within response_timeout, the caller will hang up and receive an error on their side.

You may raise errors of type RpcError in the handler, and they will be forwarded to the caller.

Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".

Args

method_name : str
The name of the indicated RPC method.
handler : Optional[Callable]
Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax.

Returns

None (when used as a decorator it returns the decorator function)

Example

As a decorator:

@room.local_participant.register_rpc_method("greet")
async def greet_handler(data: RpcInvocationData) -> str:
    print(f"Received greeting from {data.caller_identity}: {data.payload}")
    return f"Hello, {data.caller_identity}!"

As a regular method:

async def greet_handler(data: RpcInvocationData) -> str:
    print(f"Received greeting from {data.caller_identity}: {data.payload}")
    return f"Hello, {data.caller_identity}!"

room.local_participant.register_rpc_method('greet', greet_handler)

async def set_attributes(self, attributes: dict[str, str]) ‑> None

Set custom attributes for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

attributes : dict[str, str]
A dictionary of attributes to set.
async def set_metadata(self, metadata: str) ‑> None

Set the metadata for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

metadata : str
The new metadata.
async def set_name(self, name: str) ‑> None

Set the name for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

name : str
The new name.
async def unpublish_track(self, track_sid: str) ‑> None

Unpublish a track from the room.

Args

track_sid : str
The SID of the track to unpublish.

Raises

UnpublishTrackError
If there is an error in unpublishing the track.
def unregister_rpc_method(self, method: str) ‑> None

Unregisters a previously registered RPC method.

Args

method : str
The name of the RPC method to unregister

Inherited members

class LocalTrackPublication (owned_info: track_pb2.OwnedTrackPublication)
Expand source code
class LocalTrackPublication(TrackPublication):
    """A `TrackPublication` whose first subscription can be awaited."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        super().__init__(owned_info)
        # Awaited by wait_for_subscription(); nothing inside this class
        # completes it.
        self._first_subscription: asyncio.Future[None] = asyncio.Future()

    async def wait_for_subscription(self) -> None:
        """Wait until the internal first-subscription future completes.

        The future is wrapped in `asyncio.shield`, so cancelling the waiting
        caller does not cancel the underlying future.
        """
        shielded = asyncio.shield(self._first_subscription)
        await shielded

    def __repr__(self) -> str:
        return f"rtc.LocalTrackPublication(sid={self.sid}, name={self.name}, kind={self.kind}, source={self.source})"

Ancestors

Methods

async def wait_for_subscription(self) ‑> None
class LocalVideoTrack (info: track_pb2.OwnedTrack)
Expand source code
class LocalVideoTrack(Track):
    """Video track created locally from a ``VideoSource``."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    @staticmethod
    def create_video_track(name: str, source: "VideoSource") -> "LocalVideoTrack":
        """Create a local video track backed by ``source``.

        Args:
            name: Name given to the new track.
            source: Video source whose FFI handle is attached to the track.

        Returns:
            A ``LocalVideoTrack`` wrapping the track returned by the FFI request.
        """
        request = proto_ffi.FfiRequest()
        create = request.create_video_track
        create.name = name
        create.source_handle = source._ffi_handle.handle

        response = FfiClient.instance.request(request)
        owned_track = response.create_video_track.track
        return LocalVideoTrack(owned_track)

    def __repr__(self) -> str:
        fields = f"sid={self.sid}, name={self.name}"
        return f"rtc.LocalVideoTrack({fields})"

Ancestors

Static methods

def create_video_track(name: str, source: VideoSource)
class Participant (owned_info: proto_participant.OwnedParticipant)

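Abstract base class for room participants. It exposes the participant's SID, name, identity, metadata, custom attributes and kind, and requires subclasses to provide their track publications.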

Expand source code
class Participant(ABC):
    """Abstract base for participants, backed by an FFI-owned participant record.

    Read-only properties expose fields of the underlying info message.
    Subclasses must implement ``track_publications``.
    """

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        # Keep the participant info and wrap the FFI handle id that came with it.
        self._info = owned_info.info
        handle_id = owned_info.handle.id
        self._ffi_handle = FfiHandle(handle_id)

    @property
    @abstractmethod
    def track_publications(self) -> Mapping[str, TrackPublication]:
        """
        A dictionary of track publications associated with the participant.
        """
        ...

    @property
    def sid(self) -> str:
        """SID stored in the participant info."""
        info = self._info
        return info.sid

    @property
    def name(self) -> str:
        """Name stored in the participant info."""
        info = self._info
        return info.name

    @property
    def identity(self) -> str:
        """Identity stored in the participant info."""
        info = self._info
        return info.identity

    @property
    def metadata(self) -> str:
        """Metadata string stored in the participant info."""
        info = self._info
        return info.metadata

    @property
    def attributes(self) -> dict[str, str]:
        """Custom attributes associated with the participant (returned as a new dict)."""
        raw_attributes = self._info.attributes
        return dict(raw_attributes)

    @property
    def kind(self) -> proto_participant.ParticipantKind.ValueType:
        """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
        info = self._info
        return info.kind

Ancestors

  • abc.ABC

Subclasses

Instance variables

prop attributes : dict[str, str]

Custom attributes associated with the participant.

Expand source code
@property
def attributes(self) -> dict[str, str]:
    """Custom attributes associated with the participant (returned as a new dict)."""
    raw_attributes = self._info.attributes
    return dict(raw_attributes)
prop identity : str
Expand source code
@property
def identity(self) -> str:
    """Identity stored in the participant info."""
    info = self._info
    return info.identity
prop kind : proto_participant.ParticipantKind.ValueType

Participant's kind (e.g., regular participant, ingress, egress, sip, agent).

Expand source code
@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
    """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
    info = self._info
    return info.kind
prop metadata : str
Expand source code
@property
def metadata(self) -> str:
    """Metadata string stored in the participant info."""
    info = self._info
    return info.metadata
prop name : str
Expand source code
@property
def name(self) -> str:
    """Name stored in the participant info."""
    info = self._info
    return info.name
prop sid : str
Expand source code
@property
def sid(self) -> str:
    """SID stored in the participant info."""
    info = self._info
    return info.sid
prop track_publications : Mapping[str, TrackPublication]

A dictionary of track publications associated with the participant.

Expand source code
@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
    """Mapping of track publications associated with the participant.

    Abstract: concrete participant classes supply the mapping.
    """
    ...
class RemoteAudioTrack (info: track_pb2.OwnedTrack)
Expand source code
class RemoteAudioTrack(Track):
    """Audio track received from a remote participant."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    def __repr__(self) -> str:
        fields = f"sid={self.sid}, name={self.name}"
        return f"rtc.RemoteAudioTrack({fields})"

Ancestors

class RemoteParticipant (owned_info: proto_participant.OwnedParticipant)

A participant other than the local one. It keeps its own dictionary of remote track publications and exposes it through track_publications.

Expand source code
class RemoteParticipant(Participant):
    """Participant other than the local one, with its remote track publications."""

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        super().__init__(owned_info)
        # Filled in and pruned by the room as publication events arrive.
        self._track_publications: dict[str, RemoteTrackPublication] = {}  # type: ignore

    @property
    def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
        """
        Mapping of track publications associated with the participant.
        """
        publications = self._track_publications
        return publications

    def __repr__(self) -> str:
        fields = f"sid={self.sid}, identity={self.identity}, name={self.name}"
        return f"rtc.RemoteParticipant({fields})"

Ancestors

Inherited members

class RemoteTrackPublication (owned_info: track_pb2.OwnedTrackPublication)
Expand source code
class RemoteTrackPublication(TrackPublication):
    """Track publication belonging to a remote participant."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        super().__init__(owned_info)
        # Flipped by the room on "track_subscribed" / "track_unsubscribed" events.
        self.subscribed = False

    def set_subscribed(self, subscribed: bool):
        """Send a subscribe/unsubscribe request for this publication.

        This only issues the FFI request; ``self.subscribed`` is not changed here.

        Args:
            subscribed: Desired subscription state.
        """
        request = proto_ffi.FfiRequest()
        update = request.set_subscribed
        update.subscribe = subscribed
        update.publication_handle = self._ffi_handle.handle
        FfiClient.instance.request(request)

    def __repr__(self) -> str:
        fields = (
            f"sid={self.sid}, name={self.name}, "
            f"kind={self.kind}, source={self.source}"
        )
        return f"rtc.RemoteTrackPublication({fields})"

Ancestors

Methods

def set_subscribed(self, subscribed: bool)
class RemoteVideoTrack (info: track_pb2.OwnedTrack)
Expand source code
class RemoteVideoTrack(Track):
    """Video track received from a remote participant."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    def __repr__(self) -> str:
        fields = f"sid={self.sid}, name={self.name}"
        return f"rtc.RemoteVideoTrack({fields})"

Ancestors

class Room (loop: Optional[asyncio.AbstractEventLoop] = None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initializes a new Room instance.

Parameters

loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. If not provided, the default event loop is used.

Expand source code
class Room(EventEmitter[EventTypes]):
    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Initializes a new Room instance.

        Parameters:
            loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. If not provided, the default event loop is used.
        """
        super().__init__()

        # Stays None until `connect()` obtains a room handle.
        self._ffi_handle: Optional[FfiHandle] = None
        self._loop = loop or asyncio.get_event_loop()
        # Events consumed by the listen task are re-published on this queue.
        self._room_queue = BroadcastQueue[proto_ffi.FfiEvent]()
        self._info = proto_room.RoomInfo()
        # RPC handler tasks currently running; cancelled when the room shuts down.
        self._rpc_invocation_tasks: set[asyncio.Task] = set()

        self._remote_participants: Dict[str, RemoteParticipant] = {}
        self._connection_state = ConnectionState.CONN_DISCONNECTED
        # Create the future on the room's own loop. A bare `asyncio.Future()`
        # binds to the ambient event loop, which is the wrong one when a
        # different `loop` was passed in: the listen task that resolves this
        # future runs on `self._loop`.
        self._first_sid_future: asyncio.Future[str] = self._loop.create_future()
        self._local_participant: LocalParticipant | None = None

    def __del__(self) -> None:
        """Unsubscribe the room's FFI event queue if a room handle was obtained."""
        if self._ffi_handle is None:
            return
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    @property
    async def sid(self) -> str:
        """Asynchronously retrieves the session ID (SID) of the room.

        If the SID is already present in the room info it is returned
        immediately; otherwise this waits until the first SID is received.

        Returns:
            str: The session ID of the room.
        """
        if self._info.sid:
            return self._info.sid

        # Shield the shared future: without it, cancelling one waiter (for
        # example through a timeout) would cancel the future for every other
        # waiter and make the later set_result() call fail.
        return await asyncio.shield(self._first_sid_future)

    @property
    def local_participant(self) -> LocalParticipant:
        """Return the room's local participant.

        Returns:
            LocalParticipant: The local participant in the room.

        Raises:
            Exception: If no local participant has been set yet, i.e. the room
                has not been connected.
        """
        participant = self._local_participant
        if participant is not None:
            return participant

        raise Exception("cannot access local participant before connecting")

    @property
    def connection_state(self) -> ConnectionState.ValueType:
        """Return the room's current connection state.

        Returns:
            ConnectionState: The connection state of the room.
        """
        state = self._connection_state
        return state

    @property
    def remote_participants(self) -> Mapping[str, RemoteParticipant]:
        """Return the remote participants currently tracked by the room.

        Returns:
            dict[str, RemoteParticipant]: The room's own dictionary of remote
            participants, keyed by participant identity.
        """
        participants = self._remote_participants
        return participants

    @property
    def name(self) -> str:
        """Return the room's name as stored in the room info.

        Returns:
            str: The name of the room.
        """
        room_info = self._info
        return room_info.name

    @property
    def metadata(self) -> str:
        """Return the room's metadata as stored in the room info.

        Returns:
            str: The metadata of the room.
        """
        room_info = self._info
        return room_info.metadata

    @property
    def e2ee_manager(self) -> E2EEManager:
        """Return the end-to-end encryption (E2EE) manager for the room.

        The manager attribute is assigned in `connect()`.

        Returns:
            E2EEManager: The E2EE manager instance.
        """
        manager = self._e2ee_manager
        return manager

    def isconnected(self) -> bool:
        """Report whether the room currently counts as connected.

        The room is connected when it holds an FFI room handle and its
        connection state is anything other than disconnected.

        Returns:
            bool: True if connected, False otherwise.
        """
        if self._ffi_handle is None:
            return False

        return self._connection_state != ConnectionState.CONN_DISCONNECTED

    def on(self, event: EventTypes, callback: Optional[Callable] = None) -> Callable:
        """Register a handler for one of the room's events.

        This forwards to the base event emitter's ``on``; the list below
        documents the events a room emits and the arguments handlers receive.

        Parameters:
            event (EventTypes): The name of the event to listen for.
            callback (Optional[Callable]): The function to call when the event
                occurs. It may be omitted to use ``on`` as a decorator, e.g.
                ``@room.on("participant_connected")``.

        Returns:
            Callable: Whatever the base emitter's ``on`` returns for the
            registration.

        Available events:
            - **"participant_connected"**: Called when a new participant joins the room.
                - Arguments: `participant` (RemoteParticipant)
            - **"participant_disconnected"**: Called when a participant leaves the room.
                - Arguments: `participant` (RemoteParticipant)
            - **"local_track_published"**: Called when a local track is published.
                - Arguments: `publication` (LocalTrackPublication), `track` (Track)
            - **"local_track_unpublished"**: Called when a local track is unpublished.
                - Arguments: `publication` (LocalTrackPublication)
            - **"local_track_subscribed"**: Called when a local track is subscribed.
                - Arguments: `track` (Track)
            - **"track_published"**: Called when a remote participant publishes a track.
                - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_unpublished"**: Called when a remote participant unpublishes a track.
                - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_subscribed"**: Called when a track is subscribed.
                - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_unsubscribed"**: Called when a track is unsubscribed.
                - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_subscription_failed"**: Called when a track subscription fails.
                - Arguments: `participant` (RemoteParticipant), `track_sid` (str), `error` (str)
            - **"track_muted"**: Called when a track is muted.
                - Arguments: `participant` (Participant), `publication` (TrackPublication)
            - **"track_unmuted"**: Called when a track is unmuted.
                - Arguments: `participant` (Participant), `publication` (TrackPublication)
            - **"active_speakers_changed"**: Called when the list of active speakers changes.
                - Arguments: `speakers` (list[Participant])
            - **"room_metadata_changed"**: Called when the room's metadata is updated.
                - Arguments: `old_metadata` (str), `new_metadata` (str)
            - **"participant_metadata_changed"**: Called when a participant's metadata is updated.
                - Arguments: `participant` (Participant), `old_metadata` (str), `new_metadata` (str)
            - **"participant_name_changed"**: Called when a participant's name is changed.
                - Arguments: `participant` (Participant), `old_name` (str), `new_name` (str)
            - **"participant_attributes_changed"**: Called when a participant's attributes change.
                - Arguments: `changed_attributes` (dict), `participant` (Participant)
            - **"connection_quality_changed"**: Called when a participant's connection quality changes.
                - Arguments: `participant` (Participant), `quality` (ConnectionQuality)
            - **"transcription_received"**: Called when a transcription is received.
                - Arguments: `segments` (list[TranscriptionSegment]), `participant` (Participant), `publication` (TrackPublication)
            - **"data_received"**: Called when data is received.
                - Arguments: `data_packet` (DataPacket)
            - **"sip_dtmf_received"**: Called when a SIP DTMF signal is received.
                - Arguments: `sip_dtmf` (SipDTMF)
            - **"e2ee_state_changed"**: Called when a participant's E2EE state changes.
                - Arguments: `participant` (Participant), `state` (EncryptionState)
            - **"connection_state_changed"**: Called when the room's connection state changes.
                - Arguments: `connection_state` (ConnectionState)
            - **"connected"**: Called when the room is successfully connected.
                - Arguments: None
            - **"disconnected"**: Called when the room is disconnected.
                - Arguments: `reason` (DisconnectReason)
            - **"reconnecting"**: Called when the room is attempting to reconnect.
                - Arguments: None
            - **"reconnected"**: Called when the room has successfully reconnected.
                - Arguments: None

        Example:
            ```python
            def on_participant_connected(participant):
                print(f"Participant connected: {participant.identity}")

            room.on("participant_connected", on_participant_connected)
            ```
        """
        registration = super().on(event, callback)
        return registration

    async def connect(
        self, url: str, token: str, options: RoomOptions = RoomOptions()
    ) -> None:
        """Connects to a LiveKit room using the specified URL and token.

        On success the room stores the FFI room handle, the E2EE manager, the
        room info, the local participant and the remote participants (with
        their publications) reported in the connect result, and then starts
        the background task that listens for room events.

        Parameters:
            url (str): The WebSocket URL of the LiveKit server to connect to.
            token (str): The access token for authentication and authorization.
            options (RoomOptions, optional): Additional options for the room connection.

        Raises:
            ConnectError: If the connection fails.

        Example:
            ```python
            room = Room()

            # Listen for events before connecting to the room
            @room.on("participant_connected")
            def on_participant_connected(participant):
                print(f"Participant connected: {participant.identity}")

            await room.connect("ws://localhost:7880", "your_token")
            ```
        """
        req = proto_ffi.FfiRequest()
        req.connect.url = url
        req.connect.token = token

        # options
        req.connect.options.auto_subscribe = options.auto_subscribe
        req.connect.options.dynacast = options.dynacast

        if options.e2ee:
            req.connect.options.e2ee.encryption_type = options.e2ee.encryption_type
            req.connect.options.e2ee.key_provider_options.shared_key = (
                options.e2ee.key_provider_options.shared_key  # type: ignore
            )
            req.connect.options.e2ee.key_provider_options.ratchet_salt = (
                options.e2ee.key_provider_options.ratchet_salt
            )
            req.connect.options.e2ee.key_provider_options.failure_tolerance = (
                options.e2ee.key_provider_options.failure_tolerance
            )
            req.connect.options.e2ee.key_provider_options.ratchet_window_size = (
                options.e2ee.key_provider_options.ratchet_window_size
            )

        if options.rtc_config:
            req.connect.options.rtc_config.ice_transport_type = (
                options.rtc_config.ice_transport_type
            )  # type: ignore
            req.connect.options.rtc_config.continual_gathering_policy = (
                options.rtc_config.continual_gathering_policy
            )  # type: ignore
            req.connect.options.rtc_config.ice_servers.extend(
                options.rtc_config.ice_servers
            )

        # subscribe before connecting so we don't miss any events
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)

        # short-lived subscription used only to wait for the connect callback
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.connect.async_id == resp.connect.async_id
            )
        except BaseException:
            # The request failed or the wait was cancelled: release the
            # room-event subscription too, otherwise it stays registered.
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)
            raise
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.connect.error:
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)
            raise ConnectError(cb.connect.error)

        self._ffi_handle = FfiHandle(cb.connect.result.room.handle.id)

        self._e2ee_manager = E2EEManager(self._ffi_handle.handle, options.e2ee)

        self._info = cb.connect.result.room.info
        self._connection_state = ConnectionState.CONN_CONNECTED

        self._local_participant = LocalParticipant(
            self._room_queue, cb.connect.result.local_participant
        )

        for pt in cb.connect.result.participants:
            rp = self._create_remote_participant(pt.participant)

            # add the initial remote participant tracks
            for owned_publication_info in pt.publications:
                publication = RemoteTrackPublication(owned_publication_info)
                rp._track_publications[publication.sid] = publication

        # start listening to room events
        self._task = self._loop.create_task(self._listen_task())

    async def disconnect(self) -> None:
        """Disconnects from the room.

        Does nothing when the room is not connected. Otherwise it cancels
        pending RPC handler tasks, sends the disconnect request, waits for its
        callback and for the listen task to finish, then unsubscribes the
        room's FFI event queue.
        """
        if not self.isconnected():
            return

        await self._drain_rpc_invocation_tasks()

        req = proto_ffi.FfiRequest()
        req.disconnect.room_handle = self._ffi_handle.handle  # type: ignore
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.disconnect.async_id == resp.disconnect.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        try:
            await self._task
        finally:
            # Release the room-event subscription even if the listen task
            # ended with an exception or this call was cancelled; otherwise
            # the queue would stay registered.
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def _listen_task(self) -> None:
        """Consume FFI events until this room's end-of-stream event arrives.

        RPC invocations go to `_on_rpc_method_invocation`; room events whose
        handle matches this room go to `_on_room_event`, with handler
        exceptions logged rather than propagated. Every event that does not
        end the loop is then re-published on the room queue, and the loop
        waits for that queue to be joined before reading the next event.
        """
        while True:
            ffi_event = await self._ffi_queue.get()
            message_kind = ffi_event.WhichOneof("message")

            if message_kind == "rpc_method_invocation":
                self._on_rpc_method_invocation(ffi_event.rpc_method_invocation)
            elif ffi_event.room_event.room_handle == self._ffi_handle.handle:  # type: ignore
                room_event = ffi_event.room_event
                if room_event.HasField("eos"):
                    # end of stream for this room: stop listening
                    break

                try:
                    self._on_room_event(room_event)
                except Exception:
                    logging.exception(
                        "error running user callback for %s: %s",
                        room_event.WhichOneof("message"),
                        room_event,
                    )

            # let the subscribers finish with this event
            # before moving on to the next one
            self._room_queue.put_nowait(ffi_event)
            await self._room_queue.join()

        # Cancel and await whatever RPC handler tasks are still running
        await self._drain_rpc_invocation_tasks()

    def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent):
        """Start a task that handles an incoming RPC invocation.

        Invocations are ignored when there is no local participant yet or when
        they target a different local participant handle. The created task is
        tracked in `_rpc_invocation_tasks` and removes itself once done.

        Args:
            rpc_invocation: The RPC invocation event received from the FFI layer.
        """
        local = self._local_participant
        if local is None:
            return

        if rpc_invocation.local_participant_handle != local._ffi_handle.handle:
            return

        # the event carries the timeout in milliseconds; the handler takes seconds
        timeout_seconds = rpc_invocation.response_timeout_ms / 1000.0
        handler = local._handle_rpc_method_invocation(
            rpc_invocation.invocation_id,
            rpc_invocation.method,
            rpc_invocation.request_id,
            rpc_invocation.caller_identity,
            rpc_invocation.payload,
            timeout_seconds,
        )
        rpc_task = self._loop.create_task(handler)
        self._rpc_invocation_tasks.add(rpc_task)
        rpc_task.add_done_callback(self._rpc_invocation_tasks.discard)

    def _on_room_event(self, event: proto_room.RoomEvent):
        """Apply a room event to local state and emit the matching public event.

        Dispatches on the event's ``message`` oneof. Most branches update the
        participants, publications or room info held by this room and then
        call ``emit`` with the event name and arguments documented in ``on``.
        ``room_sid_changed`` is internal and emits nothing.

        Args:
            event: The room event received from the FFI layer.

        Raises:
            KeyError: When an event names a participant or publication that is
                not tracked locally, since those lookups use plain indexing.
        """
        which = event.WhichOneof("message")
        if which == "participant_connected":
            rparticipant = self._create_remote_participant(
                event.participant_connected.info
            )
            self.emit("participant_connected", rparticipant)
        elif which == "participant_disconnected":
            identity = event.participant_disconnected.participant_identity
            rparticipant = self._remote_participants.pop(identity)
            self.emit("participant_disconnected", rparticipant)
        elif which == "local_track_published":
            sid = event.local_track_published.track_sid
            lpublication = self.local_participant.track_publications[sid]
            track = lpublication.track
            self.emit("local_track_published", lpublication, track)
        elif which == "local_track_unpublished":
            sid = event.local_track_unpublished.publication_sid
            lpublication = self.local_participant.track_publications[sid]
            self.emit("local_track_unpublished", lpublication)
        elif which == "local_track_subscribed":
            sid = event.local_track_subscribed.track_sid
            lpublication = self.local_participant.track_publications[sid]
            # set_result() raises InvalidStateError on a future that is already
            # done, which would abort this handler before the emit below.
            if not lpublication._first_subscription.done():
                lpublication._first_subscription.set_result(None)
            self.emit("local_track_subscribed", lpublication.track)
        elif which == "track_published":
            rparticipant = self._remote_participants[
                event.track_published.participant_identity
            ]
            rpublication = RemoteTrackPublication(event.track_published.publication)
            rparticipant._track_publications[rpublication.sid] = rpublication
            self.emit("track_published", rpublication, rparticipant)
        elif which == "track_unpublished":
            rparticipant = self._remote_participants[
                event.track_unpublished.participant_identity
            ]
            rpublication = rparticipant._track_publications.pop(
                event.track_unpublished.publication_sid
            )
            self.emit("track_unpublished", rpublication, rparticipant)
        elif which == "track_subscribed":
            owned_track_info = event.track_subscribed.track
            track_info = owned_track_info.info
            rparticipant = self._remote_participants[
                event.track_subscribed.participant_identity
            ]
            rpublication = rparticipant.track_publications[track_info.sid]
            rpublication.subscribed = True
            # wrap the track in the class matching its kind; other kinds emit nothing
            if track_info.kind == TrackKind.KIND_VIDEO:
                remote_video_track = RemoteVideoTrack(owned_track_info)
                rpublication.track = remote_video_track
                self.emit(
                    "track_subscribed", remote_video_track, rpublication, rparticipant
                )
            elif track_info.kind == TrackKind.KIND_AUDIO:
                remote_audio_track = RemoteAudioTrack(owned_track_info)
                rpublication.track = remote_audio_track
                self.emit(
                    "track_subscribed", remote_audio_track, rpublication, rparticipant
                )
        elif which == "track_unsubscribed":
            identity = event.track_unsubscribed.participant_identity
            rparticipant = self._remote_participants[identity]
            rpublication = rparticipant.track_publications[
                event.track_unsubscribed.track_sid
            ]
            # keep a reference for the emit before detaching it from the publication
            track = rpublication.track
            rpublication.track = None
            rpublication.subscribed = False
            self.emit("track_unsubscribed", track, rpublication, rparticipant)
        elif which == "track_subscription_failed":
            identity = event.track_subscription_failed.participant_identity
            rparticipant = self._remote_participants[identity]
            error = event.track_subscription_failed.error
            self.emit(
                "track_subscription_failed",
                rparticipant,
                event.track_subscription_failed.track_sid,
                error,
            )
        elif which == "track_muted":
            identity = event.track_muted.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            publication = participant.track_publications[event.track_muted.track_sid]
            publication._info.muted = True
            if publication.track:
                publication.track._info.muted = True

            self.emit("track_muted", participant, publication)
        elif which == "track_unmuted":
            identity = event.track_unmuted.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            publication = participant.track_publications[event.track_unmuted.track_sid]
            publication._info.muted = False
            if publication.track:
                publication.track._info.muted = False

            self.emit("track_unmuted", participant, publication)
        elif which == "active_speakers_changed":
            speakers: list[Participant] = []
            # TODO: pass participant identity
            for identity in event.active_speakers_changed.participant_identities:
                participant = self._retrieve_participant(identity)
                assert isinstance(participant, Participant)
                speakers.append(participant)

            self.emit("active_speakers_changed", speakers)
        elif which == "room_metadata_changed":
            old_metadata = self.metadata
            self._info.metadata = event.room_metadata_changed.metadata
            self.emit("room_metadata_changed", old_metadata, self.metadata)
        elif which == "room_sid_changed":
            # Only the first SID resolves the future. Skip a future that is
            # already done (for example cancelled through a waiter), because
            # set_result() would raise and the SID below would never be stored.
            if not self._info.sid and not self._first_sid_future.done():
                self._first_sid_future.set_result(event.room_sid_changed.sid)
            self._info.sid = event.room_sid_changed.sid
            # This is an internal event, not exposed to users
        elif which == "participant_metadata_changed":
            identity = event.participant_metadata_changed.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            old_metadata = participant.metadata
            participant._info.metadata = event.participant_metadata_changed.metadata
            self.emit(
                "participant_metadata_changed",
                participant,
                old_metadata,
                participant.metadata,
            )
        elif which == "participant_name_changed":
            identity = event.participant_name_changed.participant_identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            old_name = participant.name
            participant._info.name = event.participant_name_changed.name
            self.emit(
                "participant_name_changed", participant, old_name, participant.name
            )
        elif which == "participant_attributes_changed":
            identity = event.participant_attributes_changed.participant_identity
            attributes = event.participant_attributes_changed.attributes
            changed_attributes = {
                entry.key: entry.value
                for entry in event.participant_attributes_changed.changed_attributes
            }
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            # replace the stored attributes wholesale with the set from the event
            participant._info.attributes.clear()
            participant._info.attributes.update(
                (entry.key, entry.value) for entry in attributes
            )
            self.emit(
                "participant_attributes_changed",
                changed_attributes,
                participant,
            )
        elif which == "connection_quality_changed":
            identity = event.connection_quality_changed.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            self.emit(
                "connection_quality_changed",
                participant,
                event.connection_quality_changed.quality,
            )
        elif which == "transcription_received":
            transcription = event.transcription_received
            segments = [
                TranscriptionSegment(
                    id=s.id,
                    text=s.text,
                    final=s.final,
                    start_time=s.start_time,
                    end_time=s.end_time,
                    language=s.language,
                )
                for s in transcription.segments
            ]
            # participant and publication may both be None when they are not known locally
            part = self._retrieve_participant(transcription.participant_identity)
            pub: TrackPublication | None = None
            if part:
                pub = part.track_publications.get(transcription.track_sid)
            self.emit("transcription_received", segments, part, pub)
        elif which == "data_packet_received":
            packet = event.data_packet_received
            which_val = packet.WhichOneof("value")
            if which_val == "user":
                owned_buffer_info = packet.user.data
                buffer_info = owned_buffer_info.data
                # view the native buffer as a ctypes byte array of the reported length
                native_data = ctypes.cast(
                    buffer_info.data_ptr,
                    ctypes.POINTER(ctypes.c_byte * buffer_info.data_len),
                ).contents

                # copy the payload into Python-owned bytes
                data = bytes(native_data)
                # NOTE(review): the handle is created and immediately discarded,
                # presumably so the native buffer is released - confirm against FfiHandle.
                FfiHandle(owned_buffer_info.handle.id)
                rparticipant = cast(
                    RemoteParticipant,
                    self._retrieve_remote_participant(packet.participant_identity),
                )
                self.emit(
                    "data_received",
                    DataPacket(
                        data=data,
                        kind=packet.kind,
                        participant=rparticipant,
                        topic=packet.user.topic,
                    ),
                )
            elif which_val == "sip_dtmf":
                rparticipant = cast(
                    RemoteParticipant,
                    self._retrieve_remote_participant(packet.participant_identity),
                )
                self.emit(
                    "sip_dtmf_received",
                    SipDTMF(
                        code=packet.sip_dtmf.code,
                        digit=packet.sip_dtmf.digit,
                        participant=rparticipant,
                    ),
                )
        elif which == "e2ee_state_changed":
            identity = event.e2ee_state_changed.participant_identity
            e2ee_state = event.e2ee_state_changed.state
            # TODO: pass participant identity
            self.emit(
                "e2ee_state_changed", self._retrieve_participant(identity), e2ee_state
            )
        elif which == "connection_state_changed":
            connection_state = event.connection_state_changed.state
            self._connection_state = connection_state
            self.emit("connection_state_changed", connection_state)
        elif which == "connected":
            self.emit("connected")
        elif which == "disconnected":
            self.emit("disconnected", event.disconnected.reason)
        elif which == "reconnecting":
            self.emit("reconnecting")
        elif which == "reconnected":
            self.emit("reconnected")

    async def _drain_rpc_invocation_tasks(self) -> None:
        """Cancel every tracked RPC handler task and wait for them to finish.

        Exceptions from the tasks (including cancellation) are collected by
        the gather call instead of being raised here.
        """
        pending = self._rpc_invocation_tasks
        if not pending:
            return

        for rpc_task in pending:
            rpc_task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

    def _retrieve_remote_participant(
        self, identity: str
    ) -> Optional[RemoteParticipant]:
        """Look up a remote participant by identity; None when it is not tracked."""
        known = self._remote_participants
        return known.get(identity)

    def _retrieve_participant(self, identity: str) -> Optional[Participant]:
        """Look up a participant, local or remote, by identity.

        An empty identity is only looked up among the remote participants.
        Returns None when nobody matches.
        """
        if not identity:
            return self._retrieve_remote_participant(identity)

        if identity == self.local_participant.identity:
            return self.local_participant

        return self._retrieve_remote_participant(identity)

    def _create_remote_participant(
        self, owned_info: proto_participant.OwnedParticipant
    ) -> RemoteParticipant:
        """Create a remote participant and register it under its identity.

        Raises:
            Exception: If a participant with the same identity is already tracked.
        """
        identity = owned_info.info.identity
        if identity in self._remote_participants:
            raise Exception("participant already exists")

        created = RemoteParticipant(owned_info)
        self._remote_participants[created.identity] = created
        return created

    def __repr__(self) -> str:
        """Return a debug representation with the room's SID, name, metadata and state.

        The SID comes from the room info when it is set there, which is the
        same source the `sid` property checks first. Otherwise it is taken
        from the first-SID future if that completed with a result, and is
        shown as "unknown" when neither is available.
        """
        sid = self._info.sid
        if not sid:
            first_sid = self._first_sid_future
            # result() raises on a cancelled future, so only read it when the
            # future finished normally.
            if first_sid.done() and not first_sid.cancelled():
                sid = first_sid.result()
            else:
                sid = "unknown"

        return f"rtc.Room(sid={sid}, name={self.name}, metadata={self.metadata}, connection_state={self._connection_state})"

Ancestors

Instance variables

prop connection_state : ConnectionState.ValueType

Gets the connection state of the room.

Returns

ConnectionState
The connection state of the room.
Expand source code
@property
def connection_state(self) -> ConnectionState.ValueType:
    """Gets the connection state of the room.

    Returns:
        ConnectionState: The connection state of the room.
    """
    return self._connection_state
prop e2ee_manager : E2EEManager

Gets the end-to-end encryption (E2EE) manager for the room.

Returns

E2EEManager
The E2EE manager instance.
Expand source code
@property
def e2ee_manager(self) -> E2EEManager:
    """Gets the end-to-end encryption (E2EE) manager for the room.

    Returns:
        E2EEManager: The E2EE manager instance.
    """
    return self._e2ee_manager
prop local_participant : LocalParticipant

Gets the local participant in the room.

Returns

LocalParticipant
The local participant in the room.
Expand source code
@property
def local_participant(self) -> LocalParticipant:
    """Gets the local participant in the room.

    Returns:
        LocalParticipant: The local participant in the room.

    Raises:
        Exception: If no local participant is set yet (accessed before connecting).
    """
    participant = self._local_participant
    if participant is not None:
        return participant

    raise Exception("cannot access local participant before connecting")
prop metadata : str

Gets the metadata associated with the room.

Returns

str
The metadata of the room.
Expand source code
@property
def metadata(self) -> str:
    """Gets the metadata associated with the room.

    Returns:
        str: The metadata of the room.
    """
    return self._info.metadata
prop name : str

Gets the name of the room.

Returns

str
The name of the room.
Expand source code
@property
def name(self) -> str:
    """Gets the name of the room.

    Returns:
        str: The name of the room.
    """
    return self._info.name
prop remote_participants : Mapping[str, RemoteParticipant]

Gets the remote participants in the room.

Returns

dict[str, RemoteParticipant]
A dictionary of remote participants indexed by their identity.

Expand source code
@property
def remote_participants(self) -> Mapping[str, RemoteParticipant]:
    """Gets the remote participants in the room.

    Returns:
        dict[str, RemoteParticipant]: A dictionary of remote participants indexed by their
        identity.
    """
    return self._remote_participants
prop sid : str

Asynchronously retrieves the session ID (SID) of the room.

Returns

str
The session ID of the room.
Expand source code
@property
async def sid(self) -> str:
    """Asynchronously retrieves the session ID (SID) of the room.

    The SID from the room info is returned when it is already set; otherwise
    the result of the first-SID future is awaited.

    Returns:
        str: The session ID of the room.
    """
    known_sid = self._info.sid
    if not known_sid:
        known_sid = await self._first_sid_future

    return known_sid

Methods

async def connect(self, url: str, token: str, options: RoomOptions = RoomOptions(auto_subscribe=True, dynacast=False, e2ee=None, rtc_config=None)) ‑> None

Connects to a LiveKit room using the specified URL and token.

Parameters

url (str): The WebSocket URL of the LiveKit server to connect to.
token (str): The access token for authentication and authorization.
options (RoomOptions, optional): Additional options for the room connection.

Raises

ConnectError
If the connection fails.

Example

room = Room()

# Listen for events before connecting to the room
@room.on("participant_connected")
def on_participant_connected(participant):
    print(f"Participant connected: {participant.identity}")

await room.connect("ws://localhost:7880", "your_token")
async def disconnect(self) ‑> None

Disconnects from the room.

def isconnected(self) ‑> bool

Checks if the room is currently connected.

Returns

bool
True if connected, False otherwise.
def on(self, event: EventTypes, callback: Optional[Callable] = None) ‑> Callable

Registers an event handler for a specific event type.

Parameters

event (EventTypes): The name of the event to listen for.
callback (Callable): The function to call when the event occurs.

Returns

Callable
The registered callback function.

Available events:

- "participant_connected": Called when a new participant joins the room.
  Arguments: participant (RemoteParticipant)
- "participant_disconnected": Called when a participant leaves the room.
  Arguments: participant (RemoteParticipant)
- "local_track_published": Called when a local track is published.
  Arguments: publication (LocalTrackPublication), track (Track)
- "local_track_unpublished": Called when a local track is unpublished.
  Arguments: publication (LocalTrackPublication)
- "local_track_subscribed": Called when a local track is subscribed.
  Arguments: track (Track)
- "track_published": Called when a remote participant publishes a track.
  Arguments: publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_unpublished": Called when a remote participant unpublishes a track.
  Arguments: publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_subscribed": Called when a track is subscribed.
  Arguments: track (Track), publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_unsubscribed": Called when a track is unsubscribed.
  Arguments: track (Track), publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_subscription_failed": Called when a track subscription fails.
  Arguments: participant (RemoteParticipant), track_sid (str), error (str)
- "track_muted": Called when a track is muted.
  Arguments: participant (Participant), publication (TrackPublication)
- "track_unmuted": Called when a track is unmuted.
  Arguments: participant (Participant), publication (TrackPublication)
- "active_speakers_changed": Called when the list of active speakers changes.
  Arguments: speakers (list[Participant])
- "room_metadata_changed": Called when the room's metadata is updated.
  Arguments: old_metadata (str), new_metadata (str)
- "participant_metadata_changed": Called when a participant's metadata is updated.
  Arguments: participant (Participant), old_metadata (str), new_metadata (str)
- "participant_name_changed": Called when a participant's name is changed.
  Arguments: participant (Participant), old_name (str), new_name (str)
- "participant_attributes_changed": Called when a participant's attributes change.
  Arguments: changed_attributes (dict), participant (Participant)
- "connection_quality_changed": Called when a participant's connection quality changes.
  Arguments: participant (Participant), quality (ConnectionQuality)
- "transcription_received": Called when a transcription is received.
  Arguments: segments (list[TranscriptionSegment]), participant (Participant), publication (TrackPublication)
- "data_received": Called when data is received.
  Arguments: data_packet (DataPacket)
- "sip_dtmf_received": Called when a SIP DTMF signal is received.
  Arguments: sip_dtmf (SipDTMF)
- "e2ee_state_changed": Called when a participant's E2EE state changes.
  Arguments: participant (Participant), state (EncryptionState)
- "connection_state_changed": Called when the room's connection state changes.
  Arguments: connection_state (ConnectionState)
- "connected": Called when the room is successfully connected.
  Arguments: None
- "disconnected": Called when the room is disconnected.
  Arguments: reason (DisconnectReason)
- "reconnecting": Called when the room is attempting to reconnect.
  Arguments: None
- "reconnected": Called when the room has successfully reconnected.
  Arguments: None

Example

def on_participant_connected(participant):
    print(f"Participant connected: {participant.identity}")

room.on("participant_connected", on_participant_connected)

Inherited members

class RoomOptions (auto_subscribe: bool = True, dynacast: bool = False, e2ee: E2EEOptions | None = None, rtc_config: RtcConfiguration | None = None)

RoomOptions(auto_subscribe: 'bool' = True, dynacast: 'bool' = False, e2ee: 'E2EEOptions | None' = None, rtc_config: 'RtcConfiguration | None' = None)

Expand source code
@dataclass
class RoomOptions:
    """Additional options for a room connection."""

    auto_subscribe: bool = True
    """Automatically subscribe to tracks when participants join."""
    dynacast: bool = False
    """Whether dynacast is enabled for the connection. Disabled by default."""
    e2ee: E2EEOptions | None = None
    """Options for end-to-end encryption."""
    rtc_config: RtcConfiguration | None = None
    """WebRTC-related configuration."""

Class variables

var auto_subscribe : bool

Automatically subscribe to tracks when participants join.

var dynacast : bool
var e2ee : Optional[E2EEOptions]

Options for end-to-end encryption.

var rtc_config : RtcConfiguration | None

WebRTC-related configuration.

class RpcError (code: Union[int, ForwardRef('RpcError.ErrorCode')], message: str, data: str | None = None)

Specialized error handling for RPC methods.

Instances of this type, when thrown in a method handler, will have their message serialized and sent across the wire. The caller will receive an equivalent error on the other side.

Built-in errors are included (codes 1001-1999) but developers may use the code, message, and data fields to create their own errors.

Creates an error object with the given code and message, plus an optional data payload.

If thrown in an RPC method handler, the error will be sent back to the caller.

Args

code : int
Your error code (Error codes 1001-1999 are reserved for built-in errors)
message : str
A readable error message.
data : Optional[str]
Optional additional data associated with the error (JSON recommended)
Expand source code
class RpcError(Exception):
    """
    Error type used to report failures of RPC methods.

    When an instance is raised inside a method handler, its `message` is serialized
    and sent across the wire; the caller receives an equivalent error on its side.

    Codes 1001-1999 are reserved for built-in errors (see `ErrorCode`), but developers
    may use the code, message, and data fields to create their own errors.
    """

    class ErrorCode(IntEnum):
        """Codes of the built-in RPC errors."""

        APPLICATION_ERROR = 1500
        CONNECTION_TIMEOUT = 1501
        RESPONSE_TIMEOUT = 1502
        RECIPIENT_DISCONNECTED = 1503
        RESPONSE_PAYLOAD_TOO_LARGE = 1504
        SEND_FAILED = 1505

        UNSUPPORTED_METHOD = 1400
        RECIPIENT_NOT_FOUND = 1401
        REQUEST_PAYLOAD_TOO_LARGE = 1402
        UNSUPPORTED_SERVER = 1403
        UNSUPPORTED_VERSION = 1404

    # Default human-readable message for each built-in error code.
    ErrorMessage: ClassVar[Dict[ErrorCode, str]] = {
        ErrorCode.APPLICATION_ERROR: "Application error in method handler",
        ErrorCode.CONNECTION_TIMEOUT: "Connection timeout",
        ErrorCode.RESPONSE_TIMEOUT: "Response timeout",
        ErrorCode.RECIPIENT_DISCONNECTED: "Recipient disconnected",
        ErrorCode.RESPONSE_PAYLOAD_TOO_LARGE: "Response payload too large",
        ErrorCode.SEND_FAILED: "Failed to send",
        ErrorCode.UNSUPPORTED_METHOD: "Method not supported at destination",
        ErrorCode.RECIPIENT_NOT_FOUND: "Recipient not found",
        ErrorCode.REQUEST_PAYLOAD_TOO_LARGE: "Request payload too large",
        ErrorCode.UNSUPPORTED_SERVER: "RPC not supported by server",
        ErrorCode.UNSUPPORTED_VERSION: "Unsupported RPC version",
    }

    def __init__(
        self,
        code: Union[int, "RpcError.ErrorCode"],
        message: str,
        data: Optional[str] = None,
    ):
        """
        Build an error from a code and a message, with an optional data payload.

        If raised in an RPC method handler, the error is sent back to the caller.

        Args:
            code (int): Your error code (Error codes 1001-1999 are reserved for built-in errors)
            message (str): A readable error message.
            data (Optional[str]): Optional additional data associated with the error (JSON recommended)
        """
        super().__init__(message)
        self._code, self._message, self._data = code, message, data

    @property
    def code(self) -> int:
        """Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings)."""
        return self._code

    @property
    def message(self) -> str:
        """A readable error message."""
        return self._message

    @property
    def data(self) -> Optional[str]:
        """Optional additional data associated with the error (JSON recommended)."""
        return self._data

    @classmethod
    def _from_proto(cls, proto: proto_rpc.RpcError) -> "RpcError":
        """Create an error from the code, message and data of a protobuf RpcError."""
        return cls(proto.code, proto.message, proto.data)

    def _to_proto(self) -> proto_rpc.RpcError:
        """Convert this error into a protobuf RpcError carrying the same three fields."""
        return proto_rpc.RpcError(
            code=self.code,
            message=self.message,
            data=self.data,
        )

    @classmethod
    def _built_in(
        cls, code: "RpcError.ErrorCode", data: Optional[str] = None
    ) -> "RpcError":
        """Create a built-in error whose message is looked up in `ErrorMessage`."""
        return cls(code, cls.ErrorMessage[code], data)

Ancestors

  • builtins.Exception
  • builtins.BaseException

Class variables

var ErrorCode

Enum where members are also (and must be) ints

var ErrorMessage : ClassVar[Dict[RpcError.ErrorCode, str]]

Instance variables

prop code : int

Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings).

Expand source code
@property
def code(self) -> int:
    """Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings)."""
    return self._code
prop data : str | None

Optional additional data associated with the error (JSON recommended).

Expand source code
@property
def data(self) -> Optional[str]:
    """Optional additional data associated with the error (JSON recommended)."""
    return self._data
prop message : str

A readable error message.

Expand source code
@property
def message(self) -> str:
    """A readable error message."""
    return self._message
class RpcInvocationData (request_id: str, caller_identity: str, payload: str, response_timeout: float)

Data passed to method handler for incoming RPC invocations

Attributes

request_id : str
The unique request ID. Will match at both sides of the call, useful for debugging or logging.
caller_identity : str
The unique participant identity of the caller.
payload : str
The payload of the request. User-definable format, typically JSON.
response_timeout : float
The maximum time the caller will wait for a response.
Expand source code
@dataclass
class RpcInvocationData:
    """Data passed to method handler for incoming RPC invocations

    Attributes:
        request_id (str): The unique request ID. Will match at both sides of the call, useful for debugging or logging.
        caller_identity (str): The unique participant identity of the caller.
        payload (str): The payload of the request. User-definable format, typically JSON.
        response_timeout (float): The maximum time the caller will wait for a response.
    """

    request_id: str
    caller_identity: str
    payload: str
    response_timeout: float

Class variables

var caller_identity : str
var payload : str
var request_id : str
var response_timeout : float
class RtcConfiguration (ice_transport_type: proto_room.IceTransportType.ValueType = 2, continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = 1, ice_servers: list[proto_room.IceServer] = <factory>)

RtcConfiguration(ice_transport_type: 'proto_room.IceTransportType.ValueType' = 2, continual_gathering_policy: 'proto_room.ContinualGatheringPolicy.ValueType' = 1, ice_servers: 'list[proto_room.IceServer]' = <factory>)

Expand source code
@dataclass
class RtcConfiguration:
    ice_transport_type: proto_room.IceTransportType.ValueType = (
        proto_room.IceTransportType.TRANSPORT_ALL
    )
    """Specifies the type of ICE transport to be used (e.g., all, relay, etc.)."""
    continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = (
        proto_room.ContinualGatheringPolicy.GATHER_CONTINUALLY
    )
    """Policy for continual gathering of ICE candidates."""
    ice_servers: list[proto_room.IceServer] = field(default_factory=list)
    """List of ICE servers for STUN/TURN. When empty, it uses the default ICE servers provided by
    the SFU."""

Class variables

var continual_gathering_policy : int

Policy for continual gathering of ICE candidates.

var ice_servers : list[room_pb2.IceServer]

List of ICE servers for STUN/TURN. When empty, it uses the default ICE servers provided by the SFU.

var ice_transport_type : int

Specifies the type of ICE transport to be used (e.g., all, relay, etc.).

class SipDTMF (code: int, digit: str, participant: RemoteParticipant | None = None)

SipDTMF(code: 'int', digit: 'str', participant: 'RemoteParticipant | None' = None)

Expand source code
@dataclass
class SipDTMF:
    code: int
    """DTMF code corresponding to the digit."""
    digit: str
    """DTMF digit sent."""
    participant: RemoteParticipant | None = None
    """Participant who sent the DTMF digit. None when sent by a server SDK."""

Class variables

var code : int

DTMF code corresponding to the digit.

var digit : str

DTMF digit sent.

var participant : RemoteParticipant | None

Participant who sent the DTMF digit. None when sent by a server SDK.

class Track (owned_info: track_pb2.OwnedTrack)
Expand source code
class Track:
    """Handle to a track owned by the FFI layer, exposing the fields of its track info."""

    def __init__(self, owned_info: proto_track.OwnedTrack):
        """Keep the track info and wrap the owned handle id in an FfiHandle.

        Args:
            owned_info (proto_track.OwnedTrack): Owned track message providing
                ``info`` and ``handle.id``.
        """
        self._info = owned_info.info
        self._ffi_handle = FfiHandle(owned_info.handle.id)

    @property
    def sid(self) -> str:
        """The ``sid`` field of the track info."""
        return self._info.sid

    @property
    def name(self) -> str:
        """The ``name`` field of the track info."""
        return self._info.name

    @property
    def kind(self) -> proto_track.TrackKind.ValueType:
        """The ``kind`` field of the track info (a ``TrackKind`` value)."""
        return self._info.kind

    @property
    def stream_state(self) -> proto_track.StreamState.ValueType:
        """The ``stream_state`` field of the track info (a ``StreamState`` value)."""
        return self._info.stream_state

    @property
    def muted(self) -> bool:
        """The ``muted`` field of the track info."""
        return self._info.muted

    async def get_stats(self) -> List[proto_stats.RtcStats]:
        """Request statistics for this track and wait for the matching response event.

        Returns:
            List[proto_stats.RtcStats]: The stats carried by the response event.

        Raises:
            Exception: If the response event reports an error.
        """
        req = proto_ffi.FfiRequest()
        req.get_stats.track_handle = self._ffi_handle.handle

        # The queue is subscribed before the request is sent — presumably so the
        # response event cannot be missed; it is always unsubscribed afterwards.
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            # Wait for the event whose async_id matches the one returned for this request.
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.get_stats.async_id == resp.get_stats.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.get_stats.error:
            raise Exception(cb.get_stats.error)

        return list(cb.get_stats.stats)

Subclasses

Instance variables

prop kind : int
Expand source code
@property
def kind(self) -> proto_track.TrackKind.ValueType:
    return self._info.kind
prop muted : bool
Expand source code
@property
def muted(self) -> bool:
    return self._info.muted
prop name : str
Expand source code
@property
def name(self) -> str:
    return self._info.name
prop sid : str
Expand source code
@property
def sid(self) -> str:
    return self._info.sid
prop stream_state : int
Expand source code
@property
def stream_state(self) -> proto_track.StreamState.ValueType:
    return self._info.stream_state

Methods

async def get_stats(self) ‑> List[stats_pb2.RtcStats]
class TrackPublication (owned_info: track_pb2.OwnedTrackPublication)
Expand source code
class TrackPublication:
    """Handle to a track publication owned by the FFI layer, exposing its publication info."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        """Keep the publication info and wrap the owned handle id in an FfiHandle.

        Args:
            owned_info (proto_track.OwnedTrackPublication): Owned publication
                message providing ``info`` and ``handle.id``.
        """
        self._info = owned_info.info
        # No track is attached at construction time; the attribute starts as None.
        self.track: Optional[Track] = None
        self._ffi_handle = FfiHandle(owned_info.handle.id)

    @property
    def sid(self) -> str:
        """The ``sid`` field of the publication info."""
        return self._info.sid

    @property
    def name(self) -> str:
        """The ``name`` field of the publication info."""
        return self._info.name

    @property
    def kind(self) -> proto_track.TrackKind.ValueType:
        """The ``kind`` field of the publication info (a ``TrackKind`` value)."""
        return self._info.kind

    @property
    def source(self) -> proto_track.TrackSource.ValueType:
        """The ``source`` field of the publication info (a ``TrackSource`` value)."""
        return self._info.source

    @property
    def simulcasted(self) -> bool:
        """The ``simulcasted`` field of the publication info."""
        return self._info.simulcasted

    @property
    def width(self) -> int:
        """The ``width`` field of the publication info."""
        return self._info.width

    @property
    def height(self) -> int:
        """The ``height`` field of the publication info."""
        return self._info.height

    @property
    def mime_type(self) -> str:
        """The ``mime_type`` field of the publication info."""
        return self._info.mime_type

    @property
    def muted(self) -> bool:
        """The ``muted`` field of the publication info."""
        return self._info.muted

    @property
    def encryption_type(self) -> proto_e2ee.EncryptionType.ValueType:
        """The ``encryption_type`` field of the publication info (an ``EncryptionType`` value)."""
        return self._info.encryption_type

Subclasses

Instance variables

prop encryption_type : int
Expand source code
@property
def encryption_type(self) -> proto_e2ee.EncryptionType.ValueType:
    return self._info.encryption_type
prop height : int
Expand source code
@property
def height(self) -> int:
    return self._info.height
prop kind : int
Expand source code
@property
def kind(self) -> proto_track.TrackKind.ValueType:
    return self._info.kind
prop mime_type : str
Expand source code
@property
def mime_type(self) -> str:
    return self._info.mime_type
prop muted : bool
Expand source code
@property
def muted(self) -> bool:
    return self._info.muted
prop name : str
Expand source code
@property
def name(self) -> str:
    return self._info.name
prop sid : str
Expand source code
@property
def sid(self) -> str:
    return self._info.sid
prop simulcasted : bool
Expand source code
@property
def simulcasted(self) -> bool:
    return self._info.simulcasted
prop source : int
Expand source code
@property
def source(self) -> proto_track.TrackSource.ValueType:
    return self._info.source
prop width : int
Expand source code
@property
def width(self) -> int:
    return self._info.width
class TrackPublishOptions (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class Transcription (participant_identity: str, track_sid: str, segments: List[ForwardRef('TranscriptionSegment')])

Transcription(participant_identity: str, track_sid: str, segments: List[ForwardRef('TranscriptionSegment')])

Expand source code
@dataclass
class Transcription:
    participant_identity: str
    track_sid: str
    segments: List["TranscriptionSegment"]

Class variables

var participant_identity : str
var segments : List[TranscriptionSegment]
var track_sid : str
class TranscriptionSegment (id: str, text: str, start_time: int, end_time: int, language: str, final: bool)

TranscriptionSegment(id: str, text: str, start_time: int, end_time: int, language: str, final: bool)

Expand source code
@dataclass
class TranscriptionSegment:
    id: str
    text: str
    start_time: int
    end_time: int
    language: str
    final: bool

Class variables

var end_time : int
var final : bool
var id : str
var language : str
var start_time : int
var text : str
class VideoEncoding (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class VideoFrame (width: int, height: int, type: int, data: Union[bytes, bytearray, memoryview])

Represents a video frame with associated metadata and pixel data.

This class provides methods to access video frame properties such as width, height, and pixel format, as well as methods for manipulating and converting video frames.

Initializes a new VideoFrame instance.

Args

width : int
The width of the video frame in pixels.
height : int
The height of the video frame in pixels.
type : proto_video.VideoBufferType.ValueType
The format type of the video frame data (e.g., RGBA, BGRA, RGB24, etc.).
data : Union[bytes, bytearray, memoryview]
The raw pixel data for the video frame.
Expand source code
class VideoFrame:
    """
    Represents a video frame with associated metadata and pixel data.

    This class provides methods to access video frame properties such as width, height,
    and pixel format, as well as methods for manipulating and converting video frames.
    """

    def __init__(
        self,
        width: int,
        height: int,
        type: proto_video.VideoBufferType.ValueType,
        data: Union[bytes, bytearray, memoryview],
    ) -> None:
        """
        Initializes a new VideoFrame instance.

        Args:
            width (int): The width of the video frame in pixels.
            height (int): The height of the video frame in pixels.
            type (proto_video.VideoBufferType.ValueType): The format type of the video frame data
                (e.g., RGBA, BGRA, RGB24, etc.).
            data (Union[bytes, bytearray, memoryview]): The raw pixel data for the video frame.
        """
        self._width = width
        self._height = height
        self._type = type
        # The pixel data is copied into a bytearray owned by this frame.
        self._data = bytearray(data)

    @property
    def width(self) -> int:
        """
        Returns the width of the video frame in pixels.

        Returns:
            int: The width of the video frame.
        """
        return self._width

    @property
    def height(self) -> int:
        """
        Returns the height of the video frame in pixels.

        Returns:
            int: The height of the video frame.
        """
        return self._height

    @property
    def type(self) -> proto_video.VideoBufferType.ValueType:
        """
        Returns the format type of the video frame data.

        Returns:
            proto_video.VideoBufferType.ValueType: The format type of the video frame
            (e.g., RGBA, BGRA, RGB24, etc.).
        """
        return self._type

    @property
    def data(self) -> memoryview:
        """
        Returns a memoryview of the raw pixel data for the video frame.

        Returns:
            memoryview: The raw pixel data of the video frame as a memoryview object.
        """
        return memoryview(self._data)

    @staticmethod
    def _from_owned_info(owned_info: proto_video.OwnedVideoBuffer) -> "VideoFrame":
        """Build a VideoFrame by copying the pixel data referenced by an owned video buffer.

        Args:
            owned_info (proto_video.OwnedVideoBuffer): Owned buffer message providing
                ``info`` (type, width, height, data_ptr) and ``handle.id``.

        Returns:
            VideoFrame: A frame holding its own copy of the buffer's bytes.
        """
        info = owned_info.info
        data_len = _get_plane_length(info.type, info.width, info.height)
        # View the memory at data_ptr as data_len bytes, then copy it into a bytearray.
        cdata = (ctypes.c_uint8 * data_len).from_address(info.data_ptr)
        data = bytearray(cdata)
        frame = VideoFrame(
            width=info.width,
            height=info.height,
            type=info.type,
            data=data,
        )
        # NOTE(review): the handle wrapper is created and dropped right away —
        # presumably so the owned buffer is released once its data has been copied;
        # confirm against FfiHandle.
        FfiHandle(owned_info.handle.id)
        return frame

    def _proto_info(self) -> proto_video.VideoBufferInfo:
        """Describe this frame's buffer as a VideoBufferInfo message.

        The message carries the address of the pixel data, the plane components,
        the dimensions, the type, and a stride of ``width * 4`` for ARGB/ABGR/RGBA/BGRA,
        ``width * 3`` for RGB24, and 0 for every other type.
        """
        info = proto_video.VideoBufferInfo()
        addr = get_address(self.data)
        info.components.extend(
            _get_plane_infos(addr, self.type, self.width, self.height)
        )
        info.width = self.width
        info.height = self.height
        info.type = self.type
        info.data_ptr = addr
        info.stride = 0

        if self.type in [
            proto_video.VideoBufferType.ARGB,
            proto_video.VideoBufferType.ABGR,
            proto_video.VideoBufferType.RGBA,
            proto_video.VideoBufferType.BGRA,
        ]:
            info.stride = self.width * 4
        elif self.type == proto_video.VideoBufferType.RGB24:
            info.stride = self.width * 3

        return info

    def get_plane(self, plane_nth: int) -> Optional[memoryview]:
        """
        Returns the memoryview of a specific plane in the video frame, based on its index.

        Some video formats (e.g., I420, NV12) contain multiple planes (Y, U, V channels).
        This method allows access to individual planes by index.

        Args:
            plane_nth (int): The index of the plane to retrieve (starting from 0).

        Returns:
            Optional[memoryview]: A memoryview of the specified plane's data, or None if
            the index is out of bounds for the format.
        """
        plane_infos = _get_plane_infos(
            get_address(self.data), self.type, self.width, self.height
        )
        if plane_nth >= len(plane_infos):
            return None

        plane_info = plane_infos[plane_nth]
        # The returned view is built directly on the plane's address, not on a copy.
        cdata = (ctypes.c_uint8 * plane_info.size).from_address(plane_info.data_ptr)
        return memoryview(cdata)

    def convert(
        self, type: proto_video.VideoBufferType.ValueType, *, flip_y: bool = False
    ) -> "VideoFrame":
        """
        Converts the current video frame to a different format type, optionally flipping
        the frame vertically.

        Args:
            type (proto_video.VideoBufferType.ValueType): The target format type to convert to
                (e.g., RGBA, I420).
            flip_y (bool, optional): If True, the frame will be flipped vertically. Defaults to False.

        Returns:
            VideoFrame: A new VideoFrame object in the specified format.

        Raises:
            Exception: If the conversion isn't supported.

        Example:
            Convert a frame from RGBA to I420 format:

            >>> frame = VideoFrame(width=1920, height=1080, type=proto_video.VideoBufferType.RGBA, data=raw_data)
            >>> converted_frame = frame.convert(proto_video.VideoBufferType.I420)
            >>> print(converted_frame.type)
            VideoBufferType.I420

        Example:
            Convert a frame from BGRA to RGB24 format and flip it vertically:

            >>> frame = VideoFrame(width=1280, height=720, type=proto_video.VideoBufferType.BGRA, data=raw_data)
            >>> converted_frame = frame.convert(proto_video.VideoBufferType.RGB24, flip_y=True)
            >>> print(converted_frame.type)
            VideoBufferType.RGB24
            >>> print(converted_frame.width, converted_frame.height)
            1280 720
        """
        req = proto.FfiRequest()
        req.video_convert.flip_y = flip_y
        req.video_convert.dst_type = type
        req.video_convert.buffer.CopyFrom(self._proto_info())
        resp = FfiClient.instance.request(req)
        if resp.video_convert.error:
            raise Exception(resp.video_convert.error)

        return VideoFrame._from_owned_info(resp.video_convert.buffer)

    def __repr__(self) -> str:
        """Debug representation showing the frame's width, height and type."""
        return f"rtc.VideoFrame(width={self.width}, height={self.height}, type={self.type})"

Instance variables

prop data : memoryview

Returns a memoryview of the raw pixel data for the video frame.

Returns

memoryview
The raw pixel data of the video frame as a memoryview object.
Expand source code
@property
def data(self) -> memoryview:
    """
    Returns a memoryview of the raw pixel data for the video frame.

    Returns:
        memoryview: The raw pixel data of the video frame as a memoryview object.
    """
    return memoryview(self._data)
prop height : int

Returns the height of the video frame in pixels.

Returns

int
The height of the video frame.
Expand source code
@property
def height(self) -> int:
    """
    Returns the height of the video frame in pixels.

    Returns:
        int: The height of the video frame.
    """
    return self._height
prop type : int

Returns the format type of the video frame data.

Returns

int
The format type of the video frame (e.g., RGBA, BGRA, RGB24, etc.).
Expand source code
@property
def type(self) -> proto_video.VideoBufferType.ValueType:
    """
    Returns the format type of the video frame data.

    Returns:
        proto_video.VideoBufferType.ValueType: The format type of the video frame
        (e.g., RGBA, BGRA, RGB24, etc.).
    """
    return self._type
prop width : int

Returns the width of the video frame in pixels.

Returns

int
The width of the video frame.
Expand source code
@property
def width(self) -> int:
    """
    Returns the width of the video frame in pixels.

    Returns:
        int: The width of the video frame.
    """
    return self._width

Methods

def convert(self, type: int, *, flip_y: bool = False) ‑> VideoFrame

Converts the current video frame to a different format type, optionally flipping the frame vertically.

Args

type : proto_video.VideoBufferType.ValueType
The target format type to convert to (e.g., RGBA, I420).
flip_y : bool, optional
If True, the frame will be flipped vertically. Defaults to False.

Returns

VideoFrame
A new VideoFrame object in the specified format.

Raises

Exception
If the conversion isn't supported.

Example

Convert a frame from RGBA to I420 format:

>>> frame = VideoFrame(width=1920, height=1080, type=proto_video.VideoBufferType.RGBA, data=raw_data)
>>> converted_frame = frame.convert(proto_video.VideoBufferType.I420)
>>> print(converted_frame.type)
VideoBufferType.I420

Example

Convert a frame from BGRA to RGB24 format and flip it vertically:

>>> frame = VideoFrame(width=1280, height=720, type=proto_video.VideoBufferType.BGRA, data=raw_data)
>>> converted_frame = frame.convert(proto_video.VideoBufferType.RGB24, flip_y=True)
>>> print(converted_frame.type)
VideoBufferType.RGB24
>>> print(converted_frame.width, converted_frame.height)
1280 720
def get_plane(self, plane_nth: int) ‑> Optional[memoryview]

Returns the memoryview of a specific plane in the video frame, based on its index.

Some video formats (e.g., I420, NV12) contain multiple planes (Y, U, V channels). This method allows access to individual planes by index.

Args

plane_nth : int
The index of the plane to retrieve (starting from 0).

Returns

Optional[memoryview]
A memoryview of the specified plane's data, or None if the index is out of bounds for the format.

class VideoFrameEvent (frame: VideoFrame, timestamp_us: int, rotation: proto_video_frame.VideoRotation)

VideoFrameEvent(frame: 'VideoFrame', timestamp_us: 'int', rotation: 'proto_video_frame.VideoRotation')

Expand source code
@dataclass
class VideoFrameEvent:
    """A video frame bundled with its timestamp and rotation metadata."""

    # The video frame carried by this event.
    frame: VideoFrame
    # Timestamp associated with the frame; presumably microseconds, judging
    # by the ``_us`` suffix -- TODO confirm against the producer.
    timestamp_us: int
    # Rotation value associated with the frame.
    rotation: proto_video_frame.VideoRotation

Class variables

var frame : VideoFrame
var rotation : proto_video_frame.VideoRotation
var timestamp_us : int
class VideoSource (width: int, height: int)
Expand source code
class VideoSource:
    """Handle to a native video source that frames can be captured into.

    The source is created through the FFI client at construction time with
    the requested resolution.
    """

    def __init__(self, width: int, height: int) -> None:
        """Request a new native video source of the given resolution.

        Args:
            width: Resolution width sent in the creation request.
            height: Resolution height sent in the creation request.
        """
        request = proto_ffi.FfiRequest()
        new_source = request.new_video_source
        new_source.type = proto_video.VideoSourceType.VIDEO_SOURCE_NATIVE
        new_source.resolution.width = width
        new_source.resolution.height = height

        response = FfiClient.instance.request(request)
        self._info = response.new_video_source.source
        # Keep the FFI handle for the source; later capture requests refer to it.
        self._ffi_handle = FfiHandle(self._info.handle.id)

    def capture_frame(
        self,
        frame: VideoFrame,
        *,
        timestamp_us: int = 0,
        rotation: proto_video.VideoRotation.ValueType = proto_video.VideoRotation.VIDEO_ROTATION_0,
    ) -> None:
        """Send a frame to this source through the FFI client.

        Args:
            frame: The frame to capture; its buffer description is obtained
                from ``frame._proto_info()``.
            timestamp_us: Timestamp forwarded with the frame. Defaults to 0.
            rotation: Rotation forwarded with the frame. Defaults to
                ``VIDEO_ROTATION_0``.
        """
        request = proto_ffi.FfiRequest()
        capture = request.capture_video_frame
        capture.source_handle = self._ffi_handle.handle
        capture.buffer.CopyFrom(frame._proto_info())
        capture.rotation = rotation
        capture.timestamp_us = timestamp_us
        FfiClient.instance.request(request)

Methods

def capture_frame(self, frame: VideoFrame, *, timestamp_us: int = 0, rotation: int = 0) ‑> None
class VideoStream (track: Track, loop: Optional[asyncio.AbstractEventLoop] = None, capacity: int = 0, format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, **kwargs)

VideoStream is a stream of video frames received from a RemoteTrack.

Expand source code
class VideoStream:
    """VideoStream is a stream of video frames received from a RemoteTrack.

    Instances are asynchronous iterators that yield ``VideoFrameEvent``
    objects. Iteration stops once the underlying stream reports
    end-of-stream.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        **kwargs,
    ) -> None:
        """Create the FFI-side stream and start the background reader task.

        Args:
            track: Track to read frames from. Ignored when a ``participant``
                keyword argument is supplied.
            loop: Event loop used for the FFI subscription and reader task.
                Defaults to ``asyncio.get_event_loop()``.
            capacity: Capacity passed to the internal ``RingQueue``.
            format: Optional buffer type requested for received frames; when
                ``None`` no format is set on the request.
            **kwargs: If ``participant`` is present, the stream is created
                from that participant and ``track_source`` must also be given.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
        self._track: Track | None = track
        self._format = format
        self._capacity = capacity
        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()

        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        """Build a stream from a participant and a track source.

        Args:
            participant: Participant whose handle is sent in the request.
            track_source: Track source sent in the request.
            loop: Optional event loop (see ``__init__``).
            format: Optional buffer type requested for received frames.
            capacity: Capacity passed to the internal ``RingQueue``.

        Returns:
            VideoStream: The newly created stream.
        """
        return VideoStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            format=format,
            track=None,  # type: ignore
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        """Build a stream from a track.

        Args:
            track: Track whose handle is sent in the request.
            loop: Optional event loop (see ``__init__``).
            format: Optional buffer type requested for received frames.
            capacity: Capacity passed to the internal ``RingQueue``.

        Returns:
            VideoStream: The newly created stream.
        """
        return VideoStream(
            track=track,
            loop=loop,
            capacity=capacity,
            format=format,
        )

    def __del__(self) -> None:
        # __del__ also runs on partially constructed objects: if __init__
        # raised before the subscription was made there is nothing to undo.
        ffi_queue = getattr(self, "_ffi_queue", None)
        if ffi_queue is not None:
            FfiClient.instance.queue.unsubscribe(ffi_queue)

    def _create_owned_stream(self) -> Any:
        """Request a native video stream for ``self._track`` and return it."""
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_video_stream = req.new_video_stream
        new_video_stream.track_handle = self._track._ffi_handle.handle
        new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        if self._format is not None:
            new_video_stream.format = self._format
        new_video_stream.normalize_stride = True
        resp = FfiClient.instance.request(req)
        return resp.new_video_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        """Request a native video stream for a participant's track source."""
        req = proto_ffi.FfiRequest()
        video_stream_from_participant = req.video_stream_from_participant
        video_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        video_stream_from_participant.type = (
            proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        )
        video_stream_from_participant.track_source = track_source
        video_stream_from_participant.normalize_stride = True
        if self._format is not None:
            video_stream_from_participant.format = self._format
        resp = FfiClient.instance.request(req)
        return resp.video_stream_from_participant.stream

    async def _run(self) -> None:
        """Forward FFI stream events into the frame queue until end-of-stream."""
        while True:
            ffi_event = await self._ffi_queue.wait_for(self._is_event)
            video_event = ffi_event.video_stream_event

            if video_event.HasField("frame_received"):
                received = video_event.frame_received
                frame = VideoFrame._from_owned_info(received.buffer)

                self._queue.put(
                    VideoFrameEvent(
                        frame=frame,
                        timestamp_us=received.timestamp_us,
                        rotation=received.rotation,
                    )
                )
            elif video_event.HasField("eos"):
                # Wake any consumer currently blocked in __anext__; without
                # this sentinel it would wait on the queue forever.
                self._queue.put(None)
                break

        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Dispose of the FFI stream handle and wait for the reader task."""
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        """Return True if the FFI event belongs to this stream's handle."""
        return e.video_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
        return self

    async def __anext__(self) -> VideoFrameEvent:
        """Return the next frame event.

        Raises:
            StopAsyncIteration: If the reader task has finished or the
                end-of-stream sentinel is read from the queue.
        """
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item

Static methods

def from_participant(*, participant: Participant, track_source: TrackSource.ValueType, loop: Optional[asyncio.AbstractEventLoop] = None, format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, capacity: int = 0) ‑> VideoStream
def from_track(*, track: Track, loop: Optional[asyncio.AbstractEventLoop] = None, format: Optional[proto_video_frame.VideoBufferType.ValueType] = None, capacity: int = 0) ‑> VideoStream

Methods

async def aclose(self) ‑> None