Module livekit.agents.ipc.job_main
Functions
def thread_main(args: ThreadStartArgs) ‑> None
-
main function for the job process when using the ThreadedJobRunner
Classes
class JobTask (job_ctx: JobContext, task: asyncio.Task, shutdown_fut: asyncio.Future[_ShutdownInfo])
-
JobTask(job_ctx: 'JobContext', task: 'asyncio.Task', shutdown_fut: 'asyncio.Future[_ShutdownInfo]')
Expand source code
@dataclass class JobTask: job_ctx: JobContext task: asyncio.Task shutdown_fut: asyncio.Future[_ShutdownInfo]
Class variables
var job_ctx : livekit.agents.job.JobContext
var shutdown_fut : _asyncio.Future[livekit.agents.ipc.job_main._ShutdownInfo]
var task : _asyncio.Task
class LogQueueHandler (duplex: utils.aio.duplex_unix._Duplex)
-
Handler instances dispatch logging events to specific destinations.
The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.
Initializes the instance - basically setting the formatter to None and the filter list to empty.
Expand source code
class LogQueueHandler(logging.Handler): _sentinal = None def __init__(self, duplex: utils.aio.duplex_unix._Duplex) -> None: super().__init__() self._duplex = duplex self._send_q = queue.SimpleQueue[Optional[bytes]]() self._send_thread = threading.Thread( target=self._forward_logs, name="ipc_log_forwarder" ) self._send_thread.start() def _forward_logs(self): while True: serialized_record = self._send_q.get() if serialized_record is None: break try: self._duplex.send_bytes(serialized_record) except duplex_unix.DuplexClosed: break self._duplex.close() def emit(self, record: logging.LogRecord) -> None: try: # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453 msg = self.format(record) record = copy.copy(record) record.message = msg record.msg = msg record.args = None record.exc_info = None record.exc_text = None record.stack_info = None # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json # webosckets library add "websocket" attribute to log records, which is not pickleable if hasattr(record, "websocket"): record.websocket = None self._send_q.put_nowait(pickle.dumps(record)) except Exception: self.handleError(record) def close(self) -> None: super().close() self._send_q.put_nowait(self._sentinal)
Ancestors
- logging.Handler
- logging.Filterer
Methods
def close(self) ‑> None
-
Tidy up any resources used by the handler.
This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
def emit(self, record: logging.LogRecord) ‑> None
-
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
class ProcStartArgs (initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Any], log_cch: socket.socket, mp_cch: socket.socket, asyncio_debug: bool, user_arguments: Any | None = None)
-
ProcStartArgs(initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', log_cch: 'socket.socket', mp_cch: 'socket.socket', asyncio_debug: 'bool', user_arguments: 'Any | None' = None)
Expand source code
@dataclass class ProcStartArgs: initialize_process_fnc: Callable[[JobProcess], Any] job_entrypoint_fnc: Callable[[JobContext], Any] log_cch: socket.socket mp_cch: socket.socket asyncio_debug: bool user_arguments: Any | None = None
Class variables
var asyncio_debug : bool
var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var log_cch : socket.socket
var mp_cch : socket.socket
var user_arguments : typing.Any | None
class ThreadStartArgs (mp_cch: socket.socket, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Any], user_arguments: Any | None, asyncio_debug: bool, join_fnc: Callable[[], None])
-
ThreadStartArgs(mp_cch: 'socket.socket', initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', user_arguments: 'Any | None', asyncio_debug: 'bool', join_fnc: 'Callable[[], None]')
Expand source code
@dataclass class ThreadStartArgs: mp_cch: socket.socket initialize_process_fnc: Callable[[JobProcess], Any] job_entrypoint_fnc: Callable[[JobContext], Any] user_arguments: Any | None asyncio_debug: bool join_fnc: Callable[[], None]
Class variables
var asyncio_debug : bool
var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var join_fnc : Callable[[], None]
var mp_cch : socket.socket
var user_arguments : typing.Any | None