Module livekit.plugins.google
Classes
class STT (*, languages: LanguageCode = 'en-US', detect_language: bool = True, interim_results: bool = True, punctuate: bool = True, spoken_punctuation: bool = True, model: SpeechModels = 'long', credentials_info: dict | None = None, credentials_file: str | None = None, keywords: List[tuple[str, float]] | None = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Google STT.
Credentials must be provided, either by using the
credentials_info
dict, or reading from the file specified incredentials_file
or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentialsExpand source code
class STT(stt.STT): def __init__( self, *, languages: LanguageCode = "en-US", # Google STT can accept multiple languages detect_language: bool = True, interim_results: bool = True, punctuate: bool = True, spoken_punctuation: bool = True, model: SpeechModels = "long", credentials_info: dict | None = None, credentials_file: str | None = None, keywords: List[tuple[str, float]] | None = None, ): """ Create a new instance of Google STT. Credentials must be provided, either by using the ``credentials_info`` dict, or reading from the file specified in ``credentials_file`` or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials """ super().__init__( capabilities=stt.STTCapabilities(streaming=True, interim_results=True) ) self._client: SpeechAsyncClient | None = None self._credentials_info = credentials_info self._credentials_file = credentials_file if credentials_file is None and credentials_info is None: try: gauth_default() except DefaultCredentialsError: raise ValueError( "Application default credentials must be available " "when using Google STT without explicitly passing " "credentials through credentials_info or credentials_file." ) if isinstance(languages, str): languages = [languages] self._config = STTOptions( languages=languages, detect_language=detect_language, interim_results=interim_results, punctuate=punctuate, spoken_punctuation=spoken_punctuation, model=model, keywords=keywords, ) def _ensure_client(self) -> SpeechAsyncClient: if self._credentials_info: self._client = SpeechAsyncClient.from_service_account_info( self._credentials_info ) elif self._credentials_file: self._client = SpeechAsyncClient.from_service_account_file( self._credentials_file ) else: self._client = SpeechAsyncClient() assert self._client is not None return self._client @property def _recognizer(self) -> str: # TODO(theomonnom): should we use recognizers? # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers # TODO(theomonnom): find a better way to access the project_id try: project_id = self._ensure_client().transport._credentials.project_id # type: ignore except AttributeError: from google.auth import default as ga_default _, project_id = ga_default() return f"projects/{project_id}/locations/global/recognizers/_" def _sanitize_options(self, *, language: str | None = None) -> STTOptions: config = dataclasses.replace(self._config) if language: config.languages = [language] if not isinstance(config.languages, list): config.languages = [config.languages] elif not config.detect_language: if len(config.languages) > 1: logger.warning( "multiple languages provided, but language detection is disabled" ) config.languages = [config.languages[0]] return config async def _recognize_impl( self, buffer: utils.AudioBuffer, *, language: SpeechLanguages | str | None = None, ) -> stt.SpeechEvent: config = self._sanitize_options(language=language) frame = agents.utils.merge_frames(buffer) config = cloud_speech.RecognitionConfig( explicit_decoding_config=cloud_speech.ExplicitDecodingConfig( encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16, sample_rate_hertz=frame.sample_rate, audio_channel_count=frame.num_channels, ), adaptation=config.build_adaptation(), features=cloud_speech.RecognitionFeatures( enable_automatic_punctuation=config.punctuate, enable_spoken_punctuation=config.spoken_punctuation, enable_word_time_offsets=True, ), model=config.model, language_codes=config.languages, ) try: raw = await self._ensure_client().recognize( cloud_speech.RecognizeRequest( recognizer=self._recognizer, config=config, content=frame.data.tobytes(), ) ) return _recognize_response_to_speech_event(raw) except DeadlineExceeded: raise APITimeoutError() except GoogleAPICallError as e: raise APIStatusError( e.message, status_code=e.code or -1, request_id=None, body=None, ) except Exception as e: raise APIConnectionError() from e def stream( self, *, language: SpeechLanguages | str | None = None ) -> "SpeechStream": config = self._sanitize_options(language=language) return SpeechStream(self, self._ensure_client(), self._recognizer, config)
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self, *, language: SpeechLanguages | str | None = None) ‑> livekit.plugins.google.stt.SpeechStream
Inherited members
class SpeechStream (stt: STT, client: SpeechAsyncClient, recognizer: str, config: STTOptions, sample_rate: int = 48000, num_channels: int = 1, max_retry: int = 32)
-
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Expand source code
class SpeechStream(stt.SpeechStream): def __init__( self, stt: STT, client: SpeechAsyncClient, recognizer: str, config: STTOptions, sample_rate: int = 48000, num_channels: int = 1, max_retry: int = 32, ) -> None: super().__init__(stt) self._client = client self._recognizer = recognizer self._config = config self._sample_rate = sample_rate self._num_channels = num_channels self._max_retry = max_retry self._streaming_config = cloud_speech.StreamingRecognitionConfig( config=cloud_speech.RecognitionConfig( explicit_decoding_config=cloud_speech.ExplicitDecodingConfig( encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16, sample_rate_hertz=self._sample_rate, audio_channel_count=self._num_channels, ), adaptation=config.build_adaptation(), language_codes=self._config.languages, model=self._config.model, features=cloud_speech.RecognitionFeatures( enable_automatic_punctuation=self._config.punctuate, enable_word_time_offsets=True, ), ), streaming_features=cloud_speech.StreamingRecognitionFeatures( enable_voice_activity_events=True, interim_results=self._config.interim_results, ), ) @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: await self._run(self._max_retry) async def _run(self, max_retry: int) -> None: retry_count = 0 while self._input_ch.qsize() or not self._input_ch.closed: try: # google requires a async generator when calling streaming_recognize # this function basically convert the queue into a async generator async def input_generator(): try: # first request should contain the config yield cloud_speech.StreamingRecognizeRequest( recognizer=self._recognizer, streaming_config=self._streaming_config, ) async for frame in self._input_ch: if isinstance(frame, rtc.AudioFrame): frame = frame.remix_and_resample( self._sample_rate, self._num_channels ) yield cloud_speech.StreamingRecognizeRequest( audio=frame.data.tobytes() ) except Exception: logger.exception( "an error occurred while streaming input to google STT" ) # try to connect stream = await self._client.streaming_recognize( requests=input_generator() ) retry_count = 0 # connection successful, reset retry count await self._run_stream(stream) except Aborted: logger.error("google stt connection aborted") break except Exception as e: if retry_count >= max_retry: logger.error( f"failed to connect to google stt after {max_retry} tries", exc_info=e, ) break retry_delay = min(retry_count * 2, 5) # max 5s retry_count += 1 logger.warning( f"google stt connection failed, retrying in {retry_delay}s", exc_info=e, ) await asyncio.sleep(retry_delay) async def _run_stream( self, stream: AsyncIterable[cloud_speech.StreamingRecognizeResponse] ): async for resp in stream: if ( resp.speech_event_type == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN ): self._event_ch.send_nowait( stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH) ) if ( resp.speech_event_type == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED ): result = resp.results[0] speech_data = _streaming_recognize_response_to_speech_data(resp) if speech_data is None: continue if not result.is_final: self._event_ch.send_nowait( stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives=[speech_data], ) ) else: self._event_ch.send_nowait( stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[speech_data], ) ) if ( resp.speech_event_type == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END ): self._event_ch.send_nowait( stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH) )
Ancestors
- SpeechStream
- abc.ABC
Inherited members
class TTS (*, language: SpeechLanguages | str = 'en-US', gender: Gender | str = 'neutral', voice_name: str = '', encoding: AudioEncoding | str = 'linear16', sample_rate: int = 24000, pitch: int = 0, effects_profile_id: str = '', speaking_rate: float = 1.0, credentials_info: dict | None = None, credentials_file: str | None = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Google TTS.
Credentials must be provided, either by using the
credentials_info
dict, or reading from the file specified incredentials_file
or theGOOGLE_APPLICATION_CREDENTIALS
environmental variable.Args
language
:SpeechLanguages | str
, optional- Language code (e.g., "en-US"). Default is "en-US".
gender
:Gender | str
, optional- Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name
:str
, optional- Specific voice name. Default is an empty string.
encoding
:AudioEncoding | str
, optional- Audio encoding format (e.g., "linear16"). Default is "linear16".
sample_rate
:int
, optional- Audio sample rate in Hz. Default is 24000.
pitch
:float
, optional- Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id
:str
- Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate
:float
, optional- Speed of speech. Default is 1.0.
credentials_info
:dict
, optional- Dictionary containing Google Cloud credentials. Default is None.
credentials_file
:str
, optional- Path to the Google Cloud credentials JSON file. Default is None.
Expand source code
class TTS(tts.TTS): def __init__( self, *, language: SpeechLanguages | str = "en-US", gender: Gender | str = "neutral", voice_name: str = "", # Not required encoding: AudioEncoding | str = "linear16", sample_rate: int = 24000, pitch: int = 0, effects_profile_id: str = "", speaking_rate: float = 1.0, credentials_info: dict | None = None, credentials_file: str | None = None, ) -> None: """ Create a new instance of Google TTS. Credentials must be provided, either by using the ``credentials_info`` dict, or reading from the file specified in ``credentials_file`` or the ``GOOGLE_APPLICATION_CREDENTIALS`` environmental variable. Args: language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US". gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral". voice_name (str, optional): Specific voice name. Default is an empty string. encoding (AudioEncoding | str, optional): Audio encoding format (e.g., "linear16"). Default is "linear16". sample_rate (int, optional): Audio sample rate in Hz. Default is 24000. pitch (float, optional): Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0. effects_profile_id (str): Optional identifier for selecting audio effects profiles to apply to the synthesized speech. speaking_rate (float, optional): Speed of speech. Default is 1.0. credentials_info (dict, optional): Dictionary containing Google Cloud credentials. Default is None. credentials_file (str, optional): Path to the Google Cloud credentials JSON file. Default is None. """ super().__init__( capabilities=tts.TTSCapabilities( streaming=False, ), sample_rate=sample_rate, num_channels=1, ) self._client: texttospeech.TextToSpeechAsyncClient | None = None self._credentials_info = credentials_info self._credentials_file = credentials_file voice = texttospeech.VoiceSelectionParams( name=voice_name, language_code=language, ssml_gender=_gender_from_str(gender), ) if encoding == "linear16" or encoding == "wav": _audio_encoding = texttospeech.AudioEncoding.LINEAR16 elif encoding == "mp3": _audio_encoding = texttospeech.AudioEncoding.MP3 else: raise NotImplementedError(f"audio encoding {encoding} is not supported") self._opts = _TTSOptions( voice=voice, audio_config=texttospeech.AudioConfig( audio_encoding=_audio_encoding, sample_rate_hertz=sample_rate, pitch=pitch, effects_profile_id=effects_profile_id, speaking_rate=speaking_rate, ), ) def update_options( self, *, language: SpeechLanguages | str = "en-US", gender: Gender | str = "neutral", voice_name: str = "", # Not required speaking_rate: float = 1.0, ) -> None: """ Update the TTS options. Args: language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US". gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral". voice_name (str, optional): Specific voice name. Default is an empty string. speaking_rate (float, optional): Speed of speech. Default is 1.0. """ self._opts.voice = texttospeech.VoiceSelectionParams( name=voice_name, language_code=language, ssml_gender=_gender_from_str(gender), ) self._opts.audio_config.speaking_rate = speaking_rate def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient: if not self._client: if self._credentials_info: self._client = ( texttospeech.TextToSpeechAsyncClient.from_service_account_info( self._credentials_info ) ) elif self._credentials_file: self._client = ( texttospeech.TextToSpeechAsyncClient.from_service_account_file( self._credentials_file ) ) else: self._client = texttospeech.TextToSpeechAsyncClient() assert self._client is not None return self._client def synthesize(self, text: str) -> "ChunkedStream": return ChunkedStream(self, text, self._opts, self._ensure_client())
Ancestors
- TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self, text: str) ‑> livekit.plugins.google.tts.ChunkedStream
def update_options(self, *, language: SpeechLanguages | str = 'en-US', gender: Gender | str = 'neutral', voice_name: str = '', speaking_rate: float = 1.0) ‑> None
-
Update the TTS options.
Args
language
:SpeechLanguages | str
, optional- Language code (e.g., "en-US"). Default is "en-US".
gender
:Gender | str
, optional- Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name
:str
, optional- Specific voice name. Default is an empty string.
speaking_rate
:float
, optional- Speed of speech. Default is 1.0.
Inherited members