Module livekit.browser.proc_main

Functions

def main(mp_cch: socket.socket, resources_dir: str)
Expand source code
def main(mp_cch: socket.socket, resources_dir: str):
    """Entry point of the browser subprocess.

    Opens the IPC duplex on ``mp_cch``, waits for the initial
    ``InitializeContextRequest``, configures the lkcef application from it,
    starts the manager thread and then blocks in ``app.run()`` until the
    application's run loop returns.

    Args:
        mp_cch: Socket connected to the parent process; wrapped in a
            ``Duplex`` for message exchange.
        resources_dir: Directory holding the ``lkcef_python`` module and the
            lkcef helper/app bundle. It is prepended to ``sys.path``.

    Raises:
        TypeError: If the first IPC message is not an
            ``InitializeContextRequest``.
    """
    import sys
    import os

    # Create a new process group so killpg only affects this subprocess
    # and its CEF helpers, not the parent.
    try:
        os.setpgrp()
    except OSError:
        pass

    # Add resources_dir to sys.path so `import lkcef_python` works
    sys.path.insert(0, resources_dir)

    import lkcef_python as lkcef

    duplex = Duplex.open(mp_cch)

    init_req = channel.recv_message(duplex, proto.IPC_MESSAGES)
    # Explicit check instead of `assert`: asserts are stripped under `-O`,
    # which would let a wrong first message through to the attribute reads.
    if not isinstance(init_req, proto.InitializeContextRequest):
        raise TypeError(
            "expected InitializeContextRequest as first IPC message, "
            f"got {type(init_req).__name__}"
        )

    logger.debug("initializing browser context", extra={"dev_mode": init_req.dev_mode})

    def _context_initialized():
        # Invoked by lkcef through `opts.initialized_callback`; tells the
        # parent that the context is ready.
        try:
            channel.send_message(duplex, proto.ContextInitializedResponse())
        except DuplexClosed:
            logger.warning("duplex closed, failed to send ContextInitializedResponse")

    opts = lkcef.AppOptions()
    opts.dev_mode = init_req.dev_mode
    opts.remote_debugging_port = init_req.remote_debugging_port
    opts.root_cache_path = init_req.root_cache_path
    opts.initialized_callback = _context_initialized

    if sys.platform.startswith("darwin"):
        # macOS: framework and helper live inside the lkcef_app.app bundle.
        app_path = os.path.join(resources_dir, "lkcef_app.app")
        opts.framework_path = os.path.join(
            app_path, "Contents", "Frameworks", "Chromium Embedded Framework.framework"
        )
        opts.main_bundle_path = app_path
        opts.subprocess_path = os.path.join(
            app_path, "Contents", "Frameworks", "lkcef Helper.app",
            "Contents", "MacOS", "lkcef Helper"
        )
    else:
        opts.framework_path = ""
        opts.main_bundle_path = ""
        opts.subprocess_path = os.path.join(resources_dir, "lkcef_helper")

    app = lkcef.BrowserApp(opts)
    man_t = threading.Thread(target=_manager_thread, args=(duplex, app))
    man_t.start()

    try:
        app.run()
    finally:
        # Always close the duplex and wait for the manager thread, even when
        # the run loop raises: the manager thread is non-daemon, so leaving
        # it running would keep the process alive.
        duplex.close()
        man_t.join()

Classes

