Module livekit.agents.utils.codecs
Classes
class AudioStreamDecoder (*,
sample_rate: int | None = 48000,
num_channels: int | None = 1,
format: str | None = None)-
Expand source code
class AudioStreamDecoder:
    """A class that can be used to decode an audio stream into PCM AudioFrames.

    Decoders are stateful and should not be reused across multiple streams; each decoder
    is designed to decode a single stream.
    """

    # every decoder instance submits its worker loop to this shared, lazily created pool
    _max_workers: int = 10
    _executor: ThreadPoolExecutor | None = None

    def __init__(
        self,
        *,
        sample_rate: int | None = 48000,
        num_channels: int | None = 1,
        format: str | None = None,
    ):
        """
        Args:
            sample_rate: Output sample rate. On the WAV fast path, ``None`` (or a rate equal
                to the file's rate) disables resampling.
            num_channels: ``2`` selects a stereo output layout for the FFmpeg path; any other
                value selects mono. The WAV fast path does not remix channels and emits the
                file's own channel count.
            format: Optional MIME type of the input, used to pick the container format.
        """
        self._sample_rate = sample_rate
        self._layout = "mono"
        if num_channels == 2:
            self._layout = "stereo"
        self._mime_type = format.lower() if format else None
        self._av_format = _mime_to_av_format(self._mime_type)

        self._output_ch = aio.Chan[rtc.AudioFrame]()
        self._closed = False
        self._started = False
        self._input_buf = StreamBuffer()
        self._loop = asyncio.get_event_loop()

        if self.__class__._executor is None:
            # each decoder instance will submit jobs to the shared pool
            self.__class__._executor = ThreadPoolExecutor(max_workers=self.__class__._max_workers)

    def push(self, chunk: bytes) -> None:
        """Feed encoded bytes; the first call starts the decode worker on the executor."""
        self._input_buf.write(chunk)
        if not self._started:
            self._started = True
            target = self._decode_wav_loop if self._av_format == "wav" else self._decode_loop
            self._loop.run_in_executor(self.__class__._executor, target)

    def end_input(self) -> None:
        """Signal that no more data will be pushed."""
        self._input_buf.end_input()
        if not self._started:
            # if no data was pushed, close the output channel
            self._output_ch.close()

    def _emit_av_frame(self, frame: av.AudioFrame) -> None:
        """Convert a decoded (packed s16) PyAV frame to an rtc.AudioFrame and queue it."""
        # PyAV's ``AudioFrame.samples`` is already the number of samples *per channel*;
        # the packed ndarray holds ``samples * channels`` interleaved values.
        self._loop.call_soon_threadsafe(
            self._output_ch.send_nowait,
            rtc.AudioFrame(
                data=frame.to_ndarray().tobytes(),
                num_channels=len(frame.layout.channels),
                sample_rate=int(frame.sample_rate),
                samples_per_channel=int(frame.samples),
            ),
        )

    def _decode_loop(self) -> None:
        """Decode the input with PyAV/FFmpeg and emit PCM frames.

        Runs on the shared executor thread. Frames are handed to the event loop thread via
        ``call_soon_threadsafe`` and the output channel is closed when this loop exits.
        """
        container: av.container.InputContainer | None = None
        resampler: av.AudioResampler | None = None
        try:
            # open container in low-latency streaming mode
            container = av.open(
                self._input_buf,
                mode="r",
                format=self._av_format,
                buffer_size=256,
                options={
                    "probesize": "32",
                    "analyzeduration": "0",
                    "fflags": "nobuffer+flush_packets",
                    "flags": "low_delay",
                    "reorder_queue_size": "0",
                    "max_delay": "0",
                    "avioflags": "direct",
                },
            )
            # explicitly disable internal buffering flags on the FFmpeg container
            container.flags |= cast(
                int, av.container.Flags.no_buffer.value | av.container.Flags.flush_packets.value
            )

            if len(container.streams.audio) == 0:
                raise ValueError("no audio stream found")

            audio_stream = container.streams.audio[0]

            # Set up resampler only if needed (``_layout`` is always set, so in practice a
            # resampler is always created and output is packed s16)
            if self._sample_rate is not None or self._layout is not None:
                resampler = av.AudioResampler(
                    format="s16", layout=self._layout, rate=self._sample_rate
                )

            for frame in container.decode(audio_stream):
                if self._closed:
                    return
                frames = resampler.resample(frame) if resampler else [frame]
                for f in frames:
                    self._emit_av_frame(f)

            if resampler:
                # passing None flushes samples still held by the resampler; without this
                # the tail of the stream is lost
                for f in resampler.resample(None):
                    if f is not None:
                        self._emit_av_frame(f)
        except Exception:
            logger.exception("error decoding audio")
        finally:
            self._loop.call_soon_threadsafe(self._output_ch.close)
            if container:
                container.close()

    def _read_exact(self, size: int, error: str) -> bytes:
        """Read exactly ``size`` bytes from the input, raising ``ValueError(error)`` on EOF.

        ``StreamBuffer.read`` may return fewer bytes than requested when a pushed chunk ends
        early, so this loops until the request is satisfied.
        """
        parts: list[bytes] = []
        remaining = size
        while remaining > 0:
            part = self._input_buf.read(min(1024, remaining))
            if not part:
                raise ValueError(error)
            parts.append(part)
            remaining -= len(part)
        return b"".join(parts)

    def _skip_exact(self, size: int, error: str) -> None:
        """Discard exactly ``size`` bytes from the input, raising ``ValueError(error)`` on EOF."""
        remaining = size
        while remaining > 0:
            part = self._input_buf.read(min(1024, remaining))
            if not part:
                raise ValueError(error)
            remaining -= len(part)

    def _read_wav_header(self) -> tuple[int, int]:
        """Consume the RIFF/WAVE header up to the first byte of the ``data`` chunk payload.

        Returns:
            ``(sample_rate, num_channels)`` taken from the ``fmt `` chunk.

        Raises:
            ValueError: on a truncated/malformed header, or a format other than 16-bit PCM.
        """
        # parse RIFF header
        header = self._read_exact(12, "Invalid WAV file: incomplete header")
        if header[:4] != b"RIFF" or header[8:12] != b"WAVE":
            raise ValueError(f"Invalid WAV file: missing RIFF/WAVE: {header!r}")

        # parse fmt chunk; chunks appearing before it are skipped
        while True:
            sub_header = self._read_exact(8, "Invalid WAV file: incomplete fmt chunk header")
            chunk_id, chunk_size = struct.unpack("<4sI", sub_header)
            # RIFF chunks are word-aligned: odd-sized payloads are followed by one pad byte
            padded_size = chunk_size + (chunk_size & 1)
            if chunk_id != b"fmt ":
                self._skip_exact(padded_size, "Invalid WAV file: incomplete fmt chunk data")
                continue
            data = self._read_exact(padded_size, "Invalid WAV file: incomplete fmt chunk data")
            break

        if len(data) < 16:
            raise ValueError("Invalid WAV file: fmt chunk too short")
        audio_format, wave_channels, wave_rate, _, _, bits_per_sample = struct.unpack(
            "<HHIIHH", data[:16]
        )
        if audio_format != 1:
            raise ValueError(f"Unsupported WAV audio format: {audio_format}")
        if bits_per_sample != 16:
            # frames are built as int16 PCM; other sample widths would be emitted as noise
            raise ValueError(f"Unsupported WAV bits per sample: {bits_per_sample}")

        # skip any remaining chunks until the data chunk
        while True:
            sub_header = self._read_exact(8, "Invalid WAV file: incomplete data chunk header")
            chunk_id, chunk_size = struct.unpack("<4sI", sub_header)
            if chunk_id == b"data":
                # the declared data size is ignored: streamed WAVs often carry a placeholder
                return wave_rate, wave_channels
            self._skip_exact(
                chunk_size + (chunk_size & 1),
                "Invalid WAV file: incomplete chunk while seeking data",
            )

    def _decode_wav_loop(self) -> None:
        """Decode wav data from the buffer without ffmpeg, parse header and emit PCM frames.

        This can be much faster than using ffmpeg, as we are emitting frames as quickly as
        possible. Only uncompressed 16-bit PCM is supported; other formats are logged as errors.
        The output channel is closed when this loop exits.
        """
        try:
            wave_rate, wave_channels = self._read_wav_header()

            # now ready to decode
            bstream = AudioByteStream(sample_rate=wave_rate, num_channels=wave_channels)
            resampler = (
                rtc.AudioResampler(
                    input_rate=wave_rate, output_rate=self._sample_rate, num_channels=wave_channels
                )
                if self._sample_rate is not None and self._sample_rate != wave_rate
                else None
            )

            def send(frame: rtc.AudioFrame) -> None:
                self._loop.call_soon_threadsafe(self._output_ch.send_nowait, frame)

            def resample_and_push(frame: rtc.AudioFrame) -> None:
                if resampler is None:
                    send(frame)
                    return
                for resampled_frame in resampler.push(frame):
                    send(resampled_frame)

            while chunk := self._input_buf.read(1024):
                for rtc_frame in bstream.push(chunk):
                    resample_and_push(rtc_frame)

            for rtc_frame in bstream.flush():
                resample_and_push(rtc_frame)

            if resampler is not None:
                # emit samples still buffered inside the resampler
                for resampled_frame in resampler.flush():
                    send(resampled_frame)
        except Exception:
            logger.exception("error decoding wav")
        finally:
            self._loop.call_soon_threadsafe(self._output_ch.close)

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame]:
        return self

    async def __anext__(self) -> rtc.AudioFrame:
        return await self._output_ch.__anext__()

    async def aclose(self) -> None:
        """Stop decoding and drain the output channel until the worker closes it."""
        if self._closed:
            return
        self.end_input()
        self._closed = True
        self._input_buf.close()
        if not self._started:
            return
        async for _ in self._output_ch:
            pass
