Module livekit.agents.utils.codecs
Classes
class AudioStreamDecoder (*,
sample_rate: int | None = 48000,
num_channels: int | None = 1,
format: str | None = None)
Expand source code
class AudioStreamDecoder:
    """A class that can be used to decode audio stream into PCM AudioFrames.

    Decoders are stateful and should not be reused across multiple streams.
    Each decoder is designed to decode a single stream.

    WAV input is decoded inline on the thread that calls ``push``; every other
    format is fed through a ``StreamBuffer`` to a single PyAV worker thread.
    Decoded frames are obtained by iterating the decoder (``async for``).
    """

    def __init__(
        self,
        *,
        sample_rate: int | None = 48000,
        num_channels: int | None = 1,
        format: str | None = None,
    ):
        """Create a decoder for one encoded audio stream.

        Args:
            sample_rate: Target sample rate; given to the PyAV resampler for
                non-WAV input and to the inline WAV decoder for WAV input.
            num_channels: ``2`` selects a stereo output layout; any other
                value (including ``None``) selects mono.
            format: Optional format hint (stored as a MIME type). It is
                lower-cased and mapped to a PyAV container format by
                ``_mime_to_av_format``.
        """
        self._sample_rate = sample_rate
        self._layout = "mono"
        if num_channels == 2:
            self._layout = "stereo"

        self._mime_type = format.lower() if format else None
        self._av_format = _mime_to_av_format(self._mime_type)
        self._is_wav = self._av_format == "wav"

        self._output_ch = aio.Chan[rtc.AudioFrame]()
        self._closed = False
        self._started = False
        # Captured so the worker thread can hand results back to this loop
        # through call_soon_threadsafe.
        self._loop = asyncio.get_event_loop()

        # lazy-initialized only for non-WAV codecs
        self._input_buf: StreamBuffer | None = None
        self._executor: ThreadPoolExecutor | None = None

        # lazy-initialized only for WAV
        self._wav_decoder: _WavInlineDecoder | None = None

    def push(self, chunk: bytes) -> None:
        """Feed a chunk of encoded audio into the decoder.

        WAV chunks are handed to the inline WAV decoder synchronously. Other
        formats are appended to the input buffer; the first push also starts
        the background decode worker. Chunks pushed after the decoder has been
        closed are ignored.
        """
        if self._closed:
            # After aclose() the input buffer (if any) is closed, so writing to
            # it would raise ValueError, and lazily starting a worker now would
            # spawn a thread blocked on input that nobody is going to end.
            return

        if self._is_wav:
            if self._wav_decoder is None:
                self._wav_decoder = _WavInlineDecoder(self._output_ch, self._sample_rate)
            try:
                self._wav_decoder.push(chunk)
            except Exception:
                if not self._closed:
                    logger.exception("error decoding WAV audio")
                    self._output_ch.close()
                    self._closed = True
                return
            self._started = True
            return

        if self._input_buf is None:
            self._input_buf = StreamBuffer()
            self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="AudioDecoder")
        self._input_buf.write(chunk)
        if not self._started:
            self._started = True
            self._loop.run_in_executor(self._executor, self._decode_loop)

    def end_input(self) -> None:
        """Signal that no more encoded audio will be pushed.

        WAV input is flushed and the output channel is closed right away. For
        other formats the input buffer is marked as ended so the worker can
        finish; if no worker was ever started, the output channel is closed
        here instead.
        """
        if self._is_wav:
            # NOTE(review): this path does not set ``_closed``, so calling
            # ``aclose()`` after ``end_input()`` runs ``flush()`` a second
            # time -- confirm ``_WavInlineDecoder.flush`` tolerates that.
            if self._wav_decoder is not None and not self._closed:
                try:
                    self._wav_decoder.flush()
                except Exception:
                    logger.exception("error flushing WAV audio")
            if not self._closed:
                self._output_ch.close()
            return

        if self._input_buf is not None:
            self._input_buf.end_input()
        if not self._started:
            self._output_ch.close()

    def _decode_loop(self) -> None:
        """Worker-thread body for non-WAV input.

        Demuxes and decodes ``self._input_buf`` with PyAV, resamples to signed
        16-bit ("s16") samples in the configured layout and rate, and queues
        each frame on the output channel through the event loop. When this
        returns (finished, failed, or stopped by ``aclose()``), closing the
        output channel is scheduled and the container is released.
        """
        container: av.container.InputContainer | None = None
        resampler: av.AudioResampler | None = None
        try:
            # open container in low-latency streaming mode
            container = av.open(
                self._input_buf,
                mode="r",
                format=self._av_format,
                buffer_size=256,
                options={
                    "probesize": "32",
                    "analyzeduration": "0",
                    "fflags": "nobuffer+flush_packets",
                    "flags": "low_delay",
                    "reorder_queue_size": "0",
                    "max_delay": "0",
                    "avioflags": "direct",
                },
            )
            # explicitly disable internal buffering flags on the FFmpeg container
            container.flags |= cast(
                int, av.container.Flags.no_buffer.value | av.container.Flags.flush_packets.value
            )

            if len(container.streams.audio) == 0:
                raise ValueError("no audio stream found")

            audio_stream = container.streams.audio[0]

            # Set up resampler only if needed.
            # NOTE: ``_layout`` is always a string (see __init__), so in practice
            # this condition is always true and a resampler is always created.
            if self._sample_rate is not None or self._layout is not None:
                resampler = av.AudioResampler(
                    format="s16", layout=self._layout, rate=self._sample_rate
                )

            for frame in container.decode(audio_stream):
                if self._closed:
                    return

                if resampler:
                    frames = resampler.resample(frame)
                else:
                    frames = [frame]

                for f in frames:
                    self._emit_av_frame(f)

            # flush the resampler to get any remaining buffered samples
            if resampler and not self._closed:
                for f in resampler.resample(None):
                    self._emit_av_frame(f)

        except Exception:
            logger.exception("error decoding audio")
        finally:
            try:
                # The channel is closed via the event loop, not directly from
                # this worker thread.
                self._loop.call_soon_threadsafe(self._output_ch.close)
            finally:
                # Release the container even if scheduling the close failed
                # (call_soon_threadsafe raises RuntimeError once the loop is closed).
                if container is not None:
                    container.close()

    def _emit_av_frame(self, f: av.AudioFrame) -> None:
        """Convert a PyAV frame into an ``rtc.AudioFrame`` and queue it on the
        output channel via the event loop (called from the worker thread)."""
        self._loop.call_soon_threadsafe(
            self._output_ch.send_nowait,
            rtc.AudioFrame(
                data=f.to_ndarray().tobytes(),
                num_channels=len(f.layout.channels),
                sample_rate=int(f.sample_rate),
                samples_per_channel=f.samples,
            ),
        )

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame]:
        return self

    async def __anext__(self) -> rtc.AudioFrame:
        return await self._output_ch.__anext__()

    async def aclose(self) -> None:
        """Close the decoder, discarding frames that were not consumed.

        Returns immediately if the decoder is already closed. Otherwise ends
        input, closes the input buffer (which makes the worker's reads return
        EOF), drains the output channel, and shuts the executor down.
        """
        if self._closed:
            return

        self.end_input()
        self._closed = True

        if self._input_buf is not None:
            self._input_buf.close()

        if not self._started:
            return

        async for _ in self._output_ch:
            pass

        if self._executor is not None:
            self._executor.shutdown(wait=False, cancel_futures=True)