class BrowserServer (duplex: livekit.agents.utils.aio.duplex_unix._Duplex,
shms: list[multiprocessing.shared_memory.SharedMemory],
page_id: int,
send_lock: threading.Lock)
Expand source code
class BrowserServer:
    """Forwards the callbacks of one browser to the peer process.

    Each instance is tied to a ``page_id``. Callbacks registered in
    :meth:`create` are turned into ``proto`` messages and written to
    ``duplex`` while holding ``send_lock``. Painted frames are copied into one
    of the shared-memory buffers in ``shms``; a buffer stays reserved until
    the peer returns it through :meth:`handle_release_paint`.
    """

    def __init__(
        self,
        duplex: Duplex,
        shms: list[mp_shm.SharedMemory],
        page_id: int,
        send_lock: threading.Lock,
    ):
        """Store the IPC handles and mark every shared-memory buffer as free.

        Args:
            duplex: Channel used for every outgoing message.
            shms: Shared-memory buffers that receive painted frames.
            page_id: Identifier attached to every outgoing message.
            send_lock: Lock held around each write to ``duplex``.
        """
        self._duplex = duplex
        self._shms = shms
        self._page_id = page_id
        self._send_lock = send_lock

        self._view_width = 0
        self._view_height = 0

        self._closing = False

        # Browser handle delivered by `_browser_created`; None until then.
        self._impl = None
        # Set when a close request arrives; lets `_browser_created` honour a
        # request that came in before the handle existed.
        self._close_requested = False
        # Guards the `_impl` / `_close_requested` hand-off so the browser is
        # closed exactly once. NOTE(review): assumes `handle_close` and
        # `_browser_created` may run on different threads — confirm.
        self._impl_lock = threading.Lock()

        # One event per shared-memory buffer; set means "free to paint into".
        self._shm_available = []
        for _ in shms:
            ev = threading.Event()
            ev.set()
            self._shm_available.append(ev)

    @staticmethod
    def create(
        *,
        duplex: Duplex,
        create_req: proto.CreateBrowserRequest,
        browser_app,
        send_lock: threading.Lock,
    ) -> "BrowserServer":
        """Build a ``BrowserServer`` for ``create_req`` and request the browser.

        Attaches to the shared-memory segments named in
        ``create_req.shm_names``, wires the server's private methods into an
        ``lkcef.BrowserOptions`` object and passes it to
        ``browser_app.create_browser``.

        Args:
            duplex: Channel used for every outgoing message.
            create_req: Request carrying page id, URL, framerate, size and
                shared-memory names.
            browser_app: Object exposing ``create_browser(url, opts)``.
            send_lock: Lock held around each write to ``duplex``.

        Returns:
            The new server instance.
        """
        logger.debug(
            "creating browser",
            extra={
                "page_id": create_req.page_id,
                "url": create_req.url,
                "framerate": create_req.framerate,
                "width": create_req.width,
                "height": create_req.height,
                "shm_names": create_req.shm_names,
            },
        )

        import lkcef_python as lkcef

        opts = lkcef.BrowserOptions()
        opts.framerate = create_req.framerate
        opts.width = create_req.width
        opts.height = create_req.height

        shms = [mp_shm.SharedMemory(name=name) for name in create_req.shm_names]
        bserver = BrowserServer(duplex, shms, create_req.page_id, send_lock)

        opts.created_callback = bserver._browser_created
        opts.paint_callback = bserver._paint
        opts.close_callback = bserver._closed
        opts.audio_stream_started_callback = bserver._audio_stream_started
        opts.audio_stream_packet_callback = bserver._audio_packet
        opts.audio_stream_stopped_callback = bserver._audio_stream_stopped
        opts.cursor_change_callback = bserver._cursor_changed
        opts.url_changed_callback = bserver._url_changed
        browser_app.create_browser(create_req.url, opts)
        return bserver

    def _send(self, msg, closed_warning: str) -> bool:
        """Send ``msg`` over the duplex while holding the send lock.

        On ``DuplexClosed`` the server is flagged as closing and
        ``closed_warning`` is logged.

        Returns:
            True if the message was sent, False if the duplex was closed.
        """
        try:
            with self._send_lock:
                channel.send_message(self._duplex, msg)
        except DuplexClosed:
            self._closing = True
            logger.warning(closed_warning)
            return False
        return True

    def _browser_created(self, impl):
        """Record the browser handle and report its id to the peer.

        If a close request arrived before the handle existed, the browser is
        closed right after the response is sent.
        """
        browser_id = impl.identifier()
        logger.debug(
            "browser created",
            extra={"browser_id": browser_id, "page_id": self._page_id},
        )

        with self._impl_lock:
            self._impl = impl
            close_now = self._close_requested

        self._send(
            proto.CreateBrowserResponse(page_id=self._page_id, browser_id=browser_id),
            "duplex closed, failed to send CreateBrowserResponse",
        )

        if close_now:
            # The close request raced ahead of creation; honour it now.
            impl.close()

    def _paint(self, frame_data):
        """Copy a painted frame into a free buffer and notify the peer.

        The frame is dropped when the server is closing or when no buffer is
        free. The reserved buffer is handed back locally whenever the frame
        could not be delivered, so a failed copy or send never strands it.
        """
        if self._closing:
            return  # make sure to not use the shm

        # Find a free SHM buffer
        buffer_id = -1
        for i, ev in enumerate(self._shm_available):
            if ev.is_set():
                buffer_id = i
                break

        if buffer_id == -1:
            logger.warning("paint frame dropped — all SHM buffers busy")
            return

        self._shm_available[buffer_id].clear()

        sent = False
        try:
            acq = proto.AcquirePaintData()
            acq.page_id = self._page_id
            acq.buffer_id = buffer_id
            acq.width = frame_data.width
            acq.height = frame_data.height
            acq.dirty_rects = [
                (rect.x, rect.y, rect.width, rect.height)
                for rect in frame_data.dirty_rects
            ]

            old_width = self._view_width
            old_height = self._view_height
            self._view_width = frame_data.width
            self._view_height = frame_data.height

            proto.copy_paint_data(
                acq, old_width, old_height, frame_data.buffer, self._shms[buffer_id].buf
            )
            sent = self._send(acq, "duplex closed, stopping paint")
        finally:
            if not sent:
                # The peer never learned about this buffer, so it cannot
                # release it; give it back ourselves.
                self._shm_available[buffer_id].set()

    def _audio_stream_started(self, sample_rate: int, channels: int, frames: int):
        """Tell the peer that an audio stream started with the given format."""
        self._send(
            proto.AudioStreamStarted(
                page_id=self._page_id,
                sample_rate=sample_rate,
                channels=channels,
                frames_per_buffer=frames,
            ),
            "duplex closed, failed to send AudioStreamStarted",
        )

    def _audio_packet(
        self,
        data_bytes: bytes,
        frames: int,
        channels: int,
        sample_rate: int,
        pts: int,
    ):
        """Forward one audio packet to the peer unless the server is closing."""
        if self._closing:
            return

        self._send(
            proto.AcquireAudioData(
                page_id=self._page_id,
                sample_rate=sample_rate,
                channels=channels,
                frames=frames,
                pts=pts,
                data=data_bytes,
            ),
            "duplex closed, stopping audio",
        )

    def _audio_stream_stopped(self):
        """Tell the peer that the audio stream stopped."""
        self._send(
            proto.AudioStreamStopped(page_id=self._page_id),
            "duplex closed, failed to send AudioStreamStopped",
        )

    def _cursor_changed(self, cursor_type: int):
        """Forward a cursor change to the peer unless the server is closing."""
        if self._closing:
            return

        self._send(
            proto.CursorChanged(page_id=self._page_id, cursor_type=cursor_type),
            "duplex closed, failed to send CursorChanged",
        )

    def _url_changed(self, url: str):
        """Forward a URL change to the peer unless the server is closing."""
        if self._closing:
            return

        self._send(
            proto.UrlChanged(page_id=self._page_id, url=url),
            "duplex closed, failed to send UrlChanged",
        )

    def _closed(self) -> None:
        """Tell the peer that the browser has closed.

        Sent even while ``_closing`` is set, since ``handle_close`` raises
        that flag before the browser is closed.
        """
        self._send(
            proto.BrowserClosed(page_id=self._page_id),
            "duplex closed, failed to send BrowserClosed",
        )

    def handle_release_paint(self, msg: proto.ReleasePaintData):
        """Mark the buffer named by ``msg.buffer_id`` as free for painting."""
        self._shm_available[msg.buffer_id].set()

    def handle_close(self, msg: proto.CloseBrowserRequest):
        """Stop forwarding frames and close the browser.

        If the browser handle has not been delivered yet, the request is
        remembered and carried out by ``_browser_created`` instead of failing
        on the missing handle. ``msg`` is not read.
        """
        self._closing = True
        with self._impl_lock:
            impl = self._impl
            self._close_requested = True
        if impl is not None:
            impl.close()

