Module livekit.rtc.participant

Classes

class LocalParticipant (room_queue: BroadcastQueue[proto_ffi.FfiEvent], owned_info: proto_participant.OwnedParticipant)

Represents the local participant in a room.

Expand source code
class LocalParticipant(Participant):
    """Represents the local participant in a room."""

    def __init__(
        self,
        room_queue: BroadcastQueue[proto_ffi.FfiEvent],
        owned_info: proto_participant.OwnedParticipant,
    ) -> None:
        """
        Create the local participant wrapper.

        Args:
            room_queue (BroadcastQueue[proto_ffi.FfiEvent]): Event queue kept on the
                instance; `publish_track` and `unpublish_track` subscribe to it.
            owned_info (proto_participant.OwnedParticipant): Participant info and
                handle, forwarded to the `Participant` initializer.
        """
        super().__init__(owned_info)
        # RPC handlers keyed by method name; nothing is registered yet.
        self._rpc_handlers: Dict[
            str, Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = {}
        # Publications keyed by track SID; populated by publish_track().
        self._track_publications: dict[str, LocalTrackPublication] = {}  # type: ignore
        self._room_queue = room_queue

    @property
    def track_publications(self) -> Mapping[str, LocalTrackPublication]:
        """
        Track publications of the local participant, keyed by track SID.

        The returned mapping is the participant's internal dictionary, not a copy.
        """
        publications = self._track_publications
        return publications

    async def publish_data(
        self,
        payload: Union[bytes, str],
        *,
        reliable: bool = True,
        destination_identities: Optional[List[str]] = None,
        topic: str = "",
    ) -> None:
        """
        Publish arbitrary data to the room.

        Args:
            payload (Union[bytes, str]): The data to publish. A `str` is encoded as UTF-8
                before being sent.
            reliable (bool, optional): Whether to send reliably or not. Defaults to True.
            destination_identities (Optional[List[str]], optional): List of participant
                identities to send to. Defaults to None, which is treated as an empty list.
            topic (str, optional): The topic under which to publish the data. Defaults to "".

        Raises:
            PublishDataError: If there is an error in publishing data.
        """
        if isinstance(payload, str):
            payload = payload.encode("utf-8")

        data_len = len(payload)
        # Copy the payload into a ctypes buffer in a single call instead of
        # unpacking every byte as a separate constructor argument.
        cdata = (ctypes.c_byte * data_len).from_buffer_copy(payload)

        req = proto_ffi.FfiRequest()
        req.publish_data.local_participant_handle = self._ffi_handle.handle
        # The request carries only the address and length of the buffer, so
        # `cdata` has to stay referenced while the request is issued.
        req.publish_data.data_ptr = ctypes.addressof(cdata)
        req.publish_data.data_len = data_len
        req.publish_data.reliable = reliable
        req.publish_data.topic = topic
        if destination_identities is not None:
            req.publish_data.destination_identities.extend(destination_identities)

        # Subscribe before sending so the callback event cannot be missed.
        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.publish_data.async_id == resp.publish_data.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.publish_data.error:
            raise PublishDataError(cb.publish_data.error)

    async def publish_dtmf(self, *, code: int, digit: str) -> None:
        """
        Send a SIP DTMF message on behalf of the local participant.

        Args:
            code (int): DTMF code.
            digit (str): DTMF digit.

        Raises:
            PublishDTMFError: If the publish callback reports an error.
        """
        request = proto_ffi.FfiRequest()
        dtmf = request.publish_sip_dtmf
        dtmf.local_participant_handle = self._ffi_handle.handle
        dtmf.code = code
        dtmf.digit = digit

        # Subscribe first so the callback event cannot slip past us.
        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return (
                    event.publish_sip_dtmf.async_id
                    == response.publish_sip_dtmf.async_id
                )

            callback: proto_ffi.FfiEvent = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        outcome = callback.publish_sip_dtmf
        if outcome.error:
            raise PublishDTMFError(outcome.error)

    async def publish_transcription(self, transcription: Transcription) -> None:
        """
        Send a transcription, with all of its segments, to the room.

        Args:
            transcription (Transcription): The transcription data to publish.

        Raises:
            PublishTranscriptionError: If the publish callback reports an error.
        """
        request = proto_ffi.FfiRequest()

        # Convert every segment into its protobuf counterpart.
        segments = []
        for segment in transcription.segments:
            segments.append(
                ProtoTranscriptionSegment(
                    id=segment.id,
                    text=segment.text,
                    start_time=segment.start_time,
                    end_time=segment.end_time,
                    final=segment.final,
                    language=segment.language,
                )
            )

        publish = request.publish_transcription
        publish.local_participant_handle = self._ffi_handle.handle
        publish.participant_identity = transcription.participant_identity
        publish.segments.extend(segments)
        publish.track_id = transcription.track_sid

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return (
                    event.publish_transcription.async_id
                    == response.publish_transcription.async_id
                )

            callback: proto_ffi.FfiEvent = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        outcome = callback.publish_transcription
        if outcome.error:
            raise PublishTranscriptionError(outcome.error)

    async def perform_rpc(
        self,
        *,
        destination_identity: str,
        method: str,
        payload: str,
        response_timeout: Optional[float] = None,
    ) -> str:
        """
        Call an RPC method on a remote participant and wait for its reply.

        Args:
            destination_identity (str): The `identity` of the destination participant
            method (str): The method name to call
            payload (str): The method payload
            response_timeout (Optional[float]): Timeout for receiving a response after
                initial connection. When given, it is multiplied by 1000 and sent as
                `response_timeout_ms`; when omitted, the field is left unset.

        Returns:
            str: The response payload

        Raises:
            RpcError: On failure. Details in `message`.
        """
        request = proto_ffi.FfiRequest()
        rpc = request.perform_rpc
        rpc.local_participant_handle = self._ffi_handle.handle
        rpc.destination_identity = destination_identity
        rpc.method = method
        rpc.payload = payload
        if response_timeout is not None:
            rpc.response_timeout_ms = int(response_timeout * 1000)

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return event.perform_rpc.async_id == response.perform_rpc.async_id

            callback = await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

        result = callback.perform_rpc
        if result.HasField("error"):
            raise RpcError._from_proto(result.error)

        return result.payload

    def register_rpc_method(
        self,
        method_name: str,
        handler: Optional[
            Callable[[RpcInvocationData], Union[Awaitable[str], str]]
        ] = None,
    ) -> Union[None, Callable]:
        """
        Establishes the participant as a receiver for calls of the specified RPC method.
        Can be used either as a decorator or a regular method.

        The handler will receive one argument of type `RpcInvocationData` and should return a string response which will be forwarded back to the caller.

        The handler may be synchronous or asynchronous.

        If unable to respond within `response_timeout`, the caller will hang up and receive an error on their side.

        You may raise errors of type `RpcError` in the handler, and they will be forwarded to the caller.

        Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".

        Registering a name that is already registered replaces the stored handler.

        Args:
            method_name (str): The name of the indicated RPC method.
            handler (Optional[Callable]): Handler to be invoked whenever an RPC request for this method is received.  Omit this argument to use the decorator syntax.

        Returns:
            None when `handler` is given. When `handler` is omitted, a decorator that
            registers the function it is applied to and returns that same function.

        Example:
            # As a decorator:
            @room.local_participant.register_rpc_method("greet")
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            # As a regular method:
            async def greet_handler(data: RpcInvocationData) -> str:
                print(f"Received greeting from {data.caller_identity}: {data.payload}")
                return f"Hello, {data.caller_identity}!"

            room.local_participant.register_rpc_method('greet', greet_handler)
        """

        def register(handler_func):
            self._rpc_handlers[method_name] = handler_func
            req = proto_ffi.FfiRequest()
            req.register_rpc_method.local_participant_handle = self._ffi_handle.handle
            req.register_rpc_method.method = method_name
            FfiClient.instance.request(req)
            # Hand the function back: a decorator's return value replaces the
            # decorated name, so returning nothing would rebind it to None.
            return handler_func

        if handler is not None:
            register(handler)
            return None

        # Called as a decorator
        return register

    def unregister_rpc_method(self, method: str) -> None:
        """
        Remove the handler registered for an RPC method.

        The local handler entry is dropped first (a name that was never registered
        is ignored), then an unregister request is sent through the FFI client.

        Args:
            method (str): The name of the RPC method to unregister
        """
        self._rpc_handlers.pop(method, None)

        request = proto_ffi.FfiRequest()
        unregister = request.unregister_rpc_method
        unregister.local_participant_handle = self._ffi_handle.handle
        unregister.method = method

        FfiClient.instance.request(request)

    async def _handle_rpc_method_invocation(
        self,
        invocation_id: int,
        method: str,
        request_id: str,
        caller_identity: str,
        payload: str,
        response_timeout: float,
    ) -> None:
        """
        Run the registered handler for an incoming RPC call and send back the result.

        Outcomes reported to the caller:
            - no handler registered for `method`: UNSUPPORTED_METHOD
            - async handler not finished within `response_timeout`: RESPONSE_TIMEOUT
            - waiting on the async handler is cancelled: RECIPIENT_DISCONNECTED
            - handler raises `RpcError`: that error, unchanged
            - handler raises any other `Exception`: logged, then APPLICATION_ERROR
            - otherwise: the handler's return value as the payload

        Synchronous handlers are called directly; `response_timeout` is only
        enforced for coroutine-function handlers.

        Args:
            invocation_id (int): Identifier echoed back in the response request.
            method (str): Name of the RPC method being invoked.
            request_id (str): Passed through to `RpcInvocationData`.
            caller_identity (str): Passed through to `RpcInvocationData`.
            payload (str): Passed through to `RpcInvocationData`.
            response_timeout (float): Passed through to `RpcInvocationData` and used
                as the `asyncio.wait_for` timeout for async handlers.
        """
        response_error: Optional[RpcError] = None
        response_payload: Optional[str] = None

        params = RpcInvocationData(
            request_id, caller_identity, payload, response_timeout
        )

        handler = self._rpc_handlers.get(method)

        if not handler:
            response_error = RpcError._built_in(RpcError.ErrorCode.UNSUPPORTED_METHOD)
        else:
            try:
                if asyncio.iscoroutinefunction(handler):
                    async_handler = cast(
                        Callable[[RpcInvocationData], Awaitable[str]], handler
                    )

                    try:
                        response_payload = await asyncio.wait_for(
                            async_handler(params), timeout=response_timeout
                        )
                    except asyncio.TimeoutError:
                        raise RpcError._built_in(RpcError.ErrorCode.RESPONSE_TIMEOUT)
                    except asyncio.CancelledError:
                        raise RpcError._built_in(
                            RpcError.ErrorCode.RECIPIENT_DISCONNECTED
                        )
                else:
                    sync_handler = cast(Callable[[RpcInvocationData], str], handler)
                    response_payload = sync_handler(params)
            except RpcError as error:
                response_error = error
            except Exception as error:
                logger.exception(
                    f"Uncaught error returned by RPC handler for {method}. Returning APPLICATION_ERROR instead.  Original error: {error}",
                )
                response_error = RpcError._built_in(
                    RpcError.ErrorCode.APPLICATION_ERROR
                )

        req = proto_ffi.FfiRequest(
            rpc_method_invocation_response=RpcMethodInvocationResponseRequest(
                local_participant_handle=self._ffi_handle.handle,
                invocation_id=invocation_id,
                error=response_error._to_proto() if response_error else None,
                payload=response_payload,
            )
        )

        res = FfiClient.instance.request(req)

        if res.rpc_method_invocation_response.error:
            # No exception is being handled here, so use error(): exception()
            # would append a meaningless "NoneType: None" traceback.
            logger.error(
                f"error sending rpc method invocation response: {res.rpc_method_invocation_response.error}"
            )

    async def set_metadata(self, metadata: str) -> None:
        """
        Update the local participant's metadata and wait for the matching callback.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            metadata (str): The new metadata.
        """
        request = proto_ffi.FfiRequest()
        update = request.set_local_metadata
        update.local_participant_handle = self._ffi_handle.handle
        update.metadata = metadata

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return (
                    event.set_local_metadata.async_id
                    == response.set_local_metadata.async_id
                )

            # The callback content is not inspected; only its arrival is awaited.
            await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

    async def set_name(self, name: str) -> None:
        """
        Update the local participant's name and wait for the matching callback.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            name (str): The new name.
        """
        request = proto_ffi.FfiRequest()
        update = request.set_local_name
        update.local_participant_handle = self._ffi_handle.handle
        update.name = name

        subscription = FfiClient.instance.queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return event.set_local_name.async_id == response.set_local_name.async_id

            # The callback content is not inspected; only its arrival is awaited.
            await subscription.wait_for(is_reply)
        finally:
            FfiClient.instance.queue.unsubscribe(subscription)

    async def set_attributes(self, attributes: dict[str, str]) -> None:
        """
        Set custom attributes for the local participant.

        Only the entries passed in `attributes` are placed in the request; nothing
        is read from the participant's current attributes by this method.

        Note: this requires `canUpdateOwnMetadata` permission.

        Args:
            attributes (dict[str, str]): A dictionary of attributes to set.
        """
        req = proto_ffi.FfiRequest()
        req.set_local_attributes.local_participant_handle = self._ffi_handle.handle

        # The request is brand new, so its attribute list starts empty; merging
        # with it was a no-op. dict() keeps accepting any mapping or iterable of
        # key/value pairs.
        for key, value in dict(attributes).items():
            entry = req.set_local_attributes.attributes.add()
            entry.key = key
            entry.value = value

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            await queue.wait_for(
                lambda e: e.set_local_attributes.async_id
                == resp.set_local_attributes.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

    async def publish_track(
        self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()
    ) -> LocalTrackPublication:
        """
        Publish a local track and record the resulting publication.

        On success the publication is linked to `track`, the track's SID is set to
        the publication's SID, and the publication is stored in
        `track_publications` under that SID.

        Args:
            track (LocalTrack): The track to publish.
            options (TrackPublishOptions, optional): Options for publishing the track.
                They are copied into the request.

        Returns:
            LocalTrackPublication: The publication of the published track.

        Raises:
            PublishTrackError: If there is an error in publishing the track.
        """
        request = proto_ffi.FfiRequest()
        publish = request.publish_track
        publish.track_handle = track._ffi_handle.handle
        publish.local_participant_handle = self._ffi_handle.handle
        publish.options.CopyFrom(options)

        # This callback is awaited on the room's queue, not the global FFI queue.
        room_events = self._room_queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return event.publish_track.async_id == response.publish_track.async_id

            callback: proto_ffi.FfiEvent = await room_events.wait_for(is_reply)
            outcome = callback.publish_track

            if outcome.error:
                raise PublishTrackError(outcome.error)

            publication = LocalTrackPublication(outcome.publication)
            publication.track = track
            track._info.sid = publication.sid
            self._track_publications[publication.sid] = publication

            room_events.task_done()
            return publication
        finally:
            self._room_queue.unsubscribe(room_events)

    async def unpublish_track(self, track_sid: str) -> None:
        """
        Unpublish a track and drop its publication from `track_publications`.

        After a successful callback the stored publication is removed and its
        `track` reference is cleared.

        Args:
            track_sid (str): The SID of the track to unpublish.

        Raises:
            UnpublishTrackError: If there is an error in unpublishing the track.
        """
        request = proto_ffi.FfiRequest()
        unpublish = request.unpublish_track
        unpublish.local_participant_handle = self._ffi_handle.handle
        unpublish.track_sid = track_sid

        # This callback is awaited on the room's queue, not the global FFI queue.
        room_events = self._room_queue.subscribe()
        try:
            response = FfiClient.instance.request(request)

            def is_reply(event):
                return (
                    event.unpublish_track.async_id == response.unpublish_track.async_id
                )

            callback: proto_ffi.FfiEvent = await room_events.wait_for(is_reply)
            outcome = callback.unpublish_track

            if outcome.error:
                raise UnpublishTrackError(outcome.error)

            removed = self._track_publications.pop(track_sid)
            removed.track = None
            room_events.task_done()
        finally:
            self._room_queue.unsubscribe(room_events)

    def __repr__(self) -> str:
        """Return a debug string showing the participant's sid, identity and name."""
        text = f"rtc.LocalParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"
        return text

Ancestors

Methods

async def perform_rpc(self, *, destination_identity: str, method: str, payload: str, response_timeout: Optional[float] = None) ‑> str

Initiate an RPC call to a remote participant.

Args

destination_identity : str
The identity of the destination participant
method : str
The method name to call
payload : str
The method payload
response_timeout : Optional[float]
Timeout for receiving a response after initial connection

Returns

str
The response payload

Raises

RpcError
On failure. Details in message.
async def publish_data(self, payload: Union[bytes, str], *, reliable: bool = True, destination_identities: List[str] = [], topic: str = '') ‑> None

Publish arbitrary data to the room.

Args

payload : Union[bytes, str]
The data to publish.
reliable : bool, optional
Whether to send reliably or not. Defaults to True.
destination_identities : List[str], optional
List of participant identities to send to. Defaults to [].
topic : str, optional
The topic under which to publish the data. Defaults to "".

Raises

PublishDataError
If there is an error in publishing data.
async def publish_dtmf(self, *, code: int, digit: str) ‑> None

Publish SIP DTMF message.

Args

code : int
DTMF code.
digit : str
DTMF digit.

Raises

PublishDTMFError
If there is an error in publishing SIP DTMF message.
async def publish_track(self, track: LocalTrack, options: TrackPublishOptions = TrackPublishOptions()) ‑> LocalTrackPublication

Publish a local track to the room.

Args

track : LocalTrack
The track to publish.
options : TrackPublishOptions, optional
Options for publishing the track.

Returns

LocalTrackPublication
The publication of the published track.

Raises

PublishTrackError
If there is an error in publishing the track.
async def publish_transcription(self, transcription: Transcription) ‑> None

Publish transcription data to the room.

Args

transcription : Transcription
The transcription data to publish.

Raises

PublishTranscriptionError
If there is an error in publishing transcription.
def register_rpc_method(self, method_name: str, handler: Optional[Callable[[RpcInvocationData], Union[Awaitable[str], str]]] = None) ‑> Optional[Callable]

Establishes the participant as a receiver for calls of the specified RPC method. Can be used either as a decorator or a regular method.

The handler will receive one argument of type RpcInvocationData and should return a string response which will be forwarded back to the caller.

The handler may be synchronous or asynchronous.

If unable to respond within response_timeout, the caller will hang up and receive an error on their side.

You may raise errors of type RpcError in the handler, and they will be forwarded to the caller.

Other errors raised in your handler will be caught and forwarded to the caller as "1500 Application Error".

Args

method_name : str
The name of the indicated RPC method.
handler : Optional[Callable]
Handler to be invoked whenever an RPC request for this method is received. Omit this argument to use the decorator syntax.

Returns

None (when used as a decorator it returns the decorator function)

Example

As a decorator:

@room.local_participant.register_rpc_method("greet")
async def greet_handler(data: RpcInvocationData) -> str:
    print(f"Received greeting from {data.caller_identity}: {data.payload}")
    return f"Hello, {data.caller_identity}!"

As a regular method:

async def greet_handler(data: RpcInvocationData) -> str:
    print(f"Received greeting from {data.caller_identity}: {data.payload}")
    return f"Hello, {data.caller_identity}!"

room.local_participant.register_rpc_method('greet', greet_handler)

async def set_attributes(self, attributes: dict[str, str]) ‑> None

Set custom attributes for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

attributes : dict[str, str]
A dictionary of attributes to set.
async def set_metadata(self, metadata: str) ‑> None

Set the metadata for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

metadata : str
The new metadata.
async def set_name(self, name: str) ‑> None

Set the name for the local participant.

Note: this requires canUpdateOwnMetadata permission.

Args

name : str
The new name.
async def unpublish_track(self, track_sid: str) ‑> None

Unpublish a track from the room.

Args

track_sid : str
The SID of the track to unpublish.

Raises

UnpublishTrackError
If there is an error in unpublishing the track.
def unregister_rpc_method(self, method: str) ‑> None

Unregisters a previously registered RPC method.

Args

method : str
The name of the RPC method to unregister

Inherited members

class Participant (owned_info: proto_participant.OwnedParticipant)

Abstract base class for participants. It exposes the participant's sid, name, identity, metadata, attributes and kind, and requires subclasses to provide track_publications.

Expand source code
class Participant(ABC):
    """
    Abstract base for participants.

    Keeps the info record and an FFI handle taken from an `OwnedParticipant` and
    exposes the info fields as read-only properties. Subclasses must implement
    `track_publications`.
    """

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        """
        Store the participant info and wrap the owned handle.

        Args:
            owned_info (proto_participant.OwnedParticipant): Source of the `info`
                record and of the handle id wrapped in an `FfiHandle`.
        """
        self._info = owned_info.info
        self._ffi_handle = FfiHandle(owned_info.handle.id)

    @property
    @abstractmethod
    def track_publications(self) -> Mapping[str, TrackPublication]:
        """
        A dictionary of track publications associated with the participant.

        Abstract: every concrete participant class has to provide it.
        """
        ...

    @property
    def sid(self) -> str:
        """The `sid` field of the participant info."""
        info = self._info
        return info.sid

    @property
    def name(self) -> str:
        """The `name` field of the participant info."""
        info = self._info
        return info.name

    @property
    def identity(self) -> str:
        """The `identity` field of the participant info."""
        info = self._info
        return info.identity

    @property
    def metadata(self) -> str:
        """The `metadata` field of the participant info."""
        info = self._info
        return info.metadata

    @property
    def attributes(self) -> dict[str, str]:
        """Custom attributes of the participant, copied into a new dict on each access."""
        stored = self._info.attributes
        return dict(stored)

    @property
    def kind(self) -> proto_participant.ParticipantKind.ValueType:
        """Participant's kind (e.g., regular participant, ingress, egress, sip, agent)."""
        info = self._info
        return info.kind

Ancestors

  • abc.ABC

Subclasses

Instance variables

prop attributes : dict[str, str]

Custom attributes associated with the participant.

Expand source code
@property
def attributes(self) -> dict[str, str]:
    """Custom attributes associated with the participant.

    Each access builds a new dict from the attributes stored in the participant info.
    """
    return dict(self._info.attributes)
prop identity : str
Expand source code
@property
def identity(self) -> str:
    """The `identity` field of the participant info."""
    return self._info.identity
prop kind : proto_participant.ParticipantKind.ValueType

Participant's kind (e.g., regular participant, ingress, egress, sip, agent).

Expand source code
@property
def kind(self) -> proto_participant.ParticipantKind.ValueType:
    """Participant's kind (e.g., regular participant, ingress, egress, sip, agent).

    Read directly from the `kind` field of the participant info.
    """
    return self._info.kind
prop metadata : str
Expand source code
@property
def metadata(self) -> str:
    """The `metadata` field of the participant info."""
    return self._info.metadata
prop name : str
Expand source code
@property
def name(self) -> str:
    """The `name` field of the participant info."""
    return self._info.name
prop sid : str
Expand source code
@property
def sid(self) -> str:
    """The `sid` field of the participant info."""
    return self._info.sid
prop track_publications : Mapping[str, TrackPublication]

A dictionary of track publications associated with the participant.

Expand source code
@property
@abstractmethod
def track_publications(self) -> Mapping[str, TrackPublication]:
    """
    A dictionary of track publications associated with the participant.

    Abstract: the body is only a placeholder and concrete classes must override it.
    """
    ...
class PublishDTMFError (message: str)

Raised by LocalParticipant.publish_dtmf when publishing a SIP DTMF message fails. The error text is available as the message attribute.

Expand source code
class PublishDTMFError(Exception):
    """Raised when publishing a SIP DTMF message fails.

    Attributes:
        message: Error description given to the constructor.
    """

    def __init__(self, message: str) -> None:
        # Forward to Exception so `args` and `str(exc)` carry the message even
        # when it is passed by keyword.
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class PublishDataError (message: str)

Raised by LocalParticipant.publish_data when publishing data fails. The error text is available as the message attribute.

Expand source code
class PublishDataError(Exception):
    """Raised when publishing data to the room fails.

    Attributes:
        message: Error description given to the constructor.
    """

    def __init__(self, message: str) -> None:
        # Forward to Exception so `args` and `str(exc)` carry the message even
        # when it is passed by keyword.
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class PublishTrackError (message: str)

Raised by LocalParticipant.publish_track when publishing a track fails. The error text is available as the message attribute.

Expand source code
class PublishTrackError(Exception):
    """Raised when publishing a track fails.

    Attributes:
        message: Error description given to the constructor.
    """

    def __init__(self, message: str) -> None:
        # Forward to Exception so `args` and `str(exc)` carry the message even
        # when it is passed by keyword.
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class PublishTranscriptionError (message: str)

Raised by LocalParticipant.publish_transcription when publishing a transcription fails. The error text is available as the message attribute.

Expand source code
class PublishTranscriptionError(Exception):
    """Raised when publishing a transcription fails.

    Attributes:
        message: Error description given to the constructor.
    """

    def __init__(self, message: str) -> None:
        # Forward to Exception so `args` and `str(exc)` carry the message even
        # when it is passed by keyword.
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException
class RemoteParticipant (owned_info: proto_participant.OwnedParticipant)

A participant whose track publications are RemoteTrackPublication objects.

Expand source code
class RemoteParticipant(Participant):
    """A participant whose track publications are `RemoteTrackPublication` objects."""

    def __init__(self, owned_info: proto_participant.OwnedParticipant) -> None:
        """
        Initialise the base participant and start with no track publications.

        Args:
            owned_info (proto_participant.OwnedParticipant): Forwarded unchanged to
                the `Participant` initializer.
        """
        super().__init__(owned_info)
        # Starts out empty; nothing in this class adds entries to it.
        self._track_publications: dict[str, RemoteTrackPublication] = {}  # type: ignore

    @property
    def track_publications(self) -> Mapping[str, RemoteTrackPublication]:
        """
        Track publications of this participant.

        The returned mapping is the participant's internal dictionary, not a copy.
        """
        publications = self._track_publications
        return publications

    def __repr__(self) -> str:
        """Return a debug string showing the participant's sid, identity and name."""
        text = f"rtc.RemoteParticipant(sid={self.sid}, identity={self.identity}, name={self.name})"
        return text

Ancestors

Inherited members

class RpcMethodInvocationResponseRequest (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class TrackPublishOptions (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class ProtoTranscriptionSegment (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class UnpublishTrackError (message: str)

Raised by LocalParticipant.unpublish_track when unpublishing a track fails. The error text is available as the message attribute.

Expand source code
class UnpublishTrackError(Exception):
    """Raised when unpublishing a track fails.

    Attributes:
        message: Error description given to the constructor.
    """

    def __init__(self, message: str) -> None:
        # Forward to Exception so `args` and `str(exc)` carry the message even
        # when it is passed by keyword.
        super().__init__(message)
        self.message = message

Ancestors

  • builtins.Exception
  • builtins.BaseException