Expand source code
class AgentsConsole:
    """Shared terminal console for running an agent locally.

    Combines three things:

    * rich-based output with a right-aligned "tag" column (``print`` / ``_render_tag``);
    * local microphone / speaker audio via sounddevice at 24 kHz mono int16,
      routed through an ``rtc.AudioProcessingModule`` configured for echo
      cancellation, noise suppression, high-pass filtering and gain control;
    * a bridge that binds that audio I/O to one ``AgentSession`` (``acquire_io``).

    Obtain the shared instance with ``get_instance()``.
    """

    # Lazily created shared instance, returned by `get_instance()`.
    _instance: AgentsConsole | None = None
    # Parent directory of the per-session `session-<MM-DD-HHMMSS>` folder built in `__init__`.
    _console_directory = "console-recordings"
@classmethod
def get_instance(cls) -> AgentsConsole:
if cls._instance is None:
cls._instance = cls()
return cls._instance
def __init__(self) -> None:
    """Create the rich console, the audio-processing module and unbound I/O state.

    No audio device is opened here (see ``set_microphone_enabled`` /
    ``set_speaker_enabled``), and no session is bound (see ``acquire_io``).
    """
    # Named styles used by `_render_tag` ("tag") and by the logging handler.
    theme: dict[str, str | Style] = {
        "tag": "black on #1fd5f9",
        "label": "#8f83ff",
        "error": "red",
        "lk-fg": "#1fd5f9",
        "log.name": Style.null(),
        "log.extra": Style(dim=True),
        "logging.level.notset": Style(dim=True),
        "logging.level.debug": Style(color="cyan"),
        "logging.level.info": Style(color="green"),
        "logging.level.warning": Style(color="yellow"),
        "logging.level.dev": Style(color="blue"),
        "logging.level.error": Style(color="red", bold=True),
        "logging.level.critical": Style(color="red", bold=True, reverse=True),
    }
    # Default width of the tag column used by `_render_tag`.
    self.tag_width = 11
    self.console = Console(theme=Theme(theme))

    # Shared by the input callback (capture side) and output callback (reverse/render side).
    self._apm = rtc.AudioProcessingModule(
        echo_cancellation=True,
        noise_suppression=True,
        high_pass_filter=True,
        auto_gain_control=True,
    )

    # Device latencies (seconds), refreshed by the sounddevice callbacks.
    self._input_delay = 0.0
    self._input_name: str | None = None
    self._input_stream: sd.InputStream | None = None
    self._output_delay = 0.0
    self._output_name: str | None = None
    self._output_stream: sd.OutputStream | None = None

    # Per-band microphone levels (0..1) computed in `_sd_input_callback`.
    self._input_lock = threading.Lock()
    self._input_levels = np.zeros(14, dtype=np.float32)
    # Most recent post-APM microphone level in dBFS, written by `_sd_input_callback`.
    # Start at the value that callback yields for silence (20*log10(1e-6) == -120)
    # so readers never hit an AttributeError before the first audio block arrives.
    self._micro_db: float = -120.0

    self._console_mode: ConsoleMode = "audio"

    # Guards the `_io_*` binding state and `_console_mode`.
    self._lock = threading.Lock()
    self._io_acquired = False
    self._io_acquired_event = threading.Event()

    self._enabled = False
    self._record = False

    # Filter attached to the log handler while in "text" mode (see `_update_sess_io`).
    self._text_mode_log_filter = TextModeLogFilter()
    self._log_handler = RichLoggingHandler(self)

    # console-recordings/session-MM-DD-HHMMSS (local time); only the path is built here.
    self._session_directory = pathlib.Path(
        self._console_directory, f"session-{datetime.datetime.now().strftime('%m-%d-%H%M%S')}"
    )
