Module livekit.agents.ipc.proc_pool
Classes
class ProcPool (*,
initialize_process_fnc: Callable[[JobProcess], Any],
job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
num_idle_processes: int,
initialize_timeout: float,
close_timeout: float,
inference_executor: inference_executor.InferenceExecutor | None,
job_executor_type: JobExecutorType,
mp_ctx: BaseContext,
memory_warn_mb: float,
memory_limit_mb: float,
loop: asyncio.AbstractEventLoop)-
Expand source code
class ProcPool(utils.EventEmitter[EventTypes]): def __init__( self, *, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Awaitable[None]], num_idle_processes: int, initialize_timeout: float, close_timeout: float, inference_executor: inference_executor.InferenceExecutor | None, job_executor_type: JobExecutorType, mp_ctx: BaseContext, memory_warn_mb: float, memory_limit_mb: float, loop: asyncio.AbstractEventLoop, ) -> None: super().__init__() self._job_executor_type = job_executor_type self._mp_ctx = mp_ctx self._initialize_process_fnc = initialize_process_fnc self._job_entrypoint_fnc = job_entrypoint_fnc self._close_timeout = close_timeout self._inf_executor = inference_executor self._initialize_timeout = initialize_timeout self._loop = loop self._memory_limit_mb = memory_limit_mb self._memory_warn_mb = memory_warn_mb self._num_idle_processes = num_idle_processes self._init_sem = asyncio.Semaphore(MAX_CONCURRENT_INITIALIZATIONS) self._proc_needed_sem = asyncio.Semaphore(num_idle_processes) self._warmed_proc_queue = asyncio.Queue[JobExecutor]() self._executors: list[JobExecutor] = [] self._started = False self._closed = False @property def processes(self) -> list[JobExecutor]: return self._executors def get_by_job_id(self, job_id: str) -> JobExecutor | None: return next( ( x for x in self._executors if x.running_job and x.running_job.job.id == job_id ), None, ) def start(self) -> None: if self._started: return self._started = True self._main_atask = asyncio.create_task(self._main_task()) async def aclose(self) -> None: if not self._started: return self._closed = True await aio.gracefully_cancel(self._main_atask) async def launch_job(self, info: RunningJobInfo) -> None: if self._num_idle_processes == 0: self._proc_needed_sem.release() # ask for a process if prewarmed processes are not disabled proc = await self._warmed_proc_queue.get() else: proc = await self._warmed_proc_queue.get() self._proc_needed_sem.release() # notify that a new process can be warmed/started await proc.launch_job(info) self.emit("process_job_launched", proc) @utils.log_exceptions(logger=logger) async def _proc_watch_task(self) -> None: proc: JobExecutor if self._job_executor_type == JobExecutorType.THREAD: proc = job_thread_executor.ThreadJobExecutor( initialize_process_fnc=self._initialize_process_fnc, job_entrypoint_fnc=self._job_entrypoint_fnc, initialize_timeout=self._initialize_timeout, close_timeout=self._close_timeout, inference_executor=self._inf_executor, ping_interval=2.5, high_ping_threshold=0.5, loop=self._loop, ) elif self._job_executor_type == JobExecutorType.PROCESS: proc = job_proc_executor.ProcJobExecutor( initialize_process_fnc=self._initialize_process_fnc, job_entrypoint_fnc=self._job_entrypoint_fnc, initialize_timeout=self._initialize_timeout, close_timeout=self._close_timeout, inference_executor=self._inf_executor, mp_ctx=self._mp_ctx, loop=self._loop, ping_interval=2.5, ping_timeout=60, high_ping_threshold=0.5, memory_warn_mb=self._memory_warn_mb, memory_limit_mb=self._memory_limit_mb, ) else: raise ValueError(f"unsupported job executor: {self._job_executor_type}") try: self._executors.append(proc) async with self._init_sem: if self._closed: return self.emit("process_created", proc) await proc.start() self.emit("process_started", proc) try: await proc.initialize() # process where initialization times out will never fire "process_ready" # neither be used to launch jobs self.emit("process_ready", proc) self._warmed_proc_queue.put_nowait(proc) except Exception: self._proc_needed_sem.release() # notify to warm a new process after initialization failure await proc.join() self.emit("process_closed", proc) finally: self._executors.remove(proc) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: watch_tasks: list[asyncio.Task[None]] = [] try: while True: await self._proc_needed_sem.acquire() task = asyncio.create_task(self._proc_watch_task()) watch_tasks.append(task) task.add_done_callback(watch_tasks.remove) except asyncio.CancelledError: await asyncio.gather(*[proc.aclose() for proc in self._executors]) await asyncio.gather(*watch_tasks)
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop processes : list[JobExecutor]
-
Expand source code
@property def processes(self) -> list[JobExecutor]: return self._executors
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None: if not self._started: return self._closed = True await aio.gracefully_cancel(self._main_atask)
def get_by_job_id(self, job_id: str) ‑> JobExecutor | None
-
Expand source code
def get_by_job_id(self, job_id: str) -> JobExecutor | None: return next( ( x for x in self._executors if x.running_job and x.running_job.job.id == job_id ), None, )
async def launch_job(self, info: RunningJobInfo) ‑> None
-
Expand source code
async def launch_job(self, info: RunningJobInfo) -> None: if self._num_idle_processes == 0: self._proc_needed_sem.release() # ask for a process if prewarmed processes are not disabled proc = await self._warmed_proc_queue.get() else: proc = await self._warmed_proc_queue.get() self._proc_needed_sem.release() # notify that a new process can be warmed/started await proc.launch_job(info) self.emit("process_job_launched", proc)
def start(self) ‑> None
-
Expand source code
def start(self) -> None: if self._started: return self._started = True self._main_atask = asyncio.create_task(self._main_task())
Inherited members