Module livekit.rtc.participant
Classes
class LocalParticipant (room_queue: BroadcastQueue[proto_ffi.FfiEvent], owned_info: proto_participant.OwnedParticipant)
-
Represents the local participant in a room.
Expand source code
class LocalParticipant(Participant):
    """Local participant of a room.

    Each publishing or updating method builds an ``FfiRequest``, sends it
    through ``FfiClient.instance`` and waits for the event whose ``async_id``
    matches the one returned by the request before it returns.
    """

    def __init__(
        self,
        room_queue: BroadcastQueue[proto_ffi.FfiEvent],
        owned_info: proto_participant.OwnedParticipant,
    ) -> None:
        """
        Args:
            room_queue: Event queue that ``publish_track`` and
                ``unpublish_track`` subscribe to while awaiting their callback.
            owned_info: Participant info and handle, forwarded to
                ``Participant.__init__``.
        """
        super().__init__(owned_info)
        self._room_queue = room_queue
        # Filled by publish_track and emptied by unpublish_track.
        self._track_publications: dict[str, LocalTrackPublication] = {}  # type: ignore

    @property
    def track_publications(self) -> Mapping[str, LocalTrackPublication]:
        """
        Track publications of this participant, keyed by publication SID.
        """
        return self._track_publications

    async def publish_data(
        self,
        payload: Union[bytes, str],
        *,
        reliable: bool = True,
        destination_identities: List[str] = [],
        topic: str = "",
    ) -> None:
        """
        Publish arbitrary data to the room.

        Args:
            payload (Union[bytes, str]): The data to publish. A ``str`` is
                encoded as UTF-8 before being sent.
            reliable (bool, optional): Whether to send reliably or not.
                Defaults to True.
            destination_identities (List[str], optional): List of participant
                identities to send to. Defaults to [].
            topic (str, optional): The topic under which to publish the data.
                Defaults to "".

        Raises:
            PublishDataError: If there is an error in publishing data.
        """
        raw = payload.encode("utf-8") if isinstance(payload, str) else payload
        size = len(raw)
        # Only the address of this buffer is written into the request, so the
        # buffer stays referenced by this local until the method returns.
        buffer = (ctypes.c_byte * size)(*raw)

        request = proto_ffi.FfiRequest()
        body = request.publish_data
        body.local_participant_handle = self._ffi_handle.handle
        body.data_ptr = ctypes.addressof(buffer)
        body.data_len = size
        body.reliable = reliable
        body.topic = topic
        body.destination_identities.extend(destination_identities)

        # Subscribe before sending the request so the callback cannot be missed.
        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.publish_data.async_id

            def is_reply(event):
                return event.publish_data.async_id == expected_id

            reply = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        error = reply.publish_data.error
        if error:
            raise PublishDataError(error)

    async def publish_dtmf(self, *, code: int, digit: str) -> None:
        """
        Publish SIP DTMF message.

        Args:
            code (int): DTMF code.
            digit (str): DTMF digit.

        Raises:
            PublishDTMFError: If there is an error in publishing SIP DTMF message.
        """
        request = proto_ffi.FfiRequest()
        body = request.publish_sip_dtmf
        body.local_participant_handle = self._ffi_handle.handle
        body.code = code
        body.digit = digit

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.publish_sip_dtmf.async_id

            def is_reply(event):
                return event.publish_sip_dtmf.async_id == expected_id

            reply = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        error = reply.publish_sip_dtmf.error
        if error:
            raise PublishDTMFError(error)

    async def publish_transcription(self, transcription: Transcription) -> None:
        """
        Publish transcription data to the room.

        Args:
            transcription (Transcription): The transcription data to publish.

        Raises:
            PublishTranscriptionError: If there is an error in publishing
                transcription.
        """
        request = proto_ffi.FfiRequest()

        # Convert every segment into its protobuf counterpart.
        segments = []
        for segment in transcription.segments:
            segments.append(
                ProtoTranscriptionSegment(
                    id=segment.id,
                    text=segment.text,
                    start_time=segment.start_time,
                    end_time=segment.end_time,
                    final=segment.final,
                    language=segment.language,
                )
            )

        body = request.publish_transcription
        body.local_participant_handle = self._ffi_handle.handle
        body.participant_identity = transcription.participant_identity
        body.segments.extend(segments)
        body.track_id = transcription.track_sid

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.publish_transcription.async_id

            def is_reply(event):
                return event.publish_transcription.async_id == expected_id

            reply = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        error = reply.publish_transcription.error
        if error:
            raise PublishTranscriptionError(error)

    async def set_metadata(self, metadata: str) -> None:
        """
        Set the metadata for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        The matching callback is awaited but its content is not inspected.

        Args:
            metadata (str): The new metadata.
        """
        request = proto_ffi.FfiRequest()
        body = request.set_local_metadata
        body.local_participant_handle = self._ffi_handle.handle
        body.metadata = metadata

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.set_local_metadata.async_id

            def is_reply(event):
                return event.set_local_metadata.async_id == expected_id

            await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

    async def set_name(self, name: str) -> None:
        """
        Set the name for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        The matching callback is awaited but its content is not inspected.

        Args:
            name (str): The new name.
        """
        request = proto_ffi.FfiRequest()
        body = request.set_local_name
        body.local_participant_handle = self._ffi_handle.handle
        body.name = name

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.set_local_name.async_id

            def is_reply(event):
                return event.set_local_name.async_id == expected_id

            await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

    async def set_attributes(self, attributes: dict[str, str]) -> None:
        """
        Set custom attributes for the local participant.

        Note: this requires `canUpdateOwnMetadata` permission.

        The matching callback is awaited but its content is not inspected.

        Args:
            attributes (dict[str, str]): A dictionary of attributes to set.
        """
        request = proto_ffi.FfiRequest()
        body = request.set_local_attributes
        body.local_participant_handle = self._ffi_handle.handle
        body.attributes.update(attributes)

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.set_local_attributes.async_id

            def is_reply(event):
                return event.set_local_attributes.async_id == expected_id

            await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

    async def publish_track(
        self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
    ) -> LocalTrackPublication:
        """
        Publish a local track to the room.

        On success the new publication is linked to ``track``, the track's
        SID is set to the publication's SID, and the publication is stored in
        ``track_publications``.

        Args:
            track (LocalTrack): The track to publish.
            options (TrackPublishOptions, optional): Options for publishing
                the track. They are copied into the request.

        Returns:
            LocalTrackPublication: The publication of the published track.

        Raises:
            PublishTrackError: If there is an error in publishing the track.
        """
        request = proto_ffi.FfiRequest()
        body = request.publish_track
        body.track_handle = track._ffi_handle.handle
        body.local_participant_handle = self._ffi_handle.handle
        body.options.CopyFrom(options)

        # This callback is awaited on the room queue, not the global FFI queue.
        subscription = self._room_queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.publish_track.async_id

            def is_reply(event):
                return event.publish_track.async_id == expected_id

            reply = await subscription.wait_for(is_reply)
            outcome = reply.publish_track
            if outcome.error:
                raise PublishTrackError(outcome.error)

            publication = LocalTrackPublication(outcome.publication)
            publication.track = track
            track._info.sid = publication.sid
            self._track_publications[publication.sid] = publication

            subscription.task_done()
            return publication
        finally:
            self._room_queue.unsubscribe(subscription)

    async def unpublish_track(self, track_sid: str) -> None:
        """
        Unpublish a track from the room.

        On success the publication is removed from ``track_publications`` and
        its ``track`` reference is cleared.

        Args:
            track_sid (str): The SID of the track to unpublish.

        Raises:
            UnpublishTrackError: If there is an error in unpublishing the track.
        """
        request = proto_ffi.FfiRequest()
        body = request.unpublish_track
        body.local_participant_handle = self._ffi_handle.handle
        body.track_sid = track_sid

        subscription = self._room_queue.subscribe()
        try:
            response = FfiClient.instance.request(request)
            expected_id = response.unpublish_track.async_id

            def is_reply(event):
                return event.unpublish_track.async_id == expected_id

            reply = await subscription.wait_for(is_reply)
            outcome = reply.unpublish_track
            if outcome.error:
                raise UnpublishTrackError(outcome.error)

            removed = self._track_publications.pop(track_sid)
            removed.track = None
            subscription.task_done()
        finally:
            self._room_queue.unsubscribe(subscription)

    def __repr__(self) -> str:
        """Return a short description with the SID, identity and name."""
        return (
            f"rtc.LocalParticipant(sid={self.sid}, "
            f"identity={self.identity}, "
            f"name={self.name})"
        )
Ancestors
- Participant
- abc.ABC
Methods
async def publish_data(self, payload: Union[bytes, str], *, reliable: bool = True, destination_identities: List[str] = [], topic: str = '') ‑> None
-
Publish arbitrary data to the room.
Args
payload
:Union[bytes, str]
- The data to publish.
reliable
:bool
, optional
- Whether to send reliably or not. Defaults to True.
destination_identities
:List[str]
, optional
- List of participant identities to send to. Defaults to [].
topic
:str
, optional
- The topic under which to publish the data. Defaults to "".
Raises
PublishDataError
- If there is an error in publishing data.
async def publish_dtmf(self, *, code: int, digit: str) ‑> None
-
Publish SIP DTMF message.
Args
code
:int
- DTMF code.
digit
:str
- DTMF digit.
Raises
PublishDTMFError
- If there is an error in publishing SIP DTMF message.
async def publish_track(self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()) ‑> LocalTrackPublication
-
Publish a local track to the room.
Args
track
:LocalTrack
- The track to publish.
options
:TrackPublishOptions
, optional
- Options for publishing the track.
Returns
LocalTrackPublication
- The publication of the published track.
Raises
PublishTrackError
- If there is an error in publishing the track.
async def publish_transcription(self, transcription: Transcription) ‑> None
-
Publish transcription data to the room.
Args
transcription
:Transcription
- The transcription data to publish.
Raises
PublishTranscriptionError
- If there is an error in publishing transcription.
async def set_attributes(self, attributes: dict[str, str]) ‑> None
-
Set custom attributes for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
attributes
:dict[str, str]
- A dictionary of attributes to set.
async def set_metadata(self, metadata: str) ‑> None
-
Set the metadata for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
metadata
:str
- The new metadata.
async def set_name(self, name: str) ‑> None
-
Set the name for the local participant.
Note: this requires
canUpdateOwnMetadata
permission.
Args
name
:str
- The new name.
async def unpublish_track(self, track_sid: str) ‑> None
-
Unpublish a track from the room.
Args
track_sid
:str
- The SID of the track to unpublish.
Raises
UnpublishTrackError
- If there is an error in unpublishing the track.
Inherited members
class Participant (owned_info: proto_participant.OwnedParticipant)
-
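Abstract base class for room participants. It exposes the participant's sid, name, identity, metadata, attributes and kind; concrete subclasses must implement track_publications.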
Expand source code
class Participant(ABC):
    """Abstract base for room participants.

    Stores the participant info record and an FFI handle, and exposes the
    info fields as read-only properties. Subclasses must provide
    ``track_publications``.
    """

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        """
        Args:
            owned_info: Supplies the participant info (``owned_info.info``)
                and the id used to build the FFI handle
                (``owned_info.handle.id``).
        """
        self._info = owned_info.info
        handle_id = owned_info.handle.id
        self._ffi_handle = FfiHandle(handle_id)

    @property
    @abstractmethod
    def track_publications(self) -> Mapping[str, TrackPublication]:
        """
        A dictionary of track publications associated with the participant.

        Abstract: every concrete participant class has to implement it.
        """
        ...

    @property
    def sid(self) -> str:
        """The ``sid`` field of the participant info."""
        info = self._info
        return info.sid

    @property
    def name(self) -> str:
        """The ``name`` field of the participant info."""
        info = self._info
        return info.name

    @property
    def identity(self) -> str:
        """The ``identity`` field of the participant info."""
        info = self._info
        return info.identity

    @property
    def metadata(self) -> str:
        """The ``metadata`` field of the participant info."""
        info = self._info
        return info.metadata

    @property
    def attributes(self) -> dict[str, str]:
        """Custom attributes associated with the participant.

        A new ``dict`` is built on every access.
        """
        stored = self._info.attributes
        return dict(stored)

    @property
    def kind(self) -> proto_participant.ParticipantKind.ValueType:
        """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
        info = self._info
        return info.kind
Ancestors
- abc.ABC
Subclasses
Instance variables
prop attributes : dict[str, str]
-
Custom attributes associated with the participant.
Expand source code
@property
def attributes(self) -> dict[str, str]:
    """Custom attributes associated with the participant.

    A new dict is built from the participant info on every access, so
    mutating the returned dict does not change the stored attributes.
    """
    return dict(self._info.attributes)
prop identity : str
-
Expand source code
@property
def identity(self) -> str:
    """The ``identity`` field of the participant info."""
    return self._info.identity
prop kind : proto_participant.ParticipantKind.ValueType
-
Participant's kind (e.g., regular participant, ingress, egress, sip, agent).
Expand source code
@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
    """Participant's kind (e.g., regular participant, ingress, egress, sip, agent).

    The value is returned unchanged from the ``kind`` field of the
    participant info.
    """
    return self._info.kind
prop metadata : str
-
Expand source code
@property
def metadata(self) -> str:
    """The ``metadata`` field of the participant info."""
    return self._info.metadata
prop name : str
-
Expand source code
@property
def name(self) -> str:
    """The ``name`` field of the participant info."""
    return self._info.name
prop sid : str
-
Expand source code
@property
def sid(self) -> str:
    """The ``sid`` field of the participant info."""
    return self._info.sid
prop track_publications : Mapping[str, TrackPublication]
-
A dictionary of track publications associated with the participant.
Expand source code
@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
    """
    A dictionary of track publications associated with the participant.

    This property is abstract: concrete participant classes must override it.
    """
    ...
class PublishDTMFError (message: str)
-
Raised by LocalParticipant.publish_dtmf when publishing a SIP DTMF message fails.
Expand source code
class PublishDTMFError(Exception):
    """Raised by ``LocalParticipant.publish_dtmf`` when the publish callback reports an error."""

    def __init__(self, message: str) -> None:
        """
        Args:
            message: Error text, stored on ``self.message``.
        """
        # Forward the message to Exception so that ``args`` and ``str(exc)``
        # carry it even when the caller passes ``message=`` by keyword.
        super().__init__(message)
        self.message = message
Ancestors
- builtins.Exception
- builtins.BaseException
class PublishDataError (message: str)
-
Raised by LocalParticipant.publish_data when publishing data fails.
Expand source code
class PublishDataError(Exception):
    """Raised by ``LocalParticipant.publish_data`` when the publish callback reports an error."""

    def __init__(self, message: str) -> None:
        """
        Args:
            message: Error text, stored on ``self.message``.
        """
        # Forward the message to Exception so that ``args`` and ``str(exc)``
        # carry it even when the caller passes ``message=`` by keyword.
        super().__init__(message)
        self.message = message
Ancestors
- builtins.Exception
- builtins.BaseException
class PublishTrackError (message: str)
-
Raised by LocalParticipant.publish_track when publishing a track fails.
Expand source code
class PublishTrackError(Exception):
    """Raised by ``LocalParticipant.publish_track`` when the publish callback reports an error."""

    def __init__(self, message: str) -> None:
        """
        Args:
            message: Error text, stored on ``self.message``.
        """
        # Forward the message to Exception so that ``args`` and ``str(exc)``
        # carry it even when the caller passes ``message=`` by keyword.
        super().__init__(message)
        self.message = message
Ancestors
- builtins.Exception
- builtins.BaseException
class PublishTranscriptionError (message: str)
-
Raised by LocalParticipant.publish_transcription when publishing a transcription fails.
Expand source code
class PublishTranscriptionError(Exception):
    """Raised by ``LocalParticipant.publish_transcription`` when the publish callback reports an error."""

    def __init__(self, message: str) -> None:
        """
        Args:
            message: Error text, stored on ``self.message``.
        """
        # Forward the message to Exception so that ``args`` and ``str(exc)``
        # carry it even when the caller passes ``message=`` by keyword.
        super().__init__(message)
        self.message = message
Ancestors
- builtins.Exception
- builtins.BaseException
class RemoteParticipant (owned_info: proto_participant.OwnedParticipant)
-
Represents a remote participant in a room; its track publications are RemoteTrackPublication objects.
Expand source code
class RemoteParticipant(Participant):
    """Participant whose track publications are ``RemoteTrackPublication`` objects.

    The publication mapping starts out empty; nothing in this class adds
    entries to it.
    """

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        """
        Args:
            owned_info: Participant info and handle, forwarded to
                ``Participant.__init__``.
        """
        super().__init__(owned_info)
        self._track_publications: dict[str, RemoteTrackPublication] = {}  # type: ignore

    @property
    def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
        """
        The track publications currently stored for this participant.
        """
        return self._track_publications

    def __repr__(self) -> str:
        """Return a short description with the SID, identity and name."""
        return (
            f"rtc.RemoteParticipant(sid={self.sid}, "
            f"identity={self.identity}, "
            f"name={self.name})"
        )
Ancestors
- Participant
- abc.ABC
Inherited members
class TrackPublishOptions (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class ProtoTranscriptionSegment (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class UnpublishTrackError (message: str)
-
Raised by LocalParticipant.unpublish_track when unpublishing a track fails.
Expand source code
class UnpublishTrackError(Exception):
    """Raised by ``LocalParticipant.unpublish_track`` when the unpublish callback reports an error."""

    def __init__(self, message: str) -> None:
        """
        Args:
            message: Error text, stored on ``self.message``.
        """
        # Forward the message to Exception so that ``args`` and ``str(exc)``
        # carry it even when the caller passes ``message=`` by keyword.
        super().__init__(message)
        self.message = message
Ancestors
- builtins.Exception
- builtins.BaseException