Module livekit.agents.cli

Sub-modules

livekit.agents.cli.discover
livekit.agents.cli.log
livekit.agents.cli.watcher

Functions

def run_app(server: AgentServer | WorkerOptions) ‑> None
Expand source code
def run_app(server: AgentServer | WorkerOptions) -> None:
    """Build the agents command-line interface around ``server`` and run it.

    A legacy ``WorkerOptions`` value is first converted into an ``AgentServer``
    with ``AgentServer.from_server_options``; an ``AgentServer`` is used as-is.
    """
    agent_server = (
        AgentServer.from_server_options(server) if isinstance(server, WorkerOptions) else server
    )
    cli = _build_cli(agent_server)
    cli()

Classes

class AgentsConsole
Expand source code
class AgentsConsole:
    """Process-wide console singleton that owns terminal output and local audio I/O.

    Output is rendered through a rich ``Console`` using a custom theme. Audio is
    captured and played with ``sounddevice`` (24 kHz, mono, int16) and passed
    through an ``rtc.AudioProcessingModule`` configured for echo cancellation,
    noise suppression, high-pass filtering and automatic gain control.

    At most one ``AgentSession`` can "acquire" the console at a time
    (``acquire_io``). The console then wires that session's audio input/output to
    the local microphone and speaker, or detaches them in ``"text"`` mode.
    """

    # Lazily created shared instance, see ``get_instance``.
    _instance: AgentsConsole | None = None
    # Parent directory of the per-session folder exposed as ``session_directory``.
    _console_directory = "console-recordings"

    @classmethod
    def get_instance(cls) -> AgentsConsole:
        """Return the shared console, creating it on first use."""
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def __init__(self) -> None:
        # Rich theme: style names used by this module ("tag", "label", ...) plus
        # overrides for the per-level logging styles.
        theme: dict[str, str | Style] = {
            "tag": "black on #1fd5f9",
            "label": "#8f83ff",
            "error": "red",
            "lk-fg": "#1fd5f9",
            "log.name": Style.null(),
            "log.extra": Style(dim=True),
            "logging.level.notset": Style(dim=True),
            "logging.level.debug": Style(color="cyan"),
            "logging.level.info": Style(color="green"),
            "logging.level.warning": Style(color="yellow"),
            "logging.level.dev": Style(color="blue"),
            "logging.level.error": Style(color="red", bold=True),
            "logging.level.critical": Style(color="red", bold=True, reverse=True),
        }
        # Width (in cells) of the tag gutter used by ``print`` / ``_render_tag``.
        self.tag_width = 11
        self.console = Console(theme=Theme(theme))

        # Shared APM: captured audio goes through ``process_stream`` and played audio
        # through ``process_reverse_stream`` so echo cancellation has the playback
        # signal as its reference.
        self._apm = rtc.AudioProcessingModule(
            echo_cancellation=True,
            noise_suppression=True,
            high_pass_filter=True,
            auto_gain_control=True,
        )

        # Latency estimates in seconds, updated by the sounddevice callbacks; their
        # sum is reported to the APM as the stream delay.
        self._input_delay = 0.0
        self._input_name: str | None = None
        self._input_stream: sd.InputStream | None = None

        self._output_delay = 0.0
        self._output_name: str | None = None
        self._output_stream: sd.OutputStream | None = None

        # Per-band microphone levels in [0, 1], written by the input callback.
        self._input_lock = threading.Lock()
        self._input_levels = np.zeros(14, dtype=np.float32)
        # Last post-APM microphone level in dB relative to int16 full scale.
        # -120.0 is what the input callback computes for a silent frame, so readers
        # never hit an AttributeError before the first frame has been processed.
        self._micro_db = -120.0

        self._console_mode: ConsoleMode = "audio"

        # ``_lock`` guards the acquisition state and the console mode.
        self._lock = threading.Lock()
        self._io_acquired = False
        self._io_acquired_event = threading.Event()

        self._enabled = False
        self._record = False

        # Attached to the log handler while in text mode (see ``_update_sess_io``).
        self._text_mode_log_filter = TextModeLogFilter()
        self._log_handler = RichLoggingHandler(self)

        # e.g. console-recordings/session-03-14-093015 (month-day-HHMMSS, local time).
        self._session_directory = pathlib.Path(
            self._console_directory, f"session-{datetime.datetime.now().strftime('%m-%d-%H%M%S')}"
        )

    def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
        """Attach the console's audio I/O to ``session``.

        Must be called from code running on ``loop``. It captures the current
        ``contextvars`` context and creates the console audio input/output bound to
        ``loop``. It then wakes any ``wait_for_io_acquisition`` callers and applies
        the current console mode to the session.

        Raises:
            RuntimeError: if the console is already acquired, or if ``loop`` is not
                the running event loop (``asyncio.get_running_loop`` itself raises
                ``RuntimeError`` when no loop is running).
        """
        with self._lock:
            if self._io_acquired:
                raise RuntimeError("the ConsoleIO was already acquired by another session")

            if asyncio.get_running_loop() != loop:
                raise RuntimeError(
                    "the ConsoleIO must be acquired in the same asyncio loop as the session"
                )

            # Build all I/O state *before* publishing it. The input callback checks
            # ``_io_acquired`` without the lock and then uses these attributes, and
            # ``wait_for_io_acquisition`` callers may read ``io_session`` as soon as
            # the event fires. If a constructor below raises, the console is left
            # un-acquired instead of half-initialised.
            self._io_loop = loop
            self._io_context = contextvars.copy_context()
            self._io_audio_input = ConsoleAudioInput(loop)
            self._io_audio_output = ConsoleAudioOutput(loop)
            self._io_session = session
            self._io_acquired = True
            self._io_acquired_event.set()

        self._update_sess_io(
            session, self.console_mode, self._io_audio_input, self._io_audio_output
        )

    @property
    def enabled(self) -> bool:
        """Caller-controlled flag; this class only stores it."""
        return self._enabled

    @enabled.setter
    def enabled(self, val: bool) -> None:
        self._enabled = val

    @property
    def record(self) -> bool:
        """Caller-controlled recording flag.

        Presumably consumed elsewhere together with ``session_directory``; verify
        against callers.
        """
        return self._record

    @record.setter
    def record(self, val: bool) -> None:
        self._record = val

    @property
    def session_directory(self) -> pathlib.Path:
        """Per-run folder under ``console-recordings``, named from the start time."""
        return self._session_directory

    @property
    def io_acquired(self) -> bool:
        """Whether a session currently holds the console I/O (read under the lock)."""
        with self._lock:
            return self._io_acquired

    @property
    def io_session(self) -> AgentSession:
        """The session passed to ``acquire_io``.

        Raises:
            RuntimeError: if the console has not been acquired.
        """
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")

        return self._io_session

    @property
    def io_loop(self) -> asyncio.AbstractEventLoop:
        """The event loop passed to ``acquire_io``.

        Raises:
            RuntimeError: if the console has not been acquired.
        """
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")

        return self._io_loop

    @property
    def io_context(self) -> contextvars.Context:
        """The ``contextvars`` context captured inside ``acquire_io``.

        Raises:
            RuntimeError: if the console has not been acquired.
        """
        if not self._io_acquired:
            raise RuntimeError("AgentsConsole is not acquired")

        return self._io_context

    def wait_for_io_acquisition(self) -> None:
        """Block the calling thread until ``acquire_io`` has completed."""
        self._io_acquired_event.wait()

    @property
    def input_name(self) -> str | None:
        """Name of the open microphone device, or ``None`` when capture is off."""
        return self._input_name

    @property
    def output_name(self) -> str | None:
        """Name of the open speaker device, or ``None`` when playback is off."""
        return self._output_name

    @property
    def console_mode(self) -> ConsoleMode:
        """Current interaction mode (``"audio"`` by default, or ``"text"``)."""
        return self._console_mode

    @console_mode.setter
    def console_mode(self, mode: ConsoleMode) -> None:
        """Switch the console mode.

        When a session is attached, the session I/O is rewired on the io loop via
        ``call_soon_threadsafe``, so this setter may be called from any thread.
        """
        with self._lock:
            self._console_mode = mode

            if not self._io_acquired:
                return

            self.io_loop.call_soon_threadsafe(
                self._update_sess_io,
                self.io_session,
                mode,
                self._io_audio_input,
                self._io_audio_output,
            )

    def _update_sess_io(
        self,
        sess: AgentSession,
        mode: ConsoleMode,
        audio_input: ConsoleAudioInput,
        audio_output: ConsoleAudioOutput,
    ) -> None:
        """Apply ``mode`` to ``sess``; must run on the io loop.

        In ``"text"`` mode the session's audio input/output are cleared and the
        text-mode log filter is installed. In any other mode the console audio
        objects are attached and the filter is removed.

        This is a no-op when the console is no longer acquired, or when the session
        or mode changed after this call was scheduled. Stale scheduled updates are
        therefore dropped.
        """
        if asyncio.get_running_loop() != self.io_loop:
            raise RuntimeError("_update_sess_io must be executed on the io_loop")

        with self._lock:
            if not self._io_acquired:
                return

            if self._io_session != sess or self._console_mode != mode:
                return

            if mode == "text":
                sess.input.audio = None
                sess.output.audio = None
                self._log_handler.addFilter(self._text_mode_log_filter)
            else:
                sess.input.audio = audio_input
                sess.output.audio = audio_output
                self._log_handler.removeFilter(self._text_mode_log_filter)

    def print(
        self, child: RenderableType, *, tag: str = "", tag_style: Style | None = None
    ) -> None:
        """Print ``child`` to the console with an optional styled tag in the gutter."""
        self.console.print(self._render_tag(child, tag=tag, tag_style=tag_style))

    def _render_tag(
        self,
        child: RenderableType,
        *,
        tag: str = "",
        tag_width: int | None = None,
        tag_style: Style | None = None,
    ) -> ConsoleRenderable:
        """Lay out ``child`` next to a fixed-width tag gutter.

        Returns a two-column ``Table.grid``:

        - The first column is ``tag_width + 2`` cells wide. It holds ``tag``
          (padded with one space on each side when non-empty), right-aligned within
          ``tag_width`` cells.
        - The second column holds ``child`` and folds long content.

        ``tag_width`` falls back to ``self.tag_width`` (also when 0 is passed).
        ``tag_style`` falls back to the theme's ``"tag"`` style.
        """
        if tag:
            tag = f" {tag} "

        tag_width = tag_width or self.tag_width
        table = Table.grid(
            Column(width=tag_width + 2, no_wrap=True),
            Column(no_wrap=False, overflow="fold"),
            padding=(0, 0, 0, 0),
            collapse_padding=True,
            pad_edge=False,
        )

        # Left-pad so the tag ends at column ``tag_width`` (clamped for long tags).
        left_padding = tag_width - len(tag)
        left_padding = max(0, left_padding)

        style = tag_style or self.console.get_style("tag")
        tag_segments = [Segment(tag, style=style)]

        left = [Segment(" " * left_padding), *tag_segments]
        table.add_row(Group(*left), Group(child))  # type: ignore
        return table

    def set_microphone_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
        """Close any open microphone stream and, if ``enable`` is true, open a new one.

        The new stream captures 24 kHz mono int16 audio in 2400-frame (100 ms)
        blocks and delivers it to ``_sd_input_callback``.

        Args:
            enable: whether capture should be running after the call.
            device: sounddevice input device (index or name); ``None`` selects the
                default input device.

        Raises:
            CLIError: if the input device cannot be queried.
        """
        if self._input_stream:
            self._input_stream.close()
            self._input_stream = self._input_name = None

        if not enable:
            return

        # Imported lazily so sounddevice is only needed when audio is used.
        import sounddevice as sd

        if device is None:
            device, _ = sd.default.device

        try:
            device_info = sd.query_devices(device, kind="input")
        except Exception:
            raise CLIError(
                "Unable to access the microphone. \n"
                "Please ensure a microphone is connected and recognized by your system. "
                "To see available input devices, run: lk-agents console --list-devices"
            ) from None

        assert isinstance(device_info, dict), "device_info is dict"

        self._input_name = device_info.get("name", "Unnamed microphone")
        self._input_stream = sd.InputStream(
            callback=self._sd_input_callback,
            dtype="int16",
            channels=1,
            device=device,
            samplerate=24000,
            blocksize=2400,
        )
        self._input_stream.start()

    def set_speaker_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
        """Close any open speaker stream and, if ``enable`` is true, open a new one.

        The new stream plays 24 kHz mono int16 audio in 2400-frame (100 ms) blocks
        pulled by ``_sd_output_callback``.

        Args:
            enable: whether playback should be running after the call.
            device: sounddevice output device (index or name); ``None`` selects the
                default output device.

        Raises:
            CLIError: if the output device cannot be queried.
        """
        if self._output_stream:
            self._output_stream.close()
            self._output_stream = self._output_name = None

        if not enable:
            return

        # Imported lazily so sounddevice is only needed when audio is used.
        import sounddevice as sd

        if device is None:
            _, device = sd.default.device

        try:
            device_info = sd.query_devices(device, kind="output")
        except Exception:
            raise CLIError(
                "Unable to access the speaker. \n"
                "Please ensure a speaker is connected and recognized by your system. "
                "To see available output devices, run: lk-agents console --list-devices"
            ) from None

        assert isinstance(device_info, dict), "device_info is dict"

        self._output_name = device_info.get("name", "Unnamed speaker")
        self._output_stream = sd.OutputStream(
            callback=self._sd_output_callback,
            dtype="int16",
            channels=1,
            device=device,
            samplerate=24000,
            blocksize=2400,
        )
        self._output_stream.start()

    def _validate_device_or_raise(
        self, *, input_device: str | None, output_device: str | None
    ) -> None:
        """Raise ``CLIError`` if a given input/output device cannot be queried.

        Empty or ``None`` device names are not checked.
        """
        import sounddevice as sd

        try:
            if input_device:
                sd.query_devices(input_device, kind="input")
        except Exception:
            raise CLIError(
                "Unable to access the microphone. \n"
                "Please ensure a microphone is connected and recognized by your system. "
                "To see available input devices, run: lk-agents console --list-devices"
            ) from None

        try:
            if output_device:
                sd.query_devices(output_device, kind="output")
        except Exception:
            raise CLIError(
                "Unable to access the speaker. \n"
                "Please ensure a speaker is connected and recognized by your system. "
                "To see available output devices, run: lk-agents console --list-devices"
            ) from None

    def _sd_input_callback(self, indata: np.ndarray, frame_count: int, time: Any, *_: Any) -> None:
        """sounddevice input callback: update the level meter and forward mic audio.

        Called by sounddevice off the event loop (hence ``call_soon_threadsafe``).

        - It always refreshes ``_input_levels``: a 14-band, log-spaced spectrum
          with peak-hold that decays with a 100 ms time constant.
        - When a session holds the console, each 10 ms chunk is also run through
          the APM and pushed to the session's audio input on the io loop.
        """
        self._input_delay = time.currentTime - time.inputBufferAdcTime
        total_delay = self._output_delay + self._input_delay

        try:
            self._apm.set_stream_delay_ms(int(total_delay * 1000))
        except RuntimeError:
            pass  # setting stream delay in console mode fails often, so we silently continue

        # --- level meter -------------------------------------------------------
        sr = 24000
        x = indata[:, 0].astype(np.float32) / 32768.0
        n = x.size
        x *= np.hanning(n).astype(np.float32)  # Hann window limits spectral leakage

        # Single-sided amplitude spectrum. The DC bin (and the Nyquist bin when n is
        # even) has no mirrored counterpart, so its doubling is undone.
        X = np.fft.rfft(x, n=n)
        mag = np.abs(X).astype(np.float32) * (2.0 / n)
        mag[0] *= 0.5
        mag[-1] *= 1.0 - 0.5 * float(n % 2 == 0)

        # Map every FFT bin to one of nb log-spaced bands between 20 Hz and 96% of
        # Nyquist; bins outside that range are clamped into the first/last band.
        freqs = np.fft.rfftfreq(n, d=1.0 / sr)
        nb = len(self._input_levels)
        edges = np.geomspace(20.0, (sr * 0.5) * 0.96, nb + 1).astype(np.float32)
        b = np.clip(np.digitize(freqs, edges) - 1, 0, nb - 1)

        # Mean power per band (empty bands divide by 1 instead of 0).
        p = (mag * mag).astype(np.float32)
        sump = np.bincount(b, weights=p, minlength=nb)
        cnts = np.maximum(np.bincount(b, minlength=nb), 1)
        pmean = sump / cnts

        # -70 dB -> 0 and -20 dB -> 1, then gamma shaping, a small gate, and
        # normalisation of the loudest band to ~0.95 (gain capped at 3x).
        db = 10.0 * np.log10(pmean + 1e-12)
        floor_db, hot_db = -70.0, -20
        lev = np.clip(((db - floor_db) / (hot_db - floor_db)).astype(np.float32), 0.0, 1.0)
        lev = np.maximum(lev**0.75 - 0.02, 0.0)
        peak = float(lev.max())
        lev *= np.clip(0.95 / (peak + 1e-6), 0.0, 3.0)
        lev = np.clip(lev, 0.0, 1.0)

        # Peak-hold: new levels rise instantly and fall with a 100 ms time constant.
        decay = float(np.exp(-(n / sr) / 0.1))
        with self._input_lock:
            prev = self._input_levels.astype(np.float32)
            self._input_levels = np.maximum(lev, prev * decay)

        # --- forward to the session --------------------------------------------
        if not self._io_acquired:
            return

        FRAME_SAMPLES = 240  # 10ms at 24000 Hz
        num_frames = frame_count // FRAME_SAMPLES

        for i in range(num_frames):
            start = i * FRAME_SAMPLES
            end = start + FRAME_SAMPLES
            capture_chunk = indata[start:end]

            frame = rtc.AudioFrame(
                data=capture_chunk.tobytes(),
                samples_per_channel=FRAME_SAMPLES,
                sample_rate=24000,
                num_channels=1,
            )
            self._apm.process_stream(frame)

            in_data_aec = np.frombuffer(frame.data, dtype=np.int16)
            rms = np.sqrt(np.mean(in_data_aec.astype(np.float32) ** 2))
            max_int16 = np.iinfo(np.int16).max
            self._micro_db = 20.0 * np.log10(rms / max_int16 + 1e-6)

            self._io_loop.call_soon_threadsafe(self._io_audio_input.push_frame, frame)

    def _sd_output_callback(self, outdata: np.ndarray, frames: int, time: Any, *_: Any) -> None:
        """sounddevice output callback: play queued agent audio and feed it to the APM.

        Plays silence while no session holds the console. Otherwise it drains up to
        ``frames`` int16 samples from the session's output buffer, zero-filling any
        shortfall. What was played is then handed to the APM, in 10 ms chunks, as
        the echo-cancellation reference.
        """
        if not self.io_acquired:
            outdata[:] = 0
            return

        self._output_delay = time.outputBufferDacTime - time.currentTime

        FRAME_SAMPLES = 240  # 10ms at 24000 Hz
        audio_output = self._io_audio_output
        with audio_output.audio_lock:
            buffer = audio_output.audio_buffer
            bytes_needed = frames * 2  # mono int16: 2 bytes per sample
            if len(buffer) < bytes_needed:
                # Underrun: play every *whole* sample we have, then silence. Only
                # whole samples are consumed, so a trailing odd byte stays queued and
                # later data keeps its int16 alignment.
                available_samples = len(buffer) // 2
                outdata[:available_samples, 0] = np.frombuffer(
                    buffer, dtype=np.int16, count=available_samples
                )
                outdata[available_samples:, 0] = 0
                del buffer[: available_samples * 2]  # TODO: optimize
            else:
                chunk = buffer[:bytes_needed]
                outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frames)
                del buffer[:bytes_needed]

        num_chunks = frames // FRAME_SAMPLES
        for i in range(num_chunks):
            start = i * FRAME_SAMPLES
            end = start + FRAME_SAMPLES
            render_chunk = outdata[start:end, 0]
            render_frame_for_aec = rtc.AudioFrame(
                data=render_chunk.tobytes(),
                samples_per_channel=FRAME_SAMPLES,
                sample_rate=24000,
                num_channels=1,
            )
            self._apm.process_reverse_stream(render_frame_for_aec)

