Module livekit.plugins.smallestai
Smallest AI plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/smallestai/ for more information.
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """HTTP-based synthesis — used when synthesize() is called directly.

    Posts the input text to ``{base_url}/tts`` and forwards the response body
    to the audio emitter chunk by chunk.
    """

    def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        # Work on a private copy of the options object rather than the TTS's own.
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Perform the HTTP request and push the returned audio into ``output_emitter``.

        Raises:
            APITimeoutError: on ``asyncio.TimeoutError``.
            APIStatusError: re-raised unchanged when raised inside the request.
            APIConnectionError: on any other exception.
            HTTP status >= 400 and ``aiohttp.ClientResponseError`` are converted
            with ``create_api_error_from_http``.
        """
        try:
            payload = _to_smallest_options(self._opts)
            payload["text"] = self._input_text
            request_headers = {
                "Authorization": f"Bearer {self._opts.api_key}",
                "Content-Type": "application/json",
                "X-Source": "livekit",
                "X-LiveKit-Version": __version__,
            }

            http = self._tts._ensure_session()
            async with http.post(
                f"{self._opts.base_url}/tts",
                headers=request_headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=self._conn_options.timeout),
            ) as resp:
                if resp.status >= 400:
                    error_body = await resp.text()
                    raise create_api_error_from_http(error_body, status=resp.status)

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._opts.sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=f"audio/{self._opts.output_format}",
                )

                async for chunk, _ in resp.content.iter_chunks():
                    output_emitter.push(chunk)

                output_emitter.flush()
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise create_api_error_from_http(e.message, status=e.status) from None
        except APIStatusError:
            # Already in its final form (see the status check above); do not re-wrap.
            raise
        except Exception as e:
            raise APIConnectionError() from e
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class STT (*,
model: STTModels | str = 'pulse',
language: str = 'en',
sample_rate: int = 16000,
encoding: STTEncoding | str = 'linear16',
word_timestamps: bool = True,
diarize: bool = False,
eou_timeout_ms: int = 0,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.smallest.ai/waves/v1')-
Expand source code
class STT(stt.STT):
    """Speech-to-text client for Smallest AI Pulse (batch recognition and streaming)."""

    def __init__(
        self,
        *,
        model: STTModels | str = "pulse",
        language: str = "en",
        sample_rate: int = 16000,
        encoding: STTEncoding | str = "linear16",
        word_timestamps: bool = True,
        diarize: bool = False,
        eou_timeout_ms: int = 0,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = SMALLEST_STT_BASE_URL,
    ) -> None:
        """Create a new instance of Smallest AI Pulse STT.

        Args:
            model: STT model to use. Currently only "pulse" is available.
            language: BCP-47 language code (e.g. "en", "hi", "fr"). Use "multi" for
                automatic language detection across 39 supported languages.
            sample_rate: Audio sample rate in Hz. Supported: 8000, 16000, 22050,
                24000, 44100, 48000. Defaults to 16000.
            encoding: PCM encoding of the audio stream. Use "linear16" for raw
                16-bit PCM (the default and most compatible choice for streaming).
            word_timestamps: Include per-word start/end timestamps and confidence
                scores in transcripts. Defaults to True.
            diarize: Enable speaker diarization. When True, each word includes a
                speaker ID (integer during streaming). Defaults to False.
            eou_timeout_ms: Milliseconds of silence before the server considers an
                utterance complete and emits a final transcript. Set to 0 to disable
                server-side end-of-utterance detection, which is recommended when
                using LiveKit's built-in turn detection to minimise latency.
                Defaults to 0.
            api_key: Smallest AI API key. Falls back to the SMALLEST_API_KEY
                environment variable if not provided.
            http_session: An existing aiohttp ClientSession to reuse.
            base_url: Override the default API base URL.

        Raises:
            ValueError: if no API key is passed and SMALLEST_API_KEY is unset or empty.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
                diarization=diarize,
                aligned_transcript="word" if word_timestamps else False,
            )
        )

        api_key = api_key or os.environ.get("SMALLEST_API_KEY")
        if not api_key:
            raise ValueError(
                "Smallest AI API key is required, either as argument or set "
                "SMALLEST_API_KEY environment variable"
            )

        self._opts = _STTOptions(
            model=model,
            api_key=api_key,
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            word_timestamps=word_timestamps,
            diarize=diarize,
            eou_timeout_ms=eou_timeout_ms,
            base_url=base_url,
        )
        self._session = http_session
        # Weak references: streams are tracked for update_options() without being kept alive.
        self._streams: weakref.WeakSet[SpeechStream] = weakref.WeakSet()

    @property
    def model(self) -> str:
        """Model identifier currently configured in the options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider name reported for this STT."""
        return "SmallestAI"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from ``utils.http_context`` on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Transcribe a complete audio buffer with one HTTP POST.

        Raises:
            APITimeoutError: on ``asyncio.TimeoutError``.
            APIStatusError: when the response status check fails.
            APIConnectionError: on any other exception.
        """
        config = self._sanitize_options(language=language)
        query: dict[str, Any] = {
            "language": config.language,
            "encoding": config.encoding,
            "sample_rate": config.sample_rate,
            "word_timestamps": str(config.word_timestamps).lower(),
            "diarize": str(config.diarize).lower(),
        }

        try:
            http = self._ensure_session()
            async with http.post(
                url=f"{config.base_url}/{config.model}/get_text",
                headers={
                    "Authorization": f"Bearer {config.api_key}",
                    "Content-Type": "application/octet-stream",
                    "X-Source": "livekit",
                    "X-LiveKit-Version": __version__,
                },
                params=query,
                # to_wav_bytes() produces a valid WAV file; the server auto-detects format.
                data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
                # NOTE(review): total is fixed at 30s; only sock_connect follows
                # conn_options.timeout — confirm this is intentional.
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as resp:
                resp.raise_for_status()
                body = await resp.json()
                return _batch_transcription_to_speech_event(config.language, body)
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Open a streaming session and register it so option updates reach it."""
        speech_stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=self._sanitize_options(language=language),
            http_session=self._ensure_session(),
        )
        self._streams.add(speech_stream)
        return speech_stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
        eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Update STT options; propagates to all active streams (triggers reconnect)."""
        requested = {
            "model": model,
            "language": language,
            "sample_rate": sample_rate,
            "encoding": encoding,
            "eou_timeout_ms": eou_timeout_ms,
        }
        for field_name, value in requested.items():
            if is_given(value):
                setattr(self._opts, field_name, value)

        for active in self._streams:
            active.update_options(
                model=model,
                language=language,
                sample_rate=sample_rate,
                encoding=encoding,
                eou_timeout_ms=eou_timeout_ms,
            )

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> _STTOptions:
        """Return a copy of the current options with ``language`` overridden when given."""
        config = replace(self._opts)
        if is_given(language):
            config.language = language
        return config
Create a new instance of Smallest AI Pulse STT.
Args
model- STT model to use. Currently only "pulse" is available.
language- BCP-47 language code (e.g. "en", "hi", "fr"). Use "multi" for automatic language detection across 39 supported languages.
sample_rate- Audio sample rate in Hz. Supported: 8000, 16000, 22050, 24000, 44100, 48000. Defaults to 16000.
encoding- PCM encoding of the audio stream. Use "linear16" for raw 16-bit PCM (the default and most compatible choice for streaming).
word_timestamps- Include per-word start/end timestamps and confidence scores in transcripts. Defaults to True.
diarize- Enable speaker diarization. When True, each word includes a speaker ID (integer during streaming). Defaults to False.
eou_timeout_ms- Milliseconds of silence before the server considers an utterance complete and emits a final transcript. Set to 0 to disable server-side end-of-utterance detection, which is recommended when using LiveKit's built-in turn detection to minimise latency. Defaults to 0.
api_key- Smallest AI API key. Falls back to the SMALLEST_API_KEY environment variable if not provided.
http_session- An existing aiohttp ClientSession to reuse.
base_url- Override the default API base URL.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this STT instance."""
    return self._opts.model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this STT instance."""
    return "SmallestAI"
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.smallestai.stt.SpeechStream-
Expand source code
def stream( self, *, language: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SpeechStream: config = self._sanitize_options(language=language) stream = SpeechStream( stt=self, conn_options=conn_options, opts=config, http_session=self._ensure_session(), ) self._streams.add(stream) return stream
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
    eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Update STT options; propagates to all active streams (triggers reconnect)."""
    requested = {
        "model": model,
        "language": language,
        "sample_rate": sample_rate,
        "encoding": encoding,
        "eou_timeout_ms": eou_timeout_ms,
    }
    # Only arguments that were actually passed overwrite the stored options.
    for field_name, value in requested.items():
        if is_given(value):
            setattr(self._opts, field_name, value)

    # Forward the same arguments, given or not, to every tracked stream.
    for active in self._streams:
        active.update_options(
            model=model,
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            eou_timeout_ms=eou_timeout_ms,
        )
