Module livekit.agents.ipc.supervised_proc
Classes
class SupervisedProc (*,
initialize_timeout: float,
close_timeout: float,
memory_warn_mb: float,
memory_limit_mb: float,
ping_interval: float,
ping_timeout: float,
high_ping_threshold: float,
mp_ctx: BaseContext,
loop: asyncio.AbstractEventLoop)-
Expand source code
class SupervisedProc(ABC):
    """Abstract supervisor for a child process driven over an IPC channel.

    Subclasses provide ``_create_process`` (builds the ``mp.Process``) and
    ``_main_task`` (consumes the IPC messages forwarded by the supervisor).
    Once started, the supervisor pings the process, optionally watches its
    memory usage, and kills it when it stops answering pings, exceeds the
    memory limit, times out during initialization, or does not exit within
    ``close_timeout`` after a shutdown request.
    """

    def __init__(
        self,
        *,
        initialize_timeout: float,
        close_timeout: float,
        memory_warn_mb: float,
        memory_limit_mb: float,
        ping_interval: float,
        ping_timeout: float,
        high_ping_threshold: float,
        mp_ctx: BaseContext,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the supervision options; no process is created yet.

        Args:
            initialize_timeout: seconds to wait for the InitializeResponse.
            close_timeout: seconds ``aclose`` waits before killing the process.
            memory_warn_mb: RSS (in MB) above which a warning is logged;
                a value <= 0 disables the warning.
            memory_limit_mb: RSS (in MB) above which the process is killed;
                a value <= 0 disables the limit.
            ping_interval: interval between ping requests (also forwarded to
                the process in the InitializeRequest).
            ping_timeout: delay without a pong after which the process is
                killed (presumably seconds, passed to ``aio.sleep``).
            high_ping_threshold: pong delay, in seconds, above which the
                process is reported as unresponsive.
            mp_ctx: multiprocessing context, stored for subclasses.
            loop: event loop used to start the process and to publish its
                termination from the join thread.
        """
        self._loop = loop
        self._mp_ctx = mp_ctx
        self._opts = _ProcOpts(
            initialize_timeout=initialize_timeout,
            close_timeout=close_timeout,
            memory_warn_mb=memory_warn_mb,
            memory_limit_mb=memory_limit_mb,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            high_ping_threshold=high_ping_threshold,
        )

        self._exitcode: int | None = None
        self._pid: int | None = None

        self._supervise_atask: asyncio.Task[None] | None = None
        self._closing = False
        self._kill_sent = False
        # resolved (or failed) by initialize(); awaited by _supervise_task()
        self._initialize_fut = asyncio.Future[None]()
        self._lock = asyncio.Lock()

    @abstractmethod
    def _create_process(
        self, cch: socket.socket, log_cch: socket.socket
    ) -> mp.Process:
        """Build (without starting) the child process.

        Args:
            cch: child end of the IPC socket pair.
            log_cch: child end of the logging socket pair.
        """
        ...

    @abstractmethod
    async def _main_task(self, ipc_ch: aio.ChanReceiver[channel.Message]) -> None:
        """Consume the IPC messages forwarded by ``_read_ipc_task``."""
        ...

    @property
    def exitcode(self) -> int | None:
        """Exit code of the process, ``None`` until the supervise task records it."""
        return self._exitcode

    @property
    def killed(self) -> bool:
        """Whether a kill signal has been sent to the process."""
        return self._kill_sent

    @property
    def pid(self) -> int | None:
        """PID of the process, ``None`` until the process has been started."""
        return self._pid

    @property
    def started(self) -> bool:
        """Whether the supervise task has been created by ``start``."""
        return self._supervise_atask is not None

    async def start(self) -> None:
        """start the supervised process

        Raises:
            RuntimeError: if the process was already started or is closed.
        """
        if self.started:
            raise RuntimeError("process already started")

        if self._closing:
            raise RuntimeError("process is closed")

        # shielded so that cancelling the caller does not interrupt the startup
        await asyncio.shield(self._start())

    async def _start(self) -> None:
        """Create the socket pairs, spawn the process and start supervising it."""

        def _add_proc_ctx_log(record: logging.LogRecord) -> None:
            # attach the supervisor context (e.g. pid) to forwarded log records
            extra = self.logging_extra()
            for key, value in extra.items():
                setattr(record, key, value)

        async with self._lock:
            mp_pch, mp_cch = socket.socketpair()
            mp_log_pch, mp_log_cch = socket.socketpair()

            self._pch = await duplex_unix._AsyncDuplex.open(mp_pch)

            log_pch = duplex_unix._Duplex.open(mp_log_pch)
            log_listener = LogQueueListener(log_pch, _add_proc_ctx_log)
            log_listener.start()

            self._proc = self._create_process(mp_cch, mp_log_cch)
            await self._loop.run_in_executor(None, self._proc.start)
            # the parent closes its copies of the child ends once the process runs
            mp_log_cch.close()
            mp_cch.close()

            self._pid = self._proc.pid
            self._join_fut = asyncio.Future[None]()

            def _sync_run() -> None:
                # blocking join in a dedicated thread, then notify the event loop
                self._proc.join()
                log_listener.stop()
                try:
                    self._loop.call_soon_threadsafe(self._join_fut.set_result, None)
                except RuntimeError:
                    # call_soon_threadsafe raises RuntimeError on a closed loop
                    pass

            thread = threading.Thread(target=_sync_run, name="proc_join_thread")
            thread.start()
            self._supervise_atask = asyncio.create_task(self._supervise_task())

    async def join(self) -> None:
        """wait for the process to finish

        Raises:
            RuntimeError: if the process has not been started.
        """
        if not self.started:
            raise RuntimeError("process not started")

        async with self._lock:
            if self._supervise_atask:
                await asyncio.shield(self._supervise_atask)

    async def initialize(self) -> None:
        """initialize the process by sending an InitializeRequest message and
        waiting, with a timeout, for an InitializeResponse

        The outcome is recorded on ``self._initialize_fut`` exactly once.

        Raises:
            asyncio.TimeoutError: no response within ``initialize_timeout``;
                the process is killed.
            RuntimeError: the process reported an initialization error.
            Exception: any other failure while receiving the response
                (re-raised unchanged).
        """
        await channel.asend_message(
            self._pch,
            proto.InitializeRequest(
                asyncio_debug=self._loop.get_debug(),
                ping_interval=self._opts.ping_interval,
                ping_timeout=self._opts.ping_timeout,
                high_ping_threshold=self._opts.high_ping_threshold,
            ),
        )

        # wait for the process to become ready
        try:
            init_res = await asyncio.wait_for(
                channel.arecv_message(self._pch, proto.IPC_MESSAGES),
                timeout=self._opts.initialize_timeout,
            )

            assert isinstance(init_res, proto.InitializeResponse), (
                "first message must be InitializeResponse"
            )

            if init_res.error:
                logger.error(
                    f"process initialization failed: {init_res.error}",
                    extra=self.logging_extra(),
                )
                # the generic handler below records this error on the future;
                # setting it here as well would make that second
                # set_exception() raise InvalidStateError and hide this error
                raise RuntimeError(f"process initialization failed: {init_res.error}")

            self._initialize_fut.set_result(None)
        except asyncio.TimeoutError:
            self._initialize_fut.set_exception(
                asyncio.TimeoutError("process initialization timed out")
            )
            logger.error(
                "initialization timed out, killing process", extra=self.logging_extra()
            )
            self._send_kill_signal()
            raise
        except Exception as e:  # should be channel.ChannelClosed most of the time
            if not self._initialize_fut.done():
                self._initialize_fut.set_exception(e)
            raise

    async def aclose(self) -> None:
        """attempt to gracefully close the supervised process

        Sends a ShutdownRequest and waits up to ``close_timeout`` for the
        supervise task; on timeout the process is killed. Does nothing when
        the process was never started.
        """
        if not self.started:
            return

        self._closing = True
        with contextlib.suppress(duplex_unix.DuplexClosed):
            await channel.asend_message(self._pch, proto.ShutdownRequest())

        try:
            if self._supervise_atask:
                # shield: the timeout must not cancel the supervise task itself
                await asyncio.wait_for(
                    asyncio.shield(self._supervise_atask),
                    timeout=self._opts.close_timeout,
                )
        except asyncio.TimeoutError:
            logger.error(
                "process did not exit in time, killing process",
                extra=self.logging_extra(),
            )
            self._send_kill_signal()

        async with self._lock:
            if self._supervise_atask:
                await asyncio.shield(self._supervise_atask)

    async def kill(self) -> None:
        """forcefully kill the supervised process

        Raises:
            RuntimeError: if the process has not been started.
        """
        if not self.started:
            raise RuntimeError("process not started")

        self._closing = True
        self._send_kill_signal()

        async with self._lock:
            if self._supervise_atask:
                await asyncio.shield(self._supervise_atask)

    def _send_kill_signal(self) -> None:
        """forcefully kill the process"""
        try:
            if not self._proc.is_alive():
                return
        except ValueError:
            # is_alive() raises ValueError once the process object is closed
            return

        logger.info("killing process", extra=self.logging_extra())
        if sys.platform == "win32":
            self._proc.terminate()
        else:
            self._proc.kill()

        self._kill_sent = True

    @log_exceptions(logger=logger)
    async def _supervise_task(self) -> None:
        """Run the helper tasks until the process exits, then clean up."""
        try:
            await self._initialize_fut
        except asyncio.TimeoutError:
            pass  # this happens when the initialization takes longer than self._initialize_timeout
        except Exception:
            pass  # initialization failed

        # the process is killed if it doesn't respond to ping requests
        pong_timeout = aio.sleep(self._opts.ping_timeout)

        ipc_ch = aio.Chan[channel.Message]()

        main_task = asyncio.create_task(self._main_task(ipc_ch))
        read_ipc_task = asyncio.create_task(self._read_ipc_task(ipc_ch, pong_timeout))
        ping_task = asyncio.create_task(self._ping_pong_task(pong_timeout))
        # closing the channel lets _main_task observe the end of the stream
        read_ipc_task.add_done_callback(lambda _: ipc_ch.close())

        memory_monitor_task: asyncio.Task[None] | None = None
        if self._opts.memory_limit_mb > 0 or self._opts.memory_warn_mb > 0:
            memory_monitor_task = asyncio.create_task(self._memory_monitor_task())

        # resolved by the join thread once the process has exited
        await self._join_fut
        self._exitcode = self._proc.exitcode
        self._proc.close()
        await aio.gracefully_cancel(ping_task, read_ipc_task, main_task)

        if memory_monitor_task is not None:
            await aio.gracefully_cancel(memory_monitor_task)

        with contextlib.suppress(duplex_unix.DuplexClosed):
            await self._pch.aclose()

        if self._exitcode != 0 and not self._kill_sent:
            logger.error(
                f"process exited with non-zero exit code {self.exitcode}",
                extra=self.logging_extra(),
            )

    @log_exceptions(logger=logger)
    async def _read_ipc_task(
        self, ipc_ch: aio.Chan[channel.Message], pong_timeout: aio.Sleep
    ) -> None:
        """Read IPC messages, handle pongs, and forward every message to ``ipc_ch``."""
        while True:
            try:
                msg = await channel.arecv_message(self._pch, proto.IPC_MESSAGES)
            except duplex_unix.DuplexClosed:
                break

            if isinstance(msg, proto.PongResponse):
                delay = time_ms() - msg.timestamp
                # delay is in milliseconds, the threshold in seconds
                if delay > self._opts.high_ping_threshold * 1000:
                    logger.warning(
                        "process is unresponsive",
                        extra={"delay": delay, **self.logging_extra()},
                    )

                # a pong arrived: restart the unresponsiveness timer
                with contextlib.suppress(aio.SleepFinished):
                    pong_timeout.reset()

            if isinstance(msg, proto.Exiting):
                logger.info(
                    "process exiting",
                    extra={"reason": msg.reason, **self.logging_extra()},
                )

            ipc_ch.send_nowait(msg)

    @log_exceptions(logger=logger)
    async def _ping_pong_task(self, pong_timeout: aio.Sleep) -> None:
        """Send periodic pings and kill the process when ``pong_timeout`` elapses."""
        ping_interval = aio.interval(self._opts.ping_interval)

        async def _send_ping_co():
            while True:
                await ping_interval.tick()
                try:
                    await channel.asend_message(
                        self._pch, proto.PingRequest(timestamp=time_ms())
                    )
                except duplex_unix.DuplexClosed:
                    break

        async def _pong_timeout_co():
            await pong_timeout
            logger.error(
                "process is unresponsive, killing process", extra=self.logging_extra()
            )
            self._send_kill_signal()

        tasks = [
            asyncio.create_task(_send_ping_co()),
            asyncio.create_task(_pong_timeout_co()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await aio.gracefully_cancel(*tasks)

    @log_exceptions(logger=logger)
    async def _memory_monitor_task(self) -> None:
        """Monitor memory usage and kill the process if it exceeds the limit."""
        while not self._closing and not self._kill_sent:
            try:
                if not self._pid:
                    await asyncio.sleep(5)
                    continue

                # get process memory info
                process = psutil.Process(self._pid)
                memory_info = process.memory_info()
                memory_mb = memory_info.rss / (1024 * 1024)  # Convert to MB

                if (
                    self._opts.memory_limit_mb > 0
                    and memory_mb > self._opts.memory_limit_mb
                ):
                    logger.error(
                        "process exceeded memory limit, killing process",
                        extra={
                            "memory_usage_mb": memory_mb,
                            "memory_limit_mb": self._opts.memory_limit_mb,
                            **self.logging_extra(),
                        },
                    )
                    self._send_kill_signal()
                elif (
                    self._opts.memory_warn_mb > 0
                    and memory_mb > self._opts.memory_warn_mb
                ):
                    logger.warning(
                        "process memory usage is high",
                        extra={
                            "memory_usage_mb": memory_mb,
                            "memory_warn_mb": self._opts.memory_warn_mb,
                            "memory_limit_mb": self._opts.memory_limit_mb,
                            **self.logging_extra(),
                        },
                    )

            except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
                if self._closing or self._kill_sent:
                    return
                logger.warning(
                    "Failed to get memory info for process",
                    extra=self.logging_extra(),
                    exc_info=e,
                )
                # don't bother rechecking if we cannot get process info
                return
            except Exception:
                if self._closing or self._kill_sent:
                    return
                logger.exception(
                    "Error in memory monitoring task",
                    extra=self.logging_extra(),
                )

            await asyncio.sleep(5)  # check every 5 seconds

    def logging_extra(self) -> dict[str, Any]:
        """Return the extra fields attached to log records about this process."""
        extra: dict[str, Any] = {
            "pid": self.pid,
        }
        return extra
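Abstract base class that supervises a child process: it starts the process, performs the initialization handshake over an IPC channel, monitors ping responses and memory usage, and closes or kills the process. (The class defines no docstring of its own; the description inherited from abc.ABC reads: Helper class that provides a standard way to create an ABC using inheritance.)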
Ancestors
- abc.ABC
Subclasses
Instance variables
prop exitcode : int | None
-
Expand source code
@property def exitcode(self) -> int | None: return self._exitcode
prop killed : bool
-
Expand source code
@property
def killed(self) -> bool:
    """Whether a kill signal has been sent (mirrors the ``_kill_sent`` flag)."""
    return self._kill_sent
prop pid : int | None
-
Expand source code
@property def pid(self) -> int | None: return self._pid
prop started : bool
-
Expand source code
@property
def started(self) -> bool:
    """Whether a supervise task exists, i.e. ``_supervise_atask`` is set."""
    supervisor = self._supervise_atask
    return supervisor is not None
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """attempt to gracefully close the supervised process

    A ShutdownRequest is sent over the IPC channel, then the supervise task
    is awaited for at most ``close_timeout``; if it does not finish in time
    the process is killed. Returns immediately when the process was never
    started.
    """
    if not self.started:
        return

    self._closing = True

    # the channel may already be closed; that is not an error here
    with contextlib.suppress(duplex_unix.DuplexClosed):
        await channel.asend_message(self._pch, proto.ShutdownRequest())

    supervisor = self._supervise_atask
    if supervisor is not None:
        try:
            # shield: the timeout must not cancel the supervise task itself
            await asyncio.wait_for(
                asyncio.shield(supervisor),
                timeout=self._opts.close_timeout,
            )
        except asyncio.TimeoutError:
            logger.error(
                "process did not exit in time, killing process",
                extra=self.logging_extra(),
            )
            self._send_kill_signal()

    async with self._lock:
        if self._supervise_atask:
            await asyncio.shield(self._supervise_atask)
attempt to gracefully close the supervised process
async def initialize(self) ‑> None
-
Expand source code
async def initialize(self) -> None:
    """initialize the process by sending an InitializeRequest message and
    waiting, with a timeout, for an InitializeResponse

    The outcome is recorded on ``self._initialize_fut`` exactly once.

    Raises:
        asyncio.TimeoutError: no response within ``initialize_timeout``;
            the process is killed.
        RuntimeError: the process reported an initialization error.
        Exception: any other failure while receiving the response
            (re-raised unchanged).
    """
    await channel.asend_message(
        self._pch,
        proto.InitializeRequest(
            asyncio_debug=self._loop.get_debug(),
            ping_interval=self._opts.ping_interval,
            ping_timeout=self._opts.ping_timeout,
            high_ping_threshold=self._opts.high_ping_threshold,
        ),
    )

    # wait for the process to become ready
    try:
        init_res = await asyncio.wait_for(
            channel.arecv_message(self._pch, proto.IPC_MESSAGES),
            timeout=self._opts.initialize_timeout,
        )

        assert isinstance(init_res, proto.InitializeResponse), (
            "first message must be InitializeResponse"
        )

        if init_res.error:
            logger.error(
                f"process initialization failed: {init_res.error}",
                extra=self.logging_extra(),
            )
            # the generic handler below records this error on the future;
            # setting it here as well would make that second set_exception()
            # raise InvalidStateError and hide this error from the caller
            raise RuntimeError(f"process initialization failed: {init_res.error}")

        self._initialize_fut.set_result(None)
    except asyncio.TimeoutError:
        self._initialize_fut.set_exception(
            asyncio.TimeoutError("process initialization timed out")
        )
        logger.error(
            "initialization timed out, killing process", extra=self.logging_extra()
        )
        self._send_kill_signal()
        raise
    except Exception as e:  # should be channel.ChannelClosed most of the time
        if not self._initialize_fut.done():
            self._initialize_fut.set_exception(e)
        raise
Initialize the process by sending an InitializeRequest message and waiting, with a timeout, for an InitializeResponse.
async def join(self) ‑> None
-
Expand source code
async def join(self) -> None:
    """wait for the process to finish

    Raises:
        RuntimeError: if the process has not been started.
    """
    if not self.started:
        raise RuntimeError("process not started")

    async with self._lock:
        supervisor = self._supervise_atask
        if supervisor is not None:
            # shield: cancelling the caller must not cancel the supervise task
            await asyncio.shield(supervisor)
wait for the process to finish
async def kill(self) ‑> None
-
Expand source code
async def kill(self) -> None:
    """forcefully kill the supervised process

    Marks the supervisor as closing, sends the kill signal, then waits for
    the supervise task to finish.

    Raises:
        RuntimeError: if the process has not been started.
    """
    if not self.started:
        raise RuntimeError("process not started")

    self._closing = True
    self._send_kill_signal()

    async with self._lock:
        supervisor = self._supervise_atask
        if supervisor is not None:
            # shield: cancelling the caller must not cancel the supervise task
            await asyncio.shield(supervisor)
forcefully kill the supervised process
def logging_extra(self)
-
Expand source code
def logging_extra(self):
    """Return the extra fields for log records: a new dict holding the ``pid``."""
    return {"pid": self.pid}
async def start(self) ‑> None
-
Expand source code
async def start(self) -> None:
    """start the supervised process

    Raises:
        RuntimeError: if the process was already started, or if the
            supervisor is already closing.
    """
    if self.started:
        raise RuntimeError("process already started")
    if self._closing:
        raise RuntimeError("process is closed")

    # shield: cancelling the caller must not interrupt the startup sequence
    startup = self._start()
    await asyncio.shield(startup)
start the supervised process