Module livekit.rtc.chat

Classes

class ChatManager (room: Room)
Expand source code
class ChatManager(EventEmitter[EventTypes]):
    """Send and receive chat messages for a room using the LiveKit Chat Protocol.

    Outgoing messages are serialized to JSON and published as data packets.
    Incoming data packets on the chat topics are parsed back into
    ``ChatMessage`` objects and re-emitted as ``"message_received"`` events.
    """

    def __init__(self, room: Room):
        """Attach to ``room`` and start listening for its ``"data_received"`` events."""
        super().__init__()
        self._room = room
        self._lp = room.local_participant

        room.on("data_received", self._on_data_received)

    def close(self):
        """Unregister the ``"data_received"`` handler installed by the constructor."""
        room, handler = self._room, self._on_data_received
        room.off("data_received", handler)

    async def send_message(self, message: str) -> "ChatMessage":
        """Publish a new chat message on the chat topic.

        Args:
            message (str): the message to send

        Returns:
            ChatMessage: the message that was sent
        """
        outgoing = ChatMessage(message=message, is_local=True, participant=self._lp)
        encoded = json.dumps(outgoing.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
        return outgoing

    async def update_message(self, message: "ChatMessage"):
        """Publish a revised version of a previously sent chat message.

        Setting ``message.deleted`` to True before calling this signals to remote
        participants that the message should be deleted.
        """
        encoded = json.dumps(message.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)

    def _on_data_received(self, dp: DataPacket):
        # New messages and updates are handled identically: both carry the
        # message ID, so the application decides how to replace an earlier copy.
        if dp.topic not in (_CHAT_TOPIC, _CHAT_UPDATE_TOPIC):
            return

        try:
            incoming = ChatMessage.from_jsondict(json.loads(dp.data))
            sender = dp.participant
            if sender:
                incoming.participant = sender
            self.emit("message_received", incoming)
        except Exception as e:
            logging.warning("failed to parse chat message: %s", e, exc_info=e)

A utility class that sends and receives chat messages in the active session.

It implements the LiveKit Chat Protocol and serializes data to and from JSON data packets.

Initialize a new instance of EventEmitter.

Ancestors

Methods

def close(self)
Expand source code
def close(self):
    """Unregister this manager's ``"data_received"`` handler from its room."""
    room, handler = self._room, self._on_data_received
    room.off("data_received", handler)
async def send_message(self, message: str) ‑> ChatMessage
Expand source code
async def send_message(self, message: str) -> "ChatMessage":
    """Publish a new chat message on the chat topic.

    Args:
        message (str): the message to send

    Returns:
        ChatMessage: the message that was sent
    """
    outgoing = ChatMessage(message=message, is_local=True, participant=self._lp)
    encoded = json.dumps(outgoing.asjsondict())
    await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
    return outgoing

Send a chat message to the end user using LiveKit Chat Protocol.

Args

message : str
the message to send

Returns

ChatMessage
the message that was sent
async def update_message(self,
message: ChatMessage)
Expand source code
async def update_message(self, message: "ChatMessage"):
    """Publish a revised version of a previously sent chat message.

    Setting ``message.deleted`` to True before calling this signals to remote
    participants that the message should be deleted.
    """
    encoded = json.dumps(message.asjsondict())
    await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)

Update a chat message that was previously sent.

If message.deleted is set to True, we'll signal to remote participants that the message should be deleted.

Inherited members

class ChatMessage (message: str | None = None,
id: str = <factory>,
timestamp: datetime.datetime = <factory>,
deleted: bool = False,
participant: Participant | None = None,
is_local: bool = False)
Expand source code
@dataclass
class ChatMessage:
    """A chat message, together with local-only context about where it came from.

    ``message``, ``id``, ``timestamp`` and ``deleted`` are the fields read from
    and written to the JSON wire format; the remaining fields are local context.
    """

    message: Optional[str] = None
    id: str = field(default_factory=generate_random_base62)
    timestamp: datetime = field(default_factory=datetime.now)
    deleted: bool = False

    # Not part of the wire protocol: these only give the application context
    # about who sent the message and whether it originated locally.
    participant: Optional[Participant] = None
    is_local: bool = False

    @classmethod
    def from_jsondict(cls, d: Dict[str, Any]) -> "ChatMessage":
        """Build a message from a decoded JSON dictionary.

        A missing or empty ``"id"`` is replaced by a freshly generated one, and a
        missing or zero ``"timestamp"`` falls back to the current time. When
        present, ``"timestamp"`` is read as milliseconds since the epoch.
        """
        # Older versions of the protocol carried no message ID, so mint one.
        message_id = d.get("id") or generate_random_base62()

        raw_millis = d.get("timestamp")
        if raw_millis:
            sent_at = datetime.fromtimestamp(raw_millis / 1000.0)
        else:
            sent_at = datetime.now()

        instance = cls(id=message_id, timestamp=sent_at)
        instance.update_from_jsondict(d)
        return instance

    def update_from_jsondict(self, d: Dict[str, Any]) -> None:
        """Overwrite ``message`` and ``deleted`` from a decoded JSON dictionary.

        An absent ``"message"`` becomes None and an absent ``"deleted"`` becomes False.
        """
        body = d.get("message")
        is_deleted = d.get("deleted", False)
        self.message = body
        self.deleted = is_deleted

    def asjsondict(self):
        """Returns a JSON serializable dictionary representation of the message.

        The timestamp is written as whole milliseconds since the epoch, and the
        ``"deleted"`` key is only included when the message is marked deleted.
        """
        millis = int(self.timestamp.timestamp() * 1000)
        payload = {"id": self.id, "message": self.message, "timestamp": millis}
        if self.deleted:
            payload["deleted"] = True
        return payload

ChatMessage(message: Optional[str] = None, id: str = <factory>, timestamp: datetime.datetime = <factory>, deleted: bool = False, participant: Optional[livekit.rtc.participant.Participant] = None, is_local: bool = False)

Class variables

var deleted : bool
var id : str
var is_local : bool
var message : str | None
var participant : Participant | None
var timestamp : datetime.datetime

Static methods

def from_jsondict(d: Dict[str, Any]) ‑> ChatMessage

Methods

def asjsondict(self)
Expand source code
def asjsondict(self):
    """Returns a JSON serializable dictionary representation of the message.

    The timestamp is written as whole milliseconds since the epoch, and the
    ``"deleted"`` key is only included when the message is marked deleted.
    """
    millis = int(self.timestamp.timestamp() * 1000)
    payload = {"id": self.id, "message": self.message, "timestamp": millis}
    if self.deleted:
        payload["deleted"] = True
    return payload

Returns a JSON serializable dictionary representation of the message.

def update_from_jsondict(self, d: Dict[str, Any]) ‑> None
Expand source code
def update_from_jsondict(self, d: Dict[str, Any]) -> None:
    """Overwrite ``message`` and ``deleted`` from a decoded JSON dictionary.

    An absent ``"message"`` becomes None and an absent ``"deleted"`` becomes False.
    """
    body = d.get("message")
    is_deleted = d.get("deleted", False)
    self.message = body
    self.deleted = is_deleted