Module livekit.agents.utils.codecs

Classes

class AudioStreamDecoder (*,
sample_rate: int | None = 48000,
num_channels: int | None = 1,
format: str | None = None)
Expand source code
class AudioStreamDecoder:
    """Decode an encoded audio byte stream into PCM ``rtc.AudioFrame`` objects.

    Decoders are stateful and should not be reused across multiple streams. Each
    decoder is designed to decode a single stream.

    Encoded bytes are supplied with :meth:`push`, the end of the stream is
    signalled with :meth:`end_input`, and decoded frames are consumed by
    asynchronously iterating over the decoder. WAV input is decoded inline by
    ``_WavInlineDecoder``; every other format is decoded with ``av`` on a
    single-worker thread pool.
    """

    def __init__(
        self,
        *,
        sample_rate: int | None = 48000,
        num_channels: int | None = 1,
        format: str | None = None,
    ):
        """Create a decoder for a single audio stream.

        Args:
            sample_rate: Output sample rate handed to the resampler (or to the
                WAV decoder).
            num_channels: ``2`` selects a ``"stereo"`` output layout; any other
                value, including ``None``, selects ``"mono"``.
            format: MIME type of the incoming stream. It is lower-cased and
                passed to ``_mime_to_av_format``; the result is used as the
                ``av`` container format.
        """
        self._sample_rate = sample_rate

        self._layout = "mono"
        if num_channels == 2:
            self._layout = "stereo"

        self._mime_type = format.lower() if format else None
        self._av_format = _mime_to_av_format(self._mime_type)
        self._is_wav = self._av_format == "wav"

        self._output_ch = aio.Chan[rtc.AudioFrame]()
        self._closed = False
        self._started = False
        # decoded frames are handed to this loop from the decode thread
        self._loop = asyncio.get_event_loop()

        # lazy-initialized only for non-WAV codecs
        self._input_buf: StreamBuffer | None = None
        self._executor: ThreadPoolExecutor | None = None

        # lazy-initialized only for WAV
        self._wav_decoder: _WavInlineDecoder | None = None

    def push(self, chunk: bytes) -> None:
        """Feed a chunk of encoded audio to the decoder.

        WAV data is decoded synchronously; a decoding error is logged, closes
        the output channel and marks the decoder as closed. For other formats
        the chunk is appended to the stream buffer and the background decode
        loop is started on the first call.

        Chunks pushed after the decoder has been closed are dropped.
        """
        if self._closed:
            # The stream buffer is closed together with the decoder, so writing
            # to it would raise. The WAV path already swallows errors once the
            # decoder is closed; do the same for every format.
            return

        if self._is_wav:
            if self._wav_decoder is None:
                self._wav_decoder = _WavInlineDecoder(self._output_ch, self._sample_rate)
            try:
                self._wav_decoder.push(chunk)
            except Exception:
                if not self._closed:
                    logger.exception("error decoding WAV audio")
                    self._output_ch.close()
                    self._closed = True
                return
            self._started = True
            return

        if self._input_buf is None:
            self._input_buf = StreamBuffer()
            self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="AudioDecoder")
        self._input_buf.write(chunk)
        if not self._started:
            self._started = True
            self._loop.run_in_executor(self._executor, self._decode_loop)

    def end_input(self) -> None:
        """Signal that no more encoded data will be pushed.

        For WAV the inline decoder is flushed (errors are logged) and the
        output channel is closed. For other formats the stream buffer is marked
        as finished; the output channel is closed here only when decoding never
        started, otherwise the decode loop closes it when it finishes.
        """
        if self._is_wav:
            if self._wav_decoder is not None and not self._closed:
                try:
                    self._wav_decoder.flush()
                except Exception:
                    logger.exception("error flushing WAV audio")
            if not self._closed:
                self._output_ch.close()
            return

        if self._input_buf is not None:
            self._input_buf.end_input()
        if not self._started:
            self._output_ch.close()

    def _decode_loop(self) -> None:
        """Decode the buffered stream with ``av``; runs on the executor thread.

        Frames are resampled to ``s16`` with the configured layout and rate and
        forwarded to the event loop. Any exception is logged; the output
        channel is always scheduled for closing and the container is always
        released.
        """
        container: av.container.InputContainer | None = None
        resampler: av.AudioResampler | None = None
        try:
            # open container in low-latency streaming mode
            container = av.open(
                self._input_buf,
                mode="r",
                format=self._av_format,
                buffer_size=256,
                options={
                    "probesize": "32",
                    "analyzeduration": "0",
                    "fflags": "nobuffer+flush_packets",
                    "flags": "low_delay",
                    "reorder_queue_size": "0",
                    "max_delay": "0",
                    "avioflags": "direct",
                },
            )
            # explicitly disable internal buffering flags on the FFmpeg container
            container.flags |= cast(
                int, av.container.Flags.no_buffer.value | av.container.Flags.flush_packets.value
            )

            if len(container.streams.audio) == 0:
                raise ValueError("no audio stream found")

            audio_stream = container.streams.audio[0]

            # Set up resampler only if needed.
            # NOTE: __init__ always assigns a layout, so this is currently
            # always true and a resampler is always created.
            if self._sample_rate is not None or self._layout is not None:
                resampler = av.AudioResampler(
                    format="s16", layout=self._layout, rate=self._sample_rate
                )

            for frame in container.decode(audio_stream):
                if self._closed:
                    return

                if resampler:
                    frames = resampler.resample(frame)
                else:
                    frames = [frame]

                for f in frames:
                    self._emit_av_frame(f)

            # flush the resampler to get any remaining buffered samples
            if resampler and not self._closed:
                for f in resampler.resample(None):
                    self._emit_av_frame(f)

        except Exception:
            logger.exception("error decoding audio")
        finally:
            # Release the container even when scheduling the channel close
            # fails (call_soon_threadsafe raises RuntimeError on a closed loop).
            try:
                self._loop.call_soon_threadsafe(self._output_ch.close)
            finally:
                if container is not None:
                    container.close()

    def _emit_av_frame(self, f: av.AudioFrame) -> None:
        """Convert an ``av`` frame to ``rtc.AudioFrame`` and send it via the event loop."""
        self._loop.call_soon_threadsafe(
            self._output_ch.send_nowait,
            rtc.AudioFrame(
                data=f.to_ndarray().tobytes(),
                num_channels=len(f.layout.channels),
                sample_rate=int(f.sample_rate),
                samples_per_channel=f.samples,
            ),
        )

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame]:
        """Return the decoder itself as the async iterator of decoded frames."""
        return self

    async def __anext__(self) -> rtc.AudioFrame:
        """Return the next decoded frame from the output channel."""
        return await self._output_ch.__anext__()

    async def aclose(self) -> None:
        """Close the decoder and release its resources.

        Ends the input, closes the stream buffer, drains any frames still in
        the output channel and shuts the executor down. Calling it again after
        the decoder is closed is a no-op.
        """
        if self._closed:
            return

        self.end_input()
        self._closed = True

        if self._input_buf is not None:
            self._input_buf.close()

        if not self._started:
            return

        # drain until the channel is closed so the producer side can finish
        async for _ in self._output_ch:
            pass

        if self._executor is not None:
            self._executor.shutdown(wait=False, cancel_futures=True)

