Module livekit.plugins.sarvam
Sarvam.ai plugin for LiveKit Agents
Support for speech-to-text and text-to-speech with Sarvam.ai.
Sarvam.ai provides high-quality STT and TTS for Indian languages.
For API access, visit https://sarvam.ai/
Classes
class STT (*,
language: str = 'en-IN',
model: SarvamSTTModels | str = 'saarika:v2.5',
api_key: str | None = None,
base_url: str | None = None,
http_session: aiohttp.ClientSession | None = None,
prompt: str | None = None)-
Expand source code
class STT(stt.STT):
    """Sarvam.ai Speech-to-Text implementation.

    This class provides speech-to-text functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality STT for Indian languages.

    Args:
        language: BCP-47 language code, e.g., "hi-IN", "en-IN"
        model: The Sarvam STT model to use
        api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
        base_url: API endpoint URL
        http_session: Optional aiohttp session to use
        prompt: Optional prompt for STT translate (saaras models only)

    Raises:
        ValueError: If no API key is passed and SARVAM_API_KEY is not set.
    """

    def __init__(
        self,
        *,
        language: str = "en-IN",
        model: SarvamSTTModels | str = "saarika:v2.5",
        api_key: str | None = None,
        base_url: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        prompt: str | None = None,
    ) -> None:
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))

        # An explicit key wins; otherwise fall back to the environment.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. "
                "Provide it directly or set SARVAM_API_KEY environment variable."
            )

        self._opts = SarvamSTTOptions(
            language=language,
            api_key=self._api_key,
            model=model,
            base_url=base_url,
            prompt=prompt,
        )
        self._session = http_session
        self._logger = logger.getChild(self.__class__.__name__)
        # Weak references: tracking a stream here does not keep it alive.
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def model(self) -> str:
        """Return the model configured on this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Return the provider name, always "Sarvam"."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching the shared one on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Recognize speech using Sarvam.ai API.

        Args:
            buffer: Audio buffer containing speech data
            language: BCP-47 language code (overrides the one set in constructor)
            model: Sarvam model to use (overrides the one set in constructor)
            conn_options: Connection options for API requests

        Returns:
            A SpeechEvent containing the transcription result

        Raises:
            APIConnectionError: On network connection errors, and on any other
                unexpected failure while processing the request
            APIStatusError: On API errors (non-200 status)
            APITimeoutError: On API timeout
            ValueError: If the stored API key is None
        """
        opts_language = self._opts.language if isinstance(language, type(NOT_GIVEN)) else language
        opts_model = self._opts.model if isinstance(model, type(NOT_GIVEN)) else model

        wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()

        form_data = aiohttp.FormData()
        form_data.add_field("file", wav_bytes, filename="audio.wav", content_type="audio/wav")

        # Add model and language_code to the form data if specified
        # Sarvam API docs state language_code is optional for saarika:v2x but mandatory for v1
        # Model is also optional, defaults to saarika:v2.5
        if opts_language:
            form_data.add_field("language_code", opts_language)
        if opts_model:
            form_data.add_field("model", str(opts_model))

        if self._api_key is None:
            raise ValueError("API key cannot be None")
        headers = {"api-subscription-key": self._api_key}

        try:
            if self._opts.base_url is None:
                raise ValueError("base_url cannot be None")

            async with self._ensure_session().post(
                url=self._opts.base_url,
                data=form_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                if res.status != 200:
                    error_text = await res.text()
                    self._logger.error(f"Sarvam API error: {res.status} - {error_text}")
                    raise APIStatusError(
                        message=f"Sarvam API Error: {error_text}",
                        status_code=res.status,
                    )

                response_json = await res.json()
                self._logger.debug(f"Sarvam API response: {response_json}")

                transcript_text = response_json.get("transcript", "")
                request_id = response_json.get("request_id", "")
                detected_language = response_json.get("language_code")
                if not isinstance(detected_language, str):
                    # No usable language in the response: report the requested one.
                    detected_language = opts_language or ""

                start_time = 0.0
                end_time = 0.0

                # Try to get timestamps if available
                timestamps_data = response_json.get("timestamps")
                if timestamps_data and isinstance(timestamps_data, dict):
                    words_ts_start = timestamps_data.get("start_time_seconds")
                    words_ts_end = timestamps_data.get("end_time_seconds")
                    if isinstance(words_ts_start, list) and len(words_ts_start) > 0:
                        start_time = words_ts_start[0]
                    if isinstance(words_ts_end, list) and len(words_ts_end) > 0:
                        end_time = words_ts_end[-1]

                # If start/end times are still 0, use buffer duration as an estimate for end_time
                if start_time == 0.0 and end_time == 0.0:
                    end_time = _calculate_audio_duration(buffer)

                alternatives = [
                    stt.SpeechData(
                        language=detected_language,
                        text=transcript_text,
                        start_time=start_time,
                        end_time=end_time,
                        confidence=1.0,  # Sarvam doesn't provide confidence score in this response
                    )
                ]

                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    request_id=request_id,
                    alternatives=alternatives,
                )
        except APIStatusError:
            # Already logged above. Re-raise untouched so callers see the HTTP
            # status instead of a generic connection error from the catch-all below.
            raise
        except asyncio.TimeoutError as e:
            self._logger.error(f"Sarvam API timeout: {e}")
            raise APITimeoutError("Sarvam API request timed out") from e
        except aiohttp.ClientError as e:
            self._logger.error(f"Sarvam API client error: {e}")
            raise APIConnectionError(f"Sarvam API connection error: {e}") from e
        except Exception as e:
            self._logger.error(f"Error during Sarvam STT processing: {e}")
            raise APIConnectionError(f"Unexpected error in Sarvam STT: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        prompt: NotGivenOr[str] = NOT_GIVEN,
    ) -> SpeechStream:
        """Create a streaming transcription session.

        Args:
            language: Overrides the constructor language when given as a string
            model: Overrides the constructor model when given as a string
            conn_options: Connection options handed to the stream
            prompt: Overrides the constructor prompt when given as a string

        Returns:
            A new SpeechStream, also tracked in this instance's weak stream set.

        Raises:
            ValueError: If the stored API key is None
        """
        # Check the key before allocating anything, so a failure here cannot
        # leave an unclosed ClientSession behind.
        if self._api_key is None:
            raise ValueError("API key cannot be None")

        opts_language = _get_option_value(self._opts.language, language)
        opts_model = _get_option_value(self._opts.model, model)
        if not isinstance(opts_language, str):
            opts_language = self._opts.language
        if not isinstance(opts_model, str):
            opts_model = self._opts.model

        # Handle prompt conversion from NotGiven to None
        final_prompt: str | None
        if isinstance(prompt, str):
            final_prompt = prompt
        else:
            final_prompt = self._opts.prompt

        # Create options for the stream
        stream_opts = SarvamSTTOptions(
            language=opts_language,
            api_key=self._api_key,
            model=opts_model,
            prompt=final_prompt,
        )

        # Create a fresh session for this stream to avoid conflicts
        # NOTE(review): nothing in this class closes the session; presumably
        # SpeechStream owns and closes it - confirm.
        stream_session = aiohttp.ClientSession()

        stream = SpeechStream(
            stt=self,
            opts=stream_opts,
            conn_options=conn_options,
            api_key=self._api_key,
            http_session=stream_session,
        )
        self._streams.add(stream)
        return stream
