Module livekit.plugins.sarvam

Sarvam.ai plugin for LiveKit Agents

Support for speech-to-text and text-to-speech with Sarvam.ai.

Sarvam.ai provides high-quality STT and TTS for Indian languages.

For API access, visit https://sarvam.ai/

Classes

class STT (*,
language: str = 'en-IN',
model: SarvamSTTModels | str = 'saarika:v2.5',
api_key: str | None = None,
base_url: str | None = None,
http_session: aiohttp.ClientSession | None = None,
prompt: str | None = None)
Expand source code
class STT(stt.STT):
    """Sarvam.ai Speech-to-Text implementation.

    This class provides speech-to-text functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality STT for Indian languages.

    Args:
        language: BCP-47 language code, e.g., "hi-IN", "en-IN"
        model: The Sarvam STT model to use
        api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
        base_url: API endpoint URL
        http_session: Optional aiohttp session to use for non-streaming
            recognition (streams created by ``stream()`` open their own session)
        prompt: Optional prompt for STT translate (saaras models only)

    Raises:
        ValueError: If no API key is passed and SARVAM_API_KEY is not set.
    """

    def __init__(
        self,
        *,
        language: str = "en-IN",
        model: SarvamSTTModels | str = "saarika:v2.5",
        api_key: str | None = None,
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        prompt: str | None = None,
    ) -> None:
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))

        # An explicit key wins; otherwise fall back to the environment.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. "
                "Provide it directly or set SARVAM_API_KEY environment variable."
            )

        self._opts = SarvamSTTOptions(
            language=language,
            api_key=self._api_key,
            model=model,
            base_url=base_url,
            prompt=prompt,
        )
        self._session = http_session
        self._logger = logger.getChild(self.__class__.__name__)
        # Weak references: tracking a stream here must not keep it alive.
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def model(self) -> str:
        """Return the model identifier stored in this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Return the provider name for this plugin ("Sarvam")."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, obtaining one from ``utils.http_context`` on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Recognize speech using Sarvam.ai API.

        The buffer is combined into a single WAV payload and posted as
        multipart form data to the configured ``base_url``.

        Args:
            buffer: Audio buffer containing speech data
            language: BCP-47 language code (overrides the one set in constructor)
            model: Sarvam model to use (overrides the one set in constructor)
            conn_options: Connection options for API requests

        Returns:
            A SpeechEvent containing the transcription result

        Raises:
            ValueError: If the API key is None
            APIConnectionError: On network connection errors, and on any other
                unexpected failure while processing the request (including a
                missing ``base_url``)
            APIStatusError: On API errors (non-200 status)
            APITimeoutError: On API timeout
        """
        opts_language = self._opts.language if isinstance(language, type(NOT_GIVEN)) else language
        opts_model = self._opts.model if isinstance(model, type(NOT_GIVEN)) else model

        wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()

        form_data = aiohttp.FormData()
        form_data.add_field("file", wav_bytes, filename="audio.wav", content_type="audio/wav")

        # Add model and language_code to the form data if specified
        # Sarvam API docs state language_code is optional for saarika:v2x but mandatory for v1
        # Model is also optional, defaults to saarika:v2.5
        if opts_language:
            form_data.add_field("language_code", opts_language)
        if opts_model:
            form_data.add_field("model", str(opts_model))

        if self._api_key is None:
            raise ValueError("API key cannot be None")
        headers = {"api-subscription-key": self._api_key}

        try:
            if self._opts.base_url is None:
                raise ValueError("base_url cannot be None")
            async with self._ensure_session().post(
                url=self._opts.base_url,
                data=form_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                if res.status != 200:
                    error_text = await res.text()
                    self._logger.error(f"Sarvam API error: {res.status} - {error_text}")
                    raise APIStatusError(
                        message=f"Sarvam API Error: {error_text}",
                        status_code=res.status,
                    )

                response_json = await res.json()
                self._logger.debug(f"Sarvam API response: {response_json}")

                transcript_text = response_json.get("transcript", "")
                request_id = response_json.get("request_id", "")
                detected_language = response_json.get("language_code")
                if not isinstance(detected_language, str):
                    # No usable language in the response: report the requested one.
                    detected_language = opts_language or ""

                start_time = 0.0
                end_time = 0.0

                # Try to get timestamps if available
                timestamps_data = response_json.get("timestamps")
                if timestamps_data and isinstance(timestamps_data, dict):
                    words_ts_start = timestamps_data.get("start_time_seconds")
                    words_ts_end = timestamps_data.get("end_time_seconds")
                    # First start and last end bound the whole utterance.
                    if isinstance(words_ts_start, list) and len(words_ts_start) > 0:
                        start_time = words_ts_start[0]
                    if isinstance(words_ts_end, list) and len(words_ts_end) > 0:
                        end_time = words_ts_end[-1]

                # If start/end times are still 0, use buffer duration as an estimate for end_time
                if start_time == 0.0 and end_time == 0.0:
                    end_time = _calculate_audio_duration(buffer)

                alternatives = [
                    stt.SpeechData(
                        language=detected_language,
                        text=transcript_text,
                        start_time=start_time,
                        end_time=end_time,
                        confidence=1.0,  # Sarvam doesn't provide confidence score in this response
                    )
                ]

                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    request_id=request_id,
                    alternatives=alternatives,
                )

        except APIStatusError:
            # Already logged above. Re-raise untouched so callers see the HTTP
            # status instead of having it re-wrapped by the catch-all below.
            raise
        except asyncio.TimeoutError as e:
            self._logger.error(f"Sarvam API timeout: {e}")
            raise APITimeoutError("Sarvam API request timed out") from e
        except aiohttp.ClientError as e:
            self._logger.error(f"Sarvam API client error: {e}")
            raise APIConnectionError(f"Sarvam API connection error: {e}") from e
        except Exception as e:
            self._logger.error(f"Error during Sarvam STT processing: {e}")
            raise APIConnectionError(f"Unexpected error in Sarvam STT: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        prompt: NotGivenOr[str] = NOT_GIVEN,
    ) -> SpeechStream:
        """Create a streaming transcription session.

        Args:
            language: Language code for this stream; resolved against the
                constructor value through ``_get_option_value``
            model: Model for this stream; resolved the same way as ``language``
            conn_options: Connection options handed to the stream
            prompt: Prompt for this stream; the constructor's prompt is used
                unless a string is passed

        Returns:
            A new SpeechStream, also tracked in this instance's stream set

        Raises:
            ValueError: If the API key is None
        """
        # Check the key before opening any resource, so a failure here cannot
        # leave an unclosed ClientSession behind.
        if self._api_key is None:
            raise ValueError("API key cannot be None")

        opts_language = _get_option_value(self._opts.language, language)
        opts_model = _get_option_value(self._opts.model, model)

        # Anything that did not resolve to a string falls back to the constructor value.
        if not isinstance(opts_language, str):
            opts_language = self._opts.language
        if not isinstance(opts_model, str):
            opts_model = self._opts.model

        # Handle prompt conversion from NotGiven to None
        final_prompt: str | None
        if isinstance(prompt, str):
            final_prompt = prompt
        else:
            final_prompt = self._opts.prompt

        # Create options for the stream
        stream_opts = SarvamSTTOptions(
            language=opts_language,
            api_key=self._api_key,
            model=opts_model,
            prompt=final_prompt,
        )

        # Create a fresh session for this stream to avoid conflicts
        # NOTE(review): nothing in this class closes this session; presumably
        # SpeechStream takes ownership of it — confirm.
        stream_session = aiohttp.ClientSession()

        stream = SpeechStream(
            stt=self,
            opts=stream_opts,
            conn_options=conn_options,
            api_key=self._api_key,
            http_session=stream_session,
        )
        self._streams.add(stream)
        return stream