Static methods

def get_instance() ‑> livekit.agents.cli.cli.AgentsConsole

Instance variables

prop console_mode : ConsoleMode
Expand source code
@property
def console_mode(self) -> ConsoleMode:
    """The console's current interaction mode (e.g. ``"audio"`` or ``"text"``)."""
    current_mode = self._console_mode
    return current_mode
prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    """The stored ``enabled`` flag."""
    flag = self._enabled
    return flag
prop input_name : str | None
Expand source code
@property
def input_name(self) -> str | None:
    """Name of the open microphone device, or ``None`` if none is open."""
    name = self._input_name
    return name
prop io_acquired : bool
Expand source code
@property
def io_acquired(self) -> bool:
    """Whether a session currently holds the console I/O (read under ``_lock``)."""
    with self._lock:
        acquired = self._io_acquired
    return acquired
prop io_context : contextvars.Context
Expand source code
@property
def io_context(self) -> contextvars.Context:
    """The ``contextvars`` context captured when the console was acquired.

    Raises:
        RuntimeError: if no session has acquired the console.
    """
    if self._io_acquired:
        return self._io_context
    raise RuntimeError("AgentsConsole is not acquired")
prop io_loop : asyncio.AbstractEventLoop
Expand source code
@property
def io_loop(self) -> asyncio.AbstractEventLoop:
    """The event loop the console was acquired on.

    Raises:
        RuntimeError: if no session has acquired the console.
    """
    if self._io_acquired:
        return self._io_loop
    raise RuntimeError("AgentsConsole is not acquired")
