Module livekit.plugins.krisp

Krisp VIVA plugin for LiveKit Agents

This plugin provides real-time noise reduction using Krisp's proprietary algorithms via the VIVA SDK.

Features

  • KrispVivaFilterFrameProcessor: Real-time noise reduction FrameProcessor

Sub-modules

livekit.plugins.krisp.krisp_instance

Krisp SDK instance manager for LiveKit Agents …

livekit.plugins.krisp.log
livekit.plugins.krisp.version
livekit.plugins.krisp.viva_filter

Krisp VIVA noise reduction audio filter for LiveKit Agents …

Functions

def int_to_krisp_frame_duration(frame_duration_ms: int) ‑> Any
Expand source code
def int_to_krisp_frame_duration(frame_duration_ms: int) -> Any:
    if frame_duration_ms not in KRISP_FRAME_DURATIONS:
        supported_durations = ", ".join(
            str(duration) for duration in sorted(KRISP_FRAME_DURATIONS.keys())
        )
        raise ValueError(
            f"Unsupported frame duration: {frame_duration_ms} ms. "
            f"Supported durations: {supported_durations} ms"
        )
    return KRISP_FRAME_DURATIONS[frame_duration_ms]
def int_to_krisp_sample_rate(sample_rate: int) ‑> Any
Expand source code
def int_to_krisp_sample_rate(sample_rate: int) -> Any:
    if sample_rate not in KRISP_SAMPLE_RATES:
        supported_rates = ", ".join(str(rate) for rate in sorted(KRISP_SAMPLE_RATES.keys()))
        raise ValueError(
            f"Unsupported sample rate: {sample_rate} Hz. Supported rates: {supported_rates} Hz"
        )
    return KRISP_SAMPLE_RATES[sample_rate]

Classes

class KrispSDKManager
Expand source code
class KrispSDKManager:
    """Singleton manager for Krisp VIVA SDK with reference counting.

    This manager ensures the Krisp SDK is initialized only once and properly
    cleaned up when all components are done using it. It uses reference counting
    to track active users (filters).

    Thread-safe implementation using a lock for all operations.

    The license key should be provided via the KRISP_VIVA_SDK_LICENSE_KEY environment variable.
    """

    _initialized = False
    _lock = Lock()
    _reference_count = 0

    @staticmethod
    def _log_callback(log_message: str, log_level: Any) -> None:
        """Thread-safe callback for Krisp SDK logging."""
        logger.debug(f"[Krisp {log_level}] {log_message}")

    @staticmethod
    def licensing_error_callback(error: Any, error_message: str) -> None:
        logger.error(f"[Krisp Licensing Error: {error}] {error_message}")

    @classmethod
    def _get_license_key(cls) -> str:
        """Get the license key from the KRISP_VIVA_SDK_LICENSE_KEY environment variable."""
        return os.getenv("KRISP_VIVA_SDK_LICENSE_KEY", "")

    @classmethod
    def acquire(cls) -> None:
        """Acquire a reference to the SDK (initializes if needed).

        Call this when creating a filter instance.
        The SDK will be initialized on the first call.

        Raises:
            Exception: If SDK initialization fails (propagated from krisp_audio)
        """
        with cls._lock:
            # Initialize SDK on first acquire
            if cls._reference_count == 0:
                try:
                    license_key = cls._get_license_key()
                    krisp_audio.globalInit(
                        "",
                        license_key,
                        cls.licensing_error_callback,
                        cls._log_callback,
                        krisp_audio.LogLevel.Off,
                    )
                    cls._initialized = True

                    version = krisp_audio.getVersion()
                    logger.debug(
                        f"Krisp Audio SDK initialized - "
                        f"Version: {version.major}.{version.minor}.{version.patch}"
                    )

                except Exception as e:
                    cls._initialized = False
                    logger.error(f"Krisp SDK initialization failed: {e}")
                    raise

            cls._reference_count += 1
            logger.debug(f"Krisp SDK reference count: {cls._reference_count}")

    @classmethod
    def release(cls) -> None:
        """Release a reference to the SDK (destroys if last reference).

        Call this when destroying a filter instance.
        The SDK will be cleaned up when the last reference is released.
        """
        with cls._lock:
            if cls._reference_count > 0:
                cls._reference_count -= 1
                logger.debug(f"Krisp SDK reference count: {cls._reference_count}")

                # Destroy SDK when last reference is released
                if cls._reference_count == 0 and cls._initialized:
                    try:
                        krisp_audio.globalDestroy()
                        cls._initialized = False
                        logger.debug("Krisp Audio SDK destroyed (all references released)")
                    except Exception as e:
                        logger.error(f"Error during Krisp SDK cleanup: {e}")
                        cls._initialized = False

    @classmethod
    def is_initialized(cls) -> bool:
        """Check if the SDK is currently initialized.

        Returns:
            True if SDK is initialized, False otherwise.
        """
        with cls._lock:
            return cls._initialized

    @classmethod
    def get_reference_count(cls) -> int:
        """Get the current reference count.

        Returns:
            Number of active references to the SDK.
        """
        with cls._lock:
            return cls._reference_count

