Module livekit.agents.utils.codecs

Sub-modules

livekit.agents.utils.codecs.decoder

Classes

class AudioStreamDecoder (*, sample_rate: int = 48000, num_channels: int = 1)
Expand source code
class AudioStreamDecoder:
    """Decode an encoded audio stream into PCM ``rtc.AudioFrame`` objects.

    Encoded bytes are fed in with :meth:`push`; decoded frames are obtained by
    asynchronously iterating over the decoder. The decode loop runs on a
    thread pool that is created lazily and stored on the class.

    Decoders are stateful and must not be reused across multiple streams. Each
    decoder is designed to decode a single stream.
    """

    # maximum number of worker threads in the shared pool
    _max_workers: int = 10
    # created by the first decoder instance, then reused by later ones
    _executor: Optional[ThreadPoolExecutor] = None

    def __init__(self, *, sample_rate: int = 48000, num_channels: int = 1):
        """Create a decoder producing 16-bit PCM at the requested format.

        Args:
            sample_rate: Output sample rate handed to the resampler.
            num_channels: Number of output channels; 1 (mono) or 2 (stereo).

        Raises:
            ImportError: If the optional ``av`` dependency is not installed.
            ValueError: If ``num_channels`` is neither 1 nor 2.
        """
        try:
            import av  # noqa
        except ImportError as e:
            # keep the original failure attached as the cause
            raise ImportError(
                "You haven't included the 'codecs' optional dependencies. Please install the 'codecs' extra by running `pip install livekit-agents[codecs]`"
            ) from e

        self._sample_rate = sample_rate
        self._layout = "mono"
        if num_channels == 2:
            self._layout = "stereo"
        elif num_channels != 1:
            raise ValueError(f"Invalid number of channels: {num_channels}")

        self._output_ch = aio.Chan[rtc.AudioFrame]()
        self._closed = False
        self._started = False
        self._input_buf = StreamBuffer()
        self._loop = asyncio.get_event_loop()

        if self.__class__._executor is None:
            # each decoder instance will submit jobs to the shared pool
            self.__class__._executor = ThreadPoolExecutor(
                max_workers=self.__class__._max_workers
            )

    def push(self, chunk: bytes):
        """Append encoded bytes to the input; the first call starts decoding."""
        self._input_buf.write(chunk)
        if not self._started:
            self._started = True
            self._loop.run_in_executor(self.__class__._executor, self._decode_loop)

    def end_input(self):
        """Signal that no more encoded data will be pushed."""
        self._input_buf.end_input()
        if not self._started:
            # if no data was pushed, close the output channel
            self._output_ch.close()

    def _decode_loop(self):
        """Decode the input buffer and forward resampled frames (worker thread).

        Frames are handed to the output channel through
        ``call_soon_threadsafe`` on the loop captured in ``__init__``. Any
        exception is logged; the output channel is always closed on exit.
        """
        container: av.container.InputContainer | None = None
        resampler: av.AudioResampler | None = None
        try:
            container = av.open(self._input_buf, mode="r")
            if len(container.streams.audio) == 0:
                raise ValueError("no audio stream found")

            audio_stream = container.streams.audio[0]
            resampler = av.AudioResampler(
                format="s16", layout=self._layout, rate=self._sample_rate
            )

            for frame in container.decode(audio_stream):
                if self._closed:
                    return

                for resampled_frame in resampler.resample(frame):
                    nchannels = len(resampled_frame.layout.channels)
                    self._loop.call_soon_threadsafe(
                        self._output_ch.send_nowait,
                        rtc.AudioFrame(
                            data=resampled_frame.to_ndarray().tobytes(),
                            num_channels=nchannels,
                            sample_rate=int(resampled_frame.sample_rate),
                            # PyAV documents `samples` as the per-channel
                            # count, so it must not be divided by the number
                            # of channels (that halved it for stereo output)
                            samples_per_channel=int(resampled_frame.samples),
                        ),
                    )

        except Exception:
            logger.exception("error decoding audio")
        finally:
            self._loop.call_soon_threadsafe(self._output_ch.close)
            if container:
                container.close()

    def __aiter__(self) -> AsyncIterator[rtc.AudioFrame]:
        """Return the decoder itself as the async iterator of frames."""
        return self

    async def __anext__(self) -> rtc.AudioFrame:
        """Return the next frame from the output channel."""
        return await self._output_ch.__anext__()

    async def aclose(self):
        """Stop decoding and release the input buffer; repeated calls are no-ops."""
        if self._closed:
            return

        self.end_input()
        self._closed = True
        self._input_buf.close()
        # wait for decode loop to finish, only if anything's been pushed
        # NOTE(review): this awaits a single recv(); if frames are still queued
        # it may return before the channel is closed - confirm this is intended
        with contextlib.suppress(aio.ChanClosed):
            if self._started:
                await self._output_ch.recv()

A class that can be used to decode an audio stream into PCM AudioFrames.

