Module livekit.agents.cli.proto

Classes

class ActiveJobsRequest

ActiveJobsRequest()

Expand source code
@dataclass
class ActiveJobsRequest:
    """Message without any payload fields, identified by ``MSG_ID`` 1."""

    # Class-level message identifier; ClassVar keeps it out of the dataclass fields.
    MSG_ID: ClassVar[int] = 1

Class variables

var MSG_ID : ClassVar[int]
class ActiveJobsResponse (jobs: list[RunningJobInfo] = <factory>)

ActiveJobsResponse(jobs: 'list[RunningJobInfo]' = <factory>)

Expand source code
@dataclass
class ActiveJobsResponse:
    """Message (``MSG_ID`` 2) carrying a list of ``RunningJobInfo`` entries.

    ``write`` and ``read`` use the same per-job layout, in this order:
    serialized job bytes, accept-argument name, identity, metadata, then
    the job's url and token. The list is preceded by its length.
    """

    MSG_ID: ClassVar[int] = 2
    jobs: list[RunningJobInfo] = field(default_factory=list)

    def write(self, b: io.BytesIO) -> None:
        """Serialize ``self.jobs`` into ``b`` via the ``channel`` helpers."""
        # Length prefix so that read() knows how many entries follow.
        channel.write_int(b, len(self.jobs))
        for info in self.jobs:
            args = info.accept_arguments
            channel.write_bytes(b, info.job.SerializeToString())
            channel.write_string(b, args.name)
            channel.write_string(b, args.identity)
            channel.write_string(b, args.metadata)
            channel.write_string(b, info.url)
            channel.write_string(b, info.token)

    def read(self, b: io.BytesIO) -> None:
        """Parse entries from ``b`` and append them to ``self.jobs``.

        Existing entries in ``self.jobs`` are kept; parsed jobs are added
        after them.
        """
        count = channel.read_int(b)
        for _ in range(count):
            # The reads below must stay in the exact order used by write().
            job = agent.Job()
            job.ParseFromString(channel.read_bytes(b))

            name = channel.read_string(b)
            identity = channel.read_string(b)
            metadata = channel.read_string(b)
            accept_arguments = JobAcceptArguments(
                name=name,
                identity=identity,
                metadata=metadata,
            )

            url = channel.read_string(b)
            token = channel.read_string(b)

            info = RunningJobInfo(
                accept_arguments=accept_arguments,
                job=job,
                url=url,
                token=token,
            )
            self.jobs.append(info)

Subclasses

livekit.agents.cli.proto.ReloadJobsResponse

Class variables

var MSG_ID : ClassVar[int]
var jobs : list[livekit.agents.job.RunningJobInfo]

Methods

def read(self, b: io.BytesIO) ‑> None
def write(self, b: io.BytesIO) ‑> None
class CliArgs (opts: WorkerOptions, log_level: str, devmode: bool, asyncio_debug: bool, watch: bool, drain_timeout: int, room: str = '', participant_identity: str = '', reload_count: int = 0, mp_cch: socket.socket | None = None)

CliArgs(opts: 'WorkerOptions', log_level: 'str', devmode: 'bool', asyncio_debug: 'bool', watch: 'bool', drain_timeout: 'int', room: 'str' = '', participant_identity: 'str' = '', reload_count: 'int' = 0, mp_cch: 'socket.socket | None' = None)

Expand source code
@dataclass
class CliArgs:
    """Bundle of worker options and command-line settings.

    The first six fields are required; the remaining ones have defaults.
    """

    opts: WorkerOptions
    log_level: str
    devmode: bool
    asyncio_debug: bool
    watch: bool
    drain_timeout: int
    room: str = ""
    participant_identity: str = ""

    # Number of times this worker has been reloaded.
    reload_count: int = 0

    # Pipe used for communication between the watch server and the watch
    # client when reload/dev mode is enabled.
    mp_cch: socket.socket | None = None

Class variables

var asyncio_debug : bool
var devmode : bool
var drain_timeout : int
var log_level : str
var mp_cch : socket.socket | None
var opts : livekit.agents.worker.WorkerOptions
var participant_identity : str
var reload_count : int
var room : str
var watch : bool
class ReloadJobsRequest

ReloadJobsRequest()

Expand source code
@dataclass
class ReloadJobsRequest:
    """Message without any payload fields, identified by ``MSG_ID`` 3."""

    # Class-level message identifier; ClassVar keeps it out of the dataclass fields.
    MSG_ID: ClassVar[int] = 3

Class variables

var MSG_ID : ClassVar[int]
class ReloadJobsResponse (jobs: list[RunningJobInfo] = <factory>)

ReloadJobsResponse(jobs: 'list[RunningJobInfo]' = <factory>)

Expand source code
@dataclass
class ReloadJobsResponse(ActiveJobsResponse):
    """Message identified by ``MSG_ID`` 4.

    Inherits the ``jobs`` field and the ``write``/``read`` methods from
    ``ActiveJobsResponse``; only the message identifier is overridden.
    """

    MSG_ID: ClassVar[int] = 4

Ancestors

livekit.agents.cli.proto.ActiveJobsResponse

Class variables

var MSG_ID : ClassVar[int]
class Reloaded

Reloaded()

Expand source code
@dataclass
class Reloaded:
    """Message without any payload fields, identified by ``MSG_ID`` 5."""

    # Class-level message identifier; ClassVar keeps it out of the dataclass fields.
    MSG_ID: ClassVar[int] = 5

Class variables

var MSG_ID : ClassVar[int]