This class provides speech-to-text functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality STT for Indian languages.
Args
language: BCP-47 language code, e.g., "hi-IN", "en-IN"
model: The Sarvam STT model to use
api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
base_url: API endpoint URL
http_session: Optional aiohttp session to use
prompt: Optional prompt for STT translate (saaras models only)
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this STT instance.

    Returns:
        The model stored on this instance's options.
    """
    configured_model = self._opts.model
    return configured_model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this STT instance.

    Returns:
        The constant string "Sarvam".
    """
    provider_name = "Sarvam"
    return provider_name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
prompt: NotGivenOr[str] = NOT_GIVEN) ‑> livekit.plugins.sarvam.stt.SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[SarvamSTTModels | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    prompt: NotGivenOr[str] = NOT_GIVEN,
) -> SpeechStream:
    """Create a streaming transcription session.

    Args:
        language: Overrides the constructor language when given as a string
        model: Overrides the constructor model when given as a string
        conn_options: Connection options handed to the stream
        prompt: Overrides the constructor prompt when given as a string

    Returns:
        A new SpeechStream, also tracked in this instance's stream set.

    Raises:
        ValueError: If the stored API key is None
    """
    # Check the key before allocating anything, so a failure here cannot
    # leave an unclosed ClientSession behind.
    if self._api_key is None:
        raise ValueError("API key cannot be None")

    opts_language = _get_option_value(self._opts.language, language)
    opts_model = _get_option_value(self._opts.model, model)
    if not isinstance(opts_language, str):
        opts_language = self._opts.language
    if not isinstance(opts_model, str):
        opts_model = self._opts.model

    # Handle prompt conversion from NotGiven to None
    final_prompt: str | None
    if isinstance(prompt, str):
        final_prompt = prompt
    else:
        final_prompt = self._opts.prompt

    # Create options for the stream
    stream_opts = SarvamSTTOptions(
        language=opts_language,
        api_key=self._api_key,
        model=opts_model,
        prompt=final_prompt,
    )

    # Create a fresh session for this stream to avoid conflicts
    # NOTE(review): presumably SpeechStream owns and closes this session - confirm.
    stream_session = aiohttp.ClientSession()

    stream = SpeechStream(
        stt=self,
        opts=stream_opts,
        conn_options=conn_options,
        api_key=self._api_key,
        http_session=stream_session,
    )
    self._streams.add(stream)
    return stream
