Module livekit.rtc
LiveKit RTC SDK
Sub-modules
livekit.rtc.audio_frame
livekit.rtc.audio_resampler
livekit.rtc.audio_source
livekit.rtc.audio_stream
livekit.rtc.chat
livekit.rtc.e2ee
livekit.rtc.event_emitter
livekit.rtc.log
livekit.rtc.participant
livekit.rtc.resources
-
Used by importlib.resources and setuptools
livekit.rtc.room
livekit.rtc.rpc
livekit.rtc.synchronizer
livekit.rtc.track
livekit.rtc.track_publication
livekit.rtc.transcription
livekit.rtc.utils
livekit.rtc.version
livekit.rtc.video_frame
livekit.rtc.video_source
livekit.rtc.video_stream
Functions
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame
-
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
    r"""
    Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`.

    This function concatenates the audio data from multiple frames, ensuring that
    all frames have the same sample rate and number of channels. The output buffer
    is allocated once at its final size (in bytes) and each frame's data is copied
    into it.

    Args:
        buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` objects to
            be combined. Anything that is not a `list` is returned unchanged.

    Returns:
        rtc.AudioFrame: For a list, a new `rtc.AudioFrame` containing the combined
        audio data. For a single (non-list) frame, that same object.

    Raises:
        ValueError: If the buffer is empty.
        ValueError: If frames have differing sample rates.
        ValueError: If frames have differing numbers of channels.

    Example:
        >>> frame1 = rtc.AudioFrame(
        ...     data=b"\x01\x02\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> frame2 = rtc.AudioFrame(
        ...     data=b"\x05\x06\x07\x08", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> combined_frame = combine_audio_frames([frame1, frame2])
        >>> combined_frame.data.tobytes()
        b'\x01\x02\x03\x04\x05\x06\x07\x08'
        >>> combined_frame.sample_rate
        48000
        >>> combined_frame.num_channels
        2
        >>> combined_frame.samples_per_channel
        2
    """
    if not isinstance(buffer, list):
        return buffer

    if not buffer:
        raise ValueError("buffer is empty")

    sample_rate = buffer[0].sample_rate
    num_channels = buffer[0].num_channels

    total_data_length = 0
    total_samples_per_channel = 0

    for frame in buffer:
        if frame.sample_rate != sample_rate:
            raise ValueError(
                f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}"
            )

        if frame.num_channels != num_channels:
            raise ValueError(
                f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}"
            )

        # `frame.data` is an int16-typed view, so len() would count samples;
        # `nbytes` gives the byte size the output buffer actually needs.
        total_data_length += frame.data.nbytes
        total_samples_per_channel += frame.samples_per_channel

    data = bytearray(total_data_length)
    offset = 0
    for frame in buffer:
        # Re-cast to a byte view so the slice assignment works in byte units.
        frame_data = frame.data.cast("b")
        data[offset : offset + len(frame_data)] = frame_data
        offset += len(frame_data)

    return AudioFrame(
        data=data,
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=total_samples_per_channel,
    )
Combines one or more
rtc.AudioFrame
objects into a single rtc.AudioFrame
. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.
Args
buffer
- A single
rtc.AudioFrame
or a list of rtc.AudioFrame
objects to be combined.
Returns
rtc.AudioFrame
- A new
rtc.AudioFrame
containing the combined audio data.
Raises
ValueError
- If the buffer is empty.
ValueError
- If frames have differing sample rates.
ValueError
- If frames have differing numbers of channels.
Example
>>> frame1 = rtc.AudioFrame(
...     data=b"\x01\x02\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"\x05\x06\x07\x08", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data.tobytes()
b'\x01\x02\x03\x04\x05\x06\x07\x08'
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
Classes
class AVSynchronizer (*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100)-
Expand source code
class AVSynchronizer:
    """Synchronize audio and video capture.

    Audio frames are forwarded straight to the audio source; video frames are
    buffered in an internal queue and forwarded to the video source by a
    background task running inside the FPS controller context.

    Usage:
        av_sync = AVSynchronizer(
            audio_source=audio_source,
            video_source=video_source,
            video_fps=video_fps,
        )

        async for video_frame, audio_frame in video_generator:
            await av_sync.push(video_frame)
            await av_sync.push(audio_frame)
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300,
    ):
        """Create the synchronizer and start the background video capture task.

        Must be called while an event loop is running, because the capture
        task is started with `asyncio.create_task`.

        Args:
            audio_source: Destination for audio frames.
            video_source: Destination for video frames.
            video_fps: Expected video frame rate, passed to the FPS controller.
            video_queue_size_ms: Video queue capacity expressed in milliseconds
                of video at `video_fps`. A positive value always yields a
                bounded queue of at least one frame.
            _max_delay_tolerance_ms: Private tuning knob passed to the FPS
                controller.
        """
        self._audio_source = audio_source
        self._video_source = video_source
        self._video_fps = video_fps
        self._video_queue_size_ms = video_queue_size_ms
        self._max_delay_tolerance_ms = _max_delay_tolerance_ms

        self._stopped = False

        self._video_queue_max_size = int(
            self._video_fps * self._video_queue_size_ms / 1000
        )
        if self._video_queue_size_ms > 0:
            # ensure queue is bounded if queue size is specified
            self._video_queue_max_size = max(1, self._video_queue_max_size)

        self._video_queue = asyncio.Queue[VideoFrame](
            maxsize=self._video_queue_max_size
        )
        self._fps_controller = _FPSController(
            expected_fps=self._video_fps,
            max_delay_tolerance_ms=self._max_delay_tolerance_ms,
        )
        self._capture_video_task = asyncio.create_task(self._capture_video())

    async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
        """Push one frame: audio is captured immediately, video is queued."""
        if isinstance(frame, AudioFrame):
            await self._audio_source.capture_frame(frame)
            return

        await self._video_queue.put(frame)

    async def clear_queue(self) -> None:
        """Drop all queued audio and every video frame still waiting in the queue."""
        self._audio_source.clear_queue()
        while not self._video_queue.empty():
            await self._video_queue.get()
            # Every get() must be matched by task_done(); otherwise the
            # queue.join() in wait_for_playout() would block forever after
            # frames were discarded here.
            self._video_queue.task_done()

    async def wait_for_playout(self) -> None:
        """Wait until all video and audio frames are played out."""
        await self._audio_source.wait_for_playout()
        await self._video_queue.join()

    async def _capture_video(self) -> None:
        """Background loop: forward queued video frames to the video source."""
        while not self._stopped:
            frame = await self._video_queue.get()
            # NOTE(review): the controller presumably paces capture to the
            # expected FPS; its implementation is not visible here.
            async with self._fps_controller:
                self._video_source.capture_frame(frame)
            self._video_queue.task_done()

    async def aclose(self) -> None:
        """Mark the synchronizer stopped and cancel the capture task."""
        self._stopped = True
        if self._capture_video_task:
            self._capture_video_task.cancel()

    @property
    def actual_fps(self) -> float:
        """Frame rate currently reported by the FPS controller."""
        return self._fps_controller.actual_fps
Synchronize audio and video capture.
Usage
av_sync = AVSynchronizer( audio_source=audio_source, video_source=video_source, video_fps=video_fps, )
async for video_frame, audio_frame in video_generator: await av_sync.push(video_frame) await av_sync.push(audio_frame)
Instance variables
prop actual_fps : float
-
Expand source code
@property def actual_fps(self) -> float: return self._fps_controller.actual_fps
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: self._stopped = True if self._capture_video_task: self._capture_video_task.cancel()
async def clear_queue(self) ‑> None
-
Expand source code
async def clear_queue(self) -> None: self._audio_source.clear_queue() while not self._video_queue.empty(): await self._video_queue.get()
async def push(self,
frame: VideoFrame | AudioFrame) ‑> None-
Expand source code
async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
    """Push one frame into the synchronizer.

    Audio frames are captured by the audio source right away; any other
    frame is placed on the video queue, waiting if the queue is full.
    """
    if not isinstance(frame, AudioFrame):
        await self._video_queue.put(frame)
        return

    await self._audio_source.capture_frame(frame)
async def wait_for_playout(self) ‑> None
-
Expand source code
async def wait_for_playout(self) -> None: """Wait until all video and audio frames are played out.""" await self._audio_source.wait_for_playout() await self._video_queue.join()
Wait until all video and audio frames are played out.
class AudioFrame (data: bytes | bytearray | memoryview,
sample_rate: int,
num_channels: int,
samples_per_channel: int)-
Expand source code
class AudioFrame:
    """
    A class that represents a frame of audio data with specific properties such as sample rate,
    number of channels, and samples per channel.

    The format of the audio data is 16-bit signed integers (int16) interleaved by channel.
    """

    def __init__(
        self,
        data: Union[bytes, bytearray, memoryview],
        sample_rate: int,
        num_channels: int,
        samples_per_channel: int,
    ) -> None:
        """
        Initialize an AudioFrame instance.

        The input is copied; the frame never aliases the caller's buffer.

        Args:
            data (Union[bytes, bytearray, memoryview]): The raw audio data, which must be at least
                `num_channels * samples_per_channel * sizeof(int16)` bytes long and a whole
                number of int16 samples.
            sample_rate (int): The sample rate of the audio in Hz.
            num_channels (int): The number of audio channels (e.g., 1 for mono, 2 for stereo).
            samples_per_channel (int): The number of samples per channel.

        Raises:
            ValueError: If the length of `data` is smaller than the required size.
            ValueError: If the length of `data` is not a multiple of `sizeof(int16)`.
        """
        sample_size = ctypes.sizeof(ctypes.c_int16)

        # Normalize to a flat byte view so len() counts bytes even when the
        # caller passes a typed view such as another frame's int16 `data`.
        raw = memoryview(data).cast("B")

        if len(raw) < num_channels * samples_per_channel * sample_size:
            raise ValueError(
                "data length must be >= num_channels * samples_per_channel * sizeof(int16)"
            )

        if len(raw) % sample_size != 0:
            # Possible when the buffer is larger than required; the `data`
            # property could not expose such a buffer as int16 later on.
            raise ValueError("data length must be a multiple of sizeof(int16)")

        self._data = bytearray(raw)
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._samples_per_channel = samples_per_channel

    @staticmethod
    def create(
        sample_rate: int, num_channels: int, samples_per_channel: int
    ) -> "AudioFrame":
        """
        Create a new empty AudioFrame instance with specified sample rate, number of channels,
        and samples per channel.

        Args:
            sample_rate (int): The sample rate of the audio in Hz.
            num_channels (int): The number of audio channels (e.g., 1 for mono, 2 for stereo).
            samples_per_channel (int): The number of samples per channel.

        Returns:
            AudioFrame: A new AudioFrame instance whose data is zero-filled.
        """
        size = num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16)
        data = bytearray(size)
        return AudioFrame(data, sample_rate, num_channels, samples_per_channel)

    @staticmethod
    def _from_owned_info(owned_info: proto_audio.OwnedAudioFrameBuffer) -> "AudioFrame":
        """Build a frame by copying the int16 samples referenced by an FFI-owned buffer."""
        info = owned_info.info
        size = info.num_channels * info.samples_per_channel
        cdata = (ctypes.c_int16 * size).from_address(info.data_ptr)
        data = bytearray(cdata)
        # NOTE(review): the handle object is created and immediately dropped,
        # presumably so the native buffer is released after the copy - confirm.
        FfiHandle(owned_info.handle.id)
        return AudioFrame(
            data, info.sample_rate, info.num_channels, info.samples_per_channel
        )

    def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame":
        """Resample the audio frame to the given sample rate and number of channels.

        .. warning::
            This method is deprecated and will be removed in a future release.
            Please use the `rtc.AudioResampler` class instead.
        """
        req = proto_ffi.FfiRequest()
        req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())

        resp = FfiClient.instance.request(req)

        resampler_handle = FfiHandle(resp.new_audio_resampler.resampler.handle.id)

        resample_req = proto_ffi.FfiRequest()
        resample_req.remix_and_resample.resampler_handle = resampler_handle.handle
        resample_req.remix_and_resample.buffer.CopyFrom(self._proto_info())
        resample_req.remix_and_resample.sample_rate = sample_rate
        resample_req.remix_and_resample.num_channels = num_channels

        resp = FfiClient.instance.request(resample_req)
        return AudioFrame._from_owned_info(resp.remix_and_resample.buffer)

    def _proto_info(self) -> proto_audio.AudioFrameBufferInfo:
        """Describe this frame (buffer address and format) for an FFI request."""
        audio_info = proto_audio.AudioFrameBufferInfo()
        audio_info.data_ptr = get_address(memoryview(self._data))
        audio_info.sample_rate = self.sample_rate
        audio_info.num_channels = self.num_channels
        audio_info.samples_per_channel = self.samples_per_channel
        return audio_info

    @property
    def data(self) -> memoryview:
        """
        Returns a memory view of the audio data as 16-bit signed integers.

        Returns:
            memoryview: A memory view of the audio data.
        """
        return memoryview(self._data).cast("h")

    @property
    def sample_rate(self) -> int:
        """
        Returns the sample rate of the audio frame.

        Returns:
            int: The sample rate in Hz.
        """
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """
        Returns the number of channels in the audio frame.

        Returns:
            int: The number of audio channels (e.g., 1 for mono, 2 for stereo).
        """
        return self._num_channels

    @property
    def samples_per_channel(self) -> int:
        """
        Returns the number of samples per channel.

        Returns:
            int: The number of samples per channel.
        """
        return self._samples_per_channel

    @property
    def duration(self) -> float:
        """
        Returns the duration of the audio frame in seconds.

        Returns:
            float: The duration in seconds.
        """
        return self.samples_per_channel / self.sample_rate

    def to_wav_bytes(self) -> bytes:
        """
        Convert the audio frame data to a WAV-formatted byte stream.

        Returns:
            bytes: The audio data encoded in WAV format.
        """
        import wave
        import io

        with io.BytesIO() as wav_file:
            with wave.open(wav_file, "wb") as wav:
                wav.setnchannels(self.num_channels)
                wav.setsampwidth(2)
                wav.setframerate(self.sample_rate)
                wav.writeframes(self._data)

            return wav_file.getvalue()

    def __repr__(self) -> str:
        return (
            f"rtc.AudioFrame(sample_rate={self.sample_rate}, "
            f"num_channels={self.num_channels}, "
            f"samples_per_channel={self.samples_per_channel}, "
            f"duration={self.duration:.3f})"
        )
A class that represents a frame of audio data with specific properties such as sample rate, number of channels, and samples per channel.
The format of the audio data is 16-bit signed integers (int16) interleaved by channel.
Initialize an AudioFrame instance.
Args
data
:Union[bytes, bytearray, memoryview]
- The raw audio data, which must be at least
num_channels * samples_per_channel * sizeof(int16)
bytes long.
sample_rate
:int
- The sample rate of the audio in Hz.
num_channels
:int
- The number of audio channels (e.g., 1 for mono, 2 for stereo).
samples_per_channel
:int
- The number of samples per channel.
Raises
ValueError
- If the length of
data
is smaller than the required size.
Static methods
def create(sample_rate: int, num_channels: int, samples_per_channel: int) ‑> AudioFrame
-
Expand source code
@staticmethod def create( sample_rate: int, num_channels: int, samples_per_channel: int ) -> "AudioFrame": """ Create a new empty AudioFrame instance with specified sample rate, number of channels, and samples per channel. Args: sample_rate (int): The sample rate of the audio in Hz. num_channels (int): The number of audio channels (e.g., 1 for mono, 2 for stereo). samples_per_channel (int): The number of samples per channel. Returns: AudioFrame: A new AudioFrame instance with uninitialized (zeroed) data. """ size = num_channels * samples_per_channel * ctypes.sizeof(ctypes.c_int16) data = bytearray(size) return AudioFrame(data, sample_rate, num_channels, samples_per_channel)
Create a new empty AudioFrame instance with specified sample rate, number of channels, and samples per channel.
Args
sample_rate
:int
- The sample rate of the audio in Hz.
num_channels
:int
- The number of audio channels (e.g., 1 for mono, 2 for stereo).
samples_per_channel
:int
- The number of samples per channel.
Returns
AudioFrame
- A new AudioFrame instance with zero-initialized data.
Instance variables
prop data : memoryview
-
Expand source code
@property def data(self) -> memoryview: """ Returns a memory view of the audio data as 16-bit signed integers. Returns: memoryview: A memory view of the audio data. """ return memoryview(self._data).cast("h")
Returns a memory view of the audio data as 16-bit signed integers.
Returns
memoryview
- A memory view of the audio data.
prop duration : float
-
Expand source code
@property def duration(self) -> float: """ Returns the duration of the audio frame in seconds. Returns: float: The duration in seconds. """ return self.samples_per_channel / self.sample_rate
Returns the duration of the audio frame in seconds.
Returns
float
- The duration in seconds.
prop num_channels : int
-
Expand source code
@property def num_channels(self) -> int: """ Returns the number of channels in the audio frame. Returns: int: The number of audio channels (e.g., 1 for mono, 2 for stereo). """ return self._num_channels
Returns the number of channels in the audio frame.
Returns
int
- The number of audio channels (e.g., 1 for mono, 2 for stereo).
prop sample_rate : int
-
Expand source code
@property def sample_rate(self) -> int: """ Returns the sample rate of the audio frame. Returns: int: The sample rate in Hz. """ return self._sample_rate
Returns the sample rate of the audio frame.
Returns
int
- The sample rate in Hz.
prop samples_per_channel : int
-
Expand source code
@property def samples_per_channel(self) -> int: """ Returns the number of samples per channel. Returns: int: The number of samples per channel. """ return self._samples_per_channel
Returns the number of samples per channel.
Returns
int
- The number of samples per channel.
Methods
def remix_and_resample(self, sample_rate: int, num_channels: int) ‑> AudioFrame
-
Expand source code
def remix_and_resample(self, sample_rate: int, num_channels: int) -> "AudioFrame":
    """Return a copy of this frame converted to another sample rate and channel count.

    A resampler is created through the FFI client for this single call and
    then used to convert the frame.

    .. warning::
        This method is deprecated and will be removed in a future release.
        Please use the `rtc.AudioResampler` class instead.
    """
    # Step 1: ask the FFI layer for a resampler.
    create_req = proto_ffi.FfiRequest()
    create_req.new_audio_resampler.CopyFrom(proto_audio.NewAudioResamplerRequest())
    create_resp = FfiClient.instance.request(create_req)

    # Kept in a local so the handle object lives for the rest of the call.
    resampler = FfiHandle(create_resp.new_audio_resampler.resampler.handle.id)

    # Step 2: run this frame through it with the requested output format.
    convert_req = proto_ffi.FfiRequest()
    convert_req.remix_and_resample.resampler_handle = resampler.handle
    convert_req.remix_and_resample.buffer.CopyFrom(self._proto_info())
    convert_req.remix_and_resample.sample_rate = sample_rate
    convert_req.remix_and_resample.num_channels = num_channels
    convert_resp = FfiClient.instance.request(convert_req)

    return AudioFrame._from_owned_info(convert_resp.remix_and_resample.buffer)
Resample the audio frame to the given sample rate and number of channels.
Warning
This method is deprecated and will be removed in a future release. Please use the
rtc.AudioResampler
class instead.
def to_wav_bytes(self) ‑> bytes
-
Expand source code
def to_wav_bytes(self) -> bytes: """ Convert the audio frame data to a WAV-formatted byte stream. Returns: bytes: The audio data encoded in WAV format. """ import wave import io with io.BytesIO() as wav_file: with wave.open(wav_file, "wb") as wav: wav.setnchannels(self.num_channels) wav.setsampwidth(2) wav.setframerate(self.sample_rate) wav.writeframes(self._data) return wav_file.getvalue()
Convert the audio frame data to a WAV-formatted byte stream.
Returns
bytes
- The audio data encoded in WAV format.
class AudioFrameEvent (frame: AudioFrame)
-
Expand source code
@dataclass
class AudioFrameEvent:
    """An event representing a received audio frame.

    A plain dataclass wrapper around a single `AudioFrame`.

    Attributes:
        frame (AudioFrame): The received audio frame.
    """

    # The audio frame carried by this event.
    frame: AudioFrame
An event representing a received audio frame.
Attributes
frame
:AudioFrame
- The received audio frame.
Class variables
var frame : AudioFrame
class AudioResampler (input_rate: int,
output_rate: int,
*,
num_channels: int = 1,
quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM)-
Expand source code
class AudioResampler:
    """
    Converts int16 audio from one sample rate to another.

    `AudioResampler` wraps a Sox resampler created through the FFI client. It is
    configured once with an input rate, an output rate, a channel count and a
    quality preset, then fed data with `push` and drained with `flush`.
    """

    def __init__(
        self,
        input_rate: int,
        output_rate: int,
        *,
        num_channels: int = 1,
        quality: AudioResamplerQuality = AudioResamplerQuality.MEDIUM,
    ) -> None:
        """
        Initialize an `AudioResampler` instance for resampling audio data.

        Args:
            input_rate (int): The sample rate of the input audio data (in Hz).
            output_rate (int): The desired sample rate of the output audio data (in Hz).
            num_channels (int, optional): The number of audio channels (e.g., 1 for mono,
                2 for stereo). Defaults to 1.
            quality (AudioResamplerQuality, optional): The quality setting for the resampler.
                Can be one of the `AudioResamplerQuality` enum values: `QUICK`, `LOW`,
                `MEDIUM`, `HIGH`, `VERY_HIGH`. Higher quality settings result in better
                audio quality but require more processing power. Defaults to
                `AudioResamplerQuality.MEDIUM`.

        Raises:
            Exception: If there is an error creating the resampler.
        """
        self._input_rate = input_rate
        self._output_rate = output_rate
        self._num_channels = num_channels

        int16_interleaved = proto_audio_frame.SoxResamplerDataType.SOXR_DATATYPE_INT16I

        request = proto_ffi.FfiRequest()
        request.new_sox_resampler.input_rate = input_rate
        request.new_sox_resampler.output_rate = output_rate
        request.new_sox_resampler.num_channels = num_channels
        request.new_sox_resampler.quality_recipe = _to_proto_quality(quality)

        # Data types and flags are fixed; they are not exposed to callers for now.
        request.new_sox_resampler.input_data_type = int16_interleaved
        request.new_sox_resampler.output_data_type = int16_interleaved
        request.new_sox_resampler.flags = 0  # default

        response = FfiClient.instance.request(request)
        created = response.new_sox_resampler

        if created.error:
            raise Exception(created.error)

        self._ffi_handle = FfiHandle(created.resampler.handle.id)

    def _frames_from_output(self, output_ptr: int, size: int) -> list[AudioFrame]:
        """Copy `size` bytes at `output_ptr` into a single-element list of `AudioFrame`."""
        raw = (ctypes.c_int8 * size).from_address(output_ptr)
        pcm = bytearray(raw)
        return [
            AudioFrame(
                pcm,
                self._output_rate,
                self._num_channels,
                len(pcm) // (self._num_channels * ctypes.sizeof(ctypes.c_int16)),
            )
        ]

    def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]:
        """
        Push audio data into the resampler and retrieve any available resampled data.

        Args:
            data (bytearray | AudioFrame): The audio data to resample. This can be a
                `bytearray` containing raw audio bytes in int16le format or an
                `AudioFrame` object.

        Returns:
            list[AudioFrame]: A list of `AudioFrame` objects containing the resampled audio
            data. The list may be empty if no output data is available yet.

        Raises:
            Exception: If there is an error during resampling.
        """
        if isinstance(data, bytearray):
            payload = data
        else:
            payload = data.data.cast("b")

        request = proto_ffi.FfiRequest()
        request.push_sox_resampler.resampler_handle = self._ffi_handle.handle
        request.push_sox_resampler.data_ptr = get_address(memoryview(payload))
        request.push_sox_resampler.size = len(payload)

        response = FfiClient.instance.request(request)
        result = response.push_sox_resampler

        if result.error:
            raise Exception(result.error)

        if not result.output_ptr:
            return []

        return self._frames_from_output(result.output_ptr, result.size)

    def flush(self) -> list[AudioFrame]:
        """
        Flush any remaining audio data through the resampler and retrieve the resampled data.

        This method should be called when no more input data will be provided to ensure that
        all internal buffers are processed and all resampled data is output.

        Returns:
            list[AudioFrame]: A list of `AudioFrame` objects containing the remaining resampled
            audio data after flushing. The list may be empty if no output data remains.
        """
        # NOTE(review): unlike `push`, no error field of the response is
        # inspected here - confirm whether the flush response carries one.
        request = proto_ffi.FfiRequest()
        request.flush_sox_resampler.resampler_handle = self._ffi_handle.handle

        response = FfiClient.instance.request(request)
        result = response.flush_sox_resampler

        if not result.output_ptr:
            return []

        return self._frames_from_output(result.output_ptr, result.size)
A class for resampling audio data from one sample rate to another.
AudioResampler
provides functionality to resample audio data from an input sample rate to an output sample rate using the Sox resampling library. It supports multiple channels and configurable resampling quality.
Initialize an
AudioResampler
instance for resampling audio data.
Args
input_rate
:int
- The sample rate of the input audio data (in Hz).
output_rate
:int
- The desired sample rate of the output audio data (in Hz).
num_channels
:int
, optional - The number of audio channels (e.g., 1 for mono, 2 for stereo). Defaults to 1.
quality
:AudioResamplerQuality
, optional - The quality setting for the resampler. Can be one of the
AudioResamplerQuality
enum values: QUICK, LOW, MEDIUM, HIGH, VERY_HIGH. Higher quality settings result in better audio quality but require more processing power. Defaults to AudioResamplerQuality.MEDIUM.
Raises
Exception
- If there is an error creating the resampler.
Methods
def flush(self) ‑> list[AudioFrame]
-
Expand source code
def flush(self) -> list[AudioFrame]:
    """
    Drain whatever the resampler still holds and return it as resampled audio.

    Call this once no more input will be pushed, so that data still buffered
    inside the resampler is emitted.

    Returns:
        list[AudioFrame]: A list of `AudioFrame` objects containing the remaining resampled
        audio data after flushing. The list may be empty if no output data remains.
    """
    # NOTE(review): unlike `push`, no error field of the response is
    # inspected here - confirm whether the flush response carries one.
    request = proto_ffi.FfiRequest()
    request.flush_sox_resampler.resampler_handle = self._ffi_handle.handle

    response = FfiClient.instance.request(request)
    result = response.flush_sox_resampler

    if not result.output_ptr:
        return []

    # Copy the native output into Python-owned memory before wrapping it.
    raw = (ctypes.c_int8 * result.size).from_address(result.output_ptr)
    pcm = bytearray(raw)
    bytes_per_sample_set = self._num_channels * ctypes.sizeof(ctypes.c_int16)
    flushed = AudioFrame(
        pcm,
        self._output_rate,
        self._num_channels,
        len(pcm) // bytes_per_sample_set,
    )
    return [flushed]
Flush any remaining audio data through the resampler and retrieve the resampled data.
This method should be called when no more input data will be provided to ensure that all internal buffers are processed and all resampled data is output.
Returns
list[AudioFrame]
- A list of
AudioFrame
objects containing the remaining resampled audio data after flushing. The list may be empty if no output data remains.
Raises
Exception
- If there is an error during flushing.
def push(self,
data: bytearray | AudioFrame) ‑> list[AudioFrame]-
Expand source code
def push(self, data: bytearray | AudioFrame) -> list[AudioFrame]:
    """
    Feed audio into the resampler and collect whatever output is ready.

    Args:
        data (bytearray | AudioFrame): The audio data to resample. This can be a
            `bytearray` containing raw audio bytes in int16le format or an
            `AudioFrame` object.

    Returns:
        list[AudioFrame]: A list of `AudioFrame` objects containing the resampled audio
        data. The list may be empty if no output data is available yet.

    Raises:
        Exception: If there is an error during resampling.
    """
    if isinstance(data, bytearray):
        payload = data
    else:
        # An AudioFrame exposes int16 samples; the FFI call wants raw bytes.
        payload = data.data.cast("b")

    request = proto_ffi.FfiRequest()
    request.push_sox_resampler.resampler_handle = self._ffi_handle.handle
    request.push_sox_resampler.data_ptr = get_address(memoryview(payload))
    request.push_sox_resampler.size = len(payload)

    response = FfiClient.instance.request(request)
    result = response.push_sox_resampler

    if result.error:
        raise Exception(result.error)

    if not result.output_ptr:
        return []

    # Copy the native output into Python-owned memory before wrapping it.
    raw = (ctypes.c_int8 * result.size).from_address(result.output_ptr)
    pcm = bytearray(raw)
    bytes_per_sample_set = self._num_channels * ctypes.sizeof(ctypes.c_int16)
    resampled = AudioFrame(
        pcm,
        self._output_rate,
        self._num_channels,
        len(pcm) // bytes_per_sample_set,
    )
    return [resampled]
Push audio data into the resampler and retrieve any available resampled data.
This method accepts audio data, resamples it according to the configured input and output rates, and returns any resampled data that is available after processing the input.
Args
data
:bytearray | AudioFrame
- The audio data to resample. This can be a
bytearray
containing raw audio bytes in int16le format or an AudioFrame
object.
Returns
list[AudioFrame]
- A list of
AudioFrame
objects containing the resampled audio data. The list may be empty if no output data is available yet.
Raises
Exception
- If there is an error during resampling.
class AudioResamplerQuality (*args, **kwds)
-
Expand source code
@unique
class AudioResamplerQuality(str, Enum):
    """Quality presets accepted by `AudioResampler`.

    Members are also `str` instances, so each one compares equal to its
    lowercase string value (for example `AudioResamplerQuality.HIGH == "high"`).
    Higher quality settings result in better audio quality but require more
    processing power.
    """

    # Listed in the same order as the AudioResampler documentation.
    QUICK = "quick"
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    VERY_HIGH = "very_high"
str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str
Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.
Ancestors
- builtins.str
- enum.Enum
Class variables
var HIGH
var LOW
var MEDIUM
var QUICK
var VERY_HIGH
class AudioSource (sample_rate: int,
num_channels: int,
queue_size_ms: int = 1000,
loop: asyncio.AbstractEventLoop | None = None)-
Expand source code
class AudioSource:
    """
    Represents a real-time audio source with an internal audio queue.

    The `AudioSource` class allows you to push audio frames into a real-time audio
    source, managing an internal queue of audio data up to a maximum duration defined
    by `queue_size_ms`. It supports asynchronous operations to capture audio frames
    and to wait for the playback of all queued audio data.
    """

    def __init__(
        self,
        sample_rate: int,
        num_channels: int,
        queue_size_ms: int = 1000,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """
        Initializes a new instance of the audio source.

        Args:
            sample_rate (int): The sample rate of the audio source in Hz.
            num_channels (int): The number of audio channels.
            queue_size_ms (int, optional): The buffer size of the audio queue in
                milliseconds. Defaults to 1000 ms.
            loop (asyncio.AbstractEventLoop, optional): The event loop to use.
                Defaults to `asyncio.get_event_loop()`.
        """
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()

        # Ask the FFI layer to create the native audio source.
        req = proto_ffi.FfiRequest()
        req.new_audio_source.type = (
            proto_audio_frame.AudioSourceType.AUDIO_SOURCE_NATIVE
        )
        req.new_audio_source.sample_rate = sample_rate
        req.new_audio_source.num_channels = num_channels
        req.new_audio_source.queue_size_ms = queue_size_ms

        resp = FfiClient.instance.request(req)
        self._info = resp.new_audio_source.source
        self._ffi_handle = FfiHandle(self._info.handle.id)

        # Local bookkeeping used to estimate how much audio is still queued:
        # _last_capture is the time.monotonic() value of the latest capture
        # (0.0 means "no capture since the last reset"), _q_size is the
        # estimated queued duration in seconds at that moment.
        self._last_capture = 0.0
        self._q_size = 0.0
        # Timer that calls _release_waiter, and the future it resolves.
        self._join_handle: asyncio.TimerHandle | None = None
        self._join_fut: asyncio.Future[None] | None = None

    @property
    def sample_rate(self) -> int:
        """The sample rate of the audio source in Hz."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """The number of audio channels."""
        return self._num_channels

    @property
    def queued_duration(self) -> float:
        """The current duration (in seconds) of audio data queued for playback."""
        # Estimated size at the last capture minus the time elapsed since then,
        # clamped so it never goes negative.
        return max(self._q_size - time.monotonic() + self._last_capture, 0.0)

    def clear_queue(self) -> None:
        """
        Clears the internal audio queue, discarding all buffered audio data.

        This method immediately removes all audio data currently queued for playback,
        effectively resetting the audio source's buffer. Any audio frames that have been
        captured but not yet played will be discarded. This is useful in scenarios where
        you need to stop playback abruptly or prevent outdated audio data from being played.
        """
        req = proto_ffi.FfiRequest()
        req.clear_audio_buffer.source_handle = self._ffi_handle.handle
        _ = FfiClient.instance.request(req)
        # Resolve any pending wait_for_playout() and reset the bookkeeping.
        self._release_waiter()

    async def capture_frame(self, frame: AudioFrame) -> None:
        """
        Captures an `AudioFrame` and queues it for playback.

        This method is used to push new audio data into the audio source. The audio data
        will be processed and queued. If the size of the audio frame exceeds the internal
        queue size, the method will wait until there is enough space in the queue to
        accommodate the frame. The method returns only when all of the data in the buffer
        has been pushed.

        Args:
            frame (AudioFrame): The audio frame to capture and queue.

        Raises:
            Exception: If there is an error during frame capture.
        """

        # Empty frames are ignored entirely (no bookkeeping, no FFI call).
        if frame.samples_per_channel == 0:
            return

        # Grow the estimate by this frame's duration and shrink it by the time
        # elapsed since the previous capture (0 if there was none since reset).
        now = time.monotonic()
        elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
        self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
        self._last_capture = now

        # Re-arm the timer so _release_waiter fires _q_size seconds from now.
        if self._join_handle:
            self._join_handle.cancel()

        if self._join_fut is None:
            self._join_fut = self._loop.create_future()

        self._join_handle = self._loop.call_later(self._q_size, self._release_waiter)

        req = proto_ffi.FfiRequest()

        req.capture_audio_frame.source_handle = self._ffi_handle.handle
        req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

        # The queue is subscribed before the request is sent — presumably so
        # the matching callback event cannot be missed; it is always
        # unsubscribed afterwards, even on failure.
        queue = FfiClient.instance.queue.subscribe(loop=self._loop)
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.capture_audio_frame.async_id
                == resp.capture_audio_frame.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.capture_audio_frame.error:
            raise Exception(cb.capture_audio_frame.error)

    async def wait_for_playout(self) -> None:
        """
        Waits for the audio source to finish playing out all audio data.

        This method ensures that all queued audio data has been played out before returning.
        It can be used to synchronize events after audio playback or to ensure that the
        audio queue is empty.
        """

        if self._join_fut is None:
            return

        # shield() keeps the shared future alive if this waiter is cancelled.
        await asyncio.shield(self._join_fut)

    def _release_waiter(self) -> None:
        # Resolve the pending playout future (if any) and reset the queue
        # bookkeeping. Called by the timer and by clear_queue().
        if self._join_fut is None:
            return  # could be None when clear_queue is called

        if not self._join_fut.done():
            self._join_fut.set_result(None)

        self._last_capture = 0.0
        self._q_size = 0.0
        self._join_fut = None
Represents a real-time audio source with an internal audio queue.
The
AudioSource
class allows you to push audio frames into a real-time audio source, managing an internal queue of audio data up to a maximum duration defined by queue_size_ms
. It supports asynchronous operations to capture audio frames and to wait for the playback of all queued audio data.
Initializes a new instance of the audio source.
Args
sample_rate
:int
- The sample rate of the audio source in Hz.
num_channels
:int
- The number of audio channels.
queue_size_ms
:int
, optional- The buffer size of the audio queue in milliseconds. Defaults to 1000 ms.
loop
:asyncio.AbstractEventLoop
, optional- The event loop to use. Defaults to
asyncio.get_event_loop()
.
Instance variables
prop num_channels : int
-
Expand source code
@property
def num_channels(self) -> int:
    """The number of audio channels.

    Returns:
        int: The value stored in ``_num_channels``.
    """
    return self._num_channels
The number of audio channels.
prop queued_duration : float
-
Expand source code
@property
def queued_duration(self) -> float:
    """Seconds of audio currently estimated to be queued for playback.

    Returns:
        float: ``_q_size`` reduced by the monotonic time elapsed since
        ``_last_capture``, clamped to a minimum of ``0.0``.
    """
    remaining = self._q_size - time.monotonic() + self._last_capture
    return max(remaining, 0.0)
The current duration (in seconds) of audio data queued for playback.
prop sample_rate : int
-
Expand source code
@property
def sample_rate(self) -> int:
    """The sample rate of the audio source in Hz.

    Returns:
        int: The value stored in ``_sample_rate``.
    """
    return self._sample_rate
The sample rate of the audio source in Hz.
Methods
async def capture_frame(self,
frame: AudioFrame) ‑> None-
Expand source code
async def capture_frame(self, frame: AudioFrame) -> None:
    """
    Captures an `AudioFrame` and queues it for playback.

    This method is used to push new audio data into the audio source. The audio data
    will be processed and queued. If the size of the audio frame exceeds the internal
    queue size, the method will wait until there is enough space in the queue to
    accommodate the frame. The method returns only when all of the data in the buffer
    has been pushed.

    Args:
        frame (AudioFrame): The audio frame to capture and queue.

    Raises:
        Exception: If there is an error during frame capture.
    """

    # Empty frames are ignored entirely (no bookkeeping, no FFI call).
    if frame.samples_per_channel == 0:
        return

    # Grow the queued-duration estimate by this frame's length in seconds and
    # shrink it by the time elapsed since the previous capture (a
    # _last_capture of 0.0 means "no previous capture", so nothing elapsed).
    now = time.monotonic()
    elapsed = 0.0 if self._last_capture == 0.0 else now - self._last_capture
    self._q_size += frame.samples_per_channel / self.sample_rate - elapsed
    self._last_capture = now

    # Re-arm the timer so _release_waiter fires _q_size seconds from now.
    if self._join_handle:
        self._join_handle.cancel()

    if self._join_fut is None:
        self._join_fut = self._loop.create_future()

    self._join_handle = self._loop.call_later(self._q_size, self._release_waiter)

    req = proto_ffi.FfiRequest()

    req.capture_audio_frame.source_handle = self._ffi_handle.handle
    req.capture_audio_frame.buffer.CopyFrom(frame._proto_info())

    # The queue is subscribed before the request is sent — presumably so the
    # matching callback event cannot be missed; it is always unsubscribed
    # afterwards, even on failure.
    queue = FfiClient.instance.queue.subscribe(loop=self._loop)
    try:
        resp = FfiClient.instance.request(req)
        cb: proto_ffi.FfiEvent = await queue.wait_for(
            lambda e: e.capture_audio_frame.async_id
            == resp.capture_audio_frame.async_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(queue)

    if cb.capture_audio_frame.error:
        raise Exception(cb.capture_audio_frame.error)
Captures an
AudioFrame
and queues it for playback.
This method is used to push new audio data into the audio source. The audio data will be processed and queued. If the size of the audio frame exceeds the internal queue size, the method will wait until there is enough space in the queue to accommodate the frame. The method returns only when all of the data in the buffer has been pushed.
Args
frame
:AudioFrame
- The audio frame to capture and queue.
Raises
Exception
- If there is an error during frame capture.
def clear_queue(self) ‑> None
-
Expand source code
def clear_queue(self) -> None:
    """
    Drop everything buffered in the audio source's queue.

    A clear-buffer request is sent for this source's handle, after which any
    pending playout waiter is released via `_release_waiter()`. Use it to stop
    playback abruptly or to keep stale audio from being played.
    """
    request = proto_ffi.FfiRequest()
    request.clear_audio_buffer.source_handle = self._ffi_handle.handle
    FfiClient.instance.request(request)  # the response carries nothing we use
    self._release_waiter()
Clears the internal audio queue, discarding all buffered audio data.
This method immediately removes all audio data currently queued for playback, effectively resetting the audio source's buffer. Any audio frames that have been captured but not yet played will be discarded. This is useful in scenarios where you need to stop playback abruptly or prevent outdated audio data from being played.
async def wait_for_playout(self) ‑> None
-
Expand source code
async def wait_for_playout(self) -> None:
    """
    Block until the pending playout future, if any, has been resolved.

    Returns immediately when no playout future is pending. The future is
    awaited through `asyncio.shield`, so cancelling this call does not cancel
    the shared future.
    """
    pending = self._join_fut
    if pending is not None:
        await asyncio.shield(pending)
Waits for the audio source to finish playing out all audio data.
This method ensures that all queued audio data has been played out before returning. It can be used to synchronize events after audio playback or to ensure that the audio queue is empty.
class AudioStream (track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1,
**kwargs)-
Expand source code
class AudioStream:
    """An asynchronous audio stream for receiving audio frames from a participant or track.

    The `AudioStream` class provides an asynchronous iterator over audio frames received from
    a specific track or participant. It allows you to receive audio frames in real-time with
    customizable sample rates and channel configurations.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
        **kwargs,
    ) -> None:
        """Initialize an `AudioStream` instance.

        Args:
            track (Optional[Track]): The audio track from which to receive audio. If not provided,
                you must specify `participant` and `track_source` in `kwargs`.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use.
                Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz.
                Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Example:
            ```python
            audio_stream = AudioStream(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )

            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        self._track: Track | None = track
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[AudioFrameEvent | None] = RingQueue(capacity)

        # The reader task is only scheduled here; it starts running once the
        # loop gets control, by which point `_ffi_handle` below is assigned.
        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()
        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from a participant's audio track.

        Args:
            participant (Participant): The participant from whom to receive audio.
            track_source (TrackSource.ValueType): The source of the audio track (e.g., microphone, screen share).
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of the class this method was called on
            (`AudioStream` or a subclass) that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_participant(
                participant=participant,
                track_source=TrackSource.MICROPHONE,
                sample_rate=24000,
                num_channels=1,
            )
            ```
        """
        # Build through `cls` so subclasses get an instance of their own type.
        return cls(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            track=None,  # type: ignore
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        sample_rate: int = 48000,
        num_channels: int = 1,
    ) -> AudioStream:
        """Create an `AudioStream` from an existing audio track.

        Args:
            track (Track): The audio track from which to receive audio.
            loop (Optional[asyncio.AbstractEventLoop], optional): The event loop to use. Defaults to the current event loop.
            capacity (int, optional): The capacity of the internal frame queue. Defaults to 0 (unbounded).
            sample_rate (int, optional): The sample rate for the audio stream in Hz. Defaults to 48000.
            num_channels (int, optional): The number of audio channels. Defaults to 1.

        Returns:
            AudioStream: An instance of the class this method was called on
            (`AudioStream` or a subclass) that can be used to receive audio frames.

        Example:
            ```python
            audio_stream = AudioStream.from_track(
                track=audio_track,
                sample_rate=44100,
                num_channels=2,
            )
            ```
        """
        # Build through `cls` so subclasses get an instance of their own type.
        return cls(
            track=track,
            loop=loop,
            capacity=capacity,
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

    def __del__(self) -> None:
        # `__del__` also runs for instances whose `__init__` failed before the
        # FFI queue was subscribed; there is nothing to unsubscribe then.
        ffi_queue = getattr(self, "_ffi_queue", None)
        if ffi_queue is None:
            return
        FfiClient.instance.queue.unsubscribe(ffi_queue)

    def _create_owned_stream(self) -> Any:
        """Request a native audio stream bound to `self._track`."""
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_audio_stream = req.new_audio_stream
        new_audio_stream.track_handle = self._track._ffi_handle.handle
        new_audio_stream.sample_rate = self._sample_rate
        new_audio_stream.num_channels = self._num_channels
        new_audio_stream.type = proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        resp = FfiClient.instance.request(req)
        return resp.new_audio_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        """Request a native audio stream for a participant's track source."""
        req = proto_ffi.FfiRequest()
        audio_stream_from_participant = req.audio_stream_from_participant
        audio_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        audio_stream_from_participant.sample_rate = self._sample_rate
        audio_stream_from_participant.num_channels = self._num_channels
        audio_stream_from_participant.type = (
            proto_audio_frame.AudioStreamType.AUDIO_STREAM_NATIVE
        )
        audio_stream_from_participant.track_source = track_source
        resp = FfiClient.instance.request(req)
        return resp.audio_stream_from_participant.stream

    async def _run(self):
        """Forward FFI audio events for this stream into the frame queue.

        Runs until an end-of-stream event arrives, at which point a `None`
        sentinel is queued and the FFI queue is unsubscribed.
        """
        while True:
            ffi_event = await self._ffi_queue.wait_for(self._is_event)
            audio_event: proto_audio_frame.AudioStreamEvent = (
                ffi_event.audio_stream_event
            )

            if audio_event.HasField("frame_received"):
                owned_buffer_info = audio_event.frame_received.frame
                frame = AudioFrame._from_owned_info(owned_buffer_info)
                self._queue.put(AudioFrameEvent(frame))
            elif audio_event.HasField("eos"):
                self._queue.put(None)
                break

        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Asynchronously close the audio stream.

        This method cleans up resources associated with the audio stream and waits for
        any pending operations to complete.
        """
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        # Only events addressed to this stream's handle are of interest.
        return e.audio_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[AudioFrameEvent]:
        return self

    async def __anext__(self) -> AudioFrameEvent:
        # NOTE(review): once the reader task is done, iteration stops even if
        # frames are still queued — confirm this is the intended behavior.
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item
An asynchronous audio stream for receiving audio frames from a participant or track.
The
AudioStream
class provides an asynchronous iterator over audio frames received from a specific track or participant. It allows you to receive audio frames in real-time with customizable sample rates and channel configurations.
Initialize an
AudioStream
instance.
Args
track
:Optional[Track]
- The audio track from which to receive audio. If not provided,
you must specify
participant
and track_source
in kwargs
.
loop
:Optional[asyncio.AbstractEventLoop]
, optional- The event loop to use. Defaults to the current event loop.
capacity
:int
, optional- The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate
:int
, optional- The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels
:int
, optional- The number of audio channels. Defaults to 1.
Example
audio_stream = AudioStream( track=audio_track, sample_rate=44100, num_channels=2, ) audio_stream = AudioStream.from_track( track=audio_track, sample_rate=44100, num_channels=2, )
Static methods
def from_participant(*,
participant: Participant,
track_source: TrackSource.ValueType,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1) ‑> AudioStream-
Create an
AudioStream
from a participant's audio track.
Args
participant
:Participant
- The participant from whom to receive audio.
track_source
:TrackSource.ValueType
- The source of the audio track (e.g., microphone, screen share).
loop
:Optional[asyncio.AbstractEventLoop]
, optional- The event loop to use. Defaults to the current event loop.
capacity
:int
, optional- The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate
:int
, optional- The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels
:int
, optional- The number of audio channels. Defaults to 1.
Returns
AudioStream
- An instance of
AudioStream
that can be used to receive audio frames.
Example
audio_stream = AudioStream.from_participant( participant=participant, track_source=TrackSource.MICROPHONE, sample_rate=24000, num_channels=1, )
def from_track(*,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
sample_rate: int = 48000,
num_channels: int = 1) ‑> AudioStream-
Create an
AudioStream
from an existing audio track.
Args
track
:Track
- The audio track from which to receive audio.
loop
:Optional[asyncio.AbstractEventLoop]
, optional- The event loop to use. Defaults to the current event loop.
capacity
:int
, optional- The capacity of the internal frame queue. Defaults to 0 (unbounded).
sample_rate
:int
, optional- The sample rate for the audio stream in Hz. Defaults to 48000.
num_channels
:int
, optional- The number of audio channels. Defaults to 1.
Returns
AudioStream
- An instance of
AudioStream
that can be used to receive audio frames.
Example
audio_stream = AudioStream.from_track( track=audio_track, sample_rate=44100, num_channels=2, )
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Asynchronously close the audio stream.

    This method cleans up resources associated with the audio stream and waits for
    any pending operations to complete.
    """
    # Dispose of the FFI handle first, then wait for the stream's background
    # task (`_task`) to finish.
    self._ffi_handle.dispose()
    await self._task
Asynchronously close the audio stream.
This method cleans up resources associated with the audio stream and waits for any pending operations to complete.
class ChatManager (room: Room)
-
Expand source code
class ChatManager(EventEmitter[EventTypes]):
    """Send and receive chat messages in the active session.

    Messages follow the LiveKit Chat Protocol and travel as JSON-encoded data
    packets. Incoming messages are re-emitted as ``"message_received"`` events.
    """

    def __init__(self, room: Room):
        super().__init__()
        self._lp = room.local_participant
        self._room = room
        room.on("data_received", self._on_data_received)

    def close(self):
        """Stop listening for data packets on the room."""
        self._room.off("data_received", self._on_data_received)

    async def send_message(self, message: str) -> "ChatMessage":
        """Publish a new chat message on the chat topic.

        Args:
            message (str): the message to send

        Returns:
            ChatMessage: the message that was sent
        """
        outgoing = ChatMessage(
            message=message,
            is_local=True,
            participant=self._lp,
        )
        encoded = json.dumps(outgoing.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
        return outgoing

    async def update_message(self, message: "ChatMessage"):
        """Publish an update for a previously sent chat message.

        If message.deleted is set to True, we'll signal to remote participants
        that the message should be deleted.
        """
        encoded = json.dumps(message.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)

    def _on_data_received(self, dp: DataPacket):
        # New messages and updates are handled identically: as long as the ID
        # is present, the application decides how to replace the old message.
        if dp.topic not in (_CHAT_TOPIC, _CHAT_UPDATE_TOPIC):
            return
        try:
            incoming = ChatMessage.from_jsondict(json.loads(dp.data))
            if dp.participant:
                incoming.participant = dp.participant
            self.emit("message_received", incoming)
        except Exception as e:
            logging.warning("failed to parse chat message: %s", e, exc_info=e)
A utility class that sends and receives chat messages in the active session.
It implements LiveKit Chat Protocol, and serializes data to/from JSON data packets.
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Methods
def close(self)
-
Expand source code
def close(self):
    """Unregister this manager's ``data_received`` handler from the room."""
    self._room.off("data_received", self._on_data_received)
async def send_message(self, message: str) ‑> ChatMessage
-
Expand source code
async def send_message(self, message: str) -> "ChatMessage":
    """Publish a new chat message on the chat topic.

    The message is wrapped in a local `ChatMessage`, serialized to JSON and
    published through the local participant.

    Args:
        message (str): the message to send

    Returns:
        ChatMessage: the message that was sent
    """
    outgoing = ChatMessage(
        message=message,
        is_local=True,
        participant=self._lp,
    )
    encoded = json.dumps(outgoing.asjsondict())
    await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
    return outgoing
Send a chat message to the end user using LiveKit Chat Protocol.
Args
message
:str
- the message to send
Returns
ChatMessage
- the message that was sent
async def update_message(self,
message: ChatMessage)-
Expand source code
async def update_message(self, message: "ChatMessage"):
    """Publish an update for a previously sent chat message.

    If message.deleted is set to True, we'll signal to remote participants
    that the message should be deleted.

    Args:
        message (ChatMessage): the message whose current state is published
            on the update topic.
    """
    encoded = json.dumps(message.asjsondict())
    await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)
Update a chat message that was previously sent.
If message.deleted is set to True, we'll signal to remote participants that the message should be deleted.
Inherited members
class ChatMessage (message: str | None = None,
id: str = <factory>,
timestamp: datetime.datetime = <factory>,
deleted: bool = False,
participant: Participant | None = None,
is_local: bool = False)-
Expand source code
@dataclass
class ChatMessage:
    """A single chat message plus local, application-side context.

    `message`, `id`, `timestamp` and `deleted` are read from or written to the
    JSON dictionary form; `participant` and `is_local` are local-only.
    """

    message: Optional[str] = None
    id: str = field(default_factory=generate_random_base62)
    timestamp: datetime = field(default_factory=datetime.now)
    deleted: bool = field(default=False)

    # Not part of the wire protocol; kept only to give the application context.
    participant: Optional[Participant] = None
    is_local: bool = field(default=False)

    @classmethod
    def from_jsondict(cls, d: Dict[str, Any]) -> "ChatMessage":
        """Build a message from its JSON dictionary form.

        A missing or empty ``id`` is replaced by a freshly generated one (older
        protocol versions did not carry a message ID). ``timestamp`` is read as
        milliseconds since the epoch and falls back to the current time.
        """
        message_id = d.get("id") or generate_random_base62()
        raw_ts = d.get("timestamp")
        if raw_ts:
            sent_at = datetime.fromtimestamp(raw_ts / 1000.0)
        else:
            sent_at = datetime.now()
        parsed = cls(id=message_id, timestamp=sent_at)
        parsed.update_from_jsondict(d)
        return parsed

    def update_from_jsondict(self, d: Dict[str, Any]) -> None:
        """Overwrite `message` and `deleted` from a JSON dictionary."""
        self.message, self.deleted = d.get("message"), d.get("deleted", False)

    def asjsondict(self):
        """Returns a JSON serializable dictionary representation of the message."""
        payload = {
            "id": self.id,
            "message": self.message,
            # Milliseconds since the epoch, truncated to an integer.
            "timestamp": int(self.timestamp.timestamp() * 1000),
        }
        if self.deleted:
            payload["deleted"] = True
        return payload
ChatMessage(message: Optional[str] = None, id: str = <factory>, timestamp: datetime.datetime = <factory>, deleted: bool = False, participant: Optional[livekit.rtc.participant.Participant] = None, is_local: bool = False)
Class variables
var deleted : bool
var id : str
var is_local : bool
var message : str | None
var participant : Participant | None
var timestamp : datetime.datetime
Static methods
def from_jsondict(d: Dict[str, Any]) ‑> ChatMessage
Methods
def asjsondict(self)
-
Expand source code
def asjsondict(self):
    """Returns a JSON serializable dictionary representation of the message.

    The dictionary always holds ``id``, ``message`` and ``timestamp``
    (milliseconds since the epoch, as an int); ``deleted`` is added, set to
    True, only when the message is flagged as deleted.
    """
    millis = int(self.timestamp.timestamp() * 1000)
    payload = {"id": self.id, "message": self.message, "timestamp": millis}
    if self.deleted:
        payload["deleted"] = True
    return payload
Returns a JSON serializable dictionary representation of the message.
def update_from_jsondict(self, d: Dict[str, Any]) ‑> None
-
Expand source code
def update_from_jsondict(self, d: Dict[str, Any]) -> None:
    """Overwrite `message` and `deleted` from a JSON dictionary.

    Args:
        d (Dict[str, Any]): Parsed message dictionary. A missing ``message``
            becomes None and a missing ``deleted`` becomes False.
    """
    self.message, self.deleted = d.get("message"), d.get("deleted", False)
class ConnectError (message: str)
-
Expand source code
class ConnectError(Exception):
    """Exception carrying a human-readable `message`.

    NOTE(review): the name suggests it is raised on connection failures; the
    raising sites are not visible from here.
    """

    def __init__(self, message: str):
        """Store `message` on the instance and in the exception's `args`.

        Args:
            message (str): Description of the failure.
        """
        # Forward to Exception so `str(err)` and `err.args` carry the message
        # even when it is passed by keyword (`ConnectError(message=...)`).
        super().__init__(message)
        self.message = message
Common base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
class DataPacket (data: bytes,
kind: proto_room.DataPacketKind.ValueType,
participant: RemoteParticipant | None,
topic: str | None = None)-
Expand source code
@dataclass
class DataPacket:
    # Plain record for a received data packet: payload bytes plus its
    # delivery kind, optional sender and optional topic.
    data: bytes
    """The payload of the data packet."""
    kind: proto_room.DataPacketKind.ValueType
    """Type of the data packet (e.g., RELIABLE, LOSSY)."""
    participant: RemoteParticipant | None
    """Participant who sent the data. None when sent by a server SDK."""
    topic: str | None = None
    """Topic associated with the data packet."""
DataPacket(data: 'bytes', kind: 'proto_room.DataPacketKind.ValueType', participant: 'RemoteParticipant | None', topic: 'str | None' = None)
Class variables
var data : bytes
-
The payload of the data packet.
var kind : int
-
Type of the data packet (e.g., RELIABLE, LOSSY).
var participant : RemoteParticipant | None
-
Participant who sent the data. None when sent by a server SDK.
var topic : str | None
-
Topic associated with the data packet.
class E2EEManager (room_handle: int,
options: E2EEOptions | None)-
Expand source code
class E2EEManager:
    """Manage end-to-end encryption state for a single room handle.

    A `KeyProvider` is created only when `options` are supplied; otherwise
    `key_provider` is None and the manager starts out disabled.
    """

    def __init__(self, room_handle: int, options: Optional[E2EEOptions]):
        """Create the manager.

        Args:
            room_handle (int): Handle of the room, forwarded in every request.
            options (Optional[E2EEOptions]): E2EE options, or None when
                encryption is not configured.
        """
        self.options = options
        self._room_handle = room_handle
        self._enabled = options is not None
        # Always define the attribute so `key_provider` returns None instead
        # of raising AttributeError when no options were given.
        self._key_provider: Optional[KeyProvider] = None

        if options is not None:
            self._key_provider = KeyProvider(
                self._room_handle, options.key_provider_options
            )

    @property
    def key_provider(self) -> Optional[KeyProvider]:
        """The key provider, or None when no E2EE options were supplied."""
        return self._key_provider

    @property
    def enabled(self) -> bool:
        """The locally tracked enabled flag."""
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        """Enables or disables end-to-end encryption.

        Parameters:
            enabled (bool): True to enable, False to disable.

        Example:
            ```python
            e2ee_manager.set_enabled(True)
            ```
        """
        self._enabled = enabled
        req = proto_ffi.FfiRequest()
        req.e2ee.room_handle = self._room_handle
        req.e2ee.manager_set_enabled.enabled = enabled
        FfiClient.instance.request(req)

    def frame_cryptors(self) -> List[FrameCryptor]:
        """Retrieves the list of frame cryptors for participants.

        Returns:
            List[FrameCryptor]: A list of FrameCryptor instances.

        Example:
            ```python
            cryptors = e2ee_manager.frame_cryptors()
            for cryptor in cryptors:
                print(cryptor.participant_identity)
            ```
        """
        req = proto_ffi.FfiRequest()
        req.e2ee.room_handle = self._room_handle

        resp = FfiClient.instance.request(req)
        return [
            FrameCryptor(
                self._room_handle,
                frame_cryptor.participant_identity,
                frame_cryptor.key_index,
                frame_cryptor.enabled,
            )
            for frame_cryptor in resp.e2ee.manager_get_frame_cryptors.frame_cryptors
        ]
Instance variables
prop enabled : bool
-
Expand source code
@property
def enabled(self) -> bool:
    """The manager's locally tracked enabled flag (``_enabled``)."""
    return self._enabled
prop key_provider : KeyProvider | None
-
Expand source code
@property
def key_provider(self) -> Optional[KeyProvider]:
    """The key provider, or None when none was created.

    `_key_provider` is only assigned when the manager was constructed with
    options, so the lookup defaults to None instead of raising AttributeError.
    """
    return getattr(self, "_key_provider", None)
Methods
def frame_cryptors(self) ‑> List[FrameCryptor]
-
Expand source code
def frame_cryptors(self) -> List[FrameCryptor]:
    """Retrieves the list of frame cryptors for participants.

    Each entry of the response's ``manager_get_frame_cryptors.frame_cryptors``
    is wrapped in a `FrameCryptor` bound to this manager's room handle.

    Returns:
        List[FrameCryptor]: A list of FrameCryptor instances.

    Example:
        ```python
        cryptors = e2ee_manager.frame_cryptors()
        for cryptor in cryptors:
            print(cryptor.participant_identity)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle

    response = FfiClient.instance.request(request)
    return [
        FrameCryptor(
            self._room_handle,
            info.participant_identity,
            info.key_index,
            info.enabled,
        )
        for info in response.e2ee.manager_get_frame_cryptors.frame_cryptors
    ]
Retrieves the list of frame cryptors for participants.
Returns
List[FrameCryptor]
- A list of FrameCryptor instances.
Example
cryptors = e2ee_manager.frame_cryptors() for cryptor in cryptors: print(cryptor.participant_identity)
def set_enabled(self, enabled: bool) ‑> None
-
Expand source code
def set_enabled(self, enabled: bool) -> None:
    """Enables or disables end-to-end encryption.

    The local flag is updated first, then the change is sent for this
    manager's room handle.

    Parameters:
        enabled (bool): True to enable, False to disable.

    Example:
        ```python
        e2ee_manager.set_enabled(True)
        ```
    """
    self._enabled = enabled

    request = proto_ffi.FfiRequest()
    e2ee_req = request.e2ee
    e2ee_req.room_handle = self._room_handle
    e2ee_req.manager_set_enabled.enabled = enabled
    FfiClient.instance.request(request)
Enables or disables end-to-end encryption.
Parameters
enabled (bool): True to enable, False to disable.
Example
e2ee_manager.set_enabled(True)
class E2EEOptions (key_provider_options: KeyProviderOptions = <factory>,
encryption_type: int = 1)-
Expand source code
@dataclass
class E2EEOptions:
    # Options bundle for end-to-end encryption.
    # Each instance gets its own KeyProviderOptions via the default factory.
    key_provider_options: KeyProviderOptions = field(default_factory=KeyProviderOptions)
    # Encryption type; defaults to GCM.
    encryption_type: proto_e2ee.EncryptionType.ValueType = proto_e2ee.EncryptionType.GCM
E2EEOptions(key_provider_options: livekit.rtc.e2ee.KeyProviderOptions = <factory>, encryption_type: int = 1)
Class variables
var encryption_type : int
var key_provider_options : KeyProviderOptions
class EventEmitter
-
Expand source code
class EventEmitter(Generic[T_contra]):
    """Synchronous event emitter keyed by events of type `T_contra`.

    Callbacks are kept in one set per event, so registering the same callable
    twice for an event has no additional effect, and callbacks of one event
    are invoked in no particular order.
    """

    def __init__(self) -> None:
        """
        Initialize a new instance of EventEmitter.
        """
        self._events: Dict[T_contra, Set[Callable]] = dict()

    def emit(self, event: T_contra, *args) -> None:
        """
        Trigger all callbacks associated with the given event.

        Each callback receives only as many positional arguments as its
        signature accepts; callbacks declaring `*args` receive all of them.
        A `TypeError` raised while dispatching propagates to the caller; any
        other exception is logged and the remaining callbacks still run.

        Args:
            event (T): The event to emit.
            *args: Positional arguments to pass to the callbacks.

        Example:
            Basic usage of emit:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
            ```
        """
        if event in self._events:
            # Iterate over a copy so callbacks may (un)register listeners.
            callables = self._events[event].copy()
            for callback in callables:
                try:
                    sig = inspect.signature(callback)
                    params = sig.parameters.values()

                    has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params)
                    if has_varargs:
                        callback(*args)
                    else:
                        positional_params = [
                            p
                            for p in params
                            if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
                        ]
                        num_params = len(positional_params)
                        num_args = min(len(args), num_params)
                        callback_args = args[:num_args]

                        callback(*callback_args)
                except TypeError:
                    raise
                except Exception:
                    logger.exception(f"failed to emit event {event}")

    def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called only once when the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered one-shot wrapper (usable with `off`), or a
            decorator if callback is None.

        Example:
            Using once with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.once('greet', greet_once)
            emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')  # No output, callback was removed after first call
            ```

            Using once as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.once('greet')
            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')  # No output
            ```
        """
        if callback is not None:
            import functools

            # `wraps` sets `__wrapped__`, so `inspect.signature` in `emit` sees
            # the real callback's parameters and trims the emitted arguments
            # exactly as it does for callbacks registered with `on`.
            @functools.wraps(callback)
            def once_callback(*args, **kwargs):
                self.off(event, once_callback)
                callback(*args, **kwargs)

            return self.on(event, once_callback)
        else:

            def decorator(callback: Callable) -> Callable:
                self.once(event, callback)
                return callback

            return decorator

    def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called whenever the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered callback or a decorator if callback is None.

        Raises:
            ValueError: If `callback` is a coroutine function.

        Example:
            Using on with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```

            Using on as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.on('greet')
            def greet(name):
                print(f"Hello, {name}!")

            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```
        """
        if callback is not None:
            if asyncio.iscoroutinefunction(callback):
                raise ValueError(
                    "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
                )

            if event not in self._events:
                self._events[event] = set()
            self._events[event].add(callback)
            return callback
        else:

            def decorator(callback: Callable) -> Callable:
                self.on(event, callback)
                return callback

            return decorator

    def off(self, event: T_contra, callback: Callable) -> None:
        """
        Unregister a callback from an event.

        Removing a callback that is not registered, or naming an event that
        has no callbacks, is a no-op.

        Args:
            event (T): The event to stop listening to.
            callback (Callable): The callback to remove.

        Example:
            Removing a callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.off('greet', greet)
            emitter.emit('greet', 'Dave')  # No output, callback was removed
            ```
        """
        if event in self._events:
            # `discard` (not `remove`) so an absent callback does not raise.
            self._events[event].discard(callback)
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Ancestors
- typing.Generic
Subclasses
- ProcPool
- LLM
- AgentPlayout
- MultimodalAgent
- AgentPlayout
- HumanInput
- VoicePipelineAgent
- STT
- TTS
- VAD
- livekit.agents.worker.Worker
- RealtimeSession
- ChatManager
- Room
Methods
def emit(self, event: -T_contra, *args) ‑> None
-
Expand source code
def emit(self, event: T_contra, *args) -> None:
    """
    Invoke every callback registered for `event`.

    Each callback receives only as many positional arguments as its signature
    accepts; a callback declaring `*args` receives all of them. A `TypeError`
    raised while dispatching propagates to the caller; any other exception is
    logged and the remaining callbacks still run.

    Args:
        event (T): The event to emit.
        *args: Positional arguments to pass to the callbacks.

    Example:
        Basic usage of emit:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
        ```
    """
    if event not in self._events:
        return

    # Work on a copy so callbacks may register or unregister listeners.
    for listener in self._events[event].copy():
        try:
            parameters = inspect.signature(listener).parameters.values()

            if any(p.kind == p.VAR_POSITIONAL for p in parameters):
                listener(*args)
                continue

            accepted = sum(
                1
                for p in parameters
                if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
            )
            listener(*args[: min(len(args), accepted)])
        except TypeError:
            raise
        except Exception:
            logger.exception(f"failed to emit event {event}")
Trigger all callbacks associated with the given event.
Args
event
:T
- The event to emit.
*args
- Positional arguments to pass to the callbacks.
Example
Basic usage of emit:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Alice') # Output: Hello, Alice!
def off(self, event: -T_contra, callback: Callable) ‑> None
-
Expand source code
def off(self, event: T_contra, callback: Callable) -> None:
    """
    Unregister a callback from an event.

    Removing a callback that is not registered, or naming an event that has
    no callbacks, is a no-op.

    Args:
        event (T): The event to stop listening to.
        callback (Callable): The callback to remove.

    Example:
        Removing a callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.off('greet', greet)
        emitter.emit('greet', 'Dave')  # No output, callback was removed
        ```
    """
    listeners = self._events.get(event)
    if listeners is not None:
        # `discard` (not `remove`) so an absent callback does not raise KeyError.
        listeners.discard(callback)
Unregister a callback from an event.
Args
event
:T
- The event to stop listening to.
callback
:Callable
- The callback to remove.
Example
Removing a callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.off('greet', greet) emitter.emit('greet', 'Dave') # No output, callback was removed
def on(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
-
Expand source code
def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Subscribe a callback to every emission of `event`.

    Called with a callback, the callback is registered and returned. Called
    without one, a decorator is returned that registers the decorated function.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered callback or a decorator if callback is None.

    Raises:
        ValueError: If `callback` is a coroutine function.

    Example:
        Using on with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```

        Using on as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.on('greet')
        def greet(name):
            print(f"Hello, {name}!")

        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```
    """
    if callback is None:
        # Decorator form: registration happens once the function is supplied.
        def decorator(fn: Callable) -> Callable:
            self.on(event, fn)
            return fn

        return decorator

    if asyncio.iscoroutinefunction(callback):
        raise ValueError(
            "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
        )

    self._events.setdefault(event, set()).add(callback)
    return callback
Register a callback to be called whenever the event is emitted.
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event
:T
- The event to listen for.
callback
:Callable
, optional- The callback to register. Defaults to None.
Returns
Callable
- The registered callback or a decorator if callback is None.
Example
Using on with a direct callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Charlie') # Output: Hello, Charlie!
Using on as a decorator:
emitter = EventEmitter[str]() @emitter.on('greet') def greet(name): print(f"Hello, {name}!") emitter.emit('greet', 'Charlie') # Output: Hello, Charlie!
def once(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
-
Expand source code
def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Register a callback to be called only once when the event is emitted.

    If a callback is provided, it registers the callback directly.
    If no callback is provided, it returns a decorator for use with function definitions.

    The callback is wrapped in a one-shot function that unregisters itself
    before delegating. The wrapper advertises the callback's own signature, so
    the emitter passes it the same trimmed argument list it would pass to the
    callback registered with `on`.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered one-shot wrapper (usable with `off`), or a
        decorator if callback is None.

    Example:
        Using once with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.once('greet', greet_once)
        emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')  # No output, callback was removed after first call
        ```

        Using once as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.once('greet')
        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')  # No output
        ```
    """
    if callback is not None:
        import functools

        # `wraps` sets `__wrapped__`; `inspect.signature` follows it, so the
        # wrapper no longer looks like a catch-all `*args` callable.
        @functools.wraps(callback)
        def once_callback(*args, **kwargs):
            self.off(event, once_callback)
            callback(*args, **kwargs)

        return self.on(event, once_callback)
    else:

        def decorator(callback: Callable) -> Callable:
            self.once(event, callback)
            return callback

        return decorator
Register a callback to be called only once when the event is emitted.
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event
:T
- The event to listen for.
callback
:Callable
, optional- The callback to register. Defaults to None.
Returns
Callable
- The registered callback or a decorator if callback is None.
Example
Using once with a direct callback:
emitter = EventEmitter[str]() def greet_once(name): print(f"Hello once, {name}!") emitter.once('greet', greet_once) emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output, callback was removed after first call
Using once as a decorator:
emitter = EventEmitter[str]() @emitter.once('greet') def greet_once(name): print(f"Hello once, {name}!") emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output
class FrameCryptor (room_handle: int, participant_identity: str, key_index: int, enabled: bool)
-
Expand source code
class FrameCryptor:
    """Handle on the frame-encryption state of one participant in a room.

    Holds the participant identity, key index and enabled flag it was created
    with, and forwards changes to the native side through the FFI client.
    """

    def __init__(
        self, room_handle: int, participant_identity: str, key_index: int, enabled: bool
    ):
        self._room_handle = room_handle
        self._participant_identity = participant_identity
        self._key_index = key_index
        self._enabled = enabled

    @property
    def participant_identity(self) -> str:
        """Identity of the participant this cryptor belongs to."""
        return self._participant_identity

    @property
    def key_index(self) -> int:
        """Key index most recently stored on this object."""
        return self._key_index

    @property
    def enabled(self) -> bool:
        """Enabled flag most recently stored on this object."""
        return self._enabled

    def set_enabled(self, enabled: bool) -> None:
        """Enables or disables frame encryption.

        Parameters:
            enabled (bool): True to enable, False to disable.

        Example:
            ```python
            frame_cryptor.set_enabled(True)
            ```
        """
        # Local state is updated before the request is built and sent.
        self._enabled = enabled

        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        toggle = request.e2ee.cryptor_set_enabled
        toggle.participant_identity = self._participant_identity
        toggle.enabled = enabled
        FfiClient.instance.request(request)

    def set_key_index(self, key_index: int) -> None:
        """Sets the key index for encryption/decryption.

        Parameters:
            key_index (int): The new key index.

        Example:
            ```python
            frame_cryptor.set_key_index(3)
            ```
        """
        # Local state is updated before the request is built and sent.
        self._key_index = key_index

        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        selection = request.e2ee.cryptor_set_key_index
        selection.participant_identity = self._participant_identity
        selection.key_index = key_index
        FfiClient.instance.request(request)
Instance variables
prop enabled : bool
-
Expand source code
@property
def enabled(self) -> bool:
    """Return the enabled flag currently stored in `_enabled`."""
    return self._enabled
prop key_index : int
-
Expand source code
@property
def key_index(self) -> int:
    """Return the key index currently stored in `_key_index`."""
    return self._key_index
prop participant_identity : str
-
Expand source code
@property
def participant_identity(self) -> str:
    """Return the participant identity stored in `_participant_identity`."""
    return self._participant_identity
Methods
def set_enabled(self, enabled: bool) ‑> None
-
Expand source code
def set_enabled(self, enabled: bool) -> None:
    """Switch frame encryption on or off for this cryptor's participant.

    The flag is stored on the object first, then a `cryptor_set_enabled`
    request naming the participant is sent through the FFI client.

    Parameters:
        enabled (bool): True to enable, False to disable.

    Example:
        ```python
        frame_cryptor.set_enabled(True)
        ```
    """
    # Local state is updated before the request is built and sent.
    self._enabled = enabled

    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    toggle = request.e2ee.cryptor_set_enabled
    toggle.participant_identity = self._participant_identity
    toggle.enabled = enabled
    FfiClient.instance.request(request)
Enables or disables frame encryption.
Parameters
enabled (bool): True to enable, False to disable.
Example
frame_cryptor.set_enabled(True)
def set_key_index(self, key_index: int) ‑> None
-
Expand source code
def set_key_index(self, key_index: int) -> None:
    """Select the key index used by this cryptor for encryption/decryption.

    The index is stored on the object first, then a `cryptor_set_key_index`
    request naming the participant is sent through the FFI client.

    Parameters:
        key_index (int): The new key index.

    Example:
        ```python
        frame_cryptor.set_key_index(3)
        ```
    """
    # Local state is updated before the request is built and sent.
    self._key_index = key_index

    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    selection = request.e2ee.cryptor_set_key_index
    selection.participant_identity = self._participant_identity
    selection.key_index = key_index
    FfiClient.instance.request(request)
Sets the key index for encryption/decryption.
Parameters
key_index (int): The new key index.
Example
frame_cryptor.set_key_index(3)
class IceServer (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class KeyProvider (room_handle: int,
options: KeyProviderOptions)-
Expand source code
class KeyProvider:
    """Manages a room's encryption keys through the FFI client.

    Every operation builds an `FfiRequest` addressed to the room handle given
    at construction time and sends it with `FfiClient.instance.request`.
    """

    def __init__(self, room_handle: int, options: KeyProviderOptions):
        self._room_handle = room_handle
        self._options = options

    @property
    def options(self) -> KeyProviderOptions:
        """The `KeyProviderOptions` this provider was created with."""
        return self._options

    def set_shared_key(self, key: bytes, key_index: int) -> None:
        """Sets the shared encryption key.

        Parameters:
            key (bytes): The new shared key.
            key_index (int): The index of the key.

        Example:
            ```python
            key_provider.set_shared_key(b"my_shared_key", key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        payload = request.e2ee.set_shared_key
        payload.key_index = key_index
        payload.shared_key = key
        FfiClient.instance.request(request)

    def export_shared_key(self, key_index: int) -> bytes:
        """Exports the shared encryption key.

        Parameters:
            key_index (int): The index of the key to export.

        Returns:
            bytes: The exported shared key.

        Example:
            ```python
            key = key_provider.export_shared_key(key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        request.e2ee.get_shared_key.key_index = key_index
        response = FfiClient.instance.request(request)
        return response.e2ee.get_shared_key.key

    def ratchet_shared_key(self, key_index: int) -> bytes:
        """Ratchets the shared encryption key to a new key.

        Parameters:
            key_index (int): The index of the key to ratchet.

        Returns:
            bytes: The new ratcheted shared key.

        Example:
            ```python
            new_key = key_provider.ratchet_shared_key(key_index=1)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        request.e2ee.ratchet_shared_key.key_index = key_index
        response = FfiClient.instance.request(request)
        return response.e2ee.ratchet_shared_key.new_key

    def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None:
        """Sets the encryption key for a specific participant.

        Parameters:
            participant_identity (str): The identity of the participant.
            key (bytes): The encryption key to set.
            key_index (int): The index of the key.

        Example:
            ```python
            key_provider.set_key("participant123", b"participant_key", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        payload = request.e2ee.set_key
        payload.participant_identity = participant_identity
        payload.key_index = key_index
        payload.key = key

        # NOTE(review): stores the index as a public attribute on the provider;
        # no other method of this class reads it. Kept for compatibility.
        self.key_index = key_index
        FfiClient.instance.request(request)

    def export_key(self, participant_identity: str, key_index: int) -> bytes:
        """Exports the encryption key for a specific participant.

        Parameters:
            participant_identity (str): The identity of the participant.
            key_index (int): The index of the key to export.

        Returns:
            bytes: The exported key.

        Example:
            ```python
            key = key_provider.export_key("participant123", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        lookup = request.e2ee.get_key
        lookup.participant_identity = participant_identity
        lookup.key_index = key_index
        response = FfiClient.instance.request(request)
        return response.e2ee.get_key.key

    def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
        """Ratchets the encryption key for a specific participant to a new key.

        Parameters:
            participant_identity (str): The identity of the participant.
            key_index (int): The index of the key to ratchet.

        Returns:
            bytes: The new ratcheted key.

        Example:
            ```python
            new_key = key_provider.ratchet_key("participant123", key_index=2)
            ```
        """
        request = proto_ffi.FfiRequest()
        request.e2ee.room_handle = self._room_handle
        ratchet = request.e2ee.ratchet_key
        ratchet.participant_identity = participant_identity
        ratchet.key_index = key_index
        response = FfiClient.instance.request(request)
        return response.e2ee.ratchet_key.new_key
Instance variables
prop options : KeyProviderOptions
-
Expand source code
@property
def options(self) -> KeyProviderOptions:
    """Return the `KeyProviderOptions` stored in `_options`."""
    return self._options
Methods
def export_key(self, participant_identity: str, key_index: int) ‑> bytes
-
Expand source code
def export_key(self, participant_identity: str, key_index: int) -> bytes:
    """Retrieve the encryption key held for a specific participant.

    Sends a `get_key` request for this provider's room through the FFI client
    and returns the key carried by the response.

    Parameters:
        participant_identity (str): The identity of the participant.
        key_index (int): The index of the key to export.

    Returns:
        bytes: The exported key.

    Example:
        ```python
        key = key_provider.export_key("participant123", key_index=2)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    lookup = request.e2ee.get_key
    lookup.participant_identity = participant_identity
    lookup.key_index = key_index
    response = FfiClient.instance.request(request)
    return response.e2ee.get_key.key
Exports the encryption key for a specific participant.
Parameters
participant_identity (str): The identity of the participant. key_index (int): The index of the key to export.
Returns
bytes
- The exported key.
Example
key = key_provider.export_key("participant123", key_index=2)
def export_shared_key(self, key_index: int) ‑> bytes
-
Expand source code
def export_shared_key(self, key_index: int) -> bytes:
    """Retrieve the shared encryption key stored at `key_index`.

    Sends a `get_shared_key` request for this provider's room through the FFI
    client and returns the key carried by the response.

    Parameters:
        key_index (int): The index of the key to export.

    Returns:
        bytes: The exported shared key.

    Example:
        ```python
        key = key_provider.export_shared_key(key_index=1)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    request.e2ee.get_shared_key.key_index = key_index
    response = FfiClient.instance.request(request)
    return response.e2ee.get_shared_key.key
Exports the shared encryption key.
Parameters
key_index (int): The index of the key to export.
Returns
bytes
- The exported shared key.
Example
key = key_provider.export_shared_key(key_index=1)
def ratchet_key(self, participant_identity: str, key_index: int) ‑> bytes
-
Expand source code
def ratchet_key(self, participant_identity: str, key_index: int) -> bytes:
    """Advance a participant's encryption key to a new ratcheted key.

    Sends a `ratchet_key` request for this provider's room through the FFI
    client and returns the `new_key` carried by the response.

    Parameters:
        participant_identity (str): The identity of the participant.
        key_index (int): The index of the key to ratchet.

    Returns:
        bytes: The new ratcheted key.

    Example:
        ```python
        new_key = key_provider.ratchet_key("participant123", key_index=2)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    ratchet = request.e2ee.ratchet_key
    ratchet.participant_identity = participant_identity
    ratchet.key_index = key_index
    response = FfiClient.instance.request(request)
    return response.e2ee.ratchet_key.new_key
Ratchets the encryption key for a specific participant to a new key.
Parameters
participant_identity (str): The identity of the participant. key_index (int): The index of the key to ratchet.
Returns
bytes
- The new ratcheted key.
Example
new_key = key_provider.ratchet_key("participant123", key_index=2)
def ratchet_shared_key(self, key_index: int) ‑> bytes
-
Expand source code
def ratchet_shared_key(self, key_index: int) -> bytes:
    """Advance the shared encryption key at `key_index` to a new ratcheted key.

    Sends a `ratchet_shared_key` request for this provider's room through the
    FFI client and returns the `new_key` carried by the response.

    Parameters:
        key_index (int): The index of the key to ratchet.

    Returns:
        bytes: The new ratcheted shared key.

    Example:
        ```python
        new_key = key_provider.ratchet_shared_key(key_index=1)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    request.e2ee.ratchet_shared_key.key_index = key_index
    response = FfiClient.instance.request(request)
    return response.e2ee.ratchet_shared_key.new_key
Ratchets the shared encryption key to a new key.
Parameters
key_index (int): The index of the key to ratchet.
Returns
bytes
- The new ratcheted shared key.
Example
new_key = key_provider.ratchet_shared_key(key_index=1)
def set_key(self, participant_identity: str, key: bytes, key_index: int) ‑> None
-
Expand source code
def set_key(self, participant_identity: str, key: bytes, key_index: int) -> None:
    """Install an encryption key for a specific participant.

    Builds a `set_key` request for this provider's room, records `key_index`
    on the provider, then sends the request through the FFI client.

    Parameters:
        participant_identity (str): The identity of the participant.
        key (bytes): The encryption key to set.
        key_index (int): The index of the key.

    Example:
        ```python
        key_provider.set_key("participant123", b"participant_key", key_index=2)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    payload = request.e2ee.set_key
    payload.participant_identity = participant_identity
    payload.key_index = key_index
    payload.key = key

    # NOTE(review): stores the index as a public attribute on the provider
    # before the request is sent; confirm whether any caller relies on it.
    self.key_index = key_index
    FfiClient.instance.request(request)
Sets the encryption key for a specific participant.
Parameters
participant_identity (str): The identity of the participant. key (bytes): The encryption key to set. key_index (int): The index of the key.
Example
key_provider.set_key("participant123", b"participant_key", key_index=2)
def set_shared_key(self, key: bytes, key_index: int) ‑> None
-
Expand source code
def set_shared_key(self, key: bytes, key_index: int) -> None:
    """Install the shared encryption key at `key_index`.

    Sends a `set_shared_key` request for this provider's room through the FFI
    client.

    Parameters:
        key (bytes): The new shared key.
        key_index (int): The index of the key.

    Example:
        ```python
        key_provider.set_shared_key(b"my_shared_key", key_index=1)
        ```
    """
    request = proto_ffi.FfiRequest()
    request.e2ee.room_handle = self._room_handle
    payload = request.e2ee.set_shared_key
    payload.key_index = key_index
    payload.shared_key = key
    FfiClient.instance.request(request)
Sets the shared encryption key.
Parameters
key (bytes): The new shared key. key_index (int): The index of the key.
Example
key_provider.set_shared_key(b"my_shared_key", key_index=1)
class KeyProviderOptions (shared_key: bytes | None = None,
ratchet_salt: bytes = b'LKFrameEncryptionKey',
ratchet_window_size: int = 16,
failure_tolerance: int = -1)-
Expand source code
@dataclass
class KeyProviderOptions:
    """Settings object accepted by `KeyProvider` and embedded in `E2EEOptions`."""

    # Optional shared key; `None` (the default) means none is supplied here.
    shared_key: Optional[bytes] = None
    # Defaults to the module constant DEFAULT_RATCHET_SALT
    # (presumably the salt used when ratcheting keys; confirm).
    ratchet_salt: bytes = DEFAULT_RATCHET_SALT
    # Defaults to the module constant DEFAULT_RATCHET_WINDOW_SIZE.
    ratchet_window_size: int = DEFAULT_RATCHET_WINDOW_SIZE
    # Defaults to the module constant DEFAULT_FAILURE_TOLERANCE.
    # NOTE(review): the meaning of negative values is not visible here; confirm.
    failure_tolerance: int = DEFAULT_FAILURE_TOLERANCE
KeyProviderOptions(shared_key: Optional[bytes] = None, ratchet_salt: bytes = b'LKFrameEncryptionKey', ratchet_window_size: int = 16, failure_tolerance: int = -1)
Class variables
var failure_tolerance : int
var ratchet_salt : bytes
var ratchet_window_size : int
var shared_key : bytes | None
class LocalAudioTrack (info: track_pb2.OwnedTrack)
-
Expand source code
class LocalAudioTrack(Track):
    """Audio track created locally from an `AudioSource`.

    Muting and unmuting are forwarded to the native side through the FFI
    client and then mirrored on the track's `_info`.
    """

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    @staticmethod
    def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack":
        """Create a local audio track named `name` that is fed by `source`.

        Returns:
            LocalAudioTrack: Wrapper around the track returned by the FFI call.
        """
        request = proto_ffi.FfiRequest()
        creation = request.create_audio_track
        creation.name = name
        creation.source_handle = source._ffi_handle.handle

        response = FfiClient.instance.request(request)
        return LocalAudioTrack(response.create_audio_track.track)

    def mute(self):
        """Mute the track: send the request, then mark `_info` as muted."""
        request = proto_ffi.FfiRequest()
        change = request.local_track_mute
        change.track_handle = self._ffi_handle.handle
        change.mute = True

        FfiClient.instance.request(request)
        self._info.muted = True

    def unmute(self):
        """Unmute the track: send the request, then mark `_info` as not muted."""
        request = proto_ffi.FfiRequest()
        change = request.local_track_mute
        change.track_handle = self._ffi_handle.handle
        change.mute = False

        FfiClient.instance.request(request)
        self._info.muted = False

    def __repr__(self) -> str:
        return f"rtc.LocalAudioTrack(sid={self.sid}, name={self.name})"
Ancestors
- Track
Static methods
def create_audio_track(name: str,
source: AudioSource) ‑> LocalAudioTrack-
Expand source code
@staticmethod
def create_audio_track(name: str, source: "AudioSource") -> "LocalAudioTrack":
    """Create a local audio track named `name` that is fed by `source`.

    Sends a `create_audio_track` request carrying the source's FFI handle and
    wraps the track from the response.

    Returns:
        LocalAudioTrack: Wrapper around the track returned by the FFI call.
    """
    request = proto_ffi.FfiRequest()
    creation = request.create_audio_track
    creation.name = name
    creation.source_handle = source._ffi_handle.handle

    response = FfiClient.instance.request(request)
    return LocalAudioTrack(response.create_audio_track.track)
Methods
def mute(self)
-
Expand source code
def mute(self):
    """Mute the track.

    Sends a `local_track_mute` request with `mute=True` for this track's FFI
    handle, then records the muted state on `_info`.
    """
    request = proto_ffi.FfiRequest()
    change = request.local_track_mute
    change.track_handle = self._ffi_handle.handle
    change.mute = True

    FfiClient.instance.request(request)
    # Mirror the new state locally only after the request has been issued.
    self._info.muted = True
def unmute(self)
-
Expand source code
def unmute(self):
    """Unmute the track.

    Sends a `local_track_mute` request with `mute=False` for this track's FFI
    handle, then records the unmuted state on `_info`.
    """
    request = proto_ffi.FfiRequest()
    change = request.local_track_mute
    change.track_handle = self._ffi_handle.handle
    change.mute = False

    FfiClient.instance.request(request)
    # Mirror the new state locally only after the request has been issued.
    self._info.muted = False
class LocalParticipant (room_queue: BroadcastQueue[proto_ffi.FfiEvent],
owned_info: proto_participant.OwnedParticipant)-
Expand source code
class LocalParticipant(Participant):
    """Represents the local participant in a room."""

    def __init__(
        self,
        room_queue: BroadcastQueue[proto_ffi.FfiEvent],
        owned_info: proto_participant.OwnedParticipant,
    ) -> None:
        super().__init__(owned_info)
        # Queue used to await publish/unpublish track callbacks.
        self._room_queue = room_queue
        self._track_publications: dict[str, LocalTrackPublication] = {}  # type: ignore
        # RPC method name -> handler registered through register_rpc_method().
        self._rpc_handlers: Dict[
            str, Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = {}

    @property
    def track_publications(self) -> Mapping[str, LocalTrackPublication]:
        """
        A dictionary of track publications associated with the participant.
        """
        return self._track_publications

    async def publish_data(
        self,
        payload: Union[bytes, str],
        *,
        reliable: bool = True,
        destination_identities: List[str] = [],
        topic: str = "",
    ) -> None:
        """
        Publish arbitrary data to the room.

        Args:
            payload (Union[bytes, str]): The data to publish. Strings are
                encoded as UTF-8 before sending.
            reliable (bool, optional): Whether to send reliably or not. Defaults to True.
            destination_identities (List[str], optional): List of participant identities to send to. Defaults to [].
            topic (str, optional): The topic under which to publish the data. Defaults to "".

        Raises:
            PublishDataError: If there is an error in publishing data.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")

        data_len = len(payload)
        # Copy the payload into a ctypes array in one call instead of
        # unpacking every byte as a separate positional argument. `cdata`
        # stays referenced by this frame while its address is in the request.
        cdata = (ctypes.c_byte * data_len).from_buffer_copy(payload)

        req = proto_ffi.FfiRequest()
        req.publish_data.local_participant_handle = self._ffi_handle.handle
        req.publish_data.data_ptr = ctypes.addressof(cdata)
        req.publish_data.data_len = data_len
        req.publish_data.reliable = reliable
        req.publish_data.topic = topic
        # The default list is only read here, never mutated.
        req.publish_data.destination_identities.extend(destination_identities)

        # Subscribe before sending so the completion event cannot be missed.
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_data.async_id == resp.publish_data.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_data.error:
            raise PublishDataError(cb.publish_data.error)

    async def publish_dtmf(self, *, code: int, digit: str) -> None:
        """
        Publish SIP DTMF message.

        Args:
            code (int): DTMF code.
            digit (str): DTMF digit.

        Raises:
            PublishDTMFError: If there is an error in publishing SIP DTMF message.
        """
        req = proto_ffi.FfiRequest()
        req.publish_sip_dtmf.local_participant_handle = self._ffi_handle.handle
        req.publish_sip_dtmf.code = code
        req.publish_sip_dtmf.digit = digit

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_sip_dtmf.async_id == resp.publish_sip_dtmf.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_sip_dtmf.error:
            raise PublishDTMFError(cb.publish_sip_dtmf.error)

    async def publish_transcription(self, transcription: Transcription) -> None:
        """
        Publish transcription data to the room.

        Args:
            transcription (Transcription): The transcription data to publish.

        Raises:
            PublishTranscriptionError: If there is an error in publishing transcription.
        """
        req = proto_ffi.FfiRequest()
        proto_segments = [
            ProtoTranscriptionSegment(
                id=s.id,
                text=s.text,
                start_time=s.start_time,
                end_time=s.end_time,
                final=s.final,
                language=s.language,
            )
            for s in transcription.segments
        ]
        # fmt: off
        req.publish_transcription.local_participant_handle = self._ffi_handle.handle
        req.publish_transcription.participant_identity = transcription.participant_identity
        req.publish_transcription.segments.extend(proto_segments)
        req.publish_transcription.track_id = transcription.track_sid
        # fmt: on
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_transcription.async_id
                == resp.publish_transcription.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_transcription.error:
            raise PublishTranscriptionError(cb.publish_transcription.error)

    async def perform_rpc(
        self,
        *,
        destination_identity: str,
        method: str,
        payload: str,
        response_timeout: Optional[float] = None,
    ) -> str:
        """
        Initiate an RPC call to a remote participant.

        Args:
            destination_identity (str): The `identity` of the destination participant
            method (str): The method name to call
            payload (str): The method payload
            response_timeout (Optional[float]): Timeout for receiving a response after initial connection

        Returns:
            str: The response payload

        Raises:
            RpcError: On failure. Details in `message`.
        """
        req = proto_ffi.FfiRequest()
        req.perform_rpc.local_participant_handle = self._ffi_handle.handle
        req.perform_rpc.destination_identity = destination_identity
        req.perform_rpc.method = method
        req.perform_rpc.payload = payload
        if response_timeout is not None:
            # The request field is expressed in whole milliseconds.
            req.perform_rpc.response_timeout_ms = int(response_timeout * 1000)

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb = await queue.wait_for(
                lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id)
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.perform_rpc.HasField("error"):
            raise RpcError._from_proto(cb.perform_rpc.error)

        return cb.perform_rpc.payload

    def register_rpc_method(
        self,
        method_name: str,
        handler: Optional[
            Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = None,
    ) -> Union[None, Callable]:
        """
        Establishes the participant as a receiver for calls of the specified RPC method.
        Can be used either as a decorator or a regular method.

        The handler will receive one argument of type `RpcInvocationData` and should return a string response which will be forwarded back to the caller.

        The handler may be synchronous or asynchronous.

        If unable to respond within `response_timeout`, the caller will hang up and receive an error on their side.

        You may raise errors of type `RpcError` in the handler, and they will be forwarded to the caller.

        Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".

        Args:
            method_name (str): The name of the indicated RPC method.
            handler (Optional[Callable]): Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax.

        Returns:
            None (when used as a decorator it returns the decorator function)

        Example:
            # As a decorator:
            @room.local_participant.register_rpc_method("greet")
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            # As a regular method:
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            room.local_participant.register_rpc_method('greet', greet_handler)
        """

        def register(handler_func):
            self._rpc_handlers[method_name] = handler_func
            req = proto_ffi.FfiRequest()
            req.register_rpc_method.local_participant_handle = self._ffi_handle.handle
            req.register_rpc_method.method = method_name
            FfiClient.instance.request(req)

        if handler is not None:
            register(handler)
            return None
        else:
            # Called as a decorator
            return register

    def unregister_rpc_method(self, method: str) -> None:
        """
        Unregisters a previously registered RPC method.

        Args:
            method (str): The name of the RPC method to unregister
        """
        # pop() with a default: unregistering an unknown method is not an error here.
        self._rpc_handlers.pop(method, None)

        req = proto_ffi.FfiRequest()
        req.unregister_rpc_method.local_participant_handle = self._ffi_handle.handle
        req.unregister_rpc_method.method = method

        FfiClient.instance.request(req)

    async def _handle_rpc_method_invocation(
        self,
        invocation_id: int,
        method: str,
        request_id: str,
        caller_identity: str,
        payload: str,
        response_timeout: float,
    ) -> None:
        """
        Run the handler registered for `method` and send its outcome back over FFI.

        The response carries either the handler's payload or an `RpcError`:
        UNSUPPORTED_METHOD when no handler is registered, RESPONSE_TIMEOUT when
        an async handler exceeds `response_timeout`, RECIPIENT_DISCONNECTED when
        it is cancelled, the handler's own `RpcError` if it raised one, and
        APPLICATION_ERROR for any other exception.
        """
        response_error: Optional[RpcError] = None
        response_payload: Optional[str] = None

        params = RpcInvocationData(
            request_id, caller_identity, payload, response_timeout
        )

        handler = self._rpc_handlers.get(method)

        if not handler:
            response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD)
        else:
            try:
                if asyncio.iscoroutinefunction(handler):
                    async_handler = cast(
                        Callable[[RpcInvocationData], Awaitable[str]], handler
                    )
                    try:
                        response_payload = await asyncio.wait_for(
                            async_handler(params), timeout=response_timeout
                        )
                    except asyncio.TimeoutError:
                        raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT)
                    except asyncio.CancelledError:
                        raise RpcError._built_in(
                            RpcError.ErrorCode.RECIPIENT_DISCONNECTED
                        )
                else:
                    # Synchronous handlers are called inline; no timeout is applied.
                    sync_handler = cast(Callable[[RpcInvocationData], str], handler)
                    response_payload = sync_handler(params)
            except RpcError as error:
                response_error = error
            except Exception as error:
                logger.exception(
                    f"Uncaught error returned by RPC handler for {method}. Returning APPLICATION_ERROR instead. Original error: {error}",
                )
                response_error = RpcError._built_in(
                    RpcError.ErrorCode.APPLICATION_ERROR
                )

        req = proto_ffi.FfiRequest(
            rpc_method_invocation_response=RpcMethodInvocationResponseRequest(
                local_participant_handle=self._ffi_handle.handle,
                invocation_id=invocation_id,
                error=response_error._to_proto() if response_error else None,
                payload=response_payload,
            )
        )

        res = FfiClient.instance.request(req)

        if res.rpc_method_invocation_response.error:
            # No exception is active here, so use error() rather than
            # exception(), which would append a meaningless "NoneType: None".
            logger.error(
                f"error sending rpc method invocation response: {res.rpc_method_invocation_response.error}"
            )

    async def set_metadata(self, metadata: str) -> None:
        """
        Set the metadata for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            metadata (str): The new metadata.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_metadata.local_participant_handle = self._ffi_handle.handle
        req.set_local_metadata.metadata = metadata

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.set_local_metadata.async_id
                == resp.set_local_metadata.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def set_name(self, name: str) -> None:
        """
        Set the name for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            name (str): The new name.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_name.local_participant_handle = self._ffi_handle.handle
        req.set_local_name.name = name

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.set_local_name.async_id == resp.set_local_name.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def set_attributes(self, attributes: dict[str, str]) -> None:
        """
        Set custom attributes for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            attributes (dict[str, str]): A dictionary of attributes to set.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_attributes.local_participant_handle = self._ffi_handle.handle
        # The request was just created, so its attribute list starts empty;
        # there is nothing to merge with and the input is copied in directly.
        for key, value in dict(attributes).items():
            entry = req.set_local_attributes.attributes.add()
            entry.key = key
            entry.value = value

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.set_local_attributes.async_id
                == resp.set_local_attributes.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def publish_track(
        self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
    ) -> LocalTrackPublication:
        """
        Publish a local track to the room.

        Args:
            track (LocalTrack): The track to publish.
            options (TrackPublishOptions, optional): Options for publishing the track.

        Returns:
            LocalTrackPublication: The publication of the published track.

        Raises:
            PublishTrackError: If there is an error in publishing the track.
        """
        req = proto_ffi.FfiRequest()
        req.publish_track.track_handle = track._ffi_handle.handle
        req.publish_track.local_participant_handle = self._ffi_handle.handle
        # CopyFrom only reads `options`, so the shared default is not mutated.
        req.publish_track.options.CopyFrom(options)

        queue = self._room_queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_track.async_id == resp.publish_track.async_id
            )

            if cb.publish_track.error:
                raise PublishTrackError(cb.publish_track.error)

            track_publication = LocalTrackPublication(cb.publish_track.publication)
            track_publication.track = track
            # Give the local track the SID of its publication.
            track._info.sid = track_publication.sid
            self._track_publications[track_publication.sid] = track_publication

            queue.task_done()
            return track_publication
        finally:
            self._room_queue.unsubscribe(queue)

    async def unpublish_track(self, track_sid: str) -> None:
        """
        Unpublish a track from the room.

        Args:
            track_sid (str): The SID of the track to unpublish.

        Raises:
            UnpublishTrackError: If there is an error in unpublishing the track.
        """
        req = proto_ffi.FfiRequest()
        req.unpublish_track.local_participant_handle = self._ffi_handle.handle
        req.unpublish_track.track_sid = track_sid
        req.unpublish_track.stop_on_unpublish = True

        queue = self._room_queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.unpublish_track.async_id == resp.unpublish_track.async_id
            )

            if cb.unpublish_track.error:
                raise UnpublishTrackError(cb.unpublish_track.error)

            publication = self._track_publications.pop(track_sid)
            publication.track = None
            queue.task_done()
        finally:
            self._room_queue.unsubscribe(queue)

    def __repr__(self) -> str:
        return f"rtc.LocalParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"
Represents the local participant in a room.
Ancestors
- Participant
- abc.ABC
Methods
async def perform_rpc(self,
*,
destination_identity: str,
method: str,
payload: str,
response_timeout: Optional[float] = None) ‑> str-
Expand source code
async def perform_rpc(
    self,
    *,
    destination_identity: str,
    method: str,
    payload: str,
    response_timeout: Optional[float] = None,
) -> str:
    """
    Call an RPC method on a remote participant and return its reply.

    Args:
        destination_identity (str): The `identity` of the destination participant
        method (str): The method name to call
        payload (str): The method payload
        response_timeout (Optional[float]): Timeout for receiving a response after
            initial connection. When given, it is multiplied by 1000 and truncated
            to an int for the request's millisecond field.

    Returns:
        str: The payload carried by the RPC completion event.

    Raises:
        RpcError: If the completion event has its `error` field set.
    """
    request = proto_ffi.FfiRequest()
    rpc = request.perform_rpc
    rpc.local_participant_handle = self._ffi_handle.handle
    rpc.destination_identity = destination_identity
    rpc.method = method
    rpc.payload = payload
    if response_timeout is not None:
        rpc.response_timeout_ms = int(response_timeout * 1000)

    # Subscribe before sending so the completion event cannot be missed.
    subscription = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.perform_rpc.async_id
        event = await subscription.wait_for(
            lambda e: (e.perform_rpc.async_id == pending_id)
        )
    finally:
        FfiClient.instance.queue.unsubscribe(subscription)

    outcome = event.perform_rpc
    if outcome.HasField("error"):
        raise RpcError._from_proto(outcome.error)
    return outcome.payload
Initiate an RPC call to a remote participant.
Args
destination_identity
:str
- The
identity
of the destination participant
method
:str
- The method name to call
payload
:str
- The method payload
response_timeout
:Optional[float]
- Timeout for receiving a response after initial connection
Returns
str
- The response payload
Raises
RpcError
- On failure. Details in
message
.
async def publish_data(self,
payload: Union[bytes, str],
*,
reliable: bool = True,
destination_identities: List[str] = [],
topic: str = '') ‑> None-
Expand source code
async def publish_data(
    self,
    payload: Union[bytes, str],
    *,
    reliable: bool = True,
    destination_identities: List[str] = [],
    topic: str = "",
) -> None:
    """
    Publish arbitrary data to the room.

    Args:
        payload (Union[bytes, str]): The data to publish. Strings are
            encoded as UTF-8 before sending.
        reliable (bool, optional): Whether to send reliably or not. Defaults to True.
        destination_identities (List[str], optional): List of participant identities to send to. Defaults to [].
        topic (str, optional): The topic under which to publish the data. Defaults to "".

    Raises:
        PublishDataError: If there is an error in publishing data.
    """
    if isinstance(payload, str):
        payload = payload.encode("utf-8")

    data_len = len(payload)
    # Copy the payload into a ctypes array in one call instead of unpacking
    # every byte as a separate positional argument. `cdata` stays referenced
    # by this frame while its address is in the request.
    cdata = (ctypes.c_byte * data_len).from_buffer_copy(payload)

    req = proto_ffi.FfiRequest()
    req.publish_data.local_participant_handle = self._ffi_handle.handle
    req.publish_data.data_ptr = ctypes.addressof(cdata)
    req.publish_data.data_len = data_len
    req.publish_data.reliable = reliable
    req.publish_data.topic = topic
    # The default list is only read here, never mutated.
    req.publish_data.destination_identities.extend(destination_identities)

    # Subscribe before sending so the completion event cannot be missed.
    queue = FfiClient.instance.queue.subscribe()
    try:
        resp = FfiClient.instance.request(req)
        cb: proto_ffi.FfiEvent = await queue.wait_for(
            lambda e: e.publish_data.async_id == resp.publish_data.async_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(queue)

    if cb.publish_data.error:
        raise PublishDataError(cb.publish_data.error)
Publish arbitrary data to the room.
Args
payload
:Union[bytes, str]
- The data to publish.
reliable
:bool
, optional - Whether to send reliably or not. Defaults to True.
destination_identities
:List[str]
, optional - List of participant identities to send to. Defaults to [].
topic
:str
, optional - The topic under which to publish the data. Defaults to "".
Raises
PublishDataError
- If there is an error in publishing data.
async def publish_dtmf(self, *, code: int, digit: str) ‑> None
-
Expand source code
async def publish_dtmf(self, *, code: int, digit: str) -> None:
    """
    Send a SIP DTMF message and wait for its completion event.

    Args:
        code (int): DTMF code.
        digit (str): DTMF digit.

    Raises:
        PublishDTMFError: If the completion event reports an error.
    """
    request = proto_ffi.FfiRequest()
    dtmf = request.publish_sip_dtmf
    dtmf.local_participant_handle = self._ffi_handle.handle
    dtmf.code = code
    dtmf.digit = digit

    # Subscribe before sending so the completion event cannot be missed.
    subscription = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.publish_sip_dtmf.async_id
        event = await subscription.wait_for(
            lambda e: e.publish_sip_dtmf.async_id == pending_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(subscription)

    failure = event.publish_sip_dtmf.error
    if failure:
        raise PublishDTMFError(failure)
Publish SIP DTMF message.
Args
code
:int
- DTMF code.
digit
:str
- DTMF digit.
Raises
PublishDTMFError
- If there is an error in publishing SIP DTMF message.
async def publish_track(self,
track: LocalTrack,
options: TrackPublishOptions = TrackPublishOptions()) ‑> LocalTrackPublication-
Expand source code
async def publish_track(
    self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
) -> LocalTrackPublication:
    """
    Publish a local track to the room and record the resulting publication.

    Args:
        track (LocalTrack): The track to publish.
        options (TrackPublishOptions, optional): Options copied into the publish request.

    Returns:
        LocalTrackPublication: The publication built from the completion event,
        also stored in this participant's publications under its SID.

    Raises:
        PublishTrackError: If the completion event reports an error.
    """
    request = proto_ffi.FfiRequest()
    publish = request.publish_track
    publish.track_handle = track._ffi_handle.handle
    publish.local_participant_handle = self._ffi_handle.handle
    publish.options.CopyFrom(options)

    subscription = self._room_queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.publish_track.async_id
        event = await subscription.wait_for(
            lambda e: e.publish_track.async_id == pending_id
        )

        failure = event.publish_track.error
        if failure:
            raise PublishTrackError(failure)

        publication = LocalTrackPublication(event.publish_track.publication)
        publication.track = track
        # Give the local track the SID of its publication.
        track._info.sid = publication.sid
        self._track_publications[publication.sid] = publication

        subscription.task_done()
        return publication
    finally:
        self._room_queue.unsubscribe(subscription)
Publish a local track to the room.
Args
track
:LocalTrack
- The track to publish.
options
:TrackPublishOptions
, optional - Options for publishing the track.
Returns
LocalTrackPublication
- The publication of the published track.
Raises
PublishTrackError
- If there is an error in publishing the track.
async def publish_transcription(self,
transcription: Transcription) ‑> None-
Expand source code
async def publish_transcription(self, transcription: Transcription) -> None:
    """
    Send transcription segments to the room and wait for the completion event.

    Args:
        transcription (Transcription): Supplies the participant identity, the
            track SID and the segments to publish.

    Raises:
        PublishTranscriptionError: If the completion event reports an error.
    """
    request = proto_ffi.FfiRequest()

    # Convert every segment to its protobuf form before touching the request.
    converted = []
    for segment in transcription.segments:
        converted.append(
            ProtoTranscriptionSegment(
                id=segment.id,
                text=segment.text,
                start_time=segment.start_time,
                end_time=segment.end_time,
                final=segment.final,
                language=segment.language,
            )
        )

    publish = request.publish_transcription
    publish.local_participant_handle = self._ffi_handle.handle
    publish.participant_identity = transcription.participant_identity
    publish.segments.extend(converted)
    publish.track_id = transcription.track_sid

    subscription = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.publish_transcription.async_id
        event = await subscription.wait_for(
            lambda e: e.publish_transcription.async_id == pending_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(subscription)

    failure = event.publish_transcription.error
    if failure:
        raise PublishTranscriptionError(failure)
Publish transcription data to the room.
Args
transcription
:Transcription
- The transcription data to publish.
Raises
PublishTranscriptionError
- If there is an error in publishing transcription.
def register_rpc_method(self,
method_name: str,
handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None) ‑> Callable | None-
Expand source code
def register_rpc_method(
    self,
    method_name: str,
    handler: Optional[
        Callable[[RpcInvocationData], Union[Awaitable[str], str]]
    ] = None,
) -> Union[None, Callable]:
    """
    Register a handler for incoming calls of the RPC method `method_name`.

    Works both as a plain call (pass `handler`) and as a decorator (omit it).

    The handler receives a single `RpcInvocationData` argument and should return
    a string, which is forwarded back to the caller. It may be synchronous or
    asynchronous.

    If the handler cannot respond within `response_timeout`, the caller will hang
    up and receive an error on their side.

    `RpcError` exceptions raised by the handler are forwarded to the caller.
    Other errors raised in your handler will be caught and forwarded to the
    caller as "1500 Application Error".

    Args:
        method_name (str): The name of the indicated RPC method.
        handler (Optional[Callable]): Handler to be invoked whenever an RPC request
            for this method is received. Omit this argument to use the decorator syntax.

    Returns:
        None when `handler` is given; otherwise the function that performs the
        registration when applied to a handler.

    Example:
        # As a decorator:
        @room.local_participant.register_rpc_method("greet")
        async def greet_handler(data: RpcInvocationData) -> str:
            print(f"Received greeting from {data.caller_identity}: {data.payload}")
            return f"Hello, {data.caller_identity}!"

        # As a regular method:
        async def greet_handler(data: RpcInvocationData) -> str:
            print(f"Received greeting from {data.caller_identity}: {data.payload}")
            return f"Hello, {data.caller_identity}!"

        room.local_participant.register_rpc_method('greet', greet_handler)
    """

    def register(handler_func):
        # Store the handler locally, then announce the method over FFI.
        self._rpc_handlers[method_name] = handler_func
        request = proto_ffi.FfiRequest()
        registration = request.register_rpc_method
        registration.local_participant_handle = self._ffi_handle.handle
        registration.method = method_name
        FfiClient.instance.request(request)

    if handler is None:
        # Decorator usage: the caller applies `register` to the handler.
        return register

    register(handler)
    return None
Establishes the participant as a receiver for calls of the specified RPC method. Can be used either as a decorator or a regular method.
The handler will receive one argument of type
RpcInvocationData
and should return a string response which will be forwarded back to the caller. The handler may be synchronous or asynchronous.
If unable to respond within
response_timeout
, the caller will hang up and receive an error on their side. You may raise errors of type
RpcError
in the handler, and they will be forwarded to the caller. Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".
Args
method_name
:str
- The name of the indicated RPC method.
handler
:Optional[Callable]
- Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax.
Returns
None (when used as a decorator it returns the decorator function)
Example
As a decorator:
@room.local_participant.register_rpc_method("greet") async def greet_handler(data: RpcInvocationData) -> str: print(f"Received greeting from {data.caller_identity}: {data.payload}") return f"Hello, {data.caller_identity}!"
As a regular method:
async def greet_handler(data: RpcInvocationData) -> str: print(f"Received greeting from {data.caller_identity}: {data.payload}") return f"Hello, {data.caller_identity}!"
room.local_participant.register_rpc_method('greet', greet_handler)
async def set_attributes(self, attributes: dict[str, str]) ‑> None
-
Expand source code
async def set_attributes(self, attributes: dict[str, str]) -> None:
    """
    Set custom attributes for the local participant.

    Note: this requires `canUpdateOwnMetadata` permission.

    Args:
        attributes (dict[str, str]): A dictionary of attributes to set.
    """
    req = proto_ffi.FfiRequest()
    req.set_local_attributes.local_participant_handle = self._ffi_handle.handle
    # The request was just created, so its attribute list starts empty;
    # there is nothing to merge with and the input is copied in directly.
    for key, value in dict(attributes).items():
        entry = req.set_local_attributes.attributes.add()
        entry.key = key
        entry.value = value

    # Subscribe before sending so the completion event cannot be missed.
    queue = FfiClient.instance.queue.subscribe()
    try:
        resp = FfiClient.instance.request(req)
        await queue.wait_for(
            lambda e: e.set_local_attributes.async_id
            == resp.set_local_attributes.async_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(queue)
Set custom attributes for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
attributes
:dict[str, str]
- A dictionary of attributes to set.
async def set_metadata(self, metadata: str) ‑> None
-
Expand source code
async def set_metadata(self, metadata: str) -> None:
    """
    Update the local participant's metadata and wait for the completion event.

    Note: this requires `canUpdateOwnMetadata` permission.

    Args:
        metadata (str): The new metadata.
    """
    request = proto_ffi.FfiRequest()
    update = request.set_local_metadata
    update.local_participant_handle = self._ffi_handle.handle
    update.metadata = metadata

    # Subscribe before sending so the completion event cannot be missed.
    subscription = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.set_local_metadata.async_id
        await subscription.wait_for(
            lambda e: e.set_local_metadata.async_id == pending_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(subscription)
Set the metadata for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
metadata
:str
- The new metadata.
async def set_name(self, name: str) ‑> None
-
Expand source code
async def set_name(self, name: str) -> None:
    """
    Update the local participant's name and wait for the completion event.

    Note: this requires `canUpdateOwnMetadata` permission.

    Args:
        name (str): The new name.
    """
    request = proto_ffi.FfiRequest()
    update = request.set_local_name
    update.local_participant_handle = self._ffi_handle.handle
    update.name = name

    # Subscribe before sending so the completion event cannot be missed.
    subscription = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.set_local_name.async_id
        await subscription.wait_for(
            lambda e: e.set_local_name.async_id == pending_id
        )
    finally:
        FfiClient.instance.queue.unsubscribe(subscription)
Set the name for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
name
:str
- The new name.
async def unpublish_track(self, track_sid: str) ‑> None
-
Expand source code
async def unpublish_track(self, track_sid: str) -> None:
    """
    Unpublish a track from the room and drop its local publication record.

    Args:
        track_sid (str): The SID of the track to unpublish.

    Raises:
        UnpublishTrackError: If the completion event reports an error.
    """
    request = proto_ffi.FfiRequest()
    unpublish = request.unpublish_track
    unpublish.local_participant_handle = self._ffi_handle.handle
    unpublish.track_sid = track_sid
    unpublish.stop_on_unpublish = True

    subscription = self._room_queue.subscribe()
    try:
        response = FfiClient.instance.request(request)
        pending_id = response.unpublish_track.async_id
        event = await subscription.wait_for(
            lambda e: e.unpublish_track.async_id == pending_id
        )

        failure = event.unpublish_track.error
        if failure:
            raise UnpublishTrackError(failure)

        # Forget the publication and detach its track.
        removed = self._track_publications.pop(track_sid)
        removed.track = None
        subscription.task_done()
    finally:
        self._room_queue.unsubscribe(subscription)
Unpublish a track from the room.
Args
track_sid
:str
- The SID of the track to unpublish.
Raises
UnpublishTrackError
- If there is an error in unpublishing the track.
def unregister_rpc_method(self, method: str) ‑> None
-
Expand source code
def unregister_rpc_method(self, method: str) -> None:
    """
    Remove the handler for an RPC method and notify the FFI layer.

    Args:
        method (str): The name of the RPC method to unregister
    """
    # pop() with a default: an unknown method name is silently accepted.
    self._rpc_handlers.pop(method, None)

    request = proto_ffi.FfiRequest()
    removal = request.unregister_rpc_method
    removal.local_participant_handle = self._ffi_handle.handle
    removal.method = method
    FfiClient.instance.request(request)
Unregisters a previously registered RPC method.
Args
method
:str
- The name of the RPC method to unregister
Inherited members
class LocalTrackPublication (owned_info: track_pb2.OwnedTrackPublication)
-
Expand source code
class LocalTrackPublication(TrackPublication):
    """Track publication that exposes an awaitable for its first subscription."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        super().__init__(owned_info)
        # Awaited by wait_for_subscription(); this class never resolves it itself.
        self._first_subscription: asyncio.Future[None] = asyncio.Future()

    async def wait_for_subscription(self) -> None:
        """Wait for the first-subscription future; it is shielded from cancellation of this wait."""
        shielded = asyncio.shield(self._first_subscription)
        await shielded

    def __repr__(self) -> str:
        return f"rtc.LocalTrackPublication(sid={self.sid}, name={self.name}, kind={self.kind}, source={self.source})"
Ancestors
Methods
async def wait_for_subscription(self) ‑> None
-
Expand source code
async def wait_for_subscription(self) -> None:
    """Wait for the first-subscription future; it is shielded from cancellation of this wait."""
    shielded = asyncio.shield(self._first_subscription)
    await shielded
class LocalVideoTrack (info: track_pb2.OwnedTrack)
-
Expand source code
class LocalVideoTrack(Track):
    """Video track created locally; creation and mute state go through FFI requests."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    @staticmethod
    def create_video_track(name: str, source: "VideoSource") -> "LocalVideoTrack":
        """Send a `create_video_track` FFI request and wrap the returned track.

        Args:
            name: Name placed on the request.
            source: Video source whose FFI handle is attached to the request.

        Returns:
            A `LocalVideoTrack` built from the track in the FFI response.
        """
        request = proto_ffi.FfiRequest()
        creation = request.create_video_track
        creation.name = name
        creation.source_handle = source._ffi_handle.handle
        response = FfiClient.instance.request(request)
        return LocalVideoTrack(response.create_video_track.track)

    def mute(self):
        """Send a `local_track_mute` request with `mute=True`, then mark the cached info muted."""
        request = proto_ffi.FfiRequest()
        mute_request = request.local_track_mute
        mute_request.track_handle = self._ffi_handle.handle
        mute_request.mute = True
        FfiClient.instance.request(request)
        # Mirror the new state locally only after the request call has returned.
        self._info.muted = True

    def unmute(self):
        """Send a `local_track_mute` request with `mute=False`, then mark the cached info unmuted."""
        request = proto_ffi.FfiRequest()
        mute_request = request.local_track_mute
        mute_request.track_handle = self._ffi_handle.handle
        mute_request.mute = False
        FfiClient.instance.request(request)
        self._info.muted = False

    def __repr__(self) -> str:
        return f"rtc.LocalVideoTrack(sid={self.sid}, name={self.name})"
Ancestors
Static methods
def create_video_track(name: str,
source: VideoSource) ‑> LocalVideoTrack-
Expand source code
@staticmethod
def create_video_track(name: str, source: "VideoSource") -> "LocalVideoTrack":
    """Send a `create_video_track` FFI request and wrap the returned track.

    Args:
        name: Name placed on the request.
        source: Video source whose FFI handle is attached to the request.

    Returns:
        A `LocalVideoTrack` built from the track in the FFI response.
    """
    request = proto_ffi.FfiRequest()
    creation = request.create_video_track
    creation.name = name
    creation.source_handle = source._ffi_handle.handle
    response = FfiClient.instance.request(request)
    return LocalVideoTrack(response.create_video_track.track)
Methods
def mute(self)
-
Expand source code
def mute(self):
    """Send a `local_track_mute` request with `mute=True`, then mark the cached info muted."""
    request = proto_ffi.FfiRequest()
    mute_request = request.local_track_mute
    mute_request.track_handle = self._ffi_handle.handle
    mute_request.mute = True
    FfiClient.instance.request(request)
    # Mirror the new state locally only after the request call has returned.
    self._info.muted = True
def unmute(self)
-
Expand source code
def unmute(self):
    """Send a `local_track_mute` request with `mute=False`, then mark the cached info unmuted."""
    request = proto_ffi.FfiRequest()
    mute_request = request.local_track_mute
    mute_request.track_handle = self._ffi_handle.handle
    mute_request.mute = False
    FfiClient.instance.request(request)
    # Mirror the new state locally only after the request call has returned.
    self._info.muted = False
class Participant (owned_info: proto_participant.OwnedParticipant)
-
Expand source code
class Participant(ABC):
    """Abstract base for participants.

    Wraps the ``info`` message of an ``OwnedParticipant`` and exposes its
    fields as read-only properties. Subclasses must implement
    ``track_publications``.
    """

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        # Keep the FFI handle referenced for as long as this object exists.
        self._ffi_handle = FfiHandle(owned_info.handle.id)
        self._info = owned_info.info

    @property
    @abstractmethod
    def track_publications(self) -> Mapping[str, TrackPublication]:
        """
        A dictionary of track publications associated with the participant.
        """
        ...

    @property
    def sid(self) -> str:
        """The ``sid`` field of the participant info."""
        info = self._info
        return info.sid

    @property
    def name(self) -> str:
        """The ``name`` field of the participant info."""
        info = self._info
        return info.name

    @property
    def identity(self) -> str:
        """The ``identity`` field of the participant info."""
        info = self._info
        return info.identity

    @property
    def metadata(self) -> str:
        """The ``metadata`` field of the participant info."""
        info = self._info
        return info.metadata

    @property
    def attributes(self) -> dict[str, str]:
        """Custom attributes associated with the participant."""
        # A fresh dict is returned so callers cannot mutate the stored info.
        snapshot = dict(self._info.attributes)
        return snapshot

    @property
    def kind(self) -> proto_participant.ParticipantKind.ValueType:
        """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
        info = self._info
        return info.kind
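Abstract base class for room participants (built on abc.ABC). It exposes the participant's sid, name, identity, metadata, attributes and kind, and requires subclasses to provide track_publications.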
Ancestors
- abc.ABC
Subclasses
Instance variables
prop attributes : dict[str, str]
-
Expand source code
@property
def attributes(self) -> dict[str, str]:
    """Custom attributes associated with the participant."""
    # A fresh dict is returned so callers cannot mutate the stored info.
    snapshot = dict(self._info.attributes)
    return snapshot
Custom attributes associated with the participant.
prop identity : str
-
Expand source code
@property
def identity(self) -> str:
    """The ``identity`` field of the participant info."""
    info = self._info
    return info.identity
prop kind : proto_participant.ParticipantKind.ValueType
-
Expand source code
@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
    """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
    info = self._info
    return info.kind
Participant's kind (e.g., regular participant, ingress, egress, sip, agent).
prop metadata : str
-
Expand source code
@property
def metadata(self) -> str:
    """The ``metadata`` field of the participant info."""
    info = self._info
    return info.metadata
prop name : str
-
Expand source code
@property
def name(self) -> str:
    """The ``name`` field of the participant info."""
    info = self._info
    return info.name
prop sid : str
-
Expand source code
@property
def sid(self) -> str:
    """The ``sid`` field of the participant info."""
    info = self._info
    return info.sid
prop track_publications : Mapping[str, TrackPublication]
-
Expand source code
@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
    """A dictionary of track publications associated with the participant.

    Abstract: concrete participant classes must override this property.
    """
A dictionary of track publications associated with the participant.
class RemoteAudioTrack (info: track_pb2.OwnedTrack)
-
Expand source code
class RemoteAudioTrack(Track):
    """Audio track wrapper built from an ``OwnedTrack`` message."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    def __repr__(self) -> str:
        description = f"rtc.RemoteAudioTrack(sid={self.sid}, name={self.name})"
        return description
Ancestors
class RemoteParticipant (owned_info: proto_participant.OwnedParticipant)
-
Expand source code
class RemoteParticipant(Participant):
    """Participant whose publications are ``RemoteTrackPublication`` objects."""

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        super().__init__(owned_info)
        # Starts empty; entries are added to this dict by code outside the class.
        self._track_publications: dict[str, RemoteTrackPublication] = {}  # type: ignore

    @property
    def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
        """
        A dictionary of track publications associated with the participant.
        """
        publications = self._track_publications
        return publications

    def __repr__(self) -> str:
        description = f"rtc.RemoteParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"
        return description
Concrete Participant subclass for remote participants. Its track_publications mapping holds RemoteTrackPublication objects keyed by publication sid.
Ancestors
- Participant
- abc.ABC
Inherited members
class RemoteTrackPublication (owned_info: track_pb2.OwnedTrackPublication)
-
Expand source code
class RemoteTrackPublication(TrackPublication):
    """Track publication with a ``subscribed`` flag and a subscription request helper."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        super().__init__(owned_info)
        # Starts as False; this class never changes it itself.
        self.subscribed = False

    def set_subscribed(self, subscribed: bool):
        """Send a subscribe/unsubscribe request for this publication.

        Only the FFI request is issued here; ``self.subscribed`` is not
        modified by this method.
        """
        request = proto_ffi.FfiRequest()
        subscription_msg = request.set_subscribed
        subscription_msg.subscribe = subscribed
        subscription_msg.publication_handle = self._ffi_handle.handle
        FfiClient.instance.request(request)

    def __repr__(self) -> str:
        description = f"rtc.RemoteTrackPublication(sid={self.sid}, name={self.name}, kind={self.kind}, source={self.source})"
        return description
Ancestors
Methods
def set_subscribed(self, subscribed: bool)
-
Expand source code
def set_subscribed(self, subscribed: bool):
    """Send a subscribe/unsubscribe request for this publication.

    Only the FFI request is issued here; ``self.subscribed`` is not
    modified by this method.
    """
    request = proto_ffi.FfiRequest()
    subscription_msg = request.set_subscribed
    subscription_msg.subscribe = subscribed
    subscription_msg.publication_handle = self._ffi_handle.handle
    FfiClient.instance.request(request)
class RemoteVideoTrack (info: track_pb2.OwnedTrack)
-
Expand source code
class RemoteVideoTrack(Track):
    """Video track wrapper built from an ``OwnedTrack`` message."""

    def __init__(self, info: proto_track.OwnedTrack):
        super().__init__(info)

    def __repr__(self) -> str:
        description = f"rtc.RemoteVideoTrack(sid={self.sid}, name={self.name})"
        return description
Ancestors
class Room (loop: Optional[asyncio.AbstractEventLoop] = None)
-
Expand source code
class Room(EventEmitter[EventTypes]):
    """Client-side handle to a LiveKit room.

    Connects through the FFI client, tracks the local and remote
    participants, and re-emits FFI room events as typed events (see `on`).
    """

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        """Initializes a new Room instance.

        Parameters:
            loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. If not provided, the default event loop is used.
        """
        super().__init__()

        self._ffi_handle: Optional[FfiHandle] = None
        self._loop = loop or asyncio.get_event_loop()
        self._room_queue = BroadcastQueue[proto_ffi.FfiEvent]()
        self._info = proto_room.RoomInfo()
        self._rpc_invocation_tasks: set[asyncio.Task] = set()

        self._remote_participants: Dict[str, RemoteParticipant] = {}
        self._connection_state = ConnectionState.CONN_DISCONNECTED
        # Bind the future to the room's own loop; a bare asyncio.Future()
        # would attach to the current default loop, which can differ from
        # an explicitly supplied ``loop``.
        self._first_sid_future: asyncio.Future[str] = self._loop.create_future()
        self._local_participant: LocalParticipant | None = None

    def __del__(self) -> None:
        # _ffi_queue is assigned in connect() before _ffi_handle, so it exists
        # whenever the handle is set.
        if self._ffi_handle is not None:
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    @property
    async def sid(self) -> str:
        """Asynchronously retrieves the session ID (SID) of the room.

        Returns:
            str: The session ID of the room.
        """
        if self._info.sid:
            return self._info.sid

        return await self._first_sid_future

    @property
    def local_participant(self) -> LocalParticipant:
        """Gets the local participant in the room.

        Returns:
            LocalParticipant: The local participant in the room.

        Raises:
            Exception: If accessed before the room is connected.
        """
        if self._local_participant is None:
            raise Exception("cannot access local participant before connecting")

        return self._local_participant

    @property
    def connection_state(self) -> ConnectionState.ValueType:
        """Gets the connection state of the room.

        Returns:
            ConnectionState: The connection state of the room.
        """
        return self._connection_state

    @property
    def remote_participants(self) -> Mapping[str, RemoteParticipant]:
        """Gets the remote participants in the room.

        Returns:
            dict[str, RemoteParticipant]: A dictionary of remote participants indexed by their
            identity.
        """
        return self._remote_participants

    @property
    def name(self) -> str:
        """Gets the name of the room.

        Returns:
            str: The name of the room.
        """
        return self._info.name

    @property
    def metadata(self) -> str:
        """Gets the metadata associated with the room.

        Returns:
            str: The metadata of the room.
        """
        return self._info.metadata

    @property
    def e2ee_manager(self) -> E2EEManager:
        """Gets the end-to-end encryption (E2EE) manager for the room.

        The manager is created in `connect`; the attribute does not exist
        before a successful connection.

        Returns:
            E2EEManager: The E2EE manager instance.
        """
        return self._e2ee_manager

    def isconnected(self) -> bool:
        """Checks if the room is currently connected.

        Returns:
            bool: True if connected, False otherwise.
        """
        return (
            self._ffi_handle is not None
            and self._connection_state != ConnectionState.CONN_DISCONNECTED
        )

    def on(self, event: EventTypes, callback: Optional[Callable] = None) -> Callable:
        """Registers an event handler for a specific event type.

        Parameters:
            event (EventTypes): The name of the event to listen for.
            callback (Callable): The function to call when the event occurs.

        Returns:
            Callable: The registered callback function.

        Available events:
            - **"participant_connected"**: Called when a new participant joins the room.
                - Arguments: `participant` (RemoteParticipant)
            - **"participant_disconnected"**: Called when a participant leaves the room.
                - Arguments: `participant` (RemoteParticipant)
            - **"local_track_published"**: Called when a local track is published.
                - Arguments: `publication` (LocalTrackPublication), `track` (Track)
            - **"local_track_unpublished"**: Called when a local track is unpublished.
                - Arguments: `publication` (LocalTrackPublication)
            - **"local_track_subscribed"**: Called when a local track is subscribed.
                - Arguments: `track` (Track)
            - **"track_published"**: Called when a remote participant publishes a track.
                - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_unpublished"**: Called when a remote participant unpublishes a track.
                - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_subscribed"**: Called when a track is subscribed.
                - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_unsubscribed"**: Called when a track is unsubscribed.
                - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
            - **"track_subscription_failed"**: Called when a track subscription fails.
                - Arguments: `participant` (RemoteParticipant), `track_sid` (str), `error` (str)
            - **"track_muted"**: Called when a track is muted.
                - Arguments: `participant` (Participant), `publication` (TrackPublication)
            - **"track_unmuted"**: Called when a track is unmuted.
                - Arguments: `participant` (Participant), `publication` (TrackPublication)
            - **"active_speakers_changed"**: Called when the list of active speakers changes.
                - Arguments: `speakers` (list[Participant])
            - **"room_metadata_changed"**: Called when the room's metadata is updated.
                - Arguments: `old_metadata` (str), `new_metadata` (str)
            - **"participant_metadata_changed"**: Called when a participant's metadata is updated.
                - Arguments: `participant` (Participant), `old_metadata` (str), `new_metadata` (str)
            - **"participant_name_changed"**: Called when a participant's name is changed.
                - Arguments: `participant` (Participant), `old_name` (str), `new_name` (str)
            - **"participant_attributes_changed"**: Called when a participant's attributes change.
                - Arguments: `changed_attributes` (dict), `participant` (Participant)
            - **"connection_quality_changed"**: Called when a participant's connection quality changes.
                - Arguments: `participant` (Participant), `quality` (ConnectionQuality)
            - **"transcription_received"**: Called when a transcription is received.
                - Arguments: `segments` (list[TranscriptionSegment]), `participant` (Participant), `publication` (TrackPublication)
            - **"data_received"**: Called when data is received.
                - Arguments: `data_packet` (DataPacket)
            - **"sip_dtmf_received"**: Called when a SIP DTMF signal is received.
                - Arguments: `sip_dtmf` (SipDTMF)
            - **"e2ee_state_changed"**: Called when a participant's E2EE state changes.
                - Arguments: `participant` (Participant), `state` (EncryptionState)
            - **"connection_state_changed"**: Called when the room's connection state changes.
                - Arguments: `connection_state` (ConnectionState)
            - **"connected"**: Called when the room is successfully connected.
                - Arguments: None
            - **"disconnected"**: Called when the room is disconnected.
                - Arguments: `reason` (DisconnectReason)
            - **"reconnecting"**: Called when the room is attempting to reconnect.
                - Arguments: None
            - **"reconnected"**: Called when the room has successfully reconnected.
                - Arguments: None

        Example:
            ```python
            def on_participant_connected(participant):
                print(f"Participant connected: {participant.identity}")

            room.on("participant_connected", on_participant_connected)
            ```
        """
        return super().on(event, callback)

    async def connect(
        self, url: str, token: str, options: RoomOptions = RoomOptions()
    ) -> None:
        """Connects to a LiveKit room using the specified URL and token.

        Parameters:
            url (str): The WebSocket URL of the LiveKit server to connect to.
            token (str): The access token for authentication and authorization.
            options (RoomOptions, optional): Additional options for the room connection.

        Raises:
            ConnectError: If the connection fails.

        Example:
            ```python
            room = Room()

            # Listen for events before connecting to the room
            @room.on("participant_connected")
            def on_participant_connected(participant):
                print(f"Participant connected: {participant.identity}")

            await room.connect("ws://localhost:7880", "your_token")
            ```
        """
        req = proto_ffi.FfiRequest()
        req.connect.url = url
        req.connect.token = token

        # options
        req.connect.options.auto_subscribe = options.auto_subscribe
        req.connect.options.dynacast = options.dynacast

        if options.e2ee:
            req.connect.options.e2ee.encryption_type = options.e2ee.encryption_type
            req.connect.options.e2ee.key_provider_options.shared_key = (
                options.e2ee.key_provider_options.shared_key  # type: ignore
            )
            req.connect.options.e2ee.key_provider_options.ratchet_salt = (
                options.e2ee.key_provider_options.ratchet_salt
            )
            req.connect.options.e2ee.key_provider_options.failure_tolerance = (
                options.e2ee.key_provider_options.failure_tolerance
            )
            req.connect.options.e2ee.key_provider_options.ratchet_window_size = (
                options.e2ee.key_provider_options.ratchet_window_size
            )

        if options.rtc_config:
            req.connect.options.rtc_config.ice_transport_type = (
                options.rtc_config.ice_transport_type
            )  # type: ignore
            req.connect.options.rtc_config.continual_gathering_policy = (
                options.rtc_config.continual_gathering_policy
            )  # type: ignore
            req.connect.options.rtc_config.ice_servers.extend(
                options.rtc_config.ice_servers
            )

        # subscribe before connecting so we don't miss any events
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.connect.async_id == resp.connect.async_id
            )
        except BaseException:
            # The request failed or the wait was cancelled: release the
            # long-lived subscription too, otherwise it would leak.
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)
            raise
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.connect.error:
            FfiClient.instance.queue.unsubscribe(self._ffi_queue)
            raise ConnectError(cb.connect.error)

        self._ffi_handle = FfiHandle(cb.connect.result.room.handle.id)

        self._e2ee_manager = E2EEManager(self._ffi_handle.handle, options.e2ee)

        self._info = cb.connect.result.room.info
        self._connection_state = ConnectionState.CONN_CONNECTED

        self._local_participant = LocalParticipant(
            self._room_queue, cb.connect.result.local_participant
        )

        for pt in cb.connect.result.participants:
            rp = self._create_remote_participant(pt.participant)

            # add the initial remote participant tracks
            for owned_publication_info in pt.publications:
                publication = RemoteTrackPublication(owned_publication_info)
                rp._track_publications[publication.sid] = publication

        # start listening to room events
        self._task = self._loop.create_task(self._listen_task())

    async def disconnect(self) -> None:
        """Disconnects from the room."""
        if not self.isconnected():
            return

        await self._drain_rpc_invocation_tasks()

        req = proto_ffi.FfiRequest()
        req.disconnect.room_handle = self._ffi_handle.handle  # type: ignore

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.disconnect.async_id == resp.disconnect.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        # The listen task ends once it sees the "eos" room event.
        await self._task
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def _listen_task(self) -> None:
        """Consume FFI events for this room until an "eos" room event arrives."""
        # listen to incoming room events
        while True:
            event = await self._ffi_queue.get()
            if event.WhichOneof("message") == "rpc_method_invocation":
                self._on_rpc_method_invocation(event.rpc_method_invocation)
            elif event.room_event.room_handle == self._ffi_handle.handle:  # type: ignore
                if event.room_event.HasField("eos"):
                    break

                try:
                    self._on_room_event(event.room_event)
                except Exception:
                    logging.exception(
                        "error running user callback for %s: %s",
                        event.room_event.WhichOneof("message"),
                        event.room_event,
                    )

            # wait for the subscribers to process the event
            # before processing the next one
            self._room_queue.put_nowait(event)
            await self._room_queue.join()

        # Clean up any pending RPC invocation tasks
        await self._drain_rpc_invocation_tasks()

    def _on_rpc_method_invocation(self, rpc_invocation: RpcMethodInvocationEvent):
        """Start a task handling an RPC invocation addressed to the local participant."""
        if self._local_participant is None:
            return

        if (
            rpc_invocation.local_participant_handle
            == self._local_participant._ffi_handle.handle
        ):
            task = self._loop.create_task(
                self._local_participant._handle_rpc_method_invocation(
                    rpc_invocation.invocation_id,
                    rpc_invocation.method,
                    rpc_invocation.request_id,
                    rpc_invocation.caller_identity,
                    rpc_invocation.payload,
                    rpc_invocation.response_timeout_ms / 1000.0,
                )
            )
            # Keep a reference until completion so the task can be drained later.
            self._rpc_invocation_tasks.add(task)
            task.add_done_callback(self._rpc_invocation_tasks.discard)

    def _on_room_event(self, event: proto_room.RoomEvent):
        """Update local state for one room event and emit the matching user event."""
        which = event.WhichOneof("message")
        if which == "participant_connected":
            rparticipant = self._create_remote_participant(
                event.participant_connected.info
            )
            self.emit("participant_connected", rparticipant)
        elif which == "participant_disconnected":
            identity = event.participant_disconnected.participant_identity
            rparticipant = self._remote_participants.pop(identity)
            self.emit("participant_disconnected", rparticipant)
        elif which == "local_track_published":
            sid = event.local_track_published.track_sid
            lpublication = self.local_participant.track_publications[sid]
            track = lpublication.track
            self.emit("local_track_published", lpublication, track)
        elif which == "local_track_unpublished":
            sid = event.local_track_unpublished.publication_sid
            lpublication = self.local_participant.track_publications[sid]
            self.emit("local_track_unpublished", lpublication)
        elif which == "local_track_subscribed":
            sid = event.local_track_subscribed.track_sid
            lpublication = self.local_participant.track_publications[sid]
            lpublication._first_subscription.set_result(None)
            self.emit("local_track_subscribed", lpublication.track)
        elif which == "track_published":
            rparticipant = self._remote_participants[
                event.track_published.participant_identity
            ]
            rpublication = RemoteTrackPublication(event.track_published.publication)
            rparticipant._track_publications[rpublication.sid] = rpublication
            self.emit("track_published", rpublication, rparticipant)
        elif which == "track_unpublished":
            rparticipant = self._remote_participants[
                event.track_unpublished.participant_identity
            ]
            rpublication = rparticipant._track_publications.pop(
                event.track_unpublished.publication_sid
            )
            self.emit("track_unpublished", rpublication, rparticipant)
        elif which == "track_subscribed":
            owned_track_info = event.track_subscribed.track
            track_info = owned_track_info.info
            rparticipant = self._remote_participants[
                event.track_subscribed.participant_identity
            ]
            rpublication = rparticipant.track_publications[track_info.sid]
            rpublication.subscribed = True
            if track_info.kind == TrackKind.KIND_VIDEO:
                remote_video_track = RemoteVideoTrack(owned_track_info)
                rpublication.track = remote_video_track
                self.emit(
                    "track_subscribed", remote_video_track, rpublication, rparticipant
                )
            elif track_info.kind == TrackKind.KIND_AUDIO:
                remote_audio_track = RemoteAudioTrack(owned_track_info)
                rpublication.track = remote_audio_track
                self.emit(
                    "track_subscribed", remote_audio_track, rpublication, rparticipant
                )
        elif which == "track_unsubscribed":
            identity = event.track_unsubscribed.participant_identity
            rparticipant = self._remote_participants[identity]
            rpublication = rparticipant.track_publications[
                event.track_unsubscribed.track_sid
            ]
            track = rpublication.track
            rpublication.track = None
            rpublication.subscribed = False
            self.emit("track_unsubscribed", track, rpublication, rparticipant)
        elif which == "track_subscription_failed":
            identity = event.track_subscription_failed.participant_identity
            rparticipant = self._remote_participants[identity]
            error = event.track_subscription_failed.error
            self.emit(
                "track_subscription_failed",
                rparticipant,
                event.track_subscription_failed.track_sid,
                error,
            )
        elif which == "track_muted":
            identity = event.track_muted.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            publication = participant.track_publications[event.track_muted.track_sid]
            publication._info.muted = True
            if publication.track:
                publication.track._info.muted = True

            self.emit("track_muted", participant, publication)
        elif which == "track_unmuted":
            identity = event.track_unmuted.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            publication = participant.track_publications[event.track_unmuted.track_sid]
            publication._info.muted = False
            if publication.track:
                publication.track._info.muted = False

            self.emit("track_unmuted", participant, publication)
        elif which == "active_speakers_changed":
            speakers: list[Participant] = []
            # TODO: pass participant identity
            for identity in event.active_speakers_changed.participant_identities:
                participant = self._retrieve_participant(identity)
                assert isinstance(participant, Participant)
                speakers.append(participant)

            self.emit("active_speakers_changed", speakers)
        elif which == "room_metadata_changed":
            old_metadata = self.metadata
            self._info.metadata = event.room_metadata_changed.metadata
            self.emit("room_metadata_changed", old_metadata, self.metadata)
        elif which == "room_sid_changed":
            # Only the first SID resolves the future awaited by the `sid` property.
            if not self._info.sid:
                self._first_sid_future.set_result(event.room_sid_changed.sid)
            self._info.sid = event.room_sid_changed.sid
            # This is an internal event, not exposed to users
        elif which == "participant_metadata_changed":
            identity = event.participant_metadata_changed.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            old_metadata = participant.metadata
            participant._info.metadata = event.participant_metadata_changed.metadata
            self.emit(
                "participant_metadata_changed",
                participant,
                old_metadata,
                participant.metadata,
            )
        elif which == "participant_name_changed":
            identity = event.participant_name_changed.participant_identity
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            old_name = participant.name
            participant._info.name = event.participant_name_changed.name
            self.emit(
                "participant_name_changed", participant, old_name, participant.name
            )
        elif which == "participant_attributes_changed":
            identity = event.participant_attributes_changed.participant_identity
            attributes = event.participant_attributes_changed.attributes
            changed_attributes = dict(
                (entry.key, entry.value)
                for entry in event.participant_attributes_changed.changed_attributes
            )
            participant = self._retrieve_participant(identity)
            assert isinstance(participant, Participant)
            # Replace the stored attributes wholesale with the event's full set.
            participant._info.attributes.clear()
            participant._info.attributes.update(
                (entry.key, entry.value) for entry in attributes
            )
            self.emit(
                "participant_attributes_changed",
                changed_attributes,
                participant,
            )
        elif which == "connection_quality_changed":
            identity = event.connection_quality_changed.participant_identity
            # TODO: pass participant identity
            participant = self._retrieve_participant(identity)
            self.emit(
                "connection_quality_changed",
                participant,
                event.connection_quality_changed.quality,
            )
        elif which == "transcription_received":
            transcription = event.transcription_received
            segments = [
                TranscriptionSegment(
                    id=s.id,
                    text=s.text,
                    final=s.final,
                    start_time=s.start_time,
                    end_time=s.end_time,
                    language=s.language,
                )
                for s in transcription.segments
            ]
            part = self._retrieve_participant(transcription.participant_identity)
            pub: TrackPublication | None = None
            if part:
                pub = part.track_publications.get(transcription.track_sid)
            self.emit("transcription_received", segments, part, pub)
        elif which == "data_packet_received":
            packet = event.data_packet_received
            which_val = packet.WhichOneof("value")
            if which_val == "user":
                owned_buffer_info = packet.user.data
                buffer_info = owned_buffer_info.data
                # Copy the native buffer into a Python bytes object.
                native_data = ctypes.cast(
                    buffer_info.data_ptr,
                    ctypes.POINTER(ctypes.c_byte * buffer_info.data_len),
                ).contents

                data = bytes(native_data)
                # NOTE(review): the handle is wrapped and immediately dropped,
                # presumably to release the native buffer now that it has been
                # copied; confirm against FfiHandle.
                FfiHandle(owned_buffer_info.handle.id)
                rparticipant = cast(
                    RemoteParticipant,
                    self._retrieve_remote_participant(packet.participant_identity),
                )
                self.emit(
                    "data_received",
                    DataPacket(
                        data=data,
                        kind=packet.kind,
                        participant=rparticipant,
                        topic=packet.user.topic,
                    ),
                )
            elif which_val == "sip_dtmf":
                rparticipant = cast(
                    RemoteParticipant,
                    self._retrieve_remote_participant(packet.participant_identity),
                )
                self.emit(
                    "sip_dtmf_received",
                    SipDTMF(
                        code=packet.sip_dtmf.code,
                        digit=packet.sip_dtmf.digit,
                        participant=rparticipant,
                    ),
                )
        elif which == "e2ee_state_changed":
            identity = event.e2ee_state_changed.participant_identity
            e2ee_state = event.e2ee_state_changed.state
            # TODO: pass participant identity
            self.emit(
                "e2ee_state_changed", self._retrieve_participant(identity), e2ee_state
            )
        elif which == "connection_state_changed":
            connection_state = event.connection_state_changed.state
            self._connection_state = connection_state
            self.emit("connection_state_changed", connection_state)
        elif which == "connected":
            self.emit("connected")
        elif which == "disconnected":
            self.emit("disconnected", event.disconnected.reason)
        elif which == "reconnecting":
            self.emit("reconnecting")
        elif which == "reconnected":
            self.emit("reconnected")

    async def _drain_rpc_invocation_tasks(self) -> None:
        """Cancel all tracked RPC invocation tasks and wait for them to finish."""
        if self._rpc_invocation_tasks:
            for task in self._rpc_invocation_tasks:
                task.cancel()
            await asyncio.gather(*self._rpc_invocation_tasks, return_exceptions=True)

    def _retrieve_remote_participant(
        self, identity: str
    ) -> Optional[RemoteParticipant]:
        """Retrieve a remote participant by identity"""
        return self._remote_participants.get(identity, None)

    def _retrieve_participant(self, identity: str) -> Optional[Participant]:
        """Retrieve a local or remote participant by identity"""
        if identity and identity == self.local_participant.identity:
            return self.local_participant

        return self._retrieve_remote_participant(identity)

    def _create_remote_participant(
        self, owned_info: proto_participant.OwnedParticipant
    ) -> RemoteParticipant:
        """Create and register a remote participant.

        Raises:
            Exception: If a participant with the same identity is already registered.
        """
        if owned_info.info.identity in self._remote_participants:
            raise Exception("participant already exists")

        participant = RemoteParticipant(owned_info)
        self._remote_participants[participant.identity] = participant
        return participant

    def __repr__(self) -> str:
        # _info.sid is set both from the connect result and on every
        # room_sid_changed event, so it is the most up-to-date source; the
        # first-SID future may never resolve when connect already gave a SID.
        sid = self._info.sid or "unknown"

        return f"rtc.Room(sid={sid}, name={self.name}, metadata={self.metadata}, connection_state={self._connection_state})"
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initializes a new Room instance.
Parameters
loop (Optional[asyncio.AbstractEventLoop]): The event loop to use. If not provided, the default event loop is used.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop connection_state : ConnectionState.ValueType
-
Expand source code
@property
def connection_state(self) -> ConnectionState.ValueType:
    """Gets the connection state of the room.

    Returns:
        ConnectionState: The connection state of the room.
    """
    current_state = self._connection_state
    return current_state
Gets the connection state of the room.
Returns
ConnectionState
- The connection state of the room.
prop e2ee_manager : E2EEManager
-
Expand source code
@property
def e2ee_manager(self) -> E2EEManager:
    """Gets the end-to-end encryption (E2EE) manager for the room.

    Returns:
        E2EEManager: The E2EE manager instance.
    """
    manager = self._e2ee_manager
    return manager
Gets the end-to-end encryption (E2EE) manager for the room.
Returns
E2EEManager
- The E2EE manager instance.
prop local_participant : LocalParticipant
-
Expand source code
@property
def local_participant(self) -> LocalParticipant:
    """Gets the local participant in the room.

    Returns:
        LocalParticipant: The local participant in the room.

    Raises:
        Exception: If no local participant has been set yet.
    """
    participant = self._local_participant
    if participant is not None:
        return participant

    raise Exception("cannot access local participant before connecting")
prop metadata : str
-
Expand source code
@property
def metadata(self) -> str:
    """Gets the metadata associated with the room.

    Returns:
        str: The metadata of the room.
    """
    room_info = self._info
    return room_info.metadata
Gets the metadata associated with the room.
Returns
str
- The metadata of the room.
prop name : str
-
Expand source code
@property
def name(self) -> str:
    """Gets the name of the room.

    Returns:
        str: The name of the room.
    """
    room_info = self._info
    return room_info.name
Gets the name of the room.
Returns
str
- The name of the room.
prop remote_participants : Mapping[str, RemoteParticipant]
-
Expand source code
@property
def remote_participants(self) -> Mapping[str, RemoteParticipant]:
    """Gets the remote participants in the room.

    The internal dictionary itself is returned, not a copy.

    Returns:
        dict[str, RemoteParticipant]: A dictionary of remote participants indexed by their
        identity.
    """
    participants = self._remote_participants
    return participants
Gets the remote participants in the room.
Returns
dict[str, RemoteParticipant]
- A dictionary of remote participants indexed by their
identity.
prop sid : str
-
Expand source code
@property
async def sid(self) -> str:
    """Asynchronously retrieves the session ID (SID) of the room.

    When the room info already carries a SID it is returned directly;
    otherwise the call waits on ``_first_sid_future``.

    Returns:
        str: The session ID of the room.
    """
    known_sid = self._info.sid
    if not known_sid:
        return await self._first_sid_future

    return known_sid
Asynchronously retrieves the session ID (SID) of the room.
Returns
str
- The session ID of the room.
Methods
async def connect(self,
url: str,
token: str,
options: RoomOptions = RoomOptions(auto_subscribe=True, dynacast=False, e2ee=None, rtc_config=None)) ‑> None-
Expand source code
async def connect(
    self, url: str, token: str, options: RoomOptions = RoomOptions()
) -> None:
    """Connects to a LiveKit room using the specified URL and token.

    The room-level event subscription is created before the connect request
    is issued. If the request fails, is cancelled, or reports an error, that
    subscription is released again so nothing is left registered.

    Parameters:
        url (str): The WebSocket URL of the LiveKit server to connect to.
        token (str): The access token for authentication and authorization.
        options (RoomOptions, optional): Additional options for the room connection.

    Raises:
        ConnectError: If the connection fails.

    Example:
        ```python
        room = Room()

        # Listen for events before connecting to the room
        @room.on("participant_connected")
        def on_participant_connected(participant):
            print(f"Participant connected: {participant.identity}")

        await room.connect("ws://localhost:7880", "your_token")
        ```
    """
    req = proto_ffi.FfiRequest()
    req.connect.url = url
    req.connect.token = token

    # options
    req.connect.options.auto_subscribe = options.auto_subscribe
    req.connect.options.dynacast = options.dynacast

    if options.e2ee:
        req.connect.options.e2ee.encryption_type = options.e2ee.encryption_type
        req.connect.options.e2ee.key_provider_options.shared_key = (
            options.e2ee.key_provider_options.shared_key  # type: ignore
        )
        req.connect.options.e2ee.key_provider_options.ratchet_salt = (
            options.e2ee.key_provider_options.ratchet_salt
        )
        req.connect.options.e2ee.key_provider_options.failure_tolerance = (
            options.e2ee.key_provider_options.failure_tolerance
        )
        req.connect.options.e2ee.key_provider_options.ratchet_window_size = (
            options.e2ee.key_provider_options.ratchet_window_size
        )

    if options.rtc_config:
        req.connect.options.rtc_config.ice_transport_type = (
            options.rtc_config.ice_transport_type
        )  # type: ignore
        req.connect.options.rtc_config.continual_gathering_policy = (
            options.rtc_config.continual_gathering_policy
        )  # type: ignore
        req.connect.options.rtc_config.ice_servers.extend(
            options.rtc_config.ice_servers
        )

    # subscribe before connecting so we don't miss any events
    self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)

    queue = FfiClient.instance.queue.subscribe()
    try:
        resp = FfiClient.instance.request(req)
        cb: proto_ffi.FfiEvent = await queue.wait_for(
            lambda e: e.connect.async_id == resp.connect.async_id
        )
    except BaseException:
        # The request raised or the task was cancelled while waiting for the
        # reply: release the room-level subscription too, otherwise it would
        # stay registered with no listener task to ever consume or remove it.
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)
        raise
    finally:
        FfiClient.instance.queue.unsubscribe(queue)

    if cb.connect.error:
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)
        raise ConnectError(cb.connect.error)

    self._ffi_handle = FfiHandle(cb.connect.result.room.handle.id)

    self._e2ee_manager = E2EEManager(self._ffi_handle.handle, options.e2ee)

    self._info = cb.connect.result.room.info
    self._connection_state = ConnectionState.CONN_CONNECTED

    self._local_participant = LocalParticipant(
        self._room_queue, cb.connect.result.local_participant
    )

    for pt in cb.connect.result.participants:
        rp = self._create_remote_participant(pt.participant)

        # add the initial remote participant tracks
        for owned_publication_info in pt.publications:
            publication = RemoteTrackPublication(owned_publication_info)
            rp._track_publications[publication.sid] = publication

    # start listening to room events
    self._task = self._loop.create_task(self._listen_task())
Connects to a LiveKit room using the specified URL and token.
Parameters
url (str): The WebSocket URL of the LiveKit server to connect to.
token (str): The access token for authentication and authorization.
options (RoomOptions, optional): Additional options for the room connection.
Raises
ConnectError
- If the connection fails.
Example
room = Room() # Listen for events before connecting to the room @room.on("participant_connected") def on_participant_connected(participant): print(f"Participant connected: {participant.identity}") await room.connect("ws://localhost:7880", "your_token")
async def disconnect(self) ‑> None
-
Expand source code
async def disconnect(self) -> None:
    """Disconnects from the room.

    Returns immediately when the room is not connected. Otherwise the pending
    RPC invocation tasks are drained, a disconnect request is sent and its
    reply awaited, the listen task is awaited, and finally the room-level
    event subscription is removed.
    """
    if not self.isconnected():
        return

    await self._drain_rpc_invocation_tasks()

    request = proto_ffi.FfiRequest()
    request.disconnect.room_handle = self._ffi_handle.handle  # type: ignore

    reply_queue = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)

        def _is_disconnect_reply(event) -> bool:
            return event.disconnect.async_id == response.disconnect.async_id

        await reply_queue.wait_for(_is_disconnect_reply)
    finally:
        FfiClient.instance.queue.unsubscribe(reply_queue)

    await self._task
    FfiClient.instance.queue.unsubscribe(self._ffi_queue)
Disconnects from the room.
def isconnected(self) ‑> bool
-
Expand source code
def isconnected(self) -> bool:
    """Report whether the room is currently connected.

    Returns:
        bool: True when an FFI handle exists and the connection state is not
        `CONN_DISCONNECTED`; False otherwise.
    """
    if self._ffi_handle is None:
        return False
    return self._connection_state != ConnectionState.CONN_DISCONNECTED
Checks if the room is currently connected.
Returns
bool
- True if connected, False otherwise.
def on(self, event: EventTypes, callback: Optional[Callable] = None) ‑> Callable
-
Expand source code
def on(self, event: EventTypes, callback: Optional[Callable] = None) -> Callable:
    """Registers an event handler for a specific event type.

    This method only forwards to the parent class implementation; it exists
    to document the events a room can emit.

    Parameters:
        event (EventTypes): The name of the event to listen for.
        callback (Callable, optional): The function to call when the event
            occurs. May be omitted when `on` is used as a decorator, e.g.
            `@room.on("participant_connected")`.

    Returns:
        Callable: The registered callback function.

    Available events:
        - **"participant_connected"**: Called when a new participant joins the room.
            - Arguments: `participant` (RemoteParticipant)
        - **"participant_disconnected"**: Called when a participant leaves the room.
            - Arguments: `participant` (RemoteParticipant)
        - **"local_track_published"**: Called when a local track is published.
            - Arguments: `publication` (LocalTrackPublication), `track` (Track)
        - **"local_track_unpublished"**: Called when a local track is unpublished.
            - Arguments: `publication` (LocalTrackPublication)
        - **"local_track_subscribed"**: Called when a local track is subscribed.
            - Arguments: `track` (Track)
        - **"track_published"**: Called when a remote participant publishes a track.
            - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
        - **"track_unpublished"**: Called when a remote participant unpublishes a track.
            - Arguments: `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
        - **"track_subscribed"**: Called when a track is subscribed.
            - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
        - **"track_unsubscribed"**: Called when a track is unsubscribed.
            - Arguments: `track` (Track), `publication` (RemoteTrackPublication), `participant` (RemoteParticipant)
        - **"track_subscription_failed"**: Called when a track subscription fails.
            - Arguments: `participant` (RemoteParticipant), `track_sid` (str), `error` (str)
        - **"track_muted"**: Called when a track is muted.
            - Arguments: `participant` (Participant), `publication` (TrackPublication)
        - **"track_unmuted"**: Called when a track is unmuted.
            - Arguments: `participant` (Participant), `publication` (TrackPublication)
        - **"active_speakers_changed"**: Called when the list of active speakers changes.
            - Arguments: `speakers` (list[Participant])
        - **"room_metadata_changed"**: Called when the room's metadata is updated.
            - Arguments: `old_metadata` (str), `new_metadata` (str)
        - **"participant_metadata_changed"**: Called when a participant's metadata is updated.
            - Arguments: `participant` (Participant), `old_metadata` (str), `new_metadata` (str)
        - **"participant_name_changed"**: Called when a participant's name is changed.
            - Arguments: `participant` (Participant), `old_name` (str), `new_name` (str)
        - **"participant_attributes_changed"**: Called when a participant's attributes change.
            - Arguments: `changed_attributes` (dict), `participant` (Participant)
        - **"connection_quality_changed"**: Called when a participant's connection quality changes.
            - Arguments: `participant` (Participant), `quality` (ConnectionQuality)
        - **"transcription_received"**: Called when a transcription is received.
            - Arguments: `segments` (list[TranscriptionSegment]), `participant` (Participant), `publication` (TrackPublication)
        - **"data_received"**: Called when data is received.
            - Arguments: `data_packet` (DataPacket)
        - **"sip_dtmf_received"**: Called when a SIP DTMF signal is received.
            - Arguments: `sip_dtmf` (SipDTMF)
        - **"e2ee_state_changed"**: Called when a participant's E2EE state changes.
            - Arguments: `participant` (Participant), `state` (EncryptionState)
        - **"connection_state_changed"**: Called when the room's connection state changes.
            - Arguments: `connection_state` (ConnectionState)
        - **"connected"**: Called when the room is successfully connected.
            - Arguments: None
        - **"disconnected"**: Called when the room is disconnected.
            - Arguments: `reason` (DisconnectReason)
        - **"reconnecting"**: Called when the room is attempting to reconnect.
            - Arguments: None
        - **"reconnected"**: Called when the room has successfully reconnected.
            - Arguments: None

    Example:
        ```python
        def on_participant_connected(participant):
            print(f"Participant connected: {participant.identity}")

        room.on("participant_connected", on_participant_connected)
        ```
    """
    return super().on(event, callback)
Registers an event handler for a specific event type.
Parameters
event (EventTypes): The name of the event to listen for.
callback (Callable): The function to call when the event occurs.
Returns
Callable
- The registered callback function.
Available events:
- "participant_connected": Called when a new participant joins the room. Arguments: participant (RemoteParticipant)
- "participant_disconnected": Called when a participant leaves the room. Arguments: participant (RemoteParticipant)
- "local_track_published": Called when a local track is published. Arguments: publication (LocalTrackPublication), track (Track)
- "local_track_unpublished": Called when a local track is unpublished. Arguments: publication (LocalTrackPublication)
- "local_track_subscribed": Called when a local track is subscribed. Arguments: track (Track)
- "track_published": Called when a remote participant publishes a track. Arguments: publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_unpublished": Called when a remote participant unpublishes a track. Arguments: publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_subscribed": Called when a track is subscribed. Arguments: track (Track), publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_unsubscribed": Called when a track is unsubscribed. Arguments: track (Track), publication (RemoteTrackPublication), participant (RemoteParticipant)
- "track_subscription_failed": Called when a track subscription fails. Arguments: participant (RemoteParticipant), track_sid (str), error (str)
- "track_muted": Called when a track is muted. Arguments: participant (Participant), publication (TrackPublication)
- "track_unmuted": Called when a track is unmuted. Arguments: participant (Participant), publication (TrackPublication)
- "active_speakers_changed": Called when the list of active speakers changes. Arguments: speakers (list[Participant])
- "room_metadata_changed": Called when the room's metadata is updated. Arguments: old_metadata (str), new_metadata (str)
- "participant_metadata_changed": Called when a participant's metadata is updated. Arguments: participant (Participant), old_metadata (str), new_metadata (str)
- "participant_name_changed": Called when a participant's name is changed. Arguments: participant (Participant), old_name (str), new_name (str)
- "participant_attributes_changed": Called when a participant's attributes change. Arguments: changed_attributes (dict), participant (Participant)
- "connection_quality_changed": Called when a participant's connection quality changes. Arguments: participant (Participant), quality (ConnectionQuality)
- "transcription_received": Called when a transcription is received. Arguments: segments (list[TranscriptionSegment]), participant (Participant), publication (TrackPublication)
- "data_received": Called when data is received. Arguments: data_packet (DataPacket)
- "sip_dtmf_received": Called when a SIP DTMF signal is received. Arguments: sip_dtmf (SipDTMF)
- "e2ee_state_changed": Called when a participant's E2EE state changes. Arguments: participant (Participant), state (EncryptionState)
- "connection_state_changed": Called when the room's connection state changes. Arguments: connection_state (ConnectionState)
- "connected": Called when the room is successfully connected. Arguments: None
- "disconnected": Called when the room is disconnected. Arguments: reason (DisconnectReason)
- "reconnecting": Called when the room is attempting to reconnect. Arguments: None
- "reconnected": Called when the room has successfully reconnected. Arguments: None
Example
def on_participant_connected(participant): print(f"Participant connected: {participant.identity}") room.on("participant_connected", on_participant_connected)
Inherited members
class RoomOptions (auto_subscribe: bool = True,
dynacast: bool = False,
e2ee: E2EEOptions | None = None,
rtc_config: RtcConfiguration | None = None)-
Expand source code
@dataclass
class RoomOptions:
    """Options passed to `Room.connect` to configure the room connection."""

    auto_subscribe: bool = True
    """Automatically subscribe to tracks when participants join."""
    # NOTE(review): `connect` forwards this flag unchanged into the connect
    # request options; its effect is not described here — confirm and document.
    dynacast: bool = False
    e2ee: E2EEOptions | None = None
    """Options for end-to-end encryption."""
    rtc_config: RtcConfiguration | None = None
    """WebRTC-related configuration."""
RoomOptions(auto_subscribe: 'bool' = True, dynacast: 'bool' = False, e2ee: 'E2EEOptions | None' = None, rtc_config: 'RtcConfiguration | None' = None)
Class variables
var auto_subscribe : bool
-
Automatically subscribe to tracks when participants join.
var dynacast : bool
var e2ee : E2EEOptions | None
-
Options for end-to-end encryption.
var rtc_config : RtcConfiguration | None
-
WebRTC-related configuration.
class RpcError (code: int | ForwardRef('RpcError.ErrorCode'),
message: str,
data: str | None = None)-
Expand source code
class RpcError(Exception):
    """
    Error type for RPC methods.

    When an instance is raised inside a method handler, its `message` is
    serialized and sent across the wire, and the caller receives an
    equivalent error on the other side.

    Built-in errors are included (codes 1001-1999) but developers may use the
    code, message, and data fields to create their own errors.
    """

    class ErrorCode(IntEnum):
        APPLICATION_ERROR = 1500
        CONNECTION_TIMEOUT = 1501
        RESPONSE_TIMEOUT = 1502
        RECIPIENT_DISCONNECTED = 1503
        RESPONSE_PAYLOAD_TOO_LARGE = 1504
        SEND_FAILED = 1505

        UNSUPPORTED_METHOD = 1400
        RECIPIENT_NOT_FOUND = 1401
        REQUEST_PAYLOAD_TOO_LARGE = 1402
        UNSUPPORTED_SERVER = 1403
        UNSUPPORTED_VERSION = 1404

    # Default human-readable message for each built-in error code.
    ErrorMessage: ClassVar[Dict[ErrorCode, str]] = {
        ErrorCode.APPLICATION_ERROR: "Application error in method handler",
        ErrorCode.CONNECTION_TIMEOUT: "Connection timeout",
        ErrorCode.RESPONSE_TIMEOUT: "Response timeout",
        ErrorCode.RECIPIENT_DISCONNECTED: "Recipient disconnected",
        ErrorCode.RESPONSE_PAYLOAD_TOO_LARGE: "Response payload too large",
        ErrorCode.SEND_FAILED: "Failed to send",
        ErrorCode.UNSUPPORTED_METHOD: "Method not supported at destination",
        ErrorCode.RECIPIENT_NOT_FOUND: "Recipient not found",
        ErrorCode.REQUEST_PAYLOAD_TOO_LARGE: "Request payload too large",
        ErrorCode.UNSUPPORTED_SERVER: "RPC not supported by server",
        ErrorCode.UNSUPPORTED_VERSION: "Unsupported RPC version",
    }

    def __init__(
        self,
        code: Union[int, "RpcError.ErrorCode"],
        message: str,
        data: Optional[str] = None,
    ):
        """
        Build an error from a code and message, with an optional data payload.

        If thrown in an RPC method handler, the error will be sent back to the caller.

        Args:
            code (int): Your error code (Error codes 1001-1999 are reserved for built-in errors)
            message (str): A readable error message.
            data (Optional[str]): Optional additional data associated with the error (JSON recommended)
        """
        super().__init__(message)
        self._code, self._message, self._data = code, message, data

    @property
    def code(self) -> int:
        """Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings)."""
        return self._code

    @property
    def message(self) -> str:
        """A readable error message."""
        return self._message

    @property
    def data(self) -> Optional[str]:
        """Optional additional data associated with the error (JSON recommended)."""
        return self._data

    @classmethod
    def _from_proto(cls, proto: proto_rpc.RpcError) -> "RpcError":
        # Rebuild an error from the code/message/data fields of its protobuf form.
        return cls(proto.code, proto.message, proto.data)

    def _to_proto(self) -> proto_rpc.RpcError:
        # Convert this error into its protobuf form.
        return proto_rpc.RpcError(code=self.code, message=self.message, data=self.data)

    @classmethod
    def _built_in(
        cls, code: "RpcError.ErrorCode", data: Optional[str] = None
    ) -> "RpcError":
        # Built-in errors take their message from the ErrorMessage table.
        return cls(code, cls.ErrorMessage[code], data)
Specialized error handling for RPC methods.
Instances of this type, when thrown in a method handler, will have their `message` serialized and sent across the wire. The caller will receive an equivalent error on the other side.
Built-in errors are included (codes 1001-1999) but developers may use the code, message, and data fields to create their own errors.
Creates an error object with the given code and message, plus an optional data payload.
If thrown in an RPC method handler, the error will be sent back to the caller.
Args
code
:int
- Your error code (Error codes 1001-1999 are reserved for built-in errors)
message
:str
- A readable error message.
data
:Optional[str]
- Optional additional data associated with the error (JSON recommended)
Ancestors
- builtins.Exception
- builtins.BaseException
Class variables
var ErrorCode
-
Enum where members are also (and must be) ints
var ErrorMessage : ClassVar[Dict[RpcError.ErrorCode, str]]
Instance variables
prop code : int
-
Expand source code
@property
def code(self) -> int:
    """Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings)."""
    error_code = self._code
    return error_code
Error code value. Codes 1001-1999 are reserved for built-in errors (see RpcError.ErrorCode for their meanings).
prop data : str | None
-
Expand source code
@property
def data(self) -> Optional[str]:
    """Optional additional data associated with the error (JSON recommended)."""
    payload = self._data
    return payload
Optional additional data associated with the error (JSON recommended).
prop message : str
-
Expand source code
@property
def message(self) -> str:
    """A readable error message."""
    text = self._message
    return text
A readable error message.
class RpcInvocationData (request_id: str, caller_identity: str, payload: str, response_timeout: float)
-
Expand source code
@dataclass
class RpcInvocationData:
    """Data passed to method handler for incoming RPC invocations.

    Attributes:
        request_id (str): The unique request ID. Will match at both sides of the call, useful for debugging or logging.
        caller_identity (str): The unique participant identity of the caller.
        payload (str): The payload of the request. User-definable format, typically JSON.
        response_timeout (float): The maximum time the caller will wait for a response.
            NOTE(review): the unit is not stated here — presumably seconds; confirm.
    """

    request_id: str
    caller_identity: str
    payload: str
    response_timeout: float
Data passed to method handler for incoming RPC invocations
Attributes
request_id
:str
- The unique request ID. Will match at both sides of the call, useful for debugging or logging.
caller_identity
:str
- The unique participant identity of the caller.
payload
:str
- The payload of the request. User-definable format, typically JSON.
response_timeout
:float
- The maximum time the caller will wait for a response.
Class variables
var caller_identity : str
var payload : str
var request_id : str
var response_timeout : float
class RtcConfiguration (ice_transport_type: proto_room.IceTransportType.ValueType = 2,
continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = 1,
ice_servers: list[proto_room.IceServer] = <factory>)-
Expand source code
@dataclass
class RtcConfiguration:
    """WebRTC-related configuration, supplied through `RoomOptions.rtc_config`."""

    ice_transport_type: proto_room.IceTransportType.ValueType = (
        proto_room.IceTransportType.TRANSPORT_ALL
    )
    """Specifies the type of ICE transport to be used (e.g., all, relay, etc.)."""
    continual_gathering_policy: proto_room.ContinualGatheringPolicy.ValueType = (
        proto_room.ContinualGatheringPolicy.GATHER_CONTINUALLY
    )
    """Policy for continual gathering of ICE candidates."""
    # default_factory gives every instance its own list rather than a shared one
    ice_servers: list[proto_room.IceServer] = field(default_factory=list)
    """List of ICE servers for STUN/TURN. When empty, it uses the default ICE servers provided by the SFU."""
RtcConfiguration(ice_transport_type: 'proto_room.IceTransportType.ValueType' = 2, continual_gathering_policy: 'proto_room.ContinualGatheringPolicy.ValueType' = 1, ice_servers: 'list[proto_room.IceServer]' = <factory>)
Class variables
var continual_gathering_policy : int
-
Policy for continual gathering of ICE candidates.
var ice_servers : list[room_pb2.IceServer]
-
List of ICE servers for STUN/TURN. When empty, it uses the default ICE servers provided by the SFU.
var ice_transport_type : int
-
Specifies the type of ICE transport to be used (e.g., all, relay, etc.).
class SipDTMF (code: int,
digit: str,
participant: RemoteParticipant | None = None)-
Expand source code
@dataclass
class SipDTMF:
    """A received SIP DTMF signal, as delivered with the "sip_dtmf_received" room event."""

    code: int
    """DTMF code corresponding to the digit."""
    digit: str
    """DTMF digit sent."""
    participant: RemoteParticipant | None = None
    """Participant who sent the DTMF digit. None when sent by a server SDK."""
SipDTMF(code: 'int', digit: 'str', participant: 'RemoteParticipant | None' = None)
Class variables
var code : int
-
DTMF code corresponding to the digit.
var digit : str
-
DTMF digit sent.
var participant : RemoteParticipant | None
-
Participant who sent the DTMF digit. None when sent by a server SDK.
class Track (owned_info: track_pb2.OwnedTrack)
-
Expand source code
class Track:
    """Wrapper around an owned track: its info record plus an FFI handle."""

    def __init__(self, owned_info: proto_track.OwnedTrack):
        """Keep the track info and wrap the owned handle id in an `FfiHandle`."""
        self._info = owned_info.info
        handle_id = owned_info.handle.id
        self._ffi_handle = FfiHandle(handle_id)

    @property
    def sid(self) -> str:
        """The `sid` field of the track info."""
        return self._info.sid

    @property
    def name(self) -> str:
        """The `name` field of the track info."""
        return self._info.name

    @property
    def kind(self) -> proto_track.TrackKind.ValueType:
        """The `kind` field of the track info (a `TrackKind` value)."""
        return self._info.kind

    @property
    def stream_state(self) -> proto_track.StreamState.ValueType:
        """The `stream_state` field of the track info (a `StreamState` value)."""
        return self._info.stream_state

    @property
    def muted(self) -> bool:
        """The `muted` flag of the track info."""
        return self._info.muted

    async def get_stats(self) -> List[proto_stats.RtcStats]:
        """Request the stats of this track and wait for the matching reply.

        Returns:
            List[proto_stats.RtcStats]: The stats entries carried by the reply.

        Raises:
            Exception: If the reply carries an error.
        """
        request = proto_ffi.FfiRequest()
        request.get_stats.track_handle = self._ffi_handle.handle

        reply_queue = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def _is_stats_reply(event) -> bool:
                return event.get_stats.async_id == response.get_stats.async_id

            reply: proto_ffi.FfiEvent = await reply_queue.wait_for(_is_stats_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(reply_queue)

        if reply.get_stats.error:
            raise Exception(reply.get_stats.error)

        return list(reply.get_stats.stats)
Subclasses
Instance variables
prop kind : int
-
Expand source code
@property
def kind(self) -> proto_track.TrackKind.ValueType:
    """The `kind` field of the track info (a `TrackKind` value)."""
    info = self._info
    return info.kind
prop muted : bool
-
Expand source code
@property
def muted(self) -> bool:
    """The `muted` flag of the track info."""
    info = self._info
    return info.muted
prop name : str
-
Expand source code
@property
def name(self) -> str:
    """The `name` field of the track info."""
    info = self._info
    return info.name
prop sid : str
-
Expand source code
@property
def sid(self) -> str:
    """The `sid` field of the track info."""
    info = self._info
    return info.sid
prop stream_state : int
-
Expand source code
@property
def stream_state(self) -> proto_track.StreamState.ValueType:
    """The `stream_state` field of the track info (a `StreamState` value)."""
    info = self._info
    return info.stream_state
Methods
async def get_stats(self) ‑> List[stats_pb2.RtcStats]
-
Expand source code
async def get_stats(self) -> List[proto_stats.RtcStats]:
    """Request the stats of this track and wait for the matching reply.

    A temporary subscription to the FFI event queue is used to wait for the
    event whose async id matches the request; it is always removed again,
    even on failure.

    Returns:
        List[proto_stats.RtcStats]: The stats entries carried by the reply.

    Raises:
        Exception: If the reply carries an error.
    """
    request = proto_ffi.FfiRequest()
    request.get_stats.track_handle = self._ffi_handle.handle

    reply_queue = FfiClient.instance.queue.subscribe()
    try:
        response = FfiClient.instance.request(request)

        def _is_stats_reply(event) -> bool:
            return event.get_stats.async_id == response.get_stats.async_id

        reply: proto_ffi.FfiEvent = await reply_queue.wait_for(_is_stats_reply)
    finally:
        FfiClient.instance.queue.unsubscribe(reply_queue)

    if reply.get_stats.error:
        raise Exception(reply.get_stats.error)

    return list(reply.get_stats.stats)
class TrackPublication (owned_info: track_pb2.OwnedTrackPublication)
-
Expand source code
class TrackPublication:
    """Wrapper around an owned track publication: its info record, an FFI
    handle, and the associated track (initially unset)."""

    def __init__(self, owned_info: proto_track.OwnedTrackPublication):
        """Keep the publication info, start with no track, and wrap the owned handle id."""
        self._info = owned_info.info
        self.track: Optional[Track] = None
        handle_id = owned_info.handle.id
        self._ffi_handle = FfiHandle(handle_id)

    @property
    def sid(self) -> str:
        """The `sid` field of the publication info."""
        return self._info.sid

    @property
    def name(self) -> str:
        """The `name` field of the publication info."""
        return self._info.name

    @property
    def kind(self) -> proto_track.TrackKind.ValueType:
        """The `kind` field of the publication info (a `TrackKind` value)."""
        return self._info.kind

    @property
    def source(self) -> proto_track.TrackSource.ValueType:
        """The `source` field of the publication info (a `TrackSource` value)."""
        return self._info.source

    @property
    def simulcasted(self) -> bool:
        """The `simulcasted` flag of the publication info."""
        return self._info.simulcasted

    @property
    def width(self) -> int:
        """The `width` field of the publication info."""
        return self._info.width

    @property
    def height(self) -> int:
        """The `height` field of the publication info."""
        return self._info.height

    @property
    def mime_type(self) -> str:
        """The `mime_type` field of the publication info."""
        return self._info.mime_type

    @property
    def muted(self) -> bool:
        """The `muted` flag of the publication info."""
        return self._info.muted

    @property
    def encryption_type(self) -> proto_e2ee.EncryptionType.ValueType:
        """The `encryption_type` field of the publication info (an `EncryptionType` value)."""
        return self._info.encryption_type
Subclasses
Instance variables
prop encryption_type : int
-
Expand source code
@property
def encryption_type(self) -> proto_e2ee.EncryptionType.ValueType:
    """The `encryption_type` field of the publication info (an `EncryptionType` value)."""
    info = self._info
    return info.encryption_type
prop height : int
-
Expand source code
@property
def height(self) -> int:
    """The `height` field of the publication info."""
    info = self._info
    return info.height
prop kind : int
-
Expand source code
@property
def kind(self) -> proto_track.TrackKind.ValueType:
    """The `kind` field of the publication info (a `TrackKind` value)."""
    info = self._info
    return info.kind
prop mime_type : str
-
Expand source code
@property
def mime_type(self) -> str:
    """The `mime_type` field of the publication info."""
    info = self._info
    return info.mime_type
prop muted : bool
-
Expand source code
@property
def muted(self) -> bool:
    """The `muted` flag of the publication info."""
    info = self._info
    return info.muted
prop name : str
-
Expand source code
@property
def name(self) -> str:
    """The `name` field of the publication info."""
    info = self._info
    return info.name
prop sid : str
-
Expand source code
@property
def sid(self) -> str:
    """The `sid` field of the publication info."""
    info = self._info
    return info.sid
prop simulcasted : bool
-
Expand source code
@property
def simulcasted(self) -> bool:
    """The `simulcasted` flag of the publication info."""
    info = self._info
    return info.simulcasted
prop source : int
-
Expand source code
@property
def source(self) -> proto_track.TrackSource.ValueType:
    """The `source` field of the publication info (a `TrackSource` value)."""
    info = self._info
    return info.source
prop width : int
-
Expand source code
@property
def width(self) -> int:
    """The `width` field of the publication info."""
    info = self._info
    return info.width
class TrackPublishOptions (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class Transcription (participant_identity: str,
track_sid: str,
segments: List[ForwardRef('TranscriptionSegment')])-
Expand source code
@dataclass
class Transcription:
    """A group of transcription segments tied to a participant identity and a track sid."""

    # NOTE(review): presumably the identity of the participant whose speech
    # was transcribed — confirm against the producer of this object.
    participant_identity: str
    # NOTE(review): presumably the sid of the transcribed track — confirm.
    track_sid: str
    # The individual transcription segments.
    segments: List["TranscriptionSegment"]
Transcription(participant_identity: str, track_sid: str, segments: List[ForwardRef('TranscriptionSegment')])
Class variables
var participant_identity : str
var segments : List[TranscriptionSegment]
var track_sid : str
class TranscriptionSegment (id: str, text: str, start_time: int, end_time: int, language: str, final: bool)
-
Expand source code
@dataclass
class TranscriptionSegment:
    """One segment of a transcription: an identified piece of text with timing and language info."""

    id: str
    text: str
    # NOTE(review): the time unit and reference point of start_time/end_time
    # are not stated here — confirm before relying on them.
    start_time: int
    end_time: int
    language: str
    # NOTE(review): presumably True once the segment text will no longer
    # change — confirm.
    final: bool
TranscriptionSegment(id: str, text: str, start_time: int, end_time: int, language: str, final: bool)
Class variables
var end_time : int
var final : bool
var id : str
var language : str
var start_time : int
var text : str
class VideoEncoding (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class VideoFrame (width: int, height: int, type: int, data: bytes | bytearray | memoryview)
-
Expand source code
class VideoFrame:
    """
    Represents a video frame with associated metadata and pixel data.

    This class provides methods to access video frame properties such as width, height,
    and pixel format, as well as methods for manipulating and converting video frames.
    """

    def __init__(
        self,
        width: int,
        height: int,
        type: proto_video.VideoBufferType.ValueType,
        data: Union[bytes, bytearray, memoryview],
    ) -> None:
        """
        Initializes a new VideoFrame instance.

        Args:
            width (int): The width of the video frame in pixels.
            height (int): The height of the video frame in pixels.
            type (proto_video.VideoBufferType.ValueType): The format type of the video frame data
                (e.g., RGBA, BGRA, RGB24, etc.).
            data (Union[bytes, bytearray, memoryview]): The raw pixel data for the video frame.
        """
        self._width = width
        self._height = height
        self._type = type
        # Copy into a bytearray owned by the frame, independent of the caller's object.
        self._data = bytearray(data)

    @property
    def width(self) -> int:
        """
        Returns the width of the video frame in pixels.

        Returns:
            int: The width of the video frame.
        """
        return self._width

    @property
    def height(self) -> int:
        """
        Returns the height of the video frame in pixels.

        Returns:
            int: The height of the video frame.
        """
        return self._height

    @property
    def type(self) -> proto_video.VideoBufferType.ValueType:
        """
        Returns the buffer format type of the video frame.

        Returns:
            proto_video.VideoBufferType.ValueType: The format type of the video frame data
            (e.g., RGBA, BGRA, RGB24, etc.).
        """
        return self._type

    @property
    def data(self) -> memoryview:
        """
        Returns a memoryview of the raw pixel data for the video frame.

        Returns:
            memoryview: The raw pixel data of the video frame as a memoryview object.
        """
        return memoryview(self._data)

    @staticmethod
    def _from_owned_info(owned_info: proto_video.OwnedVideoBuffer) -> "VideoFrame":
        """Builds a VideoFrame by copying the pixels of a native, FFI-owned buffer."""
        info = owned_info.info
        data_len = _get_plane_length(info.type, info.width, info.height)
        # View the native memory in place, then copy it into Python-owned storage.
        cdata = (ctypes.c_uint8 * data_len).from_address(info.data_ptr)
        data = bytearray(cdata)
        frame = VideoFrame(
            width=info.width,
            height=info.height,
            type=info.type,
            data=data,
        )
        # NOTE(review): the handle is wrapped and immediately dropped, presumably so the
        # native buffer is released once the pixels have been copied — confirm in FfiHandle.
        FfiHandle(owned_info.handle.id)
        return frame

    def _proto_info(self) -> proto_video.VideoBufferInfo:
        """Describes this frame's buffer (address, size, type, stride, planes) as a proto message."""
        info = proto_video.VideoBufferInfo()
        addr = get_address(self.data)
        info.components.extend(
            _get_plane_infos(addr, self.type, self.width, self.height)
        )
        info.width = self.width
        info.height = self.height
        info.type = self.type
        info.data_ptr = addr
        # Stride is only filled in for the packed RGB formats below; otherwise it stays 0.
        info.stride = 0

        if self.type in [
            proto_video.VideoBufferType.ARGB,
            proto_video.VideoBufferType.ABGR,
            proto_video.VideoBufferType.RGBA,
            proto_video.VideoBufferType.BGRA,
        ]:
            info.stride = self.width * 4
        elif self.type == proto_video.VideoBufferType.RGB24:
            info.stride = self.width * 3
        return info

    def get_plane(self, plane_nth: int) -> Optional[memoryview]:
        """
        Returns the memoryview of a specific plane in the video frame, based on its index.

        Some video formats (e.g., I420, NV12) contain multiple planes (Y, U, V channels).
        This method allows access to individual planes by index.

        Args:
            plane_nth (int): The index of the plane to retrieve (starting from 0).

        Returns:
            Optional[memoryview]: A memoryview of the specified plane's data, or None if
            the index is out of bounds for the format (including negative indices).
        """
        # Plane indices start at 0. Without this guard a negative index would wrap
        # to another plane or raise IndexError instead of returning None.
        if plane_nth < 0:
            return None

        plane_infos = _get_plane_infos(
            get_address(self.data), self.type, self.width, self.height
        )
        if plane_nth >= len(plane_infos):
            return None

        plane_info = plane_infos[plane_nth]
        cdata = (ctypes.c_uint8 * plane_info.size).from_address(plane_info.data_ptr)
        return memoryview(cdata)

    def convert(
        self, type: proto_video.VideoBufferType.ValueType, *, flip_y: bool = False
    ) -> "VideoFrame":
        """
        Converts the current video frame to a different format type, optionally flipping
        the frame vertically.

        Args:
            type (proto_video.VideoBufferType.ValueType): The target format type to convert to
                (e.g., RGBA, I420).
            flip_y (bool, optional): If True, the frame will be flipped vertically. Defaults to False.

        Returns:
            VideoFrame: A new VideoFrame object in the specified format.

        Raises:
            Exception: If the conversion isn't supported.

        Example:
            Convert a frame from RGBA to I420 format:

            >>> frame = VideoFrame(width=1920, height=1080, type=proto_video.VideoBufferType.RGBA, data=raw_data)
            >>> converted_frame = frame.convert(proto_video.VideoBufferType.I420)
            >>> print(converted_frame.type)
            VideoBufferType.I420

        Example:
            Convert a frame from BGRA to RGB24 format and flip it vertically:

            >>> frame = VideoFrame(width=1280, height=720, type=proto_video.VideoBufferType.BGRA, data=raw_data)
            >>> converted_frame = frame.convert(proto_video.VideoBufferType.RGB24, flip_y=True)
            >>> print(converted_frame.type)
            VideoBufferType.RGB24
            >>> print(converted_frame.width, converted_frame.height)
            1280 720
        """
        req = proto.FfiRequest()
        req.video_convert.flip_y = flip_y
        req.video_convert.dst_type = type
        req.video_convert.buffer.CopyFrom(self._proto_info())
        resp = FfiClient.instance.request(req)
        if resp.video_convert.error:
            raise Exception(resp.video_convert.error)

        return VideoFrame._from_owned_info(resp.video_convert.buffer)

    def __repr__(self) -> str:
        return f"rtc.VideoFrame(width={self.width}, height={self.height}, type={self.type})"
Represents a video frame with associated metadata and pixel data.
This class provides methods to access video frame properties such as width, height, and pixel format, as well as methods for manipulating and converting video frames.
Initializes a new VideoFrame instance.
Args
width
:int
- The width of the video frame in pixels.
height
:int
- The height of the video frame in pixels.
type
:proto_video.VideoBufferType.ValueType
- The format type of the video frame data (e.g., RGBA, BGRA, RGB24, etc.).
data
:Union[bytes, bytearray, memoryview]
- The raw pixel data for the video frame.
Instance variables
prop data : memoryview
-
Expand source code
@property
def data(self) -> memoryview:
    """
    Exposes the frame's raw pixel buffer as a memoryview.

    Returns:
        memoryview: A view over the frame's internal pixel buffer.
    """
    pixel_buffer = self._data
    return memoryview(pixel_buffer)
Returns a memoryview of the raw pixel data for the video frame.
Returns
memoryview
- The raw pixel data of the video frame as a memoryview object.
prop height : int
-
Expand source code
@property
def height(self) -> int:
    """
    Reports how tall the video frame is, in pixels.

    Returns:
        int: The frame height.
    """
    frame_height = self._height
    return frame_height
Returns the height of the video frame in pixels.
Returns
int
- The height of the video frame.
prop type : int
-
Expand source code
@property
def type(self) -> proto_video.VideoBufferType.ValueType:
    """
    Returns the buffer format type of the video frame.

    Returns:
        proto_video.VideoBufferType.ValueType: The format type of the video frame data
        (e.g., RGBA, BGRA, RGB24, etc.).
    """
    return self._type
Returns the buffer format type of the video frame.
Returns
int
- The format type of the video frame data (a proto_video.VideoBufferType value, e.g., RGBA, BGRA, RGB24).
prop width : int
-
Expand source code
@property
def width(self) -> int:
    """
    Reports how wide the video frame is, in pixels.

    Returns:
        int: The frame width.
    """
    frame_width = self._width
    return frame_width
Returns the width of the video frame in pixels.
Returns
int
- The width of the video frame.
Methods
def convert(self, type: int, *, flip_y: bool = False) ‑> VideoFrame
-
Expand source code
def convert(
    self, type: proto_video.VideoBufferType.ValueType, *, flip_y: bool = False
) -> "VideoFrame":
    """
    Produces a copy of this frame in another buffer format, optionally flipped vertically.

    Args:
        type (proto_video.VideoBufferType.ValueType): The target format type to convert to
            (e.g., RGBA, I420).
        flip_y (bool, optional): If True, the frame will be flipped vertically. Defaults to False.

    Returns:
        VideoFrame: A new VideoFrame object in the specified format.

    Raises:
        Exception: If the conversion isn't supported.

    Example:
        Convert a frame from RGBA to I420 format:

        >>> frame = VideoFrame(width=1920, height=1080, type=proto_video.VideoBufferType.RGBA, data=raw_data)
        >>> converted_frame = frame.convert(proto_video.VideoBufferType.I420)
        >>> print(converted_frame.type)
        VideoBufferType.I420

    Example:
        Convert a frame from BGRA to RGB24 format and flip it vertically:

        >>> frame = VideoFrame(width=1280, height=720, type=proto_video.VideoBufferType.BGRA, data=raw_data)
        >>> converted_frame = frame.convert(proto_video.VideoBufferType.RGB24, flip_y=True)
        >>> print(converted_frame.type)
        VideoBufferType.RGB24
        >>> print(converted_frame.width, converted_frame.height)
        1280 720
    """
    request = proto.FfiRequest()
    # Populate the conversion sub-message through a local alias.
    conversion = request.video_convert
    conversion.flip_y = flip_y
    conversion.dst_type = type
    conversion.buffer.CopyFrom(self._proto_info())

    result = FfiClient.instance.request(request).video_convert
    if result.error:
        raise Exception(result.error)
    return VideoFrame._from_owned_info(result.buffer)
Converts the current video frame to a different format type, optionally flipping the frame vertically.
Args
type
:proto_video.VideoBufferType.ValueType
- The target format type to convert to (e.g., RGBA, I420).
flip_y
:bool, optional
- If True, the frame will be flipped vertically. Defaults to False.
Returns
VideoFrame
- A new VideoFrame object in the specified format.
Raises
Exception
- If the conversion isn't supported.
Example
Convert a frame from RGBA to I420 format:
>>> frame = VideoFrame(width=1920, height=1080, type=proto_video.VideoBufferType.RGBA, data=raw_data) >>> converted_frame = frame.convert(proto_video.VideoBufferType.I420) >>> print(converted_frame.type) VideoBufferType.I420
Example
Convert a frame from BGRA to RGB24 format and flip it vertically:
>>> frame = VideoFrame(width=1280, height=720, type=proto_video.VideoBufferType.BGRA, data=raw_data) >>> converted_frame = frame.convert(proto_video.VideoBufferType.RGB24, flip_y=True) >>> print(converted_frame.type) VideoBufferType.RGB24 >>> print(converted_frame.width, converted_frame.height) 1280 720
def get_plane(self, plane_nth: int) ‑> memoryview | None
-
Expand source code
def get_plane(self, plane_nth: int) -> Optional[memoryview]:
    """
    Returns the memoryview of a specific plane in the video frame, based on its index.

    Some video formats (e.g., I420, NV12) contain multiple planes (Y, U, V channels).
    This method allows access to individual planes by index.

    Args:
        plane_nth (int): The index of the plane to retrieve (starting from 0).

    Returns:
        Optional[memoryview]: A memoryview of the specified plane's data, or None if
        the index is out of bounds for the format (including negative indices).
    """
    # Plane indices start at 0. Without this guard a negative index would wrap
    # to another plane or raise IndexError instead of returning None.
    if plane_nth < 0:
        return None

    plane_infos = _get_plane_infos(
        get_address(self.data), self.type, self.width, self.height
    )
    if plane_nth >= len(plane_infos):
        return None

    plane_info = plane_infos[plane_nth]
    cdata = (ctypes.c_uint8 * plane_info.size).from_address(plane_info.data_ptr)
    return memoryview(cdata)
Returns the memoryview of a specific plane in the video frame, based on its index.
Some video formats (e.g., I420, NV12) contain multiple planes (Y, U, V channels). This method allows access to individual planes by index.
Args
plane_nth
:int
- The index of the plane to retrieve (starting from 0).
Returns
Optional[memoryview]
- A memoryview of the specified plane's data, or None if
the index is out of bounds for the format.
class VideoFrameEvent (frame: VideoFrame,
timestamp_us: int,
rotation: proto_video_frame.VideoRotation)
-
Expand source code
@dataclass
class VideoFrameEvent:
    """A video frame bundled with its timestamp and rotation metadata."""

    # The video frame itself.
    frame: VideoFrame
    # Timestamp of the frame; the `_us` suffix suggests microseconds — TODO confirm.
    timestamp_us: int
    # Rotation associated with the frame, as a VideoRotation value.
    rotation: proto_video_frame.VideoRotation
VideoFrameEvent(frame: 'VideoFrame', timestamp_us: 'int', rotation: 'proto_video_frame.VideoRotation')
Class variables
var frame : VideoFrame
var rotation : proto_video_frame.VideoRotation
var timestamp_us : int
class VideoSource (width: int, height: int)
-
Expand source code
class VideoSource:
    """A native video source created through the FFI client, to which frames can be submitted."""

    def __init__(self, width: int, height: int) -> None:
        """
        Requests a new native video source with the given resolution.

        Args:
            width (int): Width set on the requested source resolution.
            height (int): Height set on the requested source resolution.
        """
        request = proto_ffi.FfiRequest()
        new_source = request.new_video_source
        new_source.type = proto_video.VideoSourceType.VIDEO_SOURCE_NATIVE
        new_source.resolution.width = width
        new_source.resolution.height = height

        response = FfiClient.instance.request(request)
        self._info = response.new_video_source.source
        # Keep a handle wrapper for the source returned by the FFI layer.
        self._ffi_handle = FfiHandle(self._info.handle.id)

    def capture_frame(
        self,
        frame: VideoFrame,
        *,
        timestamp_us: int = 0,
        rotation: proto_video.VideoRotation.ValueType = proto_video.VideoRotation.VIDEO_ROTATION_0,
    ) -> None:
        """
        Submits a frame to this source through the FFI client.

        Args:
            frame (VideoFrame): The frame whose buffer description is sent.
            timestamp_us (int, optional): Timestamp forwarded with the frame. Defaults to 0.
            rotation (proto_video.VideoRotation.ValueType, optional): Rotation forwarded with
                the frame. Defaults to VIDEO_ROTATION_0.
        """
        request = proto_ffi.FfiRequest()
        capture = request.capture_video_frame
        capture.source_handle = self._ffi_handle.handle
        capture.buffer.CopyFrom(frame._proto_info())
        capture.rotation = rotation
        capture.timestamp_us = timestamp_us
        FfiClient.instance.request(request)
Methods
def capture_frame(self,
frame: VideoFrame,
*,
timestamp_us: int = 0,
rotation: int = 0) ‑> None
-
Expand source code
def capture_frame(
    self,
    frame: VideoFrame,
    *,
    timestamp_us: int = 0,
    rotation: proto_video.VideoRotation.ValueType = proto_video.VideoRotation.VIDEO_ROTATION_0,
) -> None:
    """
    Submits a frame to this source through the FFI client.

    Args:
        frame (VideoFrame): The frame whose buffer description is sent.
        timestamp_us (int, optional): Timestamp forwarded with the frame. Defaults to 0.
        rotation (proto_video.VideoRotation.ValueType, optional): Rotation forwarded with
            the frame. Defaults to VIDEO_ROTATION_0.
    """
    request = proto_ffi.FfiRequest()
    capture = request.capture_video_frame
    capture.source_handle = self._ffi_handle.handle
    capture.buffer.CopyFrom(frame._proto_info())
    capture.rotation = rotation
    capture.timestamp_us = timestamp_us
    FfiClient.instance.request(request)
class VideoStream (track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
**kwargs)
-
Expand source code
class VideoStream:
    """VideoStream is a stream of video frames received from a RemoteTrack.

    A background task reads matching events from the FFI event queue and buffers
    the resulting frames in an internal ring queue; consume them with ``async for``.
    """

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        **kwargs,
    ) -> None:
        """
        Creates the native stream and starts the background reader task.

        Args:
            track: Track to read frames from (unused when ``participant`` is given in kwargs).
            loop: Event loop used for the FFI subscription and the reader task.
                Defaults to ``asyncio.get_event_loop()``.
            capacity: Capacity passed to the internal ``RingQueue``.
            format: Buffer type requested from the native stream; when None, no
                format is set on the request.
            **kwargs: When ``participant`` is present, the stream is created from that
                participant and ``track_source`` instead of from ``track``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
        self._track: Track | None = track
        self._format = format
        self._capacity = capacity

        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()

        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        """Creates a stream from a participant and a track source rather than a track."""
        return VideoStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            format=format,
            track=None,  # type: ignore
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        """Creates a stream that reads frames from the given track."""
        return VideoStream(
            track=track,
            loop=loop,
            capacity=capacity,
            format=format,
        )

    def __del__(self) -> None:
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    def _create_owned_stream(self) -> Any:
        """Requests a native video stream for ``self._track`` and returns the owned stream."""
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_video_stream = req.new_video_stream
        new_video_stream.track_handle = self._track._ffi_handle.handle
        new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        if self._format is not None:
            new_video_stream.format = self._format
        new_video_stream.normalize_stride = True
        resp = FfiClient.instance.request(req)
        return resp.new_video_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        """Requests a native video stream for a participant's track source and returns it."""
        req = proto_ffi.FfiRequest()
        video_stream_from_participant = req.video_stream_from_participant
        video_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        video_stream_from_participant.type = (
            proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        )
        video_stream_from_participant.track_source = track_source
        video_stream_from_participant.normalize_stride = True
        if self._format is not None:
            video_stream_from_participant.format = self._format
        resp = FfiClient.instance.request(req)
        return resp.video_stream_from_participant.stream

    async def _run(self) -> None:
        """Reads this stream's FFI events, queueing frames until end-of-stream arrives."""
        while True:
            event = await self._ffi_queue.wait_for(self._is_event)
            video_event = event.video_stream_event

            if video_event.HasField("frame_received"):
                owned_buffer_info = video_event.frame_received.buffer
                frame = VideoFrame._from_owned_info(owned_buffer_info)

                frame_event = VideoFrameEvent(
                    frame=frame,
                    timestamp_us=video_event.frame_received.timestamp_us,
                    rotation=video_event.frame_received.rotation,
                )
                self._queue.put(frame_event)
            elif video_event.HasField("eos"):
                # Enqueue the end-of-stream sentinel so a consumer already blocked in
                # __anext__ wakes up and stops iterating instead of waiting forever.
                self._queue.put(None)
                break

        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        """Disposes the stream's FFI handle, then waits for the reader task to finish."""
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        """Returns True when the FFI event belongs to this stream's handle."""
        return e.video_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
        return self

    async def __anext__(self) -> VideoFrameEvent:
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        # None is the end-of-stream sentinel enqueued by _run.
        if item is None:
            raise StopAsyncIteration

        return item
VideoStream is a stream of video frames received from a RemoteTrack.
Static methods
def from_participant(*,
participant: Participant,
track_source: TrackSource.ValueType,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0) ‑> VideoStream
def from_track(*,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0) ‑> VideoStream
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Disposes the stream's FFI handle, then waits for the reader task to finish."""
    handle = self._ffi_handle
    handle.dispose()
    reader_task = self._task
    await reader_task