A class that can be used to decode an audio stream into PCM AudioFrames.
Decoders are stateful and should not be reused across multiple streams; each decoder is designed to decode a single stream.
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Shut the decoder down.

    Ends the input, marks the decoder closed and closes the input buffer. If a worker was
    started, the output channel is drained until the worker closes it. Calling this more
    than once is a no-op.
    """
    if not self._closed:
        self.end_input()
        self._closed = True
        self._input_buf.close()
        if self._started:
            # the worker closes the channel on exit; consume whatever is left until then
            async for _discarded in self._output_ch:
                pass
def end_input(self) ‑> None
-
Expand source code
def end_input(self) -> None:
    """Tell the decoder that no further chunks will be pushed."""
    self._input_buf.end_input()
    # Once decoding has started, the worker closes the channel itself.
    if self._started:
        return
    # Nothing was ever pushed, so no worker exists to close the channel; do it here.
    self._output_ch.close()
def push(self, chunk: bytes) ‑> None
-
Expand source code
def push(self, chunk: bytes) -> None:
    """Append a chunk of encoded audio to the decoder's input.

    The first call also schedules exactly one worker on the shared executor: the WAV
    header parser when the container format is ``"wav"``, the PyAV decoder otherwise.
    """
    self._input_buf.write(chunk)
    if self._started:
        return
    self._started = True
    worker = self._decode_loop if self._av_format != "wav" else self._decode_wav_loop
    self._loop.run_in_executor(self.__class__._executor, worker)
class StreamBuffer
-
Expand source code
class StreamBuffer:
    """
    A thread-safe buffer that behaves like an IO stream.
    Allows writing from one thread and reading from another.
    """

    def __init__(self) -> None:
        self._buffer = io.BytesIO()
        self._lock = threading.Lock()
        # readers wait on this condition for new data, end of input, or close
        self._data_available = threading.Condition(self._lock)
        self._eof = False

    def write(self, data: bytes) -> None:
        """Write data to the buffer from a writer thread."""
        with self._data_available:
            self._buffer.seek(0, io.SEEK_END)
            self._buffer.write(data)
            self._data_available.notify_all()

    def read(self, size: int = -1) -> bytes:
        """Read data from the buffer in a reader thread.

        Blocks while no data is buffered and input has not ended. Returns up to ``size``
        bytes (everything buffered when ``size`` is negative), or ``b""`` once the buffer
        is closed or input has ended with nothing left to read.
        """
        if self._buffer.closed:
            return b""
        with self._data_available:
            while True:
                if self._buffer.closed:
                    return b""
                # always read from beginning
                self._buffer.seek(0)
                data = self._buffer.read(size)

                if data:
                    # shrink the buffer to remove already-read data
                    # NOTE(review): this copies the unread tail on every call, so many small
                    # reads after one large write cost O(n^2) bytes copied in total
                    remaining = self._buffer.read()
                    self._buffer = io.BytesIO(remaining)
                    return data

                if self._eof:
                    return b""

                self._data_available.wait()

    def end_input(self) -> None:
        """Signal that no more data will be written."""
        with self._data_available:
            self._eof = True
            self._data_available.notify_all()

    def close(self) -> None:
        """Close the buffer; pending and future reads return ``b""``.

        Closing under the condition's lock prevents it from interleaving with a concurrent
        ``read`` (which swaps ``self._buffer`` for a new object). Waiting readers are woken
        so they observe the closed state instead of blocking forever.
        """
        with self._data_available:
            self._buffer.close()
            self._data_available.notify_all()
A thread-safe buffer that behaves like an IO stream. Allows writing from one thread and reading from another.
Methods
def close(self) ‑> None
-
Expand source code
def close(self) -> None:
    """Close the buffer; pending and future reads return ``b""``.

    Closing under the condition's lock prevents it from interleaving with a concurrent
    ``read`` (which swaps ``self._buffer`` for a new object). Waiting readers are woken
    so they observe the closed state instead of blocking forever.
    """
    with self._data_available:
        self._buffer.close()
        self._data_available.notify_all()
def end_input(self) ‑> None
-
Expand source code
def end_input(self) -> None:
    """Mark the end of the stream.

    Readers drain whatever is still buffered and then receive ``b""``.
    """
    cond = self._data_available
    with cond:
        self._eof = True
        # wake every blocked reader so it can observe end-of-input
        cond.notify_all()
Signal that no more data will be written.
def read(self, size: int = -1) ‑> bytes
-
Expand source code
def read(self, size: int = -1) -> bytes:
    """Consume up to ``size`` bytes (all buffered bytes if negative) from a reader thread.

    Blocks while nothing is buffered and input has not ended. Returns ``b""`` once the
    buffer is closed, or once input has ended and nothing is left.
    """
    if self._buffer.closed:
        return b""
    cond = self._data_available
    with cond:
        while not self._buffer.closed:
            # unread data always starts at offset 0
            self._buffer.seek(0)
            chunk = self._buffer.read(size)
            if chunk:
                # Keep only the unread tail. NOTE(review): this copies the tail on every
                # call, so many small reads after one large write are quadratic overall.
                self._buffer = io.BytesIO(self._buffer.read())
                return chunk
            if self._eof:
                return b""
            cond.wait()
        return b""
Read data from the buffer in a reader thread.
def write(self, data: bytes) ‑> None
-
Expand source code
def write(self, data: bytes) -> None:
    """Append ``data`` from a writer thread and wake any waiting readers."""
    cond = self._data_available
    with cond:
        buf = self._buffer
        # new bytes always go after everything already buffered
        buf.seek(0, io.SEEK_END)
        buf.write(data)
        cond.notify_all()
Write data to the buffer from a writer thread.