# A class that can be used to decode audio stream into PCM AudioFrames.
Decoders are stateful and should not be reused across multiple streams. Each decoder is designed to decode a single stream.
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None:
    """Close the decoder, discarding any frames that were not consumed.

    Returns immediately if the decoder is already closed. Otherwise ends input,
    closes the input buffer, and, if decoding had started, drains the output
    channel and shuts the executor down without waiting.
    """
    if not self._closed:
        self.end_input()
        self._closed = True

        input_buffer = self._input_buf
        if input_buffer is not None:
            input_buffer.close()

        if self._started:
            # Consume and drop whatever is left; the loop ends once the output
            # channel is closed.
            async for _frame in self._output_ch:
                pass
            executor = self._executor
            if executor is not None:
                executor.shutdown(wait=False, cancel_futures=True)
# def end_input(self) -> None
Expand source code
def end_input(self) -> None:
    """Signal that no more encoded audio will be pushed.

    Non-WAV: mark the input buffer as ended so the worker can finish, and close
    the output channel directly when no worker was ever started. WAV: unless the
    decoder is already closed, flush the inline decoder (logging any failure)
    and close the output channel.
    """
    if not self._is_wav:
        if self._input_buf is not None:
            self._input_buf.end_input()
        if not self._started:
            self._output_ch.close()
        return

    if self._closed:
        return
    if self._wav_decoder is not None:
        try:
            self._wav_decoder.flush()
        except Exception:
            logger.exception("error flushing WAV audio")
    self._output_ch.close()
