Module livekit.agents.inference.stt

Classes

class AssemblyaiOptions (*args, **kwargs)
Expand source code
class AssemblyaiOptions(TypedDict, total=False):
    """Provider-specific options for AssemblyAI streaming STT.

    All keys are optional; the mapping is passed to the provider via the
    ``extra_kwargs`` argument of :class:`STT`.
    """

    format_turns: bool  # default: False
    end_of_turn_confidence_threshold: float  # default: 0.01
    min_end_of_turn_silence_when_confident: int  # default: 0
    max_turn_silence: int  # default: not specified
    keyterms_prompt: list[str]  # default: not specified
    language_detection: bool
    inactivity_timeout: float  # seconds
    prompt: str  # default: not specified (u3-rt-pro only, mutually exclusive with keyterms_prompt)

Provider-specific options for AssemblyAI streaming STT. All keys are optional and are passed through to the provider via the extra_kwargs argument of STT.

Ancestors

  • builtins.dict

Class variables

var end_of_turn_confidence_threshold : float
var format_turns : bool
var inactivity_timeout : float
var keyterms_prompt : list[str]
var language_detection : bool
var max_turn_silence : int
var min_end_of_turn_silence_when_confident : int
var prompt : str
class CartesiaOptions (*args, **kwargs)
Expand source code
class CartesiaOptions(TypedDict, total=False):
    """Provider-specific options for Cartesia streaming STT.

    All keys are optional; the mapping is passed to the provider via the
    ``extra_kwargs`` argument of :class:`STT`.
    """

    min_volume: float  # default: not specified
    max_silence_duration_secs: float  # default: not specified

Provider-specific options for Cartesia streaming STT. All keys are optional and are passed through to the provider via the extra_kwargs argument of STT.

Ancestors

  • builtins.dict

Class variables

var max_silence_duration_secs : float
var min_volume : float
class DeepgramFluxOptions (*args, **kwargs)
Expand source code
class DeepgramFluxOptions(TypedDict, total=False):
    """Provider-specific options for Deepgram Flux streaming STT.

    All keys are optional; the mapping is passed to the provider via the
    ``extra_kwargs`` argument of :class:`STT`.
    """

    eager_eot_threshold: float  # range 0.3-0.9, default: 0.5
    eot_threshold: float  # range 0.5-0.9
    eot_timeout_ms: int
    keyterm: str | list[str]
    mip_opt_out: bool  # default: False
    tag: str | list[str]
    detect_language: bool

Provider-specific options for Deepgram Flux streaming STT. All keys are optional and are passed through to the provider via the extra_kwargs argument of STT.

Ancestors

  • builtins.dict

Class variables

var detect_language : bool
var eager_eot_threshold : float
var eot_threshold : float
var eot_timeout_ms : int
var keyterm : str | list[str]
var mip_opt_out : bool
var tag : str | list[str]
class DeepgramOptions (*args, **kwargs)
Expand source code
class DeepgramOptions(TypedDict, total=False):
    """Provider-specific options for Deepgram (Nova) streaming STT.

    All keys are optional; the mapping is passed to the provider via the
    ``extra_kwargs`` argument of :class:`STT`.
    """

    filler_words: bool  # default: True
    interim_results: bool  # default: True
    endpointing: int  # default: 25 (ms)
    punctuate: bool  # default: True
    smart_format: bool
    keywords: list[tuple[str, float]]  # (keyword, boost) pairs
    keyterm: str | list[str]
    profanity_filter: bool
    numerals: bool
    mip_opt_out: bool  # default: False
    vad_events: bool  # default: False
    diarize: bool
    dictation: bool
    detect_language: bool
    no_delay: bool  # default: True
    utterance_end: bool
    redact: str | list[str]
    replace: str | list[str]
    search: str | list[str]
    tag: str | list[str]
    channels: int
    version: str
    callback: str
    callback_method: str
    extra: str

