Module livekit.agents.ipc.thread_job_executor
Classes
class ThreadJobExecutor (*, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]], initialize_timeout: float, close_timeout: float, loop: asyncio.AbstractEventLoop)
-
Expand source code
class ThreadJobExecutor: def __init__( self, *, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]], initialize_timeout: float, close_timeout: float, loop: asyncio.AbstractEventLoop, ) -> None: self._loop = loop self._opts = _ProcOpts( initialize_process_fnc=initialize_process_fnc, job_entrypoint_fnc=job_entrypoint_fnc, initialize_timeout=initialize_timeout, close_timeout=close_timeout, ) self._user_args: Any | None = None self._running_job: RunningJobInfo | None = None self._main_atask: asyncio.Task[None] | None = None self._closing = False self._initialize_fut = asyncio.Future[None]() self._lock = asyncio.Lock() @property def started(self) -> bool: return self._main_atask is not None @property def start_arguments(self) -> Any | None: return self._user_args @start_arguments.setter def start_arguments(self, value: Any | None) -> None: self._user_args = value @property def running_job(self) -> RunningJobInfo | None: return self._running_job async def start(self) -> None: if self.started: raise RuntimeError("runner already started") if self._closing: raise RuntimeError("runner is closed") await asyncio.shield(self._start()) async def _start(self) -> None: async with self._lock: # to simplify the runners implementation, we also use a duplex in the threaded executor # (ThreadedRunners), so we can use the same protocol mp_pch, mp_cch = socket.socketpair() self._pch = await duplex_unix._AsyncDuplex.open(mp_pch) self._join_fut = asyncio.Future[None]() def _on_join() -> None: with contextlib.suppress(RuntimeError): self._loop.call_soon_threadsafe(self._join_fut.set_result, None) targs = job_main.ThreadStartArgs( mp_cch=mp_cch, initialize_process_fnc=self._opts.initialize_process_fnc, job_entrypoint_fnc=self._opts.job_entrypoint_fnc, user_arguments=self._user_args, asyncio_debug=self._loop.get_debug(), join_fnc=_on_join, ) self._thread = t = threading.Thread( target=job_main.thread_main, args=(targs,), name="job_thread_runner", ) t.start() self._main_atask = asyncio.create_task(self._main_task()) async def join(self) -> None: """wait for the thread to finish""" if not self.started: raise RuntimeError("runner not started") async with self._lock: if self._main_atask: await asyncio.shield(self._main_atask) async def initialize(self) -> None: await channel.asend_message(self._pch, proto.InitializeRequest()) try: init_res = await asyncio.wait_for( channel.arecv_message(self._pch, proto.IPC_MESSAGES), timeout=self._opts.initialize_timeout, ) assert isinstance( init_res, proto.InitializeResponse ), "first message must be InitializeResponse" except asyncio.TimeoutError: self._initialize_fut.set_exception( asyncio.TimeoutError("runner initialization timed out") ) logger.error( "job initialization is taking too much time..", extra=self.logging_extra(), ) raise except Exception as e: # should be channel.ChannelClosed most of the time self._initialize_fut.set_exception(e) raise else: self._initialize_fut.set_result(None) async def aclose(self) -> None: """ attempt to gracefully close the job. warn if it takes too long to close (in the threaded executor, the job can't be "killed") """ if not self.started: return self._closing = True with contextlib.suppress(utils.aio.duplex_unix.DuplexClosed): await channel.asend_message(self._pch, proto.ShutdownRequest()) try: if self._main_atask: await asyncio.wait_for( asyncio.shield(self._main_atask), timeout=self._opts.close_timeout ) except asyncio.TimeoutError: logger.error( "job shutdown is taking too much time..", extra=self.logging_extra() ) async with self._lock: if self._main_atask: await asyncio.shield(self._main_atask) async def launch_job(self, info: RunningJobInfo) -> None: """start/assign a job to the executor""" if self._running_job is not None: raise RuntimeError("executor already has a running job") self._running_job = info start_req = proto.StartJobRequest() start_req.running_job = info await channel.asend_message(self._pch, start_req) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: try: await self._initialize_fut except asyncio.TimeoutError: pass # this happens when the initialization takes longer than self._initialize_timeout except Exception: pass # initialization failed pong_timeout = utils.aio.sleep(proto.PING_TIMEOUT) ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout)) monitor_task = asyncio.create_task(self._monitor_task(pong_timeout)) await self._join_fut await utils.aio.gracefully_cancel(ping_task, monitor_task) with contextlib.suppress(duplex_unix.DuplexClosed): await self._pch.aclose() @utils.log_exceptions(logger=logger) async def _monitor_task(self, pong_timeout: utils.aio.Sleep) -> None: while True: try: msg = await channel.arecv_message(self._pch, proto.IPC_MESSAGES) except utils.aio.duplex_unix.DuplexClosed: break if isinstance(msg, proto.PongResponse): delay = utils.time_ms() - msg.timestamp if delay > proto.HIGH_PING_THRESHOLD * 1000: logger.warning( "job executor is unresponsive", extra={"delay": delay, **self.logging_extra()}, ) with contextlib.suppress(utils.aio.SleepFinished): pong_timeout.reset() if isinstance(msg, proto.Exiting): logger.debug( "job exiting", extra={"reason": msg.reason, **self.logging_extra()} ) @utils.log_exceptions(logger=logger) async def _ping_pong_task(self, pong_timeout: utils.aio.Sleep) -> None: ping_interval = utils.aio.interval(proto.PING_INTERVAL) async def _send_ping_co(): while True: await ping_interval.tick() try: await channel.asend_message( self._pch, proto.PingRequest(timestamp=utils.time_ms()) ) except utils.aio.duplex_unix.DuplexClosed: break async def _pong_timeout_co(): await pong_timeout logger.error("job is unresponsive..", extra=self.logging_extra()) tasks = [ asyncio.create_task(_send_ping_co()), asyncio.create_task(_pong_timeout_co()), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks) def logging_extra(self): extra: dict[str, Any] = { "tid": self._thread.native_id, } if self._running_job: extra["job_id"] = self._running_job.job.id return extra
Instance variables
prop running_job : RunningJobInfo | None
-
Expand source code
@property def running_job(self) -> RunningJobInfo | None: return self._running_job
prop start_arguments : Any | None
-
Expand source code
@property def start_arguments(self) -> Any | None: return self._user_args
prop started : bool
-
Expand source code
@property def started(self) -> bool: return self._main_atask is not None
Methods
async def aclose(self) ‑> None
-
attempt to gracefully close the job. warn if it takes too long to close (in the threaded executor, the job can't be "killed")
async def initialize(self) ‑> None
async def join(self) ‑> None
-
wait for the thread to finish
async def launch_job(self, info: RunningJobInfo) ‑> None
-
start/assign a job to the executor
def logging_extra(self)
async def start(self) ‑> None