Module livekit.plugins.browser

Browser plugin for LiveKit Agents

Support for Chromium Embedded Framework (CEF).

Classes

class AudioData (frame: rtc.AudioFrame, pts: int)
Expand source code
@dataclass
class AudioData:
    """Payload of a page's ``"audio"`` event: one audio frame plus its timestamp."""

    # Audio samples wrapped in a LiveKit frame.
    frame: rtc.AudioFrame
    # Copied from the ``pts`` field of the IPC audio message
    # (presumably a presentation timestamp; units are not shown here, TODO confirm).
    pts: int

AudioData(frame: 'rtc.AudioFrame', pts: 'int')

Instance variables

var frame : rtc.AudioFrame
var pts : int
class BrowserContext (*, dev_mode: bool, remote_debugging_port: int = 0)
Expand source code
class BrowserContext:
    """Own the CEF subprocess and route its IPC messages to browser pages.

    The context starts ``proc_main.main`` in a separate process (``spawn``
    multiprocessing context), talks to it over one end of a socket pair wrapped
    in an ``AsyncDuplex``, and dispatches every incoming message to the
    ``BrowserPage`` it belongs to.

    Call :meth:`initialize` before :meth:`new_page` or :meth:`playwright`, and
    :meth:`aclose` when finished.
    """

    def __init__(self, *, dev_mode: bool, remote_debugging_port: int = 0) -> None:
        """Create an uninitialized context.

        Args:
            dev_mode: Forwarded to the subprocess in the initialize request.
            remote_debugging_port: Port for remote debugging. ``0`` (the
                default) means a free port is picked during initialization.
        """
        self._mp_ctx = mp.get_context("spawn")
        self._pages: dict[int, BrowserPage] = {}
        self._dev_mode = dev_mode
        self._initialized = False
        self._next_page_id = 1
        self._remote_debugging_port = remote_debugging_port
        self._closed_event = asyncio.Event()
        self._proc = None
        self._duplex = None
        self._main_atask = None
        self._root_cache_path = None

    async def initialize(self) -> None:
        """Start the browser subprocess and perform the initialization handshake.

        Raises:
            Whatever the handshake raises; in that case the subprocess and the
            temporary cache directory are cleaned up before re-raising.
        """
        from .download import get_resources_dir

        resources_dir = str(get_resources_dir())

        mp_pch, mp_cch = socket.socketpair()
        self._duplex = await AsyncDuplex.open(mp_pch)

        self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

        # On Linux aarch64, libcef.so is too large for late TLS allocation.
        # Set LD_PRELOAD so the dynamic linker reserves TLS early.
        #
        # Track explicitly whether *we* changed the variable: a pre-existing
        # LD_PRELOAD must survive untouched when libcef.so is not present.
        preload_overridden = False
        previous_preload = os.environ.get("LD_PRELOAD")
        if sys.platform.startswith("linux"):
            libcef = os.path.join(resources_dir, "libcef.so")
            if os.path.exists(libcef):
                os.environ["LD_PRELOAD"] = libcef + (
                    ":" + previous_preload if previous_preload else ""
                )
                preload_overridden = True

        try:
            self._proc.start()
        finally:
            # Restore the parent's environment even if the spawn failed.
            if preload_overridden:
                if previous_preload is not None:
                    os.environ["LD_PRELOAD"] = previous_preload
                else:
                    os.environ.pop("LD_PRELOAD", None)
            # The child end is only needed by the subprocess.
            mp_cch.close()

        try:
            await self._initialize_context(resources_dir)
        except BaseException:
            await self._cleanup_process()
            raise

    async def _initialize_context(self, resources_dir: str) -> None:
        """Send the initialize request, wait for the reply, start the reader task.

        Raises:
            RuntimeError: If the first message received is not a
                ``ContextInitializedResponse``.
        """
        if not self._remote_debugging_port:
            # Ask the OS for a free port by binding to port 0. The probe socket
            # is closed immediately, so the port is not reserved.
            with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                s.bind(("", 0))
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                self._remote_debugging_port = s.getsockname()[1]

            logger.debug("using remote debugging port %d", self._remote_debugging_port)

        self._root_cache_path = tempfile.mkdtemp(prefix="lkcef_cache_")
        await channel.asend_message(
            self._duplex,
            proto.InitializeContextRequest(
                dev_mode=self._dev_mode,
                remote_debugging_port=self._remote_debugging_port,
                root_cache_path=self._root_cache_path,
            ),
        )
        resp = await channel.arecv_message(self._duplex, proto.IPC_MESSAGES)
        # Explicit check instead of ``assert`` so it still runs under ``-O``.
        if not isinstance(resp, proto.ContextInitializedResponse):
            raise RuntimeError(
                "unexpected response while initializing browser context: "
                f"{type(resp).__name__}"
            )
        self._initialized = True
        logger.debug("browser context initialized", extra={"pid": self._proc.pid})

        self._main_atask = asyncio.create_task(self._main_task(self._duplex))

    async def _cleanup_process(self) -> None:
        """Close the duplex, wait for (or kill) the subprocess, remove the cache dir."""
        if self._duplex:
            try:
                await self._duplex.aclose()
            except DuplexClosed:
                pass

        # Subprocess has its own 5s watchdog — just wait for it
        if self._proc and self._proc.is_alive():
            # join() blocks; run it in the default executor so the event loop
            # keeps servicing other tasks while we wait.
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._proc.join, 15)
            if self._proc.is_alive():
                logger.warning("subprocess did not exit, killing")
                self._proc.kill()
                self._proc.join()

        if self._proc:
            self._proc.close()

        if self._root_cache_path:
            shutil.rmtree(self._root_cache_path, ignore_errors=True)

    @asynccontextmanager
    async def playwright(self, timeout: float | None = None):
        """Yield a Playwright browser connected over CDP to the debugging port.

        Args:
            timeout: Passed through to ``connect_over_cdp``.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        from playwright.async_api import async_playwright

        async with async_playwright() as p:
            url = f"http://localhost:{self._remote_debugging_port}"
            browser = await p.chromium.connect_over_cdp(url, timeout=timeout)
            try:
                yield browser
            finally:
                await browser.close()

    @_log_exceptions(_logger=logger)
    async def _main_task(self, duplex: AsyncDuplex) -> None:
        """Read IPC messages until the duplex closes and dispatch them per page."""
        while True:
            try:
                msg = await channel.arecv_message(duplex, proto.IPC_MESSAGES)
            except DuplexClosed:
                break

            # A failure while handling one message (including an unknown
            # page id) is logged and must not stop the reader loop.
            try:
                if isinstance(msg, proto.CreateBrowserResponse):
                    page = self._pages[msg.page_id]
                    await page._handle_created(msg)
                elif isinstance(msg, proto.AcquireAudioData):
                    page = self._pages[msg.page_id]
                    page._handle_audio_data(msg)
                elif isinstance(msg, proto.AcquirePaintData):
                    page = self._pages[msg.page_id]
                    await page._handle_paint(msg)
                elif isinstance(msg, proto.AudioStreamStarted):
                    pass
                elif isinstance(msg, proto.AudioStreamStopped):
                    pass
                elif isinstance(msg, proto.CursorChanged):
                    page = self._pages[msg.page_id]
                    page.emit("cursor_changed", msg.cursor_type)
                elif isinstance(msg, proto.UrlChanged):
                    page = self._pages[msg.page_id]
                    page.emit("url_changed", msg.url)
                elif isinstance(msg, proto.BrowserClosed):
                    page = self._pages[msg.page_id]
                    await page._handle_close(msg)
            except Exception:
                logger.exception("error handling IPC message %s", type(msg).__name__)

        # Subprocess exited — resolve any pending page futures so cleanup doesn't hang
        for page in self._pages.values():
            if not page._close_fut.done():
                page._close_fut.set_result(None)
            if not page._created_fut.done():
                page._created_fut.set_result(None)
        self._closed_event.set()

    async def new_page(
        self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
    ) -> BrowserPage:
        """Create a page in the subprocess and wait until it has been started.

        Args:
            url: Initial URL for the page.
            width: Requested page width.
            height: Requested page height.
            framerate: Requested framerate.

        Returns:
            The started ``BrowserPage``.

        Raises:
            RuntimeError: If the context has not been initialized.
        """
        if not self._initialized:
            raise RuntimeError("BrowserContext not initialized")

        page_id = self._next_page_id
        self._next_page_id += 1
        page = BrowserPage(
            self._mp_ctx,
            _PageOptions(
                page_id=page_id,
                url=url,
                width=width,
                height=height,
                framerate=framerate,
            ),
            self._duplex,
        )
        # Register before starting so the reader task can find the page when
        # the creation response arrives.
        self._pages[page_id] = page
        await page.start()
        return page

    async def aclose(self) -> None:
        """Close all pages, stop the reader task and shut the subprocess down.

        Does nothing if the context is not initialized.
        """
        if not self._initialized:
            return

        self._initialized = False

        # Close all pages (ensure shm cleanup even on errors)
        for page in list(self._pages.values()):
            try:
                await page.aclose()
            except Exception:
                logger.exception("error while closing browser page %s", page.id)
                page._cleanup_shm()

        # Cancel the main task
        if self._main_atask:
            self._main_atask.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._main_atask

        await self._cleanup_process()

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close all pages, stop the reader task and shut the subprocess down.

    Does nothing if the context is not initialized.
    """
    if not self._initialized:
        return

    self._initialized = False

    # Close all pages (ensure shm cleanup even on errors)
    for page in list(self._pages.values()):
        try:
            await page.aclose()
        except Exception:
            # Keep going with the remaining pages, but leave a trace of the failure.
            logger.exception("error while closing browser page %s", page.id)
            page._cleanup_shm()

    # Cancel the main task
    if self._main_atask:
        self._main_atask.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._main_atask

    await self._cleanup_process()