def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession) -> None:
    """Bind the console's audio I/O to ``session`` running on ``loop``.

    Must be called from inside ``loop``. Creates the console audio input/output
    objects, captures the current ``contextvars`` context, wakes any thread
    blocked in ``wait_for_io_acquisition`` and applies the current console mode
    to the session.

    Raises:
        RuntimeError: if the IO is already acquired, if the running loop is not
            ``loop``, or if there is no running loop at all.
    """
    with self._lock:
        if self._io_acquired:
            raise RuntimeError("the ConsoleIO was already acquired by another session")

        if asyncio.get_running_loop() != loop:
            raise RuntimeError(
                "the ConsoleIO must be acquired in the same asyncio loop as the session"
            )

        # Fill in every piece of I/O state *before* publishing `_io_acquired`:
        # `_sd_input_callback` reads `_io_acquired` without the lock and then
        # immediately uses `_io_loop` / `_io_audio_input`, and threads released by
        # `wait_for_io_acquisition()` may read `io_session` right away. Publishing
        # last also means a failing constructor leaves the console un-acquired.
        self._io_loop = loop
        self._io_context = contextvars.copy_context()
        self._io_audio_input = ConsoleAudioInput(loop)
        self._io_audio_output = ConsoleAudioOutput(loop)
        self._io_session = session
        self._io_acquired = True
        self._io_acquired_event.set()

    # Must run outside `self._lock`: `_update_sess_io` takes the lock itself and
    # threading.Lock is not re-entrant.
    self._update_sess_io(
        session, self.console_mode, self._io_audio_input, self._io_audio_output
    )
@property
def enabled(self) -> bool:
return self._enabled
@enabled.setter
def enabled(self, val: bool) -> None:
self._enabled = val
@property
def record(self) -> bool:
return self._record
@record.setter
def record(self, val: bool) -> None:
self._record = val
@property
def session_directory(self) -> pathlib.Path:
return self._session_directory
@property
def io_acquired(self) -> bool:
with self._lock:
return self._io_acquired
@property
def io_session(self) -> AgentSession:
if not self._io_acquired:
raise RuntimeError("AgentsConsole is not acquired")
return self._io_session
@property
def io_loop(self) -> asyncio.AbstractEventLoop:
if not self._io_acquired:
raise RuntimeError("AgentsConsole is not acquired")
return self._io_loop
@property
def io_context(self) -> contextvars.Context:
if not self._io_acquired:
raise RuntimeError("AgentsConsole is not acquired")
return self._io_context
def wait_for_io_acquisition(self) -> None:
self._io_acquired_event.wait()
@property
def input_name(self) -> str | None:
return self._input_name
@property
def output_name(self) -> str | None:
return self._output_name
@property
def console_mode(self) -> ConsoleMode:
return self._console_mode
@console_mode.setter
def console_mode(self, mode: ConsoleMode) -> None:
with self._lock:
self._console_mode = mode
if not self._io_acquired:
return
self.io_loop.call_soon_threadsafe(
self._update_sess_io,
self.io_session,
mode,
self._io_audio_input,
self._io_audio_output,
)
def _update_sess_io(
self,
sess: AgentSession,
mode: ConsoleMode,
audio_input: ConsoleAudioInput,
audio_output: ConsoleAudioOutput,
) -> None:
if asyncio.get_running_loop() != self.io_loop:
raise RuntimeError("_update_sess_io must be executed on the io_loop")
with self._lock:
if not self._io_acquired:
return
if self._io_session != sess or self._console_mode != mode:
return
if mode == "text":
sess.input.audio = None
sess.output.audio = None
self._log_handler.addFilter(self._text_mode_log_filter)
else:
sess.input.audio = audio_input
sess.output.audio = audio_output
self._log_handler.removeFilter(self._text_mode_log_filter)
def print(
    self, child: RenderableType, *, tag: str = "", tag_style: Style | None = None
) -> None:
    """Print ``child`` to the rich console, prefixed by a right-aligned ``tag`` column."""
    rendered = self._render_tag(child, tag=tag, tag_style=tag_style)
    self.console.print(rendered)
def _render_tag(
    self,
    child: RenderableType,
    *,
    tag: str = "",
    tag_width: int | None = None,
    tag_style: Style | None = None,
) -> ConsoleRenderable:
    """Lay out ``child`` in a two-column grid with ``tag`` right-aligned on the left.

    A non-empty ``tag`` is padded with one space on each side and styled with
    ``tag_style`` (or the theme's ``"tag"`` style). The left column is
    ``tag_width + 2`` cells wide, with ``tag_width`` defaulting to ``self.tag_width``
    when falsy. The right column folds long content.
    """
    label = f" {tag} " if tag else tag
    width = tag_width or self.tag_width

    grid = Table.grid(
        Column(width=width + 2, no_wrap=True),
        Column(no_wrap=False, overflow="fold"),
        padding=(0, 0, 0, 0),
        collapse_padding=True,
        pad_edge=False,
    )

    # Leading spaces push the label against the right edge of `width` cells.
    pad = max(0, width - len(label))
    label_style = tag_style or self.console.get_style("tag")
    left_cell = [Segment(" " * pad), Segment(label, style=label_style)]
    grid.add_row(Group(*left_cell), Group(child))  # type: ignore
    return grid