Provider-specific options for Deepgram streaming STT. All keys are optional and are passed through to the provider via the extra_kwargs argument of STT.

Ancestors

  • builtins.dict

Class variables

var callback : str
var callback_method : str
var channels : int
var detect_language : bool
var diarize : bool
var dictation : bool
var endpointing : int
var extra : str
var filler_words : bool
var interim_results : bool
var keyterm : str | list[str]
var keywords : list[tuple[str, float]]
var mip_opt_out : bool
var no_delay : bool
var numerals : bool
var profanity_filter : bool
var punctuate : bool
var redact : str | list[str]
var replace : str | list[str]
var search : str | list[str]
var smart_format : bool
var tag : str | list[str]
var utterance_end : bool
var vad_events : bool
var version : str
class ElevenlabsOptions (*args, **kwargs)
Expand source code
class ElevenlabsOptions(TypedDict, total=False):
    """Provider-specific options for ElevenLabs streaming STT.

    All keys are optional; the mapping is passed to the provider via the
    ``extra_kwargs`` argument of :class:`STT`.
    """

    commit_strategy: Literal["manual", "vad"]
    include_timestamps: bool
    vad_silence_threshold_secs: float
    vad_threshold: float
    min_speech_duration_ms: int
    min_silence_duration_ms: int
    language_code: str

Provider-specific options for ElevenLabs streaming STT. All keys are optional and are passed through to the provider via the extra_kwargs argument of STT.

Ancestors

  • builtins.dict

Class variables

var commit_strategy : Literal['manual', 'vad']
var include_timestamps : bool
var language_code : str
var min_silence_duration_ms : int
var min_speech_duration_ms : int
var vad_silence_threshold_secs : float
var vad_threshold : float
class FallbackModel (*args, **kwargs)
Expand source code
class FallbackModel(TypedDict, total=False):
    """A fallback model with optional extra configuration.

    Extra fields are passed through to the provider. ``model`` is required
    (marked ``Required`` despite ``total=False``); ``extra_kwargs`` may be
    omitted.

    Example:
        >>> FallbackModel(model="deepgram/nova-3", extra_kwargs={"keyterm": ["livekit"]})
    """

    model: Required[str]
    """Model name (e.g. "deepgram/nova-3", "assemblyai/universal-streaming", "cartesia/ink-whisper")."""

    extra_kwargs: dict[str, Any]
    """Extra configuration for the model."""

A fallback model with optional extra configuration.

Extra fields are passed through to the provider.

Example

>>> FallbackModel(model="deepgram/nova-3", extra_kwargs={"keyterm": ["livekit"]})

Ancestors

  • builtins.dict

Class variables

var extra_kwargs : dict[str, typing.Any]

Extra configuration for the model.

var model : str

Model name (e.g. "deepgram/nova-3", "assemblyai/universal-streaming", "cartesia/ink-whisper").

