Module livekit.agents.ipc.job_proc_executor
Classes
class ProcJobExecutor (*,
initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
inference_executor: InferenceExecutor | None,
initialize_timeout: float,
close_timeout: float,
memory_warn_mb: float,
memory_limit_mb: float,
ping_interval: float,
ping_timeout: float,
high_ping_threshold: float,
http_proxy: str | None,
mp_ctx: BaseContext,
loop: asyncio.AbstractEventLoop)-
Expand source code
class ProcJobExecutor(SupervisedProc): def __init__( self, *, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]], inference_executor: InferenceExecutor | None, initialize_timeout: float, close_timeout: float, memory_warn_mb: float, memory_limit_mb: float, ping_interval: float, ping_timeout: float, high_ping_threshold: float, http_proxy: str | None, mp_ctx: BaseContext, loop: asyncio.AbstractEventLoop, ) -> None: super().__init__( initialize_timeout=initialize_timeout, close_timeout=close_timeout, memory_warn_mb=memory_warn_mb, memory_limit_mb=memory_limit_mb, ping_interval=ping_interval, ping_timeout=ping_timeout, high_ping_threshold=high_ping_threshold, mp_ctx=mp_ctx, loop=loop, http_proxy=http_proxy, ) self._user_args: Any | None = None self._job_status: JobStatus | None = None self._running_job: RunningJobInfo | None = None self._initialize_process_fnc = initialize_process_fnc self._job_entrypoint_fnc = job_entrypoint_fnc self._inference_executor = inference_executor self._inference_tasks: list[asyncio.Task[None]] = [] self._id = shortuuid("PCEXEC_") @property def id(self) -> str: return self._id @property def status(self) -> JobStatus: if self._job_status is None: raise RuntimeError("job status not available") return self._job_status @property def user_arguments(self) -> Any | None: return self._user_args @user_arguments.setter def user_arguments(self, value: Any | None) -> None: self._user_args = value @property def running_job(self) -> RunningJobInfo | None: return self._running_job def _create_process(self, cch: socket.socket, log_cch: socket.socket) -> mp.Process: proc_args = ProcStartArgs( initialize_process_fnc=self._initialize_process_fnc, job_entrypoint_fnc=self._job_entrypoint_fnc, log_cch=log_cch, mp_cch=cch, user_arguments=self._user_args, ) return self._mp_ctx.Process( # type: ignore target=proc_main, args=(proc_args,), name="job_proc" ) @log_exceptions(logger=logger) async def _main_task(self, ipc_ch: aio.ChanReceiver[channel.Message]) -> None: try: async for msg in ipc_ch: if isinstance(msg, proto.InferenceRequest): self._inference_tasks.append(asyncio.create_task(self._do_inference_task(msg))) finally: await aio.cancel_and_wait(*self._inference_tasks) @log_exceptions(logger=logger) async def _supervise_task(self) -> None: try: await super()._supervise_task() finally: if self._running_job: metrics.job_ended() self._job_status = JobStatus.SUCCESS if self.exitcode == 0 else JobStatus.FAILED async def _do_inference_task(self, inf_req: proto.InferenceRequest) -> None: if self._inference_executor is None: logger.warning("inference request received but no inference executor") await channel.asend_message( self._pch, proto.InferenceResponse( request_id=inf_req.request_id, error="no inference executor" ), ) return try: inf_res = await self._inference_executor.do_inference(inf_req.method, inf_req.data) await channel.asend_message( self._pch, proto.InferenceResponse(request_id=inf_req.request_id, data=inf_res), ) except Exception as e: await channel.asend_message( self._pch, proto.InferenceResponse(request_id=inf_req.request_id, error=str(e)), ) async def launch_job(self, info: RunningJobInfo) -> None: """start/assign a job to the process""" if self._running_job is not None: raise RuntimeError("process already has a running job") if not self._initialize_fut.done(): raise RuntimeError("process not initialized") metrics.job_started() self._job_status = JobStatus.RUNNING self._running_job = info start_req = proto.StartJobRequest() start_req.running_job = info await channel.asend_message(self._pch, start_req) def logging_extra(self) -> dict[str, Any]: extra = super().logging_extra() if self._running_job: extra["job_id"] = self._running_job.job.id return extra
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.ipc.supervised_proc.SupervisedProc
- abc.ABC
Instance variables
prop id : str
-
Expand source code
@property def id(self) -> str: return self._id
prop running_job : RunningJobInfo | None
-
Expand source code
@property def running_job(self) -> RunningJobInfo | None: return self._running_job
prop status : JobStatus
-
Expand source code
@property def status(self) -> JobStatus: if self._job_status is None: raise RuntimeError("job status not available") return self._job_status
prop user_arguments : Any | None
-
Expand source code
@property def user_arguments(self) -> Any | None: return self._user_args
Methods
async def launch_job(self, info: RunningJobInfo) ‑> None
-
Expand source code
async def launch_job(self, info: RunningJobInfo) -> None: """start/assign a job to the process""" if self._running_job is not None: raise RuntimeError("process already has a running job") if not self._initialize_fut.done(): raise RuntimeError("process not initialized") metrics.job_started() self._job_status = JobStatus.RUNNING self._running_job = info start_req = proto.StartJobRequest() start_req.running_job = info await channel.asend_message(self._pch, start_req)
start/assign a job to the process
def logging_extra(self) ‑> dict[str, typing.Any]
-
Expand source code
def logging_extra(self) -> dict[str, Any]: extra = super().logging_extra() if self._running_job: extra["job_id"] = self._running_job.job.id return extra