Module livekit.agents.ipc.proto

Classes

class Exiting (reason: str = '')
Expand source code
@dataclass
class Exiting:
    """sent by the subprocess to the main process to indicate that it is exiting"""

    MSG_ID: ClassVar[int] = 6
    reason: str = ""

    def write(self, b: io.BytesIO) -> None:
        channel.write_string(b, self.reason)

    def read(self, b: io.BytesIO) -> None:
        self.reason = channel.read_string(b)

sent by the subprocess to the main process to indicate that it is exiting

Instance variables

var MSG_ID : ClassVar[int]
var reason : str

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.reason = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_string(b, self.reason)
class InferenceRequest (method: str = '', request_id: str = '', data: bytes = b'')
Expand source code
@dataclass
class InferenceRequest:
    """sent by a subprocess to the main process to request inference"""

    MSG_ID: ClassVar[int] = 7
    method: str = ""
    request_id: str = ""
    data: bytes = b""

    def write(self, b: io.BytesIO) -> None:
        channel.write_string(b, self.method)
        channel.write_string(b, self.request_id)
        channel.write_bytes(b, self.data)

    def read(self, b: io.BytesIO) -> None:
        self.method = channel.read_string(b)
        self.request_id = channel.read_string(b)
        self.data = channel.read_bytes(b)

sent by a subprocess to the main process to request inference

Instance variables

