Module livekit.agents.ipc.job_main

Functions

def thread_main(args: ThreadStartArgs) ‑> None

main function for the job process when using the ThreadedJobRunner

Classes

class JobTask (job_ctx: JobContext, task: asyncio.Task, shutdown_fut: asyncio.Future[_ShutdownInfo])

JobTask(job_ctx: 'JobContext', task: 'asyncio.Task', shutdown_fut: 'asyncio.Future[_ShutdownInfo]')

Expand source code
@dataclass
class JobTask:
    """Bundle a job's context with the asyncio task and shutdown future tied to it."""

    # Context object of the job this entry belongs to.
    job_ctx: JobContext
    # NOTE(review): presumably the task running the job entrypoint — confirm
    # against the code that constructs JobTask.
    task: asyncio.Task
    # Future that yields a _ShutdownInfo; presumably resolved when a shutdown
    # of the job is requested — confirm against the code that sets it.
    shutdown_fut: asyncio.Future[_ShutdownInfo]

Class variables

var job_ctx : livekit.agents.job.JobContext
var shutdown_fut : _asyncio.Future[livekit.agents.ipc.job_main._ShutdownInfo]
var task : _asyncio.Task
class LogQueueHandler (duplex: utils.aio.duplex_unix._Duplex)

Handler instances dispatch logging events to specific destinations.

The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.

Initializes the instance - basically setting the formatter to None and the filter list to empty.

Expand source code
class LogQueueHandler(logging.Handler):
    """Logging handler that forwards pickled log records over a duplex channel.

    ``emit`` never writes to the channel itself: it flattens the record,
    pickles it and puts the bytes on an in-memory queue.  A dedicated thread
    (``ipc_log_forwarder``) drains the queue and writes each payload with
    ``duplex.send_bytes``.  ``close`` enqueues a sentinel that makes the thread
    stop and close the duplex.

    Once the handler has been closed, or the forwarder thread has exited for
    any reason (e.g. the peer closed the channel), further records are dropped
    instead of piling up in a queue nobody reads anymore.
    """

    # Queue item that tells the forwarder thread to stop.  The spelling is
    # kept as-is because it is an existing class attribute.
    _sentinal = None

    def __init__(self, duplex: utils.aio.duplex_unix._Duplex) -> None:
        """Store *duplex* and start the background forwarder thread.

        Args:
            duplex: Channel the serialized records are written to; it is
                closed by the forwarder thread when that thread exits.
        """
        super().__init__()
        self._duplex = duplex
        self._send_q = queue.SimpleQueue[Optional[bytes]]()
        # Set when close() was called or the forwarder thread has exited;
        # emit() checks it so records are not queued without a consumer.
        self._stopped = threading.Event()
        self._send_thread = threading.Thread(
            target=self._forward_logs, name="ipc_log_forwarder"
        )
        self._send_thread.start()

    def _forward_logs(self) -> None:
        """Thread body: send queued payloads until the sentinel or a closed duplex."""
        try:
            while True:
                serialized_record = self._send_q.get()
                if serialized_record is None:
                    break

                try:
                    self._duplex.send_bytes(serialized_record)
                except duplex_unix.DuplexClosed:
                    break
        finally:
            # Runs on every exit path, including unexpected errors from
            # send_bytes, so the duplex is always released and emit() stops
            # queueing records.
            self._stopped.set()
            self._duplex.close()

    def emit(self, record: logging.LogRecord) -> None:
        """Serialize *record* and queue it for the forwarder thread.

        Records are silently dropped while the interpreter is finalizing or
        after the handler/forwarder has stopped.  Any error raised while
        preparing the record is routed to ``handleError``.
        """
        try:
            # Check if Python is shutting down
            if sys.is_finalizing():
                return

            # Nobody consumes the queue anymore: drop instead of leaking memory.
            if self._stopped.is_set():
                return

            # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453
            # Format first, then strip everything that may not be pickleable
            # from a copy so other handlers still see the original record.
            msg = self.format(record)
            record = copy.copy(record)
            record.message = msg
            record.msg = msg
            record.args = None
            record.exc_info = None
            record.exc_text = None
            record.stack_info = None

            # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json
            # the websockets library adds a "websocket" attribute to log records,
            # which is not pickleable
            if hasattr(record, "websocket"):
                record.websocket = None

            self._send_q.put_nowait(pickle.dumps(record))

        except Exception:
            self.handleError(record)

    def close(self) -> None:
        """Close the handler and ask the forwarder thread to stop.

        Payloads queued before this call are still sent; the thread then
        consumes the sentinel, closes the duplex and exits.  This method does
        not wait for the thread to finish.
        """
        super().close()
        self._stopped.set()
        self._send_q.put_nowait(self._sentinal)

