Module livekit.rtc.chat
Classes
class ChatManager (room: Room)
-
Expand source code
class ChatManager(EventEmitter[EventTypes]):
    """Chat helper that sends and receives messages in the active session.

    Messages follow the LiveKit Chat Protocol and travel as JSON-encoded
    data packets. Incoming messages are re-emitted as ``"message_received"``
    events carrying a ``ChatMessage``.
    """

    def __init__(self, room: Room):
        """Bind to ``room`` and start listening for its ``data_received`` events."""
        super().__init__()
        self._lp = room.local_participant
        self._room = room
        room.on("data_received", self._on_data_received)

    def close(self):
        """Unregister the ``data_received`` handler installed by ``__init__``."""
        self._room.off("data_received", self._on_data_received)

    async def send_message(self, message: str) -> "ChatMessage":
        """Send a chat message to the end user using LiveKit Chat Protocol.

        Args:
            message (str): the message to send

        Returns:
            ChatMessage: the message that was sent
        """
        outgoing = ChatMessage(message=message, is_local=True, participant=self._lp)
        encoded = json.dumps(outgoing.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_TOPIC)
        return outgoing

    async def update_message(self, message: "ChatMessage"):
        """Update a chat message that was previously sent.

        If message.deleted is set to True, we'll signal to remote participants
        that the message should be deleted.
        """
        encoded = json.dumps(message.asjsondict())
        await self._lp.publish_data(payload=encoded, topic=_CHAT_UPDATE_TOPIC)

    def _on_data_received(self, dp: DataPacket):
        """Decode chat packets and emit them as ``"message_received"`` events."""
        # New messages and updates share one path: both carry the message ID,
        # so the listener decides how to replace an earlier message.
        if dp.topic not in (_CHAT_TOPIC, _CHAT_UPDATE_TOPIC):
            return

        try:
            decoded = json.loads(dp.data)
            incoming = ChatMessage.from_jsondict(decoded)
            if dp.participant:
                incoming.participant = dp.participant
            self.emit("message_received", incoming)
        except Exception as e:
            # Best-effort: a bad packet is logged and dropped, never raised.
            logging.warning("failed to parse chat message: %s", e, exc_info=e)
A utility class that sends and receives chat messages in the active session.
It implements LiveKit Chat Protocol, and serializes data to/from JSON data packets.
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Methods
def close(self)
-
Expand source code
def close(self):
    """Detach this manager's ``data_received`` handler from its room."""
    room = self._room
    room.off("data_received", self._on_data_received)
async def send_message(self, message: str) ‑> ChatMessage
-
Expand source code
async def send_message(self, message: str) -> "ChatMessage":
    """Send a chat message to the end user using LiveKit Chat Protocol.

    The message is marked as local, attributed to the local participant,
    serialized to JSON and published on the chat topic.

    Args:
        message (str): the message to send

    Returns:
        ChatMessage: the message that was sent
    """
    local = self._lp
    sent = ChatMessage(message=message, is_local=True, participant=local)
    body = json.dumps(sent.asjsondict())
    await local.publish_data(payload=body, topic=_CHAT_TOPIC)
    return sent
Send a chat message to the end user using LiveKit Chat Protocol.
Args
message
:str
- the message to send
Returns
ChatMessage
- the message that was sent
async def update_message(self,
message: ChatMessage)-
Expand source code
async def update_message(self, message: "ChatMessage"):
    """Update a chat message that was previously sent.

    The full message is re-serialized to JSON and published on the chat
    update topic. If message.deleted is set to True, we'll signal to remote
    participants that the message should be deleted.
    """
    body = json.dumps(message.asjsondict())
    await self._lp.publish_data(payload=body, topic=_CHAT_UPDATE_TOPIC)
Update a chat message that was previously sent.
If message.deleted is set to True, we'll signal to remote participants that the message should be deleted.
Inherited members
class ChatMessage (message: str | None = None,
id: str = <factory>,
timestamp: datetime.datetime = <factory>,
deleted: bool = False,
participant: Participant | None = None,
is_local: bool = False)-
Expand source code
@dataclass
class ChatMessage:
    """A single chat message, convertible to and from its JSON dictionary form."""

    message: Optional[str] = None
    id: str = field(default_factory=generate_random_base62)
    timestamp: datetime = field(default_factory=datetime.now)
    deleted: bool = False

    # These fields are not part of the wire protocol. They are here to provide
    # context for the application.
    participant: Optional[Participant] = None
    is_local: bool = False

    @classmethod
    def from_jsondict(cls, d: Dict[str, Any]) -> "ChatMessage":
        """Build a message from a decoded JSON dictionary.

        A missing or empty "id" is replaced with a freshly generated one, and
        a missing or falsy "timestamp" falls back to the current time.
        """
        # Older versions of the protocol didn't contain a message ID, so we
        # create one when it is absent.
        msg_id = d.get("id") or generate_random_base62()

        raw_ts = d.get("timestamp")
        if raw_ts:
            # The wire value is in milliseconds; fromtimestamp wants seconds.
            sent_at = datetime.fromtimestamp(raw_ts / 1000.0)
        else:
            sent_at = datetime.now()

        result = cls(id=msg_id, timestamp=sent_at)
        result.update_from_jsondict(d)
        return result

    def update_from_jsondict(self, d: Dict[str, Any]) -> None:
        """Overwrite ``message`` and ``deleted`` from a decoded JSON dictionary."""
        self.message, self.deleted = d.get("message"), d.get("deleted", False)

    def asjsondict(self):
        """Returns a JSON serializable dictionary representation of the message."""
        epoch_ms = int(self.timestamp.timestamp() * 1000)
        payload = {"id": self.id, "message": self.message, "timestamp": epoch_ms}
        # "deleted" is only written when set, so regular messages omit the key.
        if self.deleted:
            payload["deleted"] = True
        return payload
ChatMessage(message: Optional[str] = None, id: str = <factory>, timestamp: datetime.datetime = <factory>, deleted: bool = False, participant: Optional[livekit.rtc.participant.Participant] = None, is_local: bool = False)
Class variables
var deleted : bool
var id : str
var is_local : bool
var message : str | None
var participant : Participant | None
var timestamp : datetime.datetime
Static methods
def from_jsondict(d: Dict[str, Any]) ‑> ChatMessage
Methods
def asjsondict(self)
-
Expand source code
def asjsondict(self):
    """Returns a JSON serializable dictionary representation of the message.

    The result always holds "id", "message" and "timestamp" (whole
    milliseconds since the epoch); "deleted" is added only when the message
    is flagged as deleted.
    """
    millis = int(self.timestamp.timestamp() * 1000)
    result = dict(id=self.id, message=self.message, timestamp=millis)
    if not self.deleted:
        return result
    result["deleted"] = True
    return result
Returns a JSON serializable dictionary representation of the message.
def update_from_jsondict(self, d: Dict[str, Any]) ‑> None
-
Expand source code
def update_from_jsondict(self, d: Dict[str, Any]) -> None:
    """Overwrite ``message`` and ``deleted`` from a decoded JSON dictionary.

    A missing "message" key resets the text to None, and a missing "deleted"
    key resets the flag to False.
    """
    text = d.get("message")
    removed = d.get("deleted", False)
    self.message = text
    self.deleted = removed