def set_microphone_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
if self._input_stream:
self._input_stream.close()
self._input_stream = self._input_name = None
if not enable:
return
import sounddevice as sd
if device is None:
device, _ = sd.default.device
try:
device_info = sd.query_devices(device, kind="input")
except Exception:
raise CLIError(
"Unable to access the microphone. \n"
"Please ensure a microphone is connected and recognized by your system. "
"To see available input devices, run: lk-agents console --list-devices"
) from None
assert isinstance(device_info, dict), "device_info is dict"
self._input_name = device_info.get("name", "Unnamed microphone")
self._input_stream = sd.InputStream(
callback=self._sd_input_callback,
dtype="int16",
channels=1,
device=device,
samplerate=24000,
blocksize=2400,
)
self._input_stream.start()
def set_speaker_enabled(self, enable: bool, *, device: int | str | None = None) -> None:
if self._output_stream:
self._output_stream.close()
self._output_stream = self._output_name = None
if not enable:
return
import sounddevice as sd
if device is None:
_, device = sd.default.device
try:
device_info = sd.query_devices(device, kind="output")
except Exception:
raise CLIError(
"Unable to access the speaker. \n"
"Please ensure a speaker is connected and recognized by your system. "
"To see available output devices, run: lk-agents console --list-devices"
) from None
assert isinstance(device_info, dict), "device_info is dict"
self._output_name = device_info.get("name", "Unnamed speaker")
self._output_stream = sd.OutputStream(
callback=self._sd_output_callback,
dtype="int16",
channels=1,
device=device,
samplerate=24000,
blocksize=2400,
)
self._output_stream.start()
def _validate_device_or_raise(
    self, *, input_device: str | None, output_device: str | None
) -> None:
    """Check that each requested device can be queried, raising ``CLIError`` otherwise.

    A falsy ``input_device`` / ``output_device`` is skipped. The input device is
    checked before the output device.
    """
    import sounddevice as sd

    microphone_error = (
        "Unable to access the microphone. \n"
        "Please ensure a microphone is connected and recognized by your system. "
        "To see available input devices, run: lk-agents console --list-devices"
    )
    speaker_error = (
        "Unable to access the speaker. \n"
        "Please ensure a speaker is connected and recognized by your system. "
        "To see available output devices, run: lk-agents console --list-devices"
    )

    for requested, kind, message in (
        (input_device, "input", microphone_error),
        (output_device, "output", speaker_error),
    ):
        if not requested:
            continue
        try:
            sd.query_devices(requested, kind=kind)
        except Exception:
            raise CLIError(message) from None
