Module livekit.agents.ipc.log_queue
Classes
class LogQueueHandler (duplex: utils.aio.duplex_unix._Duplex)
-
Expand source code
class LogQueueHandler(logging.Handler): _sentinal = None def __init__(self, duplex: utils.aio.duplex_unix._Duplex) -> None: super().__init__() self._duplex = duplex self._send_q = queue.SimpleQueue[Optional[bytes]]() self._send_thread = threading.Thread( target=self._forward_logs, name="ipc_log_forwarder" ) self._send_thread.start() def _forward_logs(self): while True: serialized_record = self._send_q.get() if serialized_record is None: break try: self._duplex.send_bytes(serialized_record) except duplex_unix.DuplexClosed: break self._duplex.close() def emit(self, record: logging.LogRecord) -> None: try: # Check if Python is shutting down if sys.is_finalizing(): return # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453 msg = self.format(record) record = copy.copy(record) record.message = msg record.msg = msg record.args = None record.exc_info = None record.exc_text = None record.stack_info = None # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json # webosckets library add "websocket" attribute to log records, which is not pickleable if hasattr(record, "websocket"): record.websocket = None self._send_q.put_nowait(pickle.dumps(record)) except Exception: self.handleError(record) def close(self) -> None: super().close() self._send_q.put_nowait(self._sentinal)
Handler instances dispatch logging events to specific destinations.
The base handler class. Acts as a placeholder which defines the Handler interface. Handlers can optionally use Formatter instances to format records as desired. By default, no formatter is specified; in this case, the 'raw' message as determined by record.message is logged.
Initializes the instance - basically setting the formatter to None and the filter list to empty.
Ancestors
- logging.Handler
- logging.Filterer
Methods
def close(self) ‑> None
-
Expand source code
def close(self) -> None: super().close() self._send_q.put_nowait(self._sentinal)
Tidy up any resources used by the handler.
This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.
def emit(self, record: logging.LogRecord) ‑> None
-
Expand source code
def emit(self, record: logging.LogRecord) -> None: try: # Check if Python is shutting down if sys.is_finalizing(): return # from https://github.com/python/cpython/blob/91b7f2e7f6593acefda4fa860250dd87d6f849bf/Lib/logging/handlers.py#L1453 msg = self.format(record) record = copy.copy(record) record.message = msg record.msg = msg record.args = None record.exc_info = None record.exc_text = None record.stack_info = None # https://websockets.readthedocs.io/en/stable/topics/logging.html#logging-to-json # webosckets library add "websocket" attribute to log records, which is not pickleable if hasattr(record, "websocket"): record.websocket = None self._send_q.put_nowait(pickle.dumps(record)) except Exception: self.handleError(record)
Do whatever it takes to actually log the specified logging record.
This version is intended to be implemented by subclasses and so raises a NotImplementedError.
class LogQueueListener (duplex: utils.aio.duplex_unix._Duplex,
prepare_fnc: Callable[[logging.LogRecord], None])-
Expand source code
class LogQueueListener: def __init__( self, duplex: utils.aio.duplex_unix._Duplex, prepare_fnc: Callable[[logging.LogRecord], None], ): self._thread: threading.Thread | None = None self._duplex = duplex self._prepare_fnc = prepare_fnc def start(self) -> None: self._thread = threading.Thread(target=self._monitor, name="ipc_log_listener") self._thread.start() def stop(self) -> None: if self._thread is None: return self._duplex.close() self._thread.join() self._thread = None def handle(self, record: logging.LogRecord) -> None: self._prepare_fnc(record) lger = logging.getLogger(record.name) if not lger.isEnabledFor(record.levelno): return lger.callHandlers(record) def _monitor(self): while True: try: data = self._duplex.recv_bytes() except utils.aio.duplex_unix.DuplexClosed: break record = pickle.loads(data) self.handle(record)
Methods
def handle(self, record: logging.LogRecord) ‑> None
-
Expand source code
def handle(self, record: logging.LogRecord) -> None: self._prepare_fnc(record) lger = logging.getLogger(record.name) if not lger.isEnabledFor(record.levelno): return lger.callHandlers(record)
def start(self) ‑> None
-
Expand source code
def start(self) -> None: self._thread = threading.Thread(target=self._monitor, name="ipc_log_listener") self._thread.start()
def stop(self) ‑> None
-
Expand source code
def stop(self) -> None: if self._thread is None: return self._duplex.close() self._thread.join() self._thread = None