Module livekit.plugins.sarvam

Sarvam.ai plugin for LiveKit Agents

Support for speech-to-text (STT), text-to-speech (TTS), and LLM inference with Sarvam.ai.

Sarvam.ai provides high-quality STT and TTS for Indian languages, as well as OpenAI-compatible LLMs.

For API access, visit https://sarvam.ai/

Classes

class LLM (*,
model: str | SarvamLLMModels = 'sarvam-30b',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = 'https://api.sarvam.ai/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
reasoning_effort: NotGivenOr[ReasoningEffort] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
wiki_grounding: NotGivenOr[bool] = NOT_GIVEN,
stop: NotGivenOr[str | list[str]] = NOT_GIVEN,
n: NotGivenOr[int] = NOT_GIVEN,
seed: NotGivenOr[int] = NOT_GIVEN,
frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
presence_penalty: NotGivenOr[float] = NOT_GIVEN,
extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
extra_body: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
timeout: httpx.Timeout | None = None)
Expand source code
class LLM(OpenAILLM):
    def __init__(
        self,
        *,
        model: str | SarvamLLMModels = "sarvam-30b",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = SARVAM_LLM_BASE_URL,
        client: openai.AsyncClient | None = None,
        user: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        reasoning_effort: NotGivenOr[ReasoningEffort] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        wiki_grounding: NotGivenOr[bool] = NOT_GIVEN,
        stop: NotGivenOr[str | list[str]] = NOT_GIVEN,
        n: NotGivenOr[int] = NOT_GIVEN,
        seed: NotGivenOr[int] = NOT_GIVEN,
        frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
        presence_penalty: NotGivenOr[float] = NOT_GIVEN,
        extra_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        extra_body: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
    ) -> None:
        """
        Create a new instance of Sarvam LLM.

        ``api_key`` must be set to your Sarvam API key, either using the argument or by setting
        the ``SARVAM_API_KEY`` environment variable.

        ``max_tokens``, ``wiki_grounding``, ``stop``, ``n``, ``seed``,
        ``frequency_penalty`` and ``presence_penalty`` are not passed to the parent
        constructor directly: each one that is given is written into the request
        body (on top of ``extra_body``), which is then passed through
        ``_filter_extra_body`` before being handed to the OpenAI-compatible base class.
        """
        resolved_model = _validate_model(model)
        resolved_key = _get_api_key(api_key)

        # Start from the caller's headers, then write the Sarvam auth and telemetry
        # headers last so they always take precedence.
        headers: dict[str, str] = {}
        if is_given(extra_headers):
            headers.update(extra_headers)
        headers["api-subscription-key"] = resolved_key
        headers["User-Agent"] = USER_AGENT

        # Explicit keyword arguments override same-named entries of ``extra_body``.
        body: dict[str, Any] = {}
        if is_given(extra_body):
            body.update(extra_body)
        explicit_params = (
            ("max_tokens", max_tokens),
            ("wiki_grounding", wiki_grounding),
            ("stop", stop),
            ("n", n),
            ("seed", seed),
            ("frequency_penalty", frequency_penalty),
            ("presence_penalty", presence_penalty),
        )
        for param_name, param_value in explicit_params:
            if is_given(param_value):
                body[param_name] = param_value
        request_body = _filter_extra_body(body)

        super().__init__(
            model=resolved_model,
            api_key=resolved_key,
            base_url=base_url,
            client=client,
            user=user,
            temperature=temperature,
            top_p=top_p,
            tool_choice=tool_choice,
            reasoning_effort=reasoning_effort,
            extra_headers=headers,
            # An empty body is reported as "not given" rather than as ``{}``.
            extra_body=request_body or NOT_GIVEN,
            timeout=timeout,
        )

    @property
    def model(self) -> str:
        """Model identifier held in this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider name reported for this LLM; always ``"Sarvam"``."""
        return "Sarvam"

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Sarvam LLM.

api_key must be set to your Sarvam API key, either using the argument or by setting the SARVAM_API_KEY environment variable.

Ancestors

  • livekit.plugins.openai.llm.LLM
  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier stored on this LLM's options object."""
    options = self._opts
    return options.model

