Module livekit.plugins.slng
SLNG plugin for LiveKit Agents
STT and TTS adapters for SLNG gateway models.
See https://docs.slng.ai/ for more information.
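A minimal wiring sketch (hedged: it assumes the livekit-agents 1.x `AgentSession` API and that the `SLNG_API_KEY` environment variable is set; the model names are the defaults documented below):

```python
from livekit.agents import AgentSession
from livekit.plugins import slng

# Both adapters fall back to the SLNG_API_KEY environment variable
# when api_key is not passed explicitly.
session = AgentSession(
    stt=slng.STT(model="deepgram/nova:3"),
    tts=slng.TTS(model="deepgram/aura:2"),
)
```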
Classes
class STT (*,
api_key: str | None = None,
model: str = 'deepgram/nova:3',
model_endpoint: str | None = None,
model_endpoints: list[str] | None = None,
slng_base_url: str = 'api.slng.ai',
region_override: str | list[str] | None = None,
sample_rate: int = 16000,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
buffer_size_seconds: float = 0.064,
enable_partial_transcripts: bool = True,
vad_threshold: float = 0.5,
vad_min_silence_duration_ms: int = 300,
vad_speech_pad_ms: int = 30,
enable_diarization: bool = False,
min_speakers: int | None = None,
max_speakers: int | None = None,
language: str = 'en',
http_session: aiohttp.ClientSession | None = None,
**model_options: Any)
```python
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "deepgram/nova:3",
        model_endpoint: str | None = None,
        model_endpoints: list[str] | None = None,
        slng_base_url: str = "api.slng.ai",
        region_override: str | list[str] | None = None,
        sample_rate: int = 16000,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        buffer_size_seconds: float = DEFAULT_BUFFER_SIZE_SECONDS,
        # Common SLNG options
        enable_partial_transcripts: bool = True,
        vad_threshold: float = 0.5,
        vad_min_silence_duration_ms: int = 300,
        vad_speech_pad_ms: int = 30,
        enable_diarization: bool = False,
        min_speakers: int | None = None,
        max_speakers: int | None = None,
        language: str = "en",
        http_session: aiohttp.ClientSession | None = None,
        **model_options: Any,
    ) -> None:
        """
        Initialize SLNG STT.

        Args:
            api_key: SLNG API key for authentication.
            model: SLNG model identifier (for example "deepgram/nova:3")
            model_endpoint: Optional full SLNG WebSocket endpoint URL
            model_endpoints: Optional fallback STT endpoints
            slng_base_url: SLNG gateway host. Defaults to "api.slng.ai"
            region_override: Optional gateway region override sent as X-Region-Override.
            sample_rate: Audio sample rate (default: 16000)
            encoding: Audio encoding format
            buffer_size_seconds: Buffer size in seconds
            enable_partial_transcripts: Enable interim results
            vad_threshold: Voice activity detection threshold
            vad_min_silence_duration_ms: Min silence duration for VAD
            vad_speech_pad_ms: Speech padding for VAD
            enable_diarization: Enable speaker identification
            min_speakers: Minimum speakers for diarization
            max_speakers: Maximum speakers for diarization
            language: Language code (default: "en")
            http_session: Optional HTTP session
            **model_options: Model-specific options (e.g., whisper_params={"task": "translate"})
        """
        resolved_key = api_key or os.environ.get("SLNG_API_KEY")
        if not resolved_key:
            raise ValueError("api_key is required, or set SLNG_API_KEY environment variable")

        # Detect if the endpoint supports streaming (WebSocket endpoints do):
        # - streaming=True: real-time streaming (WebSocket only)
        # - streaming=False: HTTP batch recognition only
        resolved_model_endpoint = model_endpoint or _default_stt_endpoint(
            slng_base_url=slng_base_url,
            model=model,
        )
        endpoints = list(model_endpoints or [resolved_model_endpoint])
        if not endpoints:
            endpoints = [resolved_model_endpoint]
        primary_endpoint = endpoints[0]
        is_streaming = primary_endpoint.startswith("ws://") or primary_endpoint.startswith("wss://")

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=is_streaming,
                interim_results=is_streaming,
                offline_recognize=not is_streaming,
            ),
        )
        self._api_key = resolved_key
        self._region_override_header = normalize_region_override(region_override)
        self._model_endpoints = endpoints
        self._active_endpoint_index = 0
        self._model_endpoint = endpoints[0]
        self._models = [_extract_model_from_endpoint(e) for e in endpoints]
        self._model = (
            self._models[0] if self._models else _extract_model_from_endpoint(primary_endpoint)
        )
        self._opts = STTOptions(
            sample_rate=sample_rate,
            buffer_size_seconds=buffer_size_seconds,
            enable_partial_transcripts=enable_partial_transcripts,
            vad_threshold=vad_threshold,
            vad_min_silence_duration_ms=vad_min_silence_duration_ms,
            vad_speech_pad_ms=vad_speech_pad_ms,
            enable_diarization=enable_diarization,
            min_speakers=min_speakers,
            max_speakers=max_speakers,
            language=language,
        )
        if is_given(encoding):
            self._opts.encoding = encoding
        # Store any extra model-specific options
        self._model_options = model_options
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    def _set_active_endpoint_index(self, index: int) -> None:
        """Update the active endpoint index (called by SpeechStream after successful failover)."""
        self._active_endpoint_index = index

    @property
    def model(self) -> str:
        return "slng"

    @property
    def provider(self) -> str:
        return "SLNG"

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """
        HTTP batch recognition for non-streaming STT.

        Converts audio buffer to base64 and sends to SLNG HTTP endpoint.
        """
        # Use language from parameter or fall back to instance default
        lang = language if is_given(language) else self._opts.language

        # Convert AudioBuffer to bytes
        audio_data = rtc.combine_audio_frames(buffer).data.tobytes()

        # Encode as base64
        audio_b64 = base64.b64encode(audio_data).decode("utf-8")

        # Prepare request payload
        payload = {
            "audio_b64": audio_b64,
            "language": lang,
        }

        # Add any model-specific options
        if self._model_options:
            payload.update(self._model_options)

        try:
            async with self.session.post(
                self._model_endpoint,
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "Content-Type": "application/json",
                    **(
                        {"X-Region-Override": self._region_override_header}
                        if self._region_override_header
                        else {}
                    ),
                },
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout, sock_connect=conn_options.timeout
                ),
            ) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.error(f"[SLNG STT] HTTP error {resp.status}: {error_text}")
                    raise APIStatusError(
                        f"SLNG STT HTTP error {resp.status}: {error_text}",
                        status_code=resp.status,
                    )

                data = await resp.json()

                # Extract transcription from response
                # Expected format: {"text": "...", "language": "en", "segments": [...]}
                text = data.get("text", "")
                detected_language = data.get("language", lang)
                segments = data.get("segments", [])

                # Calculate start and end times from segments
                start_time = segments[0].get("start", 0.0) if segments else 0.0
                end_time = segments[-1].get("end", 0.0) if segments else 0.0

                # Build SpeechEvent
                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[
                        stt.SpeechData(
                            language=detected_language,
                            text=text,
                            confidence=1.0,  # SLNG doesn't provide confidence in HTTP mode
                            start_time=start_time,
                            end_time=end_time,
                        )
                    ],
                )
        except aiohttp.ClientError as e:
            logger.error(f"[SLNG STT] HTTP connection error: {e}")
            raise APIConnectionError(f"SLNG STT HTTP connection error: {e}") from e
        except asyncio.TimeoutError:
            logger.error("[SLNG STT] HTTP request timed out")
            raise APITimeoutError("SLNG STT HTTP request timed out") from None
        except APIStatusError:
            raise
        except Exception as e:
            logger.error(f"[SLNG STT] HTTP unexpected error: {e}", exc_info=True)
            raise APIStatusError(f"SLNG STT HTTP error: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = dataclasses.replace(self._opts)
        if is_given(language):
            config.language = language
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            region_override_header=self._region_override_header,
            model_endpoints=self._model_endpoints,
            models=self._models,
            active_endpoint_index=self._active_endpoint_index,
            model_options=self._model_options,
            http_session=self.session,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        if is_given(enable_partial_transcripts):
            self._opts.enable_partial_transcripts = enable_partial_transcripts
        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        for stream in self._streams:
            stream.update_options(
                enable_partial_transcripts=enable_partial_transcripts,
                enable_diarization=enable_diarization,
                vad_threshold=vad_threshold,
                vad_min_silence_duration_ms=vad_min_silence_duration_ms,
                vad_speech_pad_ms=vad_speech_pad_ms,
                language=language,
                buffer_size_seconds=buffer_size_seconds,
            )
```
Initialize SLNG STT.
Args
api_key: SLNG API key for authentication.
model: SLNG model identifier (for example "deepgram/nova:3").
model_endpoint: Optional full SLNG WebSocket endpoint URL.
model_endpoints: Optional fallback STT endpoints.
slng_base_url: SLNG gateway host. Defaults to "api.slng.ai".
region_override: Optional gateway region override sent as X-Region-Override.
sample_rate: Audio sample rate (default: 16000).
encoding: Audio encoding format.
buffer_size_seconds: Buffer size in seconds.
enable_partial_transcripts: Enable interim (partial) results.
vad_threshold: Voice activity detection threshold.
vad_min_silence_duration_ms: Minimum silence duration for VAD, in milliseconds.
vad_speech_pad_ms: Speech padding for VAD, in milliseconds.
enable_diarization: Enable speaker identification.
min_speakers: Minimum number of speakers for diarization.
max_speakers: Maximum number of speakers for diarization.
language: Language code (default: "en").
http_session: Optional HTTP session.
**model_options: Model-specific options (e.g., whisper_params={"task": "translate"}).
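A construction sketch under those defaults (hedged: the region value is illustrative, and SLNG_API_KEY is assumed to be set in the environment):

```python
from livekit.plugins import slng

# Reads SLNG_API_KEY from the environment when api_key is omitted.
slng_stt = slng.STT(
    model="deepgram/nova:3",
    language="en",
    enable_diarization=True,
    min_speakers=1,
    max_speakers=2,
    region_override="eu",  # illustrative value; sent as the X-Region-Override header
)
```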
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str
```python
@property
def model(self) -> str:
    return "slng"
```

Get the model name/identifier for this STT instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str
```python
@property
def provider(self) -> str:
    return "SLNG"
```

Get the provider name/identifier for this STT instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
prop session : aiohttp.ClientSession
```python
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session
```
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.stt.SpeechStream
```python
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = dataclasses.replace(self._opts)
    if is_given(language):
        config.language = language
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        api_key=self._api_key,
        region_override_header=self._region_override_header,
        model_endpoints=self._model_endpoints,
        models=self._models,
        active_endpoint_index=self._active_endpoint_index,
        model_options=self._model_options,
        http_session=self.session,
    )
    self._streams.add(stream)
    return stream
```

def update_options(self,
*,
enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None
```python
def update_options(
    self,
    *,
    enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    if is_given(enable_partial_transcripts):
        self._opts.enable_partial_transcripts = enable_partial_transcripts
    if is_given(enable_diarization):
        self._opts.enable_diarization = enable_diarization
    if is_given(vad_threshold):
        self._opts.vad_threshold = vad_threshold
    if is_given(vad_min_silence_duration_ms):
        self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
    if is_given(vad_speech_pad_ms):
        self._opts.vad_speech_pad_ms = vad_speech_pad_ms
    if is_given(language):
        self._opts.language = language
    if is_given(buffer_size_seconds):
        self._opts.buffer_size_seconds = buffer_size_seconds

    for stream in self._streams:
        stream.update_options(
            enable_partial_transcripts=enable_partial_transcripts,
            enable_diarization=enable_diarization,
            vad_threshold=vad_threshold,
            vad_min_silence_duration_ms=vad_min_silence_duration_ms,
            vad_speech_pad_ms=vad_speech_pad_ms,
            language=language,
            buffer_size_seconds=buffer_size_seconds,
        )
```
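A streaming sketch (hedged: it relies on the `push_frame`/`end_input` interface and async iteration that livekit-agents recognize streams expose, and builds on the `slng_stt` instance constructed above; the frame source is hypothetical):

```python
import asyncio

from livekit.agents import stt as agents_stt

async def transcribe(frames) -> None:
    # `frames` stands in for any async iterable of rtc.AudioFrame (hypothetical source).
    stream = slng_stt.stream()

    async def push() -> None:
        async for frame in frames:
            stream.push_frame(frame)
        stream.end_input()

    push_task = asyncio.create_task(push())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await push_task
```

Calling `slng_stt.update_options(...)` later propagates the new settings to every open stream, each of which reconnects with a fresh init payload.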
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
region_override_header: str | None,
model_endpoints: list[str],
models: list[str | None],
active_endpoint_index: int,
model_options: dict,
http_session: aiohttp.ClientSession)
```python
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        region_override_header: str | None,
        model_endpoints: list[str],
        models: list[str | None],
        active_endpoint_index: int,
        model_options: dict,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)

        self._stt_parent: STT = stt
        self._opts = opts
        self._api_key = api_key
        self._region_override_header = region_override_header
        self._model_endpoints = model_endpoints
        self._models = models
        self._active_endpoint_index = active_endpoint_index
        self._model_options = model_options
        self._session = http_session
        self._speech_duration: float = 0

        # keep a list of final transcripts to combine them inside the END_OF_SPEECH event
        self._final_events: list[SpeechEvent] = []
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        if is_given(enable_partial_transcripts):
            self._opts.enable_partial_transcripts = enable_partial_transcripts
        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        self._reconnect_event.set()

    def _samples_per_buffer(self) -> int:
        try:
            buffer_size_seconds = float(self._opts.buffer_size_seconds)
        except (TypeError, ValueError):
            buffer_size_seconds = DEFAULT_BUFFER_SIZE_SECONDS
        if buffer_size_seconds <= 0:
            buffer_size_seconds = DEFAULT_BUFFER_SIZE_SECONDS
        return max(1, round(self._opts.sample_rate * buffer_size_seconds))

    async def _run(self) -> None:
        did_failover = False
        send: asyncio.Task[None] | None = None
        recv: asyncio.Task[None] | None = None
        wait_reconnect: asyncio.Task[bool] | None = None
        immediate_reconnect_attempts: dict[int, int] = {}

        def current_model() -> str | None:
            try:
                return self._models[self._active_endpoint_index]
            except Exception:
                return None

        def next_model() -> str | None:
            idx = self._active_endpoint_index + 1
            if idx < len(self._models):
                return self._models[idx]
            return None

        async def failover(*, exc: BaseException | None) -> bool:
            from_model = current_model()
            exc_info = (
                (type(exc), exc, exc.__traceback__)
                if exc is not None and exc.__traceback__ is not None
                else None
            )
            if self._active_endpoint_index + 1 >= len(self._model_endpoints):
                logger.error(
                    "STT fallback exhausted: from=%s",
                    from_model,
                    exc_info=exc_info,
                )
                return False
            to_model = next_model()
            logger.warning(
                "STT attempt failed: switching %s -> %s",
                from_model,
                to_model,
                exc_info=exc_info,
            )
            self._active_endpoint_index += 1
            return True

        async def next_audio_frame() -> Any | None:
            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    continue
                return item
            return None

        async def send_task(
            ws: aiohttp.ClientWebSocketResponse, *, pending_frames: list[Any]
        ) -> None:
            samples_per_buffer = self._samples_per_buffer()
            bytes_per_sample = bytes_per_frame.get(self._opts.encoding, 2)
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=samples_per_buffer,
            )

            for frame in pending_frames:
                frames = audio_bstream.write(frame.data.tobytes())
                for out in frames:
                    if len(out.data) % bytes_per_sample != 0:
                        continue
                    await ws.send_bytes(bytes(out.data))
                    self._speech_duration += out.duration

            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(item.data.tobytes())
                for frame in frames:
                    if len(frame.data) % bytes_per_sample != 0:
                        continue
                    await ws.send_bytes(bytes(frame.data))
                    self._speech_duration += frame.duration

        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            speech_started = False
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError("SLNG connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    logger.debug("[SLNG STT] ignoring non-JSON text frame: %s", msg.data[:200])
                    continue
                if not isinstance(data, dict):
                    continue

                msg_type = data.get("type")
                if msg_type in ("Metadata", "SpeechStarted", "UtteranceEnd"):
                    continue

                if msg_type == "Results":
                    is_final_value = data.get("is_final")
                    if isinstance(is_final_value, str):
                        is_final = is_final_value.strip().lower() in ("true", "1")
                    else:
                        is_final = bool(is_final_value)
                    raw_channel = data.get("channel")
                    channel = raw_channel if isinstance(raw_channel, dict) else {}
                    raw_alts = channel.get("alternatives")
                    alternatives = raw_alts if isinstance(raw_alts, list) else []
                    alt0 = (
                        alternatives[0]
                        if alternatives and isinstance(alternatives[0], dict)
                        else {}
                    )
                    data = {
                        "type": "final_transcript" if is_final else "partial_transcript",
                        "transcript": alt0.get("transcript", ""),
                        "confidence": alt0.get("confidence", 0.0),
                        "words": alt0.get("words", []),
                        "language": data.get("language", alt0.get("language")),
                    }
                    msg_type = data["type"]

                if msg_type == "Error":
                    raise APIStatusError(
                        f"SLNG STT error: {data.get('description') or data.get('message')}"
                    )

                if msg_type in ("partial_transcript", "final_transcript"):
                    if (
                        msg_type == "partial_transcript"
                        and not self._opts.enable_partial_transcripts
                    ):
                        continue

                    text = data.get("transcript", "")
                    is_final = msg_type == "final_transcript"

                    if not text:
                        # Empty-text final still consumed audio at the gateway;
                        # emit the usage metric so billed audio gets reported.
                        if is_final and self._speech_duration > 0:
                            self._event_ch.send_nowait(
                                stt.SpeechEvent(
                                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                                    alternatives=[],
                                    recognition_usage=stt.RecognitionUsage(
                                        audio_duration=self._speech_duration,
                                    ),
                                )
                            )
                            self._speech_duration = 0
                        continue

                    confidence = data.get("confidence", 0.0)
                    language = data.get("language", self._opts.language)
                    words = data.get("words", [])

                    # Emit START_OF_SPEECH on first transcript (interim or final)
                    if not speech_started:
                        speech_started = True
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                        )

                    if is_final:
                        start_time = words[0].get("start", 0.0) if words else 0.0
                        end_time = words[-1].get("end", 0.0) if words else 0.0
                    else:
                        start_time = 0.0
                        end_time = 0.0

                    event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT
                        if is_final
                        else stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        alternatives=[
                            stt.SpeechData(
                                language=language,
                                text=text,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            )
                        ],
                    )
                    self._event_ch.send_nowait(event)

                    # Emit END_OF_SPEECH after each final transcript.
                    # Note: the gateway may send multiple final_transcript messages
                    # per utterance (e.g., sentence-by-sentence). Each final is
                    # treated as a completed segment, so START/END bracket each one.
                    if is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                        )
                        speech_started = False
                        if self._speech_duration > 0:
                            self._event_ch.send_nowait(
                                stt.SpeechEvent(
                                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                                    alternatives=[],
                                    recognition_usage=stt.RecognitionUsage(
                                        audio_duration=self._speech_duration,
                                    ),
                                )
                            )
                            self._speech_duration = 0

        while True:
            send = None
            recv = None
            wait_reconnect = None

            first = await next_audio_frame()
            if first is None:
                return
            pending_frames: list[Any] = [first]

            endpoint = self._model_endpoints[self._active_endpoint_index]
            model = current_model()
            ws: aiohttp.ClientWebSocketResponse | None = None
            try:
                ws = await self._connect_ws(model_endpoint=endpoint, model=model)
                if did_failover:
                    logger.info("STT switched to fallback: model=%s", model)
                    # Propagate successful failover to parent so new streams
                    # start from the working endpoint.
                    self._stt_parent._set_active_endpoint_index(self._active_endpoint_index)
                    did_failover = False

                send = asyncio.create_task(send_task(ws, pending_frames=pending_frames))
                recv = asyncio.create_task(recv_task(ws))
                wait_reconnect = asyncio.create_task(self._reconnect_event.wait())
                tasks_group = asyncio.gather(send, recv)

                done, _ = await asyncio.wait(
                    (tasks_group, wait_reconnect),
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if wait_reconnect in done:
                    self._reconnect_event.clear()
                    tasks_group.cancel()
                    await utils.aio.gracefully_cancel(send, recv, wait_reconnect)
                    continue

                for task in done:
                    task.result()
                await utils.aio.gracefully_cancel(wait_reconnect)
                return
            except Exception as exc:
                tasks = [t for t in (send, recv, wait_reconnect) if t is not None]
                if tasks:
                    with contextlib.suppress(Exception):
                        await utils.aio.gracefully_cancel(*tasks)

                if isinstance(exc, APIStatusError):
                    status_code = _safe_error_code(exc)
                    is_permanent_client_error = (
                        status_code is not None
                        and 400 <= status_code < 500
                        and status_code != 429
                    )
                    endpoint_index = self._active_endpoint_index
                    attempts = immediate_reconnect_attempts.get(endpoint_index, 0)
                    if not is_permanent_client_error and attempts < MAX_IMMEDIATE_RETRIES:
                        immediate_reconnect_attempts[endpoint_index] = attempts + 1
                        continue

                if not await failover(exc=exc):
                    raise
                did_failover = True
                immediate_reconnect_attempts[self._active_endpoint_index] = 0
                continue
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(
        self, *, model_endpoint: str, model: str | None
    ) -> aiohttp.ClientWebSocketResponse:
        # Match e2e test headers exactly - send both Authorization and X-API-Key
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "X-API-Key": self._api_key,
        }
        if self._region_override_header:
            headers["X-Region-Override"] = self._region_override_header

        # Don't enable compression - e2e tests work without it and compress=15
        # was causing handshake errors with Deepgram Nova endpoint
        try:
            ws = await asyncio.wait_for(
                self._session.ws_connect(model_endpoint, headers=headers),
                self._conn_options.timeout,
            )
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("failed to connect to SLNG STT") from e

        init_message = build_stt_init_payload(
            model=model,
            language=self._opts.language,
            sample_rate=self._opts.sample_rate,
            encoding=self._opts.encoding,
            vad_threshold=self._opts.vad_threshold,
            vad_min_silence_duration_ms=self._opts.vad_min_silence_duration_ms,
            vad_speech_pad_ms=self._opts.vad_speech_pad_ms,
            enable_diarization=self._opts.enable_diarization,
            enable_partial_transcripts=self._opts.enable_partial_transcripts,
            min_speakers=self._opts.min_speakers,
            max_speakers=self._opts.max_speakers,
            model_options=self._model_options,
        )
        try:
            await ws.send_str(json.dumps(init_message))
        except Exception:
            await ws.close()
            raise
        return ws
```
Args
sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None
```python
def update_options(
    self,
    *,
    enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    if is_given(enable_partial_transcripts):
        self._opts.enable_partial_transcripts = enable_partial_transcripts
    if is_given(enable_diarization):
        self._opts.enable_diarization = enable_diarization
    if is_given(vad_threshold):
        self._opts.vad_threshold = vad_threshold
    if is_given(vad_min_silence_duration_ms):
        self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
    if is_given(vad_speech_pad_ms):
        self._opts.vad_speech_pad_ms = vad_speech_pad_ms
    if is_given(language):
        self._opts.language = language
    if is_given(buffer_size_seconds):
        self._opts.buffer_size_seconds = buffer_size_seconds

    self._reconnect_event.set()
```
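On a live stream the same method can be called directly; a one-line sketch building on the `stream` from `STT.stream()` above (values illustrative):

```python
# Switch language and loosen end-of-speech detection mid-session; the stream
# reconnects with a new init payload (via the _reconnect_event path above).
stream.update_options(language="de", vad_min_silence_duration_ms=500)
```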
class TTS (*,
api_key: str | None = None,
model: str = 'deepgram/aura:2',
model_endpoint: str | None = None,
slng_base_url: str = 'api.slng.ai',
region_override: str | list[str] | None = None,
voice: str = 'default',
language: str = 'en',
sample_rate: int = 24000,
speed: float = 1.0,
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
**model_options: object)
```python
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "deepgram/aura:2",
        model_endpoint: str | None = None,
        slng_base_url: str = "api.slng.ai",
        region_override: str | list[str] | None = None,
        voice: str = "default",
        language: str = "en",
        sample_rate: int = 24000,
        speed: float = 1.0,
        word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        **model_options: object,
    ) -> None:
        """
        Create a new instance of SLNG TTS (based on Deepgram's architecture).

        Args:
            model (str): SLNG model identifier (e.g., "deepgram/aura:2").
            model_endpoint (str): Optional full SLNG WebSocket endpoint.
            slng_base_url (str): SLNG gateway host. Defaults to "api.slng.ai".
            region_override (str | list[str] | None): Optional gateway region override.
            voice (str): Voice to use. Defaults to "default".
            language (str): Language code. Defaults to "en".
            sample_rate (int): Sample rate of audio. Defaults to 24000.
            api_key (str): SLNG API key. Falls back to SLNG_API_KEY environment variable.
            word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text.
                Defaults to basic WordTokenizer.
            http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.
        """
        # Resolve api_key from parameter or SLNG_API_KEY env var
        resolved_key = api_key or os.environ.get("SLNG_API_KEY")
        if not resolved_key:
            raise ValueError("api_key is required, or set SLNG_API_KEY environment variable")

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        if not is_given(word_tokenizer):
            word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)

        resolved_model_endpoint = model_endpoint or _default_tts_endpoint(
            slng_base_url=slng_base_url,
            model=model,
        )
        voice = normalize_tts_voice(model, voice)

        # _TTSOptions.encoding defaults to "linear16" because LiveKit expects raw PCM and
        # some SLNG models default to MP3 unless explicitly requested.
        self._opts = _TTSOptions(
            model_endpoint=resolved_model_endpoint,
            model=model,
            voice=voice,
            language=language,
            sample_rate=sample_rate,
            speed=speed,
            word_tokenizer=word_tokenizer,
            api_key=resolved_key,
            model_options=dict(model_options),
        )
        self._region_override_header = normalize_region_override(region_override)
        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    @property
    def model(self) -> str:
        return "slng"

    @property
    def provider(self) -> str:
        return "SLNG"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()

        # Connect to WebSocket
        model_endpoint = self._opts.model_endpoint
        headers = {
            "Authorization": f"Bearer {self._opts.api_key}",
            "X-API-Key": self._opts.api_key,
        }
        if self._region_override_header:
            headers["X-Region-Override"] = self._region_override_header
        ws = await asyncio.wait_for(
            session.ws_connect(
                model_endpoint,
                headers=headers,
            ),
            timeout,
        )

        # SLNG-specific: Send init and wait for ready
        init_payload = build_tts_init_payload(
            model=self._opts.model,
            voice=self._opts.voice,
            language=self._opts.language,
            sample_rate=self._opts.sample_rate,
            encoding=self._opts.encoding,
            speed=self._opts.speed,
            model_options=self._opts.model_options,
        )
        try:
            await ws.send_str(json.dumps(init_payload))
        except Exception:
            await ws.close()
            raise
        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        try:
            # Send final flush (similar to Deepgram's Flush+Close pattern).
            # Arcana-specific cancel/EOS is handled in the streaming send_task when bypassing
            # the connection pool.
            await ws.send_str(SynthesizeStream._FLUSH_MSG)
            # Wait for server acknowledgment
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(ws.receive(), timeout=5.0)
        except Exception as e:
            logger.warning(f"[SLNG TTS] error during WebSocket close sequence: {e}")
        finally:
            await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            voice (str): Voice to use.
            language (str): Language code.
        """
        invalidate_pool = False
        if is_given(voice):
            voice = normalize_tts_voice(self._opts.model, voice)
            invalidate_pool = invalidate_pool or self._opts.voice != voice
            self._opts.voice = voice
        if is_given(language):
            invalidate_pool = invalidate_pool or self._opts.language != language
            self._opts.language = language
        if invalidate_pool:
            self._pool.invalidate()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        self._pool.prewarm()

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
```
Create a new instance of SLNG TTS (based on Deepgram's architecture).
Args
model (str): SLNG model identifier (e.g., "deepgram/aura:2").
model_endpoint (str): Optional full SLNG WebSocket endpoint.
slng_base_url (str): SLNG gateway host. Defaults to "api.slng.ai".
region_override (str | list[str] | None): Optional gateway region override.
voice (str): Voice to use. Defaults to "default".
language (str): Language code. Defaults to "en".
sample_rate (int): Sample rate of audio. Defaults to 24000.
api_key (str): SLNG API key. Falls back to SLNG_API_KEY environment variable.
word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.
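A synthesis sketch (hedged: it assumes SLNG_API_KEY is set and that `synthesize()` yields `SynthesizedAudio` chunks carrying an `rtc.AudioFrame`, per the livekit-agents TTS interface; the sink is hypothetical):

```python
from livekit.plugins import slng

slng_tts = slng.TTS(model="deepgram/aura:2", voice="default", sample_rate=24000)

async def speak(text: str) -> None:
    async for chunk in slng_tts.synthesize(text):
        play_pcm(chunk.frame)  # hypothetical sink for 24 kHz mono PCM frames
    await slng_tts.aclose()
```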
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str
```python
@property
def model(self) -> str:
    return "slng"
```

Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str
```python
@property
def provider(self) -> str:
    return "SLNG"
```

Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None
```python
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._pool.aclose()
```

def prewarm(self) ‑> None
```python
def prewarm(self) -> None:
    self._pool.prewarm()
```

Pre-warm connection to the TTS service.
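For latency-sensitive agents the pooled WebSocket can be opened ahead of the first request (a sketch, reusing the `slng_tts` instance from above):

```python
slng_tts.prewarm()  # opens a pooled gateway connection before the first synthesis
```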
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.tts.SynthesizeStream
```python
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
```

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.tts.ChunkedStream
```python
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
```

def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN) ‑> None
```python
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """
    Args:
        voice (str): Voice to use.
        language (str): Language code.
    """
    invalidate_pool = False
    if is_given(voice):
        voice = normalize_tts_voice(self._opts.model, voice)
        invalidate_pool = invalidate_pool or self._opts.voice != voice
        self._opts.voice = voice
    if is_given(language):
        invalidate_pool = invalidate_pool or self._opts.language != language
        self._opts.language = language
    if invalidate_pool:
        self._pool.invalidate()
```

Args
voice (str): Voice to use.
language (str): Language code.
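Changing voice or language mid-session (sketch; the voice name is illustrative, and valid values depend on the selected model):

```python
# The pooled connection is invalidated only if the effective voice or
# language actually changes, so repeated no-op calls are cheap.
slng_tts.update_options(voice="asteria", language="en")
```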