Inherited members
class SpeechStream (*,
stt: STT,
opts: _STTOptions,
conn_options: APIConnectOptions,
http_session: aiohttp.ClientSession)-
Expand source code
class SpeechStream(stt.SpeechStream):
    """Streaming recognition session over a Smallest AI Pulse WebSocket."""

    # Signals end of stream: server flushes remaining audio, emits final transcripts,
    # and responds with is_last=True before closing the session.
    # Use {"type": "finalize"} mid-session to force is_final without closing.
    _CLOSE_STREAM_MSG: str = json.dumps({"type": "close_stream"})

    def __init__(
        self,
        *,
        stt: STT,
        opts: _STTOptions,
        conn_options: APIConnectOptions,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._session = http_session
        # True between an inferred START_OF_SPEECH and the matching END_OF_SPEECH.
        self._speaking = False
        # Last session_id seen in a server message; used as request_id on emitted events.
        self._session_id = ""
        # Set by update_options() so _run() reconnects with the new options.
        self._reconnect_event = asyncio.Event()
        self._audio_duration_collector = _PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
        eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Apply the given options to this stream and request a reconnect."""
        requested = {
            "model": model,
            "language": language,
            "sample_rate": sample_rate,
            "encoding": encoding,
            "eou_timeout_ms": eou_timeout_ms,
        }
        for field_name, value in requested.items():
            if is_given(value):
                setattr(self._opts, field_name, value)

        self._reconnect_event.set()

    async def _run(self) -> None:
        """Run the send/receive tasks, reconnecting whenever the reconnect event fires."""
        closing_ws = False

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            # Send audio in 50ms chunks; matches the 50–100ms guidance from Smallest AI docs.
            chunk_samples = self._opts.sample_rate // 20
            chunker = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=NUM_CHANNELS,
                samples_per_channel=chunk_samples,
            )

            async def forward(frames: Any) -> None:
                # Account for each frame's duration, then ship its raw bytes.
                for frame in frames:
                    self._audio_duration_collector.push(frame.duration)
                    await ws.send_bytes(frame.data.tobytes())

            async for item in self._input_ch:
                if isinstance(item, rtc.AudioFrame):
                    await forward(chunker.write(item.data.tobytes()))
                elif isinstance(item, self._FlushSentinel):
                    # User paused: drain the accumulator so the server gets all buffered
                    # audio. The server's eou_timeout_ms will then detect the silence and
                    # emit a final transcript — no explicit flush message is needed.
                    await forward(chunker.flush())
                    self._audio_duration_collector.flush()

            # Input channel closed: close the stream so the server flushes remaining
            # audio, emits final transcripts, and sends is_last=True.
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_STREAM_MSG)

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            close_types = (
                aiohttp.WSMsgType.CLOSED,
                aiohttp.WSMsgType.CLOSE,
                aiohttp.WSMsgType.CLOSING,
            )
            while True:
                msg = await ws.receive()

                if msg.type in close_types:
                    # A close is expected once we asked for it or the HTTP session is gone.
                    if closing_ws or self._session.closed:
                        return
                    raise APIStatusError(
                        message="Smallest AI STT connection closed unexpectedly",
                        status_code=ws.close_code or -1,
                        body=f"{msg.data=} {msg.extra=}",
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected Smallest AI STT message type: %s", msg.type)
                    continue

                try:
                    payload = json.loads(msg.data)
                except json.JSONDecodeError:
                    logger.warning("failed to parse Smallest AI STT message: %s", msg.data)
                    continue

                self._process_stream_event(payload)

                # Server confirms the session is fully flushed; recv loop can exit.
                if payload.get("is_last"):
                    return

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        (tasks_group, wait_reconnect_task),
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    # Surface any exception raised by the send/recv pair.
                    for finished in done:
                        if finished != wait_reconnect_task:
                            finished.result()

                    # The tasks ended without a reconnect request: the stream is done.
                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    tasks_group.cancel()
                    tasks_group.exception()
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open the streaming WebSocket using the current options.

        Raises:
            APIConnectionError: on ``aiohttp.ClientConnectorError`` or a connect timeout.
        """
        query: dict[str, Any] = {
            "language": self._opts.language,
            "encoding": self._opts.encoding,
            "sample_rate": self._opts.sample_rate,
            "word_timestamps": str(self._opts.word_timestamps).lower(),
            "diarize": str(self._opts.diarize).lower(),
        }
        # Only send eou_timeout_ms when explicitly set (non-zero).
        # When 0, omit the parameter and let the server use its default,
        # which avoids adding server-side silence latency on top of LiveKit's
        # own end-of-turn detection.
        # NOTE(review): the STT constructor docstring says 0 *disables* server-side
        # end-of-utterance detection, while this says the server default applies —
        # confirm which one matches the API.
        if self._opts.eou_timeout_ms > 0:
            query["eou_timeout_ms"] = self._opts.eou_timeout_ms

        # Map the HTTP(S) base URL onto the matching WebSocket scheme.
        ws_base = self._opts.base_url.replace("https://", "wss://", 1).replace(
            "http://", "ws://", 1
        )
        ws_url = ws_base + f"/{self._opts.model}/get_text" + f"?{urlencode(query)}"

        started = time.perf_counter()
        try:
            # heartbeat sends standard WebSocket ping frames every 5s, which is sufficient
            # to keep the Smallest AI connection alive without a custom JSON message.
            ws = await asyncio.wait_for(
                self._session.ws_connect(
                    ws_url,
                    headers={
                        "Authorization": f"Bearer {self._opts.api_key}",
                        "X-Source": "livekit",
                        "X-LiveKit-Version": __version__,
                    },
                    heartbeat=5.0,
                ),
                self._conn_options.timeout,
            )
            self._report_connection_acquired(time.perf_counter() - started, False)
            logger.debug("established Smallest AI STT WebSocket connection")
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("failed to connect to Smallest AI STT") from e
        return ws

    def _on_audio_duration_report(self, duration: float) -> None:
        """Emit a RECOGNITION_USAGE event carrying the reported audio duration."""
        usage_event = stt.SpeechEvent(
            type=stt.SpeechEventType.RECOGNITION_USAGE,
            request_id=self._session_id,
            alternatives=[],
            recognition_usage=stt.RecognitionUsage(audio_duration=duration),
        )
        self._event_ch.send_nowait(usage_event)

    def _process_stream_event(self, data: dict[str, Any]) -> None:
        """Translate one decoded server message into speech events on the event channel."""
        # Streaming WebSocket response schema (Smallest AI Pulse API):
        # {
        #   "session_id": str,
        #   "transcript": str,   # partial or final text for this utterance
        #   "is_final": bool,    # True when the utterance is complete
        #   "is_last": bool,     # True when the session itself is done (after close_stream)
        #   "language": str,     # present when is_final=True (detected or echoed)
        #   "words": [           # present when word_timestamps=True
        #     {"word": str, "start": float, "end": float,
        #      "confidence": float, "speaker": int}  # speaker only when diarize=True
        #   ]
        # }
        session_id = data.get("session_id", "")
        if session_id:
            self._session_id = session_id

        transcript = data.get("transcript", "")
        is_final = data.get("is_final", False)

        # NOTE(review): a final message with an empty transcript returns here, so no
        # END_OF_SPEECH is emitted for it even if speech had started — confirm intended.
        if not transcript:
            return

        # Infer START_OF_SPEECH — the Pulse API does not emit a dedicated speech-start event.
        if not self._speaking:
            self._speaking = True
            self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH))

        alts = _transcript_to_speech_data(
            language=self._opts.language,
            data=data,
            start_time_offset=self.start_time_offset,
            diarize=self._opts.diarize,
        )

        if is_final:
            transcript_type = stt.SpeechEventType.FINAL_TRANSCRIPT
        else:
            transcript_type = stt.SpeechEventType.INTERIM_TRANSCRIPT
        self._event_ch.send_nowait(
            stt.SpeechEvent(
                type=transcript_type,
                request_id=self._session_id,
                alternatives=alts,
            )
        )

        # A final transcript closes the utterance opened above.
        if is_final and self._speaking:
            self._speaking = False
            self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    encoding: NotGivenOr[STTEncoding | str] = NOT_GIVEN,
    eou_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Apply the given options to this stream and set the reconnect event."""
    requested = {
        "model": model,
        "language": language,
        "sample_rate": sample_rate,
        "encoding": encoding,
        "eou_timeout_ms": eou_timeout_ms,
    }
    # Arguments left at NOT_GIVEN keep their current value.
    for field_name, value in requested.items():
        if is_given(value):
            setattr(self._opts, field_name, value)

    self._reconnect_event.set()
class TTS (*,
api_key: str | None = None,
model: TTSModels | str = 'lightning_v3.1_pro',
voice_id: str | None = None,
sample_rate: int = 24000,
speed: float = 1.0,
language: str = 'en',
output_format: TTSEncoding | str = 'pcm',
base_url: str = 'https://api.smallest.ai/waves/v1',
ws_url: str = 'wss://api.smallest.ai/waves/v1/tts/live',
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS):
    """Text-to-speech client for Smallest AI Lightning (HTTP synthesis and streaming)."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: TTSModels | str = "lightning_v3.1_pro",
        voice_id: str | None = None,
        sample_rate: int = 24000,
        speed: float = 1.0,
        language: str = "en",
        output_format: TTSEncoding | str = "pcm",
        base_url: str = SMALLEST_BASE_URL,
        ws_url: str = SMALLEST_WS_URL,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Smallest AI Lightning TTS.

        Args:
            api_key: Your Smallest AI API key.
            model: The TTS model to use. Use "lightning_v3.1" for the standard model
                with 217 voices across 12 languages, or "lightning_v3.1_pro" (default)
                for the premium pool with curated American, British, and Indian
                voices at 44.1 kHz.
            voice_id: The voice ID to use for synthesis. Defaults to "meher" for
                "lightning_v3.1_pro" and "sophia" for all other models. Pro voices
                must be paired with "lightning_v3.1_pro"; standard voices with
                "lightning_v3.1".
            sample_rate: Sample rate for the audio output. Both models are natively
                44.1 kHz; supported rates are 8000, 16000, 24000, and 44100.
            speed: Speed of the speech synthesis (0.5–2.0).
            language: Language of the text to be synthesized. Use "auto" for
                automatic detection and code-switching. Pro supports "en", "hi",
                and "auto" only.
            output_format: Output format for HTTP synthesize() calls ("pcm", "mp3",
                "wav", "ulaw", "alaw"). WebSocket streaming always returns PCM.
            base_url: Base URL for the Smallest AI HTTP API.
            ws_url: WebSocket URL for low-latency streaming synthesis.
            http_session: An existing aiohttp ClientSession to use.

        Raises:
            ValueError: if no API key is passed and SMALLEST_API_KEY is unset or empty.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("SMALLEST_API_KEY")
        if not api_key:
            raise ValueError(
                "Smallest.ai API key is required, either as argument or set"
                " SMALLEST_API_KEY environment variable"
            )

        # Choose the default voice that belongs to the selected model.
        if voice_id is None:
            if model == "lightning_v3.1_pro":
                voice_id = "meher"
            else:
                voice_id = "sophia"

        self._opts = _TTSOptions(
            model=model,
            api_key=api_key,
            voice_id=voice_id,
            sample_rate=sample_rate,
            speed=speed,
            language=LanguageCode(language),
            output_format=output_format,
            base_url=base_url,
            ws_url=ws_url,
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,
            mark_refreshed_on_get=False,
        )

    @property
    def model(self) -> str:
        """Model identifier currently configured in the options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider name reported for this TTS."""
        return "SmallestAI"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from ``utils.http_context`` on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Connection-pool callback: open the streaming WebSocket within ``timeout`` seconds."""
        auth_headers = {
            "Authorization": f"Bearer {self._opts.api_key}",
            "X-Source": "livekit",
            "X-LiveKit-Version": __version__,
        }
        connecting = self._ensure_session().ws_connect(self._opts.ws_url, headers=auth_headers)
        return await asyncio.wait_for(connecting, timeout)

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Connection-pool callback: close a pooled WebSocket."""
        await ws.close()

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        speed: NotGivenOr[float] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN,
    ) -> None:
        """Update TTS options."""
        requested = {
            "model": model,
            "voice_id": voice_id,
            "speed": speed,
            "sample_rate": sample_rate,
            "language": language,
            "output_format": output_format,
        }
        for field_name, value in requested.items():
            if not is_given(value):
                continue
            # The language is stored wrapped in LanguageCode, as in the constructor.
            if field_name == "language":
                value = LanguageCode(value)
            setattr(self._opts, field_name, value)

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Create an HTTP-based synthesis stream for ``text``."""
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SynthesizeStream:
        """Create a streaming synthesis session."""
        return SynthesizeStream(tts=self, conn_options=conn_options)

    def prewarm(self) -> None:
        """Ask the connection pool to pre-warm."""
        self._pool.prewarm()

    async def aclose(self) -> None:
        """Close the connection pool."""
        await self._pool.aclose()
Create a new instance of Smallest AI Lightning TTS.
Args
api_key- Your Smallest AI API key.
model- The TTS model to use. Use "lightning_v3.1" for the standard model with 217 voices across 12 languages, or "lightning_v3.1_pro" (default) for the premium pool with curated American, British, and Indian voices at 44.1 kHz.
voice_id- The voice ID to use for synthesis. Defaults to "meher" for "lightning_v3.1_pro" and "sophia" for all other models. Pro voices must be paired with "lightning_v3.1_pro"; standard voices with "lightning_v3.1".
sample_rate- Sample rate for the audio output. Both models are natively 44.1 kHz; supported rates are 8000, 16000, 24000, and 44100.
speed- Speed of the speech synthesis (0.5–2.0).
language- Language of the text to be synthesized. Use "auto" for automatic detection and code-switching. Pro supports "en", "hi", and "auto" only.
output_format- Output format for HTTP synthesize() calls ("pcm", "mp3", "wav", "ulaw", "alaw"). WebSocket streaming always returns PCM.
base_url- Base URL for the Smallest AI HTTP API.
ws_url- WebSocket URL for low-latency streaming synthesis.
http_session- An existing aiohttp ClientSession to use.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance."""
    return self._opts.model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance."""
    return "SmallestAI"
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: await self._pool.aclose()
def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service."""
    self._pool.prewarm()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.smallestai.tts.SynthesizeStream-
Expand source code
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SynthesizeStream: return SynthesizeStream(tts=self, conn_options=conn_options)
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.smallestai.tts.ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> ChunkedStream: return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
voice_id: NotGivenOr[str] = NOT_GIVEN,
speed: NotGivenOr[float] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    speed: NotGivenOr[float] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    output_format: NotGivenOr[TTSEncoding | str] = NOT_GIVEN,
) -> None:
    """Update TTS options."""
    requested = {
        "model": model,
        "voice_id": voice_id,
        "speed": speed,
        "sample_rate": sample_rate,
        "language": language,
        "output_format": output_format,
    }
    for field_name, value in requested.items():
        # Arguments left at NOT_GIVEN keep their current value.
        if not is_given(value):
            continue
        # The language is stored wrapped in LanguageCode rather than as a plain string.
        if field_name == "language":
            value = LanguageCode(value)
        setattr(self._opts, field_name, value)
Inherited members