Module livekit.plugins.browser

Browser plugin for LiveKit Agents

Support for Chromium Embedded Framework (CEF).

Classes

class AudioData (frame: rtc.AudioFrame, pts: int)
Expand source code
@dataclass
class AudioData:
    # Payload emitted with a page's "audio" event: one decoded audio frame
    # plus the timestamp carried by the IPC message it was built from.
    frame: rtc.AudioFrame
    # Copied verbatim from the IPC message's ``pts`` field; presumably a
    # presentation timestamp — unit is not visible here, TODO confirm.
    pts: int

AudioData(frame: 'rtc.AudioFrame', pts: 'int')

Instance variables

var frame : AudioFrame
var pts : int
class BrowserAgent (*,
url: str = 'https://www.google.com/',
llm: llm.LLM,
instructions: str = 'You are a helpful AI assistant that can browse the web. Use the computer tool to interact with the browser.',
width: int = 1280,
height: int = 720,
framerate: int = 30,
tools: list[llm.Tool] | None = None,
chat_enabled: bool = True)
Expand source code
class BrowserAgent:
    """Agent that lets an LLM drive a browser page attached to a LiveKit room.

    ``start()`` creates a ``BrowserContext``, opens a single page, wraps it in
    a ``BrowserSession`` bound to the room and spawns a background task. That
    task takes queued user messages one at a time, asks the LLM for a reply and
    executes the tool calls it returns until the LLM stops requesting tools or
    the session reports that the agent was interrupted.
    """

    # Tools that are executed directly against the page instead of through
    # ``llm.execute_function_call``.
    _NAV_TOOL_NAMES = ("navigate", "go_back", "go_forward")
    # Computer-tool actions after which the loop sleeps ``_POST_ACTION_DELAY``
    # so the page can settle before the next LLM turn.
    _SETTLE_ACTIONS = ("left_click", "middle_click", "key", "type")

    def __init__(
        self,
        *,
        url: str = "https://www.google.com/",
        llm: llm.LLM,
        instructions: str = "You are a helpful AI assistant that can browse the web. Use the computer tool to interact with the browser.",
        width: int = 1280,
        height: int = 720,
        framerate: int = 30,
        tools: list[llm.Tool] | None = None,
        chat_enabled: bool = True,
    ) -> None:
        """Store the configuration; nothing is launched until ``start()``.

        Args:
            url: Address the page is opened on.
            llm: Model used for every turn of the agent loop.
            instructions: Text added to the chat context as the system message.
            width: Page width, also passed to the computer tool.
            height: Page height, also passed to the computer tool.
            framerate: Framerate requested for the page.
            tools: Extra tools offered to the LLM alongside the built-in ones.
            chat_enabled: When true, ``start()`` subscribes to room data
                packets on the chat topic and queues their text.
        """
        self._url = url
        self._llm = llm
        self._instructions = instructions
        self._width = width
        self._height = height
        self._framerate = framerate
        self._extra_tools = tools or []
        self._chat_enabled = chat_enabled

        self._room: rtc.Room | None = None
        self._browser_ctx: BrowserContext | None = None
        self._page: BrowserPage | None = None
        self._session: BrowserSession | None = None
        self._computer_tool: ComputerTool | None = None
        self._chat_ctx: llm.ChatContext | None = None

        # Populated by start(); declared here so aclose() and the LLM loop can
        # rely on the attributes existing even if start() never ran.
        self._page_actions: PageActions | None = None
        self._nav_tools: list[llm.Tool] = []
        self._on_chat_data = None

        self._agent_loop_task: asyncio.Task[None] | None = None
        self._pending_messages: asyncio.Queue[str] = asyncio.Queue()
        self._started = False

    @property
    def page(self) -> BrowserPage | None:
        """The page opened by ``start()``, or ``None`` before it has run."""
        return self._page

    @property
    def session(self) -> BrowserSession | None:
        """The session created by ``start()``, or ``None`` before it has run."""
        return self._session

    @property
    def computer_tool(self) -> ComputerTool | None:
        """The computer tool created by ``start()``, or ``None`` before it has run."""
        return self._computer_tool

    @property
    def chat_ctx(self) -> llm.ChatContext | None:
        """The chat context created by ``start()``, or ``None`` before it has run."""
        return self._chat_ctx

    @staticmethod
    def _extract_chat_text(raw: bytes | str) -> str | None:
        """Return the non-empty ``"text"`` string of a JSON chat payload.

        Returns ``None`` for anything else: undecodable bytes, invalid JSON,
        JSON that is not an object, or a missing/empty/non-string ``"text"``.
        The payload comes from other room participants, so it must never be
        able to raise out of the event handler.
        """
        try:
            data = json.loads(raw)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return None
        if not isinstance(data, dict):
            return None
        text = data.get("text", "")
        if isinstance(text, str) and text:
            return text
        return None

    async def start(self, *, room: rtc.Room) -> None:
        """Launch the browser, build the tools and start the agent loop.

        Calling it again after the first call is a no-op.

        Args:
            room: Room the session is attached to and status/chat/cursor
                packets are published into.
        """
        if self._started:
            return
        self._started = True
        self._room = room

        self._browser_ctx = BrowserContext(dev_mode=False)
        await self._browser_ctx.initialize()

        self._page = await self._browser_ctx.new_page(
            url=self._url,
            width=self._width,
            height=self._height,
            framerate=self._framerate,
        )

        self._session = BrowserSession(page=self._page, room=room)
        await self._session.start()

        from livekit.plugins.anthropic.computer_tool import ComputerTool

        self._page_actions = PageActions(page=self._page)
        self._computer_tool = ComputerTool(
            actions=self._page_actions,
            width=self._width,
            height=self._height,
        )

        # These bodies are intentionally empty: the declarations only describe
        # the tools to the LLM; _run_llm_loop executes them by name.
        @function_tool(name="navigate", description="Navigate the browser to a URL.")
        async def _navigate(url: str) -> None:
            pass

        @function_tool(name="go_back", description="Go back to the previous page.")
        async def _go_back() -> None:
            pass

        @function_tool(name="go_forward", description="Go forward to the next page.")
        async def _go_forward() -> None:
            pass

        self._nav_tools = [_navigate, _go_back, _go_forward]

        self._chat_ctx = llm.ChatContext()
        self._chat_ctx.add_message(role="system", content=self._instructions)

        await self._session.reclaim_agent_focus()

        if self._chat_enabled:

            @room.on("data_received")
            def _on_chat_data(packet: rtc.DataPacket) -> None:
                if packet.topic != _CHAT_TOPIC:
                    return
                text = self._extract_chat_text(packet.data)
                if text is not None:
                    self._pending_messages.put_nowait(text)

            # Kept so aclose() can unregister exactly this callback.
            self._on_chat_data = _on_chat_data

        self._agent_loop_task = asyncio.create_task(self._agent_loop())

    async def send_message(self, text: str) -> None:
        """Queue ``text`` as a user message for the agent loop."""
        self._pending_messages.put_nowait(text)

    async def _agent_loop(self) -> None:
        """Process queued user messages one at a time until cancelled."""
        assert self._chat_ctx is not None
        assert self._computer_tool is not None
        assert self._session is not None

        while True:
            try:
                text = await self._pending_messages.get()

                # A new message hands control back to the agent.
                if self._session.agent_interrupted.is_set():
                    self._session.agent_interrupted.clear()
                    await self._session.reclaim_agent_focus()

                self._chat_ctx.add_message(role="user", content=text)

                await self._send_status("thinking")

                await self._run_llm_loop()

                await self._send_cursor_hide()
                await self._send_status("idle")

            except asyncio.CancelledError:
                break
            except Exception:
                # Keep the loop alive: one failed turn must not stop the agent.
                logger.exception("error in agent loop")
                await self._send_cursor_hide()
                await self._send_status("idle")

    async def _run_llm_loop(self) -> None:
        """Alternate LLM calls and tool execution for the current user turn.

        Returns when the LLM answers without tool calls or when the session
        reports that the agent was interrupted.
        """
        assert self._chat_ctx is not None
        assert self._computer_tool is not None
        assert self._session is not None

        all_tools: list[llm.Tool] = [
            *self._computer_tool.tools,
            *self._nav_tools,
            *self._extra_tools,
        ]
        tool_ctx = llm.ToolContext(all_tools)

        while True:
            if self._session.agent_interrupted.is_set():
                logger.info("agent interrupted by human, pausing")
                await self._send_chat("(paused — you have control)")
                return

            response = await self._llm.chat(
                chat_ctx=self._chat_ctx,
                tools=all_tools,
            ).collect()

            if response.text:
                self._chat_ctx.add_message(role="assistant", content=response.text)
                await self._send_chat(response.text)

            if not response.tool_calls:
                return

            for tc in response.tool_calls:
                if self._session.agent_interrupted.is_set():
                    logger.info("agent interrupted between tool calls")
                    await self._send_chat("(paused — you have control)")
                    return

                if tc.name == "computer":
                    await self._run_computer_call(tc)
                elif tc.name in self._NAV_TOOL_NAMES:
                    await self._run_navigation_call(tc)
                else:
                    result = await llm.execute_function_call(tc, tool_ctx)
                    self._chat_ctx.items.append(result.fnc_call)
                    if result.fnc_call_out:
                        self._chat_ctx.items.append(result.fnc_call_out)

            await self._send_status("thinking")

    async def _run_computer_call(self, tc) -> None:
        """Execute one ``computer`` tool call and record its output."""
        assert self._computer_tool is not None

        args = json.loads(tc.arguments or "{}")
        action = args.pop("action", "screenshot")

        # Broadcast cursor position for frontend overlay
        coord = args.get("coordinate")
        if coord and len(coord) == 2:
            await self._send_cursor_position(int(coord[0]), int(coord[1]), action)

        await self._send_status("acting")

        screenshot_content = await self._computer_tool.execute(action, **args)

        # Wait for page to settle after clicks/typing
        if action in self._SETTLE_ACTIONS:
            await asyncio.sleep(_POST_ACTION_DELAY)

        self._record_tool_result(tc, screenshot_content)

    async def _run_navigation_call(self, tc) -> None:
        """Execute a navigate/go_back/go_forward call and record a screenshot."""
        assert self._page_actions is not None

        await self._send_status("acting")
        if tc.name == "navigate":
            url = json.loads(tc.arguments or "{}").get("url", "")
            await self._page_actions.navigate(url)
        elif tc.name == "go_back":
            await self._page_actions.go_back()
        else:
            await self._page_actions.go_forward()
        await asyncio.sleep(_POST_ACTION_DELAY)

        self._record_tool_result(tc, _screenshot_content(self._page_actions))

    def _record_tool_result(self, tc, output) -> None:
        """Append the call and its JSON-encoded output to the chat context."""
        assert self._chat_ctx is not None

        # Build both items before appending so a serialization error cannot
        # leave a call in the context without its output.
        fnc_call = llm.FunctionCall(
            call_id=tc.call_id,
            name=tc.name,
            arguments=tc.arguments or "{}",
        )
        fnc_output = llm.FunctionCallOutput(
            call_id=tc.call_id,
            name=tc.name,
            output=json.dumps(output),
            is_error=False,
        )
        self._chat_ctx.items.append(fnc_call)
        self._chat_ctx.items.append(fnc_output)

    async def _publish(self, data: dict, *, topic: str, failure_msg: str) -> None:
        """Publish ``data`` as reliable JSON on ``topic``; failures are only logged."""
        if self._room is None:
            return
        payload = json.dumps(data).encode()
        try:
            await self._room.local_participant.publish_data(
                payload, reliable=True, topic=topic
            )
        except Exception:
            # Best effort: a failed UI notification must not abort the agent turn.
            logger.debug(failure_msg, exc_info=True)

    async def _send_chat(self, text: str) -> None:
        """Publish an agent chat message on the chat topic."""
        await self._publish(
            {"text": text, "sender": "agent"},
            topic=_CHAT_TOPIC,
            failure_msg="failed to send chat message",
        )

    async def _send_status(self, status: str) -> None:
        """Publish the agent status string on the status topic."""
        await self._publish(
            {"status": status},
            topic=_STATUS_TOPIC,
            failure_msg="failed to send status",
        )

    async def _send_cursor_position(self, x: int, y: int, action: str) -> None:
        """Publish a visible cursor position together with the page size."""
        await self._publish(
            {
                "x": x,
                "y": y,
                "action": action,
                "visible": True,
                "width": self._width,
                "height": self._height,
            },
            topic=_CURSOR_TOPIC,
            failure_msg="failed to send cursor position",
        )

    async def _send_cursor_hide(self) -> None:
        """Publish a cursor update that marks the cursor as not visible."""
        await self._publish(
            {"visible": False},
            topic=_CURSOR_TOPIC,
            failure_msg="failed to hide cursor",
        )

    async def aclose(self) -> None:
        """Stop the agent loop and close everything ``start()`` created."""
        import inspect

        # Unregister first so no new chat message is queued during teardown
        # and so a failing close step below cannot leave the handler attached.
        if self._room and self._on_chat_data is not None:
            self._room.off("data_received", self._on_chat_data)
            self._on_chat_data = None

        if self._agent_loop_task:
            self._agent_loop_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._agent_loop_task

        if self._computer_tool:
            # ComputerTool.aclose() is defined outside this file; await its
            # result when it is a coroutine so the close actually runs.
            closing = self._computer_tool.aclose()
            if inspect.isawaitable(closing):
                await closing

        if self._session:
            await self._session.aclose()

        if self._page:
            await self._page.aclose()

        if self._browser_ctx:
            await self._browser_ctx.aclose()

