Module livekit.agents.cli

Sub-modules

livekit.agents.cli.discover
livekit.agents.cli.readchar
livekit.agents.cli.tcp_console
livekit.agents.cli.watcher

Functions

def run_app(server: AgentServer | WorkerOptions) ‑> None
Expand source code
def run_app(server: AgentServer | WorkerOptions) -> None:
    """Run the agent via the (deprecated) rich Python CLI.

    This is the default entry used by ``python myagent.py <command>``. The rich
    CLI lives in ``._legacy`` and is being phased out in favor of the LiveKit CLI
    (``lk agent ...``) and the thin interface in ``livekit.agents.__main__``.
    """
    from . import _legacy

    _legacy.run_app(server)

Run the agent via the (deprecated) rich Python CLI.

This is the default entry used by python myagent.py <command>. The rich CLI lives in ._legacy and is being phased out in favor of the LiveKit CLI (lk agent …) and the thin interface in livekit.agents.__main__.

Classes

class AgentsConsole
Expand source code
class AgentsConsole:
    """Minimal console stub for TCP console mode (Go CLI handles the TUI)."""

    _instance: AgentsConsole | None = None

    @classmethod
    def get_instance(cls) -> AgentsConsole:
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        import datetime
        import pathlib

        self._lock = threading.Lock()
        self._io_acquired = False
        self._io_acquired_event = threading.Event()
        self._enabled = False
        self._record = False
        self._console_mode: ConsoleMode = "audio"
        self._tcp_transport: TcpSessionTransport | None = None
        self._tcp_audio_input: TcpAudioInput | None = None
        self._tcp_audio_output: TcpAudioOutput | None = None
        self._session_directory = pathlib.Path(
            "console-recordings",
            f"session-{datetime.datetime.now().strftime('%m-%d-%H%M%S')}",
        )

    def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
        with self._lock:
            if self._io_acquired:
                raise RuntimeError("the ConsoleIO was already acquired by another session")

            if asyncio.get_running_loop() != loop:
                raise RuntimeError(
                    "the ConsoleIO must be acquired in the same asyncio loop as the session"
                )

            self._io_acquired = True
            self._io_loop = loop
            self._io_context = contextvars.copy_context()

            assert self._tcp_transport is not None
            assert self._tcp_audio_input is not None
            assert self._tcp_audio_output is not None
            self._io_audio_input = self._tcp_audio_input
            self._io_audio_output = self._tcp_audio_output

            self._io_transcription_sync = TranscriptSynchronizer(
                next_in_chain_audio=self._io_audio_output,
                next_in_chain_text=None,
            )
            self._io_acquired_event.set()
            self._io_session = session

        if session:
            self._update_sess_io(
                session,
                self.console_mode,
                self._io_audio_input,
                self._io_transcription_sync.audio_output,
                self._io_transcription_sync.text_output,
            )

    @property
    def enabled(self) -> bool:
        return self._enabled

    @enabled.setter
    def enabled(self, val: bool) -> None:
        self._enabled = val

    @property
    def record(self) -> bool:
        return self._record

    @record.setter
    def record(self, val: bool) -> None:
        self._record = val

    @property
    def session_directory(self) -> Any:
        return self._session_directory

    @property
    def io_acquired(self) -> bool:
        with self._lock:
            return self._io_acquired

    @property
    def io_session(self) -> AgentSession:
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_session

    @property
    def io_loop(self) -> asyncio.AbstractEventLoop:
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_loop

    @property
    def io_context(self) -> contextvars.Context:
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_context

    def wait_for_io_acquisition(self) -> None:
        self._io_acquired_event.wait()

    @property
    def console_mode(self) -> ConsoleMode:
        return self._console_mode

    @console_mode.setter
    def console_mode(self, mode: ConsoleMode) -> None:
        with self._lock:
            self._console_mode = mode

            if not self._io_acquired:
                return

            self.io_loop.call_soon_threadsafe(
                self._update_sess_io,
                self.io_session,
                mode,
                self._io_audio_input,
                self._io_transcription_sync.audio_output,
                self._io_transcription_sync.text_output,
            )

    def _update_sess_io(
        self,
        sess: AgentSession,
        mode: ConsoleMode,
        audio_input: io.AudioInput,
        audio_output: io.AudioOutput,
        text_output: io.TextOutput,
    ) -> None:
        if asyncio.get_running_loop() != self.io_loop:
            raise RuntimeError("_update_sess_io must be executed on the io_loop")

        with self._lock:
            if not self._io_acquired:
                return

            if self._io_session != sess or self._console_mode != mode:
                return

            if mode == "text":
                sess.input.audio = None
                sess.output.audio = None
                sess.output.transcription = None
            else:
                sess.input.audio = audio_input
                sess.output.audio = audio_output
                sess.output.transcription = text_output

Minimal console stub for TCP console mode (Go CLI handles the TUI).

Static methods

def get_instance() ‑> livekit.agents.cli.cli.AgentsConsole

Instance variables

prop console_mode : ConsoleMode
Expand source code
@property
def console_mode(self) -> ConsoleMode:
    return self._console_mode
prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    return self._enabled
prop io_acquired : bool
Expand source code
@property
def io_acquired(self) -> bool:
    with self._lock:
        return self._io_acquired
prop io_context : contextvars.Context
Expand source code
@property
def io_context(self) -> contextvars.Context:
    if not self._io_acquired:
        raise RuntimeError("AgentsConsole is not acquired")
    return self._io_context
prop io_loop : asyncio.AbstractEventLoop
Expand source code
@property
def io_loop(self) -> asyncio.AbstractEventLoop:
    if not self._io_acquired:
        raise RuntimeError("AgentsConsole is not acquired")
    return self._io_loop
prop io_session : AgentSession
Expand source code
@property
def io_session(self) -> AgentSession:
    if not self._io_acquired:
        raise RuntimeError("AgentsConsole is not acquired")
    return self._io_session
prop record : bool
Expand source code
@property
def record(self) -> bool:
    return self._record
prop session_directory : Any
Expand source code
@property
def session_directory(self) -> Any:
    return self._session_directory

Methods

def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) ‑> None
Expand source code
def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
    with self._lock:
        if self._io_acquired:
            raise RuntimeError("the ConsoleIO was already acquired by another session")

        if asyncio.get_running_loop() != loop:
            raise RuntimeError(
                "the ConsoleIO must be acquired in the same asyncio loop as the session"
            )

        self._io_acquired = True
        self._io_loop = loop
        self._io_context = contextvars.copy_context()

        assert self._tcp_transport is not None
        assert self._tcp_audio_input is not None
        assert self._tcp_audio_output is not None
        self._io_audio_input = self._tcp_audio_input
        self._io_audio_output = self._tcp_audio_output

        self._io_transcription_sync = TranscriptSynchronizer(
            next_in_chain_audio=self._io_audio_output,
            next_in_chain_text=None,
        )
        self._io_acquired_event.set()
        self._io_session = session

    if session:
        self._update_sess_io(
            session,
            self.console_mode,
            self._io_audio_input,
            self._io_transcription_sync.audio_output,
            self._io_transcription_sync.text_output,
        )
def wait_for_io_acquisition(self) ‑> None
Expand source code
def wait_for_io_acquisition(self) -> None:
    self._io_acquired_event.wait()