async def initialize(self) ‑> None
Expand source code
async def initialize(self) -> None:
    """Start the browser subprocess and perform the initialization handshake.

    Raises:
        Whatever the handshake raises; in that case the subprocess is cleaned
        up before re-raising.
    """
    from .download import get_resources_dir

    resources_dir = str(get_resources_dir())

    mp_pch, mp_cch = socket.socketpair()
    self._duplex = await AsyncDuplex.open(mp_pch)

    self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir))

    # On Linux aarch64, libcef.so is too large for late TLS allocation.
    # Set LD_PRELOAD so the dynamic linker reserves TLS early.
    #
    # Track explicitly whether *we* changed the variable: a pre-existing
    # LD_PRELOAD must survive untouched when libcef.so is not present.
    preload_overridden = False
    previous_preload = os.environ.get("LD_PRELOAD")
    if sys.platform.startswith("linux"):
        libcef = os.path.join(resources_dir, "libcef.so")
        if os.path.exists(libcef):
            os.environ["LD_PRELOAD"] = libcef + (
                ":" + previous_preload if previous_preload else ""
            )
            preload_overridden = True

    try:
        self._proc.start()
    finally:
        # Restore the parent's environment even if the spawn failed.
        if preload_overridden:
            if previous_preload is not None:
                os.environ["LD_PRELOAD"] = previous_preload
            else:
                os.environ.pop("LD_PRELOAD", None)
        # The child end is only needed by the subprocess.
        mp_cch.close()

    try:
        await self._initialize_context(resources_dir)
    except BaseException:
        await self._cleanup_process()
        raise
