Module livekit.agents.ipc.job_proc_lazy_main

Functions

def proc_main(args: ProcStartArgs) -> None
Expand source code
def proc_main(args: ProcStartArgs) -> None:
    """Wrap a ``_JobProc`` in a ``_ProcClient``, initialize it and run it.

    The client's logger is initialized first. If ``client.initialize()``
    raises, the function returns without calling ``run()``.
    """
    from .proc_client import _ProcClient

    proc = _JobProc(
        args.initialize_process_fnc,
        args.job_entrypoint_fnc,
        args.user_arguments,
    )
    ipc_client = _ProcClient(
        args.mp_cch, args.log_cch, proc.initialize, proc.entrypoint
    )
    ipc_client.initialize_logger()

    process_id = current_process().pid
    logger.info("initializing job process", extra={"pid": process_id})

    try:
        ipc_client.initialize()
    except Exception:
        # initialization failed: there is nothing to run, so just exit
        return
    else:
        logger.info("job process initialized", extra={"pid": process_id})
        ipc_client.run()
def thread_main(args: ThreadStartArgs) -> None
Expand source code
def thread_main(args: ThreadStartArgs) -> None:
    """Main function for the job when it is run by the ThreadedJobRunner.

    Builds a ``_JobProc`` and a ``_ProcClient`` (with no log channel),
    initializes the client and runs it. ``args.join_fnc`` is always called
    on the way out, whether the run completed or an exception was raised.
    """
    native_tid = threading.get_native_id()

    try:
        from .proc_client import _ProcClient

        proc = _JobProc(
            args.initialize_process_fnc,
            args.job_entrypoint_fnc,
            args.user_arguments,
        )
        runner_client = _ProcClient(
            args.mp_cch, None, proc.initialize, proc.entrypoint
        )

        logger.info("initializing job runner", extra={"tid": native_tid})
        runner_client.initialize()
        logger.info("job runner initialized", extra={"tid": native_tid})

        runner_client.run()
    finally:
        # runs on success and on failure alike
        args.join_fnc()

Main function for the job process when using the ThreadedJobRunner.

Classes

class ProcStartArgs (initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Any],
mp_cch: socket.socket,
log_cch: socket.socket,
user_arguments: Any | None = None)
Expand source code
@dataclass
class ProcStartArgs:
    """Arguments consumed by ``proc_main`` to start a job process."""

    # Callable taking a JobProcess; proc_main forwards it to _JobProc
    # as the first positional argument.
    initialize_process_fnc: Callable[[JobProcess], Any]
    # Callable taking a JobContext; proc_main forwards it to _JobProc
    # as the second positional argument.
    job_entrypoint_fnc: Callable[[JobContext], Any]
    # Socket handed to _ProcClient as its first argument
    # (presumably the main IPC channel with the parent -- TODO confirm).
    mp_cch: socket.socket
    # Socket handed to _ProcClient as its second argument
    # (presumably used for log forwarding, see client.initialize_logger()
    # in proc_main -- TODO confirm).
    log_cch: socket.socket
    # Optional user-supplied value, forwarded to _JobProc as the third
    # positional argument; defaults to None.
    user_arguments: Any | None = None

ProcStartArgs(initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', mp_cch: 'socket.socket', log_cch: 'socket.socket', user_arguments: 'Any | None' = None)

Instance variables

var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var log_cch : socket.socket
var mp_cch : socket.socket
var user_arguments : typing.Any | None
class ThreadStartArgs (initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Any],
join_fnc: Callable[[], None],
mp_cch: socket.socket,
user_arguments: Any | None)
Expand source code
@dataclass
class ThreadStartArgs:
    """Arguments consumed by ``thread_main`` to start a threaded job runner."""

    # Callable taking a JobProcess; thread_main forwards it to _JobProc
    # as the first positional argument.
    initialize_process_fnc: Callable[[JobProcess], Any]
    # Callable taking a JobContext; thread_main forwards it to _JobProc
    # as the second positional argument.
    job_entrypoint_fnc: Callable[[JobContext], Any]
    # Zero-argument callable that thread_main always invokes in its
    # ``finally`` clause, i.e. whether or not the run raised.
    join_fnc: Callable[[], None]
    # Socket handed to _ProcClient as its first argument
    # (presumably the main IPC channel with the parent -- TODO confirm).
    # There is no log channel here: thread_main passes None in that slot.
    mp_cch: socket.socket
    # User-supplied value (may be None), forwarded to _JobProc as the third
    # positional argument. Unlike ProcStartArgs, this field has no default.
    user_arguments: Any | None

ThreadStartArgs(initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', join_fnc: 'Callable[[], None]', mp_cch: 'socket.socket', user_arguments: 'Any | None')

Instance variables

var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var join_fnc : Callable[[], None]
var mp_cch : socket.socket
var user_arguments : typing.Any | None