Module livekit.agents.ipc.log_queue

Classes

class LogQueueHandler (duplex: utils.aio.duplex_unix._Duplex)
Expand source code
class LogQueueHandler(logging.Handler):
    _sentinal = None

    def __init__(self, duplex: utils.aio.duplex_unix._Duplex) -> None:
        super().__init__()
        self._duplex = duplex
        self._send_q = queue.SimpleQueue[Optional[bytes]]()
        self._send_thread = threading.Thread(
            target=self._forward_logs, name="ipc_log_forwarder"
        )
        self._send_thread.start()

    def _forward_logs(self):
        while True:
            serialized_record = self._send_q.get()
            if serialized_record is None:
                break

            try:
                self._duplex.send_bytes(serialized_record)
            except duplex_unix.DuplexClosed:
                break

        self._duplex.close()

    def emit(self, record: logging.LogRecord) -> None:
        try:
            # Check if Python is shutting down
            if sys.is_finalizing():
                return

            # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453
            msg = self.format(record)
            record = copy.copy(record)
            record.message = msg
            record.msg = msg
            record.args = None
            record.exc_info = None
            record.exc_text = None
            record.stack_info = None

            # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json
            # webosckets library add "websocket" attribute to log records, which is not pickleable
            if hasattr(record, "websocket"):
                record.websocket = None

            self._send_q.put_nowait(pickle.dumps(record))

        except Exception:
            self.handleError(record)

    def close(self) -> None:
        super().close()
        self._send_q.put_nowait(self._sentinal)

Handler instances dispatch logging events to specific destinations.

The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.

Initializes the instance - basically setting the formatter to None and the filter list to empty.

Ancestors

  • logging.Handler
  • logging.Filterer

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    super().close()
    self._send_q.put_nowait(self._sentinal)

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

def emit(self, record: logging.LogRecord) ‑> None
Expand source code
def emit(self, record: logging.LogRecord) -> None:
    try:
        # Check if Python is shutting down
        if sys.is_finalizing():
            return

        # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453
        msg = self.format(record)
        record = copy.copy(record)
        record.message = msg
        record.msg = msg
        record.args = None
        record.exc_info = None
        record.exc_text = None
        record.stack_info = None

        # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json
        # webosckets library add "websocket" attribute to log records, which is not pickleable
        if hasattr(record, "websocket"):
            record.websocket = None

        self._send_q.put_nowait(pickle.dumps(record))

    except Exception:
        self.handleError(record)

Do whatever it takes to actually log the specified logging record.

This version is intended to be implemented by subclasses and so raises a NotImplementedError.

class LogQueueListener (duplex: utils.aio.duplex_unix._Duplex,
prepare_fnc: Callable[[logging.LogRecord], None])
Expand source code
class LogQueueListener:
    def __init__(
        self,
        duplex: utils.aio.duplex_unix._Duplex,
        prepare_fnc: Callable[[logging.LogRecord], None],
    ):
        self._thread: threading.Thread | None = None
        self._duplex = duplex
        self._prepare_fnc = prepare_fnc

    def start(self) -> None:
        self._thread = threading.Thread(target=self._monitor, name="ipc_log_listener")
        self._thread.start()

    def stop(self) -> None:
        if self._thread is None:
            return

        self._duplex.close()
        self._thread.join()
        self._thread = None

    def handle(self, record: logging.LogRecord) -> None:
        self._prepare_fnc(record)

        lger = logging.getLogger(record.name)
        if not lger.isEnabledFor(record.levelno):
            return

        lger.callHandlers(record)

    def _monitor(self):
        while True:
            try:
                data = self._duplex.recv_bytes()
            except utils.aio.duplex_unix.DuplexClosed:
                break

            record = pickle.loads(data)
            self.handle(record)

Methods

def handle(self, record: logging.LogRecord) ‑> None
Expand source code
def handle(self, record: logging.LogRecord) -> None:
    self._prepare_fnc(record)

    lger = logging.getLogger(record.name)
    if not lger.isEnabledFor(record.levelno):
        return

    lger.callHandlers(record)
def start(self) ‑> None
Expand source code
def start(self) -> None:
    self._thread = threading.Thread(target=self._monitor, name="ipc_log_listener")
    self._thread.start()
def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    if self._thread is None:
        return

    self._duplex.close()
    self._thread.join()
    self._thread = None