Get the model name/identifier for this LLM instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name for this LLM, which is always ``"Sarvam"``."""
    provider_name = "Sarvam"
    return provider_name

Get the provider name/identifier for this LLM instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Inherited members

class STT (*,
language: str = 'en-IN',
model: SarvamSTTModels | str = 'saarika:v2.5',
mode: SarvamSTTModes | str = 'transcribe',
api_key: str | None = None,
base_url: str | None = None,
http_session: aiohttp.ClientSession | None = None,
prompt: str | None = None,
high_vad_sensitivity: bool | None = None,
sample_rate: int = 16000,
flush_signal: bool | None = None,
input_audio_codec: str | None = None,
positive_speech_threshold: float | None = None,
negative_speech_threshold: float | None = None,
min_speech_frames: int | None = None,
first_turn_min_speech_frames: int | None = None,
negative_frames_count: int | None = None,
negative_frames_window: int | None = None,
start_speech_volume_threshold: float | None = None,
interrupt_min_speech_frames: int | None = None,
pre_speech_pad_frames: int | None = None,
num_initial_ignored_frames: int | None = None)
Expand source code
class STT(stt.STT):
    """Sarvam.ai Speech-to-Text implementation.

    This class provides speech-to-text functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality STT for Indian languages.

    Args:
        language: BCP-47 language code, e.g., "hi-IN", "en-IN"
        model: The Sarvam STT model to use
        mode: Mode for saaras:v3 (transcribe/translate/verbatim/translit/codemix)
        api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
        base_url: API endpoint URL
        http_session: Optional aiohttp session to use
        prompt: Optional prompt for STT translate (saaras models only)
        high_vad_sensitivity: Stored on the options and used as the default for
            streaming sessions (presumably a VAD sensitivity toggle - confirm
            against the Sarvam streaming documentation)
        sample_rate: Audio sample rate stored on the options (defaults to 16000)
        flush_signal: Default ``flush_signal`` option for streaming sessions
        input_audio_codec: Default input codec option for streaming sessions
        positive_speech_threshold: Optional tuning value, passed through unchanged
            to ``SarvamSTTOptions`` (as are all of the remaining arguments below)
        negative_speech_threshold: Optional tuning value
        min_speech_frames: Optional tuning value
        first_turn_min_speech_frames: Optional tuning value
        negative_frames_count: Optional tuning value
        negative_frames_window: Optional tuning value
        start_speech_volume_threshold: Optional tuning value
        interrupt_min_speech_frames: Optional tuning value
        pre_speech_pad_frames: Optional tuning value
        num_initial_ignored_frames: Optional tuning value

    Raises:
        ValueError: If no API key is passed and ``SARVAM_API_KEY`` is not set.
    """

    def __init__(
        self,
        *,
        language: str = "en-IN",
        model: SarvamSTTModels | str = "saarika:v2.5",
        mode: SarvamSTTModes | str = "transcribe",
        api_key: str | None = None,
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        prompt: str | None = None,
        high_vad_sensitivity: bool | None = None,
        sample_rate: int = 16000,
        flush_signal: bool | None = None,
        input_audio_codec: str | None = None,
        positive_speech_threshold: float | None = None,
        negative_speech_threshold: float | None = None,
        min_speech_frames: int | None = None,
        first_turn_min_speech_frames: int | None = None,
        negative_frames_count: int | None = None,
        negative_frames_window: int | None = None,
        start_speech_volume_threshold: float | None = None,
        interrupt_min_speech_frames: int | None = None,
        pre_speech_pad_frames: int | None = None,
        num_initial_ignored_frames: int | None = None,
    ) -> None:
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
                # chunk timestamps don't seem to work despite the docs saying they do
                aligned_transcript=False,
            )
        )

        # An explicit argument wins; an empty string falls through to the env var.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. "
                "Provide it directly or set SARVAM_API_KEY environment variable."
            )

        # Instance-level defaults; recognize()/stream() overrides are resolved
        # against these values.
        self._opts = SarvamSTTOptions(
            language=LanguageCode(language),
            api_key=self._api_key,
            model=model,
            mode=mode,
            base_url=base_url,
            prompt=prompt,
            high_vad_sensitivity=high_vad_sensitivity,
            sample_rate=sample_rate,
            flush_signal=flush_signal,
            input_audio_codec=input_audio_codec,
            positive_speech_threshold=positive_speech_threshold,
            negative_speech_threshold=negative_speech_threshold,
            min_speech_frames=min_speech_frames,
            first_turn_min_speech_frames=first_turn_min_speech_frames,
            negative_frames_count=negative_frames_count,
            negative_frames_window=negative_frames_window,
            start_speech_volume_threshold=start_speech_volume_threshold,
            interrupt_min_speech_frames=interrupt_min_speech_frames,
            pre_speech_pad_frames=pre_speech_pad_frames,
            num_initial_ignored_frames=num_initial_ignored_frames,
        )
        self._session = http_session
        self._logger = logger.getChild(self.__class__.__name__)
        # Weak references: the set does not keep finished streams alive.
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def model(self) -> str:
        """Model identifier configured on this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider name reported for this STT; always ``"Sarvam"``."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, lazily taking one from the http context if none was supplied."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    @staticmethod
    def _single_attempt_conn_options(conn_options: APIConnectOptions) -> APIConnectOptions:
        """Return a copy of ``conn_options`` with retries disabled.

        ``retry_interval`` and ``timeout`` are carried over; ``max_retry`` is forced to 0.
        """
        return APIConnectOptions(
            max_retry=0,
            retry_interval=conn_options.retry_interval,
            timeout=conn_options.timeout,
        )

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Transcribe ``buffer`` via the base-class ``recognize`` with retries disabled.

        The caller's ``conn_options`` are replaced by a single-attempt copy
        (``max_retry=0``) that keeps the original timeout and retry interval.
        """
        single_attempt_conn_options = self._single_attempt_conn_options(conn_options)
        return await super().recognize(
            buffer,
            language=language,
            conn_options=single_attempt_conn_options,
        )

    def _resolve_opts(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        mode: NotGivenOr[SarvamSTTModes | str] = NOT_GIVEN,
    ) -> tuple[str, str, str]:
        """Resolve language, model and mode from overrides or defaults.

        The mode is validated against the resolved model only when the caller
        passes it explicitly; the instance default mode is used as-is. The
        resolved language is always passed to ``_validate_language_for_model``.

        Returns:
            Tuple of (language, model, mode).

        Raises:
            ValueError: If mode is explicitly given but not supported by the model.
                ``_validate_language_for_model`` may presumably raise as well
                (its implementation is not shown here).
        """
        resolved_language = LanguageCode(language) if is_given(language) else self._opts.language
        resolved_model = model if is_given(model) else self._opts.model
        # Defensive fallbacks: anything that is not a string reverts to the instance default.
        if not isinstance(resolved_language, str):
            resolved_language = self._opts.language
        if not isinstance(resolved_model, str):
            resolved_model = self._opts.model

        if is_given(mode):
            resolved_mode = str(mode)
            # Validate: caller explicitly asked for a mode — error if unsupported
            _validate_mode_for_model(resolved_model, resolved_mode)
        else:
            resolved_mode = self._opts.mode

        _validate_language_for_model(resolved_model, resolved_language)

        return resolved_language, resolved_model, resolved_mode

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        mode: NotGivenOr[SarvamSTTModes | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Recognize speech using Sarvam.ai API.

        The audio frames are combined into one WAV payload and posted as
        multipart form data to the URL returned by ``_get_urls_for_model``.

        Args:
            buffer: Audio buffer containing speech data
            language: BCP-47 language code (overrides the one set in constructor)
            model: Sarvam model to use (overrides the one set in constructor)
            mode: Mode to use (overrides the one set in constructor); only sent
                when ``_model_supports_mode`` accepts the resolved model
            conn_options: Connection options for API requests

        Returns:
            A SpeechEvent containing the transcription result

        Raises:
            ValueError: If the overrides are rejected by ``_resolve_opts`` or no
                API key is configured
            APIConnectionError: On network connection errors, and for any other
                unexpected exception raised while processing the response
            APIStatusError: On API errors (non-200 status)
            APITimeoutError: On API timeout
        """
        opts_language, opts_model, opts_mode = self._resolve_opts(
            language=language,
            model=model,
            mode=mode,
        )

        wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()

        form_data = aiohttp.FormData()
        form_data.add_field("file", wav_bytes, filename="audio.wav", content_type="audio/wav")

        # Add model and language_code to the form data if specified
        # Sarvam API docs state language_code is optional for saarika:v2x but mandatory for v1
        # Model is also optional, defaults to saarika:v2.5
        if opts_language:
            form_data.add_field("language_code", opts_language)
        if opts_model:
            form_data.add_field("model", str(opts_model))
        if _model_supports_mode(opts_model):
            form_data.add_field("mode", str(opts_mode))

        if not self._api_key:
            raise ValueError("API key cannot be None")
        headers = {
            "api-subscription-key": self._api_key,
            "User-Agent": USER_AGENT,
        }

        try:
            # NOTE(review): the request URL is derived from the resolved model only;
            # self._opts.base_url is not read here - confirm whether a custom
            # base_url passed to the constructor is meant to apply to this call.
            base_url, _ = _get_urls_for_model(opts_model)
            async with self._ensure_session().post(
                url=base_url,
                data=form_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                if res.status != 200:
                    error_text = await res.text()
                    self._logger.error(f"Sarvam API error: {res.status} - {error_text}")
                    raise APIStatusError(
                        message=f"Sarvam API Error ({res.status}): {error_text}",
                        status_code=res.status,
                        body=error_text,
                    )

                response_json = await res.json()
                self._logger.debug(f"Sarvam API response: {response_json}")

                transcript_text = response_json.get("transcript", "")
                request_id = response_json.get("request_id", "")
                # Prefer the language reported by the API; otherwise fall back to
                # the language that was requested.
                detected_language = response_json.get("language_code")
                if not isinstance(detected_language, str):
                    detected_language = LanguageCode(opts_language or "")
                else:
                    detected_language = LanguageCode(detected_language)

                start_time = 0.0
                end_time = 0.0

                # Try to get timestamps if available
                timestamps_data = response_json.get("timestamps")
                if timestamps_data and isinstance(timestamps_data, dict):
                    words_ts_start = timestamps_data.get("start_time_seconds")
                    words_ts_end = timestamps_data.get("end_time_seconds")
                    # The first start and the last end bound the whole utterance.
                    if isinstance(words_ts_start, list) and len(words_ts_start) > 0:
                        start_time = words_ts_start[0]
                    if isinstance(words_ts_end, list) and len(words_ts_end) > 0:
                        end_time = words_ts_end[-1]

                # If start/end times are still 0, use buffer duration as an estimate for end_time
                if start_time == 0.0 and end_time == 0.0:
                    end_time = _calculate_audio_duration(buffer)

                alternatives = [
                    stt.SpeechData(
                        language=detected_language,
                        text=transcript_text,
                        start_time=start_time,
                        end_time=end_time,
                        confidence=1.0,  # Sarvam doesn't provide confidence score in this response
                    )
                ]

                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    request_id=request_id,
                    alternatives=alternatives,
                )

        except asyncio.TimeoutError as e:
            self._logger.error(f"Sarvam API timeout: {e}")
            raise APITimeoutError("Sarvam API request timed out") from e
        except aiohttp.ClientError as e:
            self._logger.error(f"Sarvam API client error: {e}")
            raise APIConnectionError(f"Sarvam API connection error: {e}") from e
        except (APIStatusError, APIConnectionError, APITimeoutError):
            # Preserve provider-originated status/body/retry metadata.
            raise
        except Exception as e:
            self._logger.error(f"Error during Sarvam STT processing: {e}")
            raise APIConnectionError(f"Unexpected error in Sarvam STT: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        mode: NotGivenOr[SarvamSTTModes | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        prompt: NotGivenOr[str] = NOT_GIVEN,
        high_vad_sensitivity: NotGivenOr[bool] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        flush_signal: NotGivenOr[bool] = NOT_GIVEN,
        input_audio_codec: NotGivenOr[str] = NOT_GIVEN,
        positive_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
        negative_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
        min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
        first_turn_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
        negative_frames_count: NotGivenOr[int] = NOT_GIVEN,
        negative_frames_window: NotGivenOr[int] = NOT_GIVEN,
        start_speech_volume_threshold: NotGivenOr[float] = NOT_GIVEN,
        interrupt_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
        pre_speech_pad_frames: NotGivenOr[int] = NOT_GIVEN,
        num_initial_ignored_frames: NotGivenOr[int] = NOT_GIVEN,
    ) -> SpeechStream:
        """Create a streaming transcription session.

        Every argument left as ``NOT_GIVEN`` falls back to the value configured
        on this instance. The stream gets its own newly created
        ``aiohttp.ClientSession`` and a single-attempt copy of ``conn_options``
        (``max_retry=0``), and is tracked in the instance's weak set of streams.

        Args:
            language: BCP-47 language code override
            model: Sarvam model override
            mode: Mode override; validated against the resolved model
            conn_options: Connection options; retries are disabled for the stream
            prompt: Prompt override (any non-string value uses the instance prompt)
            high_vad_sensitivity: Per-stream override of the constructor option
            sample_rate: Per-stream override of the constructor option
            flush_signal: Per-stream override of the constructor option
            input_audio_codec: Per-stream override of the constructor option
            positive_speech_threshold: Per-stream override of the constructor option
            negative_speech_threshold: Per-stream override of the constructor option
            min_speech_frames: Per-stream override of the constructor option
            first_turn_min_speech_frames: Per-stream override of the constructor option
            negative_frames_count: Per-stream override of the constructor option
            negative_frames_window: Per-stream override of the constructor option
            start_speech_volume_threshold: Per-stream override of the constructor option
            interrupt_min_speech_frames: Per-stream override of the constructor option
            pre_speech_pad_frames: Per-stream override of the constructor option
            num_initial_ignored_frames: Per-stream override of the constructor option

        Returns:
            The newly created ``SpeechStream``.

        Raises:
            ValueError: If the overrides are rejected by ``_resolve_opts`` or no
                API key is configured.
        """
        opts_language, opts_model, opts_mode = self._resolve_opts(
            language=language,
            model=model,
            mode=mode,
        )

        # Handle prompt conversion from NotGiven to None
        final_prompt = prompt if isinstance(prompt, str) else self._opts.prompt

        opts_high_vad = (
            high_vad_sensitivity
            if is_given(high_vad_sensitivity)
            else self._opts.high_vad_sensitivity
        )
        opts_sample_rate = sample_rate if is_given(sample_rate) else self._opts.sample_rate
        opts_flush_signal = flush_signal if is_given(flush_signal) else self._opts.flush_signal
        opts_input_codec = (
            input_audio_codec if is_given(input_audio_codec) else self._opts.input_audio_codec
        )
        opts_positive_speech = (
            positive_speech_threshold
            if is_given(positive_speech_threshold)
            else self._opts.positive_speech_threshold
        )
        opts_negative_speech = (
            negative_speech_threshold
            if is_given(negative_speech_threshold)
            else self._opts.negative_speech_threshold
        )
        opts_min_speech = (
            min_speech_frames if is_given(min_speech_frames) else self._opts.min_speech_frames
        )
        opts_first_turn = (
            first_turn_min_speech_frames
            if is_given(first_turn_min_speech_frames)
            else self._opts.first_turn_min_speech_frames
        )
        opts_neg_count = (
            negative_frames_count
            if is_given(negative_frames_count)
            else self._opts.negative_frames_count
        )
        opts_neg_window = (
            negative_frames_window
            if is_given(negative_frames_window)
            else self._opts.negative_frames_window
        )
        opts_vol_threshold = (
            start_speech_volume_threshold
            if is_given(start_speech_volume_threshold)
            else self._opts.start_speech_volume_threshold
        )
        opts_interrupt = (
            interrupt_min_speech_frames
            if is_given(interrupt_min_speech_frames)
            else self._opts.interrupt_min_speech_frames
        )
        opts_pre_pad = (
            pre_speech_pad_frames
            if is_given(pre_speech_pad_frames)
            else self._opts.pre_speech_pad_frames
        )
        opts_initial_ignored = (
            num_initial_ignored_frames
            if is_given(num_initial_ignored_frames)
            else self._opts.num_initial_ignored_frames
        )
        single_attempt_conn_options = self._single_attempt_conn_options(conn_options)

        # Create options for the stream
        # NOTE(review): base_url is not forwarded to the per-stream options -
        # confirm SarvamSTTOptions derives the endpoint from the model.
        stream_opts = SarvamSTTOptions(
            language=opts_language,
            api_key=self._api_key if self._api_key else "",
            model=opts_model,
            mode=opts_mode,
            prompt=final_prompt,
            high_vad_sensitivity=opts_high_vad,
            sample_rate=opts_sample_rate,
            flush_signal=opts_flush_signal,
            input_audio_codec=opts_input_codec,
            positive_speech_threshold=opts_positive_speech,
            negative_speech_threshold=opts_negative_speech,
            min_speech_frames=opts_min_speech,
            first_turn_min_speech_frames=opts_first_turn,
            negative_frames_count=opts_neg_count,
            negative_frames_window=opts_neg_window,
            start_speech_volume_threshold=opts_vol_threshold,
            interrupt_min_speech_frames=opts_interrupt,
            pre_speech_pad_frames=opts_pre_pad,
            num_initial_ignored_frames=opts_initial_ignored,
        )

        # Check the key BEFORE allocating the session, so the error path cannot
        # leave an unclosed aiohttp.ClientSession behind.
        if not self._api_key:
            raise ValueError("API key cannot be None")

        # Create a fresh session for this stream to avoid conflicts
        stream_session = aiohttp.ClientSession()

        stream = SpeechStream(
            stt=self,
            opts=stream_opts,
            conn_options=single_attempt_conn_options,
            api_key=self._api_key,
            http_session=stream_session,
        )
        self._streams.add(stream)
        return stream

