Module livekit.plugins.google

Classes

class STT (*, languages: LanguageCode = 'en-US', detect_language: bool = True, interim_results: bool = True, punctuate: bool = True, spoken_punctuation: bool = True, model: SpeechModels = 'long', credentials_info: dict | None = None, credentials_file: str | None = None, keywords: List[tuple[str, float]] | None = None)

Speech-to-text (STT) implementation backed by the Google Cloud Speech-to-Text API.

Create a new instance of Google STT.

Credentials must be provided in one of three ways: by passing the credentials_info dict, by setting credentials_file to the path of a service-account key file, or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials
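For illustration, a minimal construction sketch. This is a hedged example: the service-account path is a placeholder, and the keyword boost value is arbitrary.

from livekit.plugins import google

# Option 1: rely on Application Default Credentials
# (e.g. after running `gcloud auth application-default login`).
stt = google.STT(model="long", languages="en-US")

# Option 2: point at a service-account key file explicitly.
stt = google.STT(
    credentials_file="/path/to/service-account.json",  # placeholder path
    keywords=[("LiveKit", 15.0)],  # bias recognition toward domain terms
)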

Source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = True,
        model: SpeechModels = "long",
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
        keywords: List[tuple[str, float]] | None = None,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided in one of three ways: by passing the
        ``credentials_info`` dict, by setting ``credentials_file`` to the path of a
        service-account key file, or via Application Default Credentials as described in
        https://cloud.google.com/docs/authentication/application-default-credentials
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True)
        )

        self._client: SpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if credentials_file is None and credentials_info is None:
            try:
                gauth_default()
            except DefaultCredentialsError:
                raise ValueError(
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                )

        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            keywords=keywords,
        )

    def _ensure_client(self) -> SpeechAsyncClient:
        if self._credentials_info:
            self._client = SpeechAsyncClient.from_service_account_info(
                self._credentials_info
            )
        elif self._credentials_file:
            self._client = SpeechAsyncClient.from_service_account_file(
                self._credentials_file
            )
        else:
            self._client = SpeechAsyncClient()

        assert self._client is not None
        return self._client

    @property
    def _recognizer(self) -> str:
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = self._ensure_client().transport._credentials.project_id  # type: ignore
        except AttributeError:
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/global/recognizers/_"

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        config = dataclasses.replace(self._config)

        if language:
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning(
                    "multiple languages provided, but language detection is disabled"
                )
            config.languages = [config.languages[0]]

        return config

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: SpeechLanguages | str | None = None,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        frame = agents.utils.merge_frames(buffer)

        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            adaptation=config.build_adaptation(),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=config.punctuate,
                enable_spoken_punctuation=config.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=config.model,
            language_codes=config.languages,
        )

        try:
            raw = await self._ensure_client().recognize(
                cloud_speech.RecognizeRequest(
                    recognizer=self._recognizer,
                    config=config,
                    content=frame.data.tobytes(),
                )
            )

            return _recognize_response_to_speech_event(raw)
        except DeadlineExceeded:
            raise APITimeoutError()
        except GoogleAPICallError as e:
            raise APIStatusError(
                e.message,
                status_code=e.code or -1,
                request_id=None,
                body=None,
            )
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self, *, language: SpeechLanguages | str | None = None
    ) -> "SpeechStream":
        config = self._sanitize_options(language=language)
        return SpeechStream(self, self._ensure_client(), self._recognizer, config)

Ancestors

livekit.agents.stt.STT

Methods

def stream(self, *, language: SpeechLanguages | str | None = None) ‑> livekit.plugins.google.stt.SpeechStream
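A hedged usage sketch: push_frame() and async iteration over speech events come from the livekit-agents SpeechStream base class, and frames is assumed to be an async iterator of rtc.AudioFrame.

import asyncio

from livekit.agents import stt as agents_stt
from livekit.plugins import google

async def transcribe(frames) -> None:
    gstt = google.STT()
    stream = gstt.stream(language="en-US")

    async def push() -> None:
        # frames: async iterator of rtc.AudioFrame (assumed)
        async for frame in frames:
            stream.push_frame(frame)

    async def read() -> None:
        async for event in stream:
            if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(event.alternatives[0].text)

    await asyncio.gather(push(), read())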


class SpeechStream (stt: STT, client: SpeechAsyncClient, recognizer: str, config: STTOptions, sample_rate: int = 48000, num_channels: int = 1, max_retry: int = 32)

Streaming speech-to-text session that sends audio to Google Cloud Speech-to-Text and emits interim and final transcript events.

Args

sample_rate : int, optional
The desired sample rate for the audio input. Incoming audio is automatically resampled to this rate before being processed for speech-to-text. Default is 48000.

Source code
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        stt: STT,
        client: SpeechAsyncClient,
        recognizer: str,
        config: STTOptions,
        sample_rate: int = 48000,
        num_channels: int = 1,
        max_retry: int = 32,
    ) -> None:
        super().__init__(stt)

        self._client = client
        self._recognizer = recognizer
        self._config = config
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._max_retry = max_retry

        self._streaming_config = cloud_speech.StreamingRecognitionConfig(
            config=cloud_speech.RecognitionConfig(
                explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                    encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                    sample_rate_hertz=self._sample_rate,
                    audio_channel_count=self._num_channels,
                ),
                adaptation=config.build_adaptation(),
                language_codes=self._config.languages,
                model=self._config.model,
                features=cloud_speech.RecognitionFeatures(
                    enable_automatic_punctuation=self._config.punctuate,
                    enable_word_time_offsets=True,
                ),
            ),
            streaming_features=cloud_speech.StreamingRecognitionFeatures(
                enable_voice_activity_events=True,
                interim_results=self._config.interim_results,
            ),
        )

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        await self._run(self._max_retry)

    async def _run(self, max_retry: int) -> None:
        retry_count = 0
        while self._input_ch.qsize() or not self._input_ch.closed:
            try:
                # google requires an async generator when calling streaming_recognize;
                # this function converts the input queue into an async generator
                async def input_generator():
                    try:
                        # first request should contain the config
                        yield cloud_speech.StreamingRecognizeRequest(
                            recognizer=self._recognizer,
                            streaming_config=self._streaming_config,
                        )

                        async for frame in self._input_ch:
                            if isinstance(frame, rtc.AudioFrame):
                                frame = frame.remix_and_resample(
                                    self._sample_rate, self._num_channels
                                )
                                yield cloud_speech.StreamingRecognizeRequest(
                                    audio=frame.data.tobytes()
                                )

                    except Exception:
                        logger.exception(
                            "an error occurred while streaming input to google STT"
                        )

                # try to connect
                stream = await self._client.streaming_recognize(
                    requests=input_generator()
                )
                retry_count = 0  # connection successful, reset retry count

                await self._run_stream(stream)
            except Aborted:
                logger.error("google stt connection aborted")
                break
            except Exception as e:
                if retry_count >= max_retry:
                    logger.error(
                        f"failed to connect to google stt after {max_retry} tries",
                        exc_info=e,
                    )
                    break

                retry_delay = min(retry_count * 2, 5)  # max 5s
                retry_count += 1
                logger.warning(
                    f"google stt connection failed, retrying in {retry_delay}s",
                    exc_info=e,
                )
                await asyncio.sleep(retry_delay)

    async def _run_stream(
        self, stream: AsyncIterable[cloud_speech.StreamingRecognizeResponse]
    ):
        async for resp in stream:
            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
            ):
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                )

            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED
            ):
                result = resp.results[0]
                speech_data = _streaming_recognize_response_to_speech_data(resp)
                if speech_data is None:
                    continue

                if not result.is_final:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                            alternatives=[speech_data],
                        )
                    )
                else:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[speech_data],
                        )
                    )

            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
            ):
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

Ancestors

livekit.agents.stt.SpeechStream


class TTS (*, language: SpeechLanguages | str = 'en-US', gender: Gender | str = 'neutral', voice_name: str = '', encoding: AudioEncoding | str = 'linear16', sample_rate: int = 24000, pitch: int = 0, effects_profile_id: str = '', speaking_rate: float = 1.0, credentials_info: dict | None = None, credentials_file: str | None = None)

Text-to-speech (TTS) implementation backed by the Google Cloud Text-to-Speech API.

Create a new instance of Google TTS.

Credentials must be provided in one of three ways: by passing the credentials_info dict, by setting credentials_file to the path of a service-account key file, or via the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
Specific voice name. Default is an empty string.
encoding : AudioEncoding | str, optional
Audio encoding format (e.g., "linear16"). Default is "linear16".
sample_rate : int, optional
Audio sample rate in Hz. Default is 24000.
pitch : float, optional
Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id : str
Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate : float, optional
Speed of speech. Default is 1.0.
credentials_info : dict, optional
Dictionary containing Google Cloud credentials. Default is None.
credentials_file : str, optional
Path to the Google Cloud credentials JSON file. Default is None.
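A hedged usage sketch: the voice name is illustrative (consult Google Cloud's published voice list), and the assumption that the returned ChunkedStream is asynchronously iterable and yields objects carrying an audio frame follows the pattern of other livekit-agents TTS plugins.

from livekit.plugins import google

tts = google.TTS(
    language="en-US",
    gender="female",
    voice_name="en-US-Standard-C",  # illustrative; any Google Cloud TTS voice
    sample_rate=24000,
)

async def speak(text: str) -> None:
    stream = tts.synthesize(text)
    async for chunk in stream:
        audio_frame = chunk.frame  # assumed attribute carrying PCM audio
        ...  # forward the frame to a track, file, or player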
Source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        language: SpeechLanguages | str = "en-US",
        gender: Gender | str = "neutral",
        voice_name: str = "",  # Not required
        encoding: AudioEncoding | str = "linear16",
        sample_rate: int = 24000,
        pitch: int = 0,
        effects_profile_id: str = "",
        speaking_rate: float = 1.0,
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
    ) -> None:
        """
        Create a new instance of Google TTS.

        Credentials must be provided in one of three ways: by passing the
        ``credentials_info`` dict, by setting ``credentials_file`` to the path of a
        service-account key file, or via the ``GOOGLE_APPLICATION_CREDENTIALS``
        environment variable.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US".
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral".
            voice_name (str, optional): Specific voice name. Default is an empty string.
            encoding (AudioEncoding | str, optional): Audio encoding format (e.g., "linear16"). Default is "linear16".
            sample_rate (int, optional): Audio sample rate in Hz. Default is 24000.
            pitch (float, optional): Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
            effects_profile_id (str): Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
            speaking_rate (float, optional): Speed of speech. Default is 1.0.
            credentials_info (dict, optional): Dictionary containing Google Cloud credentials. Default is None.
            credentials_file (str, optional): Path to the Google Cloud credentials JSON file. Default is None.
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        voice = texttospeech.VoiceSelectionParams(
            name=voice_name,
            language_code=language,
            ssml_gender=_gender_from_str(gender),
        )

        if encoding == "linear16" or encoding == "wav":
            _audio_encoding = texttospeech.AudioEncoding.LINEAR16
        elif encoding == "mp3":
            _audio_encoding = texttospeech.AudioEncoding.MP3
        else:
            raise NotImplementedError(f"audio encoding {encoding} is not supported")

        self._opts = _TTSOptions(
            voice=voice,
            audio_config=texttospeech.AudioConfig(
                audio_encoding=_audio_encoding,
                sample_rate_hertz=sample_rate,
                pitch=pitch,
                effects_profile_id=effects_profile_id,
                speaking_rate=speaking_rate,
            ),
        )

    def update_options(
        self,
        *,
        language: SpeechLanguages | str = "en-US",
        gender: Gender | str = "neutral",
        voice_name: str = "",  # Not required
        speaking_rate: float = 1.0,
    ) -> None:
        """
        Update the TTS options.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US".
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral".
            voice_name (str, optional): Specific voice name. Default is an empty string.
            speaking_rate (float, optional): Speed of speech. Default is 1.0.
        """
        self._opts.voice = texttospeech.VoiceSelectionParams(
            name=voice_name,
            language_code=language,
            ssml_gender=_gender_from_str(gender),
        )
        self._opts.audio_config.speaking_rate = speaking_rate

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        if not self._client:
            if self._credentials_info:
                self._client = (
                    texttospeech.TextToSpeechAsyncClient.from_service_account_info(
                        self._credentials_info
                    )
                )

            elif self._credentials_file:
                self._client = (
                    texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                        self._credentials_file
                    )
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient()

        assert self._client is not None
        return self._client

    def synthesize(self, text: str) -> "ChunkedStream":
        return ChunkedStream(self, text, self._opts, self._ensure_client())

Ancestors

livekit.agents.tts.TTS

Methods

def synthesize(self, text: str) ‑> livekit.plugins.google.tts.ChunkedStream
def update_options(self, *, language: SpeechLanguages | str = 'en-US', gender: Gender | str = 'neutral', voice_name: str = '', speaking_rate: float = 1.0) ‑> None

Update the TTS options.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
Specific voice name. Default is an empty string.
speaking_rate : float, optional
Speed of speech. Default is 1.0.
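For example (voice name illustrative). Note that every parameter of update_options has a default, so the voice selection is rebuilt from exactly the arguments passed; values omitted here fall back to their defaults rather than being preserved from the previous configuration.

tts.update_options(
    language="en-GB",
    gender="male",
    voice_name="en-GB-Standard-B",  # illustrative voice name
    speaking_rate=1.2,
)
# encoding, sample rate, pitch and effects profile set at construction
# time are left untouched; only the voice and speaking rate change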
