Module livekit.agents.ipc.proto
Classes
class Exiting (reason: str = '')
-
Expand source code
@dataclass class Exiting: """sent by the subprocess to the main process to indicate that it is exiting""" MSG_ID: ClassVar[int] = 6 reason: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason) def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
sent by the subprocess to the main process to indicate that it is exiting
Instance variables
var MSG_ID : ClassVar[int]
var reason : str
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason)
class InferenceRequest (method: str = '', request_id: str = '', data: bytes = b'')
-
Expand source code
@dataclass class InferenceRequest: """sent by a subprocess to the main process to request inference""" MSG_ID: ClassVar[int] = 7 method: str = "" request_id: str = "" data: bytes = b"" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.method) channel.write_string(b, self.request_id) channel.write_bytes(b, self.data) def read(self, b: io.BytesIO) -> None: self.method = channel.read_string(b) self.request_id = channel.read_string(b) self.data = channel.read_bytes(b)
sent by a subprocess to the main process to request inference
Instance variables
var MSG_ID : ClassVar[int]
var data : bytes
var method : str
var request_id : str
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.method = channel.read_string(b) self.request_id = channel.read_string(b) self.data = channel.read_bytes(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.method) channel.write_string(b, self.request_id) channel.write_bytes(b, self.data)
class InferenceResponse (request_id: str = '', data: bytes | None = None, error: str = '')
-
Expand source code
@dataclass class InferenceResponse: """response to an InferenceRequest""" MSG_ID: ClassVar[int] = 8 request_id: str = "" data: bytes | None = None error: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.request_id) channel.write_bool(b, self.data is not None) if self.data is not None: channel.write_bytes(b, self.data) channel.write_string(b, self.error) def read(self, b: io.BytesIO) -> None: self.request_id = channel.read_string(b) has_data = channel.read_bool(b) if has_data: self.data = channel.read_bytes(b) self.error = channel.read_string(b)
response to an InferenceRequest
Instance variables
var MSG_ID : ClassVar[int]
var data : bytes | None
var error : str
var request_id : str
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.request_id = channel.read_string(b) has_data = channel.read_bool(b) if has_data: self.data = channel.read_bytes(b) self.error = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.request_id) channel.write_bool(b, self.data is not None) if self.data is not None: channel.write_bytes(b, self.data) channel.write_string(b, self.error)
class InitializeRequest (asyncio_debug: bool = False,
ping_interval: float = 0,
ping_timeout: float = 0,
high_ping_threshold: float = 0)-
Expand source code
@dataclass class InitializeRequest: """sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc""" MSG_ID: ClassVar[int] = 0 asyncio_debug: bool = False ping_interval: float = 0 ping_timeout: float = 0 # if no response, process is considered dead high_ping_threshold: float = ( 0 # if ping is higher than this, process is considered unresponsive ) def write(self, b: io.BytesIO) -> None: channel.write_bool(b, self.asyncio_debug) channel.write_float(b, self.ping_interval) channel.write_float(b, self.ping_timeout) channel.write_float(b, self.high_ping_threshold) def read(self, b: io.BytesIO) -> None: self.asyncio_debug = channel.read_bool(b) self.ping_interval = channel.read_float(b) self.ping_timeout = channel.read_float(b) self.high_ping_threshold = channel.read_float(b)
sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc
Instance variables
var MSG_ID : ClassVar[int]
var asyncio_debug : bool
var high_ping_threshold : float
var ping_interval : float
var ping_timeout : float
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.asyncio_debug = channel.read_bool(b) self.ping_interval = channel.read_float(b) self.ping_timeout = channel.read_float(b) self.high_ping_threshold = channel.read_float(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_bool(b, self.asyncio_debug) channel.write_float(b, self.ping_interval) channel.write_float(b, self.ping_timeout) channel.write_float(b, self.high_ping_threshold)
class InitializeResponse (error: str = '')
-
Expand source code
@dataclass class InitializeResponse: """mark the process as initialized""" MSG_ID: ClassVar[int] = 1 error: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.error) def read(self, b: io.BytesIO) -> None: self.error = channel.read_string(b)
mark the process as initialized
Instance variables
var MSG_ID : ClassVar[int]
var error : str
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.error = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.error)
class PingRequest (timestamp: int = 0)
-
Expand source code
@dataclass class PingRequest: """sent by the main process to the subprocess to check if it is still alive""" MSG_ID: ClassVar[int] = 2 timestamp: int = 0 def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.timestamp) def read(self, b: io.BytesIO) -> None: self.timestamp = channel.read_long(b)
sent by the main process to the subprocess to check if it is still alive
Instance variables
var MSG_ID : ClassVar[int]
var timestamp : int
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.timestamp = channel.read_long(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.timestamp)
class PongResponse (last_timestamp: int = 0, timestamp: int = 0)
-
Expand source code
@dataclass class PongResponse: """response to a PingRequest""" MSG_ID: ClassVar[int] = 3 last_timestamp: int = 0 timestamp: int = 0 def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.last_timestamp) channel.write_long(b, self.timestamp) def read(self, b: io.BytesIO) -> None: self.last_timestamp = channel.read_long(b) self.timestamp = channel.read_long(b)
response to a PingRequest
Instance variables
var MSG_ID : ClassVar[int]
var last_timestamp : int
var timestamp : int
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.last_timestamp = channel.read_long(b) self.timestamp = channel.read_long(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_long(b, self.last_timestamp) channel.write_long(b, self.timestamp)
class ShutdownRequest (reason: str = '')
-
Expand source code
@dataclass class ShutdownRequest: """sent by the main process to the subprocess to indicate that it should shut down gracefully. the subprocess will follow with a ExitInfo message""" MSG_ID: ClassVar[int] = 5 reason: str = "" def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason) def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
sent by the main process to the subprocess to indicate that it should shut down gracefully. the subprocess will follow with a ExitInfo message
Instance variables
var MSG_ID : ClassVar[int]
var reason : str
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: self.reason = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: channel.write_string(b, self.reason)
class StartJobRequest
-
Expand source code
@dataclass class StartJobRequest: """sent by the main process to the subprocess to start a job, the subprocess will only receive this message if the process is fully initialized (after sending a InitializeResponse).""" MSG_ID: ClassVar[int] = 4 running_job: RunningJobInfo = field(init=False) def write(self, b: io.BytesIO) -> None: accept_args = self.running_job.accept_arguments channel.write_bytes(b, self.running_job.job.SerializeToString()) channel.write_string(b, accept_args.name) channel.write_string(b, accept_args.identity) channel.write_string(b, accept_args.metadata) channel.write_string(b, self.running_job.url) channel.write_string(b, self.running_job.token) channel.write_string(b, self.running_job.worker_id) def read(self, b: io.BytesIO) -> None: job = agent.Job() job.ParseFromString(channel.read_bytes(b)) self.running_job = RunningJobInfo( accept_arguments=JobAcceptArguments( name=channel.read_string(b), identity=channel.read_string(b), metadata=channel.read_string(b), ), job=job, url=channel.read_string(b), token=channel.read_string(b), worker_id=channel.read_string(b), )
sent by the main process to the subprocess to start a job, the subprocess will only receive this message if the process is fully initialized (after sending a InitializeResponse).
Instance variables
var MSG_ID : ClassVar[int]
var running_job : livekit.agents.job.RunningJobInfo
Methods
def read(self, b: io.BytesIO) ‑> None
-
Expand source code
def read(self, b: io.BytesIO) -> None: job = agent.Job() job.ParseFromString(channel.read_bytes(b)) self.running_job = RunningJobInfo( accept_arguments=JobAcceptArguments( name=channel.read_string(b), identity=channel.read_string(b), metadata=channel.read_string(b), ), job=job, url=channel.read_string(b), token=channel.read_string(b), worker_id=channel.read_string(b), )
def write(self, b: io.BytesIO) ‑> None
-
Expand source code
def write(self, b: io.BytesIO) -> None: accept_args = self.running_job.accept_arguments channel.write_bytes(b, self.running_job.job.SerializeToString()) channel.write_string(b, accept_args.name) channel.write_string(b, accept_args.identity) channel.write_string(b, accept_args.metadata) channel.write_string(b, self.running_job.url) channel.write_string(b, self.running_job.token) channel.write_string(b, self.running_job.worker_id)