Singleton manager for Krisp VIVA SDK with reference counting.

This manager ensures the Krisp SDK is initialized only once and properly cleaned up when all components are done using it. It uses reference counting to track active users (filters).

Thread-safe implementation using a lock for all operations.

The license key should be provided via the KRISP_VIVA_SDK_LICENSE_KEY environment variable.

Static methods

def acquire() ‑> None

Acquire a reference to the SDK (initializes if needed).

Call this when creating a filter instance. The SDK will be initialized on the first call.

Raises

Exception
If SDK initialization fails (propagated from krisp_audio)
def get_reference_count() ‑> int

Get the current reference count.

Returns

Number of active references to the SDK.

def is_initialized() ‑> bool

Check if the SDK is currently initialized.

Returns

True if SDK is initialized, False otherwise.

def licensing_error_callback(error: Any, error_message: str) ‑> None
Expand source code
@staticmethod
def licensing_error_callback(error: Any, error_message: str) -> None:
    logger.error(f"[Krisp Licensing Error: {error}] {error_message}")
def release() ‑> None

Release a reference to the SDK (destroys if last reference).

Call this when destroying a filter instance. The SDK will be cleaned up when the last reference is released.

class KrispVivaFilterFrameProcessor (model_path: str | None = None,
noise_suppression_level: int = 100,
frame_duration_ms: int = 10,
sample_rate: int | None = None)
Expand source code
class KrispVivaFilterFrameProcessor(rtc.FrameProcessor[rtc.AudioFrame]):
    """FrameProcessor implementation for Krisp noise reduction.

    This class implements the FrameProcessor interface from livekit-rtc,
    allowing it to be used directly with the noise_cancellation parameter
    in AudioInputOptions or RoomInputOptions.

    Example:
        ```python
        from livekit.agents import room_io
        from livekit.plugins import krisp

        # Create frame processor
        processor = krisp.KrispVivaFilterFrameProcessor(
            noise_suppression_level=100,
            frame_duration_ms=10,
        )

        # Use it directly in AudioInputOptions
        await session.start(
            agent=MyAgent(),
            room=ctx.room,
            room_options=room_io.RoomOptions(
                audio_input=room_io.AudioInputOptions(
                    sample_rate=16000,
                    frame_size_ms=10,
                    noise_cancellation=processor,
                ),
            ),
        )
        ```
    """

    def __init__(
        self,
        model_path: str | None = None,
        noise_suppression_level: int = 100,
        frame_duration_ms: int = 10,
        sample_rate: int | None = None,
    ) -> None:
        """Initialize the Krisp frame processor.

        Args:
            model_path: Path to the Krisp model file (.kef extension).
                If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable.
            noise_suppression_level: Noise suppression level (0-100, default: 100).
            frame_duration_ms: Frame duration in milliseconds (10, 15, 20, 30, or 32, default: 10).
            sample_rate: sample rate in Hz. If None, default to 16000 Hz.

        Raises:
            RuntimeError: If krisp-audio package is not installed.
            ValueError: If model_path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set,
                or if frame_duration_ms is not supported.
            Exception: If model file doesn't have .kef extension.
            FileNotFoundError: If model file doesn't exist.
        """
        # Check if krisp-audio is available
        if not KRISP_AUDIO_AVAILABLE:
            raise RuntimeError("krisp-audio package is not installed.")

        # Initialize state variables first
        self._sdk_acquired = False
        self._filtering_enabled = True
        self._session: Any | None = None
        self._noise_suppression_level = noise_suppression_level
        self._sample_rate: int | None = None
        self._frame_duration_ms = frame_duration_ms

        # Acquire SDK reference (initializes on first call)
        try:
            KrispSDKManager.acquire()
            self._sdk_acquired = True
        except Exception as e:
            logger.error(f"Failed to acquire Krisp SDK: {e}")
            raise RuntimeError(f"Failed to acquire Krisp SDK: {e}") from e

        try:
            # Set model path, checking environment if not specified
            self._model_path = model_path or os.getenv("KRISP_VIVA_FILTER_MODEL_PATH")
            if not self._model_path:
                logger.error(
                    "Model path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set."
                )
                raise ValueError("Model path for KrispVivaFilterFrameProcessor must be provided.")

            if not self._model_path.endswith(".kef"):
                raise Exception("Model is expected with .kef extension")

            if not os.path.isfile(self._model_path):
                raise FileNotFoundError(f"Model file not found: {self._model_path}")

            # Validate frame duration
            if frame_duration_ms not in KRISP_FRAME_DURATIONS:
                raise ValueError(
                    f"Unsupported frame duration: {frame_duration_ms}ms. "
                    f"Supported durations: {list(KRISP_FRAME_DURATIONS.keys())}"
                )

            # Always create session to pre-load the model
            # Use provided sample rate, or default to 16kHz (most common)
            init_sample_rate = sample_rate if sample_rate is not None else 16000
            self._create_session(init_sample_rate)
            logger.info(
                f"Krisp frame processor initialized with {init_sample_rate}Hz session "
                f"(model pre-loaded, will recreate session if different sample rate)"
            )
        except Exception:
            # If initialization fails after acquiring SDK, release it
            if self._sdk_acquired:
                KrispSDKManager.release()
                self._sdk_acquired = False
            raise

    def _create_session(self, sample_rate: int) -> None:
        """Create a new Krisp session with the correct sample rate.

        Args:
            sample_rate: The sample rate of the audio frames in Hz.
        """
        # If session already exists for this sample rate, don't recreate
        if self._session is not None and self._sample_rate == sample_rate:
            return

        logger.info(f"Creating Krisp session for sample rate: {sample_rate}Hz")

        model_info = krisp_audio.ModelInfo()
        model_info.path = self._model_path

        nc_cfg = krisp_audio.NcSessionConfig()
        nc_cfg.inputSampleRate = int_to_krisp_sample_rate(sample_rate)
        nc_cfg.inputFrameDuration = int_to_krisp_frame_duration(self._frame_duration_ms)
        nc_cfg.outputSampleRate = nc_cfg.inputSampleRate
        nc_cfg.modelInfo = model_info

        try:
            self._session = krisp_audio.NcInt16.create(nc_cfg)
            self._sample_rate = sample_rate

            logger.info("✅ Krisp session created successfully")
        except Exception as e:
            logger.error(f"❌ Failed to create Krisp session: {e}")
            raise

    def _process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
        """Process an audio frame with Krisp noise reduction.

        This is the method required by the FrameProcessor interface.

        Args:
            frame: Input audio frame. Must contain exactly the number of samples
                   matching the configured frame_duration_ms at the frame's sample_rate.
                   For example: 10ms @ 16kHz = 160 samples, 20ms @ 32kHz = 640 samples.

        Returns:
            Filtered audio frame with noise reduction applied.
            If filtering is disabled, returns the original frame.

        Raises:
            ValueError: If frame size doesn't match the expected frame duration.
        """
        if not self._filtering_enabled:
            return frame

        if self._session is None or self._sample_rate != frame.sample_rate:
            raise ValueError(f"Session not created or sample rate mismatch: {frame.sample_rate}Hz")

        # Verify frame size matches expected duration
        expected_samples = int((frame.sample_rate * self._frame_duration_ms) / 1000)
        if frame.samples_per_channel != expected_samples:
            raise ValueError(
                f"Frame size mismatch: expected {expected_samples} samples "
                f"({self._frame_duration_ms}ms @ {frame.sample_rate}Hz), "
                f"got {frame.samples_per_channel} samples"
            )

        # Convert frame to numpy array
        audio_samples = np.frombuffer(frame.data, dtype=np.int16)

        try:
            # Process through Krisp
            filtered_samples = self._session.process(audio_samples, self._noise_suppression_level)

            # Validate output
            if filtered_samples is None or len(filtered_samples) == 0:
                logger.warning("Krisp returned empty output, using original audio")
                filtered_samples = audio_samples
            elif len(filtered_samples) != len(audio_samples):
                logger.warning(
                    f"Krisp output size mismatch: expected {len(audio_samples)}, "
                    f"got {len(filtered_samples)}, using original audio"
                )
                filtered_samples = audio_samples

            # Return filtered frame
            return rtc.AudioFrame(
                data=filtered_samples.tobytes(),
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
                samples_per_channel=len(filtered_samples),
            )

        except Exception as e:
            logger.error(f"Error processing frame: {e}")
            # Return original frame on error
            return frame

    def process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
        """Public method that calls _process (for backward compatibility)."""
        return self._process(frame)

    def enable(self) -> None:
        """Enable noise filtering."""
        self._filtering_enabled = True

    def disable(self) -> None:
        """Disable noise filtering (audio will pass through unmodified)."""
        self._filtering_enabled = False

    @property
    def enabled(self) -> bool:
        """Check if filtering is currently enabled (required by FrameProcessor interface)."""
        return self._filtering_enabled

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set filtering enabled state (required by FrameProcessor interface)."""
        self._filtering_enabled = value

    @property
    def is_enabled(self) -> bool:
        """Check if filtering is currently enabled (backward compatibility)."""
        return self._filtering_enabled

    def _close(self) -> None:
        """Clean up processor session resources (required by FrameProcessor interface).

        Note: This method is called during track transitions (when streams are closed/reopened),
        not just when the processor is destroyed. Therefore, we only clean up the session here,
        not the SDK reference. The SDK will be released in __del__ when the processor is
        actually being destroyed (at the end of the call).
        """
        if self._session is not None:
            self._session = None

        logger.debug("Krisp frame processor session closed")

    def close(self) -> None:
        """Clean up processor session resources (public method for backward compatibility)."""
        self._close()

    def __del__(self) -> None:
        """Destructor to ensure cleanup of session resources.

        Note: During Python shutdown, we avoid calling C extensions to prevent GIL errors.
        Always call close() explicitly for proper cleanup.
        """
        # Check if we're in Python shutdown (modules being cleaned up)
        # If KrispSDKManager is None, we're in shutdown - don't do anything
        if KrispSDKManager is None:
            return

        if getattr(self, "_sdk_acquired", False):
            try:
                if getattr(self, "_session", None) is not None:
                    self._session = None
                # Release SDK reference only if we still have it
                KrispSDKManager.release()
                self._sdk_acquired = False
            except Exception:
                # Silently ignore errors during shutdown
                pass

    def __enter__(self) -> KrispVivaFilterFrameProcessor:
        """Context manager entry."""
        return self

    def __exit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> Literal[False]:
        """Context manager exit - clean up session."""
        self.close()
        return False

FrameProcessor implementation for Krisp noise reduction.

This class implements the FrameProcessor interface from livekit-rtc, allowing it to be used directly with the noise_cancellation parameter in AudioInputOptions or RoomInputOptions.

Example

from livekit.agents import room_io
from livekit.plugins import krisp

# Create frame processor
processor = krisp.KrispVivaFilterFrameProcessor(
    noise_suppression_level=100,
    frame_duration_ms=10,
)

# Use it directly in AudioInputOptions
await session.start(
    agent=MyAgent(),
    room=ctx.room,
    room_options=room_io.RoomOptions(
        audio_input=room_io.AudioInputOptions(
            sample_rate=16000,
            frame_size_ms=10,
            noise_cancellation=processor,
        ),
    ),
)

Initialize the Krisp frame processor.

Args

model_path
Path to the Krisp model file (.kef extension). If None, uses KRISP_VIVA_FILTER_MODEL_PATH environment variable.
noise_suppression_level
Noise suppression level (0-100, default: 100).
frame_duration_ms
Frame duration in milliseconds (10, 15, 20, 30, or 32, default: 10).
sample_rate
sample rate in Hz. If None, default to 16000 Hz.

Raises

RuntimeError
If krisp-audio package is not installed.
ValueError
If model_path is not provided and KRISP_VIVA_FILTER_MODEL_PATH is not set, or if frame_duration_ms is not supported.
Exception
If model file doesn't have .kef extension.
FileNotFoundError
If model file doesn't exist.

Ancestors

Instance variables

prop enabled : bool
Expand source code
@property
def enabled(self) -> bool:
    """Check if filtering is currently enabled (required by FrameProcessor interface)."""
    return self._filtering_enabled

Check if filtering is currently enabled (required by FrameProcessor interface).

prop is_enabled : bool
Expand source code
@property
def is_enabled(self) -> bool:
    """Check if filtering is currently enabled (backward compatibility)."""
    return self._filtering_enabled

Check if filtering is currently enabled (backward compatibility).

Methods

def close(self) ‑> None
Expand source code
def close(self) -> None:
    """Clean up processor session resources (public method for backward compatibility)."""
    self._close()

Clean up processor session resources (public method for backward compatibility).

def disable(self) ‑> None
Expand source code
def disable(self) -> None:
    """Disable noise filtering (audio will pass through unmodified)."""
    self._filtering_enabled = False

Disable noise filtering (audio will pass through unmodified).

def enable(self) ‑> None
Expand source code
def enable(self) -> None:
    """Enable noise filtering."""
    self._filtering_enabled = True

Enable noise filtering.

def process(self, frame: rtc.AudioFrame) ‑> AudioFrame
Expand source code
def process(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
    """Public method that calls _process (for backward compatibility)."""
    return self._process(frame)

Public method that calls _process (for backward compatibility).