Module livekit.agents.utils.codecs
Sub-modules
livekit.agents.utils.codecs.decoder
Classes
class AudioStreamDecoder (*, sample_rate: int = 48000, num_channels: int = 1)
-
Expand source code
class AudioStreamDecoder:
    """A class that can be used to decode an audio stream into PCM AudioFrames.

    Decoders are stateful and should not be reused across multiple streams.
    Each decoder is designed to decode a single stream.

    Encoded bytes are fed with :meth:`push` and terminated with
    :meth:`end_input`. Decoding runs on a worker thread of an executor shared
    by all instances, and decoded frames are consumed with ``async for``.
    """

    # size of the thread pool shared by all decoder instances
    _max_workers: int = 10
    # created by the first instance, then reused by every later instance
    _executor: Optional[ThreadPoolExecutor] = None

    def __init__(self, *, sample_rate: int = 48000, num_channels: int = 1):
        """Create a decoder producing signed 16-bit PCM frames.

        Args:
            sample_rate: sample rate of the output frames, in Hz.
            num_channels: number of output channels, 1 (mono) or 2 (stereo).

        Raises:
            ImportError: if the optional ``av`` package is not installed.
            ValueError: if ``num_channels`` is neither 1 nor 2.
        """
        try:
            import av  # noqa
        except ImportError:
            raise ImportError(
                "You haven't included the 'codecs' optional dependencies. Please install the 'codecs' extra by running `pip install livekit-agents[codecs]`"
            )

        self._sample_rate = sample_rate
        self._layout = "mono"
        if num_channels == 2:
            self._layout = "stereo"
        elif num_channels != 1:
            raise ValueError(f"Invalid number of channels: {num_channels}")

        self._output_ch = aio.Chan[rtc.AudioFrame]()
        self._closed = False
        self._started = False
        self._input_buf = StreamBuffer()
        self._loop = asyncio.get_event_loop()

        if self.__class__._executor is None:
            # each decoder instance will submit jobs to the shared pool
            self.__class__._executor = ThreadPoolExecutor(
                max_workers=self.__class__._max_workers
            )

    def push(self, chunk: bytes):
        """Append encoded bytes; the first call starts the decode worker."""
        self._input_buf.write(chunk)
        if not self._started:
            self._started = True
            self._loop.run_in_executor(self.__class__._executor, self._decode_loop)

    def end_input(self):
        """Signal that no more encoded data will be pushed."""
        self._input_buf.end_input()
        if not self._started:
            # if no data was pushed, close the output channel
            self._output_ch.close()

    def _decode_loop(self):
        """Decode the buffered input; runs on a thread of the shared executor.

        Decoded frames are resampled to the configured layout/rate and handed
        to the event loop thread. The output channel is always closed when
        this returns, whether decoding finished, was aborted by ``aclose`` or
        failed (failures are logged, not raised).
        """
        container: av.container.InputContainer | None = None
        resampler: av.AudioResampler | None = None
        try:
            container = av.open(self._input_buf, mode="r")
            if len(container.streams.audio) == 0:
                raise ValueError("no audio stream found")
            audio_stream = container.streams.audio[0]
            resampler = av.AudioResampler(
                format="s16", layout=self._layout, rate=self._sample_rate
            )

            for frame in container.decode(audio_stream):
                if self._closed:
                    return
                for resampled_frame in resampler.resample(frame):
                    self._send_frame(resampled_frame)

            if self._closed:
                return
            # passing None flushes samples still buffered inside the resampler,
            # otherwise the tail of the stream would be dropped
            for resampled_frame in resampler.resample(None):
                self._send_frame(resampled_frame)
        except Exception:
            logger.exception("error decoding audio")
        finally:
            self._loop.call_soon_threadsafe(self._output_ch.close)
            if container:
                container.close()

    def _send_frame(self, frame: "av.AudioFrame") -> None:
        """Convert a resampled PyAV frame to an rtc.AudioFrame and enqueue it on the loop thread."""
        nchannels = len(frame.layout.channels)
        self._loop.call_soon_threadsafe(
            self._output_ch.send_nowait,
            rtc.AudioFrame(
                data=frame.to_ndarray().tobytes(),
                num_channels=nchannels,
                sample_rate=int(frame.sample_rate),
                # PyAV's AudioFrame.samples already counts samples *per channel*;
                # dividing by the channel count halved stereo frame lengths
                samples_per_channel=int(frame.samples),
            ),
        )

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame]:
        return self

    async def __anext__(self) -> rtc.AudioFrame:
        return await self._output_ch.__anext__()

    async def aclose(self):
        """Stop decoding and release resources; safe to call more than once.

        If decoding was started, this waits until the decode loop has exited
        (signalled by the output channel closing) before closing the input
        buffer. Undelivered frames are discarded.
        """
        if self._closed:
            return
        self.end_input()
        self._closed = True
        try:
            if self._started:
                # drain until the decode loop closes the channel; a single
                # recv() would return as soon as one queued frame is available
                with contextlib.suppress(aio.ChanClosed):
                    while True:
                        await self._output_ch.recv()
        finally:
            # closed only after the worker is done, so it never reads a closed buffer
            self._input_buf.close()
