Module livekit.plugins.soniox

Soniox plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/stt/soniox/ for more information.

Classes

class ContextGeneralItem (key: str, value: str)
Expand source code
@dataclass
class ContextGeneralItem:
    """A single key/value entry for the ``general`` list of a ``ContextObject``."""

    key: str
    value: str

ContextGeneralItem(key: 'str', value: 'str')

Instance variables

var key : str
var value : str
class ContextObject (general: list[ContextGeneralItem] | None = None,
text: str | None = None,
terms: list[str] | None = None,
translation_terms: list[ContextTranslationTerm] | None = None)
Expand source code
@dataclass
class ContextObject:
    """Structured context for models with context_version 2.

    Applies to Soniox stt-rt-v3-preview and higher. Every field is optional and
    defaults to ``None``.

    Learn more about context in the documentation:
    https://soniox.com/docs/stt/concepts/context
    """

    # Key/value context entries (see ContextGeneralItem).
    general: list[ContextGeneralItem] | None = None
    # Free-form context string.
    # NOTE(review): how the service interprets it is defined by the Soniox docs linked above.
    text: str | None = None
    # Plain list of term strings.
    terms: list[str] | None = None
    # Source/target term pairs (see ContextTranslationTerm).
    translation_terms: list[ContextTranslationTerm] | None = None

Context object for models with context_version 2, for Soniox stt-rt-v3-preview and higher.

Learn more about context in the documentation: https://soniox.com/docs/stt/concepts/context

Instance variables

var general : list[livekit.plugins.soniox.stt.ContextGeneralItem] | None
var terms : list[str] | None
var text : str | None
var translation_terms : list[livekit.plugins.soniox.stt.ContextTranslationTerm] | None
class ContextTranslationTerm (source: str, target: str)
Expand source code
@dataclass
class ContextTranslationTerm:
    """A source/target term pair for the ``translation_terms`` list of a ``ContextObject``."""

    source: str
    target: str

ContextTranslationTerm(source: 'str', target: 'str')

Instance variables

var source : str
var target : str
class STT (*,
api_key: str | None = None,
base_url: str = 'wss://stt-rt.soniox.com/transcribe-websocket',
http_session: aiohttp.ClientSession | None = None,
params: STTOptions | None = None)
Expand source code
class STT(stt.STT):
    """Streaming Speech-to-Text service backed by the Soniox Speech-to-Text API.

    Connects to the Soniox Speech-to-Text API for real-time transcription, with
    support for multiple languages, custom context, speaker diarization, and more.

    For complete API documentation, see: https://soniox.com/docs/stt/api-reference/websocket-api
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        base_url: str = BASE_URL,
        http_session: aiohttp.ClientSession | None = None,
        params: STTOptions | None = None,
    ):
        """Create a Soniox Speech-to-Text service instance.

        Args:
            api_key: Soniox API key. When omitted, the SONIOX_API_KEY environment
                variable is used instead.
            base_url: Base URL for the Soniox Speech-to-Text API. Defaults to BASE_URL
                defined in this module.
            http_session: Optional aiohttp.ClientSession to use for requests.
            params: Additional configuration parameters, such as model, language hints,
                context and speaker diarization. Defaults to ``STTOptions()``.

        Raises:
            ValueError: If no API key is passed and SONIOX_API_KEY is not set.
        """
        if not params:
            params = STTOptions()

        # Diarization is the only capability that depends on the supplied options.
        capabilities = stt.STTCapabilities(
            streaming=True,
            interim_results=True,
            aligned_transcript="chunk",
            offline_recognize=False,
            diarization=params.enable_speaker_diarization,
        )
        super().__init__(capabilities=capabilities)

        self._api_key = api_key if api_key else os.getenv("SONIOX_API_KEY")
        if not self._api_key:
            raise ValueError("Soniox API key is required. Set SONIOX_API_KEY or pass api_key")
        self._base_url = base_url
        self._http_session = http_session
        self._params = params

    @property
    def model(self) -> str:
        """Return the model name held in this instance's options."""
        options = self._params
        return options.model

    @property
    def provider(self) -> str:
        """Return the provider name, always ``"Soniox"``."""
        name = "Soniox"
        return name

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """Always raise: Soniox Speech-to-Text API has no single-frame recognition.

        Raises:
            NotImplementedError: On every call.
        """
        message = "Soniox Speech-to-Text API does not support single frame recognition"
        raise NotImplementedError(message)

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Open a new LiveKit streaming speech-to-text session bound to this instance.

        ``language`` is accepted but is not forwarded to the ``SpeechStream``.
        """
        session = SpeechStream(conn_options=conn_options, stt=self)
        return session

Speech-to-Text service using Soniox Speech-to-Text API.

This service connects to Soniox Speech-to-Text API for real-time transcription with support for multiple languages, custom context, speaker diarization, and more.

For complete API documentation, see: https://soniox.com/docs/stt/api-reference/websocket-api

Initialize instance of Soniox Speech-to-Text API service.

Args

api_key
Soniox API key. If not provided, the SONIOX_API_KEY environment variable is used.
base_url
Base URL for the Soniox Speech-to-Text API. Defaults to BASE_URL defined in this module.
http_session
Optional aiohttp.ClientSession to use for requests.
params
Additional configuration parameters, such as model, language hints, context and speaker diarization.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model name held in this instance's options."""
    options = self._params
    return options.model

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name, always ``"Soniox"``."""
    name = "Soniox"
    return name

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.soniox.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Open a new LiveKit streaming speech-to-text session bound to this instance.

    ``language`` is accepted but is not forwarded to the ``SpeechStream``.
    """
    session = SpeechStream(conn_options=conn_options, stt=self)
    return session

