Module livekit.agents.cli
Sub-modules
- livekit.agents.cli.discover
- livekit.agents.cli.readchar
- livekit.agents.cli.tcp_console
- livekit.agents.cli.watcher
Functions
def run_app(server: AgentServer | WorkerOptions) ‑> None-
Expand source code
def run_app(server: AgentServer | WorkerOptions) -> None:
    """Start the agent through the (deprecated) rich Python CLI.

    This is the default entry point behind ``python myagent.py <command>``.
    The rich CLI is implemented in ``._legacy`` and is being phased out in
    favor of the LiveKit CLI (``lk agent ...``) and the thin interface in
    ``livekit.agents.__main__``.

    Args:
        server: The ``AgentServer`` or ``WorkerOptions`` handed unchanged to
            the legacy CLI.
    """
    # Imported inside the function, so ``_legacy`` is only loaded when this
    # entry point is actually called.
    from . import _legacy as legacy_cli

    legacy_cli.run_app(server)
This is the default entry used by `python myagent.py <command>`. The rich CLI lives in `._legacy` and is being phased out in favor of the LiveKit CLI (`lk agent …`) and the thin interface in `livekit.agents.__main__`.
Classes
class AgentsConsole-
Expand source code
class AgentsConsole:
    """Minimal console stub for TCP console mode (Go CLI handles the TUI).

    A single shared instance is created lazily by ``get_instance``. The
    console tracks which ``AgentSession`` currently owns the console I/O and
    which ``ConsoleMode`` is active; ``_lock`` serialises updates to that
    acquisition state and to the mode.
    """

    _instance: AgentsConsole | None = None

    @classmethod
    def get_instance(cls) -> AgentsConsole:
        """Return the shared console, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        import datetime
        import pathlib

        self._lock = threading.Lock()
        self._io_acquired = False
        self._io_acquired_event = threading.Event()

        self._enabled = False
        self._record = False
        self._console_mode: ConsoleMode = "audio"

        # These start as None; acquire_io() asserts that all three have been
        # assigned before it binds the console to a session.
        self._tcp_transport: TcpSessionTransport | None = None
        self._tcp_audio_input: TcpAudioInput | None = None
        self._tcp_audio_output: TcpAudioOutput | None = None

        # console-recordings/session-<MM-DD-HHMMSS>, stamped at construction.
        self._session_directory = pathlib.Path(
            "console-recordings",
            f"session-{datetime.datetime.now().strftime('%m-%d-%H%M%S')}",
        )

    def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
        """Bind the console I/O to ``session`` on ``loop``.

        Must be called from code running on ``loop``. On success the
        acquisition event is set and, when ``session`` is truthy, the
        session's input/output is configured for the current console mode.

        A failed attempt leaves the console un-acquired, so it can be retried.

        Args:
            loop: The asyncio loop the session runs on.
            session: The session that takes ownership of the console I/O.

        Raises:
            RuntimeError: If the console is already acquired, if no loop is
                running, or if the running loop is not ``loop``.
            AssertionError: If the TCP transport or audio endpoints are unset.
        """
        with self._lock:
            if self._io_acquired:
                raise RuntimeError("the ConsoleIO was already acquired by another session")

            if asyncio.get_running_loop() != loop:
                raise RuntimeError(
                    "the ConsoleIO must be acquired in the same asyncio loop as the session"
                )

            # Validate and build everything that can fail BEFORE touching
            # shared state, so a failure cannot leave the console flagged as
            # acquired while its _io_* attributes are still missing.
            assert self._tcp_transport is not None
            assert self._tcp_audio_input is not None
            assert self._tcp_audio_output is not None

            transcription_sync = TranscriptSynchronizer(
                next_in_chain_audio=self._tcp_audio_output,
                next_in_chain_text=None,
            )

            self._io_loop = loop
            self._io_context = contextvars.copy_context()
            self._io_audio_input = self._tcp_audio_input
            self._io_audio_output = self._tcp_audio_output
            self._io_transcription_sync = transcription_sync
            self._io_session = session
            self._io_acquired = True

            # Signal last: a thread released by wait_for_io_acquisition() may
            # read io_session / io_loop immediately, and those properties do
            # not take the lock.
            self._io_acquired_event.set()

        # Outside the lock: _update_sess_io() takes the (non-reentrant) lock.
        if session:
            self._update_sess_io(
                session,
                self.console_mode,
                self._io_audio_input,
                self._io_transcription_sync.audio_output,
                self._io_transcription_sync.text_output,
            )

    @property
    def enabled(self) -> bool:
        """Return the ``enabled`` flag (``False`` until set)."""
        return self._enabled

    @enabled.setter
    def enabled(self, val: bool) -> None:
        self._enabled = val

    @property
    def record(self) -> bool:
        """Return the ``record`` flag (``False`` until set)."""
        return self._record

    @record.setter
    def record(self, val: bool) -> None:
        self._record = val

    @property
    def session_directory(self) -> Any:
        """Return the session directory path computed in ``__init__``."""
        return self._session_directory

    @property
    def io_acquired(self) -> bool:
        """Return whether a session currently holds the console I/O."""
        with self._lock:
            return self._io_acquired

    @property
    def io_session(self) -> AgentSession:
        """Return the owning session; raise ``RuntimeError`` if not acquired."""
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_session

    @property
    def io_loop(self) -> asyncio.AbstractEventLoop:
        """Return the loop passed to ``acquire_io``; raise ``RuntimeError`` if not acquired."""
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_loop

    @property
    def io_context(self) -> contextvars.Context:
        """Return the context copied in ``acquire_io``; raise ``RuntimeError`` if not acquired."""
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")
        return self._io_context

    def wait_for_io_acquisition(self) -> None:
        """Block the calling thread until ``acquire_io`` has succeeded."""
        self._io_acquired_event.wait()

    @property
    def console_mode(self) -> ConsoleMode:
        """Return the current console mode (``"audio"`` initially)."""
        return self._console_mode

    @console_mode.setter
    def console_mode(self, mode: ConsoleMode) -> None:
        with self._lock:
            self._console_mode = mode

            if not self._io_acquired:
                # Nothing to reconfigure yet; acquire_io() applies the stored
                # mode when a session takes the console.
                return

        # Session I/O may only be changed on the io loop, so hand the update
        # over to it instead of applying it from the caller's thread.
        self.io_loop.call_soon_threadsafe(
            self._update_sess_io,
            self.io_session,
            mode,
            self._io_audio_input,
            self._io_transcription_sync.audio_output,
            self._io_transcription_sync.text_output,
        )

    def _update_sess_io(
        self,
        sess: AgentSession,
        mode: ConsoleMode,
        audio_input: io.AudioInput,
        audio_output: io.AudioOutput,
        text_output: io.TextOutput,
    ) -> None:
        """Attach or detach the console I/O on ``sess`` according to ``mode``.

        In ``"text"`` mode the session's audio input, audio output and
        transcription output are set to ``None``; in any other mode they are
        set to the given endpoints. The call does nothing when the console is
        not acquired, or when ``sess`` / ``mode`` no longer match the current
        session and mode (e.g. a call scheduled before a later mode change).

        Raises:
            RuntimeError: If not executed on the io loop.
        """
        if asyncio.get_running_loop() != self.io_loop:
            raise RuntimeError("_update_sess_io must be executed on the io_loop")

        with self._lock:
            if not self._io_acquired:
                return

            if self._io_session != sess or self._console_mode != mode:
                return

        if mode == "text":
            sess.input.audio = None
            sess.output.audio = None
            sess.output.transcription = None
        else:
            sess.input.audio = audio_input
            sess.output.audio = audio_output
            sess.output.transcription = text_output
Static methods
def get_instance() ‑> livekit.agents.cli.cli.AgentsConsole
Instance variables
prop console_mode : ConsoleMode-
Expand source code
@property
def console_mode(self) -> ConsoleMode:
    """Return the console mode currently stored on this instance."""
    current_mode = self._console_mode
    return current_mode
Expand source code
@property
def enabled(self) -> bool:
    """Return the stored ``enabled`` flag."""
    is_enabled = self._enabled
    return is_enabled
Expand source code
@property
def io_acquired(self) -> bool:
    """Return the acquisition flag, read while holding ``_lock``."""
    with self._lock:
        acquired = self._io_acquired
    return acquired
Expand source code
@property
def io_context(self) -> contextvars.Context:
    """Return the stored I/O context.

    Raises:
        RuntimeError: If the console has not been acquired.
    """
    if self._io_acquired:
        return self._io_context
    raise RuntimeError("AgentsConsole is not acquired")
Expand source code
@property
def io_loop(self) -> asyncio.AbstractEventLoop:
    """Return the stored I/O event loop.

    Raises:
        RuntimeError: If the console has not been acquired.
    """
    if self._io_acquired:
        return self._io_loop
    raise RuntimeError("AgentsConsole is not acquired")
Expand source code
@property
def io_session(self) -> AgentSession:
    """Return the session stored as owner of the console I/O.

    Raises:
        RuntimeError: If the console has not been acquired.
    """
    if self._io_acquired:
        return self._io_session
    raise RuntimeError("AgentsConsole is not acquired")
Expand source code
@property
def record(self) -> bool:
    """Return the stored ``record`` flag."""
    record_flag = self._record
    return record_flag
Expand source code
@property
def session_directory(self) -> Any:
    """Return the session directory stored on this instance."""
    directory = self._session_directory
    return directory
Methods
def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) ‑> None-
Expand source code
def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
    """Bind the console I/O to ``session`` on ``loop``.

    Must be called from code running on ``loop``. On success the acquisition
    event is set and, when ``session`` is truthy, the session's input/output
    is configured for the current console mode.

    A failed attempt leaves the console un-acquired, so it can be retried.

    Args:
        loop: The asyncio loop the session runs on.
        session: The session that takes ownership of the console I/O.

    Raises:
        RuntimeError: If the console is already acquired, if no loop is
            running, or if the running loop is not ``loop``.
        AssertionError: If the TCP transport or audio endpoints are unset.
    """
    with self._lock:
        if self._io_acquired:
            raise RuntimeError("the ConsoleIO was already acquired by another session")

        if asyncio.get_running_loop() != loop:
            raise RuntimeError(
                "the ConsoleIO must be acquired in the same asyncio loop as the session"
            )

        # Validate and build everything that can fail BEFORE touching shared
        # state, so a failure cannot leave the console flagged as acquired
        # while its _io_* attributes are still missing.
        assert self._tcp_transport is not None
        assert self._tcp_audio_input is not None
        assert self._tcp_audio_output is not None

        transcription_sync = TranscriptSynchronizer(
            next_in_chain_audio=self._tcp_audio_output,
            next_in_chain_text=None,
        )

        self._io_loop = loop
        self._io_context = contextvars.copy_context()
        self._io_audio_input = self._tcp_audio_input
        self._io_audio_output = self._tcp_audio_output
        self._io_transcription_sync = transcription_sync
        self._io_session = session
        self._io_acquired = True

        # Signal last: a thread released by wait_for_io_acquisition() may
        # read the session/loop immediately, without taking the lock.
        self._io_acquired_event.set()

    # Outside the lock: _update_sess_io() takes the (non-reentrant) lock.
    if session:
        self._update_sess_io(
            session,
            self.console_mode,
            self._io_audio_input,
            self._io_transcription_sync.audio_output,
            self._io_transcription_sync.text_output,
        )
Expand source code
def wait_for_io_acquisition(self) -> None:
    """Block the calling thread until the acquisition event is set."""
    acquired_event = self._io_acquired_event
    acquired_event.wait()