Decoders are stateful and should not be reused across multiple streams. Each decoder is designed to decode a single stream.

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    """Shut the decoder down; calling it again after the first time is a no-op.

    Ends the input, flags the decoder as closed and closes the input buffer.
    If decoding was ever started, one ``recv()`` on the output channel is
    awaited, with ``aio.ChanClosed`` suppressed.
    """
    if not self._closed:
        self.end_input()
        self._closed = True
        self._input_buf.close()
        with contextlib.suppress(aio.ChanClosed):
            # only wait on the decode loop when something was actually pushed
            if self._started:
                await self._output_ch.recv()
def end_input(self)
Expand source code
def end_input(self):
    """Tell the input buffer that no further data will arrive.

    When decoding never started (nothing was pushed), the output channel is
    closed here as well.
    """
    self._input_buf.end_input()
    if self._started:
        return
    # no data was ever pushed, so close the output channel ourselves
    self._output_ch.close()
def push(self, chunk: bytes)
Expand source code
def push(self, chunk: bytes):
    """Write ``chunk`` to the input buffer and start decoding on first use.

    The first call marks the decoder as started and submits the decode loop
    to the class-level executor through the stored event loop.
    """
    self._input_buf.write(chunk)
    if self._started:
        return

    self._started = True
    pool = self.__class__._executor
    self._loop.run_in_executor(pool, self._decode_loop)
class StreamBuffer
Expand source code
class StreamBuffer:
    """
    A thread-safe buffer that behaves like an IO stream.
    Allows writing from one thread and reading from another.

    ``read`` blocks until data is available, ``end_input`` has been called,
    or the buffer has been closed.
    """

    def __init__(self):
        self._buffer = io.BytesIO()
        self._lock = threading.Lock()
        # condition sharing ``_lock``; signalled on write, end of input and close
        self._data_available = threading.Condition(self._lock)
        self._eof = False

    def write(self, data: bytes):
        """Write data to the buffer from a writer thread."""
        with self._data_available:
            # reads rewind to the start, so always append at the end
            self._buffer.seek(0, io.SEEK_END)
            self._buffer.write(data)
            self._data_available.notify_all()

    def read(self, size: int = -1) -> bytes:
        """Read data from the buffer in a reader thread.

        Args:
            size: Maximum number of bytes to return; ``-1`` returns everything
                currently buffered.

        Returns:
            The bytes read, or ``b""`` once the buffer is closed or input has
            ended and nothing is left.
        """

        if self._buffer.closed:
            return b""

        with self._data_available:
            while True:
                if self._buffer.closed:
                    return b""
                # always read from beginning
                self._buffer.seek(0)
                data = self._buffer.read(size)

                if data:
                    # shrink the buffer to remove already-read data
                    remaining = self._buffer.read()
                    self._buffer = io.BytesIO(remaining)
                    return data

                if self._eof:
                    return b""

                self._data_available.wait()

    def end_input(self):
        """Signal that no more data will be written."""
        with self._data_available:
            self._eof = True
            self._data_available.notify_all()

    def close(self):
        """Close the buffer and wake any reader blocked in ``read``.

        The lock is held so the close cannot race with ``read`` replacing
        ``_buffer``, and waiters are notified so a blocked reader sees the
        closed buffer and returns instead of waiting forever.
        """
        with self._data_available:
            self._buffer.close()
            self._data_available.notify_all()

A thread-safe buffer that behaves like an IO stream. Allows writing from one thread and reading from another.

Methods

def close(self)
Expand source code
def close(self):
    """Close the buffer and wake any reader blocked in ``read``.

    The condition's lock is held while closing, and waiters are notified so
    a blocked reader can observe the closed buffer and return.
    """
    with self._data_available:
        self._buffer.close()
        self._data_available.notify_all()
def end_input(self)
Expand source code
def end_input(self):
    """Mark the end of input and notify every thread waiting on the condition."""
    cond = self._data_available
    with cond:
        self._eof = True
        cond.notify_all()

Signal that no more data will be written.

def read(self, size: int = -1) ‑> bytes
Expand source code
def read(self, size: int = -1) -> bytes:
    """Return up to ``size`` bytes from the front of the buffer.

    Blocks on the condition until data is present. Returns ``b""`` when the
    buffer is closed, or when it is empty and end of input has been signalled.
    """
    if self._buffer.closed:
        return b""

    with self._data_available:
        while not self._buffer.closed:
            # consumption always starts at the front of the buffer
            self._buffer.seek(0)
            chunk = self._buffer.read(size)

            if chunk:
                # keep only the unread tail in a fresh buffer
                self._buffer = io.BytesIO(self._buffer.read())
                return chunk

            if self._eof:
                break

            self._data_available.wait()

        return b""

Read data from the buffer in a reader thread.

def write(self, data: bytes)
Expand source code
def write(self, data: bytes):
    """Append ``data`` to the end of the buffer and notify waiting threads."""
    cond = self._data_available
    with cond:
        stream = self._buffer
        # move to the end first so the new bytes are appended
        stream.seek(0, io.SEEK_END)
        stream.write(data)
        cond.notify_all()

Write data to the buffer from a writer thread.