async def new_page(self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24) ‑> BrowserPage
Expand source code
async def new_page(
    self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24
) -> BrowserPage:
    """Allocate the next page id, create the page and wait for it to start.

    Raises:
        RuntimeError: If the context has not been initialized.
    """
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    # Page ids are handed out sequentially.
    assigned_id = self._next_page_id
    self._next_page_id = assigned_id + 1

    options = _PageOptions(
        page_id=assigned_id,
        url=url,
        width=width,
        height=height,
        framerate=framerate,
    )
    created = BrowserPage(self._mp_ctx, options, self._duplex)

    # Register first so the page can be looked up by id while it is starting.
    self._pages[assigned_id] = created
    await created.start()
    return created
async def playwright(self, timeout: float | None = None)
Expand source code
@asynccontextmanager
async def playwright(self, timeout: float | None = None):
    """Yield a Playwright browser attached over CDP to the debugging port.

    The browser is closed when the ``async with`` block exits.

    Raises:
        RuntimeError: If the context has not been initialized.
    """
    if not self._initialized:
        raise RuntimeError("BrowserContext not initialized")

    # Imported lazily, only when this helper is actually used.
    from playwright.async_api import async_playwright

    async with async_playwright() as pw:
        cdp_browser = await pw.chromium.connect_over_cdp(
            f"http://localhost:{self._remote_debugging_port}", timeout=timeout
        )
        try:
            yield cdp_browser
        finally:
            await cdp_browser.close()
class BrowserPage (mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex)
Expand source code
class BrowserPage(EventEmitter[EventTypes]):
    """Handle to a single browser living in the CEF subprocess.

    All commands are sent as IPC messages over the owning context's duplex.
    The page itself emits ``"paint"`` and ``"audio"`` events; the owning
    context additionally emits ``"cursor_changed"`` and ``"url_changed"`` on it.
    """

    def __init__(
        self,
        mp_ctx: mpc.SpawnContext,
        opts: _PageOptions,
        ctx_duplex: AsyncDuplex,
    ) -> None:
        """Store the page options and IPC channel; no I/O happens here."""
        super().__init__()
        self._mp_ctx = mp_ctx
        self._opts = opts
        self._ctx_duplex = ctx_duplex

        self._view_width = 0
        self._view_height = 0

        # Created here (not only in start()) so aclose()/_cleanup_shm() are
        # safe to call on a page whose start() never ran.
        self._shms = []
        self._framebuffer = None

        self._created_fut = asyncio.Future()
        self._close_fut = asyncio.Future()
        self._closed = False

    @property
    def id(self) -> int:
        """Page id taken from the page options."""
        return self._opts.page_id

    # Number of shared-memory buffers allocated per page.
    _NUM_SHM_BUFFERS = 2

    async def start(self) -> None:
        """Allocate shared memory, request browser creation and wait for the reply."""
        import multiprocessing.shared_memory as mp_shm

        # 4 bytes per pixel at the maximum supported size.
        shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4
        self._shms = []
        shm_names = []
        for _ in range(self._NUM_SHM_BUFFERS):
            name = f"lkcef_browser_{_shortuuid()}"
            shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name)
            self._shms.append(shm)
            shm_names.append(name)

        self._framebuffer = rtc.VideoFrame(
            proto.SHM_MAX_WIDTH,
            proto.SHM_MAX_HEIGHT,
            rtc.VideoBufferType.BGRA,
            bytearray(proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4),
        )

        req = proto.CreateBrowserRequest(
            page_id=self._opts.page_id,
            width=self._opts.width,
            height=self._opts.height,
            shm_names=shm_names,
            url=self._opts.url,
            framerate=self._opts.framerate,
        )

        await channel.asend_message(self._ctx_duplex, req)

        # TODO(theomonnom): create timeout (would prevent never resolving futures if the
        #  browser process crashed for some reasons)
        await asyncio.shield(self._created_fut)

    async def aclose(self) -> None:
        """Ask the subprocess to close the browser and release shared memory.

        Safe to call more than once; only the first call does any work.
        """
        if self._closed:
            return
        self._closed = True

        try:
            await channel.asend_message(
                self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id)
            )
            await asyncio.shield(self._close_fut)
        except DuplexClosed:
            pass  # Subprocess already closed (e.g., window closed manually)
        finally:
            self._cleanup_shm()

    def _cleanup_shm(self) -> None:
        """Close and unlink every shared-memory buffer, ignoring individual failures."""
        for shm in self._shms:
            try:
                shm.close()
            except Exception:
                pass
            try:
                shm.unlink()
            except Exception:
                pass
        self._shms.clear()

    async def _handle_created(self, msg: proto.CreateBrowserResponse) -> None:
        """Resolve the creation future when the subprocess confirms the browser."""
        # The future may already be resolved (e.g. by the context when the
        # subprocess exits); setting it twice would raise InvalidStateError.
        if not self._created_fut.done():
            self._created_fut.set_result(None)

    async def _handle_paint(self, acq: proto.AcquirePaintData) -> None:
        """Copy a painted buffer into the framebuffer, release it, emit ``"paint"``."""
        if not self._shms:
            # Buffers are already released (page closed or never started):
            # nothing to copy from, so drop the late paint.
            return

        old_width = self._view_width
        old_height = self._view_height
        self._view_width = acq.width
        self._view_height = acq.height

        # TODO(theomonnom): remove hacky alloc-free resizing
        self._framebuffer._width = acq.width
        self._framebuffer._height = acq.height

        proto.copy_paint_data(acq, old_width, old_height, self._shms[acq.buffer_id].buf, self._framebuffer.data)

        # Hand the buffer back once its contents have been copied out.
        release_paint = proto.ReleasePaintData(page_id=acq.page_id, buffer_id=acq.buffer_id)
        await channel.asend_message(self._ctx_duplex, release_paint)

        paint_data = PaintData(
            dirty_rects=acq.dirty_rects,
            frame=self._framebuffer,
            width=acq.width,
            height=acq.height,
        )
        self.emit("paint", paint_data)

    async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
        """Send a mouse-move event for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave),
        )

    async def send_mouse_click(
        self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
    ) -> None:
        """Send a mouse button press (``mouse_up=False``) or release (``mouse_up=True``)."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseClickEvent(
                page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count
            ),
        )

    async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
        """Send a mouse-wheel event for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y),
        )

    async def send_key_event(
        self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
    ) -> None:
        """Send a keyboard event for this page; all fields are forwarded unchanged."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendKeyEvent(
                page_id=self.id,
                type=type,
                modifiers=modifiers,
                windows_key_code=windows_key_code,
                native_key_code=native_key_code,
                character=character,
            ),
        )

    async def navigate(self, url: str) -> None:
        """Send a navigate request for ``url``."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.NavigateRequest(page_id=self.id, url=url),
        )

    async def go_back(self) -> None:
        """Send a go-back request for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoBackRequest(page_id=self.id),
        )

    async def go_forward(self) -> None:
        """Send a go-forward request for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.GoForwardRequest(page_id=self.id),
        )

    async def send_focus_event(self, set_focus: bool = True) -> None:
        """Send a focus (``True``) or blur (``False``) event for this page."""
        await channel.asend_message(
            self._ctx_duplex,
            proto.SendFocusEvent(page_id=self.id, set_focus=set_focus),
        )

    def _handle_audio_data(self, msg: proto.AcquireAudioData) -> None:
        """Wrap an audio message into an ``AudioFrame`` and emit ``"audio"``."""
        # Ignore packets that cannot describe a valid frame.
        if msg.channels <= 0 or msg.sample_rate <= 0 or msg.frames <= 0:
            return

        frame = rtc.AudioFrame(
            data=msg.data,
            sample_rate=msg.sample_rate,
            num_channels=msg.channels,
            samples_per_channel=msg.frames,
        )
        self.emit("audio", AudioData(frame=frame, pts=msg.pts))

    async def _handle_close(self, msg: proto.BrowserClosed) -> None:
        """Resolve the close future when the subprocess reports the browser closed."""
        logger.debug("browser page closed", extra={"page_id": self.id})
        # Guard against a future that was already resolved elsewhere.
        if not self._close_fut.done():
            self._close_fut.set_result(None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

Instance variables

prop id : int
Expand source code
@property
def id(self) -> int:
    """Page id taken from the page options."""
    page_options = self._opts
    return page_options.page_id

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Request the browser to close, wait for confirmation, release shared memory.

    Only the first call does any work; later calls return immediately.
    """
    if self._closed:
        return
    self._closed = True

    try:
        # A closed duplex means the subprocess is already gone
        # (e.g., window closed manually), so there is nothing to wait for.
        with contextlib.suppress(DuplexClosed):
            close_request = proto.CloseBrowserRequest(page_id=self.id)
            await channel.asend_message(self._ctx_duplex, close_request)
            await asyncio.shield(self._close_fut)
    finally:
        self._cleanup_shm()