# def push(self, chunk: bytes) -> None
Expand source code
def push(self, chunk: bytes) -> None:
    """Feed a chunk of encoded audio into the decoder.

    WAV chunks are handed to the inline WAV decoder synchronously; on a decoding
    error the output channel is closed and the decoder is marked closed. Other
    formats are appended to the input buffer, and the first push starts the
    background decode worker. Chunks pushed after the decoder is closed are
    ignored.
    """
    if self._closed:
        # After aclose() the input buffer (if any) is closed, so writing to it
        # would raise ValueError, and lazily starting a worker now would spawn a
        # thread blocked on input that nobody is going to end.
        return

    if self._is_wav:
        if self._wav_decoder is None:
            self._wav_decoder = _WavInlineDecoder(self._output_ch, self._sample_rate)
        try:
            self._wav_decoder.push(chunk)
        except Exception:
            if not self._closed:
                logger.exception("error decoding WAV audio")
                self._output_ch.close()
                self._closed = True
            return
        self._started = True
        return

    if self._input_buf is None:
        self._input_buf = StreamBuffer()
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="AudioDecoder")
    self._input_buf.write(chunk)
    if not self._started:
        self._started = True
        self._loop.run_in_executor(self._executor, self._decode_loop)
class StreamBuffer
Expand source code
class StreamBuffer:
    """
    A thread-safe buffer that behaves like an IO stream.
    Allows writing from one thread and reading from another.

    All state is guarded by one lock. ``read`` blocks until data arrives, input
    ends (``end_input``), or the buffer is closed (``close``). Writes after
    ``close`` are ignored.
    """

    # Once the read offset reaches this many bytes, the unread tail is copied
    # into a fresh BytesIO so already-consumed bytes are not kept in memory.
    _COMPACT_THRESHOLD = 5 * 1024 * 1024  # compact after 5MB consumed

    def __init__(self) -> None:
        self._bio = io.BytesIO()
        self._lock = threading.Lock()
        self._data_available = threading.Condition(self._lock)
        self._eof = False  # set by end_input(): no more writes expected
        self._closed = False  # set by close(): reads return b"" from then on
        self._write_pos = 0  # offset in _bio where the next write lands
        self._read_pos = 0  # offset in _bio of the first unread byte

    def write(self, data: bytes) -> None:
        """Write data to the buffer from a writer thread.

        Appends *data* after previously written bytes and wakes any blocked
        reader. Data written after ``close()`` is discarded: the backing
        BytesIO is closed at that point, so writing to it would raise
        ``ValueError``.
        """
        with self._data_available:
            if self._closed:
                return
            self._bio.seek(self._write_pos)
            self._bio.write(data)
            self._write_pos = self._bio.tell()
            self._data_available.notify_all()

    def read(self, size: int = -1) -> bytes:
        """Read data from the buffer in a reader thread.

        Returns up to *size* bytes, or everything currently buffered when
        *size* is negative. Blocks while nothing is buffered. Returns ``b""``
        when *size* is 0, once the buffer is closed, or once input has ended
        and all buffered bytes have been read.
        """
        if size == 0:
            return b""

        with self._data_available:
            while True:
                if self._closed:
                    return b""

                available = self._write_pos - self._read_pos
                if available > 0:
                    self._bio.seek(self._read_pos)
                    if size < 0:
                        data = self._bio.read(available)
                    else:
                        data = self._bio.read(min(size, available))
                    self._read_pos = self._bio.tell()

                    if self._read_pos >= self._COMPACT_THRESHOLD:
                        # Rebase: keep only the unread tail and reset offsets.
                        remaining = self._bio.read()
                        self._bio = io.BytesIO(remaining)
                        self._bio.seek(0, io.SEEK_END)
                        self._write_pos = self._bio.tell()
                        self._read_pos = 0

                    return data

                if self._eof:
                    return b""

                self._data_available.wait()

    def end_input(self) -> None:
        """Signal that no more data will be written."""
        with self._data_available:
            self._eof = True
            self._data_available.notify_all()

    def close(self) -> None:
        """Close the buffer, wake blocked readers and release the BytesIO."""
        with self._data_available:
            self._closed = True
            self._data_available.notify_all()
            self._bio.close()