Return a new LiveKit streaming speech-to-text session.

Inherited members

class STTOptions (model: str = 'stt-rt-v4',
language_hints: list[str] | None = None,
language_hints_strict: bool = False,
context: ContextObject | str | None = None,
num_channels: int = 1,
sample_rate: int = 16000,
enable_speaker_diarization: bool = False,
enable_language_identification: bool = True,
max_endpoint_delay_ms: int = 500,
client_reference_id: str | None = None,
translation: TranslationConfig | None = None)
Expand source code
@dataclass
class STTOptions:
    """Settings bundle for the Soniox Speech-to-Text service.

    Construction raises ``ValueError`` when ``max_endpoint_delay_ms`` falls
    outside the inclusive range 500–3000.
    """

    model: str = "stt-rt-v4"

    language_hints: list[str] | None = None
    language_hints_strict: bool = False
    context: ContextObject | str | None = None

    num_channels: int = 1
    sample_rate: int = 16000

    enable_speaker_diarization: bool = False
    enable_language_identification: bool = True

    max_endpoint_delay_ms: int = 500
    """Maximum delay in milliseconds between speech cessation and endpoint detection.
    Range: 500–3000.
    See: https://soniox.com/docs/stt/rt/endpoint-detection"""

    client_reference_id: str | None = None
    translation: TranslationConfig | None = None

    def __post_init__(self) -> None:
        """Reject an endpoint delay outside the supported 500–3000 ms window."""
        delay_ms = self.max_endpoint_delay_ms
        if 500 <= delay_ms <= 3000:
            return
        raise ValueError("max_endpoint_delay_ms must be between 500 and 3000")

Configuration options for Soniox Speech-to-Text service.

Instance variables

var client_reference_id : str | None
var context : livekit.plugins.soniox.stt.ContextObject | str | None
var enable_language_identification : bool
var enable_speaker_diarization : bool
var language_hints : list[str] | None
var language_hints_strict : bool
var max_endpoint_delay_ms : int

Maximum delay in milliseconds between speech cessation and endpoint detection. Range: 500–3000. See: https://soniox.com/docs/stt/rt/endpoint-detection