Instance variables

prop chat_ctx : llm.ChatContext | None
Expand source code
@property
def chat_ctx(self) -> llm.ChatContext | None:
    """Return the stored chat context (``None`` until one has been assigned)."""
    current = self._chat_ctx
    return current
prop computer_tool : ComputerTool | None
Expand source code
@property
def computer_tool(self) -> ComputerTool | None:
    """Return the stored computer tool (``None`` until one has been assigned)."""
    current = self._computer_tool
    return current
prop page : BrowserPage | None
Expand source code
@property
def page(self) -> BrowserPage | None:
    """Return the stored browser page (``None`` until one has been assigned)."""
    current = self._page
    return current
prop session : BrowserSession | None
Expand source code
@property
def session(self) -> BrowserSession | None:
    """Return the stored browser session (``None`` until one has been assigned)."""
    current = self._session
    return current

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Stop the agent loop and close the tool, session, page and browser context.

    Every step is skipped when the corresponding attribute is unset, so the
    method can be called on an agent that was never started.
    """
    import inspect

    # Unregister the chat handler first so no message is queued during
    # teardown and a failing close step below cannot leave it attached.
    chat_handler = getattr(self, "_on_chat_data", None)
    if self._room and chat_handler is not None:
        self._room.off("data_received", chat_handler)

    if self._agent_loop_task:
        self._agent_loop_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._agent_loop_task

    if self._computer_tool:
        # ComputerTool.aclose() is defined outside this file; await its result
        # when it is a coroutine so the close actually runs.
        closing = self._computer_tool.aclose()
        if inspect.isawaitable(closing):
            await closing

    if self._session:
        await self._session.aclose()

    if self._page:
        await self._page.aclose()

    if self._browser_ctx:
        await self._browser_ctx.aclose()
async def send_message(self, text: str) ‑> None
Expand source code
async def send_message(self, text: str) -> None:
    """Enqueue ``text`` on the pending-message queue without waiting."""
    queue = self._pending_messages
    queue.put_nowait(text)
async def start(self, *, room: rtc.Room) ‑> None
Expand source code
async def start(self, *, room: rtc.Room) -> None:
    """Launch the browser, build the tools and start the agent loop.

    Calling it again after the first call is a no-op.

    Args:
        room: Room the session is attached to; when chat is enabled its
            ``data_received`` events feed the pending-message queue.
    """
    if self._started:
        return
    self._started = True
    self._room = room

    self._browser_ctx = BrowserContext(dev_mode=False)
    await self._browser_ctx.initialize()

    self._page = await self._browser_ctx.new_page(
        url=self._url,
        width=self._width,
        height=self._height,
        framerate=self._framerate,
    )

    self._session = BrowserSession(page=self._page, room=room)
    await self._session.start()

    from livekit.plugins.anthropic.computer_tool import ComputerTool

    self._page_actions = PageActions(page=self._page)
    self._computer_tool = ComputerTool(
        actions=self._page_actions,
        width=self._width,
        height=self._height,
    )

    # The bodies are intentionally empty: these declarations only describe the
    # tools; the calls are handled by name elsewhere in the class.
    @function_tool(name="navigate", description="Navigate the browser to a URL.")
    async def _navigate(url: str) -> None:
        pass

    @function_tool(name="go_back", description="Go back to the previous page.")
    async def _go_back() -> None:
        pass

    @function_tool(name="go_forward", description="Go forward to the next page.")
    async def _go_forward() -> None:
        pass

    self._nav_tools: list[llm.Tool] = [_navigate, _go_back, _go_forward]

    self._chat_ctx = llm.ChatContext()
    self._chat_ctx.add_message(role="system", content=self._instructions)

    await self._session.reclaim_agent_focus()

    if self._chat_enabled:

        @room.on("data_received")
        def _on_chat_data(packet: rtc.DataPacket) -> None:
            if packet.topic != _CHAT_TOPIC:
                return
            try:
                data = json.loads(packet.data)
            except (json.JSONDecodeError, UnicodeDecodeError):
                return
            # The payload comes from other participants: valid JSON that is
            # not an object (e.g. a list) must not raise out of the handler.
            if not isinstance(data, dict):
                return
            text = data.get("text", "")
            if isinstance(text, str) and text:
                self._pending_messages.put_nowait(text)

        # Kept so the handler can be unregistered on close.
        self._on_chat_data = _on_chat_data

    self._agent_loop_task = asyncio.create_task(self._agent_loop())
class BrowserContext (*, dev_mode: bool, remote_debugging_port: int = 0)
Expand source code
class BrowserContext:
    """Owner of the browser subprocess and of the pages created in it.

    ``initialize()`` spawns the subprocess and performs the handshake over a
    socket pair; ``new_page()`` creates ``BrowserPage`` objects that share that
    channel; ``aclose()`` closes the pages and tears the subprocess down.
    """

    def __init__(self, *, dev_mode: bool, remote_debugging_port: int = 0) -> None:
        """Store the options; the subprocess is started by ``initialize()``.

        Args:
            dev_mode: Forwarded to the subprocess in the initialize request.
            remote_debugging_port: Port forwarded to the subprocess; ``0``
                means a free port is picked during initialization.
        """
        self._mp_ctx = mp.get_context("spawn")
        self._pages: dict[int, BrowserPage] = {}
        self._dev_mode = dev_mode
        self._initialized = False
        self._next_page_id = 1
        self._remote_debugging_port = remote_debugging_port
        self._closed_event = asyncio.Event()
        self._proc = None
        self._duplex = None
        self._main_atask = None
        self._root_cache_path = None

    async def initialize(self) -> None:
        """Spawn the browser subprocess and wait for it to acknowledge setup.

        Raises:
            Whatever process start or the handshake raises; in that case the
            process, channel and cache directory are cleaned up first.
        """
        from .download import get_resources_dir

        resources_dir = str(get_resources_dir())

        mp_pch, mp_cch = socket.socketpair()
        self._duplex = await AsyncDuplex.open(mp_pch)

        self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

        # On Linux aarch64, libcef.so is too large for late TLS allocation.
        # Set LD_PRELOAD so the dynamic linker reserves TLS early.
        # Track whether we actually overrode the variable: restoring must only
        # undo our own change and never delete a value the user had set.
        preload_overridden = False
        previous_preload = os.environ.get("LD_PRELOAD")
        if sys.platform.startswith("linux"):
            libcef = os.path.join(resources_dir, "libcef.so")
            if os.path.exists(libcef):
                os.environ["LD_PRELOAD"] = libcef + (
                    ":" + previous_preload if previous_preload else ""
                )
                preload_overridden = True

        try:
            try:
                self._proc.start()
            finally:
                # Runs even when start() raises so the parent environment and
                # the child end of the socket pair are never left behind.
                if preload_overridden:
                    if previous_preload is None:
                        os.environ.pop("LD_PRELOAD", None)
                    else:
                        os.environ["LD_PRELOAD"] = previous_preload
                mp_cch.close()

            await self._initialize_context(resources_dir)
        except BaseException:
            await self._cleanup_process()
            raise

    async def _initialize_context(self, resources_dir: str) -> None:
        """Send the initialize request and start the IPC receive task."""
        if not self._remote_debugging_port:
            with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                # Socket options only influence bind() when set before it.
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(("", 0))
                self._remote_debugging_port = s.getsockname()[1]

            logger.debug("using remote debugging port %d", self._remote_debugging_port)

        self._root_cache_path = tempfile.mkdtemp(prefix="lkcef_cache_")
        await channel.asend_message(
            self._duplex,
            proto.InitializeContextRequest(
                dev_mode=self._dev_mode,
                remote_debugging_port=self._remote_debugging_port,
                root_cache_path=self._root_cache_path,
            ),
        )
        resp = await channel.arecv_message(self._duplex, proto.IPC_MESSAGES)
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if not isinstance(resp, proto.ContextInitializedResponse):
            raise RuntimeError(
                f"unexpected response to InitializeContextRequest: {type(resp).__name__}"
            )
        self._initialized = True
        logger.debug("browser context initialized", extra={"pid": self._proc.pid})

        self._main_atask = asyncio.create_task(self._main_task(self._duplex))

    async def _cleanup_process(self) -> None:
        """Close the channel, reap the subprocess and remove the cache dir.

        Safe to call more than once: the process handle and cache path are
        cleared after they have been released.
        """
        if self._duplex:
            try:
                await self._duplex.aclose()
            except DuplexClosed:
                pass

        if self._proc:
            # Subprocess has its own 5s watchdog — just wait for it
            # NOTE(review): join() blocks the event loop for up to 15s.
            if self._proc.is_alive():
                self._proc.join(timeout=15)
                if self._proc.is_alive():
                    logger.warning("subprocess did not exit, killing")
                    self._proc.kill()
                    self._proc.join()

            self._proc.close()
            # A closed Process raises on is_alive(); drop it so a second
            # cleanup call is a no-op.
            self._proc = None

        if self._root_cache_path:
            shutil.rmtree(self._root_cache_path, ignore_errors=True)
            self._root_cache_path = None

    @asynccontextmanager
    async def playwright(self, timeout: float | None = None):
        """Yield a Playwright browser connected over CDP to this context.

        Args:
            timeout: Forwarded to ``connect_over_cdp``.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            url = f"http://localhost:{self._remote_debugging_port}"
            browser = await p.chromium.connect_over_cdp(url, timeout=timeout)
            try:
                yield browser
            finally:
                await browser.close()

    @_log_exceptions(_logger=logger)
    async def _main_task(self, duplex: AsyncDuplex) -> None:
        """Receive IPC messages and dispatch them to the page they belong to."""
        while True:
            try:
                msg = await channel.arecv_message(duplex, proto.IPC_MESSAGES)
            except DuplexClosed:
                break

            # A failure while handling one message (including an unknown
            # page id) is logged and must not stop the receive loop.
            try:
                if isinstance(msg, proto.CreateBrowserResponse):
                    page = self._pages[msg.page_id]
                    await page._handle_created(msg)
                elif isinstance(msg, proto.AcquireAudioData):
                    page = self._pages[msg.page_id]
                    page._handle_audio_data(msg)
                elif isinstance(msg, proto.AcquirePaintData):
                    page = self._pages[msg.page_id]
                    await page._handle_paint(msg)
                elif isinstance(msg, proto.AudioStreamStarted):
                    pass
                elif isinstance(msg, proto.AudioStreamStopped):
                    pass
                elif isinstance(msg, proto.CursorChanged):
                    page = self._pages[msg.page_id]
                    page.emit("cursor_changed", msg.cursor_type)
                elif isinstance(msg, proto.UrlChanged):
                    page = self._pages[msg.page_id]
                    page.emit("url_changed", msg.url)
                elif isinstance(msg, proto.BrowserClosed):
                    page = self._pages[msg.page_id]
                    await page._handle_close(msg)
            except Exception:
                logger.exception("error handling IPC message %s", type(msg).__name__)

        # Subprocess exited — resolve any pending page futures so cleanup doesn't hang
        for page in self._pages.values():
            if not page._close_fut.done():
                page._close_fut.set_result(None)
            if not page._created_fut.done():
                page._created_fut.set_result(None)
        self._closed_event.set()

    async def new_page(
        self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
    ) -> BrowserPage:
        """Create, register and start a new page.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        page_id = self._next_page_id
        self._next_page_id += 1
        page = BrowserPage(
            self._mp_ctx,
            _PageOptions(
                page_id=page_id,
                url=url,
                width=width,
                height=height,
                framerate=framerate,
            ),
            self._duplex,
        )
        # Registered before start() so the creation response can be routed.
        self._pages[page_id] = page
        await page.start()
        return page

    async def aclose(self) -> None:
        """Close all pages, stop the receive task and reap the subprocess.

        Does nothing when the context is not (or no longer) initialized.
        """
        if not self._initialized:
            return

        self._initialized = False

        # Close all pages (ensure shm cleanup even on errors)
        for page in list(self._pages.values()):
            try:
                await page.aclose()
            except Exception:
                logger.warning("error closing page %s", page.id, exc_info=True)
                page._cleanup_shm()

        # Cancel the main task
        if self._main_atask:
            self._main_atask.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._main_atask

        # Nothing can be routed to the pages any more; drop the references.
        self._pages.clear()

        await self._cleanup_process()

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close every page, cancel the receive task and clean up the subprocess.

    Returns immediately when the context is not initialized, which also makes
    a second call a no-op.
    """
    if not self._initialized:
        return
    self._initialized = False

    # Snapshot the pages so the loop is unaffected by changes to the dict.
    open_pages = list(self._pages.values())
    for browser_page in open_pages:
        try:
            await browser_page.aclose()
        except Exception:
            # Shared memory must be released even when the close failed.
            browser_page._cleanup_shm()

    main_task = self._main_atask
    if main_task:
        main_task.cancel()
        try:
            await main_task
        except asyncio.CancelledError:
            pass

    await self._cleanup_process()