Sarvam.ai Speech-to-Text implementation.

This class provides speech-to-text functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality STT for Indian languages.

Args

language
BCP-47 language code, e.g., "hi-IN", "en-IN"
model
The Sarvam STT model to use
api_key
Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
base_url
API endpoint URL
http_session
Optional aiohttp session to use
prompt
Optional prompt for STT translate (saaras models only)

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier stored in this instance's options."""
    configured_model = self._opts.model
    return configured_model

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name for this plugin."""
    provider_name = "Sarvam"
    return provider_name

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
prompt: NotGivenOr[str] = NOT_GIVEN) ‑> livekit.plugins.sarvam.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    prompt: NotGivenOr[str] = NOT_GIVEN,
) -> SpeechStream:
    """Create a streaming transcription session.

    Args:
        language: Language code for this stream; resolved against the
            constructor value through ``_get_option_value``
        model: Model for this stream; resolved the same way as ``language``
        conn_options: Connection options handed to the stream
        prompt: Prompt for this stream; the constructor's prompt is used
            unless a string is passed

    Returns:
        A new SpeechStream, also tracked in this instance's stream set

    Raises:
        ValueError: If the API key is None
    """
    # Check the key before opening any resource, so a failure here cannot
    # leave an unclosed ClientSession behind.
    if self._api_key is None:
        raise ValueError("API key cannot be None")

    opts_language = _get_option_value(self._opts.language, language)
    opts_model = _get_option_value(self._opts.model, model)

    # Anything that did not resolve to a string falls back to the constructor value.
    if not isinstance(opts_language, str):
        opts_language = self._opts.language
    if not isinstance(opts_model, str):
        opts_model = self._opts.model

    # Handle prompt conversion from NotGiven to None
    final_prompt: str | None
    if isinstance(prompt, str):
        final_prompt = prompt
    else:
        final_prompt = self._opts.prompt

    # Create options for the stream
    stream_opts = SarvamSTTOptions(
        language=opts_language,
        api_key=self._api_key,
        model=opts_model,
        prompt=final_prompt,
    )

    # Create a fresh session for this stream to avoid conflicts
    # NOTE(review): nothing here closes this session; presumably
    # SpeechStream takes ownership of it — confirm.
    stream_session = aiohttp.ClientSession()

    stream = SpeechStream(
        stt=self,
        opts=stream_opts,
        conn_options=conn_options,
        api_key=self._api_key,
        http_session=stream_session,
    )
    self._streams.add(stream)
    return stream

Create a streaming transcription session.

Inherited members

class TTS (*,
target_language_code: SarvamTTSLanguages | str,
model: SarvamTTSModels | str = 'bulbul:v2',
speaker: SarvamTTSSpeakers | str = 'anushka',
speech_sample_rate: int = 22050,
num_channels: int = 1,
pitch: float = 0.0,
pace: float = 1.0,
loudness: float = 1.0,
enable_preprocessing: bool = False,
api_key: str | None = None,
base_url: str = 'https://api.sarvam.ai/text-to-speech',
ws_url: str = 'wss://api.sarvam.ai/text-to-speech/ws',
http_session: aiohttp.ClientSession | None = None,
send_completion_event: bool = True)
Expand source code
class TTS(tts.TTS):
    """Sarvam.ai Text-to-Speech implementation.

    This class provides text-to-speech functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality TTS for Indian languages.

    Args:
        target_language_code: BCP-47 language code for supported Indian languages
        model: Sarvam TTS model to use (bulbul:v2)
        speaker: Voice to use for synthesis
        speech_sample_rate: Audio sample rate in Hz (8000, 16000, 22050 or 24000)
        num_channels: Number of audio channels (Sarvam outputs mono)
        pitch: Voice pitch adjustment (-20.0 to 20.0) - only supported in v2 for now
        pace: Speech rate multiplier (0.5 to 2.0)
        loudness: Volume multiplier (0.5 to 2.0) - only supported in v2 for now
        enable_preprocessing: Whether to use text preprocessing
        api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
        base_url: API endpoint URL
        ws_url: WebSocket endpoint URL
        http_session: Optional aiohttp session to use
        send_completion_event: Value sent as the ``send_completion_event``
            query parameter when the WebSocket connection is opened

    Raises:
        ValueError: If no API key is available, a required string is empty,
            a numeric setting is out of range, or the speaker is not
            compatible with the model.
    """

    def __init__(
        self,
        *,
        target_language_code: SarvamTTSLanguages | str,
        model: SarvamTTSModels | str = "bulbul:v2",
        speaker: SarvamTTSSpeakers | str = "anushka",
        speech_sample_rate: int = 22050,
        num_channels: int = 1,  # Sarvam output is mono WAV
        pitch: float = 0.0,
        pace: float = 1.0,
        loudness: float = 1.0,
        enable_preprocessing: bool = False,
        api_key: str | None = None,
        base_url: str = SARVAM_TTS_BASE_URL,
        ws_url: str = SARVAM_TTS_WS_URL,
        http_session: aiohttp.ClientSession | None = None,
        send_completion_event: bool = True,
    ) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=speech_sample_rate,
            num_channels=num_channels,
        )

        # An explicit key wins; otherwise fall back to the environment.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. Provide it directly or set SARVAM_API_KEY env var."
            )

        # Validate inputs early
        if not target_language_code or not target_language_code.strip():
            raise ValueError("Target language code is required and cannot be empty")
        if not model or not model.strip():
            raise ValueError("Model is required and cannot be empty")
        if not speaker or not speaker.strip():
            raise ValueError("Speaker is required and cannot be empty")

        # Validate parameter ranges
        if not -20.0 <= pitch <= 20.0:
            raise ValueError("Pitch must be between -20.0 and 20.0")
        if not 0.5 <= pace <= 2.0:
            raise ValueError("Pace must be between 0.5 and 2.0")
        if not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")
        if speech_sample_rate not in [8000, 16000, 22050, 24000]:
            raise ValueError("Sample rate must be 8000, 16000, 22050, or 24000 Hz")

        # Validate model-speaker compatibility
        if not validate_model_speaker_compatibility(model, speaker):
            compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(model, {}).get("all", [])
            raise ValueError(
                f"Speaker '{speaker}' is not compatible with model '{model}'. "
                f"Please choose a compatible speaker from: {', '.join(compatible_speakers)}"
            )

        # Sentence-level tokenizer for streaming; the options field that
        # receives it is named ``word_tokenizer``.
        sentence_tokenizer = tokenize.basic.SentenceTokenizer()

        self._opts = SarvamTTSOptions(
            target_language_code=target_language_code,
            model=model,
            speaker=speaker,
            speech_sample_rate=speech_sample_rate,
            pitch=pitch,
            pace=pace,
            loudness=loudness,
            enable_preprocessing=enable_preprocessing,
            api_key=self._api_key,
            base_url=base_url,
            ws_url=ws_url,
            word_tokenizer=sentence_tokenizer,
            send_completion_event=send_completion_event,
        )
        self._session = http_session
        # Weak references: tracking a stream here must not keep it alive.
        self._streams = weakref.WeakSet[SynthesizeStream]()

        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to the Sarvam TTS endpoint (connection-pool connect callback).

        Args:
            timeout: Maximum time to wait for the connection, passed to
                ``asyncio.wait_for``

        Returns:
            The connected WebSocket response

        Raises:
            APIConnectionError: If connecting fails for any reason, including timeout
        """
        session = self._ensure_session()
        headers = {
            "api-subscription-key": self._opts.api_key,
            "User-Agent": "LiveKit-Sarvam-TTS/1.0",
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
        }
        # Add model parameter to URL like the client does
        # NOTE(review): the bool is interpolated as "True"/"False" — confirm the
        # endpoint accepts that capitalization.
        ws_url = f"{self._opts.ws_url}?model={self._opts.model}&send_completion_event={self._opts.send_completion_event}"

        logger.info("Connecting to Sarvam TTS WebSocket")

        try:
            return await asyncio.wait_for(
                session.ws_connect(
                    ws_url,
                    headers=headers,
                ),
                timeout,
            )
        except Exception as e:
            logger.error(
                "Failed to connect to Sarvam TTS WebSocket",
                extra={"error": str(e), "url": ws_url},
                exc_info=True,
            )
            raise APIConnectionError(f"WebSocket connection failed: {e}") from e

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a pooled WebSocket (connection-pool close callback)."""
        await ws.close()

    @property
    def model(self) -> str:
        """Return the model identifier stored in this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Return the provider name for this plugin ("Sarvam")."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, obtaining one from ``utils.http_context`` on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        model: str | None = None,
        speaker: str | None = None,
        pitch: float | None = None,
        pace: float | None = None,
        loudness: float | None = None,
        enable_preprocessing: bool | None = None,
        send_completion_event: bool | None = None,
    ) -> None:
        """Update TTS options with validation.

        Every argument defaults to None, meaning "leave this option unchanged".
        All supplied values are validated before any option is modified, so a
        rejected value leaves the current options untouched.

        Args:
            model: New model; only "bulbul:v2" is accepted
            speaker: New speaker; must be compatible with the model in effect
                after this call
            pitch: New pitch (-20.0 to 20.0)
            pace: New pace (0.5 to 2.0)
            loudness: New loudness (0.5 to 2.0)
            enable_preprocessing: New text-preprocessing flag
            send_completion_event: New completion-event flag

        Raises:
            ValueError: If any supplied value is empty, unsupported, out of
                range, or (for speaker) incompatible with the model
        """
        # --- Validation pass: nothing is mutated until every value is accepted.
        if model is not None:
            if not model.strip():
                raise ValueError("Model cannot be empty")
            if model not in ["bulbul:v2"]:
                raise ValueError(f"Unsupported model: {model}")

        # A new speaker is checked against the model that will be active once
        # this call completes.
        effective_model = self._opts.model if model is None else model

        if speaker is not None:
            if not speaker.strip():
                raise ValueError("Speaker cannot be empty")
            if not validate_model_speaker_compatibility(effective_model, speaker):
                compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(effective_model, {}).get(
                    "all", []
                )
                raise ValueError(
                    f"Speaker '{speaker}' incompatible with {effective_model}. "
                    f"Compatible speakers: {', '.join(compatible_speakers)}"
                )

        if pitch is not None and not -20.0 <= pitch <= 20.0:
            raise ValueError("Pitch must be between -20.0 and 20.0")

        if pace is not None and not 0.5 <= pace <= 2.0:
            raise ValueError("Pace must be between 0.5 and 2.0")

        if loudness is not None and not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")

        # --- Apply pass.
        # NOTE(review): model and send_completion_event are baked into the
        # WebSocket URL in _connect_ws; connections already in the pool
        # presumably keep the old values — confirm whether the pool should be
        # invalidated when they change.
        if model is not None:
            self._opts.model = model
        if speaker is not None:
            self._opts.speaker = speaker
        if pitch is not None:
            self._opts.pitch = pitch
        if pace is not None:
            self._opts.pace = pace
        if loudness is not None:
            self._opts.loudness = loudness
        if enable_preprocessing is not None:
            self._opts.enable_preprocessing = enable_preprocessing
        if send_completion_event is not None:
            self._opts.send_completion_event = send_completion_event

    # Implement the abstract synthesize method
    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions | None = None
    ) -> ChunkedStream:
        """Synthesize text to audio using Sarvam.ai TTS API.

        Args:
            text: Text to synthesize
            conn_options: Connection options; DEFAULT_API_CONNECT_OPTIONS is
                used when None

        Returns:
            A ChunkedStream bound to this TTS instance and the given text
        """
        if conn_options is None:
            conn_options = DEFAULT_API_CONNECT_OPTIONS
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a streaming TTS session.

        The new stream is tracked so that ``aclose()`` can close it.
        """
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        """Prewarm WebSocket connections."""
        self._pool.prewarm()

    async def aclose(self) -> None:
        """Close all active streams and connections."""
        # Iterate over a snapshot: the weak set may change while streams close.
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()

Sarvam.ai Text-to-Speech implementation.

This class provides text-to-speech functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality TTS for Indian languages.

Args

target_language_code
BCP-47 language code for supported Indian languages
model
Sarvam TTS model to use (bulbul:v2)
speaker
Voice to use for synthesis
speech_sample_rate
Audio sample rate in Hz
num_channels
Number of audio channels (Sarvam outputs mono)
pitch
Voice pitch adjustment (-20.0 to 20.0) - only supported in v2 for now
pace
Speech rate multiplier (0.5 to 2.0)
loudness
Volume multiplier (0.5 to 2.0) - only supported in v2 for now
enable_preprocessing
Whether to use text preprocessing
api_key
Sarvam.ai API key (falls back to the SARVAM_API_KEY env var; an error is raised if neither is provided)
base_url
API endpoint URL
ws_url
WebSocket endpoint URL
http_session
Optional aiohttp session to use

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier stored in this instance's options."""
    active_model = self._opts.model
    return active_model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name for this plugin."""
    provider_label = "Sarvam"
    return provider_label

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut down every tracked stream, then the connection pool.

    Streams are closed one after another; the tracking set is emptied
    before the pool itself is closed.
    """
    # Work from a snapshot so the tracked collection can change while we await.
    tracked = list(self._streams)
    for active_stream in tracked:
        await active_stream.aclose()

    self._streams.clear()

    pool = self._pool
    await pool.aclose()