A class that can be used to decode audio stream into PCM AudioFrames.

Decoders are stateful and should not be reused across multiple streams. Each decoder is designed to decode a single stream.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut the decoder down; does nothing if it is already closed.

    Ends the input, flags the decoder as closed and closes the stream buffer
    when one exists. If decoding had started, the remaining frames in the
    output channel are drained and the executor (if any) is shut down.
    """
    if not self._closed:
        self.end_input()
        self._closed = True

        buffer = self._input_buf
        if buffer is not None:
            buffer.close()

        if self._started:
            # consume whatever is still queued until the channel ends
            async for _frame in self._output_ch:
                pass

            pool = self._executor
            if pool is not None:
                pool.shutdown(wait=False, cancel_futures=True)
def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Tell the decoder that the encoded stream is complete.

    Non-WAV: the stream buffer (if created) is marked finished, and the output
    channel is closed only when decoding never started. WAV: unless the decoder
    is already closed, the inline decoder (if created) is flushed, with errors
    logged, and the output channel is closed.
    """
    if not self._is_wav:
        buffer = self._input_buf
        if buffer is not None:
            buffer.end_input()
        if not self._started:
            self._output_ch.close()
        return

    # WAV path: nothing left to do once the decoder is closed
    if self._closed:
        return

    wav = self._wav_decoder
    if wav is not None:
        try:
            wav.flush()
        except Exception:
            logger.exception("error flushing WAV audio")
    self._output_ch.close()
def push(self, chunk: bytes) ‑> None
Expand source code
def push(self, chunk: bytes) -> None:
    """Feed a chunk of encoded audio to the decoder.

    WAV data is decoded synchronously; a decoding error is logged, closes the
    output channel and marks the decoder as closed. For other formats the chunk
    is appended to the stream buffer and the background decode loop is started
    on the first call.

    Chunks pushed after the decoder has been closed are dropped.
    """
    if self._closed:
        # The stream buffer is closed together with the decoder, so writing to
        # it would raise. The WAV path already swallows errors once the decoder
        # is closed; do the same for every format.
        return

    if self._is_wav:
        if self._wav_decoder is None:
            self._wav_decoder = _WavInlineDecoder(self._output_ch, self._sample_rate)
        try:
            self._wav_decoder.push(chunk)
        except Exception:
            if not self._closed:
                logger.exception("error decoding WAV audio")
                self._output_ch.close()
                self._closed = True
            return
        self._started = True
        return

    if self._input_buf is None:
        self._input_buf = StreamBuffer()
        self._executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="AudioDecoder")
    self._input_buf.write(chunk)
    if not self._started:
        # first chunk: start the decode loop on the executor thread
        self._started = True
        self._loop.run_in_executor(self._executor, self._decode_loop)