async def initialize(self) ‑> None
Expand source code
async def initialize(self) -> None:
    """Spawn the browser subprocess and run the context handshake.

    Raises:
        Whatever process start or ``_initialize_context`` raises; the process
        and channel are cleaned up before the exception propagates.
    """
    from .download import get_resources_dir

    resources_dir = str(get_resources_dir())

    mp_pch, mp_cch = socket.socketpair()
    self._duplex = await AsyncDuplex.open(mp_pch)

    self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

    # On Linux aarch64, libcef.so is too large for late TLS allocation.
    # Set LD_PRELOAD so the dynamic linker reserves TLS early.
    # Track whether we actually overrode the variable: restoring must only
    # undo our own change and never delete a value the user had set.
    preload_overridden = False
    previous_preload = os.environ.get("LD_PRELOAD")
    if sys.platform.startswith("linux"):
        libcef = os.path.join(resources_dir, "libcef.so")
        if os.path.exists(libcef):
            os.environ["LD_PRELOAD"] = libcef + (
                ":" + previous_preload if previous_preload else ""
            )
            preload_overridden = True

    try:
        try:
            self._proc.start()
        finally:
            # Runs even when start() raises so the parent environment and the
            # child end of the socket pair are never left behind.
            if preload_overridden:
                if previous_preload is None:
                    os.environ.pop("LD_PRELOAD", None)
                else:
                    os.environ["LD_PRELOAD"] = previous_preload
            mp_cch.close()

        await self._initialize_context(resources_dir)
    except BaseException:
        await self._cleanup_process()
        raise
async def new_page(self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24) ‑> BrowserPage
Expand source code
async def new_page(
    self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
) -> BrowserPage:
    """Allocate the next page id, register a new page under it and start it.

    Raises:
        RuntimeError: If the context has not been initialized.
    """
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    page_id = self._next_page_id
    self._next_page_id = page_id + 1

    options = _PageOptions(
        page_id=page_id,
        url=url,
        width=width,
        height=height,
        framerate=framerate,
    )
    new_browser_page = BrowserPage(self._mp_ctx, options, self._duplex)

    # Registered before start() is awaited, matching the id just allocated.
    self._pages[page_id] = new_browser_page
    await new_browser_page.start()
    return new_browser_page
async def playwright(self, timeout: float | None = None)
Expand source code
@asynccontextmanager
async def playwright(self, timeout: float | None = None):
    """Yield a Playwright browser attached over CDP to the debugging port.

    The browser handle is closed when the ``async with`` block exits.

    Args:
        timeout: Forwarded to ``connect_over_cdp``.

    Raises:
        RuntimeError: If the context has not been initialized.
    """
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    from playwright.async_api import async_playwright

    cdp_endpoint = f"http://localhost:{self._remote_debugging_port}"
    async with async_playwright() as pw:
        connected = await pw.chromium.connect_over_cdp(cdp_endpoint, timeout=timeout)
        try:
            yield connected
        finally:
            await connected.close()
