Module livekit.agents.ipc.proto
Classes
class Exiting (reason: str = '')
-
sent by the subprocess to the main process to indicate that it is exiting
Expand source code
@dataclass class Exiting: """sent by the subprocess to the main process to indicate that it is exiting""" MSG_ID: ClassVar[int] = 6 reason: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason) def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
Class variables
var MSG_ID : ClassVar[int]
var reason : str
Methods
def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None
class InitializeRequest
-
sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc
Expand source code
@dataclass class InitializeRequest: """sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc""" MSG_ID: ClassVar[int] = 0
Class variables
var MSG_ID : ClassVar[int]
class InitializeResponse
-
mark the process as initialized
Expand source code
@dataclass class InitializeResponse: """mark the process as initialized""" MSG_ID: ClassVar[int] = 1
Class variables
var MSG_ID : ClassVar[int]
class PingRequest (timestamp: int = 0)
-
sent by the main process to the subprocess to check if it is still alive
Expand source code
@dataclass class PingRequest: """sent by the main process to the subprocess to check if it is still alive""" MSG_ID: ClassVar[int] = 2 timestamp: int = 0 def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.timestamp) def read(self, b: io.BytesIO) -> None: self.timestamp = channel.read_long(b)
Class variables
var MSG_ID : ClassVar[int]
var timestamp : int
Methods
def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None
class PongResponse (last_timestamp: int = 0, timestamp: int = 0)
-
response to a PingRequest
Expand source code
@dataclass class PongResponse: """response to a PingRequest""" MSG_ID: ClassVar[int] = 3 last_timestamp: int = 0 timestamp: int = 0 def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.last_timestamp) channel.write_long(b, self.timestamp) def read(self, b: io.BytesIO) -> None: self.last_timestamp = channel.read_long(b) self.timestamp = channel.read_long(b)
Class variables
var MSG_ID : ClassVar[int]
var last_timestamp : int
var timestamp : int
Methods
def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None
class ShutdownRequest (reason: str = '')
-
sent by the main process to the subprocess to indicate that it should shut down gracefully. the subprocess will follow with a ExitInfo message
Expand source code
@dataclass class ShutdownRequest: """sent by the main process to the subprocess to indicate that it should shut down gracefully. the subprocess will follow with a ExitInfo message""" MSG_ID: ClassVar[int] = 5 reason: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason) def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
Class variables
var MSG_ID : ClassVar[int]
var reason : str
Methods
def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None
class StartJobRequest
-
sent by the main process to the subprocess to start a job, the subprocess will only receive this message if the process is fully initialized (after sending a InitializeResponse).
Expand source code
@dataclass class StartJobRequest: """sent by the main process to the subprocess to start a job, the subprocess will only receive this message if the process is fully initialized (after sending a InitializeResponse).""" MSG_ID: ClassVar[int] = 4 running_job: RunningJobInfo = field(init=False) def write(self, b: io.BytesIO) -> None: accept_args = self.running_job.accept_arguments channel.write_bytes(b, self.running_job.job.SerializeToString()) channel.write_string(b, accept_args.name) channel.write_string(b, accept_args.identity) channel.write_string(b, accept_args.metadata) channel.write_string(b, self.running_job.url) channel.write_string(b, self.running_job.token) def read(self, b: io.BytesIO) -> None: job = agent.Job() job.ParseFromString(channel.read_bytes(b)) self.running_job = RunningJobInfo( accept_arguments=JobAcceptArguments( name=channel.read_string(b), identity=channel.read_string(b), metadata=channel.read_string(b), ), job=job, url=channel.read_string(b), token=channel.read_string(b), )
Class variables
var MSG_ID : ClassVar[int]
var running_job : livekit.agents.job.RunningJobInfo
Methods
def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None