def _sd_input_callback(self, indata: np.ndarray, frame_count: int, time: Any, *_: Any) -> None:
    """sounddevice input callback: update level meters and forward mic audio.

    1. Updates the APM stream delay from the measured input + output latency.
    2. Computes per-band levels (``len(self._input_levels)`` log-spaced bands,
       14 as initialised in ``__init__``) for a meter display, with a peak-hold
       decay, stored under ``self._input_lock``.
    3. If a session has acquired the IO, splits ``indata`` into 240-sample
       (10 ms) frames, runs each through ``self._apm.process_stream`` and
       schedules ``push_frame`` on the io loop.

    NOTE(review): ``_io_acquired`` is read here without ``self._lock``. This is
    only safe if ``acquire_io`` assigns ``_io_loop`` / ``_io_audio_input`` before
    flipping ``_io_acquired`` to True.
    """
    # Latency as reported by PortAudio's time info (seconds).
    self._input_delay = time.currentTime - time.inputBufferAdcTime
    total_delay = self._output_delay + self._input_delay
    try:
        self._apm.set_stream_delay_ms(int(total_delay * 1000))
    except RuntimeError:
        pass  # setting stream delay in console mode fails often, so we silently continue

    # --- level meter: windowed FFT of this block, int16 -> [-1, 1) floats ---
    sr = 24000
    x = indata[:, 0].astype(np.float32) / 32768.0
    n = x.size
    x *= np.hanning(n).astype(np.float32)
    X = np.fft.rfft(x, n=n)
    # Single-sided amplitude spectrum; DC (and Nyquist for even n) are not doubled.
    mag = np.abs(X).astype(np.float32) * (2.0 / n)
    mag[0] *= 0.5
    mag[-1] *= 1.0 - 0.5 * float(n % 2 == 0)
    freqs = np.fft.rfftfreq(n, d=1.0 / sr)
    # Log-spaced band edges from 20 Hz up to 96% of Nyquist. Bins outside the
    # edges are clipped into the first/last band.
    nb = len(self._input_levels)
    edges = np.geomspace(20.0, (sr * 0.5) * 0.96, nb + 1).astype(np.float32)
    b = np.clip(np.digitize(freqs, edges) - 1, 0, nb - 1)
    # Mean power per band (empty bands divide by 1 instead of 0).
    p = (mag * mag).astype(np.float32)
    sump = np.bincount(b, weights=p, minlength=nb)
    cnts = np.maximum(np.bincount(b, minlength=nb), 1)
    pmean = sump / cnts
    db = 10.0 * np.log10(pmean + 1e-12)
    # Map [-70 dB, -20 dB] to [0, 1], apply a 0.75 gamma and a small noise gate.
    floor_db, hot_db = -70.0, -20
    lev = np.clip(((db - floor_db) / (hot_db - floor_db)).astype(np.float32), 0.0, 1.0)
    lev = np.maximum(lev**0.75 - 0.02, 0.0)
    # Normalise so the loudest band sits near 0.95, with the gain capped at 3x.
    peak = float(lev.max())
    lev *= np.clip(0.95 / (peak + 1e-6), 0.0, 3.0)
    lev = np.clip(lev, 0.0, 1.0)
    # Peak-hold: previous levels decay with a 100 ms time constant over this block.
    decay = float(np.exp(-(n / sr) / 0.1))
    with self._input_lock:
        prev = self._input_levels.astype(np.float32)
        self._input_levels = np.maximum(lev, prev * decay)

    if not self._io_acquired:
        return

    # Forward audio in 10 ms frames. Any tail shorter than FRAME_SAMPLES is
    # dropped; set_microphone_enabled opens the stream with blocksize=2400, an
    # exact multiple.
    FRAME_SAMPLES = 240  # 10ms at 24000 Hz
    num_frames = frame_count // FRAME_SAMPLES
    for i in range(num_frames):
        start = i * FRAME_SAMPLES
        end = start + FRAME_SAMPLES
        capture_chunk = indata[start:end]
        frame = rtc.AudioFrame(
            data=capture_chunk.tobytes(),
            samples_per_channel=FRAME_SAMPLES,
            sample_rate=24000,
            num_channels=1,
        )
        # Presumably processes `frame` in place (its data is read back below) — confirm.
        self._apm.process_stream(frame)
        # RMS of the processed frame in dBFS; only the last frame's value is kept.
        in_data_aec = np.frombuffer(frame.data, dtype=np.int16)
        rms = np.sqrt(np.mean(in_data_aec.astype(np.float32) ** 2))
        max_int16 = np.iinfo(np.int16).max
        self._micro_db = 20.0 * np.log10(rms / max_int16 + 1e-6)
        # NOTE(review): call_soon_threadsafe raises if the io loop is closed; an
        # exception here would propagate out of the audio callback.
        self._io_loop.call_soon_threadsafe(self._io_audio_input.push_frame, frame)
def _sd_output_callback(self, outdata: np.ndarray, frames: int, time: Any, *_: Any) -> None:
if not self.io_acquired:
outdata[:] = 0
return
self._output_delay = time.outputBufferDacTime - time.currentTime
FRAME_SAMPLES = 240
with self._io_audio_output.audio_lock:
bytes_needed = frames * 2
if len(self._io_audio_output.audio_buffer) < bytes_needed:
available_bytes = len(self._io_audio_output.audio_buffer)
outdata[: available_bytes // 2, 0] = np.frombuffer(
self._io_audio_output.audio_buffer, dtype=np.int16, count=available_bytes // 2
)
outdata[available_bytes // 2 :, 0] = 0
del self._io_audio_output.audio_buffer[:available_bytes] # TODO: optimize
else:
chunk = self._io_audio_output.audio_buffer[:bytes_needed]
outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frames)
del self._io_audio_output.audio_buffer[:bytes_needed]
num_chunks = frames // FRAME_SAMPLES
for i in range(num_chunks):
start = i * FRAME_SAMPLES
end = start + FRAME_SAMPLES
render_chunk = outdata[start:end, 0]
render_frame_for_aec = rtc.AudioFrame(
data=render_chunk.tobytes(),
samples_per_channel=FRAME_SAMPLES,
sample_rate=24000,
num_channels=1,
)
self._apm.process_reverse_stream(render_frame_for_aec)