class BrowserPage (mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex)
Expand source code
class BrowserPage(EventEmitter[EventTypes]):
    """One browser page living in the browser subprocess.

    Requests are sent over the context's shared IPC channel. Incoming paint
    and audio messages are turned into ``"paint"`` and ``"audio"`` events.
    """

    def __init__(
        self,
        mp_ctx: mpc.SpawnContext,
        opts: _PageOptions,
        ctx_duplex: AsyncDuplex,
    ) -> None:
        """Store the options and channel; nothing is sent until ``start()``."""
        super().__init__()
        self._mp_ctx = mp_ctx
        self._opts = opts
        self._ctx_duplex = ctx_duplex

        self._view_width = 0
        self._view_height = 0

        # Created here so _cleanup_shm()/aclose() work before start() ran.
        self._shms = []

        self._created_fut = asyncio.Future()
        self._close_fut = asyncio.Future()
        self._closed = False

    @property
    def id(self) -> int:
        """Page id taken from the page options."""
        return self._opts.page_id

    _NUM_SHM_BUFFERS = 2

    async def start(self) -> None:
        """Allocate the shared-memory buffers and ask for the browser.

        Returns once the creation future is resolved. If allocation or request
        construction fails, the segments created so far are released and the
        page is marked closed before the exception propagates.
        """
        import multiprocessing.shared_memory as mp_shm

        shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4
        self._shms = []
        shm_names = []
        try:
            for _ in range(self._NUM_SHM_BUFFERS):
                name = f"lkcef_browser_{_shortuuid()}"
                shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name)
                self._shms.append(shm)
                shm_names.append(name)

            self._framebuffer = rtc.VideoFrame(
                proto.SHM_MAX_WIDTH,
                proto.SHM_MAX_HEIGHT,
                rtc.VideoBufferType.BGRA,
                bytearray(shm_size),
            )

            req = proto.CreateBrowserRequest(
                page_id=self._opts.page_id,
                width=self._opts.width,
                height=self._opts.height,
                shm_names=shm_names,
                url=self._opts.url,
                framerate=self._opts.framerate,
            )
        except BaseException:
            # Nothing was requested from the subprocess yet. Release the
            # segments and mark the page closed so a later aclose() does not
            # wait for a close notification that can never arrive.
            self._closed = True
            self._cleanup_shm()
            raise

        await channel.asend_message(self._ctx_duplex, req)

        # TODO(theomonnom): create timeout (would prevent never resolving futures if the
        #  browser process crashed for some reasons)
        await asyncio.shield(self._created_fut)

    async def aclose(self) -> None:
        """Request the page to close, wait for confirmation and free the shm.

        Subsequent calls are no-ops.
        """
        if self._closed:
            return
        self._closed = True

        try:
            await channel.asend_message(
                self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id)
            )
            await asyncio.shield(self._close_fut)
        except DuplexClosed:
            pass  # Subprocess already closed (e.g., window closed manually)
        finally:
            self._cleanup_shm()

    def _cleanup_shm(self) -> None:
        """Close and unlink every shared-memory segment, ignoring errors."""
        for shm in self._shms:
            # close() and unlink() are attempted independently so a failure of
            # one does not skip the other.
            try:
                shm.close()
            except Exception:
                pass
            try:
                shm.unlink()
            except Exception:
                pass
        self._shms.clear()

    async def _handle_created(self, msg: proto.CreateBrowserResponse) -> None:
        """Resolve the creation future, unless it is already resolved."""
        if not self._created_fut.done():
            self._created_fut.set_result(None)

    async def _handle_paint(self, acq: proto.AcquirePaintData) -> None:
        """Copy the painted buffer into the framebuffer and emit ``"paint"``."""
        old_width = self._view_width
        old_height = self._view_height
        self._view_width = acq.width
        self._view_height = acq.height

        # TODO(theomonnom): remove hacky alloc-free resizing
        self._framebuffer._width = acq.width
        self._framebuffer._height = acq.height

        proto.copy_paint_data(
            acq, old_width, old_height, self._shms[acq.buffer_id].buf, self._framebuffer.data
        )

        # Sent right after the copy, before listeners run.
        release_paint = proto.ReleasePaintData(page_id=acq.page_id, buffer_id=acq.buffer_id)
        await channel.asend_message(self._ctx_duplex, release_paint)

        paint_data = PaintData(
            dirty_rects=acq.dirty_rects,
            frame=self._framebuffer,
            width=acq.width,
            height=acq.height,
        )
        self.emit("paint", paint_data)

    async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
        """Send a ``SendMouseMoveEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave),
        )

    async def send_mouse_click(
        self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
    ) -> None:
        """Send a ``SendMouseClickEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseClickEvent(
                page_id=self.id,
                x=x,
                y=y,
                button_type=button,
                mouse_up=mouse_up,
                click_count=click_count,
            ),
        )

    async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
        """Send a ``SendMouseWheelEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y),
        )

    async def send_key_event(
        self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
    ) -> None:
        """Send a ``SendKeyEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendKeyEvent(
                page_id=self.id,
                type=type,
                modifiers=modifiers,
                windows_key_code=windows_key_code,
                native_key_code=native_key_code,
                character=character,
            ),
        )

    async def navigate(self, url: str) -> None:
        """Send a ``NavigateRequest`` with ``url`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.NavigateRequest(page_id=self.id, url=url),
        )

    async def go_back(self) -> None:
        """Send a ``GoBackRequest`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoBackRequest(page_id=self.id),
        )

    async def go_forward(self) -> None:
        """Send a ``GoForwardRequest`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoForwardRequest(page_id=self.id),
        )

    async def send_focus_event(self, set_focus: bool = True) -> None:
        """Send a ``SendFocusEvent`` for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendFocusEvent(page_id=self.id, set_focus=set_focus),
        )

    def _handle_audio_data(self, msg: proto.AcquireAudioData) -> None:
        """Wrap the audio message in an ``AudioFrame`` and emit ``"audio"``."""
        # Messages without a positive channel count, rate or frame count are dropped.
        if msg.channels <= 0 or msg.sample_rate <= 0 or msg.frames <= 0:
            return

        frame = rtc.AudioFrame(
            data=msg.data,
            sample_rate=msg.sample_rate,
            num_channels=msg.channels,
            samples_per_channel=msg.frames,
        )
        self.emit("audio", AudioData(frame=frame, pts=msg.pts))

    async def _handle_close(self, msg: proto.BrowserClosed) -> None:
        """Resolve the close future, unless it is already resolved."""
        logger.debug("browser page closed", extra={"page_id": self.id})
        if not self._close_fut.done():
            self._close_fut.set_result(None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Instance variables

prop id : int
Expand source code
@property
def id(self) -> int:
    """Return the ``page_id`` stored in this page's options."""
    options = self._opts
    return options.page_id

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Request that the browser for this page be closed and wait for ``_close_fut``.

    Only the first call does any work; later calls return immediately.
    Shared-memory cleanup runs no matter how the close request ends.
    """
    already_closed = self._closed
    if already_closed:
        return
    self._closed = True

    try:
        close_request = proto.CloseBrowserRequest(page_id=self.id)
        await channel.asend_message(self._ctx_duplex, close_request)
        # shield() keeps _close_fut itself from being cancelled if this
        # coroutine is cancelled while waiting on it.
        await asyncio.shield(self._close_fut)
    except DuplexClosed:
        # Subprocess already closed (e.g., window closed manually)
        pass
    finally:
        self._cleanup_shm()
async def go_back(self) ‑> None
Expand source code
async def go_back(self) -> None:
    """Send a ``GoBackRequest`` for this page over the context duplex."""
    request = proto.GoBackRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, request)
async def go_forward(self) ‑> None
Expand source code
async def go_forward(self) -> None:
    """Send a ``GoForwardRequest`` for this page over the context duplex."""
    request = proto.GoForwardRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, request)
async def navigate(self, url: str) ‑> None
Expand source code
async def navigate(self, url: str) -> None:
    """Send a ``NavigateRequest`` carrying ``url`` for this page over the context duplex."""
    request = proto.NavigateRequest(page_id=self.id, url=url)
    await channel.asend_message(self._ctx_duplex, request)
async def send_focus_event(self, set_focus: bool = True) ‑> None
Expand source code
async def send_focus_event(self, set_focus: bool = True) -> None:
    """Send a ``SendFocusEvent`` for this page; ``set_focus`` is forwarded unchanged."""
    focus_event = proto.SendFocusEvent(page_id=self.id, set_focus=set_focus)
    await channel.asend_message(self._ctx_duplex, focus_event)
async def send_key_event(self,
type: int,
modifiers: int,
windows_key_code: int,
native_key_code: int = 0,
character: int = 0) ‑> None
Expand source code
async def send_key_event(
    self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
) -> None:
    """Send a ``SendKeyEvent`` for this page over the context duplex.

    Every argument is copied into the message field of the same name.
    """
    key_event = proto.SendKeyEvent(
        page_id=self.id,
        type=type,
        modifiers=modifiers,
        windows_key_code=windows_key_code,
        native_key_code=native_key_code,
        character=character,
    )
    await channel.asend_message(self._ctx_duplex, key_event)
async def send_mouse_click(self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1) ‑> None
Expand source code
async def send_mouse_click(
    self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
) -> None:
    """Send a ``SendMouseClickEvent`` for this page over the context duplex.

    ``button`` is sent as the message's ``button_type`` field; the other
    arguments map to the fields of the same name.
    """
    click_event = proto.SendMouseClickEvent(
        page_id=self.id,
        x=x,
        y=y,
        button_type=button,
        mouse_up=mouse_up,
        click_count=click_count,
    )
    await channel.asend_message(self._ctx_duplex, click_event)
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) ‑> None
Expand source code
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
    """Send a ``SendMouseMoveEvent`` for this page over the context duplex."""
    move_event = proto.SendMouseMoveEvent(
        page_id=self.id,
        x=x,
        y=y,
        mouse_leave=mouse_leave,
    )
    await channel.asend_message(self._ctx_duplex, move_event)
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) ‑> None
Expand source code
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
    """Send a ``SendMouseWheelEvent`` for this page over the context duplex."""
    wheel_event = proto.SendMouseWheelEvent(
        page_id=self.id,
        x=x,
        y=y,
        delta_x=delta_x,
        delta_y=delta_y,
    )
    await channel.asend_message(self._ctx_duplex, wheel_event)
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Allocate the shared-memory buffers, request browser creation and wait for it.

    Creates ``_NUM_SHM_BUFFERS`` named shared-memory segments plus a local
    BGRA framebuffer, sends a ``CreateBrowserRequest`` listing the segment
    names, then waits on ``_created_fut``.
    """
    from multiprocessing import shared_memory as mp_shm

    # 4 bytes per pixel (BGRA) at the maximum supported dimensions.
    max_width = proto.SHM_MAX_WIDTH
    max_height = proto.SHM_MAX_HEIGHT
    shm_size = max_width * max_height * 4

    self._shms = []
    shm_names = [f"lkcef_browser_{_shortuuid()}" for _ in range(self._NUM_SHM_BUFFERS)]
    for segment_name in shm_names:
        # Append one at a time so already-created segments stay tracked on
        # self._shms even if a later allocation fails.
        segment = mp_shm.SharedMemory(create=True, size=shm_size, name=segment_name)
        self._shms.append(segment)

    self._framebuffer = rtc.VideoFrame(
        max_width,
        max_height,
        rtc.VideoBufferType.BGRA,
        bytearray(shm_size),
    )

    options = self._opts
    create_request = proto.CreateBrowserRequest(
        page_id=options.page_id,
        width=options.width,
        height=options.height,
        shm_names=shm_names,
        url=options.url,
        framerate=options.framerate,
    )
    await channel.asend_message(self._ctx_duplex, create_request)

    # TODO(theomonnom): create timeout (would prevent never resolving futures if the
    #  browser process crashed for some reasons)
    await asyncio.shield(self._created_fut)

