Module livekit.browser
CEF (Chromium Embedded Framework) bindings for LiveKit.
Install the engine with download() and then create a
BrowserContext to render web pages off-screen.
Sub-modules
livekit.browser.loglivekit.browser.proclivekit.browser.proc_mainlivekit.browser.protolivekit.browser.version
Functions
def download(*, version: str | None = None, dest: Path | None = None) ‑> pathlib.Path-
Expand source code
def download(*, version: str | None = None, dest: Path | None = None) -> Path: """Download CEF binaries for the current platform from GitHub Releases. Args: version: Version to download. Defaults to this package's version. dest: Destination directory. Defaults to ~/.cache/livekit/browser/<version>/<platform>/. Returns: Path to the directory containing the downloaded binaries. """ if version is None: version = __version__ plat = _detect_platform() if dest is None: dest = _default_cache_dir() / version / plat marker = dest / ".downloaded" if marker.exists(): logger.debug("CEF binaries already downloaded at %s", dest) return dest url = _GITHUB_RELEASE_URL.format(version=version, platform=plat) logger.info("Downloading CEF binaries from %s", url) dest.mkdir(parents=True, exist_ok=True) tarball = dest / "lkcef.tar.gz" try: urllib.request.urlretrieve(url, tarball) with tarfile.open(tarball, "r:gz") as tf: tf.extractall(path=dest) marker.touch() logger.info("CEF binaries downloaded to %s", dest) finally: tarball.unlink(missing_ok=True) return destDownload CEF binaries for the current platform from GitHub Releases.
Args
version- Version to download. Defaults to this package's version.
dest- Destination directory. Defaults to ~/.cache/livekit/browser/
/ /.
Returns
Path to the directory containing the downloaded binaries.
def get_resources_dir(*, version: str | None = None) ‑> pathlib.Path-
Expand source code
def get_resources_dir(*, version: str | None = None) -> Path: """Return path to downloaded CEF resources. Raises: FileNotFoundError: If CEF binaries have not been downloaded yet. """ if version is None: version = __version__ plat = _detect_platform() dest = _default_cache_dir() / version / plat marker = dest / ".downloaded" if not marker.exists(): raise FileNotFoundError( f"CEF binaries not found at {dest}. " f"Run `livekit.browser.download()` or `python -m livekit.browser.download` first." ) return destReturn path to downloaded CEF resources.
Raises
FileNotFoundError- If CEF binaries have not been downloaded yet.
Classes
class AudioData (frame: rtc.AudioFrame, pts: int)-
Expand source code
@dataclass class AudioData: frame: rtc.AudioFrame pts: intAudioData(frame: 'rtc.AudioFrame', pts: 'int')
Instance variables
var frame : AudioFramevar pts : int
class BrowserContext (*, dev_mode: bool, remote_debugging_port: int = 0)-
Expand source code
class BrowserContext: def __init__(self, *, dev_mode: bool, remote_debugging_port: int = 0) -> None: self._mp_ctx = mp.get_context("spawn") self._pages: dict[int, BrowserPage] = {} self._dev_mode = dev_mode self._initialized = False self._next_page_id = 1 self._remote_debugging_port = remote_debugging_port self._closed_event = asyncio.Event() self._proc = None self._duplex = None self._main_atask = None self._root_cache_path = None async def initialize(self) -> None: from .download import get_resources_dir resources_dir = str(get_resources_dir()) mp_pch, mp_cch = socket.socketpair() self._duplex = await AsyncDuplex.open(mp_pch) self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir)) # On Linux aarch64, libcef.so is too large for late TLS allocation. # Set LD_PRELOAD so the dynamic linker reserves TLS early. _preload_restore = None if sys.platform.startswith("linux"): libcef = os.path.join(resources_dir, "libcef.so") if os.path.exists(libcef): _preload_restore = os.environ.get("LD_PRELOAD") old = _preload_restore or "" os.environ["LD_PRELOAD"] = libcef + (":" + old if old else "") self._proc.start() if _preload_restore is not None: os.environ["LD_PRELOAD"] = _preload_restore elif "LD_PRELOAD" in os.environ and sys.platform.startswith("linux"): del os.environ["LD_PRELOAD"] mp_cch.close() try: await self._initialize_context(resources_dir) except BaseException: await self._cleanup_process() raise async def _initialize_context(self, resources_dir: str) -> None: if not self._remote_debugging_port: with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._remote_debugging_port = s.getsockname()[1] logger.debug("using remote debugging port %d", self._remote_debugging_port) self._root_cache_path = tempfile.mkdtemp(prefix="lkcef_cache_") await channel.asend_message( self._duplex, proto.InitializeContextRequest( dev_mode=self._dev_mode, remote_debugging_port=self._remote_debugging_port, root_cache_path=self._root_cache_path, ), ) resp = await channel.arecv_message(self._duplex, proto.IPC_MESSAGES) assert isinstance(resp, proto.ContextInitializedResponse) self._initialized = True logger.debug("browser context initialized", extra={"pid": self._proc.pid}) self._main_atask = asyncio.create_task(self._main_task(self._duplex)) async def _cleanup_process(self) -> None: if self._duplex: try: await self._duplex.aclose() except DuplexClosed: pass # Subprocess has its own 5s watchdog — just wait for it if self._proc and self._proc.is_alive(): self._proc.join(timeout=15) if self._proc.is_alive(): logger.warning("subprocess did not exit, killing") self._proc.kill() self._proc.join() if self._proc: self._proc.close() if self._root_cache_path: shutil.rmtree(self._root_cache_path, ignore_errors=True) @asynccontextmanager async def playwright(self, timeout: float | None = None): if not self._initialized: raise RuntimeError("BrowserContext not initialized") from playwright.async_api import async_playwright async with async_playwright() as p: url = f"http://localhost:{self._remote_debugging_port}" browser = await p.chromium.connect_over_cdp(url, timeout=timeout) try: yield browser finally: await browser.close() @_log_exceptions(_logger=logger) async def _main_task(self, duplex: AsyncDuplex) -> None: while True: try: msg = await channel.arecv_message(duplex, proto.IPC_MESSAGES) except DuplexClosed: break try: if isinstance(msg, proto.CreateBrowserResponse): page = self._pages[msg.page_id] await page._handle_created(msg) elif isinstance(msg, proto.AcquireAudioData): page = self._pages[msg.page_id] page._handle_audio_data(msg) elif isinstance(msg, proto.AcquirePaintData): page = self._pages[msg.page_id] await page._handle_paint(msg) elif isinstance(msg, proto.AudioStreamStarted): pass elif isinstance(msg, proto.AudioStreamStopped): pass elif isinstance(msg, proto.CursorChanged): page = self._pages[msg.page_id] page.emit("cursor_changed", msg.cursor_type) elif isinstance(msg, proto.UrlChanged): page = self._pages[msg.page_id] page.emit("url_changed", msg.url) elif isinstance(msg, proto.BrowserClosed): page = self._pages[msg.page_id] await page._handle_close(msg) except Exception: logger.exception("error handling IPC message %s", type(msg).__name__) # Subprocess exited — resolve any pending page futures so cleanup doesn't hang for page in self._pages.values(): if not page._close_fut.done(): page._close_fut.set_result(None) if not page._created_fut.done(): page._created_fut.set_result(None) self._closed_event.set() async def new_page( self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24 ) -> BrowserPage: if not self._initialized: raise RuntimeError("BrowserContext not initialized") page_id = self._next_page_id self._next_page_id += 1 page = BrowserPage( self._mp_ctx, _PageOptions( page_id=page_id, url=url, width=width, height=height, framerate=framerate, ), self._duplex, ) self._pages[page_id] = page await page.start() return page async def aclose(self) -> None: if not self._initialized: return self._initialized = False # Close all pages (ensure shm cleanup even on errors) for page in list(self._pages.values()): try: await page.aclose() except Exception: page._cleanup_shm() # Cancel the main task if self._main_atask: self._main_atask.cancel() with contextlib.suppress(asyncio.CancelledError): await self._main_atask await self._cleanup_process()Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if not self._initialized: return self._initialized = False # Close all pages (ensure shm cleanup even on errors) for page in list(self._pages.values()): try: await page.aclose() except Exception: page._cleanup_shm() # Cancel the main task if self._main_atask: self._main_atask.cancel() with contextlib.suppress(asyncio.CancelledError): await self._main_atask await self._cleanup_process() async def initialize(self) ‑> None-
Expand source code
async def initialize(self) -> None: from .download import get_resources_dir resources_dir = str(get_resources_dir()) mp_pch, mp_cch = socket.socketpair() self._duplex = await AsyncDuplex.open(mp_pch) self._proc = self._mp_ctx.Process(target=proc_main.main, args=(mp_cch, resources_dir)) # On Linux aarch64, libcef.so is too large for late TLS allocation. # Set LD_PRELOAD so the dynamic linker reserves TLS early. _preload_restore = None if sys.platform.startswith("linux"): libcef = os.path.join(resources_dir, "libcef.so") if os.path.exists(libcef): _preload_restore = os.environ.get("LD_PRELOAD") old = _preload_restore or "" os.environ["LD_PRELOAD"] = libcef + (":" + old if old else "") self._proc.start() if _preload_restore is not None: os.environ["LD_PRELOAD"] = _preload_restore elif "LD_PRELOAD" in os.environ and sys.platform.startswith("linux"): del os.environ["LD_PRELOAD"] mp_cch.close() try: await self._initialize_context(resources_dir) except BaseException: await self._cleanup_process() raise async def new_page(self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24) ‑> BrowserPage-
Expand source code
async def new_page( self, *, url: str, width: int = 1920, height: int = 1080, framerate: int = 24 ) -> BrowserPage: if not self._initialized: raise RuntimeError("BrowserContext not initialized") page_id = self._next_page_id self._next_page_id += 1 page = BrowserPage( self._mp_ctx, _PageOptions( page_id=page_id, url=url, width=width, height=height, framerate=framerate, ), self._duplex, ) self._pages[page_id] = page await page.start() return page async def playwright(self, timeout: float | None = None)-
Expand source code
@asynccontextmanager async def playwright(self, timeout: float | None = None): if not self._initialized: raise RuntimeError("BrowserContext not initialized") from playwright.async_api import async_playwright async with async_playwright() as p: url = f"http://localhost:{self._remote_debugging_port}" browser = await p.chromium.connect_over_cdp(url, timeout=timeout) try: yield browser finally: await browser.close()
class BrowserPage (mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex)-
Expand source code
class BrowserPage(EventEmitter[EventTypes]): def __init__( self, mp_ctx: mpc.SpawnContext, opts: _PageOptions, ctx_duplex: AsyncDuplex, ) -> None: super().__init__() self._mp_ctx = mp_ctx self._opts = opts self._ctx_duplex = ctx_duplex self._view_width = 0 self._view_height = 0 self._created_fut = asyncio.Future() self._close_fut = asyncio.Future() self._closed = False @property def id(self) -> int: return self._opts.page_id _NUM_SHM_BUFFERS = 2 async def start(self) -> None: import multiprocessing.shared_memory as mp_shm shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4 self._shms = [] shm_names = [] for _ in range(self._NUM_SHM_BUFFERS): name = f"lkcef_browser_{_shortuuid()}" shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name) self._shms.append(shm) shm_names.append(name) self._framebuffer = rtc.VideoFrame( proto.SHM_MAX_WIDTH, proto.SHM_MAX_HEIGHT, rtc.VideoBufferType.BGRA, bytearray(proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4), ) req = proto.CreateBrowserRequest( page_id=self._opts.page_id, width=self._opts.width, height=self._opts.height, shm_names=shm_names, url=self._opts.url, framerate=self._opts.framerate, ) await channel.asend_message(self._ctx_duplex, req) # TODO(theomonnom): create timeout (would prevent never resolving futures if the # browser process crashed for some reasons) await asyncio.shield(self._created_fut) async def aclose(self) -> None: if self._closed: return self._closed = True try: await channel.asend_message( self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id) ) await asyncio.shield(self._close_fut) except DuplexClosed: pass # Subprocess already closed (e.g., window closed manually) finally: self._cleanup_shm() def _cleanup_shm(self) -> None: for shm in self._shms: try: shm.close() except Exception: pass try: shm.unlink() except Exception: pass self._shms.clear() async def _handle_created(self, msg: proto.CreateBrowserResponse) -> None: self._created_fut.set_result(None) async def _handle_paint(self, acq: proto.AcquirePaintData) -> None: old_width = self._view_width old_height = self._view_height self._view_width = acq.width self._view_height = acq.height # TODO(theomonnom): remove hacky alloc-free resizing self._framebuffer._width = acq.width self._framebuffer._height = acq.height proto.copy_paint_data(acq, old_width, old_height, self._shms[acq.buffer_id].buf, self._framebuffer.data) release_paint = proto.ReleasePaintData(page_id=acq.page_id, buffer_id=acq.buffer_id) await channel.asend_message(self._ctx_duplex, release_paint) paint_data = PaintData( dirty_rects=acq.dirty_rects, frame=self._framebuffer, width=acq.width, height=acq.height, ) self.emit("paint", paint_data) async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave), ) async def send_mouse_click( self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseClickEvent( page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count ), ) async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y), ) async def send_key_event( self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendKeyEvent( page_id=self.id, type=type, modifiers=modifiers, windows_key_code=windows_key_code, native_key_code=native_key_code, character=character, ), ) async def navigate(self, url: str) -> None: await channel.asend_message( self._ctx_duplex, proto.NavigateRequest(page_id=self.id, url=url), ) async def go_back(self) -> None: await channel.asend_message( self._ctx_duplex, proto.GoBackRequest(page_id=self.id), ) async def go_forward(self) -> None: await channel.asend_message( self._ctx_duplex, proto.GoForwardRequest(page_id=self.id), ) async def send_focus_event(self, set_focus: bool = True) -> None: await channel.asend_message( self._ctx_duplex, proto.SendFocusEvent(page_id=self.id, set_focus=set_focus), ) def _handle_audio_data(self, msg: proto.AcquireAudioData) -> None: if msg.channels <= 0 or msg.sample_rate <= 0 or msg.frames <= 0: return frame = rtc.AudioFrame( data=msg.data, sample_rate=msg.sample_rate, num_channels=msg.channels, samples_per_channel=msg.frames, ) self.emit("audio", AudioData(frame=frame, pts=msg.pts)) async def _handle_close(self, msg: proto.BrowserClosed) -> None: logger.debug("browser page closed", extra={"page_id": self.id}) self._close_fut.set_result(None)Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return defaultInitialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop id : int-
Expand source code
@property def id(self) -> int: return self._opts.page_id
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: if self._closed: return self._closed = True try: await channel.asend_message( self._ctx_duplex, proto.CloseBrowserRequest(page_id=self.id) ) await asyncio.shield(self._close_fut) except DuplexClosed: pass # Subprocess already closed (e.g., window closed manually) finally: self._cleanup_shm() async def go_back(self) ‑> None-
Expand source code
async def go_back(self) -> None: await channel.asend_message( self._ctx_duplex, proto.GoBackRequest(page_id=self.id), ) async def go_forward(self) ‑> None-
Expand source code
async def go_forward(self) -> None: await channel.asend_message( self._ctx_duplex, proto.GoForwardRequest(page_id=self.id), ) -
Expand source code
async def navigate(self, url: str) -> None: await channel.asend_message( self._ctx_duplex, proto.NavigateRequest(page_id=self.id, url=url), ) async def send_focus_event(self, set_focus: bool = True) ‑> None-
Expand source code
async def send_focus_event(self, set_focus: bool = True) -> None: await channel.asend_message( self._ctx_duplex, proto.SendFocusEvent(page_id=self.id, set_focus=set_focus), ) async def send_key_event(self,
type: int,
modifiers: int,
windows_key_code: int,
native_key_code: int = 0,
character: int = 0) ‑> None-
Expand source code
async def send_key_event( self, type: int, modifiers: int, windows_key_code: int, native_key_code: int = 0, character: int = 0 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendKeyEvent( page_id=self.id, type=type, modifiers=modifiers, windows_key_code=windows_key_code, native_key_code=native_key_code, character=character, ), ) async def send_mouse_click(self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1) ‑> None-
Expand source code
async def send_mouse_click( self, x: int, y: int, button: int = 0, mouse_up: bool = False, click_count: int = 1 ) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseClickEvent( page_id=self.id, x=x, y=y, button_type=button, mouse_up=mouse_up, click_count=click_count ), ) async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) ‑> None-
Expand source code
async def send_mouse_move(self, x: int, y: int, mouse_leave: bool = False) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseMoveEvent(page_id=self.id, x=x, y=y, mouse_leave=mouse_leave), ) async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) ‑> None-
Expand source code
async def send_mouse_wheel(self, x: int, y: int, delta_x: int = 0, delta_y: int = 0) -> None: await channel.asend_message( self._ctx_duplex, proto.SendMouseWheelEvent(page_id=self.id, x=x, y=y, delta_x=delta_x, delta_y=delta_y), ) async def start(self) ‑> None-
Expand source code
async def start(self) -> None: import multiprocessing.shared_memory as mp_shm shm_size = proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4 self._shms = [] shm_names = [] for _ in range(self._NUM_SHM_BUFFERS): name = f"lkcef_browser_{_shortuuid()}" shm = mp_shm.SharedMemory(create=True, size=shm_size, name=name) self._shms.append(shm) shm_names.append(name) self._framebuffer = rtc.VideoFrame( proto.SHM_MAX_WIDTH, proto.SHM_MAX_HEIGHT, rtc.VideoBufferType.BGRA, bytearray(proto.SHM_MAX_WIDTH * proto.SHM_MAX_HEIGHT * 4), ) req = proto.CreateBrowserRequest( page_id=self._opts.page_id, width=self._opts.width, height=self._opts.height, shm_names=shm_names, url=self._opts.url, framerate=self._opts.framerate, ) await channel.asend_message(self._ctx_duplex, req) # TODO(theomonnom): create timeout (would prevent never resolving futures if the # browser process crashed for some reasons) await asyncio.shield(self._created_fut)
Inherited members
class PaintData (dirty_rects: list[tuple[int, int, int, int]],
frame: rtc.VideoFrame,
width: int,
height: int)-
Expand source code
@dataclass class PaintData: dirty_rects: list[tuple[int, int, int, int]] frame: rtc.VideoFrame width: int height: intPaintData(dirty_rects: 'list[tuple[int, int, int, int]]', frame: 'rtc.VideoFrame', width: 'int', height: 'int')
Instance variables
var dirty_rects : list[tuple[int, int, int, int]]var frame : VideoFramevar height : intvar width : int