Module livekit.plugins.gladia

Gladia plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/stt/gladia/ for more information.
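
A minimal construction sketch; every argument shown is documented on this page, and GLADIA_API_KEY is assumed to be set in the environment:

from livekit.plugins import gladia

stt = gladia.STT(
    languages=["en", "fr"],               # omit for automatic detection
    code_switching=True,
    translation_enabled=True,
    translation_target_languages=["en"],  # required when translation is enabled
    energy_filter=gladia.AudioEnergyFilter(min_silence=1.0),
)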

Sub-modules

livekit.plugins.gladia.models

Classes

class AudioEnergyFilter (*, min_silence: float = 1.5, rms_threshold: float = 1.6e-05)
class AudioEnergyFilter:
    class State(Enum):
        START = 0
        SPEAKING = 1
        SILENCE = 2
        END = 3

    def __init__(self, *, min_silence: float = 1.5, rms_threshold: float = MAGIC_NUMBER_THRESHOLD):
        # MAGIC_NUMBER_THRESHOLD is a module-level constant; it equals the
        # 1.6e-05 default shown in the rendered signature above
        self._cooldown_seconds = min_silence
        self._cooldown = min_silence
        self._state = self.State.SILENCE
        self._rms_threshold = rms_threshold

    def update(self, frame: rtc.AudioFrame) -> State:
        arr = np.frombuffer(frame.data, dtype=np.int16)
        float_arr = arr.astype(np.float32) / 32768.0
        # mean of squares, i.e. the squared RMS; the default threshold of
        # 1.6e-05 corresponds to an RMS amplitude of 0.004
        rms = np.mean(np.square(float_arr))

        if rms > self._rms_threshold:
            self._cooldown = self._cooldown_seconds
            if self._state in (self.State.SILENCE, self.State.END):
                self._state = self.State.START
            else:
                self._state = self.State.SPEAKING
        else:
            if self._cooldown <= 0:
                if self._state in (self.State.SPEAKING, self.State.START):
                    self._state = self.State.END
                elif self._state == self.State.END:
                    self._state = self.State.SILENCE
            else:
                self._cooldown -= frame.duration
                self._state = self.State.SPEAKING

        return self._state
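
A short sketch of the state machine in use with a synthetic frame (the rtc.AudioFrame constructor arguments are an assumption based on the livekit SDK):

import numpy as np
from livekit import rtc

filt = AudioEnergyFilter(min_silence=0.5)

# 100 ms of mono 16 kHz noise, well above the energy threshold
samples = (np.random.uniform(-0.5, 0.5, 1600) * 32767).astype(np.int16)
frame = rtc.AudioFrame(
    data=samples.tobytes(),
    sample_rate=16000,
    num_channels=1,
    samples_per_channel=1600,
)

print(filt.update(frame))  # State.START on the first loud frame
print(filt.update(frame))  # State.SPEAKING while energy stays high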

Class variables

var State

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
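
The same access patterns apply to the nested State enum defined above:

>>> AudioEnergyFilter.State.SPEAKING
<State.SPEAKING: 1>
>>> AudioEnergyFilter.State(2)
<State.SILENCE: 2>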

Methods

def update(self, frame: rtc.AudioFrame) ‑> State
class STT (*,
interim_results: bool = True,
languages: list[str] | None = None,
code_switching: bool = True,
sample_rate: int = 16000,
bit_depth: Literal[8, 16, 24, 32] = 16,
channels: int = 1,
encoding: "Literal['wav/pcm', 'wav/alaw', 'wav/ulaw']" = 'wav/pcm',
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.gladia.io/v2/live',
energy_filter: AudioEnergyFilter | bool = False,
translation_enabled: bool = False,
translation_target_languages: list[str] | None = None,
translation_model: str = 'base',
translation_match_original_utterances: bool = True)
class STT(stt.STT):
    def __init__(
        self,
        *,
        interim_results: bool = True,
        languages: list[str] | None = None,
        code_switching: bool = True,
        sample_rate: int = 16000,
        bit_depth: Literal[8, 16, 24, 32] = 16,
        channels: int = 1,
        encoding: Literal["wav/pcm", "wav/alaw", "wav/ulaw"] = "wav/pcm",
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = BASE_URL,
        energy_filter: AudioEnergyFilter | bool = False,
        translation_enabled: bool = False,
        translation_target_languages: list[str] | None = None,
        translation_model: str = "base",
        translation_match_original_utterances: bool = True,
    ) -> None:
        """Create a new instance of Gladia STT.

        Args:
            interim_results: Whether to return interim (non-final) transcription results.
                            Defaults to True.
            languages: List of language codes to use for recognition. Defaults to None
                    (auto-detect).
            code_switching: Whether to allow switching between languages during recognition.
                            Defaults to True.
            sample_rate: The sample rate of the audio in Hz. Defaults to 16000.
            bit_depth: The bit depth of the audio. Defaults to 16.
            channels: The number of audio channels. Defaults to 1.
            encoding: The encoding of the audio. Defaults to "wav/pcm".
            api_key: Your Gladia API key. If not provided, will look for GLADIA_API_KEY
                        environment variable.
            http_session: Optional aiohttp ClientSession to use for requests.
            base_url: The base URL for Gladia API. Defaults to "https://api.gladia.io/v2/live".
            energy_filter: Audio energy filter configuration for voice activity detection.
                         Can be a boolean or AudioEnergyFilter instance. Defaults to False.
            translation_enabled: Whether to enable translation. Defaults to False.
            translation_target_languages: List of target languages for translation.
                                        Required if translation_enabled is True.
            translation_model: Translation model to use. Defaults to "base".
            translation_match_original_utterances: Whether to match original utterances with
                                                    translations. Defaults to True.

        Raises:
            ValueError: If no API key is provided or found in environment variables.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=interim_results)
        )
        self._base_url = base_url

        api_key = api_key or os.environ.get("GLADIA_API_KEY")
        if api_key is None:
            raise ValueError("Gladia API key is required")

        self._api_key = api_key

        language_config = LanguageConfiguration(languages=languages, code_switching=code_switching)

        translation_config = TranslationConfiguration(
            enabled=translation_enabled,
            target_languages=translation_target_languages or [],
            model=translation_model,
            match_original_utterances=translation_match_original_utterances,
        )

        if translation_enabled and not translation_target_languages:
            raise ValueError(
                "translation_target_languages is required when translation_enabled is True"
            )

        self._opts = STTOptions(
            language_config=language_config,
            interim_results=interim_results,
            sample_rate=sample_rate,
            bit_depth=bit_depth,
            channels=channels,
            encoding=encoding,
            translation_config=translation_config,
            energy_filter=energy_filter,
        )
        self._session = http_session
        self._streams = weakref.WeakSet()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: list[str] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Implement synchronous speech recognition for Gladia using the live endpoint."""
        config = self._sanitize_options(languages=language)

        streaming_config = {
            "encoding": config.encoding,
            "sample_rate": config.sample_rate,
            "bit_depth": config.bit_depth,
            "channels": config.channels,
            "language_config": {
                "languages": config.language_config.languages or [],
                "code_switching": config.language_config.code_switching,
            },
            "realtime_processing": {
                "words_accurate_timestamps": False,
                "custom_vocabulary": False,
                "custom_vocabulary_config": {
                    "vocabulary": [
                        "Gladia",
                        {"value": "Gladia", "intensity": 0.5},
                    ],
                    "default_intensity": 0.5,
                },
                "custom_spelling": False,
                "custom_spelling_config": {
                    "spelling_dictionary": {
                        "SQL": ["Sequel"],
                    }
                },
            },
        }

        # Add translation configuration if enabled
        if config.translation_config.enabled:
            streaming_config["realtime_processing"]["translation"] = True
            streaming_config["realtime_processing"]["translation_config"] = {
                "target_languages": config.translation_config.target_languages,
                "model": config.translation_config.model,
                "match_original_utterances": config.translation_config.match_original_utterances,
            }

        try:
            # Initialize a session with Gladia
            session_response = await self._init_live_session(streaming_config, conn_options)
            session_id = session_response["id"]
            session_url = session_response["url"]

            # Connect to the WebSocket
            async with self._ensure_session().ws_connect(
                session_url,
                timeout=aiohttp.ClientTimeout(
                    total=30,  # Keep a reasonable total timeout
                    sock_connect=conn_options.timeout,
                ),
            ) as ws:
                # Combine audio frames to get a single frame with all raw PCM data
                combined_frame = rtc.combine_audio_frames(buffer)
                # Get the raw bytes from the combined frame
                pcm_data = combined_frame.data.tobytes()

                bytes_per_second = config.sample_rate * config.channels * (config.bit_depth // 8)
                # ~150 ms of audio per chunk, with a 1 KiB floor
                chunk_size = (bytes_per_second * 150) // 1000
                chunk_size = max(chunk_size, 1024)

                # Send raw PCM audio data in chunks
                for i in range(0, len(pcm_data), chunk_size):
                    chunk = pcm_data[i : i + chunk_size]
                    chunk_b64 = base64.b64encode(chunk).decode("utf-8")
                    await ws.send_str(
                        json.dumps({"type": "audio_chunk", "data": {"chunk": chunk_b64}})
                    )

                # Tell Gladia we're done sending audio
                await ws.send_str(json.dumps({"type": "stop_recording"}))

                # Wait for final transcript
                utterances = []

                # Receive messages until we get the post_final_transcript message
                try:
                    # Set a timeout for waiting for the final results after sending stop_recording
                    receive_timeout = conn_options.timeout * 5
                    # aiohttp's ClientWebSocketResponse has no iter() helper;
                    # receive() accepts a per-message timeout instead
                    while True:
                        msg = await ws.receive(timeout=receive_timeout)
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            data = json.loads(msg.data)
                            # Collect final utterances
                            if data["type"] == "transcript" and data["data"]["is_final"]:
                                utterance = data["data"]["utterance"]
                                utterances.append(utterance)
                            # Check for translation as the final result if enabled
                            elif (
                                data["type"] == "translation" and config.translation_config.enabled
                            ):
                                pass
                            elif data["type"] == "post_final_transcript":
                                break
                            elif data["type"] == "error":
                                raise APIConnectionError(
                                    f"Gladia WebSocket error: {data.get('data')}"
                                ) from None

                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            logger.error(f"Gladia WebSocket connection error: {ws.exception()}")
                            raise ws.exception() or APIConnectionError(
                                "Gladia WebSocket connection error"
                            )
                        elif msg.type in (
                            aiohttp.WSMsgType.CLOSE,
                            aiohttp.WSMsgType.CLOSED,
                            aiohttp.WSMsgType.CLOSING,
                        ):
                            logger.warning(
                                "Gladia WebSocket closed unexpectedly during result receiving: "
                                f"type={msg.type}"
                            )
                            break

                except asyncio.TimeoutError:
                    logger.warning(
                        f"Timeout waiting for Gladia final transcript ({receive_timeout}s)"
                    )
                    if not utterances:
                        raise APITimeoutError(
                            f"Timeout waiting for Gladia final transcript ({receive_timeout}s)"
                        ) from None

                # Create a speech event from the collected final utterances
                return self._create_speech_event(
                    utterances, session_id, config.language_config.languages
                )

        except asyncio.TimeoutError as e:
            # Catch timeout during connection or initial phase
            logger.error(f"Timeout during Gladia connection/initialization: {e}")
            raise APITimeoutError("Timeout connecting to or initializing Gladia session") from e
        except aiohttp.ClientResponseError as e:
            # Error during session initialization POST request
            logger.error(f"Gladia API status error during session init: {e.status} {e.message}")
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=e.headers.get(
                    "X-Request-ID"
                ),  # Check if Gladia provides a request ID header
                body=await e.response.text() if hasattr(e, "response") else None,
            ) from e
        except aiohttp.ClientError as e:
            # General client errors (connection refused, DNS resolution etc.)
            logger.error(f"Gladia connection error: {e}")
            raise APIConnectionError(f"Gladia connection error: {e}") from e
        except Exception as e:
            # Catch-all for other unexpected errors
            logger.exception(
                f"Unexpected error during Gladia synchronous recognition: {e}"
            )  # Use logger.exception to include stack trace
            raise APIConnectionError(f"An unexpected error occurred: {e}") from e

    async def _init_live_session(self, config: dict, conn_options: APIConnectOptions) -> dict:
        """Initialize a live transcription session with Gladia."""
        try:
            async with self._ensure_session().post(
                url=self._base_url,
                json=config,
                headers={"X-Gladia-Key": self._api_key},
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                # Gladia returns 201 Created when successfully creating a session
                if res.status not in (200, 201):
                    raise APIStatusError(
                        message=f"Failed to initialize Gladia session: {res.status}",
                        status_code=res.status,
                        request_id=None,
                        body=await res.text(),
                    )
                return await res.json()
        except Exception as e:
            logger.exception(f"Failed to initialize Gladia session: {e}")
            raise APIConnectionError(f"Failed to initialize Gladia session: {str(e)}") from e

    def _create_speech_event(
        self, utterances: list[dict], session_id: str, languages: list[str] | None
    ) -> stt.SpeechEvent:
        """Create a SpeechEvent from Gladia's transcript data."""
        alternatives = []

        # Process each utterance into a SpeechData object
        for utterance in utterances:
            text = utterance.get("text", "").strip()
            if text:
                alternatives.append(
                    stt.SpeechData(
                        language=utterance.get("language", languages[0] if languages else "en"),
                        start_time=utterance.get("start", 0),
                        end_time=utterance.get("end", 0),
                        confidence=utterance.get("confidence", 1.0),
                        text=text,
                    )
                )

        if not alternatives:
            alternatives.append(
                stt.SpeechData(
                    language=languages[0] if languages and len(languages) > 0 else "en",
                    start_time=0,
                    end_time=0,
                    confidence=1.0,
                    text="",
                )
            )

        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            request_id=session_id,
            alternatives=alternatives,
        )

    def stream(
        self,
        *,
        language: list[str] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = self._sanitize_options(languages=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
            base_url=self._base_url,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        languages: list[str] | None = None,
        code_switching: bool | None = None,
        interim_results: bool | None = None,
        sample_rate: int | None = None,
        bit_depth: Literal[8, 16, 24, 32] | None = None,
        channels: int | None = None,
        encoding: Literal["wav/pcm", "wav/alaw", "wav/ulaw"] | None = None,
        translation_enabled: bool | None = None,
        translation_target_languages: list[str] | None = None,
        translation_model: str | None = None,
        translation_match_original_utterances: bool | None = None,
    ):
        if languages is not None or code_switching is not None:
            language_config = dataclasses.replace(
                self._opts.language_config,
                languages=languages
                if languages is not None
                else self._opts.language_config.languages,
                code_switching=code_switching
                if code_switching is not None
                else self._opts.language_config.code_switching,
            )
            self._opts.language_config = language_config

        if (
            translation_enabled is not None
            or translation_target_languages is not None
            or translation_model is not None
            or translation_match_original_utterances is not None
        ):
            translation_config = dataclasses.replace(
                self._opts.translation_config,
                enabled=translation_enabled
                if translation_enabled is not None
                else self._opts.translation_config.enabled,
                target_languages=translation_target_languages
                if translation_target_languages is not None
                else self._opts.translation_config.target_languages,
                model=translation_model
                if translation_model is not None
                else self._opts.translation_config.model,
                match_original_utterances=translation_match_original_utterances
                if translation_match_original_utterances is not None
                else self._opts.translation_config.match_original_utterances,
            )
            self._opts.translation_config = translation_config

        if interim_results is not None:
            self._opts.interim_results = interim_results
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if bit_depth is not None:
            self._opts.bit_depth = bit_depth
        if channels is not None:
            self._opts.channels = channels
        if encoding is not None:
            self._opts.encoding = encoding

        for stream in self._streams:
            stream.update_options(
                languages=languages,
                code_switching=code_switching,
                interim_results=interim_results,
                sample_rate=sample_rate,
                bit_depth=bit_depth,
                channels=channels,
                encoding=encoding,
                translation_enabled=translation_enabled,
                translation_target_languages=translation_target_languages,
                translation_model=translation_model,
                translation_match_original_utterances=translation_match_original_utterances,
            )

    def _sanitize_options(self, *, languages: list[str] | None = None) -> STTOptions:
        config = dataclasses.replace(self._opts)
        if languages is not None:
            language_config = dataclasses.replace(
                config.language_config,
                languages=languages,
            )
            config.language_config = language_config
        return config

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Gladia STT.

Args

interim_results
Whether to return interim (non-final) transcription results. Defaults to True.
languages
List of language codes to use for recognition. Defaults to None (auto-detect).
code_switching
Whether to allow switching between languages during recognition. Defaults to True.
sample_rate
The sample rate of the audio in Hz. Defaults to 16000.
bit_depth
The bit depth of the audio. Defaults to 16.
channels
The number of audio channels. Defaults to 1.
encoding
The encoding of the audio. Defaults to "wav/pcm".
api_key
Your Gladia API key. If not provided, will look for GLADIA_API_KEY environment variable.
http_session
Optional aiohttp ClientSession to use for requests.
base_url
The base URL for Gladia API. Defaults to "https://api.gladia.io/v2/live".
energy_filter
Audio energy filter configuration for voice activity detection. Can be a boolean or AudioEnergyFilter instance. Defaults to False.
translation_enabled
Whether to enable translation. Defaults to False.
translation_target_languages
List of target languages for translation. Required if translation_enabled is True.
translation_model
Translation model to use. Defaults to "base".
translation_match_original_utterances
Whether to match original utterances with translations. Defaults to True.

Raises

ValueError
If no API key is provided or found in environment variables.
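
For one-shot recognition, the recognize() coroutine inherited from livekit.agents.stt.STT routes to the _recognize_impl shown above. A minimal sketch, assuming buffer holds PCM audio matching the configured sample rate:

async def transcribe_once(stt_instance: STT, buffer) -> str:
    event = await stt_instance.recognize(buffer=buffer)
    return event.alternatives[0].text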

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def stream(self,
*,
language: list[str] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.gladia.stt.SpeechStream
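
A sketch of streaming usage; push_frame(), end_input(), and async iteration are assumed from the RecognizeStream base class, and the frame source is hypothetical:

async def stream_frames(stt_instance: STT, frames) -> None:
    stream = stt_instance.stream(language=["en"])
    for frame in frames:   # frames: iterable of rtc.AudioFrame
        stream.push_frame(frame)
    stream.end_input()     # flush buffered audio and signal end of input
    async for event in stream:
        print(event.type, [alt.text for alt in event.alternatives])
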
def update_options(self,
*,
languages: list[str] | None = None,
code_switching: bool | None = None,
interim_results: bool | None = None,
sample_rate: int | None = None,
bit_depth: Literal[8, 16, 24, 32] | None = None,
channels: int | None = None,
encoding: "Literal['wav/pcm', 'wav/alaw', 'wav/ulaw'] | None" = None,
translation_enabled: bool | None = None,
translation_target_languages: list[str] | None = None,
translation_model: str | None = None,
translation_match_original_utterances: bool | None = None)
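
Option changes propagate to every open stream (each stream sets its reconnect event and re-establishes its Gladia session, as shown in SpeechStream below). A sketch:

stt_instance.update_options(
    languages=["es"],  # switch recognition to Spanish
    translation_enabled=True,
    translation_target_languages=["en"],
)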

Inherited members

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
        base_url: str,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._base_url = base_url
        self._speaking = False
        self._audio_duration_collector = PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

        self._audio_energy_filter: AudioEnergyFilter | None = None
        if opts.energy_filter:
            if isinstance(opts.energy_filter, AudioEnergyFilter):
                self._audio_energy_filter = opts.energy_filter
            else:
                self._audio_energy_filter = AudioEnergyFilter()

        self._pushed_audio_duration = 0.0
        self._request_id = ""
        self._reconnect_event = asyncio.Event()
        self._ws: aiohttp.ClientWebSocketResponse | None = None

    def update_options(
        self,
        *,
        languages: list[str] | None = None,
        code_switching: bool | None = None,
        interim_results: bool | None = None,
        sample_rate: int | None = None,
        bit_depth: Literal[8, 16, 24, 32] | None = None,
        channels: int | None = None,
        encoding: Literal["wav/pcm", "wav/alaw", "wav/ulaw"] | None = None,
        translation_enabled: bool | None = None,
        translation_target_languages: list[str] | None = None,
        translation_model: str | None = None,
        translation_match_original_utterances: bool | None = None,
    ):
        if languages is not None or code_switching is not None:
            language_config = dataclasses.replace(
                self._opts.language_config,
                languages=languages
                if languages is not None
                else self._opts.language_config.languages,
                code_switching=code_switching
                if code_switching is not None
                else self._opts.language_config.code_switching,
            )
            self._opts.language_config = language_config

        if (
            translation_enabled is not None
            or translation_target_languages is not None
            or translation_model is not None
            or translation_match_original_utterances is not None
        ):
            translation_config = dataclasses.replace(
                self._opts.translation_config,
                enabled=translation_enabled
                if translation_enabled is not None
                else self._opts.translation_config.enabled,
                target_languages=translation_target_languages
                if translation_target_languages is not None
                else self._opts.translation_config.target_languages,
                model=translation_model
                if translation_model is not None
                else self._opts.translation_config.model,
                match_original_utterances=translation_match_original_utterances
                if translation_match_original_utterances is not None
                else self._opts.translation_config.match_original_utterances,
            )
            self._opts.translation_config = translation_config

        if interim_results is not None:
            self._opts.interim_results = interim_results
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if bit_depth is not None:
            self._opts.bit_depth = bit_depth
        if channels is not None:
            self._opts.channels = channels
        if encoding is not None:
            self._opts.encoding = encoding

        self._reconnect_event.set()

    async def _run(self) -> None:
        backoff_time = 1.0
        max_backoff = 30.0

        while True:
            try:
                # Initialize the Gladia session
                session_info = await self._init_live_session()
                session_url = session_info["url"]
                self._request_id = session_info["id"]

                # Reset backoff on success
                backoff_time = 1.0

                # Connect to the WebSocket
                async with self._session.ws_connect(session_url) as ws:
                    self._ws = ws
                    logger.info(f"Connected to Gladia session {self._request_id}")

                    send_task = asyncio.create_task(self._send_audio_task())
                    recv_task = asyncio.create_task(self._recv_messages_task())

                    wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                    try:
                        done, _ = await asyncio.wait(
                            [send_task, recv_task, wait_reconnect_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )

                        for task in done:
                            if task != wait_reconnect_task:
                                task.result()

                        if wait_reconnect_task not in done:
                            break

                        self._reconnect_event.clear()
                        logger.info("Reconnecting Gladia session due to options change")
                    finally:
                        await utils.aio.gracefully_cancel(send_task, recv_task, wait_reconnect_task)
                        self._ws = None
            except APIStatusError as e:
                if e.status_code == 429:
                    logger.warning(
                        f"Rate limited by Gladia API. Backing off for {backoff_time} seconds."
                    )
                    await asyncio.sleep(backoff_time)
                    backoff_time = min(backoff_time * 2, max_backoff)
                else:
                    logger.exception(f"Error in speech stream: {e}")
                    await asyncio.sleep(backoff_time)
            except Exception as e:
                logger.exception(f"Error in speech stream: {e}")
                # Wait a bit before reconnecting to avoid rapid reconnection attempts
                await asyncio.sleep(backoff_time)

    async def _init_live_session(self) -> dict:
        """Initialize a live session with Gladia."""
        streaming_config = {
            "encoding": self._opts.encoding,
            "sample_rate": self._opts.sample_rate,
            "bit_depth": self._opts.bit_depth,
            "channels": self._opts.channels,
            "language_config": {
                "languages": self._opts.language_config.languages or [],
                "code_switching": self._opts.language_config.code_switching,
            },
            "realtime_processing": {},
        }

        # Add translation configuration if enabled
        if self._opts.translation_config.enabled:
            streaming_config["realtime_processing"]["translation"] = True
            streaming_config["realtime_processing"]["translation_config"] = {
                "target_languages": self._opts.translation_config.target_languages,
                "model": self._opts.translation_config.model,
                "match_original_utterances": (
                    self._opts.translation_config.match_original_utterances
                ),
            }

        try:
            async with self._session.post(
                url=self._base_url,
                json=streaming_config,
                headers={"X-Gladia-Key": self._api_key},
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=self._conn_options.timeout,
                ),
            ) as res:
                # Gladia returns 201 Created when successfully creating a session
                if res.status not in (200, 201):
                    raise APIStatusError(
                        message=f"Failed to initialize Gladia session: {res.status}",
                        status_code=res.status,
                        request_id=None,
                        body=await res.text(),
                    )
                return await res.json()
        except Exception as e:
            logger.exception(f"Failed to initialize Gladia session: {e}")
            raise APIConnectionError(f"Failed to initialize Gladia session: {str(e)}") from e

    async def _send_audio_task(self):
        """Send audio data to Gladia WebSocket."""
        if not self._ws:
            return

        # We'll aim to send audio chunks every ~100ms
        samples_100ms = self._opts.sample_rate // 10
        audio_bstream = utils.audio.AudioByteStream(
            sample_rate=self._opts.sample_rate,
            num_channels=self._opts.channels,
            samples_per_channel=samples_100ms,
        )

        has_ended = False
        last_frame: rtc.AudioFrame | None = None

        async for data in self._input_ch:
            if not self._ws:
                break

            frames: list[rtc.AudioFrame] = []
            if isinstance(data, rtc.AudioFrame):
                state = self._check_energy_state(data)
                if state in (
                    AudioEnergyFilter.State.START,
                    AudioEnergyFilter.State.SPEAKING,
                ):
                    if last_frame:
                        frames.extend(audio_bstream.write(last_frame.data.tobytes()))
                        last_frame = None
                    frames.extend(audio_bstream.write(data.data.tobytes()))
                elif state == AudioEnergyFilter.State.END:
                    frames = audio_bstream.flush()
                    has_ended = True
                elif state == AudioEnergyFilter.State.SILENCE:
                    last_frame = data
            elif isinstance(data, self._FlushSentinel):
                frames = audio_bstream.flush()
                has_ended = True

            for frame in frames:
                self._audio_duration_collector.push(frame.duration)
                # Encode the audio data as base64
                chunk_b64 = base64.b64encode(frame.data.tobytes()).decode("utf-8")
                message = json.dumps({"type": "audio_chunk", "data": {"chunk": chunk_b64}})
                await self._ws.send_str(message)

                if has_ended:
                    self._audio_duration_collector.flush()
                    await self._ws.send_str(json.dumps({"type": "stop_recording"}))
                    has_ended = False

        # Tell Gladia we're done sending audio when the stream ends
        if self._ws:
            await self._ws.send_str(json.dumps({"type": "stop_recording"}))

    async def _recv_messages_task(self):
        """Receive and process messages from Gladia WebSocket."""
        if not self._ws:
            return

        async for msg in self._ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                    self._process_gladia_message(data)
                except Exception as e:
                    logger.exception(f"Error processing Gladia message: {e}")
            elif msg.type in (
                aiohttp.WSMsgType.CLOSED,
                aiohttp.WSMsgType.CLOSE,
                aiohttp.WSMsgType.CLOSING,
            ):
                break
            else:
                logger.warning(f"Unexpected message type from Gladia: {msg.type}")

    def _process_gladia_message(self, data: dict):
        """Process messages from Gladia WebSocket."""
        if data["type"] == "transcript":
            is_final = data["data"]["is_final"]
            utterance = data["data"]["utterance"]
            text = utterance.get("text", "").strip()

            if not self._speaking and text:
                self._speaking = True
                self._event_ch.send_nowait(
                    stt.SpeechEvent(
                        type=stt.SpeechEventType.START_OF_SPEECH, request_id=self._request_id
                    )
                )

            if text:
                language = utterance.get(
                    "language",
                    self._opts.language_config.languages[0]
                    if self._opts.language_config.languages
                    else "en",
                )

                speech_data = stt.SpeechData(
                    language=language,
                    start_time=utterance.get("start", 0),
                    end_time=utterance.get("end", 0),
                    confidence=utterance.get("confidence", 1.0),
                    text=text,
                )

                if is_final:
                    # Only emit FINAL_TRANSCRIPT for the *original* language
                    # if translation is NOT enabled.
                    if not self._opts.translation_config.enabled:
                        event = stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            request_id=self._request_id,
                            alternatives=[speech_data],
                        )
                        self._event_ch.send_nowait(event)

                        # End of speech after final original transcript only if not translating
                        if self._speaking:
                            self._speaking = False
                            self._event_ch.send_nowait(
                                stt.SpeechEvent(
                                    type=stt.SpeechEventType.END_OF_SPEECH,
                                    request_id=self._request_id,
                                )
                            )
                    # If translation *is* enabled, we suppress this final event
                    # and wait for the 'translation' message to emit the final event.
                elif self._opts.interim_results:
                    # Always send INTERIM_TRANSCRIPT for the original language if enabled
                    event = stt.SpeechEvent(
                        type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        request_id=self._request_id,
                        alternatives=[speech_data],
                    )
                    self._event_ch.send_nowait(event)

        elif data["type"] == "translation":
            # Process translation messages according to Gladia's documentation:
            # https://docs.gladia.io/reference/realtime-messages/translation
            if self._opts.translation_config.enabled and "data" in data:
                translation_data = data["data"]

                # Extract translated utterance
                translated_utterance = translation_data.get("translated_utterance", {})
                if not translated_utterance:
                    logger.warning(
                        f"No translated_utterance in translation message: {translation_data}"
                    )
                    return

                # Get language information
                target_language = translation_data.get("target_language", "")
                language = translated_utterance.get("language", target_language)

                # Get the translated text
                translated_text = translated_utterance.get("text", "").strip()

                if translated_text and language:
                    # Create speech data for the translation
                    speech_data = stt.SpeechData(
                        language=language,  # Use the target language
                        start_time=translated_utterance.get("start", 0),
                        end_time=translated_utterance.get("end", 0),
                        confidence=translated_utterance.get("confidence", 1.0),
                        text=translated_text,  # Use the translated text
                    )

                    # Emit FINAL_TRANSCRIPT containing the TRANSLATION
                    event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                        request_id=self._request_id,
                        alternatives=[speech_data],  # Now contains translated data
                    )
                    self._event_ch.send_nowait(event)

                    # Emit END_OF_SPEECH after the final *translated* transcript
                    if self._speaking:
                        self._speaking = False
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.END_OF_SPEECH, request_id=self._request_id
                            )
                        )

        elif data["type"] == "post_final_transcript":
            # This is sent at the end of a session
            # We now tie END_OF_SPEECH to the emission of the relevant FINAL_TRANSCRIPT
            # (either original if no translation, or translated if translation is enabled).
            # So, we might not strictly need to act on this message anymore for END_OF_SPEECH,
            # but ensure speaking state is reset if somehow missed.
            if self._speaking:
                self._speaking = False

    def _check_energy_state(self, frame: rtc.AudioFrame) -> AudioEnergyFilter.State:
        """Check the energy state of an audio frame."""
        if self._audio_energy_filter:
            return self._audio_energy_filter.update(frame)
        return AudioEnergyFilter.State.SPEAKING

    def _on_audio_duration_report(self, duration: float) -> None:
        """Report the audio duration for usage tracking."""
        usage_event = stt.SpeechEvent(
            type=stt.SpeechEventType.RECOGNITION_USAGE,
            request_id=self._request_id,
            alternatives=[],
            recognition_usage=stt.RecognitionUsage(audio_duration=duration),
        )
        self._event_ch.send_nowait(usage_event)

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
languages: list[str] | None = None,
code_switching: bool | None = None,
interim_results: bool | None = None,
sample_rate: int | None = None,
bit_depth: Literal[8, 16, 24, 32] | None = None,
channels: int | None = None,
encoding: "Literal['wav/pcm', 'wav/alaw', 'wav/ulaw'] | None" = None,
translation_enabled: bool | None = None,
translation_target_languages: list[str] | None = None,
translation_model: str | None = None,
translation_match_original_utterances: bool | None = None)