Module livekit.plugins.krisp.viva_filter
Krisp VIVA noise reduction audio filter for LiveKit Agents.
This module provides an audio filter implementation using Krisp VIVA SDK for real-time noise suppression in LiveKit voice agents.
Classes
class KrispVivaFilterFrameProcessor (model_path: str | None = None,
noise_suppression_level: int = 100,
frame_duration_ms: int = 10,
sample_rate: int | None = None)-
Expand source code
class KrispVivaFilterFrameProcessor(rtc.FrameProcessor[rtc.AudioFrame]):
    """FrameProcessor implementation for Krisp noise reduction.

    This class implements the FrameProcessor interface from livekit-rtc, allowing it
    to be used directly with the noise_cancellation parameter in AudioInputOptions
    or RoomInputOptions.

    Example:
        ```python
        from livekit.agents import room_io
        from livekit.plugins import krisp

        # Create frame processor
        processor = krisp.KrispVivaFilterFrameProcessor(
            noise_suppression_level=100,
            frame_duration_ms=10,
        )

        # Use it directly in AudioInputOptions
        await session.start(
            agent=MyAgent(),
            room=ctx.room,
            room_options=room_io.RoomOptions(
                audio_input=room_io.AudioInputOptions(
                    sample_rate=16000,
                    frame_size_ms=10,
                    noise_cancellation=processor,
                ),
            ),
        )
        ```
    """

    def __init__(
        self,
        model_path: str | None = None,
        noise_suppression_level: int = 100,
        frame_duration_ms: int = 10,
        sample_rate: int | None = None,
    ) -> None:
        """Initialize the Krisp frame processor.

        Args:
            model_path: Path to the Krisp model file (.kef extension).
                If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable.
            noise_suppression_level: Noise suppression level (0-100, default: 100).
            frame_duration_ms: Frame duration in milliseconds
                (10, 15, 20, 30, or 32, default: 10).
            sample_rate: Sample rate in Hz. If None, defaults to 16000 Hz.

        Raises:
            RuntimeError: If krisp-audio package is not installed, or the SDK
                reference cannot be acquired.
            ValueError: If model_path is not provided and KRISP_VIVA_FILTER_MODEL_PATH
                is not set, or if frame_duration_ms is not supported.
            Exception: If model file doesn't have .kef extension.
            FileNotFoundError: If model file doesn't exist.
        """
        # Check if krisp-audio is available
        if not KRISP_AUDIO_AVAILABLE:
            raise RuntimeError("krisp-audio package is not installed.")

        # Initialize state variables first so that __del__ is safe even if a later
        # step in this constructor raises.
        self._sdk_acquired = False
        self._filtering_enabled = True
        self._session: Any | None = None
        self._noise_suppression_level = noise_suppression_level
        self._sample_rate: int | None = None
        self._frame_duration_ms = frame_duration_ms

        # Acquire SDK reference (initializes on first call)
        try:
            KrispSDKManager.acquire()
            self._sdk_acquired = True
        except Exception as e:
            logger.error(f"Failed to acquire Krisp SDK: {e}")
            raise RuntimeError(f"Failed to acquire Krisp SDK: {e}") from e

        try:
            # Set model path, checking environment if not specified
            self._model_path = model_path or os.getenv("KRISP_VIVA_FILTER_MODEL_PATH")
            if not self._model_path:
                logger.error(
                    "Model path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set."
                )
                raise ValueError("Model path for KrispVivaFilterFrameProcessor must be provided.")

            if not self._model_path.endswith(".kef"):
                raise Exception("Model is expected with .kef extension")

            if not os.path.isfile(self._model_path):
                raise FileNotFoundError(f"Model file not found: {self._model_path}")

            # Validate frame duration
            if frame_duration_ms not in KRISP_FRAME_DURATIONS:
                raise ValueError(
                    f"Unsupported frame duration: {frame_duration_ms}ms. "
                    f"Supported durations: {list(KRISP_FRAME_DURATIONS.keys())}"
                )

            # Always create session to pre-load the model
            # Use provided sample rate, or default to 16kHz (most common)
            init_sample_rate = sample_rate if sample_rate is not None else 16000
            self._create_session(init_sample_rate)
            logger.info(
                f"Krisp frame processor initialized with {init_sample_rate}Hz session "
                f"(model pre-loaded, will recreate session if different sample rate)"
            )
        except Exception:
            # If initialization fails after acquiring SDK, release it
            if self._sdk_acquired:
                KrispSDKManager.release()
                self._sdk_acquired = False
            raise

    def _create_session(self, sample_rate: int) -> None:
        """Create a new Krisp session with the correct sample rate.

        Does nothing when a session for ``sample_rate`` already exists. On failure
        the previous session (if any) and its recorded sample rate are left
        untouched and the underlying exception is re-raised.

        Args:
            sample_rate: The sample rate of the audio frames in Hz.
        """
        # If session already exists for this sample rate, don't recreate
        if self._session is not None and self._sample_rate == sample_rate:
            return

        logger.info(f"Creating Krisp session for sample rate: {sample_rate}Hz")

        model_info = krisp_audio.ModelInfo()
        model_info.path = self._model_path

        nc_cfg = krisp_audio.NcSessionConfig()
        nc_cfg.inputSampleRate = int_to_krisp_sample_rate(sample_rate)
        nc_cfg.inputFrameDuration = int_to_krisp_frame_duration(self._frame_duration_ms)
        nc_cfg.outputSampleRate = nc_cfg.inputSampleRate
        nc_cfg.modelInfo = model_info

        try:
            self._session = krisp_audio.NcInt16.create(nc_cfg)
            self._sample_rate = sample_rate
            logger.info("✅ Krisp session created successfully")
        except Exception as e:
            logger.error(f"❌ Failed to create Krisp session: {e}")
            raise

    def _process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
        """Process an audio frame with Krisp noise reduction.

        This is the method required by the FrameProcessor interface.

        Args:
            frame: Input audio frame. Must contain exactly the number of samples
                matching the configured frame_duration_ms at the frame's sample_rate.
                For example: 10ms @ 16kHz = 160 samples, 20ms @ 32kHz = 640 samples.

        Returns:
            Filtered audio frame with noise reduction applied.
            If filtering is disabled, or if Krisp fails while processing the frame,
            returns the original frame.

        Raises:
            ValueError: If a session for the frame's sample rate cannot be created,
                or if frame size doesn't match the expected frame duration.
        """
        if not self._filtering_enabled:
            return frame

        # _close() drops the session on track transitions, and incoming audio may use
        # a different sample rate than the one the session was pre-loaded with, so the
        # session is (re)created lazily here instead of rejecting the frame.
        if self._session is None or self._sample_rate != frame.sample_rate:
            try:
                self._create_session(frame.sample_rate)
            except Exception as e:
                # Keep the exception type callers already handle for this condition.
                raise ValueError(
                    f"Session not created or sample rate mismatch: {frame.sample_rate}Hz"
                ) from e

        # Verify frame size matches expected duration
        expected_samples = frame.sample_rate * self._frame_duration_ms // 1000
        if frame.samples_per_channel != expected_samples:
            raise ValueError(
                f"Frame size mismatch: expected {expected_samples} samples "
                f"({self._frame_duration_ms}ms @ {frame.sample_rate}Hz), "
                f"got {frame.samples_per_channel} samples"
            )

        # Convert frame to numpy array
        audio_samples = np.frombuffer(frame.data, dtype=np.int16)

        try:
            # Process through Krisp
            filtered_samples = self._session.process(audio_samples, self._noise_suppression_level)

            # Validate output
            if filtered_samples is None or len(filtered_samples) == 0:
                logger.warning("Krisp returned empty output, using original audio")
                filtered_samples = audio_samples
            elif len(filtered_samples) != len(audio_samples):
                logger.warning(
                    f"Krisp output size mismatch: expected {len(audio_samples)}, "
                    f"got {len(filtered_samples)}, using original audio"
                )
                filtered_samples = audio_samples

            # Return filtered frame. At this point the output has the same length as
            # the input buffer, so the per-channel count is the input frame's own
            # (len(filtered_samples) would be the total across all channels).
            return rtc.AudioFrame(
                data=filtered_samples.tobytes(),
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
                samples_per_channel=frame.samples_per_channel,
            )
        except Exception as e:
            logger.error(f"Error processing frame: {e}")
            # Return original frame on error
            return frame

    def process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
        """Public method that calls _process (for backward compatibility)."""
        return self._process(frame)

    def enable(self) -> None:
        """Enable noise filtering."""
        self._filtering_enabled = True

    def disable(self) -> None:
        """Disable noise filtering (audio will pass through unmodified)."""
        self._filtering_enabled = False

    @property
    def enabled(self) -> bool:
        """Check if filtering is currently enabled (required by FrameProcessor interface)."""
        return self._filtering_enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set filtering enabled state (required by FrameProcessor interface)."""
        self._filtering_enabled = value

    @property
    def is_enabled(self) -> bool:
        """Check if filtering is currently enabled (backward compatibility)."""
        return self._filtering_enabled

    def _close(self) -> None:
        """Clean up processor session resources (required by FrameProcessor interface).

        Note: This method is called during track transitions (when streams are
        closed/reopened), not just when the processor is destroyed. Therefore, we only
        clean up the session here, not the SDK reference. The SDK will be released in
        __del__ when the processor is actually being destroyed (at the end of the call).
        """
        if self._session is not None:
            self._session = None
            logger.debug("Krisp frame processor session closed")

    def close(self) -> None:
        """Clean up processor session resources (public method for backward compatibility)."""
        self._close()

    def __del__(self) -> None:
        """Destructor to ensure cleanup of session resources.

        Note: During Python shutdown, we avoid calling C extensions to prevent GIL
        errors. Always call close() explicitly for proper cleanup.
        """
        # Check if we're in Python shutdown (modules being cleaned up)
        # If KrispSDKManager is None, we're in shutdown - don't do anything
        if KrispSDKManager is None:
            return

        # getattr() guards against a partially constructed instance: __init__ may have
        # raised before these attributes were assigned.
        if getattr(self, "_sdk_acquired", False):
            try:
                if getattr(self, "_session", None) is not None:
                    self._session = None
                # Release SDK reference only if we still have it
                KrispSDKManager.release()
                self._sdk_acquired = False
            except Exception:
                # Silently ignore errors during shutdown
                pass

    def __enter__(self) -> KrispVivaFilterFrameProcessor:
        """Context manager entry."""
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> Literal[False]:
        """Context manager exit - clean up session."""
        self.close()
        return False
This class implements the FrameProcessor interface from livekit-rtc, allowing it to be used directly with the noise_cancellation parameter in AudioInputOptions or RoomInputOptions.
Example
from livekit.agents import room_io
from livekit.plugins import krisp

# Create frame processor
processor = krisp.KrispVivaFilterFrameProcessor(
    noise_suppression_level=100,
    frame_duration_ms=10,
)

# Use it directly in AudioInputOptions
await session.start(
    agent=MyAgent(),
    room=ctx.room,
    room_options=room_io.RoomOptions(
        audio_input=room_io.AudioInputOptions(
            sample_rate=16000,
            frame_size_ms=10,
            noise_cancellation=processor,
        ),
    ),
)

Initialize the Krisp frame processor.
Args
model_path- Path to the Krisp model file (.kef extension). If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable.
noise_suppression_level- Noise suppression level (0-100, default: 100).
frame_duration_ms- Frame duration in milliseconds (10, 15, 20, 30, or 32, default: 10).
sample_rate- Sample rate in Hz. If None, defaults to 16000 Hz.
Raises
RuntimeError- If krisp-audio package is not installed.
ValueError- If model_path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set, or if frame_duration_ms is not supported.
Exception- If model file doesn't have .kef extension.
FileNotFoundError- If model file doesn't exist.
Ancestors
- FrameProcessor
- typing.Generic
- abc.ABC
Instance variables
prop enabled : bool-
Expand source code
@property
def enabled(self) -> bool:
    """Check if filtering is currently enabled (required by FrameProcessor interface)."""
    return self._filtering_enabled

Check if filtering is currently enabled (required by FrameProcessor interface).
prop is_enabled : bool-
Expand source code
@property
def is_enabled(self) -> bool:
    """Check if filtering is currently enabled (backward compatibility)."""
    return self._filtering_enabled

Check if filtering is currently enabled (backward compatibility).
Methods
def close(self) ‑> None-
Expand source code
def close(self) -> None:
    """Clean up processor session resources (public method for backward compatibility)."""
    self._close()

Clean up processor session resources (public method for backward compatibility).
def disable(self) ‑> None-
Expand source code
def disable(self) -> None:
    """Disable noise filtering (audio will pass through unmodified)."""
    self._filtering_enabled = False

Disable noise filtering (audio will pass through unmodified).
def enable(self) ‑> None-
Expand source code
def enable(self) -> None:
    """Enable noise filtering."""
    self._filtering_enabled = True

Enable noise filtering.
def process(self, frame: rtc.AudioFrame) ‑> AudioFrame-
Expand source code
def process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
    """Public method that calls _process (for backward compatibility)."""
    return self._process(frame)

Public method that calls _process (for backward compatibility).