class STT (model: NotGivenOr[STTModels | str] = NOT_GIVEN,
*,
language: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | DeepgramFluxOptions | AssemblyaiOptions | ElevenlabsOptions] = NOT_GIVEN,
fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN)
Expand source code
class STT(stt.STT):
    """Speech-to-text via LiveKit Cloud Inference.

    A single client fronting multiple upstream STT providers (Deepgram,
    Deepgram Flux, Cartesia, AssemblyAI, ElevenLabs, ...), selected by a
    ``model`` string in ``"provider/model[:language]"`` format.
    """

    # The overloads below exist only for static type checking: each known
    # provider's model literal narrows `extra_kwargs` to that provider's
    # options TypedDict. They have no runtime effect.
    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramFluxModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramFluxOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: AssemblyAIModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[AssemblyaiOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: str,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    def __init__(
        self,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any]
            | CartesiaOptions
            | DeepgramOptions
            | DeepgramFluxOptions
            | AssemblyaiOptions
            | ElevenlabsOptions
        ] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference STT.

        Args:
            model (STTModels | str, optional): STT model to use, in "provider/model[:language]" format.
            language (str, optional): Language of the STT model. Takes precedence over a
                ":language" suffix embedded in ``model``.
            encoding (STTEncoding, optional): Encoding of the STT model.
            sample_rate (int, optional): Sample rate of the STT model.
            base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable.
            api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable.
            api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the STT model.
            fallback (FallbackModelType, optional): Fallback models - either a list of model names
                or a list of FallbackModel instances.
            conn_options (APIConnectOptions, optional): Connection options for request attempts.

        Raises:
            ValueError: If no API key or API secret can be resolved from the
                arguments or the environment.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
                aligned_transcript="word",
                offline_recognize=False,
            ),
        )

        # Parse language from model string if provided: "provider/model:language".
        # An explicitly passed `language` argument wins over the suffix.
        if is_given(model) and isinstance(model, str):
            parsed_model, parsed_language = _parse_model_string(model)
            model = parsed_model
            if is_given(parsed_language) and not is_given(language):
                language = parsed_language

        # Endpoint/credential resolution order: explicit argument first, then
        # the LIVEKIT_INFERENCE_* env var, then the generic LIVEKIT_* fallback.
        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )

        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )
        # Accept a single fallback entry or a list; normalize to list[FallbackModel].
        fallback_models: NotGivenOr[list[FallbackModel]] = NOT_GIVEN
        if is_given(fallback):
            fallback_models = _normalize_fallback(fallback)  # type: ignore[arg-type]

        self._opts = STTOptions(
            model=model,
            language=LanguageCode(language) if isinstance(language, str) else language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            # Copy so later update_options() mutation doesn't alias the caller's mapping.
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
            fallback=fallback_models,
            conn_options=conn_options if is_given(conn_options) else DEFAULT_API_CONNECT_OPTIONS,
        )

        self._session = http_session
        # Weak references: streams unregister themselves automatically when GC'd.
        self._streams = weakref.WeakSet[SpeechStream]()

    @classmethod
    def from_model_string(cls, model: str) -> STT:
        """Create a STT instance from a model string.

        Args:
            model (str): STT model to use, in "provider/model[:language]" format

        Returns:
            STT: STT instance
        """
        model_name, language = _parse_model_string(model)
        return cls(model=model_name, language=language)

    @property
    def model(self) -> str:
        """Configured model identifier, or "unknown" when none was given."""
        return self._opts.model if is_given(self._opts.model) else "unknown"

    @property
    def provider(self) -> str:
        """Provider label; always "livekit" for the inference gateway."""
        return "livekit"

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared session from utils.http_context when
        # no session was supplied at construction time.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """Batch recognition is intentionally unsupported; always raises."""
        raise NotImplementedError(
            "LiveKit Inference STT does not support batch recognition, use stream() instead"
        )

    def stream(
        self,
        *,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a streaming transcription session."""
        # Each stream gets its own copy of the options so per-stream updates
        # don't mutate this instance's configuration.
        options = self._sanitize_options(language=language)
        stream = SpeechStream(stt=self, opts=options, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        """Update STT configuration options.

        NOT_GIVEN arguments leave the corresponding setting unchanged;
        ``extra`` is merged into (not replacing) the existing extra kwargs.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = LanguageCode(language)
        if is_given(extra):
            self._opts.extra_kwargs.update(extra)

        # Fan out to live streams; each stream ignores NOT_GIVEN fields itself.
        for stream in self._streams:
            stream.update_options(model=model, language=language, extra=extra)

    def _sanitize_options(
        self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN
    ) -> STTOptions:
        """Create a sanitized copy of options with language override if provided."""
        # replace() is a shallow copy, so re-dict extra_kwargs to keep the
        # copy's mutations from leaking back into self._opts.
        options = replace(self._opts)
        options.extra_kwargs = dict(options.extra_kwargs)

        if is_given(language):
            options.language = LanguageCode(language)

        return options

Speech-to-text engine backed by LiveKit Cloud Inference. See the constructor arguments below.

Livekit Cloud Inference STT

Args

model : STTModels | str, optional
STT model to use, in "provider/model[:language]" format.
language : str, optional
Language of the STT model.
encoding : STTEncoding, optional
Encoding of the STT model.
sample_rate : int, optional
Sample rate of the STT model.
base_url : str, optional
LIVEKIT_URL, if not provided, read from environment variable.
api_key : str, optional
LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret : str, optional
LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session : aiohttp.ClientSession, optional
HTTP session to use.
extra_kwargs : dict, optional
Extra kwargs to pass to the STT model.
fallback : FallbackModelType, optional
Fallback models - either a list of model names or a list of FallbackModel instances.
conn_options : APIConnectOptions, optional
Connection options for request attempts.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Static methods

def from_model_string(model: str) ‑> STT

Create a STT instance from a model string

Args

model : str
STT model to use, in "provider/model[:language]" format

Returns

STT
STT instance

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Configured model identifier, or "unknown" when none was given."""
    return self._opts.model if is_given(self._opts.model) else "unknown"

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Provider label; always "livekit" for the inference gateway."""
    return "livekit"

Get the provider name/identifier for this STT instance.

Returns

The provider name; this implementation always returns "livekit".

Note

Plugins should override this property to provide their provider information.

Methods

def stream(self,
*,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Create a streaming transcription session."""
    # Each stream gets its own copy of the options (with optional language
    # override) so per-stream updates don't mutate this STT's configuration.
    options = self._sanitize_options(language=language)
    stream = SpeechStream(stt=self, opts=options, conn_options=conn_options)
    # Registered so update_options() can fan out to live streams.
    self._streams.add(stream)
    return stream

Create a streaming transcription session.

def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> None:
    """Update STT configuration options.

    NOT_GIVEN arguments leave the corresponding setting unchanged;
    ``extra`` is merged into (not replacing) the existing extra kwargs.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = LanguageCode(language)
    if is_given(extra):
        self._opts.extra_kwargs.update(extra)

    # Fan out to live streams; each stream ignores NOT_GIVEN fields itself.
    for stream in self._streams:
        stream.update_options(model=model, language=language, extra=extra)

Update STT configuration options.

Inherited members

class STTOptions (model: NotGivenOr[STTModels | str],
language: NotGivenOr[LanguageCode],
encoding: STTEncoding,
sample_rate: int,
base_url: str,
api_key: str,
api_secret: str,
extra_kwargs: dict[str, Any],
fallback: NotGivenOr[list[FallbackModel]],
conn_options: NotGivenOr[APIConnectOptions])
Expand source code
@dataclass
class STTOptions:
    """Resolved configuration shared by STT and its SpeechStream sessions."""

    model: NotGivenOr[STTModels | str]  # "provider/model" id; "auto"/NOT_GIVEN leave selection to server
    language: NotGivenOr[LanguageCode]  # transcription language, if configured
    encoding: STTEncoding  # audio encoding sent over the wire
    sample_rate: int  # input sample rate (Hz)
    base_url: str  # inference endpoint (http(s) URL, converted to ws(s) on connect)
    api_key: str  # LiveKit API key used to mint the access token
    api_secret: str  # LiveKit API secret used to mint the access token
    extra_kwargs: dict[str, Any]  # provider-specific passthrough options
    fallback: NotGivenOr[list[FallbackModel]]  # ordered fallback models, if any
    conn_options: NotGivenOr[APIConnectOptions]  # timeout/retry settings sent to the server

STTOptions(model: 'NotGivenOr[STTModels | str]', language: 'NotGivenOr[LanguageCode]', encoding: 'STTEncoding', sample_rate: 'int', base_url: 'str', api_key: 'str', api_secret: 'str', extra_kwargs: 'dict[str, Any]', fallback: 'NotGivenOr[list[FallbackModel]]', conn_options: 'NotGivenOr[APIConnectOptions]')

Instance variables

var api_key : str
var api_secret : str
var base_url : str
var conn_options : livekit.agents.types.APIConnectOptions | livekit.agents.types.NotGiven
var encoding : Literal['pcm_s16le']
var extra_kwargs : dict[str, typing.Any]
var fallback : list[FallbackModel] | livekit.agents.types.NotGiven
var language : livekit.agents.language.LanguageCode | livekit.agents.types.NotGiven
var model : Literal['deepgram/nova-3', 'deepgram/nova-3-medical', 'deepgram/nova-2', 'deepgram/nova-2-medical', 'deepgram/nova-2-conversationalai', 'deepgram/nova-2-phonecall'] | Literal['deepgram/flux-general', 'deepgram/flux-general-en'] | Literal['cartesia/ink-whisper'] | Literal['assemblyai/universal-streaming', 'assemblyai/universal-streaming-multilingual', 'assemblyai/u3-rt-pro'] | Literal['elevenlabs/scribe_v2_realtime'] | Literal['auto'] | str | livekit.agents.types.NotGiven
var sample_rate : int
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    """One streaming transcription session over the LiveKit Inference STT WebSocket.

    Pushed audio frames are re-chunked to 50 ms, base64-encoded and sent as
    ``input_audio`` messages; transcript messages coming back are converted
    into SpeechEvents on the stream's event channel.
    """

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._session = stt._ensure_session()
        self._request_id = str(utils.shortuuid("stt_request_"))

        self._speaking = False  # True between synthesized START/END_OF_SPEECH events
        self._speech_duration: float = 0  # seconds of audio sent since the last usage event
        self._ws: aiohttp.ClientWebSocketResponse | None = None  # live socket while connected

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        """Update streaming transcription options.

        When the WebSocket is live, a mid-stream session.update is sent so providers
        that support it (e.g. AssemblyAI, Deepgram Flux) can apply changes without
        reconnecting. Unsupported providers ignore the message.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = LanguageCode(language)
        if is_given(extra):
            self._opts.extra_kwargs.update(extra)

        has_update = is_given(model) or is_given(language) or is_given(extra)
        if has_update and self._ws is not None and not self._ws.closed:
            # Only include the fields that actually changed.
            settings: dict[str, Any] = {}
            if is_given(model):
                settings["model"] = model
            if is_given(language):
                settings["language"] = str(LanguageCode(language))
            if is_given(extra):
                settings["extra"] = extra
            update_msg = {
                "type": "session.update",
                "settings": settings,
            }
            # Fire-and-forget send; failures are logged in _send_session_update.
            # NOTE(review): no reference to the task is retained — confirm it
            # cannot be garbage-collected before it runs.
            asyncio.ensure_future(self._send_session_update(update_msg))

    async def _send_session_update(self, msg: dict[str, Any]) -> None:
        """Best-effort send of a session.update message; errors are swallowed."""
        try:
            if self._ws is not None and not self._ws.closed:
                await self._ws.send_str(json.dumps(msg))
        except Exception:
            logger.debug("failed to send session.update, ws may be closing")

    async def _run(self) -> None:
        """Main loop for streaming transcription."""
        # Set to True once we've drained the input and sent session.finalize,
        # so recv_task can treat a server-side close as expected.
        closing_ws = False

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            """Forward input audio to the server as base64 `input_audio` messages."""
            nonlocal closing_ws

            # Re-chunk arbitrary input frames into fixed 50 ms mono frames.
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=self._opts.sample_rate // 20,  # 50ms
            )

            async for ev in self._input_ch:
                frames: list[rtc.AudioFrame] = []
                if isinstance(ev, rtc.AudioFrame):
                    frames.extend(audio_bstream.push(ev.data))
                elif isinstance(ev, self._FlushSentinel):
                    # Flush emits any partially filled chunk.
                    frames.extend(audio_bstream.flush())

                for frame in frames:
                    self._speech_duration += frame.duration
                    audio_bytes = frame.data.tobytes()
                    base64_audio = base64.b64encode(audio_bytes).decode("utf-8")
                    audio_msg = {
                        "type": "input_audio",
                        "audio": base64_audio,
                    }
                    await ws.send_str(json.dumps(audio_msg))

            # Input exhausted: tell the server to finalize and expect a close.
            closing_ws = True
            finalize_msg = {
                "type": "session.finalize",
            }
            await ws.send_str(json.dumps(finalize_msg))

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            """Receive server messages and dispatch transcripts/errors."""
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    # A close is only OK after finalize or session shutdown.
                    if closing_ws or self._session.closed:
                        return
                    raise APIStatusError(
                        message="LiveKit Inference STT connection closed unexpectedly"
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected LiveKit Inference STT message type %s", msg.type)
                    continue

                data = json.loads(msg.data)
                msg_type = data.get("type")
                if msg_type == "session.created":
                    pass  # handshake ack; nothing to do
                elif msg_type == "interim_transcript":
                    self._process_transcript(data, is_final=False)
                elif msg_type == "final_transcript":
                    self._process_transcript(data, is_final=True)
                elif msg_type == "session.finalized":
                    pass  # server acknowledged our finalize
                elif msg_type == "session.closed":
                    pass  # server-side session teardown notification
                elif msg_type == "error":
                    raise APIError(f"LiveKit Inference STT returned error: {msg.data}")
                else:
                    logger.warning(
                        "received unexpected message from LiveKit Inference STT: %s", data
                    )

        ws: aiohttp.ClientWebSocketResponse | None = None
        try:
            ws = await self._connect_ws()
            self._ws = ws
            tasks = [
                asyncio.create_task(send_task(ws)),
                asyncio.create_task(recv_task(ws)),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                await utils.aio.gracefully_cancel(*tasks)
        finally:
            # Clear the handle so update_options() stops attempting mid-stream sends.
            self._ws = None
            if ws is not None:
                await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Connect to the LiveKit Inference STT WebSocket."""
        # session.create payload; `extra` carries provider-specific passthrough options.
        params: dict[str, Any] = {
            "settings": {
                "sample_rate": str(self._opts.sample_rate),
                "encoding": self._opts.encoding,
                "extra": self._opts.extra_kwargs,
            },
        }

        # "auto" (or unset) omits explicit model selection from the payload.
        # NOTE(review): the query string below always embeds model=..., even
        # when it's "auto"/NOT_GIVEN — confirm the server tolerates this.
        if self._opts.model and self._opts.model != "auto":
            params["model"] = self._opts.model

        if self._opts.language:
            params["settings"]["language"] = self._opts.language

        if self._opts.fallback:
            models = [
                {"model": m.get("model"), "extra": m.get("extra_kwargs")}
                for m in self._opts.fallback
            ]
            params["fallback"] = {"models": models}

        if self._opts.conn_options:
            params["connection"] = {
                "timeout": self._opts.conn_options.timeout,
                "retries": self._opts.conn_options.max_retry,
            }

        # http -> ws / https -> wss (only the first "http" occurrence is replaced).
        base_url = self._opts.base_url
        if base_url.startswith(("http://", "https://")):
            base_url = base_url.replace("http", "ws", 1)
        headers = {
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}"
        }
        try:
            ws = await asyncio.wait_for(
                self._session.ws_connect(
                    f"{base_url}/stt?model={self._opts.model}", headers=headers
                ),
                self._conn_options.timeout,
            )
            params["type"] = "session.create"
            await ws.send_str(json.dumps(params))
        except aiohttp.ClientResponseError as e:
            raise create_api_error_from_http(e.message, status=e.status) from e
        except asyncio.TimeoutError as e:
            raise APITimeoutError("LiveKit Inference STT connection timed out.") from e
        except aiohttp.ClientConnectorError as e:
            raise APIConnectionError("failed to connect to LiveKit Inference STT") from e
        return ws

    def _process_transcript(self, data: dict, is_final: bool) -> None:
        """Convert one transcript message into SpeechEvents on the event channel."""
        request_id = data.get("request_id", self._request_id)
        text = data.get("transcript", "")
        # Fall back to the configured language, then "en".
        language = LanguageCode(data.get("language", self._opts.language or "en"))
        words = data.get("words", []) or []

        # Drop empty interim transcripts; empty finals still flow through so
        # usage and END_OF_SPEECH are emitted.
        if not text and not is_final:
            return
        # We'll have a more accurate way of detecting when speech started when we have VAD
        if not self._speaking:
            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        speech_data = stt.SpeechData(
            language=language,
            start_time=self.start_time_offset + data.get("start", 0),
            end_time=self.start_time_offset + data.get("start", 0) + data.get("duration", 0),
            confidence=data.get("confidence", 1.0),
            text=text,
            words=[
                TimedString(
                    text=word.get("word", ""),
                    start_time=word.get("start", 0) + self.start_time_offset,
                    end_time=word.get("end", 0) + self.start_time_offset,
                    start_time_offset=self.start_time_offset,
                    # NOTE(review): word-level confidence defaults to 0.0 while the
                    # utterance-level default above is 1.0 — confirm this asymmetry
                    # is intentional.
                    confidence=word.get("confidence", 0.0),
                )
                for word in words
            ],
        )

        if is_final:
            # Report accumulated audio duration as usage before the final transcript.
            if self._speech_duration > 0:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(
                        type=stt.SpeechEventType.RECOGNITION_USAGE,
                        request_id=request_id,
                        recognition_usage=stt.RecognitionUsage(
                            audio_duration=self._speech_duration,
                        ),
                    )
                )
                self._speech_duration = 0

            event = stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                request_id=request_id,
                alternatives=[speech_data],
            )
            self._event_ch.send_nowait(event)

            # Each final transcript closes the current speech segment.
            if self._speaking:
                self._speaking = False
                end_event = stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                self._event_ch.send_nowait(end_event)
        else:
            event = stt.SpeechEvent(
                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                request_id=request_id,
                alternatives=[speech_data],
            )
            self._event_ch.send_nowait(event)