Sarvam.ai Speech-to-Text implementation.

This class provides speech-to-text functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality STT for Indian languages.

Args

language
BCP-47 language code, e.g., "hi-IN", "en-IN"
model
The Sarvam STT model to use
mode
Mode for saaras:v3 (transcribe/translate/verbatim/translit/codemix)
api_key
Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
base_url
API endpoint URL
http_session
Optional aiohttp session to use
prompt
Optional prompt for STT translate (saaras models only)

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier stored on this STT's options object."""
    options = self._opts
    return options.model

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name for this STT, which is always ``"Sarvam"``."""
    provider_name = "Sarvam"
    return provider_name

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def recognize(self,
buffer: AudioBuffer,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.SpeechEvent
Expand source code
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> stt.SpeechEvent:
    """Transcribe ``buffer`` through the base-class ``recognize`` using single-attempt options.

    The caller's ``conn_options`` are first converted with
    ``_single_attempt_conn_options``; the result is what the base class receives.
    """
    no_retry_options = self._single_attempt_conn_options(conn_options)
    event = await super().recognize(buffer, language=language, conn_options=no_retry_options)
    return event
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
mode: NotGivenOr[SarvamSTTModes | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
prompt: NotGivenOr[str] = NOT_GIVEN,
high_vad_sensitivity: NotGivenOr[bool] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
flush_signal: NotGivenOr[bool] = NOT_GIVEN,
input_audio_codec: NotGivenOr[str] = NOT_GIVEN,
positive_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
negative_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
first_turn_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
negative_frames_count: NotGivenOr[int] = NOT_GIVEN,
negative_frames_window: NotGivenOr[int] = NOT_GIVEN,
start_speech_volume_threshold: NotGivenOr[float] = NOT_GIVEN,
interrupt_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
pre_speech_pad_frames: NotGivenOr[int] = NOT_GIVEN,
num_initial_ignored_frames: NotGivenOr[int] = NOT_GIVEN) ‑> livekit.plugins.sarvam.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
    mode: NotGivenOr[SarvamSTTModes | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    prompt: NotGivenOr[str] = NOT_GIVEN,
    high_vad_sensitivity: NotGivenOr[bool] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    flush_signal: NotGivenOr[bool] = NOT_GIVEN,
    input_audio_codec: NotGivenOr[str] = NOT_GIVEN,
    positive_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
    negative_speech_threshold: NotGivenOr[float] = NOT_GIVEN,
    min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
    first_turn_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
    negative_frames_count: NotGivenOr[int] = NOT_GIVEN,
    negative_frames_window: NotGivenOr[int] = NOT_GIVEN,
    start_speech_volume_threshold: NotGivenOr[float] = NOT_GIVEN,
    interrupt_min_speech_frames: NotGivenOr[int] = NOT_GIVEN,
    pre_speech_pad_frames: NotGivenOr[int] = NOT_GIVEN,
    num_initial_ignored_frames: NotGivenOr[int] = NOT_GIVEN,
) -> SpeechStream:
    """Create a streaming transcription session.

    Every argument left as ``NOT_GIVEN`` falls back to the value stored on
    ``self._opts``. The stream gets its own newly created
    ``aiohttp.ClientSession`` and a single-attempt copy of ``conn_options``,
    and is added to ``self._streams`` before being returned.

    Args:
        language: BCP-47 language code override
        model: Sarvam model override
        mode: Mode override; resolved together with language and model by
            ``_resolve_opts``
        conn_options: Connection options, converted with
            ``_single_attempt_conn_options`` before use
        prompt: Prompt override (any non-string value uses the stored prompt)
        high_vad_sensitivity: Per-stream override of the stored option; the
            remaining keyword arguments are per-stream overrides of the
            same-named stored options as well

    Returns:
        The newly created ``SpeechStream``.

    Raises:
        ValueError: If no API key is configured. ``_resolve_opts`` may also
            reject the overrides (its implementation is not shown here).
    """
    opts_language, opts_model, opts_mode = self._resolve_opts(
        language=language,
        model=model,
        mode=mode,
    )

    # Handle prompt conversion from NotGiven to None
    final_prompt = prompt if isinstance(prompt, str) else self._opts.prompt

    opts_high_vad = (
        high_vad_sensitivity
        if is_given(high_vad_sensitivity)
        else self._opts.high_vad_sensitivity
    )
    opts_sample_rate = sample_rate if is_given(sample_rate) else self._opts.sample_rate
    opts_flush_signal = flush_signal if is_given(flush_signal) else self._opts.flush_signal
    opts_input_codec = (
        input_audio_codec if is_given(input_audio_codec) else self._opts.input_audio_codec
    )
    opts_positive_speech = (
        positive_speech_threshold
        if is_given(positive_speech_threshold)
        else self._opts.positive_speech_threshold
    )
    opts_negative_speech = (
        negative_speech_threshold
        if is_given(negative_speech_threshold)
        else self._opts.negative_speech_threshold
    )
    opts_min_speech = (
        min_speech_frames if is_given(min_speech_frames) else self._opts.min_speech_frames
    )
    opts_first_turn = (
        first_turn_min_speech_frames
        if is_given(first_turn_min_speech_frames)
        else self._opts.first_turn_min_speech_frames
    )
    opts_neg_count = (
        negative_frames_count
        if is_given(negative_frames_count)
        else self._opts.negative_frames_count
    )
    opts_neg_window = (
        negative_frames_window
        if is_given(negative_frames_window)
        else self._opts.negative_frames_window
    )
    opts_vol_threshold = (
        start_speech_volume_threshold
        if is_given(start_speech_volume_threshold)
        else self._opts.start_speech_volume_threshold
    )
    opts_interrupt = (
        interrupt_min_speech_frames
        if is_given(interrupt_min_speech_frames)
        else self._opts.interrupt_min_speech_frames
    )
    opts_pre_pad = (
        pre_speech_pad_frames
        if is_given(pre_speech_pad_frames)
        else self._opts.pre_speech_pad_frames
    )
    opts_initial_ignored = (
        num_initial_ignored_frames
        if is_given(num_initial_ignored_frames)
        else self._opts.num_initial_ignored_frames
    )
    single_attempt_conn_options = self._single_attempt_conn_options(conn_options)

    # Create options for the stream
    stream_opts = SarvamSTTOptions(
        language=opts_language,
        api_key=self._api_key if self._api_key else "",
        model=opts_model,
        mode=opts_mode,
        prompt=final_prompt,
        high_vad_sensitivity=opts_high_vad,
        sample_rate=opts_sample_rate,
        flush_signal=opts_flush_signal,
        input_audio_codec=opts_input_codec,
        positive_speech_threshold=opts_positive_speech,
        negative_speech_threshold=opts_negative_speech,
        min_speech_frames=opts_min_speech,
        first_turn_min_speech_frames=opts_first_turn,
        negative_frames_count=opts_neg_count,
        negative_frames_window=opts_neg_window,
        start_speech_volume_threshold=opts_vol_threshold,
        interrupt_min_speech_frames=opts_interrupt,
        pre_speech_pad_frames=opts_pre_pad,
        num_initial_ignored_frames=opts_initial_ignored,
    )

    # Check the key BEFORE allocating the session, so the error path cannot
    # leave an unclosed aiohttp.ClientSession behind.
    if not self._api_key:
        raise ValueError("API key cannot be None")

    # Create a fresh session for this stream to avoid conflicts
    stream_session = aiohttp.ClientSession()

    stream = SpeechStream(
        stt=self,
        opts=stream_opts,
        conn_options=single_attempt_conn_options,
        api_key=self._api_key,
        http_session=stream_session,
    )
    self._streams.add(stream)
    return stream

Create a streaming transcription session.

Inherited members

class TTS (*,
target_language_code: SarvamTTSLanguages | str = 'en-IN',
model: SarvamTTSModels | str = 'bulbul:v3',
speaker: SarvamTTSSpeakers | str | None = None,
speech_sample_rate: int = 22050,
num_channels: int = 1,
pitch: float = 0.0,
pace: float = 1.0,
loudness: float = 1.0,
temperature: float = 0.6,
output_audio_bitrate: SarvamTTSOutputAudioBitrate | str = '128k',
min_buffer_size: int = 50,
max_chunk_length: int = 150,
enable_preprocessing: bool = False,
dict_id: str | None = None,
enable_cached_responses: bool | None = None,
api_key: str | None = None,
base_url: str = 'https://api.sarvam.ai/text-to-speech',
ws_url: str = 'wss://api.sarvam.ai/text-to-speech/ws',
http_session: aiohttp.ClientSession | None = None,
send_completion_event: bool = True,
output_audio_codec: str = 'mp3')
Expand source code
class TTS(tts.TTS):
    """Sarvam.ai Text-to-Speech implementation.

    This class provides text-to-speech functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality TTS for Indian languages.

    Args:
        target_language_code: BCP-47 language code for supported Indian languages
            (default "en-IN")
        model: Sarvam TTS model to use (default "bulbul:v3")
        speaker: Voice to use for synthesis. When omitted, "shubh" is selected for
            bulbul:v3 / bulbul:v3-beta and "anushka" for every other model
        speech_sample_rate: Audio sample rate in Hz; one of 8000, 16000, 22050,
            24000, 32000, 44100 or 48000
        num_channels: Number of audio channels (Sarvam outputs mono)
        pitch: Voice pitch adjustment (-0.75 to 0.75) - only supported in v2 for now.
            Out-of-range values are clamped to the nearest bound with a warning
        pace: Speech rate multiplier (0.3 to 3.0)
        loudness: Volume multiplier (0.5 to 2.0) - only supported in v2 for now
        temperature: Sampling temperature (0.01 to 2.0), only used in v3 and v3-beta
        dict_id: Custom pronunciation dictionary ID (bulbul:v3 only)
        enable_cached_responses: Enable response caching beta feature (bulbul:v1/v2 only)
        output_audio_bitrate: Output audio bitrate (default 128k)
        min_buffer_size: Minimum character length for flushing (30 to 200)
        max_chunk_length: Maximum chunk length for sentence splitting (50 to 500)
        enable_preprocessing: Whether to use text preprocessing
        api_key: Sarvam.ai API key. Falls back to the ``SARVAM_API_KEY`` environment
            variable; a ``ValueError`` is raised when neither is set
        base_url: API endpoint URL
        ws_url: WebSocket endpoint URL
        http_session: Optional aiohttp session to use
        send_completion_event: Value sent as the ``send_completion_event`` query
            parameter when the WebSocket is opened (default True)
        output_audio_codec: Optionally choose the output codec format (default "mp3")

    Raises:
        ValueError: If the API key is missing, or if any option other than
            ``pitch`` fails validation.
    """

    def __init__(
        self,
        *,
        target_language_code: SarvamTTSLanguages | str = "en-IN",
        model: SarvamTTSModels | str = "bulbul:v3",
        speaker: SarvamTTSSpeakers | str | None = None,
        speech_sample_rate: int = 22050,
        num_channels: int = 1,  # Sarvam audio is single-channel
        pitch: float = 0.0,
        pace: float = 1.0,
        loudness: float = 1.0,
        temperature: float = 0.6,
        output_audio_bitrate: SarvamTTSOutputAudioBitrate | str = "128k",
        min_buffer_size: int = 50,
        max_chunk_length: int = 150,
        enable_preprocessing: bool = False,
        dict_id: str | None = None,
        enable_cached_responses: bool | None = None,
        api_key: str | None = None,
        base_url: str = SARVAM_TTS_BASE_URL,
        ws_url: str = SARVAM_TTS_WS_URL,
        http_session: aiohttp.ClientSession | None = None,
        send_completion_event: bool = True,
        output_audio_codec: str = "mp3",
    ) -> None:
        """Validate the configuration and prepare the WebSocket connection pool.

        Raises:
            ValueError: If no API key is available, if a required string is
                empty, if a numeric option other than ``pitch`` is out of
                range, or if the speaker is not compatible with the model.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=speech_sample_rate,
            num_channels=num_channels,
        )

        # Explicit argument wins; the environment variable is the fallback.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. Provide it directly or set SARVAM_API_KEY env var."
            )

        # Required strings must contain something other than whitespace.
        if not (target_language_code and target_language_code.strip()):
            raise ValueError("Target language code is required and cannot be empty")
        if not (model and model.strip()):
            raise ValueError("Model is required and cannot be empty")

        # Pick a per-model default voice when the caller did not choose one.
        if speaker is None:
            speaker = "shubh" if model in ("bulbul:v3-beta", "bulbul:v3") else "anushka"

        # Pitch is the one lenient option: out-of-range values are clamped
        # (with a warning) instead of being rejected.
        if not -0.75 <= pitch <= 0.75:
            logger.warning(
                "pitch value %.2f is outside the Sarvam API accepted range [-0.75, 0.75]; "
                "clamping to nearest bound. Please update your code.",
                pitch,
            )
            pitch = max(-0.75, min(0.75, pitch))

        # Strict range checks, evaluated in declaration order.
        for value, low, high, message in (
            (pace, 0.3, 3.0, "Pace must be between 0.3 and 3.0"),
            (loudness, 0.5, 2.0, "Loudness must be between 0.5 and 2.0"),
            (temperature, 0.01, 2.0, "Temperature must be between 0.01 and 2.0"),
        ):
            if not low <= value <= high:
                raise ValueError(message)

        if output_audio_bitrate not in ALLOWED_OUTPUT_AUDIO_BITRATES:
            allowed_bitrates = ", ".join(sorted(ALLOWED_OUTPUT_AUDIO_BITRATES))
            raise ValueError(f"output_audio_bitrate must be one of {allowed_bitrates}")

        for value, low, high, message in (
            (min_buffer_size, 30, 200, "min_buffer_size must be between 30 and 200"),
            (max_chunk_length, 50, 500, "max_chunk_length must be between 50 and 500"),
        ):
            if not low <= value <= high:
                raise ValueError(message)

        supported_sample_rates = (8000, 16000, 22050, 24000, 32000, 44100, 48000)
        if speech_sample_rate not in supported_sample_rates:
            raise ValueError(
                "Sample rate must be one of 8000, 16000, 22050, 24000, 32000, 44100, or 48000 Hz"
            )

        if output_audio_codec not in ALLOWED_OUTPUT_AUDIO_CODECS:
            allowed_codecs = ",".join(sorted(ALLOWED_OUTPUT_AUDIO_CODECS))
            raise ValueError(f"output_audio_codec must be one of {allowed_codecs}")

        # The chosen (or defaulted) voice must be usable with the chosen model.
        if not validate_model_speaker_compatibility(model, speaker):
            model_speakers = MODEL_SPEAKER_COMPATIBILITY.get(model, {}).get("all", [])
            raise ValueError(
                f"Speaker '{speaker}' is not compatible with model '{model}'. "
                f"Please choose a compatible speaker from: {', '.join(model_speakers)}"
            )

        # Sentence tokenizer stored on the options under ``word_tokenizer``.
        sentence_tokenizer = tokenize.basic.SentenceTokenizer()

        self._opts = SarvamTTSOptions(
            target_language_code=LanguageCode(target_language_code),
            model=model,
            speaker=speaker,
            speech_sample_rate=speech_sample_rate,
            pitch=pitch,
            pace=pace,
            loudness=loudness,
            temperature=temperature,
            output_audio_bitrate=output_audio_bitrate,
            min_buffer_size=min_buffer_size,
            max_chunk_length=max_chunk_length,
            enable_preprocessing=enable_preprocessing,
            dict_id=dict_id,
            enable_cached_responses=enable_cached_responses,
            api_key=self._api_key,
            base_url=base_url,
            ws_url=ws_url,
            word_tokenizer=sentence_tokenizer,
            send_completion_event=send_completion_event,
            output_audio_codec=output_audio_codec,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()
        # id(ws) -> background keepalive task for that WebSocket. The task
        # pings the server every ``_KEEPALIVE_INTERVAL`` seconds so the
        # connection is not dropped by Sarvam's idle timeout and can be reused.
        self._ws_keepalive_tasks: dict[int, asyncio.Task[None]] = {}

        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a Sarvam TTS WebSocket and start its keepalive task.

        Args:
            timeout: Maximum number of seconds to wait for the handshake.

        Returns:
            The connected WebSocket.

        Raises:
            APIConnectionError: If the connection attempt fails or times out.
        """
        session = self._ensure_session()
        opts = self._opts
        request_headers = {
            "api-subscription-key": opts.api_key,
            "User-Agent": USER_AGENT,
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
        }
        # The model and the completion-event flag are passed as query parameters.
        ws_url = f"{opts.ws_url}?model={opts.model}&send_completion_event={opts.send_completion_event}"

        logger.info("Connecting to Sarvam TTS WebSocket")

        try:
            ws = await asyncio.wait_for(
                session.ws_connect(
                    ws_url,
                    headers=request_headers,
                    # Protocol-level PING frames are sent by aiohttp every
                    # ``_WS_HEARTBEAT_INTERVAL`` seconds; the keepalive task
                    # started below keeps reading from the socket while it
                    # sits idle in the pool.
                    heartbeat=_WS_HEARTBEAT_INTERVAL,
                ),
                timeout,
            )
        except Exception as e:
            logger.error(
                "Failed to connect to Sarvam TTS WebSocket",
                extra={"error": str(e), "url": ws_url},
                exc_info=True,
            )
            raise APIConnectionError(f"WebSocket connection failed: {e}") from e
        else:
            self._start_keepalive(ws)
            return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Pool close callback: stop the keepalive task for ``ws``, then close it."""
        # The keepalive task reads from ``ws``, so it is stopped before the close.
        await self._stop_keepalive(ws)
        await ws.close()

    def _start_keepalive(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Ensure a background keepalive task is running for ``ws``.

        Does nothing when keepalive is disabled (``_KEEPALIVE_INTERVAL <= 0``)
        or when a task registered for ``ws`` is still running. To force a
        fresh task, call ``_stop_keepalive`` first.
        """
        if _KEEPALIVE_INTERVAL <= 0:
            return

        key = id(ws)
        current = self._ws_keepalive_tasks.get(key)
        if current is None or current.done():
            # No task yet, or the previous one already finished: start a new one.
            self._ws_keepalive_tasks[key] = asyncio.create_task(
                self._keepalive_loop(ws), name="sarvam-tts-ws-keepalive"
            )

    async def _stop_keepalive(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Unregister and cancel the keepalive task of ``ws``, waiting for it to end.

        A no-op when no task is registered for ``ws`` or the task has already
        finished.
        """
        pending = self._ws_keepalive_tasks.pop(id(ws), None)
        if pending is not None and not pending.done():
            pending.cancel()
            # NOTE(review): BaseException is suppressed here, which presumably
            # also hides a cancellation of the caller while it waits -- confirm
            # this is intended before narrowing it.
            with contextlib.suppress(BaseException):
                await pending

    async def _keepalive_loop(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Keep a pooled WebSocket alive while it sits idle.

        This loop does two things in lockstep:

        1. Actively calls ``ws.receive()`` so that aiohttp can process
           server-initiated PONG frames (and any other messages). aiohttp
           only resets its internal "PONG not received" timer when a PONG
           is read via ``receive()`` -- without an active reader, the
           protocol-level heartbeat will tear the connection down even
           though the server is happily replying.

        2. Sends a Sarvam-defined ``{"type": "ping"}`` JSON message
           whenever ``receive()`` times out (i.e. nothing has come in for
           ``_KEEPALIVE_INTERVAL`` seconds). This resets Sarvam's
           server-side application idle timer (documented as 60 s).

        Any CLOSE/CLOSED/CLOSING/ERROR message from the server, or any
        write failure, evicts the connection from the pool so the next
        checkout creates a fresh one instead of handing the dead one out.

        Args:
            ws: The WebSocket to read from and ping.

        The coroutine returns when ``ws`` is closed, when the connection has
        been evicted from the pool, or when the task is cancelled.
        """
        try:
            while not ws.closed:
                try:
                    msg = await asyncio.wait_for(ws.receive(), timeout=_KEEPALIVE_INTERVAL)
                except asyncio.TimeoutError:
                    # No incoming message within the interval -- send our
                    # app-level ping to reset Sarvam's idle timer.
                    if ws.closed:
                        return
                    try:
                        await ws.send_str(json.dumps({"type": "ping"}))
                    except asyncio.CancelledError:
                        # Never treat our own cancellation as a ping failure.
                        raise
                    except Exception as e:
                        logger.debug(
                            "Sarvam TTS keepalive ping failed; evicting connection from pool",
                            extra={"error": str(e)},
                        )
                        # Best-effort eviction: errors raised by the pool are ignored.
                        with contextlib.suppress(Exception):
                            self._pool.remove(ws)
                        return
                    continue

                # We received something. CLOSE/CLOSED/CLOSING/ERROR means
                # the server tore the connection down -- evict and exit.
                if msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                    aiohttp.WSMsgType.ERROR,
                ):
                    logger.debug(
                        "Sarvam TTS WebSocket closed while idle in pool; evicting connection",
                        extra={
                            "msg_type": str(msg.type),
                            "close_code": ws.close_code,
                        },
                    )
                    # Best-effort eviction: errors raised by the pool are ignored.
                    with contextlib.suppress(Exception):
                        self._pool.remove(ws)
                    return
                # Otherwise -- PONG, TEXT, BINARY, etc. -- discard. We are
                # idle in the pool so any unsolicited TTS traffic from a
                # previous request is no longer relevant. The act of
                # receiving has already reset aiohttp's heartbeat.
        except asyncio.CancelledError:
            # ``_stop_keepalive`` ends this loop by cancelling the task; the
            # cancellation is swallowed so the task finishes without an error.
            pass

    @property
    def model(self) -> str:
        """Name of the Sarvam TTS model currently configured in the options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider identifier for this TTS implementation (always ``"Sarvam"``)."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from ``utils.http_context`` on first use."""
        session = self._session
        if not session:
            # No session was supplied (or created) yet: obtain one and cache it.
            session = utils.http_context.http_session()
            self._session = session
        return session

    def update_options(
        self,
        *,
        model: str | None = None,
        target_language_code: SarvamTTSLanguages | str | None = None,
        speaker: str | None = None,
        pitch: float | None = None,
        pace: float | None = None,
        loudness: float | None = None,
        temperature: float | None = None,
        output_audio_bitrate: SarvamTTSOutputAudioBitrate | str | None = None,
        min_buffer_size: int | None = None,
        max_chunk_length: int | None = None,
        enable_preprocessing: bool | None = None,
        dict_id: str | None = None,
        enable_cached_responses: bool | None = None,
        send_completion_event: bool | None = None,
        output_audio_codec: str | None = None,
    ) -> None:
        """Update TTS options with validation.

        Arguments left as ``None`` keep their current value. All supplied
        values are validated first and only then written to ``self._opts``, so
        a ``ValueError`` leaves the existing options completely unchanged.

        Args:
            model: New model name; must not be blank.
            target_language_code: New language code; must not be blank.
            speaker: New voice; must not be blank and must be compatible with
                the model that will be in effect after this call.
            pitch: New pitch; values outside [-0.75, 0.75] are clamped with a
                warning instead of being rejected.
            pace: New pace (0.3 to 3.0).
            loudness: New loudness (0.5 to 2.0).
            temperature: New sampling temperature (0.01 to 2.0).
            output_audio_bitrate: One of ``ALLOWED_OUTPUT_AUDIO_BITRATES``.
            min_buffer_size: New minimum buffer size (30 to 200).
            max_chunk_length: New maximum chunk length (50 to 500).
            enable_preprocessing: New preprocessing flag.
            dict_id: New pronunciation dictionary ID.
            enable_cached_responses: New response-caching flag.
            send_completion_event: New completion-event flag.
            output_audio_codec: One of ``ALLOWED_OUTPUT_AUDIO_CODECS``.

        Raises:
            ValueError: If any supplied value is blank, out of range, not in
                the allowed set, or if the resulting model/speaker pair is
                incompatible.
        """
        opts = self._opts
        # Validated values are staged here and committed only after every
        # check has passed.
        pending: dict[str, object] = {}

        if target_language_code is not None:
            if not target_language_code.strip():
                raise ValueError("Target language code cannot be empty")
            pending["target_language_code"] = LanguageCode(target_language_code)

        if model is not None and not model.strip():
            raise ValueError("Model cannot be empty")
        if speaker is not None and not speaker.strip():
            raise ValueError("Speaker cannot be empty")

        if model is not None or speaker is not None:
            # Check the pair that will be in effect after the update: new
            # values where given, current values otherwise.
            effective_model = model if model is not None else opts.model
            effective_speaker = speaker if speaker is not None else opts.speaker
            if effective_speaker is not None and not validate_model_speaker_compatibility(
                effective_model, effective_speaker
            ):
                compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(effective_model, {}).get(
                    "all", []
                )
                raise ValueError(
                    f"Speaker '{effective_speaker}' incompatible with {effective_model}. "
                    f"Compatible speakers: {', '.join(compatible_speakers)}"
                )
            if model is not None:
                pending["model"] = model
            if speaker is not None:
                pending["speaker"] = speaker

        if pitch is not None:
            if not -0.75 <= pitch <= 0.75:
                logger.warning(
                    "pitch value %.2f is outside the Sarvam API accepted range [-0.75, 0.75]; "
                    "clamping to nearest bound. Please update your code.",
                    pitch,
                )
                pitch = max(-0.75, min(0.75, pitch))
            pending["pitch"] = pitch

        if pace is not None:
            if not 0.3 <= pace <= 3.0:
                raise ValueError("Pace must be between 0.3 and 3.0")
            pending["pace"] = pace

        if loudness is not None:
            if not 0.5 <= loudness <= 2.0:
                raise ValueError("Loudness must be between 0.5 and 2.0")
            pending["loudness"] = loudness

        if temperature is not None:
            if not 0.01 <= temperature <= 2.0:
                raise ValueError("Temperature must be between 0.01 and 2.0")
            pending["temperature"] = temperature

        if output_audio_bitrate is not None:
            if output_audio_bitrate not in ALLOWED_OUTPUT_AUDIO_BITRATES:
                raise ValueError(
                    "output_audio_bitrate must be one of "
                    f"{', '.join(sorted(ALLOWED_OUTPUT_AUDIO_BITRATES))}"
                )
            pending["output_audio_bitrate"] = output_audio_bitrate

        if min_buffer_size is not None:
            if not 30 <= min_buffer_size <= 200:
                raise ValueError("min_buffer_size must be between 30 and 200")
            pending["min_buffer_size"] = min_buffer_size

        if max_chunk_length is not None:
            if not 50 <= max_chunk_length <= 500:
                raise ValueError("max_chunk_length must be between 50 and 500")
            pending["max_chunk_length"] = max_chunk_length

        if enable_preprocessing is not None:
            pending["enable_preprocessing"] = enable_preprocessing

        if dict_id is not None:
            pending["dict_id"] = dict_id

        if enable_cached_responses is not None:
            pending["enable_cached_responses"] = enable_cached_responses

        if send_completion_event is not None:
            pending["send_completion_event"] = send_completion_event

        if output_audio_codec is not None:
            if output_audio_codec not in ALLOWED_OUTPUT_AUDIO_CODECS:
                raise ValueError(
                    "output_audio_codec must be one of "
                    f"{','.join(sorted(ALLOWED_OUTPUT_AUDIO_CODECS))}"
                )
            pending["output_audio_codec"] = output_audio_codec

        # Everything validated: commit in one go.
        for field_name, value in pending.items():
            setattr(opts, field_name, value)

    # Implement the abstract synthesize method
    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions | None = None
    ) -> ChunkedStream:
        """Synthesize text to audio using Sarvam.ai TTS API.

        Args:
            text: The text to synthesize.
            conn_options: Connection options; ``DEFAULT_API_CONNECT_OPTIONS``
                is used when omitted.

        Returns:
            A ``ChunkedStream`` bound to this TTS instance and ``text``.
        """
        effective_options = (
            DEFAULT_API_CONNECT_OPTIONS if conn_options is None else conn_options
        )
        return ChunkedStream(tts=self, input_text=text, conn_options=effective_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a streaming TTS session and track it in ``self._streams``."""
        synth_stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(synth_stream)
        return synth_stream

    def prewarm(self) -> None:
        """Prewarm WebSocket connections by delegating to the connection pool."""
        pool = self._pool
        pool.prewarm()

    async def aclose(self) -> None:
        """Close all active streams and connections.

        The stream registry is cleared and the connection pool is closed even
        when closing one of the streams raises; that exception still
        propagates to the caller afterwards.
        """
        try:
            # Iterate over a snapshot: the registry may change while we await.
            for stream in list(self._streams):
                await stream.aclose()
        finally:
            self._streams.clear()
            await self._pool.aclose()

Sarvam.ai Text-to-Speech implementation.

This class provides text-to-speech functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality TTS for Indian languages.

Args

target_language_code
BCP-47 language code for supported Indian languages
model
Sarvam TTS model to use (default: bulbul:v3)
speaker
Voice to use for synthesis
speech_sample_rate
Audio sample rate in Hz
num_channels
Number of audio channels (Sarvam outputs mono)
pitch
Voice pitch adjustment (-0.75 to 0.75) - only supported in v2 for now
pace
Speech rate multiplier (0.3 to 3.0)
loudness
Volume multiplier (0.5 to 2.0) - only supported in v2 for now
temperature
Sampling temperature (0.01 to 2.0), only used in v3 and v3-beta
dict_id
Custom pronunciation dictionary ID (bulbul:v3 only)
enable_cached_responses
Enable response caching beta feature (bulbul:v1/v2 only)
output_audio_bitrate
Output audio bitrate (default 128k)
min_buffer_size
Minimum character length for flushing (30 to 200)
max_chunk_length
Maximum chunk length for sentence splitting (50 to 500)
enable_preprocessing
Whether to use text preprocessing
api_key
Sarvam.ai API key (required; read from the SARVAM_API_KEY environment variable when not passed)
base_url
API endpoint URL
ws_url
WebSocket endpoint URL
http_session
Optional aiohttp session to use
output_audio_codec
Optionally choose the output codec format (mp3)

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Name of the Sarvam TTS model currently configured in the options."""
    return self._opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Provider identifier for this TTS implementation (always ``"Sarvam"``)."""
    return "Sarvam"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close all active streams and connections.

    The stream registry is cleared and the connection pool is closed even
    when closing one of the streams raises; that exception still propagates
    to the caller afterwards.
    """
    try:
        # Iterate over a snapshot: the registry may change while we await.
        for stream in list(self._streams):
            await stream.aclose()
    finally:
        self._streams.clear()
        await self._pool.aclose()

Close all active streams and connections.

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Prewarm WebSocket connections by delegating to the connection pool."""
    pool = self._pool
    pool.prewarm()

Prewarm WebSocket connections.

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.sarvam.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a streaming TTS session and track it in ``self._streams``."""
    synth_stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(synth_stream)
    return synth_stream

Create a streaming TTS session.

def synthesize(self, text: str, *, conn_options: APIConnectOptions | None = None) ‑> livekit.plugins.sarvam.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions | None = None
) -> ChunkedStream:
    """Synthesize text to audio using Sarvam.ai TTS API.

    Args:
        text: The text to synthesize.
        conn_options: Connection options; ``DEFAULT_API_CONNECT_OPTIONS`` is
            used when omitted.

    Returns:
        A ``ChunkedStream`` bound to this TTS instance and ``text``.
    """
    effective_options = (
        DEFAULT_API_CONNECT_OPTIONS if conn_options is None else conn_options
    )
    return ChunkedStream(tts=self, input_text=text, conn_options=effective_options)

Synthesize text to audio using Sarvam.ai TTS API.

def update_options(self,
*,
model: str | None = None,
target_language_code: SarvamTTSLanguages | str | None = None,
speaker: str | None = None,
pitch: float | None = None,
pace: float | None = None,
loudness: float | None = None,
temperature: float | None = None,
output_audio_bitrate: SarvamTTSOutputAudioBitrate | str | None = None,
min_buffer_size: int | None = None,
max_chunk_length: int | None = None,
enable_preprocessing: bool | None = None,
dict_id: str | None = None,
enable_cached_responses: bool | None = None,
send_completion_event: bool | None = None,
output_audio_codec: str | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: str | None = None,
    target_language_code: SarvamTTSLanguages | str | None = None,
    speaker: str | None = None,
    pitch: float | None = None,
    pace: float | None = None,
    loudness: float | None = None,
    temperature: float | None = None,
    output_audio_bitrate: SarvamTTSOutputAudioBitrate | str | None = None,
    min_buffer_size: int | None = None,
    max_chunk_length: int | None = None,
    enable_preprocessing: bool | None = None,
    dict_id: str | None = None,
    enable_cached_responses: bool | None = None,
    send_completion_event: bool | None = None,
    output_audio_codec: str | None = None,
) -> None:
    """Update TTS options with validation.

    Arguments left as ``None`` keep their current value. All supplied values
    are validated first and only then written to ``self._opts``, so a
    ``ValueError`` leaves the existing options completely unchanged.

    Args:
        model: New model name; must not be blank.
        target_language_code: New language code; must not be blank.
        speaker: New voice; must not be blank and must be compatible with the
            model that will be in effect after this call.
        pitch: New pitch; values outside [-0.75, 0.75] are clamped with a
            warning instead of being rejected.
        pace: New pace (0.3 to 3.0).
        loudness: New loudness (0.5 to 2.0).
        temperature: New sampling temperature (0.01 to 2.0).
        output_audio_bitrate: One of ``ALLOWED_OUTPUT_AUDIO_BITRATES``.
        min_buffer_size: New minimum buffer size (30 to 200).
        max_chunk_length: New maximum chunk length (50 to 500).
        enable_preprocessing: New preprocessing flag.
        dict_id: New pronunciation dictionary ID.
        enable_cached_responses: New response-caching flag.
        send_completion_event: New completion-event flag.
        output_audio_codec: One of ``ALLOWED_OUTPUT_AUDIO_CODECS``.

    Raises:
        ValueError: If any supplied value is blank, out of range, not in the
            allowed set, or if the resulting model/speaker pair is
            incompatible.
    """
    opts = self._opts
    # Validated values are staged here and committed only after every check
    # has passed.
    pending: dict[str, object] = {}

    if target_language_code is not None:
        if not target_language_code.strip():
            raise ValueError("Target language code cannot be empty")
        pending["target_language_code"] = LanguageCode(target_language_code)

    if model is not None and not model.strip():
        raise ValueError("Model cannot be empty")
    if speaker is not None and not speaker.strip():
        raise ValueError("Speaker cannot be empty")

    if model is not None or speaker is not None:
        # Check the pair that will be in effect after the update: new values
        # where given, current values otherwise.
        effective_model = model if model is not None else opts.model
        effective_speaker = speaker if speaker is not None else opts.speaker
        if effective_speaker is not None and not validate_model_speaker_compatibility(
            effective_model, effective_speaker
        ):
            compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(effective_model, {}).get(
                "all", []
            )
            raise ValueError(
                f"Speaker '{effective_speaker}' incompatible with {effective_model}. "
                f"Compatible speakers: {', '.join(compatible_speakers)}"
            )
        if model is not None:
            pending["model"] = model
        if speaker is not None:
            pending["speaker"] = speaker

    if pitch is not None:
        if not -0.75 <= pitch <= 0.75:
            logger.warning(
                "pitch value %.2f is outside the Sarvam API accepted range [-0.75, 0.75]; "
                "clamping to nearest bound. Please update your code.",
                pitch,
            )
            pitch = max(-0.75, min(0.75, pitch))
        pending["pitch"] = pitch

    if pace is not None:
        if not 0.3 <= pace <= 3.0:
            raise ValueError("Pace must be between 0.3 and 3.0")
        pending["pace"] = pace

    if loudness is not None:
        if not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")
        pending["loudness"] = loudness

    if temperature is not None:
        if not 0.01 <= temperature <= 2.0:
            raise ValueError("Temperature must be between 0.01 and 2.0")
        pending["temperature"] = temperature

    if output_audio_bitrate is not None:
        if output_audio_bitrate not in ALLOWED_OUTPUT_AUDIO_BITRATES:
            raise ValueError(
                "output_audio_bitrate must be one of "
                f"{', '.join(sorted(ALLOWED_OUTPUT_AUDIO_BITRATES))}"
            )
        pending["output_audio_bitrate"] = output_audio_bitrate

    if min_buffer_size is not None:
        if not 30 <= min_buffer_size <= 200:
            raise ValueError("min_buffer_size must be between 30 and 200")
        pending["min_buffer_size"] = min_buffer_size

    if max_chunk_length is not None:
        if not 50 <= max_chunk_length <= 500:
            raise ValueError("max_chunk_length must be between 50 and 500")
        pending["max_chunk_length"] = max_chunk_length

    if enable_preprocessing is not None:
        pending["enable_preprocessing"] = enable_preprocessing

    if dict_id is not None:
        pending["dict_id"] = dict_id

    if enable_cached_responses is not None:
        pending["enable_cached_responses"] = enable_cached_responses

    if send_completion_event is not None:
        pending["send_completion_event"] = send_completion_event

    if output_audio_codec is not None:
        if output_audio_codec not in ALLOWED_OUTPUT_AUDIO_CODECS:
            raise ValueError(
                "output_audio_codec must be one of "
                f"{','.join(sorted(ALLOWED_OUTPUT_AUDIO_CODECS))}"
            )
        pending["output_audio_codec"] = output_audio_codec

    # Everything validated: commit in one go.
    for field_name, value in pending.items():
        setattr(opts, field_name, value)

Update TTS options with validation.

Inherited members