Inherited members

class BrowserSession (*,
page: BrowserPage,
room: rtc.Room)
Expand source code
class BrowserSession:
    """Publish a ``BrowserPage`` into a LiveKit room and relay participant input to it.

    ``start()`` publishes the page's painted frames as a screen-share video
    track, registers RPC methods for focus management and navigation, and
    forwards "browser-input" data packets from the participant that currently
    holds focus. Page audio is published on its own track the first time an
    audio frame arrives.
    """

    def __init__(self, *, page: BrowserPage, room: rtc.Room) -> None:
        self._page = page
        self._room = room
        self._video_source: rtc.VideoSource | None = None
        self._audio_source: rtc.AudioSource | None = None
        self._video_track: rtc.LocalVideoTrack | None = None
        self._audio_track: rtc.LocalAudioTrack | None = None
        self._started = False
        self._last_frame: rtc.VideoFrame | None = None
        self._video_task: asyncio.Task[None] | None = None
        self._audio_init_task: asyncio.Task[None] | None = None
        self._audio_task: asyncio.Task[None] | None = None
        self._audio_queue: asyncio.Queue[rtc.AudioFrame] | None = None
        # Bounded queue of pending awaitables; see _queue_input for the
        # overflow policy.
        self._input_queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=256)
        self._input_task: asyncio.Task[None] | None = None

        # Identity of the participant holding input focus: "__agent__" while
        # the AI agent holds it, None when nobody does.
        self._focus_identity: str | None = None

        # Event set when a human interrupts the agent's focus
        self._agent_interrupted = asyncio.Event()

    @property
    def focus_identity(self) -> str | None:
        """Identity currently holding focus, or ``None`` if nobody does."""
        return self._focus_identity

    @property
    def agent_interrupted(self) -> asyncio.Event:
        """Event that is set when a human takes focus from the agent."""
        return self._agent_interrupted

    def set_agent_focus(self, active: bool) -> None:
        """Grant or revoke browser focus for the AI agent.

        Granting also clears ``agent_interrupted``. Revoking only has an
        effect while the agent is the current focus holder.
        """
        if active:
            self._focus_identity = "__agent__"
            self._agent_interrupted.clear()
        elif self._focus_identity == "__agent__":
            self._focus_identity = None

    async def reclaim_agent_focus(self) -> None:
        """Reclaim focus for the agent and notify all participants."""
        self.set_agent_focus(True)
        await self._page.send_focus_event(True)
        await self._broadcast_focus()

    async def start(self) -> None:
        """Publish the video track and wire up page events, RPC methods and room listeners.

        Calling it again after the first call is a no-op.
        """
        if self._started:
            return
        self._started = True

        opts = self._page._opts

        self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)

        self._video_track = rtc.LocalVideoTrack.create_video_track(
            "browser-video", self._video_source
        )

        video_opts = rtc.TrackPublishOptions(
            source=rtc.TrackSource.SOURCE_SCREENSHARE,
            video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
            simulcast=False,
        )
        self._page.on("paint", self._on_paint)
        self._page.on("audio", self._on_audio)
        self._page.on("cursor_changed", self._on_cursor)
        self._page.on("url_changed", self._on_url_changed)

        await self._room.local_participant.publish_track(self._video_track, video_opts)

        self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

        # Single persistent task for sending input events to subprocess
        self._input_task = asyncio.create_task(self._input_sender_loop())

        # Register RPC methods for focus management
        @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
        async def _handle_request_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            if self._focus_identity is None:
                self._focus_identity = data.caller_identity
                await self._page.send_focus_event(True)
                await self._broadcast_focus()
                return json.dumps({"granted": True})

            # If agent has focus, allow human to interrupt
            if self._focus_identity == "__agent__":
                self._focus_identity = data.caller_identity
                self._agent_interrupted.set()
                await self._page.send_focus_event(True)
                await self._broadcast_focus()
                return json.dumps({"granted": True})

            return json.dumps({"granted": False, "holder": self._focus_identity})

        @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
        async def _handle_release_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            if self._focus_identity == data.caller_identity:
                self._focus_identity = None
                await self._page.send_focus_event(False)
                await self._broadcast_focus()
                return json.dumps({"released": True})
            return json.dumps({"released": False})

        @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
        async def _handle_navigate(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            payload = json.loads(data.payload)
            url = payload.get("url", "")
            if url:
                self._queue_input(self._page.navigate(url))
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
        async def _handle_go_back(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_back())
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
        async def _handle_go_forward(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_forward())
            return json.dumps({"status": "ok"})

        # Listen for input data from participants
        @self._room.on("data_received")
        def _on_data_received(packet: rtc.DataPacket) -> None:
            if packet.topic != "browser-input":
                return
            if packet.participant is None:
                return
            # Only the current focus holder may drive the page.
            if packet.participant.identity != self._focus_identity:
                return
            try:
                events = json.loads(packet.data)
            except (json.JSONDecodeError, UnicodeDecodeError):
                return
            # The payload comes from a remote participant: anything other than
            # a JSON array of events is ignored instead of being iterated.
            if not isinstance(events, list):
                return
            for evt in events:
                self._dispatch_input(evt)

        self._on_data_received = _on_data_received

        # Release focus when the holder disconnects
        @self._room.on("participant_disconnected")
        def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
            if participant.identity == self._focus_identity:
                self._focus_identity = None
                self._queue_input(self._page.send_focus_event(False))
                self._queue_input(self._broadcast_focus())

        self._on_participant_disconnected = _on_participant_disconnected

    def _queue_input(self, coro: Any) -> None:
        """Enqueue an awaitable for the input sender loop, dropping it if the queue is full."""
        try:
            self._input_queue.put_nowait(coro)
        except asyncio.QueueFull:
            # The action is dropped. Close the coroutine so Python does not
            # emit a "coroutine ... was never awaited" RuntimeWarning for it.
            close = getattr(coro, "close", None)
            if close is not None:
                close()

    async def _input_sender_loop(self) -> None:
        """Await queued actions one at a time, in the order they were queued."""
        while True:
            coro = await self._input_queue.get()
            try:
                await coro
            except Exception:
                # Keep the loop alive for later actions, but leave a trace of
                # the failure instead of discarding it silently.
                logger.exception("browser input action failed")

    def _dispatch_input(self, evt: dict[str, Any]) -> None:
        """Translate one participant input event into a queued page call.

        Events that are not JSON objects, have an unrecognised ``type`` or
        lack a required field ("x"/"y" for mouse events, "keyCode" for key
        events) are ignored.
        """
        if not isinstance(evt, dict):
            return

        kind = evt.get("type")
        # Pull the required fields out first so a malformed event is rejected
        # before any page coroutine is created.
        try:
            if kind in ("mousemove", "mousedown", "mouseup", "wheel"):
                x, y = evt["x"], evt["y"]
            elif kind in ("keydown", "keyup", "char"):
                wkc = evt["keyCode"]
            else:
                return
        except KeyError:
            return

        page = self._page
        if kind == "mousemove":
            self._queue_input(page.send_mouse_move(x, y))
        elif kind == "mousedown":
            self._queue_input(page.send_mouse_click(x, y, evt.get("button", 0), False, 1))
        elif kind == "mouseup":
            self._queue_input(page.send_mouse_click(x, y, evt.get("button", 0), True, 1))
        elif kind == "wheel":
            self._queue_input(
                page.send_mouse_wheel(x, y, evt.get("deltaX", 0), evt.get("deltaY", 0))
            )
        elif kind == "keydown":
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(page.send_key_event(0, evt.get("modifiers", 0), wkc, nkc, 0))
        elif kind == "keyup":
            # Key-up is sent with native key code 0.
            self._queue_input(page.send_key_event(2, evt.get("modifiers", 0), wkc, 0, 0))
        else:  # "char"
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(
                page.send_key_event(3, evt.get("modifiers", 0), wkc, nkc, evt.get("charCode", 0))
            )

    async def _broadcast_focus(self) -> None:
        """Publish the current focus holder on the "browser-focus" data topic."""
        payload = json.dumps({"identity": self._focus_identity}).encode()
        await self._room.local_participant.publish_data(
            payload, reliable=True, topic="browser-focus"
        )

    async def _init_audio(self, frame: rtc.AudioFrame) -> None:
        """Create and publish the audio track using the format of the first frame.

        On failure the audio state is reset so the next audio event retries.
        """
        try:
            self._audio_source = rtc.AudioSource(
                frame.sample_rate, frame.num_channels, queue_size_ms=100
            )
            self._audio_track = rtc.LocalAudioTrack.create_audio_track(
                "browser-audio", self._audio_source
            )
            audio_opts = rtc.TrackPublishOptions(
                source=rtc.TrackSource.SOURCE_SCREENSHARE_AUDIO,
            )
            await self._room.local_participant.publish_track(self._audio_track, audio_opts)
            self._audio_queue = asyncio.Queue(maxsize=50)
            self._audio_queue.put_nowait(frame)
            self._audio_task = asyncio.create_task(self._audio_loop())
        except Exception:
            logger.exception("failed to initialize audio, will retry on next packet")
            self._audio_source = None
            self._audio_track = None
            self._audio_init_task = None

    async def _audio_loop(self) -> None:
        """Feed queued audio frames into the audio source, logging capture errors."""
        assert self._audio_queue is not None
        assert self._audio_source is not None
        while True:
            frame = await self._audio_queue.get()
            try:
                await self._audio_source.capture_frame(frame)
            except Exception:
                logger.exception("audio capture error")

    def _on_audio(self, data: AudioData) -> None:
        """Handle a page "audio" event: start audio init once, then queue frames."""
        if self._audio_source is None:
            # Initialisation is kicked off by the first frame only; frames
            # arriving while it is still pending are dropped.
            if self._audio_init_task is not None:
                return
            self._audio_init_task = asyncio.get_event_loop().create_task(
                self._init_audio(data.frame)
            )
            return

        if self._audio_queue is None:
            return

        try:
            self._audio_queue.put_nowait(data.frame)
        except asyncio.QueueFull:
            # Frame is dropped when the audio queue is full.
            pass

    # CEF cursor type id -> CSS cursor keyword sent to participants.
    _CEF_CURSOR_MAP: dict[int, str] = {
        0: "default",  # CT_POINTER
        1: "crosshair",  # CT_CROSS
        2: "pointer",  # CT_HAND
        3: "text",  # CT_IBEAM
        4: "wait",  # CT_WAIT
        5: "help",  # CT_HELP
        6: "ew-resize",  # CT_EASTRESIZE
        7: "ns-resize",  # CT_NORTHRESIZE
        8: "nesw-resize",  # CT_NORTHEASTRESIZE
        9: "nwse-resize",  # CT_NORTHWESTRESIZE
        10: "ns-resize",  # CT_SOUTHRESIZE
        11: "nwse-resize",  # CT_SOUTHEASTRESIZE
        12: "nesw-resize",  # CT_SOUTHWESTRESIZE
        13: "ew-resize",  # CT_WESTRESIZE
        14: "ns-resize",  # CT_NORTHSOUTHRESIZE
        15: "ew-resize",  # CT_EASTWESTRESIZE
        16: "nesw-resize",  # CT_NORTHEASTSOUTHWESTRESIZE
        17: "nwse-resize",  # CT_NORTHWESTSOUTHEASTRESIZE
        18: "ew-resize",  # CT_COLUMNRESIZE
        19: "ns-resize",  # CT_ROWRESIZE
        20: "move",  # CT_MIDDLEPANNING
        28: "not-allowed",  # CT_NOTALLOWED
        29: "grab",  # CT_GRAB
        30: "grabbing",  # CT_GRABBING
        32: "move",  # CT_MOVE
    }

    def _on_url_changed(self, url: str) -> None:
        """Queue a "browser-url" data message carrying the page's new URL."""
        payload = json.dumps({"url": url}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(payload, reliable=True, topic="browser-url")
        )

    def _on_cursor(self, cursor_type: int) -> None:
        """Queue a "cursor_changed" data message with the CSS cursor for ``cursor_type``.

        Cursor types missing from ``_CEF_CURSOR_MAP`` are reported as "default".
        """
        css_cursor = self._CEF_CURSOR_MAP.get(cursor_type, "default")
        payload = json.dumps({"cursor": css_cursor}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(
                payload, reliable=True, topic="cursor_changed"
            )
        )

    def _on_paint(self, data: PaintData) -> None:
        """Remember the most recently painted frame for the video loop."""
        self._last_frame = data.frame

    async def _video_loop(self, fps: float) -> None:
        """Capture the latest painted frame into the video source ``fps`` times per second."""
        interval = 1.0 / fps
        loop = asyncio.get_event_loop()
        next_time = loop.time()
        while True:
            if self._last_frame is not None and self._video_source is not None:
                ts_us = time.monotonic_ns() // 1000
                self._video_source.capture_frame(self._last_frame, timestamp_us=ts_us)
            next_time += interval
            delay = next_time - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Running behind schedule: restart the cadence from now rather
                # than trying to catch up.
                next_time = loop.time()

    async def aclose(self) -> None:
        """Detach listeners, stop background tasks and unpublish the tracks.

        Does nothing if ``start()`` was never called.
        """
        if not self._started:
            return

        self._page.off("paint", self._on_paint)
        self._page.off("audio", self._on_audio)
        self._page.off("cursor_changed", self._on_cursor)
        self._page.off("url_changed", self._on_url_changed)

        if hasattr(self, "_on_data_received"):
            self._room.off("data_received", self._on_data_received)
        if hasattr(self, "_on_participant_disconnected"):
            self._room.off("participant_disconnected", self._on_participant_disconnected)

        # Stop a still-running audio initialisation first so it cannot publish
        # a track or spawn the audio loop after shutdown has begun.
        if self._audio_init_task:
            self._audio_init_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._audio_init_task

        if self._video_task:
            self._video_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._video_task

        if self._audio_task:
            self._audio_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._audio_task

        if self._input_task:
            self._input_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._input_task

        # Actions still queued will never run; close them so they do not
        # trigger "coroutine ... was never awaited" warnings.
        while not self._input_queue.empty():
            leftover = self._input_queue.get_nowait()
            close = getattr(leftover, "close", None)
            if close is not None:
                close()

        try:
            if self._video_track:
                await self._room.local_participant.unpublish_track(self._video_track.sid)
            if self._audio_track:
                await self._room.local_participant.unpublish_track(self._audio_track.sid)
        except Exception:
            pass  # tracks may already be gone if room disconnected first

Instance variables

prop agent_interrupted : asyncio.Event
Expand source code
@property
def agent_interrupted(self) -> asyncio.Event:
    """Return the event that gets set once a human has taken focus away from the agent."""
    interrupted_event = self._agent_interrupted
    return interrupted_event

Event that is set when a human takes focus from the agent.

prop focus_identity : str | None
Expand source code
@property
def focus_identity(self) -> str | None:
    """Return the identity currently holding focus, or ``None`` if nobody does."""
    holder = self._focus_identity
    return holder

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Detach listeners, stop background tasks and unpublish the tracks.

    Does nothing if ``start()`` was never called.
    """
    if not self._started:
        return

    self._page.off("paint", self._on_paint)
    self._page.off("audio", self._on_audio)
    self._page.off("cursor_changed", self._on_cursor)
    self._page.off("url_changed", self._on_url_changed)

    if hasattr(self, "_on_data_received"):
        self._room.off("data_received", self._on_data_received)
    if hasattr(self, "_on_participant_disconnected"):
        self._room.off("participant_disconnected", self._on_participant_disconnected)

    # Stop a still-running audio initialisation first so it cannot publish a
    # track or spawn the audio loop after shutdown has begun.
    if self._audio_init_task:
        self._audio_init_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._audio_init_task

    if self._video_task:
        self._video_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._video_task

    if self._audio_task:
        self._audio_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._audio_task

    if self._input_task:
        self._input_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._input_task

    # Actions still queued will never run; close them so they do not trigger
    # "coroutine ... was never awaited" warnings.
    while not self._input_queue.empty():
        leftover = self._input_queue.get_nowait()
        close = getattr(leftover, "close", None)
        if close is not None:
            close()

    try:
        if self._video_track:
            await self._room.local_participant.unpublish_track(self._video_track.sid)
        if self._audio_track:
            await self._room.local_participant.unpublish_track(self._audio_track.sid)
    except Exception:
        pass  # tracks may already be gone if room disconnected first
async def reclaim_agent_focus(self) ‑> None
Expand source code
async def reclaim_agent_focus(self) -> None:
    """Give focus back to the agent, focus the page, then broadcast the new holder."""
    self.set_agent_focus(True)
    page = self._page
    await page.send_focus_event(True)
    await self._broadcast_focus()

Reclaim focus for the agent and notify all participants.

def set_agent_focus(self, active: bool) ‑> None
Expand source code
def set_agent_focus(self, active: bool) -> None:
    """Hand browser focus to the AI agent, or take it away again.

    Granting marks "__agent__" as the holder and clears the interruption
    event. Revoking only clears the holder when the agent is the one
    holding focus.
    """
    if active:
        self._focus_identity = "__agent__"
        self._agent_interrupted.clear()
        return

    agent_has_focus = self._focus_identity == "__agent__"
    if agent_has_focus:
        self._focus_identity = None

Grant or revoke browser focus for the AI agent.

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Publish the video track and wire up page events, RPC methods and room listeners.

    Calling it again after the first call is a no-op.
    """
    if self._started:
        return
    self._started = True

    opts = self._page._opts

    self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)

    self._video_track = rtc.LocalVideoTrack.create_video_track(
        "browser-video", self._video_source
    )

    video_opts = rtc.TrackPublishOptions(
        source=rtc.TrackSource.SOURCE_SCREENSHARE,
        video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
        simulcast=False,
    )
    self._page.on("paint", self._on_paint)
    self._page.on("audio", self._on_audio)
    self._page.on("cursor_changed", self._on_cursor)
    self._page.on("url_changed", self._on_url_changed)

    await self._room.local_participant.publish_track(self._video_track, video_opts)

    self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

    # Single persistent task for sending input events to subprocess
    self._input_task = asyncio.create_task(self._input_sender_loop())

    # Register RPC methods for focus management
    @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
    async def _handle_request_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        if self._focus_identity is None:
            self._focus_identity = data.caller_identity
            await self._page.send_focus_event(True)
            await self._broadcast_focus()
            return json.dumps({"granted": True})

        # If agent has focus, allow human to interrupt
        if self._focus_identity == "__agent__":
            self._focus_identity = data.caller_identity
            self._agent_interrupted.set()
            await self._page.send_focus_event(True)
            await self._broadcast_focus()
            return json.dumps({"granted": True})

        return json.dumps({"granted": False, "holder": self._focus_identity})

    @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
    async def _handle_release_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        if self._focus_identity == data.caller_identity:
            self._focus_identity = None
            await self._page.send_focus_event(False)
            await self._broadcast_focus()
            return json.dumps({"released": True})
        return json.dumps({"released": False})

    @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
    async def _handle_navigate(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        payload = json.loads(data.payload)
        url = payload.get("url", "")
        if url:
            self._queue_input(self._page.navigate(url))
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
    async def _handle_go_back(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_back())
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
    async def _handle_go_forward(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_forward())
        return json.dumps({"status": "ok"})

    # Listen for input data from participants
    @self._room.on("data_received")
    def _on_data_received(packet: rtc.DataPacket) -> None:
        if packet.topic != "browser-input":
            return
        if packet.participant is None:
            return
        # Only the current focus holder may drive the page.
        if packet.participant.identity != self._focus_identity:
            return
        try:
            events = json.loads(packet.data)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return
        # The payload comes from a remote participant: anything other than a
        # JSON array of objects is ignored instead of crashing the handler.
        if not isinstance(events, list):
            return
        for evt in events:
            if isinstance(evt, dict):
                self._dispatch_input(evt)

    self._on_data_received = _on_data_received

    # Release focus when the holder disconnects
    @self._room.on("participant_disconnected")
    def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
        if participant.identity == self._focus_identity:
            self._focus_identity = None
            self._queue_input(self._page.send_focus_event(False))
            self._queue_input(self._broadcast_focus())

    self._on_participant_disconnected = _on_participant_disconnected
class PageActions (*,
page: BrowserPage)
Expand source code
class PageActions:
    """Typed input API for a CEF BrowserPage with frame capture.

    Every action takes an internal lock, so the event sequences of two
    concurrent actions never interleave.

    Usage::

        actions = PageActions(page=page)
        await actions.left_click(100, 200)
        frame = actions.last_frame
    """

    def __init__(self, *, page: BrowserPage) -> None:
        self._page = page
        self._lock = asyncio.Lock()
        self._last_frame: rtc.VideoFrame | None = None
        self._page.on("paint", self._on_paint)

    def _on_paint(self, data: Any) -> None:
        """Remember the frame carried by the latest "paint" event."""
        self._last_frame = data.frame

    @property
    def last_frame(self) -> rtc.VideoFrame | None:
        """Most recently painted frame, or ``None`` before the first paint."""
        return self._last_frame

    async def left_click(self, x: int, y: int, *, modifiers: str | None = None) -> None:
        """Move to ``(x, y)`` and left-click, holding ``modifiers`` during the click."""
        async with self._lock:
            mod_keys = _parse_modifier_keys(modifiers)
            await self._page.send_mouse_move(x, y)
            await _press_modifiers(self._page, mod_keys)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
            await _release_modifiers(self._page, mod_keys)

    async def right_click(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and press/release button 2."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 2, False, 1)
            await self._page.send_mouse_click(x, y, 2, True, 1)

    async def double_click(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and send two press/release pairs with click counts 1 and 2."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
            await self._page.send_mouse_click(x, y, 0, False, 2)
            await self._page.send_mouse_click(x, y, 0, True, 2)

    async def triple_click(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and send three press/release pairs with click counts 1 to 3."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
            await self._page.send_mouse_click(x, y, 0, False, 2)
            await self._page.send_mouse_click(x, y, 0, True, 2)
            await self._page.send_mouse_click(x, y, 0, False, 3)
            await self._page.send_mouse_click(x, y, 0, True, 3)

    async def middle_click(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and press/release button 1."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 1, False, 1)
            await self._page.send_mouse_click(x, y, 1, True, 1)

    async def mouse_move(self, x: int, y: int) -> None:
        """Move the pointer to ``(x, y)``."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)

    async def left_click_drag(self, *, start_x: int, start_y: int, end_x: int, end_y: int) -> None:
        """Press the left button at the start point, move to the end point and release.

        A 50 ms pause is inserted after the press and after the move.
        """
        async with self._lock:
            await self._page.send_mouse_move(start_x, start_y)
            await self._page.send_mouse_click(start_x, start_y, 0, False, 1)
            await asyncio.sleep(0.05)
            await self._page.send_mouse_move(end_x, end_y)
            await asyncio.sleep(0.05)
            await self._page.send_mouse_click(end_x, end_y, 0, True, 1)

    async def left_mouse_down(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and press the left button without releasing it."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)

    async def left_mouse_up(self, x: int, y: int) -> None:
        """Move to ``(x, y)`` and release the left button."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, True, 1)

    async def scroll(
        self,
        x: int,
        y: int,
        *,
        direction: str = "down",
        amount: int = 3,
    ) -> None:
        """Send a wheel event at ``(x, y)`` of ``amount * 120`` units in ``direction``.

        ``direction`` is one of "down", "up", "left" or "right"; any other
        value sends a wheel event with zero deltas.
        """
        async with self._lock:
            pixels = amount * 120

            delta_x, delta_y = 0, 0
            if direction == "down":
                delta_y = -pixels
            elif direction == "up":
                delta_y = pixels
            elif direction == "left":
                delta_x = pixels
            elif direction == "right":
                delta_x = -pixels

            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_wheel(x, y, delta_x, delta_y)

    async def type_text(self, text: str) -> None:
        """Type ``text`` one character at a time, pausing 10 ms after each character.

        Characters that resolve to a virtual key are sent as
        RAWKEYDOWN / CHAR / KEYUP (wrapped in a Shift press when needed); all
        other characters are sent as a single CHAR event.
        """
        async with self._lock:
            for ch in text:
                char_code = ord(ch)
                shifted = False

                # Only ASCII letters map onto a virtual key via their
                # upper-case code point. Non-ASCII letters would produce a
                # bogus key code ("é") or make ord() fail outright because
                # upper() returns two characters ("ß" -> "SS"), so they take
                # the CHAR-only path below.
                if ch.isascii() and ch.isalpha():
                    vk = ord(ch.upper())
                    shifted = ch.isupper()
                elif ch in SHIFTED_CHAR_TO_VK:
                    vk = SHIFTED_CHAR_TO_VK[ch]
                    shifted = True
                else:
                    vk = KEY_NAME_TO_VK.get(ch, 0)

                if not vk:
                    # Unknown character (e.g. unicode) — CHAR-only
                    await self._page.send_key_event(CHAR, 0, 0, 0, char_code)
                    await asyncio.sleep(0.01)
                    continue

                modifiers = MOD_SHIFT if shifted else 0
                nkc = NATIVE_KEY_CODES.get(vk, 0)

                if shifted:
                    shift_nkc = NATIVE_KEY_CODES.get(VK_SHIFT, 0)
                    await self._page.send_key_event(RAWKEYDOWN, MOD_SHIFT, VK_SHIFT, shift_nkc, 0)

                await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                await self._page.send_key_event(CHAR, modifiers, char_code, nkc, char_code)
                await self._page.send_key_event(KEYUP, modifiers, vk, 0, 0)

                if shifted:
                    await self._page.send_key_event(KEYUP, 0, VK_SHIFT, 0, 0)

                await asyncio.sleep(0.01)

    async def key(self, text: str) -> None:
        """Send the key combination described by ``text`` via ``_send_key_combo``."""
        async with self._lock:
            await _send_key_combo(self._page, text)

    async def hold_key(self, text: str, *, duration: float = 0.5) -> None:
        """Press the "+"-separated keys in ``text``, hold for ``duration`` seconds, then release.

        Keys are released in reverse order, and are released even if the hold
        is cancelled or interrupted.
        """
        async with self._lock:
            keys = [k.strip().lower() for k in text.split("+")]

            modifiers = 0
            for k in keys:
                if k in MODIFIER_MAP:
                    modifiers |= MODIFIER_MAP[k]
                vk = KEY_NAME_TO_VK.get(k, 0)
                nkc = NATIVE_KEY_CODES.get(vk, 0)
                await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)

            try:
                await asyncio.sleep(duration)
            finally:
                # Runs on cancellation too, so an interrupted hold does not
                # leave keys stuck down in the page.
                for k in reversed(keys):
                    vk = KEY_NAME_TO_VK.get(k, 0)
                    await self._page.send_key_event(KEYUP, 0, vk, 0, 0)

    async def wait(self) -> None:
        """Hold the action lock for one second without sending any input."""
        async with self._lock:
            await asyncio.sleep(1)

    async def navigate(self, url: str) -> None:
        """Navigate the page to ``url``."""
        async with self._lock:
            await self._page.navigate(url)

    async def go_back(self) -> None:
        """Ask the page to go back."""
        async with self._lock:
            await self._page.go_back()

    async def go_forward(self) -> None:
        """Ask the page to go forward."""
        async with self._lock:
            await self._page.go_forward()

    def aclose(self) -> None:
        """Stop listening for "paint" events. Synchronous despite the name."""
        self._page.off("paint", self._on_paint)

Typed input API for a CEF BrowserPage with frame capture.

Usage::

actions = PageActions(page=page)
await actions.left_click(100, 200)
frame = actions.last_frame

Instance variables

prop last_frame : rtc.VideoFrame | None
Expand source code
@property
def last_frame(self) -> rtc.VideoFrame | None:
    """Return the value currently stored in ``self._last_frame``.

    NOTE(review): presumably the most recent frame captured by the
    "paint" handler, and ``None`` until one has arrived — confirm
    against ``_on_paint``.
    """
    return self._last_frame

Methods

def aclose(self) ‑> None
Expand source code
def aclose(self) -> None:
    """Detach this object's ``_on_paint`` handler from the page's "paint" event.

    Despite the ``a`` prefix this is a plain synchronous method.
    """
    target_page = self._page
    target_page.off("paint", self._on_paint)
async def double_click(self, x: int, y: int) ‑> None
Expand source code
async def double_click(self, x: int, y: int) -> None:
    """Double-click the left button (code 0) at ``(x, y)``.

    Moves the pointer to the position, then sends a down/up pair with
    click count 1 followed by a down/up pair with click count 2, all
    under the page lock.
    """
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        for click_count in (1, 2):
            # The fourth argument is False for press, True for release.
            await target_page.send_mouse_click(x, y, 0, False, click_count)
            await target_page.send_mouse_click(x, y, 0, True, click_count)
async def go_back(self) ‑> None
Expand source code
async def go_back(self) -> None:
    """Call the page's ``go_back`` while holding the page lock."""
    async with self._lock:
        target_page = self._page
        await target_page.go_back()
async def go_forward(self) ‑> None
Expand source code
async def go_forward(self) -> None:
    """Call the page's ``go_forward`` while holding the page lock."""
    async with self._lock:
        target_page = self._page
        await target_page.go_forward()
async def hold_key(self, text: str, *, duration: float = 0.5) ‑> None
Expand source code
async def hold_key(self, text: str, *, duration: float = 0.5) -> None:
    """Press one or more keys, keep them down for ``duration``, then release.

    Args:
        text: Key names separated by ``+``. Each name is stripped and
            lower-cased before lookup. Names found in ``MODIFIER_MAP``
            are accumulated into the modifier mask of the key-down
            events; names missing from ``KEY_NAME_TO_VK`` are sent with
            key code 0.
        duration: Seconds to sleep between the key-down and key-up
            events.

    Keys are released in reverse press order. The release runs in a
    ``finally`` block so that a cancellation or error while the keys
    are held does not leave them pressed in the page; only keys whose
    key-down event was actually sent are released.
    """
    async with self._lock:
        keys = [k.strip().lower() for k in text.split("+")]

        pressed: list[int] = []
        modifiers = 0
        try:
            for k in keys:
                if k in MODIFIER_MAP:
                    modifiers |= MODIFIER_MAP[k]
                vk = KEY_NAME_TO_VK.get(k, 0)
                nkc = NATIVE_KEY_CODES.get(vk, 0)
                await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                pressed.append(vk)

            await asyncio.sleep(duration)
        finally:
            # Always undo the key-downs that went out, newest first.
            for vk in reversed(pressed):
                await self._page.send_key_event(KEYUP, 0, vk, 0, 0)
async def key(self, text: str) ‑> None
Expand source code
async def key(self, text: str) -> None:
    """Send the key combination described by ``text`` to the page.

    The page lock is held for the whole call; the event sequence
    itself is produced by ``_send_key_combo``.
    """
    async with self._lock:
        target_page = self._page
        await _send_key_combo(target_page, text)
async def left_click(self, x: int, y: int, *, modifiers: str | None = None) ‑> None
Expand source code
async def left_click(self, x: int, y: int, *, modifiers: str | None = None) -> None:
    """Left-click at ``(x, y)``, optionally with modifier keys held.

    Args:
        x: Horizontal position passed to the page's mouse events.
        y: Vertical position passed to the page's mouse events.
        modifiers: Optional modifier description, parsed by
            ``_parse_modifier_keys``; ``None`` is passed through to
            that helper unchanged.

    The pointer is moved first, the modifiers are pressed, and a
    down/up pair for button 0 with click count 1 is sent. The modifiers
    are released in a ``finally`` block so that a failed or cancelled
    click does not leave them held down in the page.
    """
    async with self._lock:
        mod_keys = _parse_modifier_keys(modifiers)
        await self._page.send_mouse_move(x, y)
        await _press_modifiers(self._page, mod_keys)
        try:
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
        finally:
            await _release_modifiers(self._page, mod_keys)
async def left_click_drag(self, *, start_x: int, start_y: int, end_x: int, end_y: int) ‑> None
Expand source code
async def left_click_drag(self, *, start_x: int, start_y: int, end_x: int, end_y: int) -> None:
    """Drag with the left button from the start point to the end point.

    Presses button 0 at ``(start_x, start_y)``, moves the pointer to
    ``(end_x, end_y)`` and releases there, sleeping 0.05 s after the
    press and again after the move. The page lock is held throughout.
    """
    pause = 0.05
    async with self._lock:
        target_page = self._page

        # Press at the origin.
        await target_page.send_mouse_move(start_x, start_y)
        await target_page.send_mouse_click(start_x, start_y, 0, False, 1)
        await asyncio.sleep(pause)

        # Travel to the destination and release.
        await target_page.send_mouse_move(end_x, end_y)
        await asyncio.sleep(pause)
        await target_page.send_mouse_click(end_x, end_y, 0, True, 1)
async def left_mouse_down(self, x: int, y: int) ‑> None
Expand source code
async def left_mouse_down(self, x: int, y: int) -> None:
    """Move the pointer to ``(x, y)`` and press button 0 without releasing it."""
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        # Fourth argument False: this is the press half of a click.
        await target_page.send_mouse_click(x, y, 0, False, 1)
async def left_mouse_up(self, x: int, y: int) ‑> None
Expand source code
async def left_mouse_up(self, x: int, y: int) -> None:
    """Move the pointer to ``(x, y)`` and release button 0 there."""
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        # Fourth argument True: this is the release half of a click.
        await target_page.send_mouse_click(x, y, 0, True, 1)
async def middle_click(self, x: int, y: int) ‑> None
Expand source code
async def middle_click(self, x: int, y: int) -> None:
    """Click button 1 (the middle button) once at ``(x, y)``.

    The pointer is moved to the position first, then a press and a
    release are sent, all under the page lock.
    """
    button = 1
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        for is_release in (False, True):
            await target_page.send_mouse_click(x, y, button, is_release, 1)
async def mouse_move(self, x: int, y: int) ‑> None
Expand source code
async def mouse_move(self, x: int, y: int) -> None:
    """Send a mouse-move to ``(x, y)`` while holding the page lock."""
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
async def navigate(self, url: str) ‑> None
Expand source code
async def navigate(self, url: str) -> None:
    """Ask the page to navigate to ``url`` while holding the page lock."""
    async with self._lock:
        target_page = self._page
        await target_page.navigate(url)
async def right_click(self, x: int, y: int) ‑> None
Expand source code
async def right_click(self, x: int, y: int) -> None:
    """Click button 2 (the right button) once at ``(x, y)``.

    The pointer is moved to the position first, then a press and a
    release are sent, all under the page lock.
    """
    button = 2
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        for is_release in (False, True):
            await target_page.send_mouse_click(x, y, button, is_release, 1)
async def scroll(self, x: int, y: int, *, direction: str = 'down', amount: int = 3) ‑> None
Expand source code
async def scroll(
    self,
    x: int,
    y: int,
    *,
    direction: str = "down",
    amount: int = 3,
) -> None:
    """Send a mouse-wheel event at ``(x, y)``.

    Args:
        x: Horizontal position of the pointer for the wheel event.
        y: Vertical position of the pointer for the wheel event.
        direction: One of ``"down"``, ``"up"``, ``"left"`` or
            ``"right"``. Any other value yields a wheel event with both
            deltas equal to 0.
        amount: Number of steps; each step contributes 120 to the
            magnitude of the delta.

    "down" and "right" use a negative delta, "up" and "left" a positive
    one. The pointer is moved to the position before the wheel event is
    sent, and the page lock is held for both.
    """
    async with self._lock:
        step = amount * 120

        # (delta_x, delta_y) for each recognised direction.
        deltas_by_direction = {
            "down": (0, -step),
            "up": (0, step),
            "left": (step, 0),
            "right": (-step, 0),
        }
        delta_x, delta_y = deltas_by_direction.get(direction, (0, 0))

        target_page = self._page
        await target_page.send_mouse_move(x, y)
        await target_page.send_mouse_wheel(x, y, delta_x, delta_y)
async def triple_click(self, x: int, y: int) ‑> None
Expand source code
async def triple_click(self, x: int, y: int) -> None:
    """Triple-click the left button (code 0) at ``(x, y)``.

    Moves the pointer to the position, then sends three down/up pairs
    whose click count increases from 1 to 3, all under the page lock.
    """
    async with self._lock:
        target_page = self._page
        await target_page.send_mouse_move(x, y)
        for click_count in range(1, 4):
            # The fourth argument is False for press, True for release.
            await target_page.send_mouse_click(x, y, 0, False, click_count)
            await target_page.send_mouse_click(x, y, 0, True, click_count)
async def type_text(self, text: str) ‑> None
Expand source code
async def type_text(self, text: str) -> None:
    """Type ``text`` into the page one character at a time.

    For every character a virtual-key code is looked up:

    * ASCII letters use the code point of their uppercase form, with
      Shift held for uppercase letters;
    * characters listed in ``SHIFTED_CHAR_TO_VK`` use that table's
      code with Shift held;
    * anything else is looked up in ``KEY_NAME_TO_VK``.

    Characters with a key code are sent as RAWKEYDOWN, CHAR and KEYUP
    events (wrapped in a Shift press/release when shifted). Characters
    without one are sent as a single CHAR event carrying the code
    point. A 0.01 s pause follows every character, and the page lock
    is held for the whole string.
    """
    async with self._lock:
        for ch in text:
            char_code = ord(ch)
            shifted = False

            # Only ASCII letters map onto a key code via their uppercase
            # code point. ``str.isalpha`` alone also accepts letters
            # such as "é" (bogus key code) or "ß", whose uppercase form
            # "SS" makes ``ord`` raise TypeError; those must take the
            # CHAR-only path below.
            if ch.isascii() and ch.isalpha():
                vk = ord(ch.upper())
                shifted = ch.isupper()
            elif ch in SHIFTED_CHAR_TO_VK:
                vk = SHIFTED_CHAR_TO_VK[ch]
                shifted = True
            else:
                vk = KEY_NAME_TO_VK.get(ch, 0)

            if not vk:
                # Unknown character (e.g. unicode) — CHAR-only
                await self._page.send_key_event(CHAR, 0, 0, 0, char_code)
                await asyncio.sleep(0.01)
                continue

            modifiers = MOD_SHIFT if shifted else 0
            nkc = NATIVE_KEY_CODES.get(vk, 0)

            if shifted:
                shift_nkc = NATIVE_KEY_CODES.get(VK_SHIFT, 0)
                await self._page.send_key_event(RAWKEYDOWN, MOD_SHIFT, VK_SHIFT, shift_nkc, 0)

            await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
            await self._page.send_key_event(CHAR, modifiers, char_code, nkc, char_code)
            await self._page.send_key_event(KEYUP, modifiers, vk, 0, 0)

            if shifted:
                await self._page.send_key_event(KEYUP, 0, VK_SHIFT, 0, 0)

            await asyncio.sleep(0.01)
async def wait(self) ‑> None
Expand source code
async def wait(self) -> None:
    """Sleep for one second while holding the page lock."""
    pause_seconds = 1
    async with self._lock:
        await asyncio.sleep(pause_seconds)
class PaintData (dirty_rects: list[tuple[int, int, int, int]],
frame: rtc.VideoFrame,
width: int,
height: int)
Expand source code
@dataclass
class PaintData:
    """Bundle of a video frame, a width/height pair and a list of dirty rectangles.

    NOTE(review): presumably the payload delivered with a page's "paint"
    event — confirm against the emitter.
    """

    # Rectangles as 4-int tuples; presumably (x, y, width, height) — TODO confirm.
    dirty_rects: list[tuple[int, int, int, int]]
    # The frame associated with this paint.
    frame: rtc.VideoFrame
    # Integer dimensions; presumably pixels of the painted surface — TODO confirm.
    width: int
    height: int

PaintData(dirty_rects: 'list[tuple[int, int, int, int]]', frame: 'rtc.VideoFrame', width: 'int', height: 'int')

Instance variables

var dirty_rects : list[tuple[int, int, int, int]]
var frame : rtc.VideoFrame
var height : int
var width : int