Module livekit.browser.proc

Classes

class AudioData (frame: rtc.AudioFrame, pts: int)
Expand source code
@dataclass
class AudioData:
    """Payload emitted with a page's ``"audio"`` event."""

    # Audio frame built by ``BrowserPage._handle_audio_data`` from the IPC message.
    frame: rtc.AudioFrame
    # Copied from the IPC message's ``pts`` field (presumably a presentation
    # timestamp; unit is not visible here -- TODO confirm).
    pts: int

AudioData(frame: 'rtc.AudioFrame', pts: 'int')

Instance variables

var frame : rtc.AudioFrame
var pts : int
class BrowserContext (*, dev_mode: bool, remote_debugging_port: int = 0)
Expand source code
class BrowserContext:
    """Owns the browser subprocess and dispatches its IPC messages to pages.

    Call :meth:`initialize` before creating pages and :meth:`aclose` to shut
    the subprocess down and release its resources.
    """

    def __init__(self, *, dev_mode: bool, remote_debugging_port: int = 0) -> None:
        """Record configuration; the subprocess is only started by ``initialize``.

        Args:
            dev_mode: Passed through unchanged in ``InitializeContextRequest``.
            remote_debugging_port: Port to use for remote debugging. A falsy
                value (the default ``0``) lets ``_initialize_context`` pick a
                free port.
        """
        self._mp_ctx = mp.get_context("spawn")
        self._pages: dict[int, BrowserPage] = {}
        self._dev_mode = dev_mode
        self._initialized = False
        self._next_page_id = 1
        self._remote_debugging_port = remote_debugging_port
        self._closed_event = asyncio.Event()
        self._proc = None
        self._duplex = None
        self._main_atask = None
        self._root_cache_path = None

    async def initialize(self) -> None:
        """Spawn the browser subprocess and perform the initialization handshake.

        Raises:
            BaseException: Anything raised by ``_initialize_context`` is
                re-raised after the subprocess has been cleaned up.
        """
        from .download import get_resources_dir

        resources_dir = str(get_resources_dir())

        mp_pch, mp_cch = socket.socketpair()
        self._duplex = await AsyncDuplex.open(mp_pch)

        self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

        # On Linux aarch64, libcef.so is too large for late TLS allocation.
        # Set LD_PRELOAD so the dynamic linker reserves TLS early.
        preload_modified = False
        preload_previous = None
        if sys.platform.startswith("linux"):
            libcef = os.path.join(resources_dir, "libcef.so")
            if os.path.exists(libcef):
                preload_previous = os.environ.get("LD_PRELOAD")
                os.environ["LD_PRELOAD"] = libcef + (
                    ":" + preload_previous if preload_previous else ""
                )
                preload_modified = True

        try:
            self._proc.start()
        finally:
            # Only undo what was changed above: a pre-existing LD_PRELOAD must
            # survive when libcef.so was not found, and the environment must be
            # restored even if start() raises.
            if preload_modified:
                if preload_previous is None:
                    os.environ.pop("LD_PRELOAD", None)
                else:
                    os.environ["LD_PRELOAD"] = preload_previous
            # The parent's copy of the child's socket end is no longer needed.
            mp_cch.close()

        try:
            await self._initialize_context(resources_dir)
        except BaseException:
            await self._cleanup_process()
            raise

    async def _initialize_context(self, resources_dir: str) -> None:
        """Send the initialization request and start the IPC reader task.

        Args:
            resources_dir: Accepted for the caller's convenience; not read here.

        Raises:
            RuntimeError: If the first reply is not a
                ``ContextInitializedResponse``.
        """
        if not self._remote_debugging_port:
            # Ask the OS for a free port. The probe socket is closed again
            # before the port is handed to the subprocess.
            with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                s.bind(("", 0))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._remote_debugging_port = s.getsockname()[1]

            logger.debug("using remote debugging port %d", self._remote_debugging_port)

        self._root_cache_path = tempfile.mkdtemp(prefix="lkcef_cache_")
        await channel.asend_message(
            self._duplex,
            proto.InitializeContextRequest(
                dev_mode=self._dev_mode,
                remote_debugging_port=self._remote_debugging_port,
                root_cache_path=self._root_cache_path,
            ),
        )
        resp = await channel.arecv_message(self._duplex, proto.IPC_MESSAGES)
        # Explicit check instead of ``assert`` so it still runs under ``python -O``.
        if not isinstance(resp, proto.ContextInitializedResponse):
            raise RuntimeError(
                f"unexpected response to InitializeContextRequest: {type(resp).__name__}"
            )
        self._initialized = True
        logger.debug("browser context initialized", extra={"pid": self._proc.pid})

        self._main_atask = asyncio.create_task(self._main_task(self._duplex))

    async def _cleanup_process(self) -> None:
        """Close the duplex, reap the subprocess and delete the cache directory."""
        if self._duplex:
            try:
                await self._duplex.aclose()
            except DuplexClosed:
                pass

        # Subprocess has its own 5s watchdog — just wait for it
        # NOTE: join() is a synchronous call, so this blocks the event loop
        # for up to the timeout while the subprocess is still alive.
        if self._proc and self._proc.is_alive():
            self._proc.join(timeout=15)
            if self._proc.is_alive():
                logger.warning("subprocess did not exit, killing")
                self._proc.kill()
                self._proc.join()

        if self._proc:
            self._proc.close()

        if self._root_cache_path:
            shutil.rmtree(self._root_cache_path, ignore_errors=True)

    @asynccontextmanager
    async def playwright(self, timeout: float | None = None):
        """Yield a Playwright browser connected over CDP to this context.

        Args:
            timeout: Forwarded to ``connect_over_cdp``.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            url = f"http://localhost:{self._remote_debugging_port}"
            browser = await p.chromium.connect_over_cdp(url, timeout=timeout)
            try:
                yield browser
            finally:
                await browser.close()

    @_log_exceptions(_logger=logger)
    async def _main_task(self, duplex: AsyncDuplex) -> None:
        """Read IPC messages until the duplex closes and route them by page id.

        Errors raised while handling a single message (including an unknown
        ``page_id``) are logged and do not stop the loop.
        """
        while True:
            try:
                msg = await channel.arecv_message(duplex, proto.IPC_MESSAGES)
            except DuplexClosed:
                break

            try:
                if isinstance(msg, proto.CreateBrowserResponse):
                    page = self._pages[msg.page_id]
                    await page._handle_created(msg)
                elif isinstance(msg, proto.AcquireAudioData):
                    page = self._pages[msg.page_id]
                    page._handle_audio_data(msg)
                elif isinstance(msg, proto.AcquirePaintData):
                    page = self._pages[msg.page_id]
                    await page._handle_paint(msg)
                elif isinstance(msg, proto.AudioStreamStarted):
                    pass
                elif isinstance(msg, proto.AudioStreamStopped):
                    pass
                elif isinstance(msg, proto.CursorChanged):
                    page = self._pages[msg.page_id]
                    page.emit("cursor_changed", msg.cursor_type)
                elif isinstance(msg, proto.UrlChanged):
                    page = self._pages[msg.page_id]
                    page.emit("url_changed", msg.url)
                elif isinstance(msg, proto.BrowserClosed):
                    page = self._pages[msg.page_id]
                    await page._handle_close(msg)
            except Exception:
                logger.exception("error handling IPC message %s", type(msg).__name__)

        # Subprocess exited — resolve any pending page futures so cleanup doesn't hang
        for page in self._pages.values():
            if not page._close_fut.done():
                page._close_fut.set_result(None)
            if not page._created_fut.done():
                page._created_fut.set_result(None)
        self._closed_event.set()

    async def new_page(
        self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
    ) -> BrowserPage:
        """Create, register and start a new page.

        Args:
            url: Initial URL for the page.
            width: Requested page width.
            height: Requested page height.
            framerate: Requested frame rate.

        Returns:
            The started ``BrowserPage``.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        page_id = self._next_page_id
        self._next_page_id += 1
        page = BrowserPage(
            self._mp_ctx,
            _PageOptions(
                page_id=page_id,
                url=url,
                width=width,
                height=height,
                framerate=framerate,
            ),
            self._duplex,
        )
        # Register before start() so the reader task can route the creation
        # response to this page.
        self._pages[page_id] = page
        await page.start()
        return page

    async def aclose(self) -> None:
        """Close all pages, stop the reader task and shut the subprocess down.

        Does nothing when the context is not (or no longer) initialized.
        """
        if not self._initialized:
            return

        self._initialized = False

        # Close all pages (ensure shm cleanup even on errors)
        for page in list(self._pages.values()):
            try:
                await page.aclose()
            except Exception:
                page._cleanup_shm()

        # Cancel the main task
        if self._main_atask:
            self._main_atask.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._main_atask

        await self._cleanup_process()

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut the context down: pages first, then the reader task, then the process.

    Calling this on a context that is not initialized is a no-op.
    """
    if self._initialized:
        self._initialized = False

        # Snapshot the pages; a page whose aclose() fails still gets its
        # shared memory released.
        for open_page in tuple(self._pages.values()):
            try:
                await open_page.aclose()
            except Exception:
                open_page._cleanup_shm()

        # Stop the IPC reader and wait for the cancellation to land.
        reader_task = self._main_atask
        if reader_task:
            reader_task.cancel()
            try:
                await reader_task
            except asyncio.CancelledError:
                pass

        await self._cleanup_process()
async def initialize(self) ‑> None
Expand source code
async def initialize(self) -> None:
    """Spawn the browser subprocess and perform the initialization handshake.

    Raises:
        BaseException: Anything raised by ``_initialize_context`` is re-raised
            after the subprocess has been cleaned up.
    """
    from .download import get_resources_dir

    resources_dir = str(get_resources_dir())

    mp_pch, mp_cch = socket.socketpair()
    self._duplex = await AsyncDuplex.open(mp_pch)

    self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

    # On Linux aarch64, libcef.so is too large for late TLS allocation.
    # Set LD_PRELOAD so the dynamic linker reserves TLS early.
    preload_modified = False
    preload_previous = None
    if sys.platform.startswith("linux"):
        libcef = os.path.join(resources_dir, "libcef.so")
        if os.path.exists(libcef):
            preload_previous = os.environ.get("LD_PRELOAD")
            os.environ["LD_PRELOAD"] = libcef + (
                ":" + preload_previous if preload_previous else ""
            )
            preload_modified = True

    try:
        self._proc.start()
    finally:
        # Only undo what was changed above: a pre-existing LD_PRELOAD must
        # survive when libcef.so was not found, and the environment must be
        # restored even if start() raises.
        if preload_modified:
            if preload_previous is None:
                os.environ.pop("LD_PRELOAD", None)
            else:
                os.environ["LD_PRELOAD"] = preload_previous
        # The parent's copy of the child's socket end is no longer needed.
        mp_cch.close()

    try:
        await self._initialize_context(resources_dir)
    except BaseException:
        await self._cleanup_process()
        raise
async def new_page(self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24) ‑> BrowserPage
Expand source code
async def new_page(
    self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
) -> BrowserPage:
    """Allocate the next page id, register a ``BrowserPage`` for it and start it.

    Raises:
        RuntimeError: If the context has not been initialized.
    """
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    # Page ids are handed out sequentially.
    allocated_id = self._next_page_id
    self._next_page_id = allocated_id + 1

    page_options = _PageOptions(
        page_id=allocated_id,
        url=url,
        width=width,
        height=height,
        framerate=framerate,
    )
    created_page = BrowserPage(self._mp_ctx, page_options, self._duplex)

    # Register before starting so incoming messages for this id can be routed.
    self._pages[allocated_id] = created_page
    await created_page.start()
    return created_page
async def playwright(self, timeout: float | None = None)
Expand source code
@asynccontextmanager
async def playwright(self, timeout: float | None = None):
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        url = f"http://localhost:{self._remote_debugging_port}"
        browser = await p.chromium.connect_over_cdp(url, timeout=timeout)
        try:
            yield browser
        finally:
            await browser.close()
class BrowserOptions (url: str,
framerate: int,
width: int,
height: int,
paint_callback: Callable[[PaintData], None])
Expand source code
@dataclass
class BrowserOptions:
    """Bundle of browser settings plus a paint callback.

    NOTE(review): nothing in the visible listing constructs or reads this
    class; confirm where it is used.
    """

    # Address to load.
    url: str
    # Frame rate setting (presumably frames per second -- TODO confirm).
    framerate: int
    # View size (presumably in pixels -- TODO confirm).
    width: int
    height: int
    # Callable that receives a ``PaintData`` and returns nothing.
    paint_callback: Callable[[PaintData], None]

BrowserOptions(url: 'str', framerate: 'int', width: 'int', height: 'int', paint_callback: 'Callable[[PaintData], None]')

Instance variables

var framerate : int
var height : int
var paint_callback : Callable[[PaintData], None]
var url : str
var width : int
class BrowserPage (mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex)
Expand source code
class BrowserPage(EventEmitter[EventTypes]):
    """A single page hosted by the browser subprocess.

    The page itself emits ``"paint"`` (with a ``PaintData``) and ``"audio"``
    (with an ``AudioData``). All requests are sent over the owning context's
    IPC duplex.
    """

    # Number of shared-memory buffers allocated per page in ``start``.
    _NUM_SHM_BUFFERS = 2

    def __init__(
        self,
        mp_ctx: mpc.SpawnContext,
        opts: _PageOptions,
        ctx_duplex: AsyncDuplex,
    ) -> None:
        """Store the page options and create the lifecycle futures.

        Args:
            mp_ctx: Multiprocessing context of the owning browser context.
            opts: Page id, URL, size and frame rate for this page.
            ctx_duplex: IPC duplex shared with the owning context.
        """
        super().__init__()
        self._mp_ctx = mp_ctx
        self._opts = opts
        self._ctx_duplex = ctx_duplex

        self._view_width = 0
        self._view_height = 0

        self._created_fut = asyncio.Future()
        self._close_fut = asyncio.Future()
        self._closed = False

        # Filled by start(). Defined here so _cleanup_shm() and aclose() do
        # not raise AttributeError when start() never ran.
        self._shms = []

    @property
    def id(self) -> int:
        """Page id taken from the page options."""
        return self._opts.page_id

    async def start(self) -> None:
        """Allocate the shared-memory buffers and ask the subprocess to create the page.

        Returns once the creation future is resolved.

        Raises:
            Exception: Any error from shared-memory allocation is re-raised
                after the segments allocated so far have been released.
        """
        import multiprocessing.shared_memory as mp_shm

        shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4
        self._shms = []
        shm_names = []
        try:
            for _ in range(self._NUM_SHM_BUFFERS):
                name = f"lkcef_browser_{_shortuuid()}"
                shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name)
                self._shms.append(shm)
                shm_names.append(name)
        except Exception:
            # Do not leave earlier segments behind when a later one fails.
            self._cleanup_shm()
            raise

        self._framebuffer = rtc.VideoFrame(
            proto.SHM_MAX_WIDTH,
            proto.SHM_MAX_HEIGHT,
            rtc.VideoBufferType.BGRA,
            bytearray(shm_size),
        )

        req = proto.CreateBrowserRequest(
            page_id=self._opts.page_id,
            width=self._opts.width,
            height=self._opts.height,
            shm_names=shm_names,
            url=self._opts.url,
            framerate=self._opts.framerate,
        )

        await channel.asend_message(self._ctx_duplex, req)

        # TODO(theomonnom): create timeout (would prevent never resolving futures if the
        #  browser process crashed for some reasons)
        await asyncio.shield(self._created_fut)

    async def aclose(self) -> None:
        """Request the page to close, wait for confirmation and release shared memory.

        Subsequent calls are no-ops. Shared memory is released even when the
        request or the wait fails.
        """
        if self._closed:
            return
        self._closed = True

        try:
            await channel.asend_message(
                self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id)
            )
            await asyncio.shield(self._close_fut)
        except DuplexClosed:
            pass  # Subprocess already closed (e.g., window closed manually)
        finally:
            self._cleanup_shm()

    def _cleanup_shm(self) -> None:
        """Close and unlink every shared-memory segment, ignoring errors."""
        for shm in self._shms:
            # Best effort: close and unlink are attempted independently so a
            # failure in one does not skip the other.
            try:
                shm.close()
            except Exception:
                pass
            try:
                shm.unlink()
            except Exception:
                pass
        self._shms.clear()

    async def _handle_created(self, msg: proto.CreateBrowserResponse) -> None:
        """Resolve the creation future when the subprocess confirms the page."""
        # The future may already be resolved; set_result() on a done future
        # raises InvalidStateError.
        if not self._created_fut.done():
            self._created_fut.set_result(None)

    async def _handle_paint(self, acq: proto.AcquirePaintData) -> None:
        """Copy a painted frame out of shared memory and emit a ``"paint"`` event.

        The buffer is released back to the subprocess before the event is
        emitted. Every event carries the same ``VideoFrame`` instance.
        """
        old_width = self._view_width
        old_height = self._view_height
        self._view_width = acq.width
        self._view_height = acq.height

        # TODO(theomonnom): remove hacky alloc-free resizing
        self._framebuffer._width = acq.width
        self._framebuffer._height = acq.height

        proto.copy_paint_data(acq, old_width, old_height, self._shms[acq.buffer_id].buf, self._framebuffer.data)

        release_paint = proto.ReleasePaintData(page_id=acq.page_id, buffer_id=acq.buffer_id)
        await channel.asend_message(self._ctx_duplex, release_paint)

        paint_data = PaintData(
            dirty_rects=acq.dirty_rects,
            frame=self._framebuffer,
            width=acq.width,
            height=acq.height,
        )
        self.emit("paint", paint_data)

    async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
        """Send a ``SendMouseMoveEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave),
        )

    async def send_mouse_click(
        self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
    ) -> None:
        """Send a ``SendMouseClickEvent``; ``button`` is passed as ``button_type``."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseClickEvent(
                page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count
            ),
        )

    async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
        """Send a ``SendMouseWheelEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y),
        )

    async def send_key_event(
        self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
    ) -> None:
        """Send a ``SendKeyEvent``; all arguments are forwarded unchanged."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendKeyEvent(
                page_id=self.id,
                type=type,
                modifiers=modifiers,
                windows_key_code=windows_key_code,
                native_key_code=native_key_code,
                character=character,
            ),
        )

    async def navigate(self, url: str) -> None:
        """Send a ``NavigateRequest`` for ``url``."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.NavigateRequest(page_id=self.id, url=url),
        )

    async def go_back(self) -> None:
        """Send a ``GoBackRequest`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoBackRequest(page_id=self.id),
        )

    async def go_forward(self) -> None:
        """Send a ``GoForwardRequest`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoForwardRequest(page_id=self.id),
        )

    async def send_focus_event(self, set_focus: bool = True) -> None:
        """Send a ``SendFocusEvent`` with the given ``set_focus`` flag."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendFocusEvent(page_id=self.id, set_focus=set_focus),
        )

    def _handle_audio_data(self, msg: proto.AcquireAudioData) -> None:
        """Wrap incoming audio in an ``rtc.AudioFrame`` and emit an ``"audio"`` event.

        Messages with a non-positive channel count, sample rate or frame count
        are dropped.
        """
        if msg.channels <= 0 or msg.sample_rate <= 0 or msg.frames <= 0:
            return

        frame = rtc.AudioFrame(
            data=msg.data,
            sample_rate=msg.sample_rate,
            num_channels=msg.channels,
            samples_per_channel=msg.frames,
        )
        self.emit("audio", AudioData(frame=frame, pts=msg.pts))

    async def _handle_close(self, msg: proto.BrowserClosed) -> None:
        """Resolve the close future when the subprocess reports the page closed."""
        logger.debug("browser page closed", extra={"page_id": self.id})
        # The future may already be resolved; set_result() on a done future
        # raises InvalidStateError.
        if not self._close_fut.done():
            self._close_fut.set_result(None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Instance variables

prop id : int
Expand source code
@property
def id(self) -> int:
    """Page id, read from the page options."""
    page_options = self._opts
    return page_options.page_id

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the page once: send the close request, wait, then free shared memory.

    Repeated calls return immediately. Shared memory is released whether or
    not the request and the wait succeed.
    """
    if not self._closed:
        self._closed = True
        try:
            close_request = proto.CloseBrowserRequest(page_id=self.id)
            await channel.asend_message(self._ctx_duplex, close_request)
            # Shielded so cancelling the caller does not cancel the shared future.
            await asyncio.shield(self._close_fut)
        except DuplexClosed:
            # Subprocess already closed (e.g., window closed manually)
            pass
        finally:
            self._cleanup_shm()
async def go_back(self) ‑> None
Expand source code
async def go_back(self) -> None:
    """Send a ``GoBackRequest`` for this page over the context duplex."""
    back_request = proto.GoBackRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, back_request)
async def go_forward(self) ‑> None
Expand source code
async def go_forward(self) -> None:
    """Send a ``GoForwardRequest`` for this page over the context duplex."""
    forward_request = proto.GoForwardRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, forward_request)
async def navigate(self, url: str) ‑> None
Expand source code
async def navigate(self, url: str) -> None:
    """Send a ``NavigateRequest`` carrying ``url`` for this page."""
    navigate_request = proto.NavigateRequest(page_id=self.id, url=url)
    await channel.asend_message(self._ctx_duplex, navigate_request)
async def send_focus_event(self, set_focus: bool = True) ‑> None
Expand source code
async def send_focus_event(self, set_focus: bool = True) -> None:
    """Send a ``SendFocusEvent`` with the given ``set_focus`` flag for this page."""
    focus_event = proto.SendFocusEvent(page_id=self.id, set_focus=set_focus)
    await channel.asend_message(self._ctx_duplex, focus_event)
async def send_key_event(self,
type: int,
modifiers: int,
windows_key_code: int,
native_key_code: int = 0,
character: int = 0) ‑> None
Expand source code
async def send_key_event(
    self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
) -> None:
    """Send a ``SendKeyEvent`` for this page.

    Every argument is forwarded unchanged into the message; their meaning is
    defined by the receiving side and is not visible here.
    """
    key_event = proto.SendKeyEvent(
        page_id=self.id,
        type=type,
        modifiers=modifiers,
        windows_key_code=windows_key_code,
        native_key_code=native_key_code,
        character=character,
    )
    await channel.asend_message(self._ctx_duplex, key_event)
async def send_mouse_click(self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1) ‑> None
Expand source code
async def send_mouse_click(
    self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
) -> None:
    """Send a ``SendMouseClickEvent`` for this page.

    ``button`` is transmitted in the message's ``button_type`` field; the
    other arguments keep their names.
    """
    click_event = proto.SendMouseClickEvent(
        page_id=self.id,
        x=x,
        y=y,
        button_type=button,
        mouse_up=mouse_up,
        click_count=click_count,
    )
    await channel.asend_message(self._ctx_duplex, click_event)
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) ‑> None
Expand source code
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
    """Send a ``SendMouseMoveEvent`` with the given position and leave flag."""
    move_event = proto.SendMouseMoveEvent(
        page_id=self.id,
        x=x,
        y=y,
        mouse_leave=mouse_leave,
    )
    await channel.asend_message(self._ctx_duplex, move_event)
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) ‑> None
Expand source code
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
    """Send a ``SendMouseWheelEvent`` with the given position and scroll deltas."""
    wheel_event = proto.SendMouseWheelEvent(
        page_id=self.id,
        x=x,
        y=y,
        delta_x=delta_x,
        delta_y=delta_y,
    )
    await channel.asend_message(self._ctx_duplex, wheel_event)
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Allocate the shared-memory buffers and ask the subprocess to create the page.

    Returns once the creation future is resolved.

    Raises:
        Exception: Any error from shared-memory allocation is re-raised after
            the segments allocated so far have been released.
    """
    import multiprocessing.shared_memory as mp_shm

    shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4
    self._shms = []
    shm_names = []
    try:
        for _ in range(self._NUM_SHM_BUFFERS):
            name = f"lkcef_browser_{_shortuuid()}"
            shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name)
            self._shms.append(shm)
            shm_names.append(name)
    except Exception:
        # Do not leave earlier segments behind when a later one fails.
        self._cleanup_shm()
        raise

    self._framebuffer = rtc.VideoFrame(
        proto.SHM_MAX_WIDTH,
        proto.SHM_MAX_HEIGHT,
        rtc.VideoBufferType.BGRA,
        bytearray(shm_size),
    )

    req = proto.CreateBrowserRequest(
        page_id=self._opts.page_id,
        width=self._opts.width,
        height=self._opts.height,
        shm_names=shm_names,
        url=self._opts.url,
        framerate=self._opts.framerate,
    )

    await channel.asend_message(self._ctx_duplex, req)

    # TODO(theomonnom): create timeout (would prevent never resolving futures if the
    #  browser process crashed for some reasons)
    await asyncio.shield(self._created_fut)

Inherited members

class PaintData (dirty_rects: list[tuple[int, int, int, int]],
frame: rtc.VideoFrame,
width: int,
height: int)
Expand source code
@dataclass
class PaintData:
    """Payload emitted with a page's ``"paint"`` event."""

    # Copied from the paint message's ``dirty_rects`` (tuple layout is not
    # visible here; presumably x, y, width, height -- TODO confirm).
    dirty_rects: list[tuple[int, int, int, int]]
    # The page's framebuffer; ``BrowserPage._handle_paint`` passes the same
    # ``VideoFrame`` object with every event.
    frame: rtc.VideoFrame
    # View size reported by the paint message.
    width: int
    height: int

PaintData(dirty_rects: 'list[tuple[int, int, int, int]]', frame: 'rtc.VideoFrame', width: 'int', height: 'int')

Instance variables

var dirty_rects : list[tuple[int, int, int, int]]
var frame : rtc.VideoFrame
var height : int
var width : int