Module livekit.plugins.silero
Sub-modules
livekit.plugins.silero.resources
-
Used by importlib.resources and setuptools
Classes
class VAD (*, session: onnxruntime.InferenceSession, opts: _VADOptions)
-
Silero Voice Activity Detection (VAD) class.
This class provides functionality to detect speech segments within audio data using the Silero VAD model.
Expand source code
class VAD(agents.vad.VAD): """ Silero Voice Activity Detection (VAD) class. This class provides functionality to detect speech segments within audio data using the Silero VAD model. """ @classmethod def load( cls, *, min_speech_duration: float = 0.05, min_silence_duration: float = 0.55, prefix_padding_duration: float = 0.5, max_buffered_speech: float = 60.0, activation_threshold: float = 0.5, sample_rate: Literal[8000, 16000] = 16000, force_cpu: bool = True, # deprecated padding_duration: float | None = None, ) -> "VAD": """ Load and initialize the Silero VAD model. This method loads the ONNX model and prepares it for inference. When options are not provided, sane defaults are used. **Note:** This method is blocking and may take time to load the model into memory. It is recommended to call this method inside your prewarm mechanism. **Example:** ```python def prewarm(proc: JobProcess): proc.userdata["vad"] = silero.VAD.load() async def entrypoint(ctx: JobContext): vad = (ctx.proc.userdata["vad"],) # your agent logic... if __name__ == "__main__": cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)) ``` Args: min_speech_duration (float): Minimum duration of speech to start a new speech chunk. min_silence_duration (float): At the end of each speech, wait this duration before ending the speech. prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk. max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds). activation_threshold (float): Threshold to consider a frame as speech. sample_rate (Literal[8000, 16000]): Sample rate for the inference (only 8KHz and 16KHz are supported). force_cpu (bool): Force the use of CPU for inference. padding_duration (float | None): **Deprecated**. Use `prefix_padding_duration` instead. Returns: VAD: An instance of the VAD class ready for streaming. Raises: ValueError: If an unsupported sample rate is provided. """ if sample_rate not in onnx_model.SUPPORTED_SAMPLE_RATES: raise ValueError("Silero VAD only supports 8KHz and 16KHz sample rates") if padding_duration is not None: logger.warning( "padding_duration is deprecated and will be removed in 1.5.0, use prefix_padding_duration instead", ) prefix_padding_duration = padding_duration session = onnx_model.new_inference_session(force_cpu) opts = _VADOptions( min_speech_duration=min_speech_duration, min_silence_duration=min_silence_duration, prefix_padding_duration=prefix_padding_duration, max_buffered_speech=max_buffered_speech, activation_threshold=activation_threshold, sample_rate=sample_rate, ) return cls(session=session, opts=opts) def __init__( self, *, session: onnxruntime.InferenceSession, opts: _VADOptions, ) -> None: super().__init__(capabilities=agents.vad.VADCapabilities(update_interval=0.032)) self._onnx_session = session self._opts = opts self._streams = weakref.WeakSet[VADStream]() def stream(self) -> "VADStream": """ Create a new VADStream for processing audio data. Returns: VADStream: A stream object for processing audio input and detecting speech. """ stream = VADStream( self, self._opts, onnx_model.OnnxModel( onnx_session=self._onnx_session, sample_rate=self._opts.sample_rate ), ) self._streams.add(stream) return stream def update_options( self, *, min_speech_duration: float | None = None, min_silence_duration: float | None = None, prefix_padding_duration: float | None = None, max_buffered_speech: float | None = None, activation_threshold: float | None = None, ) -> None: """ Update the VAD options. This method allows you to update the VAD options after the VAD object has been created. Args: min_speech_duration (float): Minimum duration of speech to start a new speech chunk. min_silence_duration (float): At the end of each speech, wait this duration before ending the speech. prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk. max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds). activation_threshold (float): Threshold to consider a frame as speech. """ self._opts = _VADOptions( min_speech_duration=min_speech_duration or self._opts.min_speech_duration, min_silence_duration=min_silence_duration or self._opts.min_silence_duration, prefix_padding_duration=prefix_padding_duration or self._opts.prefix_padding_duration, max_buffered_speech=max_buffered_speech or self._opts.max_buffered_speech, activation_threshold=activation_threshold or self._opts.activation_threshold, sample_rate=self._opts.sample_rate, ) for stream in self._streams: stream.update_options( min_speech_duration=min_speech_duration, min_silence_duration=min_silence_duration, prefix_padding_duration=prefix_padding_duration, max_buffered_speech=max_buffered_speech, activation_threshold=activation_threshold, )
Ancestors
- VAD
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def load(*, min_speech_duration: float = 0.05, min_silence_duration: float = 0.55, prefix_padding_duration: float = 0.5, max_buffered_speech: float = 60.0, activation_threshold: float = 0.5, sample_rate: Literal[8000, 16000] = 16000, force_cpu: bool = True, padding_duration: float | None = None) ‑> livekit.plugins.silero.vad.VAD
-
Load and initialize the Silero VAD model.
This method loads the ONNX model and prepares it for inference. When options are not provided, sane defaults are used.
Note: This method is blocking and may take time to load the model into memory. It is recommended to call this method inside your prewarm mechanism.
Example:
```python def prewarm(proc: JobProcess): proc.userdata["vad"] = silero.VAD.load() async def entrypoint(ctx: JobContext): vad = (ctx.proc.userdata["vad"],) # your agent logic... if __name__ == "__main__": cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm)) ```
Args
min_speech_duration
:float
- Minimum duration of speech to start a new speech chunk.
min_silence_duration
:float
- At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration
:float
- Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech
:float
- Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold
:float
- Threshold to consider a frame as speech.
sample_rate
:Literal[8000, 16000]
- Sample rate for the inference (only 8KHz and 16KHz are supported).
force_cpu
:bool
- Force the use of CPU for inference.
padding_duration
:float | None
- Deprecated. Use
prefix_padding_duration
instead.
Returns
VAD
- An instance of the VAD class ready for streaming.
Raises
ValueError
- If an unsupported sample rate is provided.
Methods
def stream(self) ‑> livekit.plugins.silero.vad.VADStream
-
Create a new VADStream for processing audio data.
Returns
VADStream
- A stream object for processing audio input and detecting speech.
def update_options(self, *, min_speech_duration: float | None = None, min_silence_duration: float | None = None, prefix_padding_duration: float | None = None, max_buffered_speech: float | None = None, activation_threshold: float | None = None) ‑> None
-
Update the VAD options.
This method allows you to update the VAD options after the VAD object has been created.
Args
min_speech_duration
:float
- Minimum duration of speech to start a new speech chunk.
min_silence_duration
:float
- At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration
:float
- Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech
:float
- Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold
:float
- Threshold to consider a frame as speech.
Inherited members
class VADStream (vad: VAD, opts: _VADOptions, model: onnx_model.OnnxModel)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class VADStream(agents.vad.VADStream): def __init__( self, vad: VAD, opts: _VADOptions, model: onnx_model.OnnxModel ) -> None: super().__init__(vad) self._opts, self._model = opts, model self._loop = asyncio.get_event_loop() self._executor = ThreadPoolExecutor(max_workers=1) self._task.add_done_callback(lambda _: self._executor.shutdown(wait=False)) self._exp_filter = utils.ExpFilter(alpha=0.35) self._input_sample_rate = 0 self._speech_buffer: np.ndarray | None = None self._speech_buffer_max_reached = False self._prefix_padding_samples = 0 # (input_sample_rate) def update_options( self, *, min_speech_duration: float | None = None, min_silence_duration: float | None = None, prefix_padding_duration: float | None = None, max_buffered_speech: float | None = None, activation_threshold: float | None = None, ) -> None: """ Update the VAD options. This method allows you to update the VAD options after the VAD object has been created. Args: min_speech_duration (float): Minimum duration of speech to start a new speech chunk. min_silence_duration (float): At the end of each speech, wait this duration before ending the speech. prefix_padding_duration (float): Duration of padding to add to the beginning of each speech chunk. max_buffered_speech (float): Maximum duration of speech to keep in the buffer (in seconds). activation_threshold (float): Threshold to consider a frame as speech. """ old_max_buffered_speech = self._opts.max_buffered_speech self._opts = _VADOptions( min_speech_duration=min_speech_duration or self._opts.min_speech_duration, min_silence_duration=min_silence_duration or self._opts.min_silence_duration, prefix_padding_duration=prefix_padding_duration or self._opts.prefix_padding_duration, max_buffered_speech=max_buffered_speech or self._opts.max_buffered_speech, activation_threshold=activation_threshold or self._opts.activation_threshold, sample_rate=self._opts.sample_rate, ) if self._input_sample_rate: assert self._speech_buffer is not None self._prefix_padding_samples = int( self._opts.prefix_padding_duration * self._input_sample_rate ) self._speech_buffer.resize( int(self._opts.max_buffered_speech * self._input_sample_rate) + self._prefix_padding_samples ) if self._opts.max_buffered_speech > old_max_buffered_speech: self._speech_buffer_max_reached = False @agents.utils.log_exceptions(logger=logger) async def _main_task(self): inference_f32_data = np.empty(self._model.window_size_samples, dtype=np.float32) speech_buffer_index: int = 0 # "pub_" means public, these values are exposed to the users through events pub_speaking = False pub_speech_duration = 0.0 pub_silence_duration = 0.0 pub_current_sample = 0 pub_timestamp = 0.0 speech_threshold_duration = 0.0 silence_threshold_duration = 0.0 input_frames = [] inference_frames = [] resampler: rtc.AudioResampler | None = None # used to avoid drift when the sample_rate ratio is not an integer input_copy_remaining_fract = 0.0 extra_inference_time = 0.0 async for input_frame in self._input_ch: if not isinstance(input_frame, rtc.AudioFrame): continue # ignore flush sentinel for now if not self._input_sample_rate: self._input_sample_rate = input_frame.sample_rate # alloc the buffers now that we know the input sample rate self._prefix_padding_samples = int( self._opts.prefix_padding_duration * self._input_sample_rate ) self._speech_buffer = np.empty( int(self._opts.max_buffered_speech * self._input_sample_rate) + self._prefix_padding_samples, dtype=np.int16, ) if self._input_sample_rate != self._opts.sample_rate: # resampling needed: the input sample rate isn't the same as the model's # sample rate used for inference resampler = rtc.AudioResampler( input_rate=self._input_sample_rate, output_rate=self._opts.sample_rate, quality=rtc.AudioResamplerQuality.QUICK, # VAD doesn't need high quality ) elif self._input_sample_rate != input_frame.sample_rate: logger.error("a frame with another sample rate was already pushed") continue assert self._speech_buffer is not None input_frames.append(input_frame) if resampler is not None: # the resampler may have a bit of latency, but it is OK to ignore since it should be # negligible inference_frames.extend(resampler.push(input_frame)) else: inference_frames.append(input_frame) while True: start_time = time.perf_counter() available_inference_samples = sum( [frame.samples_per_channel for frame in inference_frames] ) if available_inference_samples < self._model.window_size_samples: break # not enough samples to run inference input_frame = utils.combine_frames(input_frames) inference_frame = utils.combine_frames(inference_frames) # convert data to f32 np.divide( inference_frame.data[: self._model.window_size_samples], np.iinfo(np.int16).max, out=inference_f32_data, dtype=np.float32, ) # run the inference p = await self._loop.run_in_executor( self._executor, self._model, inference_f32_data ) p = self._exp_filter.apply(exp=1.0, sample=p) window_duration = ( self._model.window_size_samples / self._opts.sample_rate ) pub_current_sample += self._model.window_size_samples pub_timestamp += window_duration resampling_ratio = self._input_sample_rate / self._model.sample_rate to_copy = ( self._model.window_size_samples * resampling_ratio + input_copy_remaining_fract ) to_copy_int = int(to_copy) input_copy_remaining_fract = to_copy - to_copy_int # copy the inference window to the speech buffer available_space = len(self._speech_buffer) - speech_buffer_index to_copy_buffer = min(to_copy_int, available_space) if to_copy_buffer > 0: self._speech_buffer[ speech_buffer_index : speech_buffer_index + to_copy_buffer ] = input_frame.data[:to_copy_buffer] speech_buffer_index += to_copy_buffer elif not self._speech_buffer_max_reached: # reached self._opts.max_buffered_speech (padding is included) speech_buffer_max_reached = True logger.warning( "max_buffered_speech reached, ignoring further data for the current speech input" ) inference_duration = time.perf_counter() - start_time extra_inference_time = max( 0.0, extra_inference_time + inference_duration - window_duration, ) if inference_duration > SLOW_INFERENCE_THRESHOLD: logger.warning( "inference is slower than realtime", extra={"delay": extra_inference_time}, ) def _reset_write_cursor(): nonlocal speech_buffer_index, speech_buffer_max_reached assert self._speech_buffer is not None if speech_buffer_index <= self._prefix_padding_samples: return padding_data = self._speech_buffer[ speech_buffer_index - self._prefix_padding_samples : speech_buffer_index ] self._speech_buffer_max_reached = False self._speech_buffer[: self._prefix_padding_samples] = padding_data speech_buffer_index = self._prefix_padding_samples def _copy_speech_buffer() -> rtc.AudioFrame: # copy the data from speech_buffer assert self._speech_buffer is not None speech_data = self._speech_buffer[:speech_buffer_index].tobytes() return rtc.AudioFrame( sample_rate=self._input_sample_rate, num_channels=1, samples_per_channel=speech_buffer_index, data=speech_data, ) if pub_speaking: pub_speech_duration += window_duration else: pub_silence_duration += window_duration self._event_ch.send_nowait( agents.vad.VADEvent( type=agents.vad.VADEventType.INFERENCE_DONE, samples_index=pub_current_sample, timestamp=pub_timestamp, silence_duration=pub_silence_duration, speech_duration=pub_speech_duration, probability=p, inference_duration=inference_duration, frames=[ rtc.AudioFrame( data=input_frame.data[:to_copy_int].tobytes(), sample_rate=self._input_sample_rate, num_channels=1, samples_per_channel=to_copy_int, ) ], speaking=pub_speaking, raw_accumulated_silence=silence_threshold_duration, raw_accumulated_speech=speech_threshold_duration, ) ) if p >= self._opts.activation_threshold: speech_threshold_duration += window_duration silence_threshold_duration = 0.0 if not pub_speaking: if speech_threshold_duration >= self._opts.min_speech_duration: pub_speaking = True pub_silence_duration = 0.0 pub_speech_duration = speech_threshold_duration self._event_ch.send_nowait( agents.vad.VADEvent( type=agents.vad.VADEventType.START_OF_SPEECH, samples_index=pub_current_sample, timestamp=pub_timestamp, silence_duration=pub_silence_duration, speech_duration=pub_speech_duration, frames=[_copy_speech_buffer()], speaking=True, ) ) else: silence_threshold_duration += window_duration speech_threshold_duration = 0.0 if not pub_speaking: _reset_write_cursor() if ( pub_speaking and silence_threshold_duration >= self._opts.min_silence_duration ): pub_speaking = False pub_speech_duration = 0.0 pub_silence_duration = silence_threshold_duration self._event_ch.send_nowait( agents.vad.VADEvent( type=agents.vad.VADEventType.END_OF_SPEECH, samples_index=pub_current_sample, timestamp=pub_timestamp, silence_duration=pub_silence_duration, speech_duration=pub_speech_duration, frames=[_copy_speech_buffer()], speaking=False, ) ) _reset_write_cursor() # remove the frames that were used for inference from the input and inference frames input_frames = [] inference_frames = [] # add the remaining data if len(input_frame.data) - to_copy_int > 0: data = input_frame.data[to_copy_int:].tobytes() input_frames.append( rtc.AudioFrame( data=data, sample_rate=self._input_sample_rate, num_channels=1, samples_per_channel=len(data) // 2, ) ) if len(inference_frame.data) - self._model.window_size_samples > 0: data = inference_frame.data[ self._model.window_size_samples : ].tobytes() inference_frames.append( rtc.AudioFrame( data=data, sample_rate=self._opts.sample_rate, num_channels=1, samples_per_channel=len(data) // 2, ) )
Ancestors
- VADStream
- abc.ABC
Methods
def update_options(self, *, min_speech_duration: float | None = None, min_silence_duration: float | None = None, prefix_padding_duration: float | None = None, max_buffered_speech: float | None = None, activation_threshold: float | None = None) ‑> None
-
Update the VAD options.
This method allows you to update the VAD options after the VAD object has been created.
Args
min_speech_duration
:float
- Minimum duration of speech to start a new speech chunk.
min_silence_duration
:float
- At the end of each speech, wait this duration before ending the speech.
prefix_padding_duration
:float
- Duration of padding to add to the beginning of each speech chunk.
max_buffered_speech
:float
- Maximum duration of speech to keep in the buffer (in seconds).
activation_threshold
:float
- Threshold to consider a frame as speech.
Inherited members