async def go_back(self) ‑> None
Expand source code
async def go_back(self) -> None:
    """Send a go-back request for this page over the context duplex."""
    request = proto.GoBackRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, request)
async def go_forward(self) ‑> None
Expand source code
async def go_forward(self) -> None:
    """Send a go-forward request for this page over the context duplex."""
    request = proto.GoForwardRequest(page_id=self.id)
    await channel.asend_message(self._ctx_duplex, request)
async def navigate(self, url: str) ‑> None
Expand source code
async def navigate(self, url: str) -> None:
    """Send a navigate request carrying ``url`` for this page."""
    request = proto.NavigateRequest(page_id=self.id, url=url)
    await channel.asend_message(self._ctx_duplex, request)
async def send_focus_event(self, set_focus: bool = True) ‑> None
Expand source code
async def send_focus_event(self, set_focus: bool = True) -> None:
    """Send a focus event for this page; ``set_focus`` is forwarded unchanged."""
    focus_event = proto.SendFocusEvent(page_id=self.id, set_focus=set_focus)
    await channel.asend_message(self._ctx_duplex, focus_event)
async def send_key_event(self,
type: int,
modifiers: int,
windows_key_code: int,
native_key_code: int = 0,
character: int = 0) ‑> None
Expand source code
async def send_key_event(
    self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0
) -> None:
    """Send a keyboard event for this page; every field is forwarded unchanged."""
    key_event = proto.SendKeyEvent(
        page_id=self.id,
        type=type,
        modifiers=modifiers,
        windows_key_code=windows_key_code,
        native_key_code=native_key_code,
        character=character,
    )
    await channel.asend_message(self._ctx_duplex, key_event)
