Module livekit.plugins.google

Classes

class STT (*, languages: LanguageCode = 'en-US', detect_language: bool = True, interim_results: bool = True, punctuate: bool = True, spoken_punctuation: bool = True, model: SpeechModels = 'long', credentials_info: dict | None = None, credentials_file: str | None = None)

Speech-to-text implementation backed by the Google Cloud Speech-to-Text v2 API, exposing the livekit.agents stt.STT interface with streaming recognition and interim results.

Create a new instance of Google STT.

Credentials must be provided either through the credentials_info dict, a service account file path in credentials_file, or Application Default Credentials, as described in https://cloud.google.com/docs/authentication/application-default-credentials
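For example, a minimal construction sketch (the file path and dict contents below are placeholders, not values from this module):

from livekit.plugins import google

# Rely on Application Default Credentials (e.g. set up via `gcloud auth application-default login`).
stt_plugin = google.STT(languages="en-US", model="long")

# Or point at a service account key file (placeholder path).
stt_plugin = google.STT(credentials_file="/path/to/service-account.json")

# Or pass the decoded service account JSON directly.
stt_plugin = google.STT(credentials_info={"type": "service_account", "...": "..."})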

Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = True,
        model: SpeechModels = "long",
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or via Application Default Credentials as
        described in https://cloud.google.com/docs/authentication/application-default-credentials
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True)
        )

        self._client: SpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if credentials_file is None and credentials_info is None:
            try:
                gauth_default()
            except DefaultCredentialsError:
                raise ValueError(
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                )

        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
        )

    def _ensure_client(self) -> SpeechAsyncClient:
        if self._credentials_info:
            self._client = SpeechAsyncClient.from_service_account_info(
                self._credentials_info
            )
        elif self._credentials_file:
            self._client = SpeechAsyncClient.from_service_account_file(
                self._credentials_file
            )
        else:
            self._client = SpeechAsyncClient()

        assert self._client is not None
        return self._client

    @property
    def _recognizer(self) -> str:
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = self._ensure_client().transport._credentials.project_id  # type: ignore
        except AttributeError:
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/global/recognizers/_"

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        config = dataclasses.replace(self._config)

        if language:
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning(
                    "multiple languages provided, but language detection is disabled"
                )
            config.languages = [config.languages[0]]

        return config

    async def recognize(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: SpeechLanguages | str | None = None,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        frame = agents.utils.merge_frames(buffer)

        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=config.punctuate,
                enable_spoken_punctuation=config.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=config.model,
            language_codes=config.languages,
        )

        raw = await self._ensure_client().recognize(
            cloud_speech.RecognizeRequest(
                recognizer=self._recognizer, config=config, content=frame.data.tobytes()
            )
        )
        return _recognize_response_to_speech_event(raw)

    def stream(
        self, *, language: SpeechLanguages | str | None = None
    ) -> "SpeechStream":
        config = self._sanitize_options(language=language)
        return SpeechStream(self._ensure_client(), self._recognizer, config)

Ancestors

livekit.agents.stt.STT
abc.ABC

Methods

async def recognize(self, buffer: utils.AudioBuffer, *, language: SpeechLanguages | str | None = None) ‑> SpeechEvent
def stream(self, *, language: SpeechLanguages | str | None = None) ‑> livekit.plugins.google.stt.SpeechStream
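A minimal usage sketch (not part of the source), reusing stt_plugin from the construction sketch above and assuming frames is a utils.AudioBuffer of LINEAR16 audio captured elsewhere:

# One-shot recognition of pre-buffered audio.
event = await stt_plugin.recognize(frames, language="en-US")
print(event.alternatives[0].text)

# Streaming recognition starts with stream(); see the SpeechStream sketch below.
speech_stream = stt_plugin.stream(language="en-US")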

Inherited members

class SpeechStream (client: SpeechAsyncClient, recognizer: str, config: STTOptions, sample_rate: int = 48000, num_channels: int = 1, max_retry: int = 32)

Streaming speech-recognition session backed by Google Cloud Speech-to-Text. Instances are created via STT.stream() and emit stt.SpeechEvent objects for start/end of speech and interim/final transcripts.
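A hedged consumer sketch, assuming audio_frames is an async iterable of rtc.AudioFrame and that the stream follows the push_frame / async-iteration contract of livekit.agents.stt.SpeechStream (closing the stream once input ends is omitted for brevity):

import asyncio

from livekit.agents import stt
from livekit.plugins import google

async def transcribe(audio_frames):
    speech_stream = google.STT().stream(language="en-US")

    async def feed() -> None:
        # assumed source of rtc.AudioFrame objects
        async for frame in audio_frames:
            speech_stream.push_frame(frame)

    async def read() -> None:
        async for ev in speech_stream:
            if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
                print("interim:", ev.alternatives[0].text)
            elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                print("final:", ev.alternatives[0].text)

    await asyncio.gather(feed(), read())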

Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        client: SpeechAsyncClient,
        recognizer: str,
        config: STTOptions,
        sample_rate: int = 48000,
        num_channels: int = 1,
        max_retry: int = 32,
    ) -> None:
        super().__init__()

        self._client = client
        self._recognizer = recognizer
        self._config = config
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._max_retry = max_retry

        self._streaming_config = cloud_speech.StreamingRecognitionConfig(
            config=cloud_speech.RecognitionConfig(
                explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                    encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                    sample_rate_hertz=self._sample_rate,
                    audio_channel_count=self._num_channels,
                ),
                language_codes=self._config.languages,
                model=self._config.model,
                features=cloud_speech.RecognitionFeatures(
                    enable_automatic_punctuation=self._config.punctuate,
                    enable_word_time_offsets=True,
                ),
            ),
            streaming_features=cloud_speech.StreamingRecognitionFeatures(
                enable_voice_activity_events=True,
                interim_results=self._config.interim_results,
            ),
        )

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        await self._run(self._max_retry)

    async def _run(self, max_retry: int) -> None:
        retry_count = 0
        while self._input_ch.qsize() or not self._input_ch.closed:
            try:
                # Google requires an async generator when calling streaming_recognize;
                # this converts the input channel into an async generator.
                async def input_generator():
                    try:
                        # first request should contain the config
                        yield cloud_speech.StreamingRecognizeRequest(
                            recognizer=self._recognizer,
                            streaming_config=self._streaming_config,
                        )

                        async for frame in self._input_ch:
                            if isinstance(frame, rtc.AudioFrame):
                                frame = frame.remix_and_resample(
                                    self._sample_rate, self._num_channels
                                )
                                yield cloud_speech.StreamingRecognizeRequest(
                                    audio=frame.data.tobytes()
                                )

                    except Exception:
                        logger.exception(
                            "an error occurred while streaming input to google STT"
                        )

                # try to connect
                stream = await self._client.streaming_recognize(
                    requests=input_generator()
                )
                retry_count = 0  # connection successful, reset retry count

                await self._run_stream(stream)
            except Exception as e:
                if retry_count >= max_retry:
                    logger.error(
                        f"failed to connect to google stt after {max_retry} tries",
                        exc_info=e,
                    )
                    break

                retry_delay = min(retry_count * 2, 5)  # max 5s
                retry_count += 1
                logger.warning(
                    f"google stt connection failed, retrying in {retry_delay}s",
                    exc_info=e,
                )
                await asyncio.sleep(retry_delay)

    async def _run_stream(
        self, stream: AsyncIterable[cloud_speech.StreamingRecognizeResponse]
    ):
        async for resp in stream:
            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
            ):
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                )

            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED
            ):
                result = resp.results[0]
                speech_data = _streaming_recognize_response_to_speech_data(resp)
                if speech_data is None:
                    continue

                if not result.is_final:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                            alternatives=[speech_data],
                        )
                    )
                else:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[speech_data],
                        )
                    )

            if (
                resp.speech_event_type
                == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
            ):
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

Ancestors

livekit.agents.stt.SpeechStream
abc.ABC

Inherited members

class TTS (*, language: LgType = 'en-US', gender: GenderType = 'neutral', voice_name: str = '', encoding: AudioEncodingType = 'linear16', sample_rate: int = 24000, speaking_rate: float = 1.0, credentials_info: dict | None = None, credentials_file: str | None = None)

Text-to-speech implementation backed by the Google Cloud Text-to-Speech API, exposing the livekit.agents tts.TTS interface (chunked synthesis; streaming input is not supported).

Create a new instance of Google TTS.

Credentials must be provided either through the credentials_info dict, a service account file path in credentials_file, or the GOOGLE_APPLICATION_CREDENTIALS environment variable.
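A hedged construction sketch (the voice name and file path are placeholders):

from livekit.plugins import google

tts_plugin = google.TTS(
    language="en-US",
    gender="female",
    voice_name="en-US-Neural2-F",  # placeholder; any Google Cloud TTS voice name
    sample_rate=24000,
    speaking_rate=1.0,
    credentials_file="/path/to/service-account.json",  # or credentials_info, or the env var
)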

Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        language: LgType = "en-US",
        gender: GenderType = "neutral",
        voice_name: str = "",  # Not required
        encoding: AudioEncodingType = "linear16",
        sample_rate: int = 24000,
        speaking_rate: float = 1.0,
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
    ) -> None:
        """
        Create a new instance of Google TTS.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or the ``GOOGLE_APPLICATION_CREDENTIALS``
        environmental variable.
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        ssml_gender = SsmlVoiceGender.NEUTRAL
        if gender == "male":
            ssml_gender = SsmlVoiceGender.MALE
        elif gender == "female":
            ssml_gender = SsmlVoiceGender.FEMALE

        voice = texttospeech.VoiceSelectionParams(
            name=voice_name, language_code=language, ssml_gender=ssml_gender
        )

        if encoding == "linear16" or encoding == "wav":
            _audio_encoding = texttospeech.AudioEncoding.LINEAR16
        elif encoding == "mp3":
            _audio_encoding = texttospeech.AudioEncoding.MP3
        else:
            raise NotImplementedError(f"audio encoding {encoding} is not supported")

        self._opts = _TTSOptions(
            voice=voice,
            audio_config=texttospeech.AudioConfig(
                audio_encoding=_audio_encoding,
                sample_rate_hertz=sample_rate,
                speaking_rate=speaking_rate,
            ),
        )

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        if not self._client:
            if self._credentials_info:
                self._client = (
                    texttospeech.TextToSpeechAsyncClient.from_service_account_info(
                        self._credentials_info
                    )
                )

            elif self._credentials_file:
                self._client = (
                    texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                        self._credentials_file
                    )
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient()

        assert self._client is not None
        return self._client

    def synthesize(self, text: str) -> "ChunkedStream":
        return ChunkedStream(text, self._opts, self._ensure_client())

Ancestors

livekit.agents.tts.TTS
abc.ABC

Methods

def synthesize(self, text: str) ‑> livekit.plugins.google.tts.ChunkedStream
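A usage sketch, assuming the returned ChunkedStream is async-iterable and yields synthesized-audio events exposing an rtc.AudioFrame, and that audio_source is an existing rtc.AudioSource:

chunked = tts_plugin.synthesize("Hello from Google Text-to-Speech")

async for audio in chunked:
    # audio.frame is assumed to carry the decoded rtc.AudioFrame
    await audio_source.capture_frame(audio.frame)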