prop io_session : AgentSession
Expand source code
@property
def io_session(self) -> AgentSession:
    """The session that acquired the console.

    Raises:
        RuntimeError: if no session has acquired the console.
    """
    if self._io_acquired:
        return self._io_session
    raise RuntimeError("AgentsConsole is not acquired")
prop output_name : str | None
Expand source code
@property
def output_name(self) -> str | None:
    """Name of the open speaker device, or ``None`` if none is open."""
    name = self._output_name
    return name
prop record : bool
Expand source code
@property
def record(self) -> bool:
    """The stored ``record`` flag."""
    flag = self._record
    return flag
prop session_directory : pathlib.Path
Expand source code
@property
def session_directory(self) -> pathlib.Path:
    """Directory assigned to this console run."""
    directory = self._session_directory
    return directory

Methods

def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) ‑> None
Expand source code
def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
    """Attach the console's audio I/O to ``session``.

    Must be called from code running on ``loop``. It captures the current
    ``contextvars`` context and creates the console audio input/output bound to
    ``loop``. It then wakes ``wait_for_io_acquisition`` callers and applies the
    current console mode to the session.

    Raises:
        RuntimeError: if the console is already acquired, or if ``loop`` is not the
            running event loop.
    """
    with self._lock:
        if self._io_acquired:
            raise RuntimeError("the ConsoleIO was already acquired by another session")

        if asyncio.get_running_loop() != loop:
            raise RuntimeError(
                "the ConsoleIO must be acquired in the same asyncio loop as the session"
            )

        # Build all I/O state before publishing it: audio callbacks may check
        # ``_io_acquired`` without the lock and then use these attributes, and
        # ``wait_for_io_acquisition`` callers may read ``io_session`` as soon as the
        # event fires. If a constructor raises, the console stays un-acquired.
        self._io_loop = loop
        self._io_context = contextvars.copy_context()
        self._io_audio_input = ConsoleAudioInput(loop)
        self._io_audio_output = ConsoleAudioOutput(loop)
        self._io_session = session
        self._io_acquired = True
        self._io_acquired_event.set()

    self._update_sess_io(
        session, self.console_mode, self._io_audio_input, self._io_audio_output
    )
