Module livekit.agents.cli.watcher

Classes

class WatchClient (worker: Worker, cli_args: proto.CliArgs, loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class WatchClient:
    def __init__(
        self,
        worker: Worker,
        cli_args: proto.CliArgs,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._worker = worker
        self._cli_args = cli_args

    def start(self) -> None:
        self._main_task = self._loop.create_task(self._run())

    @utils.log_exceptions(logger=logger)
    async def _run(self) -> None:
        assert self._cli_args.mp_cch
        try:
            self._cch = await utils.aio.duplex_unix._AsyncDuplex.open(
                self._cli_args.mp_cch
            )

            await channel.asend_message(self._cch, proto.ReloadJobsRequest())

            while True:
                try:
                    msg = await channel.arecv_message(self._cch, proto.IPC_MESSAGES)
                except utils.aio.duplex_unix.DuplexClosed:
                    break

                if isinstance(msg, proto.ActiveJobsRequest):
                    jobs = self._worker.active_jobs
                    await channel.asend_message(
                        self._cch,
                        proto.ActiveJobsResponse(
                            jobs=jobs, reload_count=self._cli_args.reload_count
                        ),
                    )
                elif isinstance(msg, proto.ReloadJobsResponse):
                    # TODO(theomonnom): wait for the worker to be fully initialized/connected
                    await self._worker._reload_jobs(msg.jobs)
                    await channel.asend_message(self._cch, proto.Reloaded())
        except utils.aio.duplex_unix.DuplexClosed:
            pass

    async def aclose(self) -> None:
        if not self._main_task:
            return

        self._main_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._main_task

        await self._cch.aclose()

Methods

async def aclose(self) ‑> None
def start(self) ‑> None
class WatchServer (worker_runner: Callable[[proto.CliArgs], Any], main_file: pathlib.Path, cli_args: proto.CliArgs, loop: asyncio.AbstractEventLoop)
Expand source code
class WatchServer:
    def __init__(
        self,
        worker_runner: Callable[[proto.CliArgs], Any],
        main_file: pathlib.Path,
        cli_args: proto.CliArgs,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._mp_pch, cli_args.mp_cch = socket.socketpair()
        self._cli_args = cli_args
        self._worker_runner = worker_runner
        self._main_file = main_file
        self._loop = loop

        self._recv_jobs_fut = asyncio.Future[None]()
        self._worker_reloading = False

    async def run(self) -> None:
        watch_paths = _find_watchable_paths(self._main_file)
        for pth in watch_paths:
            logger.log(DEV_LEVEL, f"Watching {pth}")

        self._pch = await utils.aio.duplex_unix._AsyncDuplex.open(self._mp_pch)
        read_ipc_task = self._loop.create_task(self._read_ipc_task())

        try:
            await watchfiles.arun_process(
                *watch_paths,
                target=self._worker_runner,
                args=(self._cli_args,),
                watch_filter=watchfiles.filters.PythonFilter(),
                callback=self._on_reload,
            )
        finally:
            await utils.aio.gracefully_cancel(read_ipc_task)
            await self._pch.aclose()

    async def _on_reload(self, _: Set[watchfiles.main.FileChange]) -> None:
        if self._worker_reloading:
            return

        self._worker_reloading = True

        try:
            await channel.asend_message(self._pch, proto.ActiveJobsRequest())
            self._recv_jobs_fut = asyncio.Future()
            with contextlib.suppress(asyncio.TimeoutError):
                # wait max 1.5s to get the active jobs
                await asyncio.wait_for(self._recv_jobs_fut, timeout=1.5)
        finally:
            self._cli_args.reload_count += 1

    @utils.log_exceptions(logger=logger)
    async def _read_ipc_task(self) -> None:
        active_jobs = []
        while True:
            msg = await channel.arecv_message(self._pch, proto.IPC_MESSAGES)
            if isinstance(msg, proto.ActiveJobsResponse):
                if msg.reload_count != self._cli_args.reload_count:
                    continue

                active_jobs = msg.jobs
                with contextlib.suppress(asyncio.InvalidStateError):
                    self._recv_jobs_fut.set_result(None)
            if isinstance(msg, proto.ReloadJobsRequest):
                await channel.asend_message(
                    self._pch, proto.ReloadJobsResponse(jobs=active_jobs)
                )
            if isinstance(msg, proto.Reloaded):
                self._worker_reloading = False

Methods

async def run(self) ‑> None