class StreamBuffer
Expand source code
class StreamBuffer:
    """
    Thread-safe byte buffer exposing a file-like ``read``/``write`` interface.

    One thread appends data with ``write`` while another consumes it with
    ``read``; ``read`` blocks until data arrives, the input is ended, or the
    buffer is closed.
    """

    _COMPACT_THRESHOLD = 5 * 1024 * 1024  # compact after 5MB consumed

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._data_available = threading.Condition(self._lock)
        self._bio = io.BytesIO()
        self._read_pos = 0
        self._write_pos = 0
        self._closed = False
        self._eof = False

    def write(self, data: bytes) -> None:
        """Append data at the write position and wake any waiting reader."""
        with self._data_available:
            stream = self._bio
            stream.seek(self._write_pos)
            stream.write(data)
            self._write_pos = stream.tell()
            self._data_available.notify_all()

    def read(self, size: int = -1) -> bytes:
        """Return up to ``size`` unread bytes, or everything unread if ``size < 0``.

        Blocks while nothing is unread. Returns ``b""`` when ``size`` is 0, when
        the buffer is closed, or when the input has ended and nothing is left.
        """
        if size == 0:
            return b""

        with self._data_available:
            while not self._closed:
                pending = self._write_pos - self._read_pos
                if pending > 0:
                    return self._take(pending if size < 0 else min(size, pending))
                if self._eof:
                    break
                self._data_available.wait()
            return b""

    def _take(self, count: int) -> bytes:
        """Consume ``count`` bytes at the read position; caller holds the lock."""
        self._bio.seek(self._read_pos)
        chunk = self._bio.read(count)
        self._read_pos = self._bio.tell()

        if self._read_pos >= self._COMPACT_THRESHOLD:
            # drop the consumed prefix by rebuilding the buffer from the unread tail
            tail = self._bio.read()
            self._bio = io.BytesIO(tail)
            self._write_pos = len(tail)
            self._read_pos = 0

        return chunk or b""

    def end_input(self) -> None:
        """Mark the stream as finished; no further data will be written."""
        with self._data_available:
            self._eof = True
            self._data_available.notify_all()

    def close(self) -> None:
        """Close the buffer, waking readers and releasing the underlying storage."""
        with self._data_available:
            self._closed = True
            self._data_available.notify_all()
            self._bio.close()

A thread-safe buffer that behaves like an IO stream. Allows writing from one thread and reading from another.

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    """Close the buffer: flag it closed, wake waiters and close the BytesIO."""
    cond = self._data_available
    with cond:
        self._closed = True
        cond.notify_all()
        self._bio.close()
def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the stream as finished and wake any thread waiting for data."""
    cond = self._data_available
    with cond:
        self._eof = True
        cond.notify_all()

Signal that no more data will be written.

def read(self, size: int = -1) ‑> bytes
Expand source code
def read(self, size: int = -1) -> bytes:
    """Return up to ``size`` unread bytes, or everything unread if ``size < 0``.

    Blocks while nothing is unread. Returns ``b""`` when ``size`` is 0, when
    the buffer is closed, or when the input has ended and nothing is left.
    """
    if size == 0:
        return b""

    with self._data_available:
        while not self._closed:
            pending = self._write_pos - self._read_pos
            if pending <= 0:
                if self._eof:
                    break
                self._data_available.wait()
                continue

            count = pending if size < 0 else min(size, pending)
            self._bio.seek(self._read_pos)
            chunk = self._bio.read(count)
            self._read_pos = self._bio.tell()

            if self._read_pos >= self._COMPACT_THRESHOLD:
                # drop the consumed prefix by rebuilding the buffer from the unread tail
                tail = self._bio.read()
                self._bio = io.BytesIO(tail)
                self._write_pos = len(tail)
                self._read_pos = 0

            return chunk or b""

    return b""

Read data from the buffer in a reader thread.

def write(self, data: bytes) ‑> None
Expand source code
def write(self, data: bytes) -> None:
    """Append data at the write position and wake any waiting reader."""
    cond = self._data_available
    with cond:
        stream = self._bio
        stream.seek(self._write_pos)
        stream.write(data)
        self._write_pos = stream.tell()
        cond.notify_all()

Write data to the buffer from a writer thread.