def print(self, child: RenderableType, *, tag: str = '', tag_style: Style | None = None) ‑> None
Expand source code
def print(
    self, child: RenderableType, *, tag: str = "", tag_style: Style | None = None
) -> None:
    """Print ``child`` to the rich console, prefixed by an optional styled ``tag``."""
    rendered = self._render_tag(child, tag=tag, tag_style=tag_style)
    self.console.print(rendered)
def set_microphone_enabled(self, enable: bool, *, device: int | str | None = None) ‑> None
Expand source code
def set_microphone_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
    """Close any open microphone stream and, if ``enable`` is true, open a new one.

    Args:
        enable: whether capture should be running after the call.
        device: sounddevice input device (index or name); ``None`` selects the
            default input device.

    Raises:
        CLIError: if the input device cannot be queried.
    """
    previous = self._input_stream
    if previous:
        previous.close()
        self._input_name = None
        self._input_stream = None

    if not enable:
        return

    import sounddevice as sd

    if device is None:
        default_input, _ = sd.default.device
        device = default_input

    try:
        info = sd.query_devices(device, kind="input")
    except Exception:
        raise CLIError(
            "Unable to access the microphone. \n"
            "Please ensure a microphone is connected and recognized by your system. "
            "To see available input devices, run: lk-agents console --list-devices"
        ) from None

    assert isinstance(info, dict), "device_info is dict"

    self._input_name = info.get("name", "Unnamed microphone")
    stream = sd.InputStream(
        samplerate=24000,
        channels=1,
        dtype="int16",
        blocksize=2400,
        device=device,
        callback=self._sd_input_callback,
    )
    self._input_stream = stream
    stream.start()
