Module livekit.agents.ipc.job_proc_executor

Classes

class ProcJobExecutor (*,
initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
inference_executor: InferenceExecutor | None,
initialize_timeout: float,
close_timeout: float,
memory_warn_mb: float,
memory_limit_mb: float,
ping_interval: float,
ping_timeout: float,
high_ping_threshold: float,
mp_ctx: BaseContext,
loop: asyncio.AbstractEventLoop)
Expand source code
class ProcJobExecutor(SupervisedProc):
    def __init__(
        self,
        *,
        initialize_process_fnc: Callable[[JobProcess], Any],
        job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
        inference_executor: InferenceExecutor | None,
        initialize_timeout: float,
        close_timeout: float,
        memory_warn_mb: float,
        memory_limit_mb: float,
        ping_interval: float,
        ping_timeout: float,
        high_ping_threshold: float,
        mp_ctx: BaseContext,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        super().__init__(
            initialize_timeout=initialize_timeout,
            close_timeout=close_timeout,
            memory_warn_mb=memory_warn_mb,
            memory_limit_mb=memory_limit_mb,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            high_ping_threshold=high_ping_threshold,
            mp_ctx=mp_ctx,
            loop=loop,
        )

        self._user_args: Any | None = None
        self._job_status: JobStatus | None = None
        self._running_job: RunningJobInfo | None = None
        self._initialize_process_fnc = initialize_process_fnc
        self._job_entrypoint_fnc = job_entrypoint_fnc
        self._inference_executor = inference_executor
        self._inference_tasks: list[asyncio.Task[None]] = []

    @property
    def status(self) -> JobStatus:
        if self._job_status is None:
            raise RuntimeError("job status not available")

        return self._job_status

    @property
    def user_arguments(self) -> Any | None:
        return self._user_args

    @user_arguments.setter
    def user_arguments(self, value: Any | None) -> None:
        self._user_args = value

    @property
    def running_job(self) -> RunningJobInfo | None:
        return self._running_job

    def _create_process(self, cch: socket.socket, log_cch: socket.socket) -> mp.Process:
        proc_args = ProcStartArgs(
            initialize_process_fnc=self._initialize_process_fnc,
            job_entrypoint_fnc=self._job_entrypoint_fnc,
            log_cch=log_cch,
            mp_cch=cch,
            user_arguments=self._user_args,
        )

        return self._mp_ctx.Process(  # type: ignore
            target=proc_main,
            args=(proc_args,),
            name="job_proc",
        )

    @log_exceptions(logger=logger)
    async def _main_task(self, ipc_ch: aio.ChanReceiver[channel.Message]) -> None:
        try:
            async for msg in ipc_ch:
                if isinstance(msg, proto.InferenceRequest):
                    self._inference_tasks.append(
                        asyncio.create_task(self._do_inference_task(msg))
                    )
        finally:
            await aio.gracefully_cancel(*self._inference_tasks)

    @log_exceptions(logger=logger)
    async def _supervise_task(self) -> None:
        try:
            await super()._supervise_task()
        finally:
            self._job_status = (
                JobStatus.SUCCESS if self.exitcode == 0 else JobStatus.FAILED
            )

    async def _do_inference_task(self, inf_req: proto.InferenceRequest) -> None:
        if self._inference_executor is None:
            logger.warning("inference request received but no inference executor")
            await channel.asend_message(
                self._pch,
                proto.InferenceResponse(
                    request_id=inf_req.request_id, error="no inference executor"
                ),
            )
            return

        try:
            inf_res = await self._inference_executor.do_inference(
                inf_req.method, inf_req.data
            )
            await channel.asend_message(
                self._pch,
                proto.InferenceResponse(request_id=inf_req.request_id, data=inf_res),
            )
        except Exception as e:
            await channel.asend_message(
                self._pch,
                proto.InferenceResponse(request_id=inf_req.request_id, error=str(e)),
            )

    async def launch_job(self, info: RunningJobInfo) -> None:
        """start/assign a job to the process"""
        if self._running_job is not None:
            raise RuntimeError("process already has a running job")

        if not self._initialize_fut.done():
            raise RuntimeError("process not initialized")

        self._job_status = JobStatus.RUNNING
        self._running_job = info

        start_req = proto.StartJobRequest()
        start_req.running_job = info
        await channel.asend_message(self._pch, start_req)

    def logging_extra(self):
        extra = super().logging_extra()

        if self._running_job:
            extra["job_id"] = self._running_job.job.id

        return extra

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Instance variables

prop running_job : RunningJobInfo | None
Expand source code
@property
def running_job(self) -> RunningJobInfo | None:
    return self._running_job
prop status : JobStatus
Expand source code
@property
def status(self) -> JobStatus:
    if self._job_status is None:
        raise RuntimeError("job status not available")

    return self._job_status
prop user_arguments : Any | None
Expand source code
@property
def user_arguments(self) -> Any | None:
    return self._user_args

Methods

async def launch_job(self, info: RunningJobInfo) ‑> None
Expand source code
async def launch_job(self, info: RunningJobInfo) -> None:
    """start/assign a job to the process"""
    if self._running_job is not None:
        raise RuntimeError("process already has a running job")

    if not self._initialize_fut.done():
        raise RuntimeError("process not initialized")

    self._job_status = JobStatus.RUNNING
    self._running_job = info

    start_req = proto.StartJobRequest()
    start_req.running_job = info
    await channel.asend_message(self._pch, start_req)

start/assign a job to the process

def logging_extra(self)
Expand source code
def logging_extra(self):
    extra = super().logging_extra()

    if self._running_job:
        extra["job_id"] = self._running_job.job.id

    return extra

Inherited members