Module livekit.agents.cli.watcher

Classes

class WatchClient (worker: AgentServer,
reload_addr: str,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class WatchClient:
    """Client for the Go CLI's reload server.

    Opens a TCP connection to ``reload_addr`` and exchanges ``AgentDevMessage``
    protobuf messages with the peer: once at startup to fetch previously
    running jobs and reload them into the worker, then continuously to answer
    the peer's requests for the worker's currently active jobs.
    """

    def __init__(
        self,
        worker: AgentServer,
        reload_addr: str,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Store the collaborators; no connection is opened until ``start()``.

        Args:
            worker: Server whose jobs are reloaded and reported.
            reload_addr: ``host:port`` string; split on the last ``:``.
            loop: Event loop used to schedule the watcher task. Falls back to
                ``asyncio.get_event_loop()`` when omitted.
        """
        self._worker = worker
        self._reload_addr = reload_addr
        self._loop = loop or asyncio.get_event_loop()
        self._main_task: asyncio.Task | None = None

    def start(self) -> None:
        """Schedule ``_run`` as a task on the configured event loop."""
        schedule = self._loop.create_task
        self._main_task = schedule(self._run())

    @utils.log_exceptions(logger=logger)
    async def _run(self) -> None:
        """Connect to the reload server and speak the protocol until disconnect.

        Read/connection errors raised after the connection is established end
        the session silently; the writer is always closed on the way out.
        """
        host, port_str = self._reload_addr.rsplit(":", 1)
        reader, writer = await asyncio.open_connection(host, int(port_str))

        try:
            # A dropped or half-read connection is the expected way for the
            # session to end, so those errors are swallowed in both phases.
            with contextlib.suppress(
                asyncio.IncompleteReadError, ConnectionError, OSError
            ):
                await self._restore_jobs(reader, writer)
                await self._answer_job_queries(reader, writer)
        finally:
            writer.close()
            with contextlib.suppress(Exception):
                await writer.wait_closed()

    async def _restore_jobs(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Ask the peer for running jobs and reload any it returns."""
        request = agent_dev.AgentDevMessage(
            get_running_jobs_request=agent_dev.GetRunningAgentJobsRequest()
        )
        await _send_proto(writer, request.SerializeToString())

        payload = await _recv_proto(reader)
        reply = agent_dev.AgentDevMessage()
        reply.ParseFromString(payload)

        if not reply.HasField("get_running_jobs_response"):
            return

        received = reply.get_running_jobs_response.jobs
        if not received:
            return

        running_jobs = [proto.running_job_from_proto(item) for item in received]
        logger.info(f"reloading {len(running_jobs)} job(s)")
        await self._worker._reload_jobs(running_jobs)

    async def _answer_job_queries(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Reply to each GetRunningJobsRequest with the worker's active jobs.

        Loops until reading or writing raises; other message kinds are ignored.
        """
        while True:
            payload = await _recv_proto(reader)
            incoming = agent_dev.AgentDevMessage()
            incoming.ParseFromString(payload)

            if not incoming.HasField("get_running_jobs_request"):
                continue

            # The peer asks for this so it can capture jobs before a restart.
            active = self._worker.active_jobs
            snapshot = agent_dev.GetRunningAgentJobsResponse(
                jobs=[proto.running_job_to_proto(job) for job in active]
            )
            outgoing = agent_dev.AgentDevMessage(get_running_jobs_response=snapshot)
            await _send_proto(writer, outgoing.SerializeToString())

    async def aclose(self) -> None:
        """Cancel the watcher task, if one was started, and wait for it to end."""
        task = self._main_task
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

Connects to the Go CLI's reload server over TCP and exchanges AgentDevMessage protobuf messages.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel the watcher task, if one was started, and wait for it to end.

    Does nothing when no task is set. The ``CancelledError`` raised while
    awaiting the cancelled task is swallowed.
    """
    task = self._main_task
    if task:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
def start(self) ‑> None
Expand source code
def start(self) -> None:
    """Schedule ``self._run()`` as a task on ``self._loop`` and keep a handle to it."""
    schedule = self._loop.create_task
    self._main_task = schedule(self._run())