async def send_mouse_click(self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1) ‑> None
Expand source code
async def send_mouse_click(
    self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1
) -> None:
    """Send a mouse button event at ``(x, y)``; ``button`` is sent as ``button_type``."""
    click_event = proto.SendMouseClickEvent(
        page_id=self.id,
        x=x,
        y=y,
        button_type=button,
        mouse_up=mouse_up,
        click_count=click_count,
    )
    await channel.asend_message(self._ctx_duplex, click_event)
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) ‑> None
Expand source code
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None:
    """Send a mouse-move event at ``(x, y)`` for this page."""
    move_event = proto.SendMouseMoveEvent(
        page_id=self.id, x=x, y=y, mouse_leave=mouse_leave
    )
    await channel.asend_message(self._ctx_duplex, move_event)
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) ‑> None
Expand source code
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None:
    """Send a mouse-wheel event at ``(x, y)`` with the given deltas."""
    wheel_event = proto.SendMouseWheelEvent(
        page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y
    )
    await channel.asend_message(self._ctx_duplex, wheel_event)
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Allocate shared-memory buffers, request browser creation, await the reply."""
    import multiprocessing.shared_memory as mp_shm

    # 4 bytes per pixel at the maximum supported size.
    buffer_bytes = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4

    self._shms = []
    segment_names = []
    for _ in range(self._NUM_SHM_BUFFERS):
        segment_name = f"lkcef_browser_{_shortuuid()}"
        # Track each segment as soon as it exists so cleanup can find it.
        self._shms.append(
            mp_shm.SharedMemory(create=True, size=buffer_bytes, name=segment_name)
        )
        segment_names.append(segment_name)

    # Local frame that painted buffers are copied into.
    self._framebuffer = rtc.VideoFrame(
        proto.SHM_MAX_WIDTH,
        proto.SHM_MAX_HEIGHT,
        rtc.VideoBufferType.BGRA,
        bytearray(buffer_bytes),
    )

    page_options = self._opts
    await channel.asend_message(
        self._ctx_duplex,
        proto.CreateBrowserRequest(
            page_id=page_options.page_id,
            width=page_options.width,
            height=page_options.height,
            shm_names=segment_names,
            url=page_options.url,
            framerate=page_options.framerate,
        ),
    )

    # TODO(theomonnom): create timeout (would prevent never resolving futures if the
    #  browser process crashed for some reasons)
    await asyncio.shield(self._created_fut)

Inherited members

class BrowserSession (*,
page: BrowserPage,
room: rtc.Room)
Expand source code
class BrowserSession:
    """Publish a ``BrowserPage`` into a LiveKit room and relay remote input to it.

    The session publishes the page's frames as a screenshare video track and
    its audio as a screenshare-audio track, registers RPC methods for focus
    and navigation, and forwards ``browser-input`` data packets from the
    participant that currently holds focus.
    """

    def __init__(self, *, page: BrowserPage, room: rtc.Room) -> None:
        """Store the page and room; nothing is published until :meth:`start`."""
        self._page = page
        self._room = room
        self._video_source: rtc.VideoSource | None = None
        self._audio_source: rtc.AudioSource | None = None
        self._video_track: rtc.LocalVideoTrack | None = None
        self._audio_track: rtc.LocalAudioTrack | None = None
        self._started = False
        self._last_frame: rtc.VideoFrame | None = None
        self._video_task: asyncio.Task[None] | None = None
        self._audio_init_task: asyncio.Task[None] | None = None
        self._audio_task: asyncio.Task[None] | None = None
        self._audio_queue: asyncio.Queue[rtc.AudioFrame] | None = None
        # Bounded queue of coroutines awaited one at a time by the sender loop.
        self._input_queue: asyncio.Queue[Any] = asyncio.Queue(maxsize=256)
        self._input_task: asyncio.Task[None] | None = None

        # Identity of the participant allowed to send input, if any.
        self._focus_identity: str | None = None

    async def start(self) -> None:
        """Publish the video track and register event, RPC and data handlers.

        Calling it again after the first call is a no-op.
        """
        if self._started:
            return
        self._started = True

        opts = self._page._opts

        self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)

        self._video_track = rtc.LocalVideoTrack.create_video_track(
            "browser-video", self._video_source
        )

        video_opts = rtc.TrackPublishOptions(
            source=rtc.TrackSource.SOURCE_SCREENSHARE,
            video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
            simulcast=False,
        )
        self._page.on("paint", self._on_paint)
        self._page.on("audio", self._on_audio)
        self._page.on("cursor_changed", self._on_cursor)
        self._page.on("url_changed", self._on_url_changed)

        await self._room.local_participant.publish_track(self._video_track, video_opts)

        self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

        # Single persistent task for sending input events to subprocess
        self._input_task = asyncio.create_task(self._input_sender_loop())

        # Register RPC methods for focus management
        @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
        async def _handle_request_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            # Focus is granted only while nobody else holds it.
            if self._focus_identity is None:
                self._focus_identity = data.caller_identity
                await self._page.send_focus_event(True)
                await self._broadcast_focus()
                return json.dumps({"granted": True})
            return json.dumps({"granted": False, "holder": self._focus_identity})

        @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
        async def _handle_release_focus(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            # Only the current holder may release focus.
            if self._focus_identity == data.caller_identity:
                self._focus_identity = None
                await self._page.send_focus_event(False)
                await self._broadcast_focus()
                return json.dumps({"released": True})
            return json.dumps({"released": False})

        @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
        async def _handle_navigate(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            # The request body of an RPC invocation is exposed as ``payload``.
            payload = json.loads(data.payload)
            # Anything other than a JSON object carries no URL.
            url = payload.get("url", "") if isinstance(payload, dict) else ""
            if url:
                self._queue_input(self._page.navigate(url))
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
        async def _handle_go_back(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_back())
            return json.dumps({"status": "ok"})

        @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
        async def _handle_go_forward(
            data: rtc.rpc.RpcInvocationData,
        ) -> str:
            self._queue_input(self._page.go_forward())
            return json.dumps({"status": "ok"})

        # Listen for input data from participants
        @self._room.on("data_received")
        def _on_data_received(packet: rtc.DataPacket) -> None:
            if packet.topic != "browser-input":
                return
            if packet.participant is None:
                return
            if packet.participant.identity != self._focus_identity:
                return
            try:
                events = json.loads(packet.data)
            except (json.JSONDecodeError, UnicodeDecodeError):
                return
            # The payload comes from a remote participant: accept only a list
            # of JSON objects and never let a malformed event raise out of
            # this callback.
            if not isinstance(events, list):
                return
            for evt in events:
                if not isinstance(evt, dict):
                    continue
                try:
                    self._dispatch_input(evt)
                except (KeyError, TypeError):
                    logger.debug("ignoring malformed browser input event", exc_info=True)

        self._on_data_received = _on_data_received

        # Release focus when the holder disconnects
        @self._room.on("participant_disconnected")
        def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
            if participant.identity == self._focus_identity:
                self._focus_identity = None
                self._queue_input(self._page.send_focus_event(False))
                self._queue_input(self._broadcast_focus())

        self._on_participant_disconnected = _on_participant_disconnected

    def _queue_input(self, coro: Any) -> None:
        """Enqueue ``coro`` for the sender loop; drop it if the queue is full."""
        try:
            self._input_queue.put_nowait(coro)
        except asyncio.QueueFull:
            # Close the dropped coroutine so it is not reported as
            # "coroutine ... was never awaited" when garbage collected.
            close = getattr(coro, "close", None)
            if close is not None:
                close()

    async def _input_sender_loop(self) -> None:
        """Await queued coroutines one by one, in the order they were queued."""
        while True:
            coro = await self._input_queue.get()
            try:
                await coro
            except Exception:
                # Best effort: one failed send must not stop the loop.
                logger.debug("browser input send failed", exc_info=True)

    def _dispatch_input(self, evt: dict[str, Any]) -> None:
        """Translate one input event dict into the matching page call.

        Unknown event types are ignored.

        Raises:
            KeyError: If a required key (``x``, ``y`` or ``keyCode``) is missing.
        """
        # The numeric key event types (0, 2, 3) are forwarded as-is; presumably
        # they are CEF key event type values. TODO confirm against the proto.
        t = evt.get("type")
        if t == "mousemove":
            self._queue_input(self._page.send_mouse_move(evt["x"], evt["y"]))
        elif t == "mousedown":
            self._queue_input(
                self._page.send_mouse_click(evt["x"], evt["y"], evt.get("button", 0), False, 1)
            )
        elif t == "mouseup":
            self._queue_input(
                self._page.send_mouse_click(evt["x"], evt["y"], evt.get("button", 0), True, 1)
            )
        elif t == "wheel":
            self._queue_input(
                self._page.send_mouse_wheel(
                    evt["x"], evt["y"], evt.get("deltaX", 0), evt.get("deltaY", 0)
                )
            )
        elif t == "keydown":
            wkc = evt["keyCode"]
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(self._page.send_key_event(0, evt.get("modifiers", 0), wkc, nkc, 0))
        elif t == "keyup":
            wkc = evt["keyCode"]
            # NOTE(review): key-up is sent with native_key_code=0, unlike
            # keydown/char which look it up. Confirm this is intentional.
            self._queue_input(self._page.send_key_event(2, evt.get("modifiers", 0), wkc, 0, 0))
        elif t == "char":
            wkc = evt["keyCode"]
            nkc = _NATIVE_KEY_CODES.get(wkc, 0)
            self._queue_input(
                self._page.send_key_event(
                    3, evt.get("modifiers", 0), wkc, nkc, evt.get("charCode", 0)
                )
            )

    async def _broadcast_focus(self) -> None:
        """Publish the current focus holder on the ``browser-focus`` topic."""
        payload = json.dumps({"identity": self._focus_identity}).encode()
        await self._room.local_participant.publish_data(
            payload, reliable=True, topic="browser-focus"
        )

    async def _init_audio(self, frame: rtc.AudioFrame) -> None:
        """Create and publish the audio track using the format of ``frame``.

        On failure the audio state is reset so the next audio packet retries.
        """
        try:
            self._audio_source = rtc.AudioSource(
                frame.sample_rate, frame.num_channels, queue_size_ms=100
            )
            self._audio_track = rtc.LocalAudioTrack.create_audio_track(
                "browser-audio", self._audio_source
            )
            audio_opts = rtc.TrackPublishOptions(
                source=rtc.TrackSource.SOURCE_SCREENSHARE_AUDIO,
            )
            await self._room.local_participant.publish_track(self._audio_track, audio_opts)
            self._audio_queue = asyncio.Queue(maxsize=50)
            # The frame that triggered initialization is played first.
            self._audio_queue.put_nowait(frame)
            self._audio_task = asyncio.create_task(self._audio_loop())
        except Exception:
            logger.exception("failed to initialize audio, will retry on next packet")
            self._audio_source = None
            self._audio_track = None
            self._audio_init_task = None

    async def _audio_loop(self) -> None:
        """Feed queued audio frames into the audio source, logging capture errors."""
        assert self._audio_queue is not None
        assert self._audio_source is not None
        while True:
            frame = await self._audio_queue.get()
            try:
                await self._audio_source.capture_frame(frame)
            except Exception:
                logger.exception("audio capture error")

    def _on_audio(self, data: AudioData) -> None:
        """Start audio initialization on the first frame, queue later frames."""
        if self._audio_source is None:
            if self._audio_init_task is not None:
                return
            self._audio_init_task = asyncio.get_event_loop().create_task(
                self._init_audio(data.frame)
            )
            return

        # Source exists but the track is still being published: drop the frame.
        if self._audio_queue is None:
            return

        try:
            self._audio_queue.put_nowait(data.frame)
        except asyncio.QueueFull:
            pass

    _CEF_CURSOR_MAP: dict[int, str] = {
        0: "default",  # CT_POINTER
        1: "crosshair",  # CT_CROSS
        2: "pointer",  # CT_HAND
        3: "text",  # CT_IBEAM
        4: "wait",  # CT_WAIT
        5: "help",  # CT_HELP
        6: "ew-resize",  # CT_EASTRESIZE
        7: "ns-resize",  # CT_NORTHRESIZE
        8: "nesw-resize",  # CT_NORTHEASTRESIZE
        9: "nwse-resize",  # CT_NORTHWESTRESIZE
        10: "ns-resize",  # CT_SOUTHRESIZE
        11: "nwse-resize",  # CT_SOUTHEASTRESIZE
        12: "nesw-resize",  # CT_SOUTHWESTRESIZE
        13: "ew-resize",  # CT_WESTRESIZE
        14: "ns-resize",  # CT_NORTHSOUTHRESIZE
        15: "ew-resize",  # CT_EASTWESTRESIZE
        16: "nesw-resize",  # CT_NORTHEASTSOUTHWESTRESIZE
        17: "nwse-resize",  # CT_NORTHWESTSOUTHEASTRESIZE
        18: "ew-resize",  # CT_COLUMNRESIZE
        19: "ns-resize",  # CT_ROWRESIZE
        20: "move",  # CT_MIDDLEPANNING
        28: "not-allowed",  # CT_NOTALLOWED
        29: "grab",  # CT_GRAB
        30: "grabbing",  # CT_GRABBING
        32: "move",  # CT_MOVE
    }

    def _on_url_changed(self, url: str) -> None:
        """Queue a ``browser-url`` data message announcing the new URL."""
        payload = json.dumps({"url": url}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(payload, reliable=True, topic="browser-url")
        )

    def _on_cursor(self, cursor_type: int) -> None:
        """Queue a data message with the CSS cursor name for ``cursor_type``.

        Cursor types missing from the map are reported as ``"default"``.
        """
        css_cursor = self._CEF_CURSOR_MAP.get(cursor_type, "default")
        payload = json.dumps({"cursor": css_cursor}).encode()
        self._queue_input(
            self._room.local_participant.publish_data(
                payload, reliable=True, topic="cursor_changed"
            )
        )

    def _on_paint(self, data: PaintData) -> None:
        """Remember the latest frame; the video loop captures it on its own clock."""
        self._last_frame = data.frame

    async def _video_loop(self, fps: float) -> None:
        """Capture the latest frame into the video source ``fps`` times per second."""
        interval = 1.0 / fps
        loop = asyncio.get_running_loop()
        next_time = loop.time()
        while True:
            if self._last_frame is not None and self._video_source is not None:
                ts_us = time.monotonic_ns() // 1000
                self._video_source.capture_frame(self._last_frame, timestamp_us=ts_us)
            next_time += interval
            delay = next_time - loop.time()
            if delay > 0:
                await asyncio.sleep(delay)
            else:
                # Running behind schedule: resynchronize instead of bursting.
                next_time = loop.time()

    async def aclose(self) -> None:
        """Detach handlers, stop background tasks and unpublish the tracks.

        Does nothing if the session was never started.
        """
        if not self._started:
            return

        self._page.off("paint", self._on_paint)
        self._page.off("audio", self._on_audio)
        self._page.off("cursor_changed", self._on_cursor)
        self._page.off("url_changed", self._on_url_changed)

        if hasattr(self, "_on_data_received"):
            self._room.off("data_received", self._on_data_received)
        if hasattr(self, "_on_participant_disconnected"):
            self._room.off("participant_disconnected", self._on_participant_disconnected)

        # The audio-init task goes first so it cannot publish a track or
        # spawn the audio loop while the session is shutting down.
        for attr in ("_audio_init_task", "_video_task", "_audio_task", "_input_task"):
            task = getattr(self, attr)
            if task:
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task

        # Coroutines still queued will never be awaited now; close them so
        # they do not produce "never awaited" warnings.
        while not self._input_queue.empty():
            leftover = self._input_queue.get_nowait()
            close = getattr(leftover, "close", None)
            if close is not None:
                close()

        # Unpublish each track independently: tracks may already be gone if
        # the room disconnected first, and one failure must not skip the other.
        for track in (self._video_track, self._audio_track):
            if track:
                with contextlib.suppress(Exception):
                    await self._room.local_participant.unpublish_track(track.sid)

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Detach handlers, stop background tasks and unpublish the tracks.

    Returns immediately when ``start()`` has not been called. Otherwise it
    removes the page and room event handlers, cancels the video, audio and
    input tasks (waiting for each to finish), and finally unpublishes the
    video and audio tracks on a best-effort basis.
    """
    if not self._started:
        return

    self._page.off("paint", self._on_paint)
    self._page.off("audio", self._on_audio)
    self._page.off("cursor_changed", self._on_cursor)
    self._page.off("url_changed", self._on_url_changed)

    # The room handlers are only stored on the instance at the end of
    # start(), so they may be missing if start() did not run to completion.
    if hasattr(self, "_on_data_received"):
        self._room.off("data_received", self._on_data_received)
    if hasattr(self, "_on_participant_disconnected"):
        self._room.off("participant_disconnected", self._on_participant_disconnected)

    for task in (self._video_task, self._audio_task, self._input_task):
        if task:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task

    # Unpublish each track on its own: the tracks may already be gone if
    # the room disconnected first, and a failure on the video track must
    # not prevent the audio track from being unpublished.
    with contextlib.suppress(Exception):
        if self._video_track:
            await self._room.local_participant.unpublish_track(self._video_track.sid)
    with contextlib.suppress(Exception):
        if self._audio_track:
            await self._room.local_participant.unpublish_track(self._audio_track.sid)
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Publish the browser video track and wire up remote input handling.

    Returns immediately if the session was already started. Otherwise it:

    * creates a screencast video source/track sized from the page options
      and publishes it as a screenshare,
    * subscribes to the page's paint, audio, cursor and URL events,
    * starts the frame-pacing task and the input sender task,
    * registers the ``browser/*`` RPC methods (focus, navigation), and
    * listens for ``browser-input`` data packets and participant
      disconnects on the room.
    """
    if self._started:
        return
    self._started = True

    opts = self._page._opts

    self._video_source = rtc.VideoSource(opts.width, opts.height, is_screencast=True)

    self._video_track = rtc.LocalVideoTrack.create_video_track(
        "browser-video", self._video_source
    )

    video_opts = rtc.TrackPublishOptions(
        source=rtc.TrackSource.SOURCE_SCREENSHARE,
        video_encoding=rtc.VideoEncoding(max_bitrate=8_000_000, max_framerate=opts.framerate),
        simulcast=False,
    )
    self._page.on("paint", self._on_paint)
    self._page.on("audio", self._on_audio)
    self._page.on("cursor_changed", self._on_cursor)
    self._page.on("url_changed", self._on_url_changed)

    await self._room.local_participant.publish_track(self._video_track, video_opts)

    self._video_task = asyncio.create_task(self._video_loop(opts.framerate))

    # Single persistent task for sending input events to subprocess
    self._input_task = asyncio.create_task(self._input_sender_loop())

    # Register RPC methods for focus management
    @self._room.local_participant.register_rpc_method("browser/request-focus")  # type: ignore[arg-type]
    async def _handle_request_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        # Focus is exclusive: it is only granted while nobody holds it.
        if self._focus_identity is None:
            self._focus_identity = data.caller_identity
            await self._page.send_focus_event(True)
            await self._broadcast_focus()
            return json.dumps({"granted": True})
        return json.dumps({"granted": False, "holder": self._focus_identity})

    @self._room.local_participant.register_rpc_method("browser/release-focus")  # type: ignore[arg-type]
    async def _handle_release_focus(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        # Only the current holder may release focus.
        if self._focus_identity == data.caller_identity:
            self._focus_identity = None
            await self._page.send_focus_event(False)
            await self._broadcast_focus()
            return json.dumps({"released": True})
        return json.dumps({"released": False})

    @self._room.local_participant.register_rpc_method("browser/navigate")  # type: ignore[arg-type]
    async def _handle_navigate(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        # RpcInvocationData carries the request body in ``payload``.
        payload = json.loads(data.payload)
        url = payload.get("url", "")
        if url:
            self._queue_input(self._page.navigate(url))
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-back")  # type: ignore[arg-type]
    async def _handle_go_back(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_back())
        return json.dumps({"status": "ok"})

    @self._room.local_participant.register_rpc_method("browser/go-forward")  # type: ignore[arg-type]
    async def _handle_go_forward(
        data: rtc.rpc.RpcInvocationData,
    ) -> str:
        self._queue_input(self._page.go_forward())
        return json.dumps({"status": "ok"})

    # Listen for input data from participants
    @self._room.on("data_received")
    def _on_data_received(packet: rtc.DataPacket) -> None:
        if packet.topic != "browser-input":
            return
        if packet.participant is None:
            return
        # Input is accepted only from the participant that holds focus.
        if packet.participant.identity != self._focus_identity:
            return
        try:
            events = json.loads(packet.data)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return
        # The packet comes from a remote participant: ignore anything that
        # is valid JSON but not a list of events, instead of iterating a
        # dict's keys or raising inside the event handler.
        if not isinstance(events, list):
            return
        for evt in events:
            self._dispatch_input(evt)

    # Kept on the instance so aclose() can unregister the handler.
    self._on_data_received = _on_data_received

    # Release focus when the holder disconnects
    @self._room.on("participant_disconnected")
    def _on_participant_disconnected(participant: rtc.RemoteParticipant) -> None:
        if participant.identity == self._focus_identity:
            self._focus_identity = None
            self._queue_input(self._page.send_focus_event(False))
            self._queue_input(self._broadcast_focus())

    # Kept on the instance so aclose() can unregister the handler.
    self._on_participant_disconnected = _on_participant_disconnected
class PaintData (dirty_rects: list[tuple[int, int, int, int]],
frame: rtc.VideoFrame,
width: int,
height: int)
Expand source code
@dataclass
class PaintData:
    # Plain record bundling a video frame with its dimensions and a list of
    # rectangles. NOTE(review): presumably the payload of the page's "paint"
    # event — confirm against the emitter.

    # One 4-int tuple per rectangle.
    # NOTE(review): presumably (x, y, width, height) of each changed region —
    # confirm the component order against the producer.
    dirty_rects: list[tuple[int, int, int, int]]
    # The frame itself.
    frame: rtc.VideoFrame
    # NOTE(review): presumably the frame dimensions in pixels — confirm.
    width: int
    height: int

PaintData(dirty_rects: 'list[tuple[int, int, int, int]]', frame: 'rtc.VideoFrame', width: 'int', height: 'int')

Instance variables

var dirty_rects : list[tuple[int, int, int, int]]
var frame : VideoFrame
var height : int
var width : int