Module livekit.rtc.data_track
Classes
class DataTrackFrame (payload: bytes, user_timestamp: Optional[int] = None)-
Expand source code
@dataclass class DataTrackFrame: """A frame published on a data track, consisting of a payload and optional metadata.""" payload: bytes """The frame's payload.""" user_timestamp: Optional[int] = None """The frame's user timestamp, if one is associated."""A frame published on a data track, consisting of a payload and optional metadata.
Instance variables
var payload : bytes-
The frame's payload.
var user_timestamp : int | None-
The frame's user timestamp, if one is associated.
class DataTrackInfo (sid: str, name: str, uses_e2ee: bool)-
Expand source code
@dataclass class DataTrackInfo: """Information about a published data track.""" sid: str """Unique track identifier assigned by the SFU. This identifier may change if a reconnect occurs. Use ``name`` if a stable identifier is needed. """ name: str """Name of the track assigned by the publisher.""" uses_e2ee: bool """Whether or not frames sent on the track use end-to-end encryption."""Information about a published data track.
Instance variables
var name : str-
Name of the track assigned by the publisher.
var sid : str-
Unique track identifier assigned by the SFU.
This identifier may change if a reconnect occurs. Use
nameif a stable identifier is needed. var uses_e2ee : bool-
Whether or not frames sent on the track use end-to-end encryption.
class DataTrackStream (owned_info: proto_data_track.OwnedDataTrackStream)-
Expand source code
class DataTrackStream: """An active subscription to a remote data track. Use as an async iterator to receive frames:: stream = remote_track.subscribe() async for frame in stream: process(frame.payload) Dropping or closing the stream unsubscribes from the track. If subscribing to the track fails, :class:`SubscribeDataTrackError` is raised when iteration ends instead of a normal ``StopAsyncIteration``. """ def __init__(self, owned_info: proto_data_track.OwnedDataTrackStream) -> None: self._ffi_handle = FfiHandle(owned_info.handle.id) handle_id = owned_info.handle.id self._queue = FfiClient.instance.queue.subscribe( filter_fn=lambda e: ( e.WhichOneof("message") == "data_track_stream_event" and e.data_track_stream_event.stream_handle == handle_id ), ) self._closed = False async def read(self) -> Optional[DataTrackFrame]: """Read a single frame, or ``None`` if the stream has ended.""" try: return await self.__anext__() except StopAsyncIteration: return None def __aiter__(self) -> AsyncIterator[DataTrackFrame]: return self async def __anext__(self) -> DataTrackFrame: if self._closed: raise StopAsyncIteration self._send_read_request() event: proto_ffi.FfiEvent = await self._queue.get() stream_event = event.data_track_stream_event detail = stream_event.WhichOneof("detail") if detail == "frame_received": proto_frame = stream_event.frame_received.frame user_ts: Optional[int] = None if proto_frame.HasField("user_timestamp"): user_ts = proto_frame.user_timestamp return DataTrackFrame( payload=proto_frame.payload, user_timestamp=user_ts, ) elif detail == "eos": self._close() if stream_event.eos.HasField("error"): raise SubscribeDataTrackError(stream_event.eos.error) raise StopAsyncIteration else: self._close() raise StopAsyncIteration def _send_read_request(self) -> None: req = proto_ffi.FfiRequest() req.data_track_stream_read.stream_handle = self._ffi_handle.handle FfiClient.instance.request(req) def _close(self) -> None: if not self._closed: self._closed = True FfiClient.instance.queue.unsubscribe(self._queue) def close(self) -> None: """Explicitly close the subscription and unsubscribe.""" self._close() self._ffi_handle.dispose() async def aclose(self) -> None: self.close()An active subscription to a remote data track.
Use as an async iterator to receive frames::
stream = remote_track.subscribe() async for frame in stream: process(frame.payload)Dropping or closing the stream unsubscribes from the track.
If subscribing to the track fails, :class:
SubscribeDataTrackErroris raised when iteration ends instead of a normalStopAsyncIteration.Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: self.close() def close(self) ‑> None-
Expand source code
def close(self) -> None: """Explicitly close the subscription and unsubscribe.""" self._close() self._ffi_handle.dispose()Explicitly close the subscription and unsubscribe.
async def read(self) ‑> DataTrackFrame | None-
Expand source code
async def read(self) -> Optional[DataTrackFrame]: """Read a single frame, or ``None`` if the stream has ended.""" try: return await self.__anext__() except StopAsyncIteration: return NoneRead a single frame, or
Noneif the stream has ended.
class LocalDataTrack (owned_info: proto_data_track.OwnedLocalDataTrack)-
Expand source code
class LocalDataTrack: """Data track published by the local participant.""" def __init__(self, owned_info: proto_data_track.OwnedLocalDataTrack) -> None: self._info = DataTrackInfo( sid=owned_info.info.sid, name=owned_info.info.name, uses_e2ee=owned_info.info.uses_e2ee, ) self._ffi_handle = FfiHandle(owned_info.handle.id) @property def info(self) -> DataTrackInfo: """Information about the data track.""" return self._info def try_push(self, frame: DataTrackFrame) -> None: """Try pushing a frame to subscribers of the track. See :class:`DataTrackFrame` for how to construct a frame and attach metadata. Args: frame: The data track frame to send. Raises: PushFrameError: If the push fails. """ proto_frame = proto_data_track.DataTrackFrame(payload=bytes(frame.payload)) if frame.user_timestamp is not None: proto_frame.user_timestamp = frame.user_timestamp req = proto_ffi.FfiRequest() req.local_data_track_try_push.track_handle = self._ffi_handle.handle req.local_data_track_try_push.frame.CopyFrom(proto_frame) resp = FfiClient.instance.request(req) if resp.local_data_track_try_push.HasField("error"): raise PushFrameError(resp.local_data_track_try_push.error.message) def is_published(self) -> bool: """Whether or not the track is still published.""" req = proto_ffi.FfiRequest() req.local_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) return resp.local_data_track_is_published.is_published async def unpublish(self) -> None: """Unpublishes the track.""" req = proto_ffi.FfiRequest() req.local_data_track_unpublish.track_handle = self._ffi_handle.handle FfiClient.instance.request(req) def __repr__(self) -> str: return f"rtc.LocalDataTrack(sid={self._info.sid}, name={self._info.name})"Data track published by the local participant.
Instance variables
prop info : DataTrackInfo-
Expand source code
@property def info(self) -> DataTrackInfo: """Information about the data track.""" return self._infoInformation about the data track.
Methods
def is_published(self) ‑> bool-
Expand source code
def is_published(self) -> bool: """Whether or not the track is still published.""" req = proto_ffi.FfiRequest() req.local_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) return resp.local_data_track_is_published.is_publishedWhether or not the track is still published.
def try_push(self,
frame: DataTrackFrame) ‑> None-
Expand source code
def try_push(self, frame: DataTrackFrame) -> None: """Try pushing a frame to subscribers of the track. See :class:`DataTrackFrame` for how to construct a frame and attach metadata. Args: frame: The data track frame to send. Raises: PushFrameError: If the push fails. """ proto_frame = proto_data_track.DataTrackFrame(payload=bytes(frame.payload)) if frame.user_timestamp is not None: proto_frame.user_timestamp = frame.user_timestamp req = proto_ffi.FfiRequest() req.local_data_track_try_push.track_handle = self._ffi_handle.handle req.local_data_track_try_push.frame.CopyFrom(proto_frame) resp = FfiClient.instance.request(req) if resp.local_data_track_try_push.HasField("error"): raise PushFrameError(resp.local_data_track_try_push.error.message)Try pushing a frame to subscribers of the track.
See :class:
DataTrackFramefor how to construct a frame and attach metadata.Args
frame- The data track frame to send.
Raises
PushFrameError- If the push fails.
async def unpublish(self) ‑> None-
Expand source code
async def unpublish(self) -> None: """Unpublishes the track.""" req = proto_ffi.FfiRequest() req.local_data_track_unpublish.track_handle = self._ffi_handle.handle FfiClient.instance.request(req)Unpublishes the track.
class PushFrameError (message: str)-
Expand source code
class PushFrameError(Exception): """Frame could not be pushed to a data track. Pushing a frame can fail for several reasons: - The track has been unpublished by the local participant or SFU - The room is no longer connected - Frames are being pushed too fast """ def __init__(self, message: str) -> None: self.message = messageFrame could not be pushed to a data track.
Pushing a frame can fail for several reasons:
- The track has been unpublished by the local participant or SFU
- The room is no longer connected
- Frames are being pushed too fast
Ancestors
- builtins.Exception
- builtins.BaseException
class RemoteDataTrack (owned_info: proto_data_track.OwnedRemoteDataTrack)-
Expand source code
class RemoteDataTrack: """Data track published by a remote participant.""" def __init__(self, owned_info: proto_data_track.OwnedRemoteDataTrack) -> None: self._info = DataTrackInfo( sid=owned_info.info.sid, name=owned_info.info.name, uses_e2ee=owned_info.info.uses_e2ee, ) self._ffi_handle = FfiHandle(owned_info.handle.id) self._publisher_identity = owned_info.publisher_identity @property def info(self) -> DataTrackInfo: """Information about the data track.""" return self._info @property def publisher_identity(self) -> str: """Identity of the participant who published the track.""" return self._publisher_identity def subscribe(self, *, buffer_size: Optional[int] = None) -> DataTrackStream: """Subscribes to the data track to receive frames. Args: buffer_size: Maximum number of received frames to buffer internally. When ``None``, the default buffer size is used. Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one. Returns a :class:`DataTrackStream` that yields :class:`DataTrackFrame` instances as they arrive. If the subscription encounters an error, it is raised as :class:`SubscribeDataTrackError` when iteration ends. """ opts = proto_data_track.DataTrackSubscribeOptions() if buffer_size is not None: opts.buffer_size = buffer_size req = proto_ffi.FfiRequest() req.subscribe_data_track.track_handle = self._ffi_handle.handle req.subscribe_data_track.options.CopyFrom(opts) resp = FfiClient.instance.request(req) return DataTrackStream(resp.subscribe_data_track.stream) def is_published(self) -> bool: """Whether or not the track is still published.""" req = proto_ffi.FfiRequest() req.remote_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) return resp.remote_data_track_is_published.is_published def __repr__(self) -> str: return ( f"rtc.RemoteDataTrack(sid={self._info.sid}, name={self._info.name}, " f"publisher_identity={self._publisher_identity})" )Data track published by a remote participant.
Instance variables
prop info : DataTrackInfo-
Expand source code
@property def info(self) -> DataTrackInfo: """Information about the data track.""" return self._infoInformation about the data track.
prop publisher_identity : str-
Expand source code
@property def publisher_identity(self) -> str: """Identity of the participant who published the track.""" return self._publisher_identityIdentity of the participant who published the track.
Methods
def is_published(self) ‑> bool-
Expand source code
def is_published(self) -> bool: """Whether or not the track is still published.""" req = proto_ffi.FfiRequest() req.remote_data_track_is_published.track_handle = self._ffi_handle.handle resp = FfiClient.instance.request(req) return resp.remote_data_track_is_published.is_publishedWhether or not the track is still published.
def subscribe(self, *, buffer_size: Optional[int] = None) ‑> DataTrackStream-
Expand source code
def subscribe(self, *, buffer_size: Optional[int] = None) -> DataTrackStream: """Subscribes to the data track to receive frames. Args: buffer_size: Maximum number of received frames to buffer internally. When ``None``, the default buffer size is used. Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one. Returns a :class:`DataTrackStream` that yields :class:`DataTrackFrame` instances as they arrive. If the subscription encounters an error, it is raised as :class:`SubscribeDataTrackError` when iteration ends. """ opts = proto_data_track.DataTrackSubscribeOptions() if buffer_size is not None: opts.buffer_size = buffer_size req = proto_ffi.FfiRequest() req.subscribe_data_track.track_handle = self._ffi_handle.handle req.subscribe_data_track.options.CopyFrom(opts) resp = FfiClient.instance.request(req) return DataTrackStream(resp.subscribe_data_track.stream)Subscribes to the data track to receive frames.
Args
buffer_size- Maximum number of received frames to buffer internally.
When
None, the default buffer size is used. Zero is not a valid buffer size; if a value of zero is provided, it will be clamped to one.
Returns a :class:
DataTrackStreamthat yields :class:DataTrackFrameinstances as they arrive. If the subscription encounters an error, it is raised as :class:SubscribeDataTrackErrorwhen iteration ends.
class SubscribeDataTrackError (message: str)-
Expand source code
class SubscribeDataTrackError(Exception): """An error that can occur when subscribing to a data track.""" def __init__(self, message: str) -> None: self.message = messageAn error that can occur when subscribing to a data track.
Ancestors
- builtins.Exception
- builtins.BaseException