Ancestors

  • logging.Handler
  • logging.Filterer

Methods

def close(self) ‑> None

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

def emit(self, record: logging.LogRecord) ‑> None

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

class ProcStartArgs (initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Any], log_cch: socket.socket, mp_cch: socket.socket, asyncio_debug: bool, user_arguments: Any | None = None)

ProcStartArgs(initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', log_cch: 'socket.socket', mp_cch: 'socket.socket', asyncio_debug: 'bool', user_arguments: 'Any | None' = None)

Expand source code
@dataclass
class ProcStartArgs:
    """Start-up arguments for a job process.

    NOTE(review): presumably the process-based counterpart of
    ThreadStartArgs — confirm against the code that consumes it.
    """

    # Callable invoked with a JobProcess; by its name, used to initialize the
    # process — confirm where it is called.
    initialize_process_fnc: Callable[[JobProcess], Any]
    # Callable invoked with a JobContext; by its name, the job's entrypoint.
    job_entrypoint_fnc: Callable[[JobContext], Any]
    # NOTE(review): presumably the socket used for forwarding log records
    # to the parent — confirm.
    log_cch: socket.socket
    # NOTE(review): presumably the socket used for the main IPC channel with
    # the parent — confirm.
    mp_cch: socket.socket
    # NOTE(review): presumably toggles asyncio debug mode on the event loop —
    # confirm.
    asyncio_debug: bool
    # Optional opaque value; defaults to None.
    user_arguments: Any | None = None

Class variables

var asyncio_debug : bool
var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var log_cch : socket.socket
var mp_cch : socket.socket
var user_arguments : typing.Any | None
class ThreadStartArgs (mp_cch: socket.socket, initialize_process_fnc: Callable[[JobProcess], Any], job_entrypoint_fnc: Callable[[JobContext], Any], user_arguments: Any | None, asyncio_debug: bool, join_fnc: Callable[[], None])

ThreadStartArgs(mp_cch: 'socket.socket', initialize_process_fnc: 'Callable[[JobProcess], Any]', job_entrypoint_fnc: 'Callable[[JobContext], Any]', user_arguments: 'Any | None', asyncio_debug: 'bool', join_fnc: 'Callable[[], None]')

Expand source code
@dataclass
class ThreadStartArgs:
    """Arguments accepted by ``thread_main``, the job main function used with the ThreadedJobRunner."""

    # NOTE(review): presumably the socket used for the main IPC channel with
    # the parent — confirm.
    mp_cch: socket.socket
    # Callable invoked with a JobProcess; by its name, used to initialize the
    # job's process-level state — confirm where it is called.
    initialize_process_fnc: Callable[[JobProcess], Any]
    # Callable invoked with a JobContext; by its name, the job's entrypoint.
    job_entrypoint_fnc: Callable[[JobContext], Any]
    # Optional opaque value; unlike ProcStartArgs it has no default here.
    user_arguments: Any | None
    # NOTE(review): presumably toggles asyncio debug mode on the event loop —
    # confirm.
    asyncio_debug: bool
    # Zero-argument callable returning None; NOTE(review): presumably joins
    # or waits on something at the end of thread_main — confirm.
    join_fnc: Callable[[], None]

Class variables

var asyncio_debug : bool
var initialize_process_fnc : Callable[[livekit.agents.job.JobProcess], Any]
var job_entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Any]
var join_fnc : Callable[[], None]
var mp_cch : socket.socket
var user_arguments : typing.Any | None