Module livekit.plugins.silero

Silero VAD plugin for LiveKit Agents

See https://docs.livekit.io/build/turns/vad/ for more information.

Sub-modules

livekit.plugins.silero.resources

Used by importlib.resources and setuptools

Classes

class VAD (*, session: onnxruntime.InferenceSession, opts: _VADOptions)
Expand source code
class VAD(agents.vad.VAD):
    """
    Silero Voice Activity Detection (VAD) class.

    This class provides functionality to detect speech segments within audio data using the Silero VAD model.

    Instances are normally created through `VAD.load()`; every stream returned by `stream()`
    reuses the same ONNX inference session and the same options object.
    """  # noqa: E501

    @classmethod
    def load(
        cls,
        *,
        min_speech_duration: float = 0.05,
        min_silence_duration: float = 0.55,
        prefix_padding_duration: float = 0.5,
        max_buffered_speech: float = 60.0,
        activation_threshold: float = 0.5,
        sample_rate: Literal[8000, 16000] = 16000,
        force_cpu: bool = True,
        # deprecated
        padding_duration: NotGivenOr[float] = NOT_GIVEN,
    ) -> VAD:
        """
        Load and initialize the Silero VAD model.

        This method loads the ONNX model and prepares it for inference. When options are not provided,
        sane defaults are used.

        **Note:**
            This method is blocking and may take time to load the model into memory.
            It is recommended to call this method inside your prewarm mechanism.

        **Example:**

            ```python
            def prewarm(proc: JobProcess):
                proc.userdata["vad"] = silero.VAD.load()


            async def entrypoint(ctx: JobContext):
                vad = ctx.proc.userdata["vad"]
                # your agent logic...


            if __name__ == "__main__":
                cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
            ```

        Args:
            min_speech_duration (float): Minimum duration of speech to start a new speech chunk.
            min_silence_duration (float): At the end of each speech, wait this duration before ending the speech.
            prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk.
            max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds).
            activation_threshold (float): Threshold to consider a frame as speech.
            sample_rate (Literal[8000, 16000]): Sample rate for the inference (only 8KHz and 16KHz are supported).
            force_cpu (bool): Force the use of CPU for inference.
            padding_duration (float | None): **Deprecated**. Use `prefix_padding_duration` instead.
                When given, it overrides `prefix_padding_duration`.

        Returns:
            VAD: An instance of the VAD class ready for streaming.

        Raises:
            ValueError: If an unsupported sample rate is provided.
        """  # noqa: E501
        if sample_rate not in onnx_model.SUPPORTED_SAMPLE_RATES:
            raise ValueError("Silero VAD only supports 8KHz and 16KHz sample rates")

        if is_given(padding_duration):
            logger.warning(
                "padding_duration is deprecated and will be removed in 1.5.0, use prefix_padding_duration instead",  # noqa: E501
            )
            # the deprecated argument wins over prefix_padding_duration when both are passed
            prefix_padding_duration = padding_duration

        session = onnx_model.new_inference_session(force_cpu)
        opts = _VADOptions(
            min_speech_duration=min_speech_duration,
            min_silence_duration=min_silence_duration,
            prefix_padding_duration=prefix_padding_duration,
            max_buffered_speech=max_buffered_speech,
            activation_threshold=activation_threshold,
            sample_rate=sample_rate,
        )
        return cls(session=session, opts=opts)

    def __init__(
        self,
        *,
        session: onnxruntime.InferenceSession,
        opts: _VADOptions,
    ) -> None:
        """
        Wrap an already created inference session and its options.

        Args:
            session (onnxruntime.InferenceSession): Inference session handed to every stream.
            opts (_VADOptions): Detection options; stored by reference, not copied.
        """
        super().__init__(capabilities=agents.vad.VADCapabilities(update_interval=0.032))
        self._onnx_session = session
        self._opts = opts
        # weak references: a stream dropped by its owner must not be kept alive just so that
        # update_options() can reach it
        self._streams = weakref.WeakSet[VADStream]()

    def stream(self) -> VADStream:
        """
        Create a new VADStream for processing audio data.

        Returns:
            VADStream: A stream object for processing audio input and detecting speech.
        """
        stream = VADStream(
            self,
            self._opts,
            onnx_model.OnnxModel(
                onnx_session=self._onnx_session, sample_rate=self._opts.sample_rate
            ),
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
        min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
        prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
        max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
        activation_threshold: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the VAD options.

        This method allows you to update the VAD options after the VAD object has been created.
        Only the arguments that are explicitly given are changed; the same arguments are then
        forwarded to every stream that is still alive.

        Args:
            min_speech_duration (float): Minimum duration of speech to start a new speech chunk.
            min_silence_duration (float): At the end of each speech, wait this duration before ending the speech.
            prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk.
            max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds).
            activation_threshold (float): Threshold to consider a frame as speech.
        """  # noqa: E501
        if is_given(min_speech_duration):
            self._opts.min_speech_duration = min_speech_duration
        if is_given(min_silence_duration):
            self._opts.min_silence_duration = min_silence_duration
        if is_given(prefix_padding_duration):
            self._opts.prefix_padding_duration = prefix_padding_duration
        if is_given(max_buffered_speech):
            self._opts.max_buffered_speech = max_buffered_speech
        if is_given(activation_threshold):
            self._opts.activation_threshold = activation_threshold

        # NOTE(review): stream() hands self._opts itself to each stream, so the shared options
        # already hold the new values when the streams are notified below. The call is still
        # needed because each stream re-derives its sample-based buffer sizes from them.
        for stream in self._streams:
            stream.update_options(
                min_speech_duration=min_speech_duration,
                min_silence_duration=min_silence_duration,
                prefix_padding_duration=prefix_padding_duration,
                max_buffered_speech=max_buffered_speech,
                activation_threshold=activation_threshold,
            )

Silero Voice Activity Detection (VAD) class.

This class provides functionality to detect speech segments within audio data using the Silero VAD model.

Ancestors

Static methods

def load(*,
min_speech_duration: float = 0.05,
min_silence_duration: float = 0.55,
prefix_padding_duration: float = 0.5,
max_buffered_speech: float = 60.0,
activation_threshold: float = 0.5,
sample_rate: Literal[8000, 16000] = 16000,
force_cpu: bool = True,
padding_duration: NotGivenOr[float] = NOT_GIVEN) ‑> livekit.plugins.silero.vad.VAD

Load and initialize the Silero VAD model.

This method loads the ONNX model and prepares it for inference. When options are not provided, sane defaults are used.

Note: This method is blocking and may take time to load the model into memory. It is recommended to call this method inside your prewarm mechanism.

Example:

```python
def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    vad = ctx.proc.userdata["vad"]
    # your agent logic...


if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm))
```

Args

min_speech_duration : float
Minimum duration of speech to start a new speech chunk.
min_silence_duration : float
At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration : float
Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech : float
Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold : float
Threshold to consider a frame as speech.
sample_rate : Literal[8000, 16000]
Sample rate for the inference (only 8KHz and 16KHz are supported).
force_cpu : bool
Force the use of CPU for inference.
padding_duration : float | None
Deprecated. Use prefix_padding_duration instead.

Returns

VAD
An instance of the VAD class ready for streaming.

Raises

ValueError
If an unsupported sample rate is provided.

Methods

def stream(self) ‑> livekit.plugins.silero.vad.VADStream
Expand source code
def stream(self) -> VADStream:
    """
    Open a fresh VADStream bound to this VAD instance.

    The stream gets its own OnnxModel wrapper built on this VAD's inference session,
    and is added to the set of streams tracked by this VAD.

    Returns:
        VADStream: The newly created stream.
    """
    options = self._opts
    model = onnx_model.OnnxModel(
        onnx_session=self._onnx_session,
        sample_rate=options.sample_rate,
    )
    new_stream = VADStream(self, options, model)
    self._streams.add(new_stream)
    return new_stream

Create a new VADStream for processing audio data.

Returns

VADStream
A stream object for processing audio input and detecting speech.
def update_options(self,
*,
min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
activation_threshold: NotGivenOr[float] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
    min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
    prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
    max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
    activation_threshold: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Change detection options on this VAD and on all of its tracked streams.

    Arguments left at NOT_GIVEN keep their current value.

    Args:
        min_speech_duration (float): Minimum duration of speech to start a new speech chunk.
        min_silence_duration (float): At the end of each speech, wait this duration before ending the speech.
        prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk.
        max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds).
        activation_threshold (float): Threshold to consider a frame as speech.
    """  # noqa: E501
    requested = {
        "min_speech_duration": min_speech_duration,
        "min_silence_duration": min_silence_duration,
        "prefix_padding_duration": prefix_padding_duration,
        "max_buffered_speech": max_buffered_speech,
        "activation_threshold": activation_threshold,
    }

    # apply only the options the caller actually supplied
    for option_name, option_value in requested.items():
        if is_given(option_value):
            setattr(self._opts, option_name, option_value)

    # forward the very same arguments (given or not) to every tracked stream
    for live_stream in self._streams:
        live_stream.update_options(**requested)

Update the VAD options.

This method allows you to update the VAD options after the VAD object has been created.

Args

min_speech_duration : float
Minimum duration of speech to start a new speech chunk.
min_silence_duration : float
At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration : float
Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech : float
Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold : float
Threshold to consider a frame as speech.

Inherited members

class VADStream (vad: VAD,
opts: _VADOptions,
model: onnx_model.OnnxModel)
Expand source code
class VADStream(agents.vad.VADStream):
    """
    Streaming Silero voice activity detector.

    Consumes audio frames from the input channel, runs the ONNX model on fixed-size windows
    in a single-worker thread pool, and emits INFERENCE_DONE, START_OF_SPEECH and
    END_OF_SPEECH events on the event channel.
    """

    def __init__(self, vad: VAD, opts: _VADOptions, model: onnx_model.OnnxModel) -> None:
        """
        Args:
            vad (VAD): The VAD that created this stream.
            opts (_VADOptions): Detection options; stored by reference, not copied.
            model (onnx_model.OnnxModel): Callable model wrapper used for inference.
        """
        super().__init__(vad)
        self._opts, self._model = opts, model
        self._loop = asyncio.get_event_loop()

        # inference is blocking, so it runs on a dedicated worker thread
        self._executor = ThreadPoolExecutor(max_workers=1)
        # self._task is presumably created by the base class constructor - TODO confirm
        self._task.add_done_callback(lambda _: self._executor.shutdown(wait=False))
        self._exp_filter = utils.ExpFilter(alpha=0.35)

        # 0 means "no frame received yet"; the buffers are allocated on the first frame
        self._input_sample_rate = 0
        self._speech_buffer: np.ndarray | None = None
        self._speech_buffer_max_reached = False
        self._prefix_padding_samples = 0  # (input_sample_rate)

    def update_options(
        self,
        *,
        min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
        min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
        prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
        max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
        activation_threshold: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the VAD options.

        This method allows you to update the VAD options after the VAD object has been created.
        If audio has already been received, the prefix padding size and the speech buffer are
        recomputed from the updated options.

        Args:
            min_speech_duration (float): Minimum duration of speech to start a new speech chunk.
            min_silence_duration (float): At the end of each speech, wait this duration before ending the speech.
            prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk.
            max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds).
            activation_threshold (float): Threshold to consider a frame as speech.
        """  # noqa: E501
        old_max_buffered_speech = self._opts.max_buffered_speech

        if is_given(min_speech_duration):
            self._opts.min_speech_duration = min_speech_duration
        if is_given(min_silence_duration):
            self._opts.min_silence_duration = min_silence_duration
        if is_given(prefix_padding_duration):
            self._opts.prefix_padding_duration = prefix_padding_duration
        if is_given(max_buffered_speech):
            self._opts.max_buffered_speech = max_buffered_speech
        if is_given(activation_threshold):
            self._opts.activation_threshold = activation_threshold

        if self._input_sample_rate:
            assert self._speech_buffer is not None

            self._prefix_padding_samples = int(
                self._opts.prefix_padding_duration * self._input_sample_rate
            )

            # NOTE(review): ndarray.resize() works in place and, per the NumPy docs, raises
            # ValueError when other references or views to the array exist - confirm this
            # cannot happen while _main_task holds a slice of the buffer.
            self._speech_buffer.resize(
                int(self._opts.max_buffered_speech * self._input_sample_rate)
                + self._prefix_padding_samples
            )

            # a larger buffer has room again, so allow the "max reached" warning to re-arm
            if self._opts.max_buffered_speech > old_max_buffered_speech:
                self._speech_buffer_max_reached = False

    @agents.utils.log_exceptions(logger=logger)
    async def _main_task(self):
        """
        Main processing loop of the stream.

        For every audio frame read from the input channel: resample it to the model sample
        rate when needed, run inference on each complete window, mirror the matching input
        samples into the speech buffer, and emit the resulting VAD events.
        """
        inference_f32_data = np.empty(self._model.window_size_samples, dtype=np.float32)
        speech_buffer_index: int = 0

        # "pub_" means public, these values are exposed to the users through events
        pub_speaking = False
        pub_speech_duration = 0.0
        pub_silence_duration = 0.0
        pub_current_sample = 0
        pub_timestamp = 0.0

        speech_threshold_duration = 0.0
        silence_threshold_duration = 0.0

        input_frames = []
        inference_frames = []
        resampler: rtc.AudioResampler | None = None

        # used to avoid drift when the sample_rate ratio is not an integer
        input_copy_remaining_fract = 0.0

        extra_inference_time = 0.0

        def _reset_write_cursor():
            # Rewind the speech buffer, keeping only the most recent prefix padding samples
            # at its start so that the next speech chunk begins with that padding.
            nonlocal speech_buffer_index
            assert self._speech_buffer is not None

            if speech_buffer_index <= self._prefix_padding_samples:
                return

            padding_data = self._speech_buffer[
                speech_buffer_index - self._prefix_padding_samples : speech_buffer_index
            ]

            self._speech_buffer_max_reached = False
            self._speech_buffer[: self._prefix_padding_samples] = padding_data
            speech_buffer_index = self._prefix_padding_samples

        def _copy_speech_buffer() -> rtc.AudioFrame:
            # copy the data from speech_buffer
            assert self._speech_buffer is not None
            speech_data = self._speech_buffer[:speech_buffer_index].tobytes()

            return rtc.AudioFrame(
                sample_rate=self._input_sample_rate,
                num_channels=1,
                samples_per_channel=speech_buffer_index,
                data=speech_data,
            )

        async for input_frame in self._input_ch:
            if not isinstance(input_frame, rtc.AudioFrame):
                continue  # ignore flush sentinel for now

            if not self._input_sample_rate:
                self._input_sample_rate = input_frame.sample_rate

                # alloc the buffers now that we know the input sample rate
                self._prefix_padding_samples = int(
                    self._opts.prefix_padding_duration * self._input_sample_rate
                )

                self._speech_buffer = np.empty(
                    int(self._opts.max_buffered_speech * self._input_sample_rate)
                    + self._prefix_padding_samples,
                    dtype=np.int16,
                )

                if self._input_sample_rate != self._opts.sample_rate:
                    # resampling needed: the input sample rate isn't the same as the model's
                    # sample rate used for inference
                    resampler = rtc.AudioResampler(
                        input_rate=self._input_sample_rate,
                        output_rate=self._opts.sample_rate,
                        quality=rtc.AudioResamplerQuality.QUICK,  # VAD doesn't need high quality
                    )

            elif self._input_sample_rate != input_frame.sample_rate:
                logger.error("a frame with another sample rate was already pushed")
                continue

            assert self._speech_buffer is not None

            input_frames.append(input_frame)
            if resampler is not None:
                # the resampler may have a bit of latency, but it is OK to ignore since it should be
                # negligible
                inference_frames.extend(resampler.push(input_frame))
            else:
                inference_frames.append(input_frame)

            while True:
                start_time = time.perf_counter()

                available_inference_samples = sum(
                    frame.samples_per_channel for frame in inference_frames
                )
                if available_inference_samples < self._model.window_size_samples:
                    break  # not enough samples to run inference

                input_frame = utils.combine_frames(input_frames)
                inference_frame = utils.combine_frames(inference_frames)

                # convert data to f32
                np.divide(
                    inference_frame.data[: self._model.window_size_samples],
                    np.iinfo(np.int16).max,
                    out=inference_f32_data,
                    dtype=np.float32,
                )

                # run the inference
                p = await self._loop.run_in_executor(
                    self._executor, self._model, inference_f32_data
                )
                p = self._exp_filter.apply(exp=1.0, sample=p)

                window_duration = self._model.window_size_samples / self._opts.sample_rate

                pub_current_sample += self._model.window_size_samples
                pub_timestamp += window_duration

                # number of input-rate samples that correspond to one inference window; the
                # fractional remainder is carried over to the next window
                resampling_ratio = self._input_sample_rate / self._model.sample_rate
                to_copy = (
                    self._model.window_size_samples * resampling_ratio + input_copy_remaining_fract
                )
                to_copy_int = int(to_copy)
                input_copy_remaining_fract = to_copy - to_copy_int

                # copy the inference window to the speech buffer
                available_space = len(self._speech_buffer) - speech_buffer_index
                to_copy_buffer = min(to_copy_int, available_space)
                if to_copy_buffer > 0:
                    self._speech_buffer[
                        speech_buffer_index : speech_buffer_index + to_copy_buffer
                    ] = input_frame.data[:to_copy_buffer]
                    speech_buffer_index += to_copy_buffer
                elif not self._speech_buffer_max_reached:
                    # reached self._opts.max_buffered_speech (padding is included)
                    # set the instance flag (the one tested above and cleared on reset) so the
                    # warning is logged once per overflow instead of once per window
                    self._speech_buffer_max_reached = True
                    logger.warning(
                        "max_buffered_speech reached, ignoring further data for the current speech input"  # noqa: E501
                    )

                inference_duration = time.perf_counter() - start_time
                # accumulated lag behind realtime, never negative
                extra_inference_time = max(
                    0.0,
                    extra_inference_time + inference_duration - window_duration,
                )
                if inference_duration > SLOW_INFERENCE_THRESHOLD:
                    logger.warning(
                        "inference is slower than realtime",
                        extra={"delay": extra_inference_time},
                    )

                if pub_speaking:
                    pub_speech_duration += window_duration
                else:
                    pub_silence_duration += window_duration

                self._event_ch.send_nowait(
                    agents.vad.VADEvent(
                        type=agents.vad.VADEventType.INFERENCE_DONE,
                        samples_index=pub_current_sample,
                        timestamp=pub_timestamp,
                        silence_duration=pub_silence_duration,
                        speech_duration=pub_speech_duration,
                        probability=p,
                        inference_duration=inference_duration,
                        frames=[
                            rtc.AudioFrame(
                                data=input_frame.data[:to_copy_int].tobytes(),
                                sample_rate=self._input_sample_rate,
                                num_channels=1,
                                samples_per_channel=to_copy_int,
                            )
                        ],
                        speaking=pub_speaking,
                        raw_accumulated_silence=silence_threshold_duration,
                        raw_accumulated_speech=speech_threshold_duration,
                    )
                )

                if p >= self._opts.activation_threshold:
                    speech_threshold_duration += window_duration
                    silence_threshold_duration = 0.0

                    if not pub_speaking:
                        if speech_threshold_duration >= self._opts.min_speech_duration:
                            pub_speaking = True
                            pub_silence_duration = 0.0
                            pub_speech_duration = speech_threshold_duration

                            self._event_ch.send_nowait(
                                agents.vad.VADEvent(
                                    type=agents.vad.VADEventType.START_OF_SPEECH,
                                    samples_index=pub_current_sample,
                                    timestamp=pub_timestamp,
                                    silence_duration=pub_silence_duration,
                                    speech_duration=pub_speech_duration,
                                    frames=[_copy_speech_buffer()],
                                    speaking=True,
                                )
                            )

                else:
                    silence_threshold_duration += window_duration
                    speech_threshold_duration = 0.0

                    if not pub_speaking:
                        _reset_write_cursor()

                    if (
                        pub_speaking
                        and silence_threshold_duration >= self._opts.min_silence_duration
                    ):
                        pub_speaking = False
                        pub_speech_duration = 0.0
                        pub_silence_duration = silence_threshold_duration

                        self._event_ch.send_nowait(
                            agents.vad.VADEvent(
                                type=agents.vad.VADEventType.END_OF_SPEECH,
                                samples_index=pub_current_sample,
                                timestamp=pub_timestamp,
                                silence_duration=pub_silence_duration,
                                speech_duration=pub_speech_duration,
                                frames=[_copy_speech_buffer()],
                                speaking=False,
                            )
                        )

                        _reset_write_cursor()

                # remove the frames that were used for inference from the input and inference frames
                input_frames = []
                inference_frames = []

                # add the remaining data
                if len(input_frame.data) - to_copy_int > 0:
                    data = input_frame.data[to_copy_int:].tobytes()
                    input_frames.append(
                        rtc.AudioFrame(
                            data=data,
                            sample_rate=self._input_sample_rate,
                            num_channels=1,
                            samples_per_channel=len(data) // 2,
                        )
                    )

                if len(inference_frame.data) - self._model.window_size_samples > 0:
                    data = inference_frame.data[self._model.window_size_samples :].tobytes()
                    inference_frames.append(
                        rtc.AudioFrame(
                            data=data,
                            sample_rate=self._opts.sample_rate,
                            num_channels=1,
                            samples_per_channel=len(data) // 2,
                        )
                    )

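Streaming Silero VAD: consumes audio frames from its input channel, runs the ONNX model on fixed-size windows in a worker thread, and emits INFERENCE_DONE, START_OF_SPEECH and END_OF_SPEECH events.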

Ancestors

Methods

def update_options(self,
*,
min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
activation_threshold: NotGivenOr[float] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    min_speech_duration: NotGivenOr[float] = NOT_GIVEN,
    min_silence_duration: NotGivenOr[float] = NOT_GIVEN,
    prefix_padding_duration: NotGivenOr[float] = NOT_GIVEN,
    max_buffered_speech: NotGivenOr[float] = NOT_GIVEN,
    activation_threshold: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Change detection options on this stream.

    Arguments left at NOT_GIVEN keep their current value. Once the input sample rate is
    known, the prefix padding size and the speech buffer length are recomputed from the
    updated options.

    Args:
        min_speech_duration (float): Minimum duration of speech to start a new speech chunk.
        min_silence_duration (float): At the end of each speech, wait this duration before ending the speech.
        prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk.
        max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds).
        activation_threshold (float): Threshold to consider a frame as speech.
    """  # noqa: E501
    previous_max_buffered = self._opts.max_buffered_speech

    requested = {
        "min_speech_duration": min_speech_duration,
        "min_silence_duration": min_silence_duration,
        "prefix_padding_duration": prefix_padding_duration,
        "max_buffered_speech": max_buffered_speech,
        "activation_threshold": activation_threshold,
    }
    # apply only the options the caller actually supplied
    for option_name, option_value in requested.items():
        if is_given(option_value):
            setattr(self._opts, option_name, option_value)

    if not self._input_sample_rate:
        # sample rate still unknown: there is no buffer to resize yet
        return

    assert self._speech_buffer is not None

    input_rate = self._input_sample_rate
    self._prefix_padding_samples = int(self._opts.prefix_padding_duration * input_rate)

    # the buffer holds the speech itself plus the prefix padding
    buffer_length = (
        int(self._opts.max_buffered_speech * input_rate) + self._prefix_padding_samples
    )
    self._speech_buffer.resize(buffer_length)

    # a larger limit means there is room again, so clear the "max reached" flag
    if self._opts.max_buffered_speech > previous_max_buffered:
        self._speech_buffer_max_reached = False

Update the VAD options.

This method allows you to update the VAD options after the VAD object has been created.

Args

min_speech_duration : float
Minimum duration of speech to start a new speech chunk.
min_silence_duration : float
At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration : float
Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech : float
Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold : float
Threshold to consider a frame as speech.

Inherited members