var model : str
var num_channels : int
var sample_rate : int
var translation : livekit.plugins.soniox.stt.TranslationConfig | None
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    """Streaming TTS implementation on a shared _Connection.

    Each run registers the stream on the connection obtained from the parent
    ``TTS`` under a fresh ``stream_id``, forwards input text to that connection,
    and waits on a future that was handed to the connection at registration.
    """

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts
        # Per-stream copy of the parent's options taken at construction time.
        # NOTE(review): assumes ``replace`` is ``dataclasses.replace`` — confirm against the imports.
        self._opts = replace(tts._opts)
        # Both are assigned in _run(); they stay empty/None until the stream runs.
        self._stream_id: str = ""
        self._connection: _Connection | None = None
        # Set by aclose(); checked by the input task in _run() to stop sending text.
        self._cancelled = asyncio.Event()

    async def aclose(self) -> None:
        """Close the stream, signalling cancel to the server if still active.

        Cancelling only affects this stream's ``stream_id``; the underlying
        WebSocket stays alive for subsequent streams.
        """
        # Already cancelled: skip the cancel signal and only run the base-class close.
        if self._cancelled.is_set():
            await super().aclose()
            return

        self._cancelled.set()
        # Only signal the connection when one was acquired, it is still open,
        # and _run() has assigned a stream id.
        if self._connection is not None and not self._connection.closed and self._stream_id:
            self._connection.cancel_stream(self._stream_id)
            logger.debug(
                "Sent cancellation for Soniox TTS stream",
                extra={"stream_id": self._stream_id},
            )

        await super().aclose()

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Register with the connection, stream text, await the completion future.

        Raises:
            APITimeoutError: If acquiring the connection raises ``asyncio.TimeoutError``.
            APIStatusError: If acquiring the connection raises
                ``aiohttp.ClientResponseError``, or if the completion future raises
                ``APIStatusError`` (re-raised unchanged).
            APIConnectionError: For any other exception raised while acquiring the
                connection or awaiting the completion future.
        """
        request_id = utils.shortuuid()
        self._stream_id = utils.shortuuid()

        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._opts.sample_rate,
            num_channels=NUM_CHANNELS,
            mime_type=_audio_format_to_mime_type(self._opts.audio_format),
            stream=True,
        )
        output_emitter.start_segment(segment_id=utils.shortuuid())

        # Acquire the connection from the parent TTS and translate failures
        # into the API error types.
        try:
            (
                connection,
                self._acquire_time,
                self._connection_reused,
            ) = await self._tts._current_connection(timeout=self._conn_options.timeout)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=request_id, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e

        self._connection = connection

        # The future is handed to the connection together with the emitter;
        # this coroutine blocks on it below.
        # NOTE(review): presumably the connection resolves/fails it when the stream ends — confirm in _Connection.
        waiter: asyncio.Future[None] = asyncio.get_event_loop().create_future()
        connection.register_stream(self._stream_id, output_emitter, waiter, opts=self._opts)

        async def _input_task() -> None:
            # Forward each text chunk from the input channel; flush sentinels are skipped.
            async for data in self._input_ch:
                if self._cancelled.is_set():
                    break
                if isinstance(data, self._FlushSentinel):
                    continue
                self._mark_started()
                connection.send_text(self._stream_id, data, text_end=False)

            # Unless cancelled, finish with an empty text message flagged text_end=True.
            if not self._cancelled.is_set():
                connection.send_text(self._stream_id, "", text_end=True)

        input_t = asyncio.create_task(_input_task(), name="soniox-tts-stream-input")

        try:
            await waiter
        except APIStatusError:
            raise
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            # Runs on success, failure and cancellation: close the segment, stop
            # the input task, and drop the stream's registration on the connection.
            output_emitter.end_segment()
            await utils.aio.gracefully_cancel(input_t)
            connection.unregister_stream(self._stream_id)

Streaming TTS implementation on a shared _Connection.

Ancestors

  • livekit.agents.tts.tts.SynthesizeStream
  • abc.ABC

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the stream, telling the server to cancel it if it is still active.

    The cancel signal is sent at most once and targets only this stream's
    ``stream_id``; the underlying WebSocket stays alive for subsequent streams.
    """
    first_close = not self._cancelled.is_set()
    if first_close:
        self._cancelled.set()
        conn = self._connection
        # Signal only when a connection was acquired, it is still open, and
        # the stream has been given an id.
        if conn is not None and not conn.closed and self._stream_id:
            conn.cancel_stream(self._stream_id)
            logger.debug(
                "Sent cancellation for Soniox TTS stream",
                extra={"stream_id": self._stream_id},
            )

    await super().aclose()