A class that can be used to decode an audio stream into PCM AudioFrames.
Decoders are stateful and should not be reused across multiple streams. Each decoder is designed to decode a single stream.
Methods
async def aclose(self)
-
Expand source code
async def aclose(self):
    """Stop decoding and release resources; safe to call more than once.

    If decoding was started, this waits until the decode loop has exited
    (signalled by the output channel closing) before closing the input
    buffer. Undelivered frames are discarded.
    """
    if self._closed:
        return
    self.end_input()
    self._closed = True
    try:
        if self._started:
            # drain until the decode loop closes the channel; a single recv()
            # would return as soon as one already-queued frame is available
            with contextlib.suppress(aio.ChanClosed):
                while True:
                    await self._output_ch.recv()
    finally:
        # closed only after the worker is done, so it never reads a closed buffer
        self._input_buf.close()
def end_input(self)
-
Expand source code
def end_input(self):
    """Tell the decoder that the whole encoded stream has been pushed.

    When push() was never called no decode worker exists to close the output
    channel, so it is closed here to end iteration.
    """
    self._input_buf.end_input()
    if self._started:
        return
    # nothing was ever pushed: no worker will close the channel for us
    self._output_ch.close()
def push(self, chunk: bytes)
-
Expand source code
def push(self, chunk: bytes):
    """Feed a chunk of encoded audio; the first call also starts the decode worker."""
    self._input_buf.write(chunk)
    if self._started:
        return
    self._started = True
    executor = self.__class__._executor
    self._loop.run_in_executor(executor, self._decode_loop)
class StreamBuffer
-
Expand source code
class StreamBuffer:
    """
    A thread-safe buffer that behaves like an IO stream.
    Allows writing from one thread and reading from another.

    ``read`` blocks until data is available, ``end_input`` has been called,
    or the buffer has been closed.
    """

    def __init__(self):
        self._buffer = io.BytesIO()
        self._lock = threading.Lock()
        # single condition guarding _buffer and _eof for writers and readers
        self._data_available = threading.Condition(self._lock)
        self._eof = False

    def write(self, data: bytes):
        """Write data to the buffer from a writer thread.

        Raises:
            ValueError: if the buffer was closed (raised by ``io.BytesIO``).
        """
        with self._data_available:
            self._buffer.seek(0, io.SEEK_END)
            self._buffer.write(data)
            self._data_available.notify_all()

    def read(self, size: int = -1) -> bytes:
        """Read data from the buffer in a reader thread.

        Blocks until at least one byte is available, then returns up to
        ``size`` bytes (everything buffered when ``size`` is negative).
        Returns ``b""`` when ``size`` is 0, when input has ended and the buffer
        is drained, or when the buffer has been closed.
        """
        if size == 0:
            # io semantics: read(0) returns immediately rather than blocking
            return b""
        with self._data_available:
            while True:
                if self._buffer.closed:
                    return b""
                # always read from beginning
                self._buffer.seek(0)
                data = self._buffer.read(size)
                if data:
                    # shrink the buffer to remove already-read data
                    remaining = self._buffer.read()
                    self._buffer = io.BytesIO(remaining)
                    return data
                if self._eof:
                    return b""
                self._data_available.wait()

    def end_input(self):
        """Signal that no more data will be written."""
        with self._data_available:
            self._eof = True
            self._data_available.notify_all()

    def close(self):
        """Close the buffer and wake any blocked reader.

        The lock prevents closing while a reader is reading or replacing the
        underlying buffer. Subsequent reads return ``b""``.
        """
        with self._data_available:
            self._buffer.close()
            self._data_available.notify_all()
A thread-safe buffer that behaves like an IO stream. It allows writing from one thread and reading from another.
Methods
def close(self)
-
Expand source code
def close(self):
    """Close the buffer and wake any blocked reader.

    The lock prevents closing while a reader is reading or replacing the
    underlying buffer. Subsequent reads return ``b""``. Safe to call twice.
    """
    with self._data_available:
        self._buffer.close()
        # a reader parked in wait() would otherwise never observe the close
        self._data_available.notify_all()
def end_input(self)
-
Expand source code
def end_input(self):
    """Mark the stream as finished and wake every waiting reader.

    Once the remaining bytes are consumed, readers get ``b""`` instead of blocking.
    """
    cond = self._data_available
    with cond:
        self._eof = True
        cond.notify_all()
Signal that no more data will be written.
def read(self, size: int = -1) ‑> bytes
-
Expand source code
def read(self, size: int = -1) -> bytes:
    """Read data from the buffer in a reader thread.

    Blocks until at least one byte is available, then returns up to ``size``
    bytes (everything buffered when ``size`` is negative). Returns ``b""``
    when ``size`` is 0, when input has ended and the buffer is drained, or
    when the buffer has been closed.
    """
    if size == 0:
        # io semantics: read(0) returns immediately rather than blocking
        return b""
    with self._data_available:
        while True:
            # checked under the lock so it cannot race with close()
            if self._buffer.closed:
                return b""
            # always read from beginning
            self._buffer.seek(0)
            data = self._buffer.read(size)
            if data:
                # shrink the buffer to remove already-read data
                remaining = self._buffer.read()
                self._buffer = io.BytesIO(remaining)
                return data
            if self._eof:
                return b""
            self._data_available.wait()
Read data from the buffer in a reader thread.
def write(self, data: bytes)
-
Expand source code
def write(self, data: bytes):
    """Append *data* to the end of the buffer and wake every waiting reader.

    Meant to be called from the producer (writer) thread.
    """
    cond = self._data_available
    with cond:
        buf = self._buffer
        buf.seek(0, io.SEEK_END)
        buf.write(data)
        cond.notify_all()
Write data to the buffer from a writer thread.