Module livekit.plugins.slng

SLNG plugin for LiveKit Agents

STT and TTS adapters for SLNG gateway models.

See https://docs.slng.ai/ for more information.
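
A minimal usage sketch (assuming the package installs under the livekit.plugins namespace, SLNG_API_KEY is exported, and the surrounding wiring follows the standard LiveKit Agents AgentSession pattern):

from livekit.agents import AgentSession
from livekit.plugins import slng

# Both adapters resolve the API key from SLNG_API_KEY when api_key is omitted.
session = AgentSession(
    stt=slng.STT(model="deepgram/nova:3"),
    tts=slng.TTS(model="deepgram/aura:2", voice="default"),
)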

Classes

class STT (*,
api_key: str | None = None,
model: str = 'deepgram/nova:3',
model_endpoint: str | None = None,
model_endpoints: list[str] | None = None,
slng_base_url: str = 'api.slng.ai',
region_override: str | list[str] | None = None,
sample_rate: int = 16000,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
buffer_size_seconds: float = 0.064,
enable_partial_transcripts: bool = True,
vad_threshold: float = 0.5,
vad_min_silence_duration_ms: int = 300,
vad_speech_pad_ms: int = 30,
enable_diarization: bool = False,
min_speakers: int | None = None,
max_speakers: int | None = None,
language: str = 'en',
http_session: aiohttp.ClientSession | None = None,
**model_options: Any)
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "deepgram/nova:3",
        model_endpoint: str | None = None,
        model_endpoints: list[str] | None = None,
        slng_base_url: str = "api.slng.ai",
        region_override: str | list[str] | None = None,
        sample_rate: int = 16000,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        buffer_size_seconds: float = DEFAULT_BUFFER_SIZE_SECONDS,
        # Common SLNG options
        enable_partial_transcripts: bool = True,
        vad_threshold: float = 0.5,
        vad_min_silence_duration_ms: int = 300,
        vad_speech_pad_ms: int = 30,
        enable_diarization: bool = False,
        min_speakers: int | None = None,
        max_speakers: int | None = None,
        language: str = "en",
        http_session: aiohttp.ClientSession | None = None,
        **model_options: Any,
    ) -> None:
        """
        Initialize SLNG STT.

        Args:
            api_key: SLNG API key. Falls back to the SLNG_API_KEY environment variable.
            model: SLNG model identifier (for example "deepgram/nova:3").
            model_endpoint: Optional full SLNG WebSocket endpoint URL.
            model_endpoints: Optional ordered list of fallback STT endpoints.
            slng_base_url: SLNG gateway host. Defaults to "api.slng.ai".
            region_override: Optional gateway region override sent as the X-Region-Override header.
            sample_rate: Audio sample rate in Hz (default: 16000).
            encoding: Audio encoding format.
            buffer_size_seconds: Audio buffer size in seconds (default: 0.064).
            enable_partial_transcripts: Enable interim (partial) transcription results.
            vad_threshold: Voice activity detection threshold.
            vad_min_silence_duration_ms: Minimum silence duration in milliseconds before VAD considers speech ended.
            vad_speech_pad_ms: Padding in milliseconds added around detected speech.
            enable_diarization: Enable speaker diarization.
            min_speakers: Minimum number of speakers for diarization.
            max_speakers: Maximum number of speakers for diarization.
            language: Language code (default: "en").
            http_session: Optional aiohttp session to use for requests.
            **model_options: Model-specific options (e.g., whisper_params={"task": "translate"}).
        """
        resolved_key = api_key or os.environ.get("SLNG_API_KEY")
        if not resolved_key:
            raise ValueError("api_key is required: pass it explicitly or set the SLNG_API_KEY environment variable")

        # Detect if endpoint supports streaming (WebSocket endpoints do)
        # - streaming=True: Supports real-time streaming (WebSocket only)
        # - streaming=False: HTTP batch recognition only
        resolved_model_endpoint = model_endpoint or _default_stt_endpoint(
            slng_base_url=slng_base_url,
            model=model,
        )
        # model_endpoints takes priority; fall back to the single resolved endpoint
        endpoints = list(model_endpoints or [resolved_model_endpoint])
        primary_endpoint = endpoints[0]

        is_streaming = primary_endpoint.startswith("ws://") or primary_endpoint.startswith("wss://")

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=is_streaming,
                interim_results=is_streaming,
                offline_recognize=not is_streaming,
            ),
        )

        self._api_key = resolved_key
        self._region_override_header = normalize_region_override(region_override)
        self._model_endpoints = endpoints
        self._active_endpoint_index = 0
        self._model_endpoint = endpoints[0]
        self._models = [_extract_model_from_endpoint(e) for e in endpoints]
        self._model = (
            self._models[0] if self._models else _extract_model_from_endpoint(primary_endpoint)
        )

        self._opts = STTOptions(
            sample_rate=sample_rate,
            buffer_size_seconds=buffer_size_seconds,
            enable_partial_transcripts=enable_partial_transcripts,
            vad_threshold=vad_threshold,
            vad_min_silence_duration_ms=vad_min_silence_duration_ms,
            vad_speech_pad_ms=vad_speech_pad_ms,
            enable_diarization=enable_diarization,
            min_speakers=min_speakers,
            max_speakers=max_speakers,
            language=language,
        )

        if is_given(encoding):
            self._opts.encoding = encoding

        # Store any extra model-specific options
        self._model_options = model_options

        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    def _set_active_endpoint_index(self, index: int) -> None:
        """Update the active endpoint index (called by SpeechStream after successful failover)."""
        self._active_endpoint_index = index

    @property
    def model(self) -> str:
        return "slng"

    @property
    def provider(self) -> str:
        return "SLNG"

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """
        HTTP batch recognition for non-streaming STT.

        Converts audio buffer to base64 and sends to SLNG HTTP endpoint.
        """
        # Use language from parameter or fall back to instance default
        lang = language if is_given(language) else self._opts.language

        # Convert AudioBuffer to bytes
        audio_data = rtc.combine_audio_frames(buffer).data.tobytes()

        # Encode as base64
        audio_b64 = base64.b64encode(audio_data).decode("utf-8")

        # Prepare request payload
        payload = {
            "audio_b64": audio_b64,
            "language": lang,
        }

        # Add any model-specific options
        if self._model_options:
            payload.update(self._model_options)

        try:
            async with self.session.post(
                self._model_endpoint,
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "Content-Type": "application/json",
                    **(
                        {"X-Region-Override": self._region_override_header}
                        if self._region_override_header
                        else {}
                    ),
                },
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout, sock_connect=conn_options.timeout
                ),
            ) as resp:
                if resp.status != 200:
                    error_text = await resp.text()
                    logger.error(f"[SLNG STT] HTTP error {resp.status}: {error_text}")
                    raise APIStatusError(
                        f"SLNG STT HTTP error {resp.status}: {error_text}",
                        status_code=resp.status,
                    )

                data = await resp.json()

                # Extract transcription from response
                # Expected format: {"text": "...", "language": "en", "segments": [...]}
                text = data.get("text", "")
                detected_language = data.get("language", lang)
                segments = data.get("segments", [])

                # Calculate start and end times from segments
                start_time = segments[0].get("start", 0.0) if segments else 0.0
                end_time = segments[-1].get("end", 0.0) if segments else 0.0

                # Build SpeechEvent
                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[
                        stt.SpeechData(
                            language=detected_language,
                            text=text,
                            confidence=1.0,  # SLNG doesn't provide confidence in HTTP mode
                            start_time=start_time,
                            end_time=end_time,
                        )
                    ],
                )

        except aiohttp.ClientError as e:
            logger.error(f"[SLNG STT] HTTP connection error: {e}")
            raise APIConnectionError(f"SLNG STT HTTP connection error: {e}") from e
        except asyncio.TimeoutError:
            logger.error("[SLNG STT] HTTP request timed out")
            raise APITimeoutError("SLNG STT HTTP request timed out") from None
        except APIStatusError:
            raise
        except Exception as e:
            logger.error(f"[SLNG STT] HTTP unexpected error: {e}", exc_info=True)
            raise APIStatusError(f"SLNG STT HTTP error: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = dataclasses.replace(self._opts)
        if is_given(language):
            config.language = language
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            region_override_header=self._region_override_header,
            model_endpoints=self._model_endpoints,
            models=self._models,
            active_endpoint_index=self._active_endpoint_index,
            model_options=self._model_options,
            http_session=self.session,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        if is_given(enable_partial_transcripts):
            self._opts.enable_partial_transcripts = enable_partial_transcripts
        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        for stream in self._streams:
            stream.update_options(
                enable_partial_transcripts=enable_partial_transcripts,
                enable_diarization=enable_diarization,
                vad_threshold=vad_threshold,
                vad_min_silence_duration_ms=vad_min_silence_duration_ms,
                vad_speech_pad_ms=vad_speech_pad_ms,
                language=language,
                buffer_size_seconds=buffer_size_seconds,
            )

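The constructor derives the streaming capability from the primary endpoint's URL scheme: ws:// and wss:// endpoints stream over WebSocket, while anything else is treated as HTTP batch and served by _recognize_impl. A hedged sketch (the endpoint paths below are hypothetical placeholders):

# Streaming: capabilities.streaming=True, consumed via stream()
streaming_stt = slng.STT(model_endpoint="wss://api.slng.ai/v1/listen/deepgram/nova:3")

# Batch: capabilities.streaming=False, consumed via recognize(), which POSTs
# base64-encoded PCM and yields a single FINAL_TRANSCRIPT event
batch_stt = slng.STT(model_endpoint="https://api.slng.ai/v1/recognize/whisper")
event = await batch_stt.recognize(buffer)  # buffer: AudioBuffer, inside a coroutine
print(event.alternatives[0].text)
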
Initialize SLNG STT.

Args

api_key
SLNG API key. Falls back to the SLNG_API_KEY environment variable.
model
SLNG model identifier (for example "deepgram/nova:3").
model_endpoint
Optional full SLNG WebSocket endpoint URL.
model_endpoints
Optional ordered list of fallback STT endpoints.
slng_base_url
SLNG gateway host. Defaults to "api.slng.ai".
region_override
Optional gateway region override sent as the X-Region-Override header.
sample_rate
Audio sample rate in Hz (default: 16000).
encoding
Audio encoding format.
buffer_size_seconds
Audio buffer size in seconds (default: 0.064).
enable_partial_transcripts
Enable interim (partial) transcription results.
vad_threshold
Voice activity detection threshold.
vad_min_silence_duration_ms
Minimum silence duration in milliseconds before VAD considers speech ended.
vad_speech_pad_ms
Padding in milliseconds added around detected speech.
enable_diarization
Enable speaker diarization.
min_speakers
Minimum number of speakers for diarization.
max_speakers
Maximum number of speakers for diarization.
language
Language code (default: "en").
http_session
Optional aiohttp session to use for requests.
**model_options
Model-specific options (e.g., whisper_params={"task": "translate"}).
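
An illustrative construction with endpoint failover and diarization (a sketch: the endpoint URLs are hypothetical placeholders, and whisper_params is the model_options example from the docstring):

slng_stt = slng.STT(
    model="deepgram/nova:3",
    # Hypothetical fallbacks, tried in listed order when an endpoint fails.
    model_endpoints=[
        "wss://api.slng.ai/v1/listen/deepgram/nova:3",
        "wss://api.slng.ai/v1/listen/whisper",
    ],
    enable_diarization=True,
    min_speakers=1,
    max_speakers=2,
    whisper_params={"task": "translate"},  # forwarded via **model_options
)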

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return "slng"

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "SLNG"

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

prop session : aiohttp.ClientSession
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = dataclasses.replace(self._opts)
    if is_given(language):
        config.language = language
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        api_key=self._api_key,
        region_override_header=self._region_override_header,
        model_endpoints=self._model_endpoints,
        models=self._models,
        active_endpoint_index=self._active_endpoint_index,
        model_options=self._model_options,
        http_session=self.session,
    )
    self._streams.add(stream)
    return stream
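
Typical streaming use, feeding and consuming concurrently (a sketch following the standard RecognizeStream interface from livekit.agents; the frames iterator is a hypothetical audio source):

import asyncio

from livekit.agents import stt as agents_stt

async def transcribe(frames) -> None:  # frames: async iterator of rtc.AudioFrame
    stream = slng_stt.stream(language="en")

    async def _feed() -> None:
        async for frame in frames:
            stream.push_frame(frame)
        stream.end_input()

    feed_task = asyncio.create_task(_feed())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await feed_task
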
def update_options(self,
*,
enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    if is_given(enable_partial_transcripts):
        self._opts.enable_partial_transcripts = enable_partial_transcripts
    if is_given(enable_diarization):
        self._opts.enable_diarization = enable_diarization
    if is_given(vad_threshold):
        self._opts.vad_threshold = vad_threshold
    if is_given(vad_min_silence_duration_ms):
        self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
    if is_given(vad_speech_pad_ms):
        self._opts.vad_speech_pad_ms = vad_speech_pad_ms
    if is_given(language):
        self._opts.language = language
    if is_given(buffer_size_seconds):
        self._opts.buffer_size_seconds = buffer_size_seconds

    for stream in self._streams:
        stream.update_options(
            enable_partial_transcripts=enable_partial_transcripts,
            enable_diarization=enable_diarization,
            vad_threshold=vad_threshold,
            vad_min_silence_duration_ms=vad_min_silence_duration_ms,
            vad_speech_pad_ms=vad_speech_pad_ms,
            language=language,
            buffer_size_seconds=buffer_size_seconds,
        )
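
Updated options become the new instance defaults and are also pushed to every open stream, each of which reconnects with the new settings. For example (a sketch, reusing slng_stt from above):

slng_stt.update_options(language="de", vad_threshold=0.6)
# open SpeechStream instances reconnect and continue transcribing in German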

Inherited members

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
region_override_header: str | None,
model_endpoints: list[str],
models: list[str | None],
active_endpoint_index: int,
model_options: dict,
http_session: aiohttp.ClientSession)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        region_override_header: str | None,
        model_endpoints: list[str],
        models: list[str | None],
        active_endpoint_index: int,
        model_options: dict,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)

        self._stt_parent: STT = stt
        self._opts = opts
        self._api_key = api_key
        self._region_override_header = region_override_header
        self._model_endpoints = model_endpoints
        self._models = models
        self._active_endpoint_index = active_endpoint_index
        self._model_options = model_options
        self._session = http_session
        self._speech_duration: float = 0

        # keep a list of final transcripts to combine them inside the END_OF_SPEECH event
        self._final_events: list[SpeechEvent] = []
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        if is_given(enable_partial_transcripts):
            self._opts.enable_partial_transcripts = enable_partial_transcripts
        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        self._reconnect_event.set()

    def _samples_per_buffer(self) -> int:
        try:
            buffer_size_seconds = float(self._opts.buffer_size_seconds)
        except (TypeError, ValueError):
            buffer_size_seconds = DEFAULT_BUFFER_SIZE_SECONDS

        if buffer_size_seconds <= 0:
            buffer_size_seconds = DEFAULT_BUFFER_SIZE_SECONDS

        return max(1, round(self._opts.sample_rate * buffer_size_seconds))

    async def _run(self) -> None:
        did_failover = False
        send: asyncio.Task[None] | None = None
        recv: asyncio.Task[None] | None = None
        wait_reconnect: asyncio.Task[bool] | None = None
        immediate_reconnect_attempts: dict[int, int] = {}

        def current_model() -> str | None:
            try:
                return self._models[self._active_endpoint_index]
            except Exception:
                return None

        def next_model() -> str | None:
            idx = self._active_endpoint_index + 1
            if idx < len(self._models):
                return self._models[idx]
            return None

        async def failover(*, exc: BaseException | None) -> bool:
            from_model = current_model()
            exc_info = (
                (type(exc), exc, exc.__traceback__)
                if exc is not None and exc.__traceback__ is not None
                else None
            )
            if self._active_endpoint_index + 1 >= len(self._model_endpoints):
                logger.error(
                    "STT fallback exhausted: from=%s",
                    from_model,
                    exc_info=exc_info,
                )
                return False

            to_model = next_model()
            logger.warning(
                "STT attempt failed: switching %s -> %s",
                from_model,
                to_model,
                exc_info=exc_info,
            )
            self._active_endpoint_index += 1
            return True

        async def next_audio_frame() -> Any | None:
            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    continue
                return item
            return None

        async def send_task(
            ws: aiohttp.ClientWebSocketResponse, *, pending_frames: list[Any]
        ) -> None:
            samples_per_buffer = self._samples_per_buffer()
            bytes_per_sample = bytes_per_frame.get(self._opts.encoding, 2)
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=samples_per_buffer,
            )

            for frame in pending_frames:
                frames = audio_bstream.write(frame.data.tobytes())
                for out in frames:
                    if len(out.data) % bytes_per_sample != 0:
                        continue
                    await ws.send_bytes(bytes(out.data))
                    self._speech_duration += out.duration

            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(item.data.tobytes())

                for frame in frames:
                    if len(frame.data) % bytes_per_sample != 0:
                        continue
                    await ws.send_bytes(bytes(frame.data))
                    self._speech_duration += frame.duration

        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            speech_started = False

            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError("SLNG connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    logger.debug("[SLNG STT] ignoring non-JSON text frame: %s", msg.data[:200])
                    continue
                if not isinstance(data, dict):
                    continue

                msg_type = data.get("type")
                if msg_type in ("Metadata", "SpeechStarted", "UtteranceEnd"):
                    continue

                if msg_type == "Results":
                    is_final_value = data.get("is_final")
                    if isinstance(is_final_value, str):
                        is_final = is_final_value.strip().lower() in ("true", "1")
                    else:
                        is_final = bool(is_final_value)
                    raw_channel = data.get("channel")
                    channel = raw_channel if isinstance(raw_channel, dict) else {}
                    raw_alts = channel.get("alternatives")
                    alternatives = raw_alts if isinstance(raw_alts, list) else []
                    alt0 = (
                        alternatives[0]
                        if alternatives and isinstance(alternatives[0], dict)
                        else {}
                    )
                    data = {
                        "type": "final_transcript" if is_final else "partial_transcript",
                        "transcript": alt0.get("transcript", ""),
                        "confidence": alt0.get("confidence", 0.0),
                        "words": alt0.get("words", []),
                        "language": data.get("language", alt0.get("language")),
                    }
                    msg_type = data["type"]

                if msg_type == "Error":
                    raise APIStatusError(
                        f"SLNG STT error: {data.get('description') or data.get('message')}"
                    )

                if msg_type in ("partial_transcript", "final_transcript"):
                    if (
                        msg_type == "partial_transcript"
                        and not self._opts.enable_partial_transcripts
                    ):
                        continue
                    text = data.get("transcript", "")
                    is_final = msg_type == "final_transcript"
                    if not text:
                        # Empty-text final still consumed audio at the gateway;
                        # emit the usage metric so billed audio gets reported.
                        if is_final and self._speech_duration > 0:
                            self._event_ch.send_nowait(
                                stt.SpeechEvent(
                                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                                    alternatives=[],
                                    recognition_usage=stt.RecognitionUsage(
                                        audio_duration=self._speech_duration,
                                    ),
                                )
                            )
                            self._speech_duration = 0
                        continue

                    confidence = data.get("confidence", 0.0)
                    language = data.get("language", self._opts.language)
                    words = data.get("words", [])

                    # Emit START_OF_SPEECH on first transcript (interim or final)
                    if not speech_started:
                        speech_started = True
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                        )

                    if is_final:
                        start_time = words[0].get("start", 0.0) if words else 0.0
                        end_time = words[-1].get("end", 0.0) if words else 0.0
                    else:
                        start_time = 0.0
                        end_time = 0.0

                    event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT
                        if is_final
                        else stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        alternatives=[
                            stt.SpeechData(
                                language=language,
                                text=text,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            )
                        ],
                    )
                    self._event_ch.send_nowait(event)

                    # Emit END_OF_SPEECH after each final transcript.
                    # Note: the gateway may send multiple final_transcript messages
                    # per utterance (e.g., sentence-by-sentence). Each final is
                    # treated as a completed segment, so START/END bracket each one.
                    if is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                        )
                        speech_started = False

                        if self._speech_duration > 0:
                            self._event_ch.send_nowait(
                                stt.SpeechEvent(
                                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                                    alternatives=[],
                                    recognition_usage=stt.RecognitionUsage(
                                        audio_duration=self._speech_duration,
                                    ),
                                )
                            )
                            self._speech_duration = 0

        while True:
            send = None
            recv = None
            wait_reconnect = None

            first = await next_audio_frame()
            if first is None:
                return
            pending_frames: list[Any] = [first]

            endpoint = self._model_endpoints[self._active_endpoint_index]
            model = current_model()

            ws: aiohttp.ClientWebSocketResponse | None = None
            try:
                ws = await self._connect_ws(model_endpoint=endpoint, model=model)
                if did_failover:
                    logger.info("STT switched to fallback: model=%s", model)
                    # Propagate successful failover to parent so new streams
                    # start from the working endpoint.
                    self._stt_parent._set_active_endpoint_index(self._active_endpoint_index)
                    did_failover = False

                send = asyncio.create_task(send_task(ws, pending_frames=pending_frames))
                recv = asyncio.create_task(recv_task(ws))
                wait_reconnect = asyncio.create_task(self._reconnect_event.wait())

                tasks_group = asyncio.gather(send, recv)
                done, _ = await asyncio.wait(
                    (tasks_group, wait_reconnect),
                    return_when=asyncio.FIRST_COMPLETED,
                )

                if wait_reconnect in done:
                    self._reconnect_event.clear()
                    tasks_group.cancel()
                    await utils.aio.gracefully_cancel(send, recv, wait_reconnect)
                    continue

                for task in done:
                    task.result()

                await utils.aio.gracefully_cancel(wait_reconnect)
                return
            except Exception as exc:
                tasks = [t for t in (send, recv, wait_reconnect) if t is not None]
                if tasks:
                    with contextlib.suppress(Exception):
                        await utils.aio.gracefully_cancel(*tasks)
                if isinstance(exc, APIStatusError):
                    status_code = _safe_error_code(exc)
                    is_permanent_client_error = (
                        status_code is not None and 400 <= status_code < 500 and status_code != 429
                    )
                    endpoint_index = self._active_endpoint_index
                    attempts = immediate_reconnect_attempts.get(endpoint_index, 0)

                    if not is_permanent_client_error and attempts < MAX_IMMEDIATE_RETRIES:
                        immediate_reconnect_attempts[endpoint_index] = attempts + 1
                        continue
                if not await failover(exc=exc):
                    raise
                did_failover = True
                immediate_reconnect_attempts[self._active_endpoint_index] = 0
                continue
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(
        self, *, model_endpoint: str, model: str | None
    ) -> aiohttp.ClientWebSocketResponse:
        # Match e2e test headers exactly - send both Authorization and X-API-Key
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "X-API-Key": self._api_key,
        }
        if self._region_override_header:
            headers["X-Region-Override"] = self._region_override_header

        # Don't enable compression - e2e tests work without it and compress=15
        # was causing handshake errors with Deepgram Nova endpoint
        try:
            ws = await asyncio.wait_for(
                self._session.ws_connect(model_endpoint, headers=headers),
                self._conn_options.timeout,
            )
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("failed to connect to SLNG STT") from e

        init_message = build_stt_init_payload(
            model=model,
            language=self._opts.language,
            sample_rate=self._opts.sample_rate,
            encoding=self._opts.encoding,
            vad_threshold=self._opts.vad_threshold,
            vad_min_silence_duration_ms=self._opts.vad_min_silence_duration_ms,
            vad_speech_pad_ms=self._opts.vad_speech_pad_ms,
            enable_diarization=self._opts.enable_diarization,
            enable_partial_transcripts=self._opts.enable_partial_transcripts,
            min_speakers=self._opts.min_speakers,
            max_speakers=self._opts.max_speakers,
            model_options=self._model_options,
        )

        try:
            await ws.send_str(json.dumps(init_message))
        except Exception:
            await ws.close()
            raise
        return ws

Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    enable_partial_transcripts: NotGivenOr[bool] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    if is_given(enable_partial_transcripts):
        self._opts.enable_partial_transcripts = enable_partial_transcripts
    if is_given(enable_diarization):
        self._opts.enable_diarization = enable_diarization
    if is_given(vad_threshold):
        self._opts.vad_threshold = vad_threshold
    if is_given(vad_min_silence_duration_ms):
        self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
    if is_given(vad_speech_pad_ms):
        self._opts.vad_speech_pad_ms = vad_speech_pad_ms
    if is_given(language):
        self._opts.language = language
    if is_given(buffer_size_seconds):
        self._opts.buffer_size_seconds = buffer_size_seconds

    self._reconnect_event.set()
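
For reference, _run retries the current endpoint immediately (up to MAX_IMMEDIATE_RETRIES) unless the failure is a permanent client error, and only then advances to the next fallback endpoint. The permanence check restated as a sketch (the helper name is illustrative):

def _is_permanent_client_error(status_code: int | None) -> bool:
    # 4xx statuses other than 429 (rate limiting) are treated as permanent
    # for the current endpoint, so the stream fails over instead of retrying.
    return status_code is not None and 400 <= status_code < 500 and status_code != 429
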
class TTS (*,
api_key: str | None = None,
model: str = 'deepgram/aura:2',
model_endpoint: str | None = None,
slng_base_url: str = 'api.slng.ai',
region_override: str | list[str] | None = None,
voice: str = 'default',
language: str = 'en',
sample_rate: int = 24000,
speed: float = 1.0,
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
**model_options: object)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: str = "deepgram/aura:2",
        model_endpoint: str | None = None,
        slng_base_url: str = "api.slng.ai",
        region_override: str | list[str] | None = None,
        voice: str = "default",
        language: str = "en",
        sample_rate: int = 24000,
        speed: float = 1.0,
        word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        **model_options: object,
    ) -> None:
        """
        Create a new instance of SLNG TTS (the adapter is modeled on the Deepgram plugin's architecture).

        Args:
            model (str): SLNG model identifier (e.g., "deepgram/aura:2").
            model_endpoint (str): Optional full SLNG WebSocket endpoint.
            slng_base_url (str): SLNG gateway host. Defaults to "api.slng.ai".
            region_override (str | list[str] | None): Optional gateway region override sent as the X-Region-Override header.
            voice (str): Voice to use. Defaults to "default".
            language (str): Language code. Defaults to "en".
            sample_rate (int): Sample rate of the output audio. Defaults to 24000.
            speed (float): Speech speed multiplier. Defaults to 1.0.
            api_key (str): SLNG API key. Falls back to the SLNG_API_KEY environment variable.
            word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to a basic WordTokenizer.
            http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.
            **model_options: Model-specific options forwarded in the synthesis init payload.
        """
        # Resolve api_key from parameter or SLNG_API_KEY env var
        resolved_key = api_key or os.environ.get("SLNG_API_KEY")
        if not resolved_key:
            raise ValueError("api_key is required: pass it explicitly or set the SLNG_API_KEY environment variable")

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        if not is_given(word_tokenizer):
            word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)

        resolved_model_endpoint = model_endpoint or _default_tts_endpoint(
            slng_base_url=slng_base_url,
            model=model,
        )

        voice = normalize_tts_voice(model, voice)

        # _TTSOptions.encoding defaults to "linear16" because LiveKit expects raw PCM and
        # some SLNG models default to MP3 unless explicitly requested.
        self._opts = _TTSOptions(
            model_endpoint=resolved_model_endpoint,
            model=model,
            voice=voice,
            language=language,
            sample_rate=sample_rate,
            speed=speed,
            word_tokenizer=word_tokenizer,
            api_key=resolved_key,
            model_options=dict(model_options),
        )
        self._region_override_header = normalize_region_override(region_override)
        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()

        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    @property
    def model(self) -> str:
        return "slng"

    @property
    def provider(self) -> str:
        return "SLNG"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()

        # Connect to WebSocket
        model_endpoint = self._opts.model_endpoint
        headers = {
            "Authorization": f"Bearer {self._opts.api_key}",
            "X-API-Key": self._opts.api_key,
        }
        if self._region_override_header:
            headers["X-Region-Override"] = self._region_override_header
        ws = await asyncio.wait_for(
            session.ws_connect(
                model_endpoint,
                headers=headers,
            ),
            timeout,
        )

        # SLNG-specific: Send init and wait for ready
        init_payload = build_tts_init_payload(
            model=self._opts.model,
            voice=self._opts.voice,
            language=self._opts.language,
            sample_rate=self._opts.sample_rate,
            encoding=self._opts.encoding,
            speed=self._opts.speed,
            model_options=self._opts.model_options,
        )

        try:
            await ws.send_str(json.dumps(init_payload))
        except Exception:
            await ws.close()
            raise

        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        try:
            # Send final flush (similar to Deepgram's Flush+Close pattern).
            # Arcana-specific cancel/EOS is handled in the streaming send_task when bypassing
            # the connection pool.
            await ws.send_str(SynthesizeStream._FLUSH_MSG)

            # Wait for server acknowledgment
            with contextlib.suppress(asyncio.TimeoutError):
                await asyncio.wait_for(ws.receive(), timeout=5.0)
        except Exception as e:
            logger.warning(f"[SLNG TTS] error during WebSocket close sequence: {e}")
        finally:
            await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            voice (str): Voice to use.
            language (str): Language code.
        """
        invalidate_pool = False
        if is_given(voice):
            voice = normalize_tts_voice(self._opts.model, voice)
            invalidate_pool = invalidate_pool or self._opts.voice != voice
            self._opts.voice = voice
        if is_given(language):
            invalidate_pool = invalidate_pool or self._opts.language != language
            self._opts.language = language

        if invalidate_pool:
            self._pool.invalidate()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        self._pool.prewarm()

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()

Create a new instance of SLNG TTS (the adapter is modeled on the Deepgram plugin's architecture).

Args

model : str
SLNG model identifier (e.g., "deepgram/aura:2").
model_endpoint : str
Optional full SLNG WebSocket endpoint.
slng_base_url : str
SLNG gateway host. Defaults to "api.slng.ai".
region_override : str | list[str] | None
Optional gateway region override sent as the X-Region-Override header.
voice : str
Voice to use. Defaults to "default".
language : str
Language code. Defaults to "en".
sample_rate : int
Sample rate of the output audio. Defaults to 24000.
speed : float
Speech speed multiplier. Defaults to 1.0.
api_key : str
SLNG API key. Falls back to the SLNG_API_KEY environment variable.
word_tokenizer : tokenize.WordTokenizer
Tokenizer for processing text. Defaults to a basic WordTokenizer.
http_session : aiohttp.ClientSession
Optional aiohttp session to use for requests.
**model_options
Model-specific options forwarded in the synthesis init payload.
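
An illustrative construction (a sketch; extra keyword arguments travel through **model_options into the synthesis init payload):

slng_tts = slng.TTS(
    model="deepgram/aura:2",
    voice="default",
    sample_rate=24000,
    speed=1.1,
)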

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return "slng"

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "SLNG"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.tts.SynthesizeStream
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
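
Streaming synthesis follows the standard SynthesizeStream interface (a sketch, continuing with slng_tts from above; playout is a hypothetical consumer of the PCM frames):

stream = slng_tts.stream()
stream.push_text("Hello from SLNG. ")
stream.flush()      # segment boundary: synthesis for the pushed text can start
stream.end_input()

async for audio in stream:
    playout(audio.frame)  # audio.frame: raw PCM rtc.AudioFrame
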
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.slng.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
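
One-shot synthesis yields the same audio chunks without the streaming input side (a sketch; playout as above):

async for audio in slng_tts.synthesize("Hello!"):
    playout(audio.frame)
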
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """
    Args:
        voice (str): Voice to use.
        language (str): Language code.
    """
    invalidate_pool = False
    if is_given(voice):
        voice = normalize_tts_voice(self._opts.model, voice)
        invalidate_pool = invalidate_pool or self._opts.voice != voice
        self._opts.voice = voice
    if is_given(language):
        invalidate_pool = invalidate_pool or self._opts.language != language
        self._opts.language = language

    if invalidate_pool:
        self._pool.invalidate()

Update TTS options. Changing the voice or language invalidates the pooled WebSocket connections so subsequent syntheses use the new settings.

Args

voice : str
Voice to use.
language : str
Language code.
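
For example (a sketch; the voice identifier is a hypothetical placeholder):

slng_tts.update_options(voice="aura-luna-en")
# the connection pool is invalidated, so the next synthesis reconnects
# and sends a fresh init payload carrying the new voice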

Inherited members