# A thread-safe buffer that behaves like an IO stream. Allows writing from one thread and reading from another.
Methods
def close(self) -> None
Expand source code
def close(self) -> None:
    """Mark the buffer closed, wake every blocked reader, and close the BytesIO.

    Later reads return b"" immediately.
    """
    cond = self._data_available
    with cond:
        self._closed = True
        cond.notify_all()
        self._bio.close()
# def end_input(self) -> None
Expand source code
def end_input(self) -> None:
    """Signal that no more data will be written.

    Blocked readers are woken; after they drain the buffered bytes, read()
    returns b"".
    """
    cond = self._data_available
    with cond:
        self._eof = True
        cond.notify_all()
# Signal that no more data will be written.
def read(self, size: int = -1) -> bytes
Expand source code
def read(self, size: int = -1) -> bytes:
    """Read data from the buffer in a reader thread.

    Returns up to *size* bytes, or everything currently buffered when *size*
    is negative. Blocks while nothing is buffered; returns b"" when *size* is 0,
    once the buffer is closed, or once input has ended and every buffered byte
    has been read.
    """
    if size == 0:
        return b""

    cond = self._data_available
    with cond:
        while not self._closed:
            pending = self._write_pos - self._read_pos
            if pending <= 0:
                if self._eof:
                    return b""
                cond.wait()
                continue

            self._bio.seek(self._read_pos)
            want = pending if size < 0 else min(size, pending)
            chunk = self._bio.read(want)
            self._read_pos = self._bio.tell()

            if self._read_pos >= self._COMPACT_THRESHOLD:
                # Copy the unread tail into a fresh BytesIO and rebase both
                # offsets so consumed bytes are not kept in memory.
                tail = self._bio.read()
                self._bio = io.BytesIO(tail)
                self._bio.seek(0, io.SEEK_END)
                self._write_pos = self._bio.tell()
                self._read_pos = 0

            return chunk or b""
        return b""
# Read data from the buffer in a reader thread.
def write(self, data: bytes) -> None
Expand source code
def write(self, data: bytes) -> None:
    """Write data to the buffer from a writer thread.

    Appends *data* after previously written bytes and wakes any reader blocked
    in read(). Data written after close() is discarded: the backing BytesIO is
    closed at that point, so writing to it would raise ValueError.
    """
    with self._data_available:
        if self._closed:
            return
        self._bio.seek(self._write_pos)
        self._bio.write(data)
        self._write_pos = self._bio.tell()
        self._data_available.notify_all()
# Write data to the buffer from a writer thread.