Module livekit.plugins.browser
Browser plugin for LiveKit Agents
Support for Chromium Embedded Framework (CEF).
Classes
class AudioData (frame: rtc.AudioFrame, pts: int)-
Expand source code
@dataclass class AudioData: """Payload of BrowserPage's "audio" event. `frame` wraps the samples from an AcquireAudioData IPC message (its sample rate, channel count and frame count); `pts` is copied unchanged from that message (presumably a presentation timestamp; unit not shown here).""" frame: rtc.AudioFrame pts: intAudioData(frame: 'rtc.AudioFrame', pts: 'int')
Instance variables
var frame : AudioFrame
var pts : int
class BrowserAgent (*,
url: str = 'https://www.google.com/',
llm: llm.LLM,
instructions: str = 'You are a helpful AI assistant that can browse the web. Use the computer tool to interact with the browser.',
width: int = 1280,
height: int = 720,
framerate: int = 30,
tools: list[llm.Tool] | None = None,
chat_enabled: bool = True)-
Expand source code
class BrowserAgent: """LLM-driven computer-use agent that operates a CEF browser page shared into a LiveKit room. Construct it, then call start(room=...). User messages arrive via send_message() or, when chat_enabled, as JSON {"text": ...} data packets on _CHAT_TOPIC, and the agent loop processes them one at a time. Call aclose() to shut everything down.""" def __init__( self, *, url: str = "https://www.google.com/", llm: llm.LLM, instructions: str = "You are a helpful AI assistant that can browse the web. Use the computer tool to interact with the browser.", width: int = 1280, height: int = 720, framerate: int = 30, tools: list[llm.Tool] | None = None, chat_enabled: bool = True, ) -> None: """Store configuration only; no browser or room resources are created until start(). `url`, `width`, `height` and `framerate` configure the page. `tools` are offered to the LLM in addition to the computer and navigation tools.""" self._url = url self._llm = llm self._instructions = instructions self._width = width self._height = height self._framerate = framerate self._extra_tools = tools or [] self._chat_enabled = chat_enabled self._room: rtc.Room | None = None self._browser_ctx: BrowserContext | None = None self._page: BrowserPage | None = None self._session: BrowserSession | None = None self._computer_tool: ComputerTool | None = None self._chat_ctx: llm.ChatContext | None = None self._agent_loop_task: asyncio.Task[None] | None = None self._pending_messages: asyncio.Queue[str] = asyncio.Queue() self._started = False @property def page(self) -> BrowserPage | None: """The browser page; None until start().""" return self._page @property def session(self) -> BrowserSession | None: """The BrowserSession sharing the page into the room; None until start().""" return self._session @property def computer_tool(self) -> ComputerTool | None: """The computer-use tool bound to the page; None until start().""" return self._computer_tool @property def chat_ctx(self) -> llm.ChatContext | None: """Conversation history sent to the LLM; None until start().""" return self._chat_ctx async def start(self, *, room: rtc.Room) -> None: """Launch the browser and the agent in `room`. Creates the BrowserContext and page, shares it through a BrowserSession, builds the computer-use and navigation tools, seeds the chat context with the instructions, gives the agent browser focus, optionally subscribes to chat data packets, and spawns the agent loop. Calling it again is a no-op. NOTE(review): self._page_actions and self._nav_tools are first assigned here rather than in __init__, so _run_llm_loop depends on start() having run.""" if self._started: return self._started = True self._room = room self._browser_ctx = BrowserContext(dev_mode=False) await self._browser_ctx.initialize() self._page = await self._browser_ctx.new_page( url=self._url, width=self._width, height=self._height, framerate=self._framerate, ) self._session = BrowserSession(page=self._page, room=room) await self._session.start() from livekit.plugins.anthropic.computer_tool import ComputerTool self._page_actions = PageActions(page=self._page) self._computer_tool = ComputerTool( actions=self._page_actions, width=self._width, height=self._height, ) @function_tool(name="navigate", description="Navigate the browser to a URL.") async def
_navigate(url: str) -> None: pass @function_tool(name="go_back", description="Go back to the previous page.") async def _go_back() -> None: pass @function_tool(name="go_forward", description="Go forward to the next page.") async def _go_forward() -> None: pass self._nav_tools: list[llm.Tool] = [_navigate, _go_back, _go_forward] self._chat_ctx = llm.ChatContext() self._chat_ctx.add_message(role="system", content=self._instructions) await self._session.reclaim_agent_focus() if self._chat_enabled: @room.on("data_received") def _on_chat_data(packet: rtc.DataPacket) -> None: if packet.topic != _CHAT_TOPIC: return try: data = json.loads(packet.data) text = data.get("text", "") if text: self._pending_messages.put_nowait(text) except (json.JSONDecodeError, UnicodeDecodeError): pass self._on_chat_data = _on_chat_data self._agent_loop_task = asyncio.create_task(self._agent_loop()) async def send_message(self, text: str) -> None: """Queue `text` as a user message for the agent loop.""" self._pending_messages.put_nowait(text) async def _agent_loop(self) -> None: """Process queued user messages forever, one at a time. If a human took browser focus since the last turn, focus is reclaimed first. Errors other than cancellation are logged, and the status is reset to "idle" so the loop keeps running.""" assert self._chat_ctx is not None assert self._computer_tool is not None assert self._session is not None while True: try: text = await self._pending_messages.get() if self._session.agent_interrupted.is_set(): self._session.agent_interrupted.clear() await self._session.reclaim_agent_focus() self._chat_ctx.add_message(role="user", content=text) await self._send_status("thinking") await self._run_llm_loop() await self._send_cursor_hide() await self._send_status("idle") except asyncio.CancelledError: break except Exception: logger.exception("error in agent loop") await self._send_cursor_hide() await self._send_status("idle") async def _run_llm_loop(self) -> None: """Run LLM turns until the model stops calling tools or a human takes focus. "computer" calls go to the ComputerTool and navigation calls go to PageActions. Each is followed by a short settle delay (for computer calls, only after left_click, middle_click, key and type) and recorded in chat_ctx along with the resulting screenshot content. Other tools run through llm.execute_function_call. NOTE(review): the inner `import json as _json` statements are redundant; json is already used directly (json.dumps) in this method.""" assert self._chat_ctx is not None assert self._computer_tool is not None assert self._session is not None all_tools: list[llm.Tool] = [ *self._computer_tool.tools, *self._nav_tools, *self._extra_tools, ] tool_ctx = llm.ToolContext(all_tools) while True: if self._session.agent_interrupted.is_set(): logger.info("agent
interrupted by human, pausing") await self._send_chat("(paused — you have control)") return response = await self._llm.chat( chat_ctx=self._chat_ctx, tools=all_tools, ).collect() if response.text: self._chat_ctx.add_message(role="assistant", content=response.text) await self._send_chat(response.text) if not response.tool_calls: return for tc in response.tool_calls: if self._session.agent_interrupted.is_set(): logger.info("agent interrupted between tool calls") await self._send_chat("(paused — you have control)") return if tc.name == "computer": import json as _json args = _json.loads(tc.arguments or "{}") action = args.pop("action", "screenshot") # Broadcast cursor position for frontend overlay coord = args.get("coordinate") if coord and len(coord) == 2: await self._send_cursor_position(int(coord[0]), int(coord[1]), action) await self._send_status("acting") screenshot_content = await self._computer_tool.execute(action, **args) # Wait for page to settle after clicks/typing if action in ( "left_click", "middle_click", "key", "type", ): await asyncio.sleep(_POST_ACTION_DELAY) fnc_call = llm.FunctionCall( call_id=tc.call_id, name=tc.name, arguments=tc.arguments or "{}", ) fnc_output = llm.FunctionCallOutput( call_id=tc.call_id, name=tc.name, output=json.dumps(screenshot_content), is_error=False, ) self._chat_ctx.items.append(fnc_call) self._chat_ctx.items.append(fnc_output) elif tc.name in ("navigate", "go_back", "go_forward"): await self._send_status("acting") if tc.name == "navigate": import json as _json url = _json.loads(tc.arguments or "{}").get("url", "") await self._page_actions.navigate(url) elif tc.name == "go_back": await self._page_actions.go_back() else: await self._page_actions.go_forward() await asyncio.sleep(_POST_ACTION_DELAY) screenshot_content = _screenshot_content(self._page_actions) fnc_call = llm.FunctionCall( call_id=tc.call_id, name=tc.name, arguments=tc.arguments or "{}", ) fnc_output = llm.FunctionCallOutput( call_id=tc.call_id, name=tc.name,
output=json.dumps(screenshot_content), is_error=False, ) self._chat_ctx.items.append(fnc_call) self._chat_ctx.items.append(fnc_output) else: result = await llm.execute_function_call(tc, tool_ctx) self._chat_ctx.items.append(result.fnc_call) if result.fnc_call_out: self._chat_ctx.items.append(result.fnc_call_out) await self._send_status("thinking") async def _send_chat(self, text: str) -> None: """Publish an agent chat message on _CHAT_TOPIC. Best effort: does nothing without a room, and logs failures at debug level.""" if self._room is None: return payload = json.dumps({"text": text, "sender": "agent"}).encode() try: await self._room.local_participant.publish_data( payload, reliable=True, topic=_CHAT_TOPIC ) except Exception: logger.debug("failed to send chat message") async def _send_status(self, status: str) -> None: """Publish {"status": status} on _STATUS_TOPIC. Best effort, like _send_chat.""" if self._room is None: return payload = json.dumps({"status": status}).encode() try: await self._room.local_participant.publish_data( payload, reliable=True, topic=_STATUS_TOPIC ) except Exception: logger.debug("failed to send status") async def _send_cursor_position(self, x: int, y: int, action: str) -> None: """Publish the agent's cursor position, action and viewport size on _CURSOR_TOPIC. Best effort.""" if self._room is None: return payload = json.dumps( { "x": x, "y": y, "action": action, "visible": True, "width": self._width, "height": self._height, } ).encode() try: await self._room.local_participant.publish_data( payload, reliable=True, topic=_CURSOR_TOPIC ) except Exception: logger.debug("failed to send cursor position") async def _send_cursor_hide(self) -> None: """Publish {"visible": False} on _CURSOR_TOPIC. Failures are silently ignored.""" if self._room is None: return payload = json.dumps({"visible": False}).encode() try: await self._room.local_participant.publish_data( payload, reliable=True, topic=_CURSOR_TOPIC ) except Exception: pass async def aclose(self) -> None: """Stop the agent loop and release the computer tool, session, page and browser context, then detach the chat listener. NOTE(review): self._computer_tool.aclose() is called without await. Confirm ComputerTool.aclose is synchronous; otherwise it never runs.""" if self._agent_loop_task: self._agent_loop_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._agent_loop_task if self._computer_tool: self._computer_tool.aclose() if self._session: await self._session.aclose() if self._page: await self._page.aclose() if self._browser_ctx: await self._browser_ctx.aclose() if self._room and hasattr(self,
"_on_chat_data"): self._room.off("data_received", self._on_chat_data)Instance variables
prop chat_ctx : llm.ChatContext | None-
Expand source code
@property def chat_ctx(self) -> llm.ChatContext | None: """Conversation history sent to the LLM; None until start() creates it.""" return self._chat_ctx prop computer_tool : ComputerTool | None-
Expand source code
@property def computer_tool(self) -> ComputerTool | None: """The computer-use tool bound to the page's PageActions; None until start().""" return self._computer_tool prop page : BrowserPage | None-
Expand source code
@property def page(self) -> BrowserPage | None: """The browser page opened by start(); None before that.""" return self._page prop session : BrowserSession | None-
Expand source code
@property def session(self) -> BrowserSession | None: """The BrowserSession that publishes the page into the room; None until start().""" return self._session
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Stop the agent loop and release the computer tool, session, page and browser context, then detach the chat listener. NOTE(review): self._computer_tool.aclose() is called without await. Confirm ComputerTool.aclose is synchronous; otherwise it never runs.""" if self._agent_loop_task: self._agent_loop_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._agent_loop_task if self._computer_tool: self._computer_tool.aclose() if self._session: await self._session.aclose() if self._page: await self._page.aclose() if self._browser_ctx: await self._browser_ctx.aclose() if self._room and hasattr(self, "_on_chat_data"): self._room.off("data_received", self._on_chat_data) async def send_message(self, text: str) ‑> None-
Expand source code
async def send_message(self, text: str) -> None: """Queue `text` as a user message. The agent loop started by start() handles it.""" self._pending_messages.put_nowait(text) async def start(self, *, room: rtc.Room) ‑> None-
Expand source code
async def start(self, *, room: rtc.Room) -> None: """Launch the browser and the agent in `room`. Creates the BrowserContext and page, shares it through a BrowserSession, and builds the computer-use tool plus the navigate/go_back/go_forward tools. The navigation tools have empty bodies; their calls are executed by the LLM loop. Then it seeds the chat context with the instructions, gives the agent browser focus, optionally subscribes to chat data packets on _CHAT_TOPIC, and spawns the agent loop task. Calling it again is a no-op.""" if self._started: return self._started = True self._room = room self._browser_ctx = BrowserContext(dev_mode=False) await self._browser_ctx.initialize() self._page = await self._browser_ctx.new_page( url=self._url, width=self._width, height=self._height, framerate=self._framerate, ) self._session = BrowserSession(page=self._page, room=room) await self._session.start() from livekit.plugins.anthropic.computer_tool import ComputerTool self._page_actions = PageActions(page=self._page) self._computer_tool = ComputerTool( actions=self._page_actions, width=self._width, height=self._height, ) @function_tool(name="navigate", description="Navigate the browser to a URL.") async def _navigate(url: str) -> None: pass @function_tool(name="go_back", description="Go back to the previous page.") async def _go_back() -> None: pass @function_tool(name="go_forward", description="Go forward to the next page.") async def _go_forward() -> None: pass self._nav_tools: list[llm.Tool] = [_navigate, _go_back, _go_forward] self._chat_ctx = llm.ChatContext() self._chat_ctx.add_message(role="system", content=self._instructions) await self._session.reclaim_agent_focus() if self._chat_enabled: @room.on("data_received") def _on_chat_data(packet: rtc.DataPacket) -> None: if packet.topic != _CHAT_TOPIC: return try: data = json.loads(packet.data) text = data.get("text", "") if text: self._pending_messages.put_nowait(text) except (json.JSONDecodeError, UnicodeDecodeError): pass self._on_chat_data = _on_chat_data self._agent_loop_task = asyncio.create_task(self._agent_loop())
class BrowserContext (*, dev_mode: bool, remote_debugging_port: int = 0)-
Expand source code
class BrowserContext:
    """Owns the CEF subprocess and the IPC channel to it.

    Incoming IPC messages are routed to the BrowserPage instances created with
    new_page(). Call initialize() before use and aclose() when done.
    """

    def __init__(self, *, dev_mode: bool, remote_debugging_port: int = 0) -> None:
        """Store configuration; no process is started until initialize().

        Args:
            dev_mode: forwarded to the subprocess in InitializeContextRequest.
            remote_debugging_port: CDP port for the browser; 0 lets
                initialize() pick a free one.
        """
        self._mp_ctx = mp.get_context("spawn")
        self._pages: dict[int, BrowserPage] = {}
        self._dev_mode = dev_mode
        self._initialized = False
        self._next_page_id = 1
        self._remote_debugging_port = remote_debugging_port
        self._closed_event = asyncio.Event()
        self._proc = None
        self._duplex = None
        self._main_atask = None
        self._root_cache_path = None

    async def initialize(self) -> None:
        """Spawn the CEF subprocess and initialize the browser context in it.

        On Linux, when ``libcef.so`` is present in the resources directory, it is
        prepended to ``LD_PRELOAD`` only for the duration of ``Process.start()``.
        The previous value (or its absence) is restored afterwards, even if the
        start fails. An ``LD_PRELOAD`` set by the user is never removed. On any
        failure, the subprocess and IPC channel are torn down before the
        exception propagates.
        """
        from .download import get_resources_dir

        resources_dir = str(get_resources_dir())

        mp_pch, mp_cch = socket.socketpair()
        self._duplex = await AsyncDuplex.open(mp_pch)
        self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

        # On Linux aarch64, libcef.so is too large for late TLS allocation.
        # Set LD_PRELOAD so the dynamic linker reserves TLS early.
        libcef = os.path.join(resources_dir, "libcef.so")
        preload = sys.platform.startswith("linux") and os.path.exists(libcef)
        old_preload = os.environ.get("LD_PRELOAD")
        try:
            try:
                if preload:
                    os.environ["LD_PRELOAD"] = libcef + (":" + old_preload if old_preload else "")
                self._proc.start()
            finally:
                # Undo only what we changed, and do it even if start() raised.
                if preload:
                    if old_preload is None:
                        os.environ.pop("LD_PRELOAD", None)
                    else:
                        os.environ["LD_PRELOAD"] = old_preload
                # The parent no longer needs its copy of the child's socket end.
                mp_cch.close()

            await self._initialize_context(resources_dir)
        except BaseException:
            await self._cleanup_process()
            raise

    async def _initialize_context(self, resources_dir: str) -> None:
        """Choose a debugging port if needed and perform the InitializeContext handshake.

        Raises:
            RuntimeError: if the subprocess answers with anything other than a
                ContextInitializedResponse.
        """
        if not self._remote_debugging_port:
            # Bind to port 0 so the OS picks a free port. NOTE: the port is
            # released before CEF binds it, so another process could take it.
            with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                # Socket options only affect a bind() performed after them.
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                s.bind(("", 0))
                self._remote_debugging_port = s.getsockname()[1]

            logger.debug("using remote debugging port %d", self._remote_debugging_port)

        self._root_cache_path = tempfile.mkdtemp(prefix="lkcef_cache_")
        await channel.asend_message(
            self._duplex,
            proto.InitializeContextRequest(
                dev_mode=self._dev_mode,
                remote_debugging_port=self._remote_debugging_port,
                root_cache_path=self._root_cache_path,
            ),
        )
        resp = await channel.arecv_message(self._duplex, proto.IPC_MESSAGES)
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not isinstance(resp, proto.ContextInitializedResponse):
            raise RuntimeError(
                f"unexpected response to InitializeContextRequest: {type(resp).__name__}"
            )
        self._initialized = True
        logger.debug("browser context initialized", extra={"pid": self._proc.pid})

        self._main_atask = asyncio.create_task(self._main_task(self._duplex))

    async def _cleanup_process(self) -> None:
        """Close the IPC channel, stop the subprocess and remove the cache directory.

        Safe to call when the subprocess was never started.
        """
        if self._duplex:
            try:
                await self._duplex.aclose()
            except DuplexClosed:
                pass

        # Subprocess has its own 5s watchdog — just wait for it
        # NOTE(review): Process.join() blocks the event loop (up to 15 s here).
        # It is kept synchronous so cleanup always completes, even while the
        # caller is being cancelled.
        if self._proc and self._proc.is_alive():
            self._proc.join(timeout=15)
            if self._proc.is_alive():
                logger.warning("subprocess did not exit, killing")
                self._proc.kill()
                self._proc.join()

        if self._proc:
            self._proc.close()

        if self._root_cache_path:
            shutil.rmtree(self._root_cache_path, ignore_errors=True)

    @asynccontextmanager
    async def playwright(self, timeout: float | None = None):
        """Yield a Playwright browser connected over CDP to this context's debugging port.

        The connection is closed on exit. playwright is imported lazily.

        Raises:
            RuntimeError: if the context is not initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            url = f"http://localhost:{self._remote_debugging_port}"
            browser = await p.chromium.connect_over_cdp(url, timeout=timeout)
            try:
                yield browser
            finally:
                await browser.close()

    @_log_exceptions(_logger=logger)
    async def _main_task(self, duplex: AsyncDuplex) -> None:
        """Dispatch IPC messages from the subprocess to their pages until the channel closes."""
        while True:
            try:
                msg = await channel.arecv_message(duplex, proto.IPC_MESSAGES)
            except DuplexClosed:
                break

            try:
                if isinstance(msg, proto.CreateBrowserResponse):
                    page = self._pages[msg.page_id]
                    await page._handle_created(msg)
                elif isinstance(msg, proto.AcquireAudioData):
                    page = self._pages[msg.page_id]
                    page._handle_audio_data(msg)
                elif isinstance(msg, proto.AcquirePaintData):
                    page = self._pages[msg.page_id]
                    await page._handle_paint(msg)
                elif isinstance(msg, proto.AudioStreamStarted):
                    pass
                elif isinstance(msg, proto.AudioStreamStopped):
                    pass
                elif isinstance(msg, proto.CursorChanged):
                    page = self._pages[msg.page_id]
                    page.emit("cursor_changed", msg.cursor_type)
                elif isinstance(msg, proto.UrlChanged):
                    page = self._pages[msg.page_id]
                    page.emit("url_changed", msg.url)
                elif isinstance(msg, proto.BrowserClosed):
                    page = self._pages[msg.page_id]
                    await page._handle_close(msg)
            except Exception:
                logger.exception("error handling IPC message %s", type(msg).__name__)

        # Subprocess exited — resolve any pending page futures so cleanup doesn't hang
        for page in self._pages.values():
            if not page._close_fut.done():
                page._close_fut.set_result(None)
            if not page._created_fut.done():
                page._created_fut.set_result(None)

        self._closed_event.set()

    async def new_page(
        self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
    ) -> BrowserPage:
        """Create, register and start a BrowserPage with the next page id.

        Raises:
            RuntimeError: if the context is not initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        page_id = self._next_page_id
        self._next_page_id += 1
        page = BrowserPage(
            self._mp_ctx,
            _PageOptions(
                page_id=page_id,
                url=url,
                width=width,
                height=height,
                framerate=framerate,
            ),
            self._duplex,
        )
        self._pages[page_id] = page
        await page.start()
        return page

    async def aclose(self) -> None:
        """Close all pages, stop the IPC task and tear down the subprocess. No-op if not initialized."""
        if not self._initialized:
            return
        self._initialized = False

        # Close all pages (ensure shm cleanup even on errors)
        for page in list(self._pages.values()):
            try:
                await page.aclose()
            except Exception:
                page._cleanup_shm()

        # Cancel the main task
        if self._main_atask:
            self._main_atask.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._main_atask

        await self._cleanup_process()
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Close every page (falling back to shared-memory cleanup if a page fails to close), cancel the IPC dispatch task, then tear down the subprocess. No-op if the context is not initialized.""" if not self._initialized: return self._initialized = False # Close all pages (ensure shm cleanup even on errors) for page in list(self._pages.values()): try: await page.aclose() except Exception: page._cleanup_shm() # Cancel the main task if self._main_atask: self._main_atask.cancel() with contextlib.suppress(asyncio.CancelledError): await self._main_atask await self._cleanup_process() async def initialize(self) ‑> None-
Expand source code
async def initialize(self) -> None: from .download import get_resources_dir resources_dir = str(get_resources_dir()) mp_pch, mp_cch = socket.socketpair() self._duplex = await AsyncDuplex.open(mp_pch) self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir)) # On Linux aarch64, libcef.so is too large for late TLS allocation. # Set LD_PRELOAD so the dynamic linker reserves TLS early. _preload_restore = None if sys.platform.startswith("linux"): libcef = os.path.join(resources_dir, "libcef.so") if os.path.exists(libcef): _preload_restore = os.environ.get("LD_PRELOAD") old = _preload_restore or "" os.environ["LD_PRELOAD"] = libcef + (":" + old if old else "") self._proc.start() if _preload_restore is not None: os.environ["LD_PRELOAD"] = _preload_restore elif "LD_PRELOAD" in os.environ and sys.platform.startswith("linux"): del os.environ["LD_PRELOAD"] mp_cch.close() try: await self._initialize_context(resources_dir) except BaseException: await self._cleanup_process() raise async def new_page(self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24) ‑> BrowserPage-
Expand source code
async def new_page( self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24 ) -> BrowserPage: """Create a BrowserPage with the next page id, register it for IPC routing, and await its start(), which waits for the subprocess to confirm creation. Raises RuntimeError if the context is not initialized.""" if not self._initialized: raise RuntimeError("BrowserContext not initialized") page_id = self._next_page_id self._next_page_id += 1 page = BrowserPage( self._mp_ctx, _PageOptions( page_id=page_id, url=url, width=width, height=height, framerate=framerate, ), self._duplex, ) self._pages[page_id] = page await page.start() return page async def playwright(self, timeout: float | None = None)-
Expand source code
@asynccontextmanager async def playwright(self, timeout: float | None = None): """Async context manager yielding a Playwright browser connected over CDP to http://localhost:<remote debugging port>. The connection is closed on exit, and playwright is imported lazily. Raises RuntimeError if the context is not initialized.""" if not self._initialized: raise RuntimeError("BrowserContext not initialized") from playwright.async_api import async_playwright async with async_playwright() as p: url = f"http://localhost:{self._remote_debugging_port}" browser = await p.chromium.connect_over_cdp(url, timeout=timeout) try: yield browser finally: await browser.close()
class BrowserPage (mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex)-
Expand source code
class BrowserPage(EventEmitter[EventTypes]): def __init__( self, mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex, ) -> None: super().__init__() self._mp_ctx = mp_ctx self._opts = opts self._ctx_duplex = ctx_duplex self._view_width = 0 self._view_height = 0 self._created_fut = asyncio.Future() self._close_fut = asyncio.Future() self._closed = False @property def id(self) -> int: return self._opts.page_id _NUM_SHM_BUFFERS = 2 async def start(self) -> None: import multiprocessing.shared_memory as mp_shm shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4 self._shms = [] shm_names = [] for _ in range(self._NUM_SHM_BUFFERS): name = f"lkcef_browser_{_shortuuid()}" shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name) self._shms.append(shm) shm_names.append(name) self._framebuffer = rtc.VideoFrame( proto.SHM_MAX_WIDTH, proto.SHM_MAX_HEIGHT, rtc.VideoBufferType.BGRA, bytearray(proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4), ) req = proto.CreateBrowserRequest( page_id=self._opts.page_id, width=self._opts.width, height=self._opts.height, shm_names=shm_names, url=self._opts.url, framerate=self._opts.framerate, ) await channel.asend_message(self._ctx_duplex, req) # TODO(theomonnom): create timeout (would prevent never resolving futures if the # browser process crashed for some reasons) await asyncio.shield(self._created_fut) async def aclose(self) -> None: if self._closed: return self._closed = True try: await channel.asend_message( self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id) ) await asyncio.shield(self._close_fut) except DuplexClosed: pass # Subprocess already closed (e.g., window closed manually) finally: self._cleanup_shm() def _cleanup_shm(self) -> None: for shm in self._shms: try: shm.close() except Exception: pass try: shm.unlink() except Exception: pass self._shms.clear() async def _handle_created(self, msg: proto.CreateBrowserResponse) -> None: self._created_fut.set_result(None) async def _handle_paint(self, acq: 
proto.AcquirePaintData) -> None: old_width = self._view_width old_height = self._view_height self._view_width = acq.width self._view_height = acq.height # TODO(theomonnom): remove hacky alloc-free resizing self._framebuffer._width = acq.width self._framebuffer._height = acq.height proto.copy_paint_data(acq, old_width, old_height, self._shms[acq.buffer_id].buf, self._framebuffer.data) release_paint = proto.ReleasePaintData(page_id=acq.page_id, buffer_id=acq.buffer_id) await channel.asend_message(self._ctx_duplex, release_paint) paint_data = PaintData( dirty_rects=acq.dirty_rects, frame=self._framebuffer, width=acq.width, height=acq.height, ) self.emit("paint", paint_data) async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave), ) async def send_mouse_click( self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseClickEvent( page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count ), ) async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y), ) async def send_key_event( self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendKeyEvent( page_id=self.id, type=type, modifiers=modifiers, windows_key_code=windows_key_code, native_key_code=native_key_code, character=character, ), ) async def navigate(self, url: str) -> None: await channel.asend_message( self._ctx_duplex, proto.NavigateRequest(page_id=self.id, url=url), ) async def go_back(self) -> None: await channel.asend_message( self._ctx_duplex, 
proto.GoBackRequest(page_id=self.id), ) async def go_forward(self) -> None: await channel.asend_message( self._ctx_duplex, proto.GoForwardRequest(page_id=self.id), ) async def send_focus_event(self, set_focus: bool = True) -> None: await channel.asend_message( self._ctx_duplex, proto.SendFocusEvent(page_id=self.id, set_focus=set_focus), ) def _handle_audio_data(self, msg: proto.AcquireAudioData) -> None: if msg.channels <= 0 or msg.sample_rate <= 0 or msg.frames <= 0: return frame = rtc.AudioFrame( data=msg.data, sample_rate=msg.sample_rate, num_channels=msg.channels, samples_per_channel=msg.frames, ) self.emit("audio", AudioData(frame=frame, pts=msg.pts)) async def _handle_close(self, msg: proto.BrowserClosed) -> None: logger.debug("browser page closed", extra={"page_id": self.id}) self._close_fut.set_result(None)Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop id : int-
Expand source code
@property def id(self) -> int: """Page id assigned by BrowserContext.new_page. It is sent with every IPC request and used to route replies to this page.""" return self._opts.page_id
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Ask the subprocess to close this browser and wait until it reports BrowserClosed (or the IPC task exits). The shared-memory buffers are released in all cases. Idempotent; a closed IPC channel is tolerated.""" if self._closed: return self._closed = True try: await channel.asend_message( self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id) ) await asyncio.shield(self._close_fut) except DuplexClosed: pass # Subprocess already closed (e.g., window closed manually) finally: self._cleanup_shm() async def go_back(self) ‑> None-
Expand source code
async def go_back(self) -> None: """Send a GoBackRequest for this page. Fire-and-forget: no reply is awaited.""" await channel.asend_message( self._ctx_duplex, proto.GoBackRequest(page_id=self.id), ) async def go_forward(self) ‑> None-
Expand source code
async def go_forward(self) -> None: """Send a GoForwardRequest for this page. Fire-and-forget: no reply is awaited.""" await channel.asend_message( self._ctx_duplex, proto.GoForwardRequest(page_id=self.id), ) -
Expand source code
async def navigate(self, url: str) -> None: """Send a NavigateRequest asking this page to load `url`. No reply is awaited.""" await channel.asend_message( self._ctx_duplex, proto.NavigateRequest(page_id=self.id, url=url), ) async def send_focus_event(self, set_focus: bool = True) ‑> None-
Expand source code
async def send_focus_event(self, set_focus: bool = True) -> None: """Send a SendFocusEvent to give (set_focus=True) or remove (set_focus=False) page focus.""" await channel.asend_message( self._ctx_duplex, proto.SendFocusEvent(page_id=self.id, set_focus=set_focus), ) async def send_key_event(self,
type: int,
modifiers: int,
windows_key_code: int,
native_key_code: int = 0,
character: int = 0) ‑> None-
Expand source code
async def send_key_event( self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0 ) -> None: """Forward a key event to the page. Every argument is copied unchanged into a SendKeyEvent (presumably matching CEF's key-event fields; confirm against proto). Note that `type` shadows the builtin but is part of the public signature.""" await channel.asend_message( self._ctx_duplex, proto.SendKeyEvent( page_id=self.id, type=type, modifiers=modifiers, windows_key_code=windows_key_code, native_key_code=native_key_code, character=character, ), ) async def send_mouse_click(self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1) ‑> None-
Expand source code
async def send_mouse_click( self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1 ) -> None: """Forward a mouse button press (mouse_up=False) or release (mouse_up=True) at (x, y). `button` is sent as button_type.""" await channel.asend_message( self._ctx_duplex, proto.SendMouseClickEvent( page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count ), ) async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) ‑> None-
Expand source code
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None: """Forward a mouse move to (x, y); mouse_leave=True flags the pointer leaving the view.""" await channel.asend_message( self._ctx_duplex, proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave), ) async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) ‑> None-
Expand source code
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None: """Forward a mouse-wheel scroll of (delta_x, delta_y) at (x, y).""" await channel.asend_message( self._ctx_duplex, proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y), ) async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Allocate _NUM_SHM_BUFFERS shared-memory buffers sized for SHM_MAX_WIDTH x SHM_MAX_HEIGHT x 4 bytes, plus a local BGRA framebuffer of the same size. Then send CreateBrowserRequest and wait, with no timeout, until the subprocess confirms creation or the IPC task exits.""" import multiprocessing.shared_memory as mp_shm shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4 self._shms = [] shm_names = [] for _ in range(self._NUM_SHM_BUFFERS): name = f"lkcef_browser_{_shortuuid()}" shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name) self._shms.append(shm) shm_names.append(name) self._framebuffer = rtc.VideoFrame( proto.SHM_MAX_WIDTH, proto.SHM_MAX_HEIGHT, rtc.VideoBufferType.BGRA, bytearray(proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4), ) req = proto.CreateBrowserRequest( page_id=self._opts.page_id, width=self._opts.width, height=self._opts.height, shm_names=shm_names, url=self._opts.url, framerate=self._opts.framerate, ) await channel.asend_message(self._ctx_duplex, req) # TODO(theomonnom): create timeout (would prevent never resolving futures if the # browser process crashed for some reasons) await asyncio.shield(self._created_fut)
Inherited members
class BrowserSession (*,
page: BrowserPage,
room: rtc.Room)-
Expand source code
class BrowserSession:
    """Stream a :class:`BrowserPage` into a LiveKit room and relay remote input.

    ``start()`` publishes the page's painted frames as a screen-share video
    track, publishes page audio lazily on the first audio packet, registers
    the ``browser/*`` RPC methods and listens on the ``browser-input`` data
    topic. Only the participant holding focus may send input; the AI agent
    holds focus under the reserved identity ``"__agent__"`` and loses it
    (setting :attr:`agent_interrupted`) when a human requests focus.
    """

    def __init__(self, *, page: BrowserPage, room: rtc.Room) -> None:
        self._page = page
        self._room = room
        self._video_source: rtc.VideoSource | None = None
        self._audio_source: rtc.AudioSource | None = None
        self._video_track: rtc.LocalVideoTrack | None = None
        self._audio_track: rtc.LocalAudioTrack | None = None
        self._started = False
        # Most recent painted frame; re-sent at a fixed rate by _video_loop.
        self._last_frame: rtc.VideoFrame | None = None
        self._video_task: asyncio.Task[None] | None = None
        self._audio_init_task: asyncio.Task[None] | None = None
        self._audio_task: asyncio.Task[None] | None = None
        self._audio_queue: asyncio.Queue[rtc.AudioFrame] | None = None
        # Coroutines (page input calls, data publishes) awaited one at a time,
        # in order, by _input_sender_loop.
        self._input_queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=256)
        self._input_task: asyncio.Task[None] | None = None
        # Participant identity allowed to send input, "__agent__" for the AI
        # agent, or None when nobody holds focus.
        self._focus_identity: str | None = None
        # Event set when a human interrupts the agent's focus
        self._agent_interrupted = asyncio.Event()

    @property
    def focus_identity(self) -> str | None:
        """Identity holding focus, ``"__agent__"`` for the agent, or ``None``."""
        return self._focus_identity

    @property
    def agent_interrupted(self) -> asyncio.Event:
        """Event that is set when a human takes focus from the agent."""
        return self._agent_interrupted

    def set_agent_focus(self, active: bool) -> None:
        """Grant or revoke browser focus for the AI agent.

        Granting clears :attr:`agent_interrupted`. Revoking only clears focus
        when the agent is the current holder, so a human's focus is kept.
        """
        if active:
            self._focus_identity = "__agent__"
            self._agent_interrupted.clear()
        elif self._focus_identity == "__agent__":
            self._focus_identity = None

    async def reclaim_agent_focus(self) -> None:
        """Reclaim focus for the agent and notify all participants."""
        self.set_agent_focus(True)
        await self._page.send_focus_event(True)
        await self._broadcast_focus()

    async def start(self) -> None:
        """Publish the video track, start background tasks and register handlers.

        Calling ``start`` more than once has no effect.
        """
        if self._started:
            return
        self._started = True

        opts = self._page._opts
        self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)
        self._video_track = rtc.LocalVideoTrack.create_video_track(
            "browser-video", self._video_source
        )
        video_opts = rtc.TrackPublishOptions(
            source=rtc.TrackSource.SOURCE_SCREENSHARE,
            video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
            simulcast=False,
        )

        self._page.on("paint", self._on_paint)
        self._page.on("audio", self._on_audio)
        self._page.on("cursor_changed", self._on_cursor)
        self._page.on("url_changed", self._on_url_changed)

        await self._room.local_participant.publish_track(self._video_track, video_opts)
        self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

        # Single persistent task for sending input events to subprocess
        self._input_task = asyncio.create_task(self._input_sender_loop())

        # Register RPC methods for focus management
        @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
        async def _handle_request_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            caller = data.caller_identity
            # Re-requesting focus you already hold is granted (idempotent)
            # rather than reported as held by someone else.
            if self._focus_identity == caller:
                return json.dumps({"granted": True})
            if self._focus_identity is None:
                self._focus_identity = caller
                await self._page.send_focus_event(True)
                await self._broadcast_focus()
                return json.dumps({"granted": True})
            # If agent has focus, allow human to interrupt
            if self._focus_identity == "__agent__":
                self._focus_identity = caller
                self._agent_interrupted.set()
                await self._page.send_focus_event(True)
                await self._broadcast_focus()
                return json.dumps({"granted": True})
            return json.dumps({"granted": False, "holder": self._focus_identity})

        @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
        async def _handle_release_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            if self._focus_identity == data.caller_identity:
                self._focus_identity = None
                await self._page.send_focus_event(False)
                await self._broadcast_focus()
                return json.dumps({"released": True})
            return json.dumps({"released": False})

        # NOTE(review): the navigation RPCs below do not check focus, so any
        # participant can navigate — confirm this is intended.
        @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
        async def _handle_navigate(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            payload = json.loads(data.payload)
            url = payload.get("url", "")
            if url:
                self._queue_input(self._page.navigate(url))
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
        async def _handle_go_back(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_back())
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
        async def _handle_go_forward(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_forward())
            return json.dumps({"status": "ok"})

        # Listen for input data from participants
        @self._room.on("data_received")
        def _on_data_received(packet: rtc.DataPacket) -> None:
            if packet.topic != "browser-input":
                return
            if packet.participant is None:
                return
            # Only the current focus holder may drive the page.
            if packet.participant.identity != self._focus_identity:
                return
            try:
                events = json.loads(packet.data)
            except (json.JSONDecodeError, UnicodeDecodeError):
                return
            # The payload is untrusted: expect a JSON array of event objects
            # and skip malformed entries instead of raising inside the room's
            # event callback.
            if not isinstance(events, list):
                return
            for evt in events:
                if not isinstance(evt, dict):
                    continue
                try:
                    self._dispatch_input(evt)
                except (KeyError, TypeError, ValueError):
                    continue

        self._on_data_received = _on_data_received

        # Release focus when the holder disconnects
        @self._room.on("participant_disconnected")
        def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
            if participant.identity == self._focus_identity:
                self._focus_identity = None
                self._queue_input(self._page.send_focus_event(False))
                self._queue_input(self._broadcast_focus())

        self._on_participant_disconnected = _on_participant_disconnected

    def _queue_input(self, coro: Any) -> None:
        """Enqueue *coro* for in-order execution by the input sender loop.

        When the queue is full the coroutine is dropped (best-effort input).
        It is closed first so Python does not warn that it was never awaited.
        """
        try:
            self._input_queue.put_nowait(coro)
        except asyncio.QueueFull:
            if asyncio.iscoroutine(coro):
                coro.close()

    async def _input_sender_loop(self) -> None:
        """Await queued coroutines one by one until cancelled."""
        while True:
            coro = await self._input_queue.get()
            try:
                await coro
            except Exception:
                # Best-effort delivery: one failed event must not stop the loop.
                logger.debug("failed to deliver browser input event", exc_info=True)

    def _dispatch_input(self, evt: dict[str, Any]) -> None:
        """Translate one ``browser-input`` event dict into a queued page call.

        Handled ``type`` values: ``mousemove``, ``mousedown``, ``mouseup``,
        ``wheel``, ``keydown``, ``keyup`` and ``char``; anything else is ignored.

        Raises:
            KeyError: if a required field (``x``/``y``/``keyCode``) is missing.
        """
        t = evt.get("type")
        if t == "mousemove":
            self._queue_input(self._page.send_mouse_move(evt["x"], evt["y"]))
        elif t == "mousedown":
            self._queue_input(
                self._page.send_mouse_click(evt["x"], evt["y"], evt.get("button", 0), False, 1)
            )
        elif t == "mouseup":
            self._queue_input(
                self._page.send_mouse_click(evt["x"], evt["y"], evt.get("button", 0), True, 1)
            )
        elif t == "wheel":
            # NOTE(review): deltas are forwarded unchanged. PageActions.scroll
            # treats positive deltaY as "up", whereas a DOM WheelEvent uses
            # positive deltaY for "down" — confirm the web client flips the sign.
            self._queue_input(
                self._page.send_mouse_wheel(
                    evt["x"], evt["y"], evt.get("deltaX", 0), evt.get("deltaY", 0)
                )
            )
        # Key event type codes follow CEF's cef_key_event_type_t:
        # 0 = RAWKEYDOWN, 2 = KEYUP, 3 = CHAR.
        elif t == "keydown":
            wkc = evt["keyCode"]
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(self._page.send_key_event(0, evt.get("modifiers", 0), wkc, nkc, 0))
        elif t == "keyup":
            wkc = evt["keyCode"]
            # Key-up is sent without a native key code.
            self._queue_input(self._page.send_key_event(2, evt.get("modifiers", 0), wkc, 0, 0))
        elif t == "char":
            wkc = evt["keyCode"]
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(
                self._page.send_key_event(
                    3, evt.get("modifiers", 0), wkc, nkc, evt.get("charCode", 0)
                )
            )

    async def _broadcast_focus(self) -> None:
        """Publish the current focus holder on the ``browser-focus`` topic."""
        payload = json.dumps({"identity": self._focus_identity}).encode()
        await self._room.local_participant.publish_data(
            payload, reliable=True, topic="browser-focus"
        )

    async def _init_audio(self, frame: rtc.AudioFrame) -> None:
        """Create and publish the audio track using *frame*'s format, then start pumping.

        On failure all audio state is reset so the next audio packet retries.
        """
        try:
            self._audio_source = rtc.AudioSource(
                frame.sample_rate, frame.num_channels, queue_size_ms=100
            )
            self._audio_track = rtc.LocalAudioTrack.create_audio_track(
                "browser-audio", self._audio_source
            )
            audio_opts = rtc.TrackPublishOptions(
                source=rtc.TrackSource.SOURCE_SCREENSHARE_AUDIO,
            )
            await self._room.local_participant.publish_track(self._audio_track, audio_opts)
            self._audio_queue = asyncio.Queue(maxsize=50)
            self._audio_queue.put_nowait(frame)
            self._audio_task = asyncio.create_task(self._audio_loop())
        except Exception:
            logger.exception("failed to initialize audio, will retry on next packet")
            self._audio_source = None
            self._audio_track = None
            self._audio_init_task = None

    async def _audio_loop(self) -> None:
        """Forward queued audio frames to the audio source; errors are logged."""
        assert self._audio_queue is not None
        assert self._audio_source is not None
        while True:
            frame = await self._audio_queue.get()
            try:
                await self._audio_source.capture_frame(frame)
            except Exception:
                logger.exception("audio capture error")

    def _on_audio(self, data: AudioData) -> None:
        """Page ``audio`` handler.

        The first packet schedules audio initialisation (one attempt at a
        time). Later packets are queued, and dropped when the queue is full.
        """
        if self._audio_source is None:
            if self._audio_init_task is not None:
                return
            self._audio_init_task = asyncio.get_event_loop().create_task(
                self._init_audio(data.frame)
            )
            return
        if self._audio_queue is None:
            return
        try:
            self._audio_queue.put_nowait(data.frame)
        except asyncio.QueueFull:
            pass

    # CEF ``cef_cursor_type_t`` values (the same order as Chromium's
    # ``ui::mojom::CursorType``) mapped to CSS ``cursor`` keywords.
    # Unlisted values (e.g. the directional panning cursors) fall back to
    # "default" in _on_cursor.
    _CEF_CURSOR_MAP: dict[int, str] = {
        0: "default",  # CT_POINTER
        1: "crosshair",  # CT_CROSS
        2: "pointer",  # CT_HAND
        3: "text",  # CT_IBEAM
        4: "wait",  # CT_WAIT
        5: "help",  # CT_HELP
        6: "ew-resize",  # CT_EASTRESIZE
        7: "ns-resize",  # CT_NORTHRESIZE
        8: "nesw-resize",  # CT_NORTHEASTRESIZE
        9: "nwse-resize",  # CT_NORTHWESTRESIZE
        10: "ns-resize",  # CT_SOUTHRESIZE
        11: "nwse-resize",  # CT_SOUTHEASTRESIZE
        12: "nesw-resize",  # CT_SOUTHWESTRESIZE
        13: "ew-resize",  # CT_WESTRESIZE
        14: "ns-resize",  # CT_NORTHSOUTHRESIZE
        15: "ew-resize",  # CT_EASTWESTRESIZE
        16: "nesw-resize",  # CT_NORTHEASTSOUTHWESTRESIZE
        17: "nwse-resize",  # CT_NORTHWESTSOUTHEASTRESIZE
        18: "ew-resize",  # CT_COLUMNRESIZE
        19: "ns-resize",  # CT_ROWRESIZE
        20: "move",  # CT_MIDDLEPANNING
        29: "move",  # CT_MOVE
        30: "vertical-text",  # CT_VERTICALTEXT
        31: "cell",  # CT_CELL
        32: "context-menu",  # CT_CONTEXTMENU
        33: "alias",  # CT_ALIAS
        34: "progress",  # CT_PROGRESS
        35: "no-drop",  # CT_NODROP
        36: "copy",  # CT_COPY
        37: "none",  # CT_NONE
        38: "not-allowed",  # CT_NOTALLOWED
        39: "zoom-in",  # CT_ZOOMIN
        40: "zoom-out",  # CT_ZOOMOUT
        41: "grab",  # CT_GRAB
        42: "grabbing",  # CT_GRABBING
    }

    def _on_url_changed(self, url: str) -> None:
        """Publish the page's new URL on the ``browser-url`` topic."""
        payload = json.dumps({"url": url}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(payload, reliable=True, topic="browser-url")
        )

    def _on_cursor(self, cursor_type: int) -> None:
        """Publish the CSS equivalent of a CEF cursor type on ``cursor_changed``."""
        css_cursor = self._CEF_CURSOR_MAP.get(cursor_type, "default")
        payload = json.dumps({"cursor": css_cursor}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(
                payload, reliable=True, topic="cursor_changed"
            )
        )

    def _on_paint(self, data: PaintData) -> None:
        """Remember the newest painted frame for the video loop."""
        self._last_frame = data.frame

    async def _video_loop(self, fps: float) -> None:
        """Re-send the latest painted frame at a steady *fps*.

        Frames are stamped with ``time.monotonic_ns()`` in microseconds. When
        an iteration overruns its slot the schedule resets to "now" instead of
        bursting to catch up.
        """
        interval = 1.0 / fps
        loop = asyncio.get_running_loop()
        next_time = loop.time()
        while True:
            if self._last_frame is not None and self._video_source is not None:
                ts_us = time.monotonic_ns() // 1000
                self._video_source.capture_frame(self._last_frame, timestamp_us=ts_us)
            next_time += interval
            delay = next_time - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                next_time = loop.time()

    async def aclose(self) -> None:
        """Stop the session: detach handlers, cancel tasks and unpublish tracks.

        Cleanup is best-effort. Failures while unregistering RPC methods or
        unpublishing tracks (e.g. the room already disconnected) are ignored,
        so every remaining step still runs.
        """
        if not self._started:
            return

        self._page.off("paint", self._on_paint)
        self._page.off("audio", self._on_audio)
        self._page.off("cursor_changed", self._on_cursor)
        self._page.off("url_changed", self._on_url_changed)
        if hasattr(self, "_on_data_received"):
            self._room.off("data_received", self._on_data_received)
        if hasattr(self, "_on_participant_disconnected"):
            self._room.off("participant_disconnected", self._on_participant_disconnected)

        # The RPC handlers registered in start() close over this session;
        # unregister them so they cannot drive a closed page.
        for method in (
            "browser/request-focus",
            "browser/release-focus",
            "browser/navigate",
            "browser/go-back",
            "browser/go-forward",
        ):
            try:
                self._room.local_participant.unregister_rpc_method(method)
            except Exception:
                pass  # room may already be disconnected

        # Read each attribute lazily: cancelling the audio-init task first
        # guarantees it cannot start the audio loop afterwards.
        for name in ("_video_task", "_audio_init_task", "_audio_task", "_input_task"):
            task = getattr(self, name)
            if task is None:
                continue
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.debug("browser session task %s failed before close", name, exc_info=True)

        # Coroutines still queued for the cancelled sender loop would otherwise
        # be garbage-collected un-awaited and emit RuntimeWarnings.
        while not self._input_queue.empty():
            pending = self._input_queue.get_nowait()
            if asyncio.iscoroutine(pending):
                pending.close()

        # Unpublish each track independently so one failure does not skip the other.
        for track in (self._video_track, self._audio_track):
            if track is None:
                continue
            try:
                await self._room.local_participant.unpublish_track(track.sid)
            except Exception:
                pass  # tracks may already be gone if room disconnected first
prop agent_interrupted : asyncio.Event-
Expand source code
@property
def agent_interrupted(self) -> asyncio.Event:
    """Signal raised when a human participant takes browser focus from the agent.

    The event is cleared again whenever the agent is granted focus.
    """
    event = self._agent_interrupted
    return event
prop focus_identity : str | None-
Expand source code
@property
def focus_identity(self) -> str | None:
    """Who may currently drive the browser.

    ``"__agent__"`` while the AI agent holds focus, a participant identity
    while a human holds it, or ``None`` when focus is free.
    """
    holder = self._focus_identity
    return holder
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Stop the session: detach handlers, cancel tasks and unpublish tracks.

    Cleanup is best-effort. Failures while unregistering RPC methods or
    unpublishing tracks (e.g. the room already disconnected) are ignored,
    so every remaining step still runs.
    """
    if not self._started:
        return

    self._page.off("paint", self._on_paint)
    self._page.off("audio", self._on_audio)
    self._page.off("cursor_changed", self._on_cursor)
    self._page.off("url_changed", self._on_url_changed)
    if hasattr(self, "_on_data_received"):
        self._room.off("data_received", self._on_data_received)
    if hasattr(self, "_on_participant_disconnected"):
        self._room.off("participant_disconnected", self._on_participant_disconnected)

    # The RPC handlers registered in start() close over this session;
    # unregister them so they cannot drive a closed page.
    for method in (
        "browser/request-focus",
        "browser/release-focus",
        "browser/navigate",
        "browser/go-back",
        "browser/go-forward",
    ):
        try:
            self._room.local_participant.unregister_rpc_method(method)
        except Exception:
            pass  # room may already be disconnected

    # Read each attribute lazily: cancelling the audio-init task first
    # guarantees it cannot start the audio loop afterwards.
    for name in ("_video_task", "_audio_init_task", "_audio_task", "_input_task"):
        task = getattr(self, name)
        if task is None:
            continue
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.debug("browser session task %s failed before close", name, exc_info=True)

    # Coroutines still queued for the cancelled sender loop would otherwise
    # be garbage-collected un-awaited and emit RuntimeWarnings.
    while not self._input_queue.empty():
        pending = self._input_queue.get_nowait()
        if asyncio.iscoroutine(pending):
            pending.close()

    # Unpublish each track independently so one failure does not skip the other.
    for track in (self._video_track, self._audio_track):
        if track is None:
            continue
        try:
            await self._room.local_participant.unpublish_track(track.sid)
        except Exception:
            pass  # tracks may already be gone if room disconnected first


async def reclaim_agent_focus(self) -> None:
    """Reclaim focus for the agent and notify all participants."""
    self.set_agent_focus(True)
    await self._page.send_focus_event(True)
    await self._broadcast_focus()
def set_agent_focus(self, active: bool) ‑> None-
Expand source code
def set_agent_focus(self, active: bool) -> None:
    """Give the AI agent browser focus, or take it back.

    Giving focus also clears the "interrupted" event. Taking it back is a
    no-op unless the agent is the one currently holding focus.
    """
    agent = "__agent__"
    if not active:
        if self._focus_identity == agent:
            self._focus_identity = None
        return
    self._focus_identity = agent
    self._agent_interrupted.clear()
async def start(self) ‑> None-
Expand source code
async def start(self) -> None:
    """Publish the video track, start background tasks and register handlers.

    Calling ``start`` more than once has no effect.
    """
    if self._started:
        return
    self._started = True

    opts = self._page._opts
    self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)
    self._video_track = rtc.LocalVideoTrack.create_video_track(
        "browser-video", self._video_source
    )
    video_opts = rtc.TrackPublishOptions(
        source=rtc.TrackSource.SOURCE_SCREENSHARE,
        video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
        simulcast=False,
    )

    self._page.on("paint", self._on_paint)
    self._page.on("audio", self._on_audio)
    self._page.on("cursor_changed", self._on_cursor)
    self._page.on("url_changed", self._on_url_changed)

    await self._room.local_participant.publish_track(self._video_track, video_opts)
    self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

    # Single persistent task for sending input events to subprocess
    self._input_task = asyncio.create_task(self._input_sender_loop())

    # Register RPC methods for focus management
    @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
    async def _handle_request_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        caller = data.caller_identity
        # Re-requesting focus you already hold is granted (idempotent)
        # rather than reported as held by someone else.
        if self._focus_identity == caller:
            return json.dumps({"granted": True})
        if self._focus_identity is None:
            self._focus_identity = caller
            await self._page.send_focus_event(True)
            await self._broadcast_focus()
            return json.dumps({"granted": True})
        # If agent has focus, allow human to interrupt
        if self._focus_identity == "__agent__":
            self._focus_identity = caller
            self._agent_interrupted.set()
            await self._page.send_focus_event(True)
            await self._broadcast_focus()
            return json.dumps({"granted": True})
        return json.dumps({"granted": False, "holder": self._focus_identity})

    @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
    async def _handle_release_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        if self._focus_identity == data.caller_identity:
            self._focus_identity = None
            await self._page.send_focus_event(False)
            await self._broadcast_focus()
            return json.dumps({"released": True})
        return json.dumps({"released": False})

    # NOTE(review): the navigation RPCs below do not check focus, so any
    # participant can navigate — confirm this is intended.
    @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
    async def _handle_navigate(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        payload = json.loads(data.payload)
        url = payload.get("url", "")
        if url:
            self._queue_input(self._page.navigate(url))
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
    async def _handle_go_back(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_back())
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
    async def _handle_go_forward(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_forward())
        return json.dumps({"status": "ok"})

    # Listen for input data from participants
    @self._room.on("data_received")
    def _on_data_received(packet: rtc.DataPacket) -> None:
        if packet.topic != "browser-input":
            return
        if packet.participant is None:
            return
        # Only the current focus holder may drive the page.
        if packet.participant.identity != self._focus_identity:
            return
        try:
            events = json.loads(packet.data)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return
        # The payload is untrusted: expect a JSON array of event objects and
        # skip malformed entries instead of raising inside the event callback.
        if not isinstance(events, list):
            return
        for evt in events:
            if not isinstance(evt, dict):
                continue
            try:
                self._dispatch_input(evt)
            except (KeyError, TypeError, ValueError):
                continue

    self._on_data_received = _on_data_received

    # Release focus when the holder disconnects
    @self._room.on("participant_disconnected")
    def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
        if participant.identity == self._focus_identity:
            self._focus_identity = None
            self._queue_input(self._page.send_focus_event(False))
            self._queue_input(self._broadcast_focus())

    self._on_participant_disconnected = _on_participant_disconnected
class PageActions (*,
page: BrowserPage)-
Expand source code
class PageActions:
    """Typed input API for a CEF BrowserPage with frame capture.

    Every action holds an internal lock, so concurrent actions reach the page
    one after another instead of interleaving their events.

    Usage::

        actions = PageActions(page=page)
        await actions.left_click(100, 200)
        frame = actions.last_frame
    """

    def __init__(self, *, page: BrowserPage) -> None:
        self._page = page
        # Serialises actions so multi-event gestures are never interleaved.
        self._lock = asyncio.Lock()
        self._last_frame: rtc.VideoFrame | None = None
        self._page.on("paint", self._on_paint)

    def _on_paint(self, data: Any) -> None:
        self._last_frame = data.frame

    @property
    def last_frame(self) -> rtc.VideoFrame | None:
        """Most recent frame delivered by the page's ``paint`` event, if any."""
        return self._last_frame

    async def left_click(self, x: int, y: int, *, modifiers: str | None = None) -> None:
        """Left-click at (x, y), optionally while holding modifier keys.

        Modifiers are released even if the click fails or the call is cancelled,
        so keys are never left stuck down.
        """
        async with self._lock:
            mod_keys = _parse_modifier_keys(modifiers)
            await self._page.send_mouse_move(x, y)
            try:
                await _press_modifiers(self._page, mod_keys)
                await self._page.send_mouse_click(x, y, 0, False, 1)
                await self._page.send_mouse_click(x, y, 0, True, 1)
            finally:
                await _release_modifiers(self._page, mod_keys)

    async def right_click(self, x: int, y: int) -> None:
        """Right-click (button 2) at (x, y)."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 2, False, 1)
            await self._page.send_mouse_click(x, y, 2, True, 1)

    async def double_click(self, x: int, y: int) -> None:
        """Double-click at (x, y): two press/release pairs with click counts 1 and 2."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
            await self._page.send_mouse_click(x, y, 0, False, 2)
            await self._page.send_mouse_click(x, y, 0, True, 2)

    async def triple_click(self, x: int, y: int) -> None:
        """Triple-click at (x, y): press/release pairs with click counts 1, 2 and 3."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
            await self._page.send_mouse_click(x, y, 0, False, 2)
            await self._page.send_mouse_click(x, y, 0, True, 2)
            await self._page.send_mouse_click(x, y, 0, False, 3)
            await self._page.send_mouse_click(x, y, 0, True, 3)

    async def middle_click(self, x: int, y: int) -> None:
        """Middle-click (button 1) at (x, y)."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 1, False, 1)
            await self._page.send_mouse_click(x, y, 1, True, 1)

    async def mouse_move(self, x: int, y: int) -> None:
        """Move the pointer to (x, y)."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)

    async def left_click_drag(self, *, start_x: int, start_y: int, end_x: int, end_y: int) -> None:
        """Press the left button at the start point, move to the end point and release.

        The button is always released, at the last position reached, even if a
        move fails or the call is cancelled mid-drag.
        """
        async with self._lock:
            await self._page.send_mouse_move(start_x, start_y)
            await self._page.send_mouse_click(start_x, start_y, 0, False, 1)
            release_x, release_y = start_x, start_y
            try:
                await asyncio.sleep(0.05)
                await self._page.send_mouse_move(end_x, end_y)
                release_x, release_y = end_x, end_y
                await asyncio.sleep(0.05)
            finally:
                await self._page.send_mouse_click(release_x, release_y, 0, True, 1)

    async def left_mouse_down(self, x: int, y: int) -> None:
        """Move to (x, y) and press (without releasing) the left button."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, False, 1)

    async def left_mouse_up(self, x: int, y: int) -> None:
        """Move to (x, y) and release the left button."""
        async with self._lock:
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_click(x, y, 0, True, 1)

    async def scroll(
        self,
        x: int,
        y: int,
        *,
        direction: str = "down",
        amount: int = 3,
    ) -> None:
        """Scroll at (x, y) by *amount* notches of 120 wheel units each.

        ``direction`` is ``"down"``, ``"up"``, ``"left"`` or ``"right"``; any
        other value moves the pointer and sends a zero-delta wheel event.
        """
        async with self._lock:
            pixels = amount * 120
            delta_x, delta_y = 0, 0
            # Sign convention used here: positive deltaY scrolls up, positive
            # deltaX scrolls left.
            if direction == "down":
                delta_y = -pixels
            elif direction == "up":
                delta_y = pixels
            elif direction == "left":
                delta_x = pixels
            elif direction == "right":
                delta_x = -pixels
            await self._page.send_mouse_move(x, y)
            await self._page.send_mouse_wheel(x, y, delta_x, delta_y)

    async def type_text(self, text: str) -> None:
        """Type *text* one character at a time, about 10 ms apart.

        ASCII letters and characters with a known virtual-key code are sent as
        key-down / char / key-up, wrapped in a Shift press when needed. Every
        other character (e.g. non-ASCII text) is sent as a CHAR event only.
        """
        async with self._lock:
            for ch in text:
                char_code = ord(ch)
                shifted = False
                # Only ASCII letters have a virtual-key code equal to their
                # upper-case code point; for others (e.g. "ß".upper() == "SS")
                # ord(ch.upper()) is meaningless or raises TypeError.
                if ch.isascii() and ch.isalpha():
                    vk = ord(ch.upper())
                    shifted = ch.isupper()
                elif ch in SHIFTED_CHAR_TO_VK:
                    vk = SHIFTED_CHAR_TO_VK[ch]
                    shifted = True
                else:
                    vk = KEY_NAME_TO_VK.get(ch, 0)
                if not vk:
                    # Unknown character (e.g. unicode) — CHAR-only
                    await self._page.send_key_event(CHAR, 0, 0, 0, char_code)
                    await asyncio.sleep(0.01)
                    continue
                modifiers = MOD_SHIFT if shifted else 0
                nkc = NATIVE_KEY_CODES.get(vk, 0)
                if shifted:
                    shift_nkc = NATIVE_KEY_CODES.get(VK_SHIFT, 0)
                    await self._page.send_key_event(RAWKEYDOWN, MOD_SHIFT, VK_SHIFT, shift_nkc, 0)
                try:
                    await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                    await self._page.send_key_event(CHAR, modifiers, char_code, nkc, char_code)
                    await self._page.send_key_event(KEYUP, modifiers, vk, 0, 0)
                finally:
                    # Never leave Shift held down if a key event fails.
                    if shifted:
                        await self._page.send_key_event(KEYUP, 0, VK_SHIFT, 0, 0)
                await asyncio.sleep(0.01)

    async def key(self, text: str) -> None:
        """Press a key or ``+``-separated key combination (via ``_send_key_combo``)."""
        async with self._lock:
            await _send_key_combo(self._page, text)

    async def hold_key(self, text: str, *, duration: float = 0.5) -> None:
        """Hold the ``+``-separated keys in *text* for *duration* seconds.

        Keys are pressed in order; names found in ``MODIFIER_MAP`` add their
        flag to the modifiers sent with later key-downs. Every key that was
        pressed is released in reverse order, even if the call is cancelled.
        """
        async with self._lock:
            keys = [k.strip().lower() for k in text.split("+")]
            modifiers = 0
            pressed: list[int] = []
            try:
                for k in keys:
                    if k in MODIFIER_MAP:
                        modifiers |= MODIFIER_MAP[k]
                    vk = KEY_NAME_TO_VK.get(k, 0)
                    nkc = NATIVE_KEY_CODES.get(vk, 0)
                    await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                    pressed.append(vk)
                await asyncio.sleep(duration)
            finally:
                for vk in reversed(pressed):
                    await self._page.send_key_event(KEYUP, 0, vk, 0, 0)

    async def wait(self) -> None:
        """Hold the action lock for one second, delaying any queued actions."""
        async with self._lock:
            await asyncio.sleep(1)

    async def navigate(self, url: str) -> None:
        """Navigate the page to *url*."""
        async with self._lock:
            await self._page.navigate(url)

    async def go_back(self) -> None:
        """Go back one entry in the page history."""
        async with self._lock:
            await self._page.go_back()

    async def go_forward(self) -> None:
        """Go forward one entry in the page history."""
        async with self._lock:
            await self._page.go_forward()

    def aclose(self) -> None:
        """Stop tracking frames by detaching the ``paint`` listener.

        Despite its name this method is synchronous; do not ``await`` it.
        """
        self._page.off("paint", self._on_paint)
Usage::
actions = PageActions(page=page)
await actions.left_click(100, 200)
frame = actions.last_frame
Instance variables
prop last_frame : rtc.VideoFrame | None-
Expand source code
@property
def last_frame(self) -> rtc.VideoFrame | None:
    """The newest frame captured from the page's ``paint`` events, or ``None`` before any paint."""
    frame = self._last_frame
    return frame
Methods
def aclose(self) ‑> None-
Expand source code
def aclose(self) -> None:
    """Stop tracking frames by detaching the ``paint`` listener.

    Despite its name this method is synchronous; do not ``await`` it.
    """
    self._page.off("paint", self._on_paint)


async def double_click(self, x: int, y: int) -> None:
    """Double-click at (x, y): two press/release pairs with click counts 1 and 2."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 0, False, 1)
        await self._page.send_mouse_click(x, y, 0, True, 1)
        await self._page.send_mouse_click(x, y, 0, False, 2)
        await self._page.send_mouse_click(x, y, 0, True, 2)


async def go_back(self) -> None:
    """Go back one entry in the page history."""
    async with self._lock:
        await self._page.go_back()


async def go_forward(self) -> None:
    """Go forward one entry in the page history."""
    async with self._lock:
        await self._page.go_forward()


async def hold_key(self, text: str, *, duration: float = 0.5) -> None:
    """Hold the ``+``-separated keys in *text* for *duration* seconds.

    Keys are pressed in order; names found in ``MODIFIER_MAP`` add their flag
    to the modifiers sent with later key-downs. Every key that was pressed is
    released in reverse order, even if the call is cancelled.
    """
    async with self._lock:
        keys = [k.strip().lower() for k in text.split("+")]
        modifiers = 0
        pressed: list[int] = []
        try:
            for k in keys:
                if k in MODIFIER_MAP:
                    modifiers |= MODIFIER_MAP[k]
                vk = KEY_NAME_TO_VK.get(k, 0)
                nkc = NATIVE_KEY_CODES.get(vk, 0)
                await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                pressed.append(vk)
            await asyncio.sleep(duration)
        finally:
            for vk in reversed(pressed):
                await self._page.send_key_event(KEYUP, 0, vk, 0, 0)


async def key(self, text: str) -> None:
    """Press a key or ``+``-separated key combination (via ``_send_key_combo``)."""
    async with self._lock:
        await _send_key_combo(self._page, text)


async def left_click(self, x: int, y: int, *, modifiers: str | None = None) -> None:
    """Left-click at (x, y), optionally while holding modifier keys.

    Modifiers are released even if the click fails or the call is cancelled.
    """
    async with self._lock:
        mod_keys = _parse_modifier_keys(modifiers)
        await self._page.send_mouse_move(x, y)
        try:
            await _press_modifiers(self._page, mod_keys)
            await self._page.send_mouse_click(x, y, 0, False, 1)
            await self._page.send_mouse_click(x, y, 0, True, 1)
        finally:
            await _release_modifiers(self._page, mod_keys)


async def left_click_drag(self, *, start_x: int, start_y: int, end_x: int, end_y: int) -> None:
    """Press the left button at the start point, move to the end point and release.

    The button is always released, at the last position reached, even if a
    move fails or the call is cancelled mid-drag.
    """
    async with self._lock:
        await self._page.send_mouse_move(start_x, start_y)
        await self._page.send_mouse_click(start_x, start_y, 0, False, 1)
        release_x, release_y = start_x, start_y
        try:
            await asyncio.sleep(0.05)
            await self._page.send_mouse_move(end_x, end_y)
            release_x, release_y = end_x, end_y
            await asyncio.sleep(0.05)
        finally:
            await self._page.send_mouse_click(release_x, release_y, 0, True, 1)


async def left_mouse_down(self, x: int, y: int) -> None:
    """Move to (x, y) and press (without releasing) the left button."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 0, False, 1)


async def left_mouse_up(self, x: int, y: int) -> None:
    """Move to (x, y) and release the left button."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 0, True, 1)


async def middle_click(self, x: int, y: int) -> None:
    """Middle-click (button 1) at (x, y)."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 1, False, 1)
        await self._page.send_mouse_click(x, y, 1, True, 1)


async def mouse_move(self, x: int, y: int) -> None:
    """Move the pointer to (x, y)."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)


async def navigate(self, url: str) -> None:
    """Navigate the page to *url*."""
    async with self._lock:
        await self._page.navigate(url)


async def right_click(self, x: int, y: int) -> None:
    """Right-click (button 2) at (x, y)."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 2, False, 1)
        await self._page.send_mouse_click(x, y, 2, True, 1)


async def scroll(
    self,
    x: int,
    y: int,
    *,
    direction: str = "down",
    amount: int = 3,
) -> None:
    """Scroll at (x, y) by *amount* notches of 120 wheel units each.

    ``direction`` is ``"down"``, ``"up"``, ``"left"`` or ``"right"``; any other
    value moves the pointer and sends a zero-delta wheel event.
    """
    async with self._lock:
        pixels = amount * 120
        delta_x, delta_y = 0, 0
        # Sign convention used here: positive deltaY scrolls up, positive
        # deltaX scrolls left.
        if direction == "down":
            delta_y = -pixels
        elif direction == "up":
            delta_y = pixels
        elif direction == "left":
            delta_x = pixels
        elif direction == "right":
            delta_x = -pixels
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_wheel(x, y, delta_x, delta_y)


async def triple_click(self, x: int, y: int) -> None:
    """Triple-click at (x, y): press/release pairs with click counts 1, 2 and 3."""
    async with self._lock:
        await self._page.send_mouse_move(x, y)
        await self._page.send_mouse_click(x, y, 0, False, 1)
        await self._page.send_mouse_click(x, y, 0, True, 1)
        await self._page.send_mouse_click(x, y, 0, False, 2)
        await self._page.send_mouse_click(x, y, 0, True, 2)
        await self._page.send_mouse_click(x, y, 0, False, 3)
        await self._page.send_mouse_click(x, y, 0, True, 3)


async def type_text(self, text: str) -> None:
    """Type *text* one character at a time, about 10 ms apart.

    ASCII letters and characters with a known virtual-key code are sent as
    key-down / char / key-up, wrapped in a Shift press when needed. Every other
    character (e.g. non-ASCII text) is sent as a CHAR event only.
    """
    async with self._lock:
        for ch in text:
            char_code = ord(ch)
            shifted = False
            # Only ASCII letters have a virtual-key code equal to their
            # upper-case code point; for others (e.g. "ß".upper() == "SS")
            # ord(ch.upper()) is meaningless or raises TypeError.
            if ch.isascii() and ch.isalpha():
                vk = ord(ch.upper())
                shifted = ch.isupper()
            elif ch in SHIFTED_CHAR_TO_VK:
                vk = SHIFTED_CHAR_TO_VK[ch]
                shifted = True
            else:
                vk = KEY_NAME_TO_VK.get(ch, 0)
            if not vk:
                # Unknown character (e.g. unicode) — CHAR-only
                await self._page.send_key_event(CHAR, 0, 0, 0, char_code)
                await asyncio.sleep(0.01)
                continue
            modifiers = MOD_SHIFT if shifted else 0
            nkc = NATIVE_KEY_CODES.get(vk, 0)
            if shifted:
                shift_nkc = NATIVE_KEY_CODES.get(VK_SHIFT, 0)
                await self._page.send_key_event(RAWKEYDOWN, MOD_SHIFT, VK_SHIFT, shift_nkc, 0)
            try:
                await self._page.send_key_event(RAWKEYDOWN, modifiers, vk, nkc, 0)
                await self._page.send_key_event(CHAR, modifiers, char_code, nkc, char_code)
                await self._page.send_key_event(KEYUP, modifiers, vk, 0, 0)
            finally:
                # Never leave Shift held down if a key event fails.
                if shifted:
                    await self._page.send_key_event(KEYUP, 0, VK_SHIFT, 0, 0)
            await asyncio.sleep(0.01)


async def wait(self) -> None:
    """Hold the action lock for one second, delaying any queued actions."""
    async with self._lock:
        await asyncio.sleep(1)
class PaintData (dirty_rects: list[tuple[int, int, int, int]],
frame: rtc.VideoFrame,
width: int,
height: int)-
Expand source code
@dataclass
class PaintData:
    """Payload of a page ``paint`` event: the rendered frame and what changed.

    BrowserSession and PageActions read only ``frame`` from it.
    """

    # Regions repainted in this frame; presumably (x, y, width, height)
    # rectangles in page pixels — TODO confirm against the emitter.
    dirty_rects: list[tuple[int, int, int, int]]
    # The full rendered frame for this paint.
    frame: rtc.VideoFrame
    # Frame dimensions; assumed to be in pixels — confirm.
    width: int
    height: int
Instance variables
var dirty_rects : list[tuple[int, int, int, int]]
var frame : VideoFrame
var height : int
var width : int