Expand source code
class BrowserServer:
def __init__(
self,
duplex: Duplex,
shms: list[mp_shm.SharedMemory],
page_id: int,
send_lock: threading.Lock,
):
self._duplex = duplex
self._shms = shms
self._page_id = page_id
self._send_lock = send_lock
self._view_width = 0
self._view_height = 0
self._closing = False
self._shm_available = []
for _ in shms:
ev = threading.Event()
ev.set()
self._shm_available.append(ev)
@staticmethod
def create(
*,
duplex: Duplex,
create_req: proto.CreateBrowserRequest,
browser_app,
send_lock: threading.Lock,
) -> "BrowserServer":
logger.debug(
"creating browser",
extra={
"page_id": create_req.page_id,
"url": create_req.url,
"framerate": create_req.framerate,
"width": create_req.width,
"height": create_req.height,
"shm_names": create_req.shm_names,
},
)
import lkcef_python as lkcef
opts = lkcef.BrowserOptions()
opts.framerate = create_req.framerate
opts.width = create_req.width
opts.height = create_req.height
shms = [mp_shm.SharedMemory(name=name) for name in create_req.shm_names]
bserver = BrowserServer(duplex, shms, create_req.page_id, send_lock)
opts.created_callback = bserver._browser_created
opts.paint_callback = bserver._paint
opts.close_callback = bserver._closed
opts.audio_stream_started_callback = bserver._audio_stream_started
opts.audio_stream_packet_callback = bserver._audio_packet
opts.audio_stream_stopped_callback = bserver._audio_stream_stopped
opts.cursor_change_callback = bserver._cursor_changed
opts.url_changed_callback = bserver._url_changed
browser_app.create_browser(create_req.url, opts)
return bserver
def _browser_created(self, impl):
browser_id = impl.identifier()
logger.debug(
"browser created",
extra={"browser_id": browser_id, "page_id": self._page_id},
)
self._impl = impl
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.CreateBrowserResponse(page_id=self._page_id, browser_id=browser_id),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send CreateBrowserResponse")
def _paint(self, frame_data):
if self._closing:
return # make sure to not use the shm
# Find a free SHM buffer
buffer_id = -1
for i, ev in enumerate(self._shm_available):
if ev.is_set():
buffer_id = i
break
if buffer_id == -1:
logger.warning("paint frame dropped — all SHM buffers busy")
return
self._shm_available[buffer_id].clear()
acq = proto.AcquirePaintData()
acq.page_id = self._page_id
acq.buffer_id = buffer_id
acq.width = frame_data.width
acq.height = frame_data.height
dirty_rects = []
for rect in frame_data.dirty_rects:
dirty_rects.append((rect.x, rect.y, rect.width, rect.height))
acq.dirty_rects = dirty_rects
old_width = self._view_width
old_height = self._view_height
self._view_width = frame_data.width
self._view_height = frame_data.height
proto.copy_paint_data(acq, old_width, old_height, frame_data.buffer, self._shms[buffer_id].buf)
try:
with self._send_lock:
channel.send_message(self._duplex, acq)
except DuplexClosed:
self._shm_available[buffer_id].set()
self._closing = True
logger.warning("duplex closed, stopping paint")
def _audio_stream_started(self, sample_rate: int, channels: int, frames: int):
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.AudioStreamStarted(
page_id=self._page_id,
sample_rate=sample_rate,
channels=channels,
frames_per_buffer=frames,
),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send AudioStreamStarted")
def _audio_packet(
self,
data_bytes: bytes,
frames: int,
channels: int,
sample_rate: int,
pts: int,
):
if self._closing:
return
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.AcquireAudioData(
page_id=self._page_id,
sample_rate=sample_rate,
channels=channels,
frames=frames,
pts=pts,
data=data_bytes,
),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, stopping audio")
def _audio_stream_stopped(self):
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.AudioStreamStopped(page_id=self._page_id),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send AudioStreamStopped")
def _cursor_changed(self, cursor_type: int):
if self._closing:
return
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.CursorChanged(
page_id=self._page_id, cursor_type=cursor_type
),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send CursorChanged")
def _url_changed(self, url: str):
if self._closing:
return
try:
with self._send_lock:
channel.send_message(
self._duplex,
proto.UrlChanged(
page_id=self._page_id, url=url
),
)
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send UrlChanged")
def _closed(self) -> None:
try:
with self._send_lock:
channel.send_message(self._duplex, proto.BrowserClosed(page_id=self._page_id))
except DuplexClosed:
self._closing = True
logger.warning("duplex closed, failed to send BrowserClosed")
def handle_release_paint(self, msg: proto.ReleasePaintData):
self._shm_available[msg.buffer_id].set()
def handle_close(self, msg: proto.CloseBrowserRequest):
self._closing = True
self._impl.close()