Close all active streams and connections.

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Ask the connection pool to prewarm its WebSocket connections."""
    pool = self._pool
    pool.prewarm()

Prewarm WebSocket connections.

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.sarvam.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Open a new streaming synthesis session bound to this TTS instance.

    The session is added to the instance's tracked streams before it is
    returned.
    """
    synth_stream = SynthesizeStream(conn_options=conn_options, tts=self)
    tracked_streams = self._streams
    tracked_streams.add(synth_stream)
    return synth_stream

Create a streaming TTS session.

def synthesize(self, text: str, *, conn_options: APIConnectOptions | None = None) ‑> livekit.plugins.sarvam.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions | None = None
) -> ChunkedStream:
    """Build a ChunkedStream that synthesizes ``text`` via the Sarvam.ai TTS API.

    When ``conn_options`` is None, DEFAULT_API_CONNECT_OPTIONS is used.
    """
    effective_options = DEFAULT_API_CONNECT_OPTIONS if conn_options is None else conn_options
    return ChunkedStream(tts=self, input_text=text, conn_options=effective_options)

Synthesize text to audio using Sarvam.ai TTS API.

def update_options(self,
*,
model: str | None = None,
speaker: str | None = None,
pitch: float | None = None,
pace: float | None = None,
loudness: float | None = None,
enable_preprocessing: bool | None = False,
send_completion_event: bool | None = True) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: str | None = None,
    speaker: str | None = None,
    pitch: float | None = None,
    pace: float | None = None,
    loudness: float | None = None,
    enable_preprocessing: bool | None = False,
    send_completion_event: bool | None = True,
) -> None:
    """Update TTS options with validation."""
    if model is not None:
        if not model.strip():
            raise ValueError("Model cannot be empty")
        if model not in ["bulbul:v2"]:
            raise ValueError(f"Unsupported model: {model}")
        self._opts.model = model

    if speaker is not None:
        if not speaker.strip():
            raise ValueError("Speaker cannot be empty")
        if not validate_model_speaker_compatibility(self._opts.model, speaker):
            compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(self._opts.model, {}).get(
                "all", []
            )
            raise ValueError(
                f"Speaker '{speaker}' incompatible with {self._opts.model}. "
                f"Compatible speakers: {', '.join(compatible_speakers)}"
            )
        self._opts.speaker = speaker

    if pitch is not None:
        if not -20.0 <= pitch <= 20.0:
            raise ValueError("Pitch must be between -20.0 and 20.0")
        self._opts.pitch = pitch

    if pace is not None:
        if not 0.5 <= pace <= 2.0:
            raise ValueError("Pace must be between 0.5 and 2.0")
        self._opts.pace = pace

    if loudness is not None:
        if not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")
        self._opts.loudness = loudness

    if enable_preprocessing is not None:
        self._opts.enable_preprocessing = enable_preprocessing

    if send_completion_event is not None:
        self._opts.send_completion_event = send_completion_event

Update TTS options with validation.

Inherited members