Module livekit.plugins.smallestai

Smallest AI plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/smallestai/ for more information.
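A minimal usage sketch, assuming SMALLEST_API_KEY is set in the environment (the voice and language values below are illustrative):

from livekit.plugins import smallestai

# Both classes fall back to the SMALLEST_API_KEY environment variable
# when api_key is not passed explicitly.
stt = smallestai.STT(language="en", word_timestamps=True)
tts = smallestai.TTS(voice_id="sophia", sample_rate=24000)

These drop into a LiveKit AgentSession like any other STT/TTS plugin.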

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
class ChunkedStream(tts.ChunkedStream):
    """Synthesize chunked text using the Smallest AI TTS endpoint."""

    def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Run the chunked synthesis process."""
        try:
            data = _to_smallest_options(self._opts)
            data["text"] = self._input_text

            url = f"{self._opts.base_url}/{self._opts.model}/get_speech"

            headers = {
                "Authorization": f"Bearer {self._opts.api_key}",
                "Content-Type": "application/json",
                "X-Source": "livekit",
                "X-LiveKit-Version": __version__,
            }
            async with self._tts._ensure_session().post(
                url,
                headers=headers,
                json=data,
                timeout=aiohttp.ClientTimeout(total=self._conn_options.timeout),
            ) as resp:
                if resp.status >= 400:
                    body = await resp.text()
                    raise create_api_error_from_http(body, status=resp.status)

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._opts.sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=f"audio/{self._opts.output_format}",
                )

                async for chunk, _ in resp.content.iter_chunks():
                    output_emitter.push(chunk)

                output_emitter.flush()

        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise create_api_error_from_http(e.message, status=e.status) from None
        except APIStatusError:
            raise
        except Exception as e:
            raise APIConnectionError() from e

Synthesize chunked text using the Smallest AI TTS endpoint.

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
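ChunkedStream instances are obtained from TTS.synthesize() rather than constructed directly. A consumption sketch, assuming the base ChunkedStream is an async iterator of SynthesizedAudio events (as in other LiveKit TTS plugins) and that you supply your own aiohttp session when running outside a LiveKit job context:

import asyncio

import aiohttp

from livekit.plugins import smallestai

async def main() -> None:
    # Inside a LiveKit job the plugin acquires a session from the
    # http context; standalone, pass one explicitly.
    async with aiohttp.ClientSession() as session:
        tts = smallestai.TTS(http_session=session)
        stream = tts.synthesize("Hello from Smallest AI!")
        async for event in stream:  # SynthesizedAudio events
            print(f"received {event.frame.samples_per_channel} samples")

asyncio.run(main())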
class STT (*,
model: STTModels | str = 'pulse',
language: str = 'en',
sample_rate: int = 16000,
encoding: STTEncoding | str = 'linear16',
word_timestamps: bool = True,
diarize: bool = False,
eou_timeout_ms: int = 0,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.smallest.ai/waves/v1')
class STT(stt.STT):
    def __init__(
        self,
        *,
        model: STTModels | str = "pulse",
        language: str = "en",
        sample_rate: int = 16000,
        encoding: STTEncoding | str = "linear16",
        word_timestamps: bool = True,
        diarize: bool = False,
        eou_timeout_ms: int = 0,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = SMALLEST_STT_BASE_URL,
    ) -> None:
        """Create a new instance of Smallest AI Pulse STT.

        Args:
            model: STT model to use. Currently only "pulse" is available.
            language: BCP-47 language code (e.g. "en", "hi", "fr"). Use "multi"
                for automatic language detection across 39 supported languages.
            sample_rate: Audio sample rate in Hz. Supported: 8000, 16000, 22050,
                24000, 44100, 48000. Defaults to 16000.
            encoding: PCM encoding of the audio stream. Use "linear16" for raw
                16-bit PCM (the default and most compatible choice for streaming).
            word_timestamps: Include per-word start/end timestamps and confidence
                scores in transcripts. Defaults to True.
            diarize: Enable speaker diarization. When True, each word includes a
                speaker ID (integer during streaming). Defaults to False.
            eou_timeout_ms: Milliseconds of silence before the server considers an
                utterance complete and emits a final transcript. Set to 0 to disable
                server-side end-of-utterance detection, which is recommended when using
                LiveKit's built-in turn detection to minimise latency. Defaults to 0.
            api_key: Smallest AI API key. Falls back to the SMALLEST_API_KEY
                environment variable if not provided.
            http_session: An existing aiohttp ClientSession to reuse.
            base_url: Override the default API base URL.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
                diarization=diarize,
                aligned_transcript="word" if word_timestamps else False,
            )
        )

        api_key = api_key or os.environ.get("SMALLEST_API_KEY")
        if not api_key:
            raise ValueError(
                "Smallest AI API key is required, either as argument or set "
                "SMALLEST_API_KEY environment variable"
            )

        self._opts = _STTOptions(
            model=model,
            api_key=api_key,
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            word_timestamps=word_timestamps,
            diarize=diarize,
            eou_timeout_ms=eou_timeout_ms,
            base_url=base_url,
        )
        self._session = http_session
        self._streams: weakref.WeakSet[SpeechStream] = weakref.WeakSet()

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "SmallestAI"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        params: dict[str, Any] = {
            "language": config.language,
            "encoding": config.encoding,
            "sample_rate": config.sample_rate,
            "word_timestamps": str(config.word_timestamps).lower(),
            "diarize": str(config.diarize).lower(),
        }

        try:
            async with self._ensure_session().post(
                url=f"{config.base_url}/{config.model}/get_text",
                headers={
                    "Authorization": f"Bearer {config.api_key}",
                    "Content-Type": "application/octet-stream",
                    "X-Source": "livekit",
                    "X-LiveKit-Version": __version__,
                },
                params=params,
                # to_wav_bytes() produces a valid WAV file; the server auto-detects format.
                data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as resp:
                resp.raise_for_status()
                data = await resp.json()
                return _batch_transcription_to_speech_event(config.language, data)

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            http_session=self._ensure_session(),
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
        eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Update STT options; propagates to all active streams (triggers reconnect)."""
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate
        if is_given(encoding):
            self._opts.encoding = encoding
        if is_given(eou_timeout_ms):
            self._opts.eou_timeout_ms = eou_timeout_ms

        for stream in self._streams:
            stream.update_options(
                model=model,
                language=language,
                sample_rate=sample_rate,
                encoding=encoding,
                eou_timeout_ms=eou_timeout_ms,
            )

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> _STTOptions:
        config = replace(self._opts)
        if is_given(language):
            config.language = language
        return config


Create a new instance of Smallest AI Pulse STT.

Args

model
STT model to use. Currently only "pulse" is available.
language
BCP-47 language code (e.g. "en", "hi", "fr"). Use "multi" for automatic language detection across 39 supported languages.
sample_rate
Audio sample rate in Hz. Supported: 8000, 16000, 22050, 24000, 44100, 48000. Defaults to 16000.
encoding
PCM encoding of the audio stream. Use "linear16" for raw 16-bit PCM (the default and most compatible choice for streaming).
word_timestamps
Include per-word start/end timestamps and confidence scores in transcripts. Defaults to True.
diarize
Enable speaker diarization. When True, each word includes a speaker ID (integer during streaming). Defaults to False.
eou_timeout_ms
Milliseconds of silence before the server considers an utterance complete and emits a final transcript. Set to 0 to disable server-side end-of-utterance detection, which is recommended when using LiveKit's built-in turn detection to minimise latency. Defaults to 0.
api_key
Smallest AI API key. Falls back to the SMALLEST_API_KEY environment variable if not provided.
http_session
An existing aiohttp ClientSession to reuse.
base_url
Override the default API base URL.
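
For example, a configuration sketch (parameter values are illustrative):

from livekit.plugins import smallestai

stt = smallestai.STT(
    language="multi",      # auto-detect the spoken language
    sample_rate=16000,
    word_timestamps=True,  # per-word timings and confidence scores
    diarize=True,          # attach a speaker ID to each word
    eou_timeout_ms=0,      # defer end-of-turn to LiveKit's turn detection
)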

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "SmallestAI"

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.smallestai.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = self._sanitize_options(language=language)
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        http_session=self._ensure_session(),
    )
    self._streams.add(stream)
    return stream
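
A sketch of driving a stream manually; frames is assumed to come from your own capture pipeline, and within an AgentSession this plumbing is handled for you:

import asyncio

from livekit import rtc
from livekit.agents import stt as agents_stt
from livekit.plugins import smallestai

async def transcribe(frames: list[rtc.AudioFrame]) -> None:
    stt = smallestai.STT()  # assumes a job context or an explicit http_session
    stream = stt.stream()

    async def feed() -> None:
        for frame in frames:
            stream.push_frame(frame)
        stream.end_input()  # triggers the close_stream handshake

    feed_task = asyncio.create_task(feed())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await feed_task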
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN) -> None
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
    eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Update STT options; propagates to all active streams (triggers reconnect)."""
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = language
    if is_given(sample_rate):
        self._opts.sample_rate = sample_rate
    if is_given(encoding):
        self._opts.encoding = encoding
    if is_given(eou_timeout_ms):
        self._opts.eou_timeout_ms = eou_timeout_ms

    for stream in self._streams:
        stream.update_options(
            model=model,
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            eou_timeout_ms=eou_timeout_ms,
        )

Update STT options; propagates to all active streams (triggers reconnect).
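
For instance, switching recognition language mid-session (a sketch; values illustrative):

# Every active SpeechStream reconnects its WebSocket with the new parameters.
stt.update_options(language="hi", eou_timeout_ms=500)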

Inherited members

class SpeechStream (*,
stt: STT,
opts: _STTOptions,
conn_options: APIConnectOptions,
http_session: aiohttp.ClientSession)
class SpeechStream(stt.SpeechStream):
    # Signals end of stream: server flushes remaining audio, emits final transcripts,
    # and responds with is_last=True before closing the session.
    # Use {"type": "finalize"} mid-session to force is_final without closing.
    _CLOSE_STREAM_MSG: str = json.dumps({"type": "close_stream"})

    def __init__(
        self,
        *,
        stt: STT,
        opts: _STTOptions,
        conn_options: APIConnectOptions,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._session = http_session
        self._speaking = False
        self._session_id = ""
        self._reconnect_event = asyncio.Event()
        self._audio_duration_collector = _PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
        eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate
        if is_given(encoding):
            self._opts.encoding = encoding
        if is_given(eou_timeout_ms):
            self._opts.eou_timeout_ms = eou_timeout_ms
        self._reconnect_event.set()

    async def _run(self) -> None:
        closing_ws = False

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            # Send audio in 50ms chunks; matches the 50–100ms guidance from Smallest AI docs.
            samples_per_chunk = self._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=NUM_CHANNELS,
                samples_per_channel=samples_per_chunk,
            )

            async for data in self._input_ch:
                if isinstance(data, rtc.AudioFrame):
                    for frame in audio_bstream.write(data.data.tobytes()):
                        self._audio_duration_collector.push(frame.duration)
                        await ws.send_bytes(frame.data.tobytes())
                elif isinstance(data, self._FlushSentinel):
                    # User paused: drain the accumulator so the server gets all buffered
                    # audio. The server's eou_timeout_ms will then detect the silence and
                    # emit a final transcript — no explicit flush message is needed.
                    for frame in audio_bstream.flush():
                        self._audio_duration_collector.push(frame.duration)
                        await ws.send_bytes(frame.data.tobytes())
                    self._audio_duration_collector.flush()

            # Input channel closed: close the stream so the server flushes remaining
            # audio, emits final transcripts, and sends is_last=True.
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_STREAM_MSG)

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws or self._session.closed:
                        return
                    raise APIStatusError(
                        message="Smallest AI STT connection closed unexpectedly",
                        status_code=ws.close_code or -1,
                        body=f"{msg.data=} {msg.extra=}",
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected Smallest AI STT message type: %s", msg.type)
                    continue

                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    logger.warning("failed to parse Smallest AI STT message: %s", msg.data)
                    continue

                self._process_stream_event(data)

                # Server confirms the session is fully flushed; recv loop can exit.
                if data.get("is_last"):
                    return

        ws: aiohttp.ClientWebSocketResponse | None = None
        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        (tasks_group, wait_reconnect_task),
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    tasks_group.cancel()
                    tasks_group.exception()
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        params: dict[str, Any] = {
            "language": self._opts.language,
            "encoding": self._opts.encoding,
            "sample_rate": self._opts.sample_rate,
            "word_timestamps": str(self._opts.word_timestamps).lower(),
            "diarize": str(self._opts.diarize).lower(),
        }
        # Only send eou_timeout_ms when explicitly set (non-zero).
        # When 0, omit the parameter and let the server use its default,
        # which avoids adding server-side silence latency on top of LiveKit's
        # own end-of-turn detection.
        if self._opts.eou_timeout_ms > 0:
            params["eou_timeout_ms"] = self._opts.eou_timeout_ms
        ws_url = (
            self._opts.base_url.replace("https://", "wss://", 1).replace("http://", "ws://", 1)
            + f"/{self._opts.model}/get_text"
            + f"?{urlencode(params)}"
        )

        t0 = time.perf_counter()
        try:
            # heartbeat sends standard WebSocket ping frames every 5s, which is sufficient
            # to keep the Smallest AI connection alive without a custom JSON message.
            ws = await asyncio.wait_for(
                self._session.ws_connect(
                    ws_url,
                    headers={
                        "Authorization": f"Bearer {self._opts.api_key}",
                        "X-Source": "livekit",
                        "X-LiveKit-Version": __version__,
                    },
                    heartbeat=5.0,
                ),
                self._conn_options.timeout,
            )
            self._report_connection_acquired(time.perf_counter() - t0, False)
            logger.debug("established Smallest AI STT WebSocket connection")
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("failed to connect to Smallest AI STT") from e
        return ws

    def _on_audio_duration_report(self, duration: float) -> None:
        self._event_ch.send_nowait(
            stt.SpeechEvent(
                type=stt.SpeechEventType.RECOGNITION_USAGE,
                request_id=self._session_id,
                alternatives=[],
                recognition_usage=stt.RecognitionUsage(audio_duration=duration),
            )
        )

    def _process_stream_event(self, data: dict[str, Any]) -> None:
        # Streaming WebSocket response schema (Smallest AI Pulse API):
        # {
        #   "session_id":   str,
        #   "transcript":   str,        # partial or final text for this utterance
        #   "is_final":     bool,       # True when the utterance is complete
        #   "is_last":      bool,       # True when the session itself is done (after close_stream)
        #   "language":     str,        # present when is_final=True (detected or echoed)
        #   "words":        [           # present when word_timestamps=True
        #     {"word": str, "start": float, "end": float,
        #      "confidence": float, "speaker": int}  # speaker only when diarize=True
        #   ]
        # }
        session_id = data.get("session_id", "")
        if session_id:
            self._session_id = session_id

        transcript = data.get("transcript", "")
        is_final = data.get("is_final", False)

        if not transcript:
            return

        # Infer START_OF_SPEECH — the Pulse API does not emit a dedicated speech-start event.
        if not self._speaking:
            self._speaking = True
            self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH))

        alts = _transcript_to_speech_data(
            language=self._opts.language,
            data=data,
            start_time_offset=self.start_time_offset,
            diarize=self._opts.diarize,
        )

        if is_final:
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    request_id=self._session_id,
                    alternatives=alts,
                )
            )
            if self._speaking:
                self._speaking = False
                self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))
        else:
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    request_id=self._session_id,
                    alternatives=alts,
                )
            )


Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN) -> None
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
    eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = language
    if is_given(sample_rate):
        self._opts.sample_rate = sample_rate
    if is_given(encoding):
        self._opts.encoding = encoding
    if is_given(eou_timeout_ms):
        self._opts.eou_timeout_ms = eou_timeout_ms
    self._reconnect_event.set()
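
Calling this on a live stream sets the internal reconnect event: the send/recv tasks are cancelled and a new WebSocket is opened with the updated query parameters. A sketch (assuming stream came from STT.stream()):

# Takes effect on the next reconnect cycle, which this call triggers.
stream.update_options(language="fr")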
class TTS (*,
api_key: str | None = None,
model: TTSModels | str = 'lightning-v3.1',
voice_id: str = 'sophia',
sample_rate: int = 24000,
speed: float = 1.0,
consistency: float = 0.5,
similarity: float = 0,
enhancement: float = 1,
language: str = 'en',
output_format: TTSEncoding | str = 'pcm',
base_url: str = 'https://api.smallest.ai/waves/v1',
http_session: aiohttp.ClientSession | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: TTSModels | str = "lightning-v3.1",
        voice_id: str = "sophia",
        sample_rate: int = 24000,
        speed: float = 1.0,
        consistency: float = 0.5,
        similarity: float = 0,
        enhancement: float = 1,
        language: str = "en",
        output_format: TTSEncoding | str = "pcm",
        base_url: str = SMALLEST_BASE_URL,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Smallest AI Lightning TTS.

        Args:
            api_key: Your Smallest AI API key.
            model: The TTS model to use. Use "lightning-v3.1" (default) for the latest
                model with 80+ voices and ~100ms latency, or "lightning-v2" for the
                previous generation.
            voice_id: The voice ID to use for synthesis.
            sample_rate: Sample rate for the audio output.
            speed: Speed of the speech synthesis.
            consistency: Consistency of the speech synthesis.
            similarity: Similarity of the speech synthesis.
            enhancement: Enhancement level for the speech synthesis.
            language: Language of the text to be synthesized.
            output_format: Output format of the audio.
            base_url: Base URL for the Smallest AI API.
            http_session: An existing aiohttp ClientSession to use.
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("SMALLEST_API_KEY")
        if not api_key:
            raise ValueError(
                "Smallest.ai API key is required, either as argument or set"
                " SMALLEST_API_KEY environment variable"
            )

        self._opts = _TTSOptions(
            model=model,
            api_key=api_key,
            voice_id=voice_id,
            sample_rate=sample_rate,
            speed=speed,
            consistency=consistency,
            similarity=similarity,
            enhancement=enhancement,
            language=LanguageCode(language),
            output_format=output_format,
            base_url=base_url,
        )
        self._session = http_session

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "SmallestAI"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        speed: NotGivenOr[float] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        consistency: NotGivenOr[float] = NOT_GIVEN,
        similarity: NotGivenOr[float] = NOT_GIVEN,
        enhancement: NotGivenOr[float] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN,
    ) -> None:
        """Update TTS options."""
        if is_given(model):
            self._opts.model = model
        if is_given(voice_id):
            self._opts.voice_id = voice_id
        if is_given(speed):
            self._opts.speed = speed
        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate
        if is_given(consistency):
            self._opts.consistency = consistency
        if is_given(similarity):
            self._opts.similarity = similarity
        if is_given(enhancement):
            self._opts.enhancement = enhancement
        if is_given(language):
            self._opts.language = LanguageCode(language)
        if is_given(output_format):
            self._opts.output_format = output_format

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
        )


Create a new instance of Smallest AI Lightning TTS.

Args

api_key
Your Smallest AI API key.
model
The TTS model to use. Use "lightning-v3.1" (default) for the latest model with 80+ voices and ~100ms latency, or "lightning-v2" for the previous generation.
voice_id
The voice ID to use for synthesis.
sample_rate
Sample rate for the audio output.
speed
Speed of the speech synthesis.
consistency
Consistency of the speech synthesis.
similarity
Similarity of the speech synthesis.
enhancement
Enhancement level for the speech synthesis.
language
Language of the text to be synthesized.
output_format
Output format of the audio.
base_url
Base URL for the Smallest AI API.
http_session
An existing aiohttp ClientSession to use.
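
For example (a sketch; values illustrative):

from livekit.plugins import smallestai

tts = smallestai.TTS(
    model="lightning-v3.1",
    voice_id="sophia",
    sample_rate=24000,
    speed=1.1,       # slightly faster than neutral
    language="en",
)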

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "SmallestAI"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.smallestai.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
    )
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
voice_id: NotGivenOr[str] = NOT_GIVEN,
speed: NotGivenOr[float] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
consistency: NotGivenOr[float] = NOT_GIVEN,
similarity: NotGivenOr[float] = NOT_GIVEN,
enhancement: NotGivenOr[float] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN) -> None
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    speed: NotGivenOr[float] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    consistency: NotGivenOr[float] = NOT_GIVEN,
    similarity: NotGivenOr[float] = NOT_GIVEN,
    enhancement: NotGivenOr[float] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN,
) -> None:
    """Update TTS options."""
    if is_given(model):
        self._opts.model = model
    if is_given(voice_id):
        self._opts.voice_id = voice_id
    if is_given(speed):
        self._opts.speed = speed
    if is_given(sample_rate):
        self._opts.sample_rate = sample_rate
    if is_given(consistency):
        self._opts.consistency = consistency
    if is_given(similarity):
        self._opts.similarity = similarity
    if is_given(enhancement):
        self._opts.enhancement = enhancement
    if is_given(language):
        self._opts.language = LanguageCode(language)
    if is_given(output_format):
        self._opts.output_format = output_format

Update TTS options.
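
Note that a ChunkedStream snapshots the options at creation time (via replace(tts._opts) in its constructor), so updates apply only to subsequent synthesize() calls:

tts.update_options(voice_id="sophia", speed=0.9)
stream = tts.synthesize("This uses the updated voice settings.")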

Inherited members