def set_speaker_enabled(self, enable: bool, *, device: int | str | None = None) ‑> None
Expand source code
def set_speaker_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
    """Close any open speaker stream and, if ``enable`` is true, open a new one.

    Args:
        enable: whether playback should be running after the call.
        device: sounddevice output device (index or name); ``None`` selects the
            default output device.

    Raises:
        CLIError: if the output device cannot be queried.
    """
    previous = self._output_stream
    if previous:
        previous.close()
        self._output_name = None
        self._output_stream = None

    if not enable:
        return

    import sounddevice as sd

    if device is None:
        _, default_output = sd.default.device
        device = default_output

    try:
        info = sd.query_devices(device, kind="output")
    except Exception:
        raise CLIError(
            "Unable to access the speaker. \n"
            "Please ensure a speaker is connected and recognized by your system. "
            "To see available output devices, run: lk-agents console --list-devices"
        ) from None

    assert isinstance(info, dict), "device_info is dict"

    self._output_name = info.get("name", "Unnamed speaker")
    stream = sd.OutputStream(
        samplerate=24000,
        channels=1,
        dtype="int16",
        blocksize=2400,
        device=device,
        callback=self._sd_output_callback,
    )
    self._output_stream = stream
    stream.start()
def wait_for_io_acquisition(self) ‑> None
Expand source code
def wait_for_io_acquisition(self) -> None:
    """Block the calling thread until the acquisition event has been set."""
    acquired = self._io_acquired_event
    acquired.wait()