Inherited members
class TTS (*,
target_language_code: SarvamTTSLanguages | str,
model: SarvamTTSModels | str = 'bulbul:v2',
speaker: SarvamTTSSpeakers | str = 'anushka',
speech_sample_rate: int = 22050,
num_channels: int = 1,
pitch: float = 0.0,
pace: float = 1.0,
loudness: float = 1.0,
enable_preprocessing: bool = False,
api_key: str | None = None,
base_url: str = 'https://api.sarvam.ai/text-to-speech',
ws_url: str = 'wss://api.sarvam.ai/text-to-speech/ws',
http_session: aiohttp.ClientSession | None = None,
send_completion_event: bool = True)-
Expand source code
class TTS(tts.TTS):
    """Sarvam.ai Text-to-Speech implementation.

    This class provides text-to-speech functionality using the Sarvam.ai API.
    Sarvam.ai specializes in high-quality TTS for Indian languages.

    Args:
        target_language_code: BCP-47 language code for supported Indian languages
        model: Sarvam TTS model to use (bulbul:v2)
        speaker: Voice to use for synthesis
        speech_sample_rate: Audio sample rate in Hz (8000, 16000, 22050 or 24000)
        num_channels: Number of audio channels (Sarvam outputs mono)
        pitch: Voice pitch adjustment (-20.0 to 20.0) - only supported in v2 for now
        pace: Speech rate multiplier (0.5 to 2.0)
        loudness: Volume multiplier (0.5 to 2.0) - only supported in v2 for now
        enable_preprocessing: Whether to use text preprocessing
        api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
        base_url: API endpoint URL
        ws_url: WebSocket endpoint URL
        http_session: Optional aiohttp session to use
        send_completion_event: Value sent as the ``send_completion_event``
            query parameter when opening the WebSocket connection

    Raises:
        ValueError: If the API key is missing or any argument fails validation.
    """

    def __init__(
        self,
        *,
        target_language_code: SarvamTTSLanguages | str,
        model: SarvamTTSModels | str = "bulbul:v2",
        speaker: SarvamTTSSpeakers | str = "anushka",
        speech_sample_rate: int = 22050,
        num_channels: int = 1,  # Sarvam output is mono WAV
        pitch: float = 0.0,
        pace: float = 1.0,
        loudness: float = 1.0,
        enable_preprocessing: bool = False,
        api_key: str | None = None,
        base_url: str = SARVAM_TTS_BASE_URL,
        ws_url: str = SARVAM_TTS_WS_URL,
        http_session: aiohttp.ClientSession | None = None,
        send_completion_event: bool = True,
    ) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=speech_sample_rate,
            num_channels=num_channels,
        )

        # An explicit key wins; otherwise fall back to the environment.
        self._api_key = api_key or os.environ.get("SARVAM_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Sarvam API key is required. Provide it directly or set SARVAM_API_KEY env var."
            )

        # Validate inputs early
        if not target_language_code or not target_language_code.strip():
            raise ValueError("Target language code is required and cannot be empty")
        if not model or not model.strip():
            raise ValueError("Model is required and cannot be empty")
        if not speaker or not speaker.strip():
            raise ValueError("Speaker is required and cannot be empty")

        # Validate parameter ranges
        if not -20.0 <= pitch <= 20.0:
            raise ValueError("Pitch must be between -20.0 and 20.0")
        if not 0.5 <= pace <= 2.0:
            raise ValueError("Pace must be between 0.5 and 2.0")
        if not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")
        if speech_sample_rate not in [8000, 16000, 22050, 24000]:
            raise ValueError("Sample rate must be 8000, 16000, 22050, or 24000 Hz")

        # Validate model-speaker compatibility
        if not validate_model_speaker_compatibility(model, speaker):
            compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(model, {}).get("all", [])
            raise ValueError(
                f"Speaker '{speaker}' is not compatible with model '{model}'. "
                f"Please choose a compatible speaker from: {', '.join(compatible_speakers)}"
            )

        # The options field is named word_tokenizer, but the tokenizer
        # passed in is a sentence tokenizer.
        word_tokenizer = tokenize.basic.SentenceTokenizer()

        self._opts = SarvamTTSOptions(
            target_language_code=target_language_code,
            model=model,
            speaker=speaker,
            speech_sample_rate=speech_sample_rate,
            pitch=pitch,
            pace=pace,
            loudness=loudness,
            enable_preprocessing=enable_preprocessing,
            api_key=self._api_key,
            base_url=base_url,
            ws_url=ws_url,
            word_tokenizer=word_tokenizer,
            send_completion_event=send_completion_event,
        )
        self._session = http_session
        # Weak references: tracking a stream here does not keep it alive.
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to the Sarvam TTS endpoint.

        Args:
            timeout: Maximum number of seconds to wait for the connection.

        Raises:
            APIConnectionError: If the connection attempt fails for any reason,
                including exceeding ``timeout``.
        """
        session = self._ensure_session()
        headers = {
            "api-subscription-key": self._opts.api_key,
            "User-Agent": "LiveKit-Sarvam-TTS/1.0",
            "Accept": "*/*",
            "Accept-Encoding": "gzip, deflate, br",
        }

        # Add model parameter to URL like the client does
        ws_url = f"{self._opts.ws_url}?model={self._opts.model}&send_completion_event={self._opts.send_completion_event}"

        logger.info("Connecting to Sarvam TTS WebSocket")
        try:
            return await asyncio.wait_for(
                session.ws_connect(
                    ws_url,
                    headers=headers,
                ),
                timeout,
            )
        except Exception as e:
            logger.error(
                "Failed to connect to Sarvam TTS WebSocket",
                extra={"error": str(e), "url": ws_url},
                exc_info=True,
            )
            raise APIConnectionError(f"WebSocket connection failed: {e}") from e

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a WebSocket previously opened by ``_connect_ws``."""
        await ws.close()

    @property
    def model(self) -> str:
        """Return the model configured on this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Return the provider name, always "Sarvam"."""
        return "Sarvam"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching the shared one on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        model: str | None = None,
        speaker: str | None = None,
        pitch: float | None = None,
        pace: float | None = None,
        loudness: float | None = None,
        enable_preprocessing: bool | None = None,
        send_completion_event: bool | None = None,
    ) -> None:
        """Update TTS options with validation.

        Only arguments that are not None are applied; every other option keeps
        its current value. All arguments are validated before anything is
        written, so a rejected call leaves the options unchanged.

        Args:
            model: New model; only "bulbul:v2" is accepted
            speaker: New speaker; must be compatible with the effective model
            pitch: New pitch (-20.0 to 20.0)
            pace: New pace (0.5 to 2.0)
            loudness: New loudness (0.5 to 2.0)
            enable_preprocessing: New text-preprocessing flag
            send_completion_event: New completion-event flag

        Raises:
            ValueError: If any supplied value is empty, unsupported or out of range
        """
        # NOTE(review): model and send_completion_event are baked into the
        # WebSocket URL in _connect_ws, so connections already held by the pool
        # keep the values they were opened with - confirm whether the pool
        # should be invalidated here.
        if model is not None:
            if not model.strip():
                raise ValueError("Model cannot be empty")
            if model not in ["bulbul:v2"]:
                raise ValueError(f"Unsupported model: {model}")

        # The speaker is checked against the model that will be in effect
        # after this call.
        effective_model = self._opts.model if model is None else model

        if speaker is not None:
            if not speaker.strip():
                raise ValueError("Speaker cannot be empty")
            if not validate_model_speaker_compatibility(effective_model, speaker):
                compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(effective_model, {}).get(
                    "all", []
                )
                raise ValueError(
                    f"Speaker '{speaker}' incompatible with {effective_model}. "
                    f"Compatible speakers: {', '.join(compatible_speakers)}"
                )

        if pitch is not None and not -20.0 <= pitch <= 20.0:
            raise ValueError("Pitch must be between -20.0 and 20.0")
        if pace is not None and not 0.5 <= pace <= 2.0:
            raise ValueError("Pace must be between 0.5 and 2.0")
        if loudness is not None and not 0.5 <= loudness <= 2.0:
            raise ValueError("Loudness must be between 0.5 and 2.0")

        # Everything validated: apply the supplied values.
        if model is not None:
            self._opts.model = model
        if speaker is not None:
            self._opts.speaker = speaker
        if pitch is not None:
            self._opts.pitch = pitch
        if pace is not None:
            self._opts.pace = pace
        if loudness is not None:
            self._opts.loudness = loudness
        if enable_preprocessing is not None:
            self._opts.enable_preprocessing = enable_preprocessing
        if send_completion_event is not None:
            self._opts.send_completion_event = send_completion_event

    # Implement the abstract synthesize method
    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions | None = None
    ) -> ChunkedStream:
        """Synthesize text to audio using Sarvam.ai TTS API.

        Args:
            text: Text to synthesize
            conn_options: Connection options; DEFAULT_API_CONNECT_OPTIONS when None

        Returns:
            A ChunkedStream bound to this TTS instance.
        """
        if conn_options is None:
            conn_options = DEFAULT_API_CONNECT_OPTIONS
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a streaming TTS session and track it for ``aclose``."""
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        """Prewarm WebSocket connections."""
        self._pool.prewarm()

    async def aclose(self) -> None:
        """Close all active streams and connections."""
        # Iterate over a copy rather than the live weak set.
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
This class provides text-to-speech functionality using the Sarvam.ai API. Sarvam.ai specializes in high-quality TTS for Indian languages.
Args
target_language_code: BCP-47 language code for supported Indian languages
model: Sarvam TTS model to use (bulbul:v2)
speaker: Voice to use for synthesis
speech_sample_rate: Audio sample rate in Hz
num_channels: Number of audio channels (Sarvam outputs mono)
pitch: Voice pitch adjustment (-20.0 to 20.0) - only supported in v2 for now
pace: Speech rate multiplier (0.5 to 2.0)
loudness: Volume multiplier (0.5 to 2.0) - only supported in v2 for now
enable_preprocessing: Whether to use text preprocessing
api_key: Sarvam.ai API key (falls back to SARVAM_API_KEY env var)
base_url: API endpoint URL
ws_url: WebSocket endpoint URL
http_session: Optional aiohttp session to use
send_completion_event: Value sent as the send_completion_event query parameter when opening the WebSocket connection
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance.

    Returns:
        The model stored on this instance's options.
    """
    configured_model = self._opts.model
    return configured_model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance.

    Returns:
        The constant string "Sarvam".
    """
    provider_name = "Sarvam"
    return provider_name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Close all active streams and connections.

    Each tracked stream is closed in turn, the tracking set is emptied, and
    finally the connection pool is closed.
    """
    # Iterate over a snapshot rather than the live set.
    open_streams = tuple(self._streams)
    for active in open_streams:
        await active.aclose()
    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None:
    """Prewarm WebSocket connections.

    Delegates to the connection pool's own ``prewarm``.
    """
    pool = self._pool
    pool.prewarm()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.sarvam.tts.SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a streaming TTS session.

    Args:
        conn_options: Connection options handed to the new stream

    Returns:
        The new SynthesizeStream, which is also added to this instance's
        tracked stream set.
    """
    session = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(session)
    return session
def synthesize(self, text: str, *, conn_options: APIConnectOptions | None = None) ‑> livekit.plugins.sarvam.tts.ChunkedStream-
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions | None = None
) -> ChunkedStream:
    """Synthesize text to audio using Sarvam.ai TTS API.

    Args:
        text: Text to synthesize
        conn_options: Connection options; DEFAULT_API_CONNECT_OPTIONS is used
            when this is None

    Returns:
        A ChunkedStream bound to this TTS instance and the given text.
    """
    effective_options = DEFAULT_API_CONNECT_OPTIONS if conn_options is None else conn_options
    return ChunkedStream(tts=self, input_text=text, conn_options=effective_options)
def update_options(self,
*,
model: str | None = None,
speaker: str | None = None,
pitch: float | None = None,
pace: float | None = None,
loudness: float | None = None,
enable_preprocessing: bool | None = False,
send_completion_event: bool | None = True) ‑> None-
Expand source code
def update_options( self, *, model: str | None = None, speaker: str | None = None, pitch: float | None = None, pace: float | None = None, loudness: float | None = None, enable_preprocessing: bool | None = False, send_completion_event: bool | None = True, ) -> None: """Update TTS options with validation.""" if model is not None: if not model.strip(): raise ValueError("Model cannot be empty") if model not in ["bulbul:v2"]: raise ValueError(f"Unsupported model: {model}") self._opts.model = model if speaker is not None: if not speaker.strip(): raise ValueError("Speaker cannot be empty") if not validate_model_speaker_compatibility(self._opts.model, speaker): compatible_speakers = MODEL_SPEAKER_COMPATIBILITY.get(self._opts.model, {}).get( "all", [] ) raise ValueError( f"Speaker '{speaker}' incompatible with {self._opts.model}. " f"Compatible speakers: {', '.join(compatible_speakers)}" ) self._opts.speaker = speaker if pitch is not None: if not -20.0 <= pitch <= 20.0: raise ValueError("Pitch must be between -20.0 and 20.0") self._opts.pitch = pitch if pace is not None: if not 0.5 <= pace <= 2.0: raise ValueError("Pace must be between 0.5 and 2.0") self._opts.pace = pace if loudness is not None: if not 0.5 <= loudness <= 2.0: raise ValueError("Loudness must be between 0.5 and 2.0") self._opts.loudness = loudness if enable_preprocessing is not None: self._opts.enable_preprocessing = enable_preprocessing if send_completion_event is not None: self._opts.send_completion_event = send_completion_eventUpdate TTS options with validation.
Inherited members