A single streaming transcription session over the LiveKit Inference STT WebSocket.

Args: sample_rate (int | None, optional): The desired sample rate for the audio input. If specified, the audio input is automatically resampled to this rate before speech-to-text processing; if None, the input keeps its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> None:
    """Update streaming transcription options.

    When the WebSocket is live, a mid-stream session.update is sent so providers
    that support it (e.g. AssemblyAI, Deepgram Flux) can apply changes without
    reconnecting. Unsupported providers ignore the message.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = LanguageCode(language)
    if is_given(extra):
        self._opts.extra_kwargs.update(extra)

    has_update = is_given(model) or is_given(language) or is_given(extra)
    if has_update and self._ws is not None and not self._ws.closed:
        # Only include the fields that actually changed.
        settings: dict[str, Any] = {}
        if is_given(model):
            settings["model"] = model
        if is_given(language):
            settings["language"] = str(LanguageCode(language))
        if is_given(extra):
            settings["extra"] = extra
        update_msg = {
            "type": "session.update",
            "settings": settings,
        }
        # Fire-and-forget send; failures are logged in _send_session_update.
        # NOTE(review): no reference to the task is retained — confirm it
        # cannot be garbage-collected before it runs.
        asyncio.ensure_future(self._send_session_update(update_msg))

Update streaming transcription options.

When the WebSocket is live, a mid-stream session.update is sent so providers that support it (e.g. AssemblyAI, Deepgram Flux) can apply changes without reconnecting. Unsupported providers ignore the message.