Static methods

def create(*,
duplex: livekit.agents.utils.aio.duplex_unix._Duplex,
create_req: CreateBrowserRequest,
browser_app,
send_lock: threading.Lock) ‑> BrowserServer
Expand source code
@staticmethod
def create(
    *,
    duplex: Duplex,
    create_req: proto.CreateBrowserRequest,
    browser_app,
    send_lock: threading.Lock,
) -> "BrowserServer":
    """Build a ``BrowserServer`` for ``create_req`` and request the browser.

    Attaches to the shared-memory segments named in ``create_req.shm_names``,
    wires the server's private methods into an ``lkcef.BrowserOptions``
    object and passes it to ``browser_app.create_browser``.

    Returns:
        The new server instance.
    """
    request_fields = {
        "page_id": create_req.page_id,
        "url": create_req.url,
        "framerate": create_req.framerate,
        "width": create_req.width,
        "height": create_req.height,
        "shm_names": create_req.shm_names,
    }
    logger.debug("creating browser", extra=request_fields)

    import lkcef_python as lkcef

    browser_opts = lkcef.BrowserOptions()
    browser_opts.framerate = create_req.framerate
    browser_opts.width = create_req.width
    browser_opts.height = create_req.height

    attached = [mp_shm.SharedMemory(name=shm_name) for shm_name in create_req.shm_names]
    server = BrowserServer(duplex, attached, create_req.page_id, send_lock)

    # Option attribute -> handler, assigned in the same order as before.
    handlers = (
        ("created_callback", server._browser_created),
        ("paint_callback", server._paint),
        ("close_callback", server._closed),
        ("audio_stream_started_callback", server._audio_stream_started),
        ("audio_stream_packet_callback", server._audio_packet),
        ("audio_stream_stopped_callback", server._audio_stream_stopped),
        ("cursor_change_callback", server._cursor_changed),
        ("url_changed_callback", server._url_changed),
    )
    for option_name, handler in handlers:
        setattr(browser_opts, option_name, handler)

    browser_app.create_browser(create_req.url, browser_opts)
    return server

Methods

def handle_close(self,
msg: CloseBrowserRequest)
Expand source code
def handle_close(self, msg: proto.CloseBrowserRequest):
    """Flag this server as closing, then close the underlying browser.

    The flag is raised before the browser handle is read, so it is set even
    if reading ``self._impl`` fails. ``msg`` is not read.
    """
    self._closing = True
    browser = self._impl
    browser.close()
def handle_release_paint(self,
msg: ReleasePaintData)
Expand source code
def handle_release_paint(self, msg: proto.ReleasePaintData):
    """Set the availability event of the buffer named by ``msg.buffer_id``."""
    slot = msg.buffer_id
    released = self._shm_available[slot]
    released.set()