var MSG_ID : ClassVar[int]
var data : bytes
var method : str
var request_id : str

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.method = channel.read_string(b)
    self.request_id = channel.read_string(b)
    self.data = channel.read_bytes(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_string(b, self.method)
    channel.write_string(b, self.request_id)
    channel.write_bytes(b, self.data)
class InferenceResponse (request_id: str = '', data: bytes | None = None, error: str = '')
Expand source code
@dataclass
class InferenceResponse:
    """response to an InferenceRequest"""

    MSG_ID: ClassVar[int] = 8
    request_id: str = ""
    data: bytes | None = None
    error: str = ""

    def write(self, b: io.BytesIO) -> None:
        channel.write_string(b, self.request_id)
        channel.write_bool(b, self.data is not None)
        if self.data is not None:
            channel.write_bytes(b, self.data)
        channel.write_string(b, self.error)

    def read(self, b: io.BytesIO) -> None:
        self.request_id = channel.read_string(b)
        has_data = channel.read_bool(b)
        if has_data:
            self.data = channel.read_bytes(b)
        self.error = channel.read_string(b)

response to an InferenceRequest

Instance variables

var MSG_ID : ClassVar[int]
var data : bytes | None
var error : str
var request_id : str

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.request_id = channel.read_string(b)
    has_data = channel.read_bool(b)
    if has_data:
        self.data = channel.read_bytes(b)
    self.error = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_string(b, self.request_id)
    channel.write_bool(b, self.data is not None)
    if self.data is not None:
        channel.write_bytes(b, self.data)
    channel.write_string(b, self.error)
class InitializeRequest (asyncio_debug: bool = False,
ping_interval: float = 0,
ping_timeout: float = 0,
high_ping_threshold: float = 0)
Expand source code
@dataclass
class InitializeRequest:
    """sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc"""

    MSG_ID: ClassVar[int] = 0

    asyncio_debug: bool = False
    ping_interval: float = 0
    ping_timeout: float = 0  # if no response, process is considered dead
    high_ping_threshold: float = (
        0  # if ping is higher than this, process is considered unresponsive
    )

    def write(self, b: io.BytesIO) -> None:
        channel.write_bool(b, self.asyncio_debug)
        channel.write_float(b, self.ping_interval)
        channel.write_float(b, self.ping_timeout)
        channel.write_float(b, self.high_ping_threshold)

    def read(self, b: io.BytesIO) -> None:
        self.asyncio_debug = channel.read_bool(b)
        self.ping_interval = channel.read_float(b)
        self.ping_timeout = channel.read_float(b)
        self.high_ping_threshold = channel.read_float(b)

sent by the main process to the subprocess to initialize it. this is going to call initialize_process_fnc

Instance variables

var MSG_ID : ClassVar[int]
var asyncio_debug : bool
var high_ping_threshold : float
var ping_interval : float
var ping_timeout : float

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.asyncio_debug = channel.read_bool(b)
    self.ping_interval = channel.read_float(b)
    self.ping_timeout = channel.read_float(b)
    self.high_ping_threshold = channel.read_float(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_bool(b, self.asyncio_debug)
    channel.write_float(b, self.ping_interval)
    channel.write_float(b, self.ping_timeout)
    channel.write_float(b, self.high_ping_threshold)
class InitializeResponse (error: str = '')
Expand source code
@dataclass
class InitializeResponse:
    """mark the process as initialized"""

    MSG_ID: ClassVar[int] = 1
    error: str = ""

    def write(self, b: io.BytesIO) -> None:
        channel.write_string(b, self.error)

    def read(self, b: io.BytesIO) -> None:
        self.error = channel.read_string(b)

mark the process as initialized

Instance variables

var MSG_ID : ClassVar[int]
var error : str

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.error = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_string(b, self.error)
class PingRequest (timestamp: int = 0)
Expand source code
@dataclass
class PingRequest:
    """sent by the main process to the subprocess to check if it is still alive"""

    MSG_ID: ClassVar[int] = 2
    timestamp: int = 0

    def write(self, b: io.BytesIO) -> None:
        channel.write_long(b, self.timestamp)

    def read(self, b: io.BytesIO) -> None:
        self.timestamp = channel.read_long(b)

sent by the main process to the subprocess to check if it is still alive

Instance variables

var MSG_ID : ClassVar[int]
var timestamp : int

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.timestamp = channel.read_long(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_long(b, self.timestamp)
class PongResponse (last_timestamp: int = 0, timestamp: int = 0)
Expand source code
@dataclass
class PongResponse:
    """response to a PingRequest"""

    MSG_ID: ClassVar[int] = 3
    last_timestamp: int = 0
    timestamp: int = 0

    def write(self, b: io.BytesIO) -> None:
        channel.write_long(b, self.last_timestamp)
        channel.write_long(b, self.timestamp)

    def read(self, b: io.BytesIO) -> None:
        self.last_timestamp = channel.read_long(b)
        self.timestamp = channel.read_long(b)

response to a PingRequest

Instance variables

var MSG_ID : ClassVar[int]
var last_timestamp : int
var timestamp : int

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.last_timestamp = channel.read_long(b)
    self.timestamp = channel.read_long(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_long(b, self.last_timestamp)
    channel.write_long(b, self.timestamp)
class ShutdownRequest (reason: str = '')
Expand source code
@dataclass
class ShutdownRequest:
    """sent by the main process to the subprocess to indicate that it should shut down
    gracefully. the subprocess will follow with a ExitInfo message"""

    MSG_ID: ClassVar[int] = 5
    reason: str = ""

    def write(self, b: io.BytesIO) -> None:
        channel.write_string(b, self.reason)

    def read(self, b: io.BytesIO) -> None:
        self.reason = channel.read_string(b)

sent by the main process to the subprocess to indicate that it should shut down gracefully. the subprocess will follow with a ExitInfo message

Instance variables

var MSG_ID : ClassVar[int]
var reason : str

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    self.reason = channel.read_string(b)
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    channel.write_string(b, self.reason)
class StartJobRequest
Expand source code
@dataclass
class StartJobRequest:
    """sent by the main process to the subprocess to start a job, the subprocess will only
    receive this message if the process is fully initialized (after sending a InitializeResponse)."""

    MSG_ID: ClassVar[int] = 4
    running_job: RunningJobInfo = field(init=False)

    def write(self, b: io.BytesIO) -> None:
        accept_args = self.running_job.accept_arguments
        channel.write_bytes(b, self.running_job.job.SerializeToString())
        channel.write_string(b, accept_args.name)
        channel.write_string(b, accept_args.identity)
        channel.write_string(b, accept_args.metadata)
        channel.write_string(b, self.running_job.url)
        channel.write_string(b, self.running_job.token)
        channel.write_string(b, self.running_job.worker_id)

    def read(self, b: io.BytesIO) -> None:
        job = agent.Job()
        job.ParseFromString(channel.read_bytes(b))
        self.running_job = RunningJobInfo(
            accept_arguments=JobAcceptArguments(
                name=channel.read_string(b),
                identity=channel.read_string(b),
                metadata=channel.read_string(b),
            ),
            job=job,
            url=channel.read_string(b),
            token=channel.read_string(b),
            worker_id=channel.read_string(b),
        )

sent by the main process to the subprocess to start a job, the subprocess will only receive this message if the process is fully initialized (after sending a InitializeResponse).

Instance variables

var MSG_ID : ClassVar[int]
var running_job : livekit.agents.job.RunningJobInfo

Methods

def read(self, b: io.BytesIO) ‑> None
Expand source code
def read(self, b: io.BytesIO) -> None:
    job = agent.Job()
    job.ParseFromString(channel.read_bytes(b))
    self.running_job = RunningJobInfo(
        accept_arguments=JobAcceptArguments(
            name=channel.read_string(b),
            identity=channel.read_string(b),
            metadata=channel.read_string(b),
        ),
        job=job,
        url=channel.read_string(b),
        token=channel.read_string(b),
        worker_id=channel.read_string(b),
    )
def write(self, b: io.BytesIO) ‑> None
Expand source code
def write(self, b: io.BytesIO) -> None:
    accept_args = self.running_job.accept_arguments
    channel.write_bytes(b, self.running_job.job.SerializeToString())
    channel.write_string(b, accept_args.name)
    channel.write_string(b, accept_args.identity)
    channel.write_string(b, accept_args.metadata)
    channel.write_string(b, self.running_job.url)
    channel.write_string(b, self.running_job.token)
    channel.write_string(b, self.running_job.worker_id)