Close the stream, signalling cancel to the server if still active.

Cancelling only affects this stream's stream_id; the underlying WebSocket stays alive for subsequent streams.

class TTS (*,
model: str = 'tts-rt-v1-preview',
language: str = 'en',
voice: str = 'Maya',
audio_format: str = 'pcm_s16le',
sample_rate: int = 24000,
bitrate: int | None = None,
api_key: str | None = None,
websocket_url: str = 'wss://tts-rt.soniox.com/tts-websocket',
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    """Text-to-Speech service using Soniox Text-to-Speech API.

    This service connects to Soniox Text-to-Speech API for real-time speech synthesis
    with support for multiple languages, voices, and audio formats.

    For complete API documentation, see: https://soniox.com/docs/api-reference/tts/websocket-api
    """

    def __init__(
        self,
        *,
        model: str = DEFAULT_MODEL,
        language: str = DEFAULT_LANGUAGE,
        voice: str = DEFAULT_VOICE,
        audio_format: str = DEFAULT_AUDIO_FORMAT,
        sample_rate: int = DEFAULT_SAMPLE_RATE,
        bitrate: int | None = None,
        api_key: str | None = None,
        websocket_url: str = WEBSOCKET_URL,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Initialize instance of Soniox Text-to-Speech API service.

        Args:
            model (str): Soniox TTS model to use. Defaults to "tts-rt-v1-preview".
            language (str): Language code (e.g., "en", "es", "fr"). Defaults to "en".
            voice (str): Voice name (e.g., "Maya", "Adrian"). Defaults to "Maya".
            audio_format (str): Audio format (e.g., "pcm_s16le", "mp3"). Defaults to "pcm_s16le".
            sample_rate (int): Sample rate in Hz. Required for raw audio formats. Defaults to 24000.
            bitrate (int): Codec bitrate in bps for compressed formats. Optional.
            api_key (str): Soniox API key. If not provided, the SONIOX_API_KEY environment
                variable is used.
            websocket_url (str): Base WebSocket URL for Soniox TTS API.
            http_session (aiohttp.ClientSession): Optional aiohttp.ClientSession to use for requests.

        Raises:
            ValueError: If no API key is passed and SONIOX_API_KEY is not set.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("SONIOX_API_KEY")
        if not api_key:
            raise ValueError("Soniox API key is required. Set SONIOX_API_KEY or provide api_key.")

        self._opts = _TTSOptions(
            model=model,
            language=language,
            voice=voice,
            audio_format=audio_format,
            sample_rate=sample_rate,
            bitrate=bitrate,
            websocket_url=websocket_url,
            api_key=api_key,
        )
        self._session = http_session
        # Weak references: a stream that is no longer referenced elsewhere
        # drops out of the set on its own.
        self._streams = weakref.WeakSet[SynthesizeStream]()

        # One persistent connection shared across streams (see _Connection).
        self.__current_connection: _Connection | None = None
        self.__conn_lock = asyncio.Lock()

        # Strong reference to the background prewarm task, kept so the task
        # cannot be garbage-collected while it is still running.
        self._prewarm_task: asyncio.Task[None] | None = None

    @property
    def model(self) -> str:
        """Return the model name held in this instance's options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Return the provider name, always ``"Soniox"``."""
        return "Soniox"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from ``utils.http_context`` on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _current_connection(self, *, timeout: float) -> tuple[_Connection, float, bool]:
        """Return the live connection, creating one if needed.

        Returns ``(connection, acquire_time, reused)`` — matches the
        ``ConnectionPool`` surface used by other plugins' metrics. A reused
        connection reports an acquire time of ``0.0``.

        Raises:
            asyncio.TimeoutError: If a new connection does not connect within
                ``timeout`` seconds.
        """
        # The lock serializes callers so only one of them creates a connection.
        async with self.__conn_lock:
            conn = self.__current_connection
            if conn is not None and conn.is_current and not conn.closed:
                return conn, 0.0, True

            # Discard any stale connection reference so it can drain and close
            # itself once any outstanding streams finish.
            if conn is not None and not conn.closed:
                conn.mark_non_current()

            t0 = time.perf_counter()
            new_conn = _Connection(self._opts, self._ensure_session())
            await asyncio.wait_for(new_conn.connect(), timeout=timeout)
            self.__current_connection = new_conn
            return new_conn, time.perf_counter() - t0, False

    def update_options(
        self,
        *,
        model: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Update the stored model, language and/or voice.

        Arguments left as ``NOT_GIVEN`` keep their current value.

        Args:
            model: TTS model to use.
            language: Language code to use.
            voice: Voice to use.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        if is_given(voice):
            self._opts.voice = voice

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        """Synthesize ``text`` by delegating to ``_synthesize_with_stream``."""
        return self._synthesize_with_stream(text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a streaming TTS session and track it so ``aclose`` can close it."""
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        """Pre-warm the persistent connection in the background.

        Does nothing when no event loop is running. Connection failures are
        logged at debug level and never raised.
        """
        # Check for a running loop before building the coroutine. Creating the
        # coroutine first and letting create_task() fail would leave it
        # unawaited and trigger a "coroutine ... was never awaited" warning.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No running event loop (e.g. called outside async context) — skip.
            return

        async def _task() -> None:
            try:
                await self._current_connection(timeout=20.0)
            except Exception as e:
                logger.debug(f"Soniox TTS prewarm failed: {e}")

        # The event loop only keeps weak references to tasks, so hold a strong one.
        self._prewarm_task = asyncio.create_task(_task(), name="soniox-tts-prewarm")

    async def aclose(self) -> None:
        """Close all streams and the persistent connection."""
        # Iterate over a snapshot: closing a stream must not disturb the iteration.
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()

        if self.__current_connection is not None:
            await self.__current_connection.aclose()
            self.__current_connection = None

Text-to-Speech service using Soniox Text-to-Speech API.

This service connects to Soniox Text-to-Speech API for real-time speech synthesis with support for multiple languages, voices, and audio formats.

For complete API documentation, see: https://soniox.com/docs/api-reference/tts/websocket-api

Initialize instance of Soniox Text-to-Speech API service.

Args

model : str
Soniox TTS model to use. Defaults to "tts-rt-v1-preview".
language : str
Language code (e.g., "en", "es", "fr"). Defaults to "en".
voice : str
Voice name (e.g., "Maya", "Adrian"). Defaults to "Maya".
audio_format : str
Audio format (e.g., "pcm_s16le", "mp3"). Defaults to "pcm_s16le".
sample_rate : int
Sample rate in Hz. Required for raw audio formats. Defaults to 24000.
bitrate : int
Codec bitrate in bps for compressed formats. Optional.
api_key : str
Soniox API key. If not provided, the SONIOX_API_KEY environment variable is used.
websocket_url : str
Base WebSocket URL for Soniox TTS API.
http_session : aiohttp.ClientSession
Optional aiohttp.ClientSession to use for requests.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model name held in this instance's options."""
    options = self._opts
    return options.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name, always ``"Soniox"``."""
    name = "Soniox"
    return name

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Shut down every tracked stream, then the persistent connection."""
    # Work from a snapshot so closing a stream cannot disturb the iteration.
    open_streams = list(self._streams)
    for open_stream in open_streams:
        await open_stream.aclose()
    self._streams.clear()

    if self.__current_connection is None:
        return
    await self.__current_connection.aclose()
    self.__current_connection = None

Close all streams and the persistent connection.

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Pre-warm the persistent connection in the background.

    Does nothing when no event loop is running. Connection failures are
    logged at debug level and never raised.
    """
    # Check for a running loop before building the coroutine. Creating the
    # coroutine first and letting create_task() fail would leave it unawaited
    # and trigger a "coroutine ... was never awaited" warning.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop (e.g. called outside async context) — skip.
        return

    async def _task() -> None:
        try:
            await self._current_connection(timeout=20.0)
        except Exception as e:
            logger.debug(f"Soniox TTS prewarm failed: {e}")

    # The event loop only keeps weak references to tasks, so hold a strong
    # one on the instance to stop the task being garbage-collected mid-run.
    self._prewarm_task = asyncio.create_task(_task(), name="soniox-tts-prewarm")

Pre-warm the persistent connection in the background.

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.soniox.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Open a streaming TTS session and track it on this instance."""
    session = SynthesizeStream(conn_options=conn_options, tts=self)
    # Tracked so that closing the TTS instance can close the session too.
    self._streams.add(session)
    return session

Create a streaming TTS session.

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    """Synthesize ``text`` by delegating to ``_synthesize_with_stream``."""
    chunked = self._synthesize_with_stream(text, conn_options=conn_options)
    return chunked
def update_options(self,
*,
model: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    voice: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Update the stored model, language and/or voice.

    Arguments left as ``NOT_GIVEN`` keep their current value.

    Args:
        model: TTS model to use.
        language: Language code to use.
        voice: Voice to use.
    """
    requested = (("model", model), ("language", language), ("voice", voice))
    for field_name, new_value in requested:
        if is_given(new_value):
            setattr(self._opts, field_name, new_value)

Args

model
TTS model to use.
language
Language code to use.
voice
Voice to use.

Inherited members

class TranslationConfig (type: "Literal['one_way', 'two_way']",
target_language: str | None = None,
language_a: str | None = None,
language_b: str | None = None)
Expand source code
@dataclass
class TranslationConfig:
    """Translation configuration for the Soniox Speech-to-Text API.

    Validation runs at construction time: an unknown ``type`` or a missing
    language for the chosen ``type`` raises ``ValueError``.

    See: https://soniox.com/docs/stt/api-reference/websocket-api
    """

    type: Literal["one_way", "two_way"]
    target_language: str | None = None
    """Target language for one-way translation."""
    language_a: str | None = None
    """First language for two-way translation."""
    language_b: str | None = None
    """Second language for two-way translation."""

    def __post_init__(self) -> None:
        """Check ``type`` and the language fields it requires.

        Raises:
            ValueError: If ``type`` is neither ``"one_way"`` nor ``"two_way"``, if
                ``target_language`` is missing for one-way translation, or if
                ``language_a`` / ``language_b`` is missing for two-way translation.
        """
        # The Literal annotation is not enforced at runtime, so a typo such as
        # "oneway" would otherwise skip every check below.
        if self.type not in ("one_way", "two_way"):
            raise ValueError(
                f"Unsupported translation type {self.type!r}; expected 'one_way' or 'two_way'"
            )
        if self.type == "one_way" and not self.target_language:
            raise ValueError("target_language is required for one_way translation")
        if self.type == "two_way" and not (self.language_a and self.language_b):
            raise ValueError("language_a and language_b are both required for two_way translation")

Translation configuration for the Soniox Speech-to-Text API.

See: https://soniox.com/docs/stt/api-reference/websocket-api

Instance variables

var language_a : str | None

First language for two-way translation.

var language_b : str | None

Second language for two-way translation.

var target_language : str | None

Target language for one-way translation.

var type : Literal['one_way', 'two_way']