Module livekit.plugins.google

Sub-modules

livekit.plugins.google.beta

Classes

class LLM (*,
model: ChatModels | str = 'gemini-2.0-flash-001',
api_key: str | None = None,
vertexai: bool = False,
project: str | None = None,
location: str | None = None,
candidate_count: int = 1,
temperature: float = 0.8,
max_output_tokens: int | None = None,
top_p: float | None = None,
top_k: float | None = None,
presence_penalty: float | None = None,
frequency_penalty: float | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']]" = 'auto')
Expand source code
class LLM(llm.LLM):
    """Chat LLM backed by the Google GenAI client (Gemini API or VertexAI)."""

    def __init__(
        self,
        *,
        model: ChatModels | str = "gemini-2.0-flash-001",
        api_key: str | None = None,
        vertexai: bool = False,
        project: str | None = None,
        location: str | None = None,
        candidate_count: int = 1,
        temperature: float = 0.8,
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        top_k: float | None = None,
        presence_penalty: float | None = None,
        frequency_penalty: float | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] = "auto",
    ) -> None:
        """
        Create a new instance of Google GenAI LLM.

        Environment Requirements:
        - For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the service account key file.
        The Google Cloud project and location can be set via `project` and `location` arguments or the environment variables
        `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`. By default, the project is inferred from the service account key file,
        and the location defaults to "us-central1".
        - For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable.

        Args:
            model (ChatModels | str, optional): The model name to use. Defaults to "gemini-2.0-flash-001".
            api_key (str, optional): The API key for Google Gemini. If not provided, it attempts to read from the `GOOGLE_API_KEY` environment variable.
            vertexai (bool, optional): Whether to use VertexAI. Defaults to False.
            project (str, optional): The Google Cloud project to use (only for VertexAI). Defaults to None.
            location (str, optional): The location to use for VertexAI API requests. Defaults to "us-central1".
            candidate_count (int, optional): Number of candidate responses to generate. Defaults to 1.
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            top_k (int, optional): The top-k sampling value for response generation. Defaults to None.
            presence_penalty (float, optional): Penalizes the model for generating previously mentioned concepts. Defaults to None.
            frequency_penalty (float, optional): Penalizes the model for repeating words. Defaults to None.
            tool_choice (ToolChoice or Literal["auto", "required", "none"], optional): Specifies whether to use tools during response generation. Defaults to "auto".

        Raises:
            ValueError: If `vertexai` is False and no API key is available from
                `api_key` or the `GOOGLE_API_KEY` environment variable.
        """
        super().__init__(
            capabilities=LLMCapabilities(
                supports_choices_on_int=False,
                requires_persistent_functions=False,
            )
        )
        # Explicit arguments win over the environment.
        self._project_id = project or os.environ.get("GOOGLE_CLOUD_PROJECT", None)
        self._location = location or os.environ.get(
            "GOOGLE_CLOUD_LOCATION", "us-central1"
        )
        self._api_key = api_key or os.environ.get("GOOGLE_API_KEY", None)

        if vertexai:
            # The service-account hint only applies to VertexAI; Gemini API-key
            # users authenticate with the key alone, so they are not warned.
            if os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") is None:
                logger.warning(
                    "`GOOGLE_APPLICATION_CREDENTIALS` environment variable is not set. please set it to the path of the service account key file. Otherwise, use any of the other Google Cloud auth methods."
                )
            if not self._project_id:
                # Fall back to the project reported by the default credentials.
                _, self._project_id = default_async(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
            self._api_key = None  # VertexAI does not require an API key

        else:
            # Project and location are VertexAI-only; clear them for the Gemini API.
            self._project_id = None
            self._location = None
            if not self._api_key:
                raise ValueError(
                    "API key is required for Google API either via api_key or GOOGLE_API_KEY environment variable"
                )

        # NOTE(review): the options record the raw `project`/`location` arguments,
        # not the resolved `self._project_id`/`self._location` - confirm intended.
        self._opts = LLMOptions(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            vertexai=vertexai,
            project=project,
            location=location,
            candidate_count=candidate_count,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            top_k=top_k,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
        )
        self._client = genai.Client(
            api_key=self._api_key,
            vertexai=vertexai,
            project=self._project_id,
            location=self._location,
        )
        self._running_fncs: MutableSet[asyncio.Task[Any]] = set()

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream":
        """
        Create an `LLMStream` for the given chat context.

        `temperature` and `tool_choice` fall back to the values given at
        construction time when passed as None. `parallel_tool_calls` is accepted
        but not forwarded to the stream.
        """
        if tool_choice is None:
            tool_choice = self._opts.tool_choice

        if temperature is None:
            temperature = self._opts.temperature

        return LLMStream(
            self,
            client=self._client,
            model=self._opts.model,
            max_output_tokens=self._opts.max_output_tokens,
            top_p=self._opts.top_p,
            top_k=self._opts.top_k,
            presence_penalty=self._opts.presence_penalty,
            frequency_penalty=self._opts.frequency_penalty,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            conn_options=conn_options,
            n=n,
            temperature=temperature,
            tool_choice=tool_choice,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google GenAI LLM.

Environment Requirements:

- For VertexAI: Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the service account key file. The Google Cloud project and location can be set via the project and location arguments or the environment variables GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION. By default, the project is inferred from the service account key file, and the location defaults to "us-central1".
- For Google Gemini API: Set the api_key argument or the GOOGLE_API_KEY environment variable.

Args

model : ChatModels | str, optional
The model name to use. Defaults to "gemini-2.0-flash-001".
api_key : str, optional
The API key for Google Gemini. If not provided, it attempts to read from the GOOGLE_API_KEY environment variable.
vertexai : bool, optional
Whether to use VertexAI. Defaults to False.
project : str, optional
The Google Cloud project to use (only for VertexAI). Defaults to None.
location : str, optional
The location to use for VertexAI API requests. Defaults to "us-central1".
candidate_count : int, optional
Number of candidate responses to generate. Defaults to 1.
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
The nucleus sampling probability for response generation. Defaults to None.
top_k : int, optional
The top-k sampling value for response generation. Defaults to None.
presence_penalty : float, optional
Penalizes the model for generating previously mentioned concepts. Defaults to None.
frequency_penalty : float, optional
Penalizes the model for repeating words. Defaults to None.

tool_choice : ToolChoice or Literal["auto", "required", "none"], optional
Specifies whether to use tools during response generation. Defaults to "auto".

Ancestors

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
fnc_ctx: llm.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> livekit.plugins.google.llm.LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: llm.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    """
    Create an `LLMStream` for the given chat context.

    `temperature` and `tool_choice` fall back to the instance options when
    passed as None. `parallel_tool_calls` is accepted but not forwarded.
    """
    opts = self._opts
    # Per-call values take precedence; None means "use the configured default".
    effective_tool_choice = opts.tool_choice if tool_choice is None else tool_choice
    effective_temperature = opts.temperature if temperature is None else temperature

    return LLMStream(
        self,
        client=self._client,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        conn_options=conn_options,
        model=opts.model,
        n=n,
        temperature=effective_temperature,
        tool_choice=effective_tool_choice,
        max_output_tokens=opts.max_output_tokens,
        top_p=opts.top_p,
        top_k=opts.top_k,
        presence_penalty=opts.presence_penalty,
        frequency_penalty=opts.frequency_penalty,
    )

Inherited members

class STT (*,
languages: LanguageCode = 'en-US',
detect_language: bool = True,
interim_results: bool = True,
punctuate: bool = True,
spoken_punctuation: bool = False,
model: SpeechModels | str = 'latest_long',
location: str = 'global',
sample_rate: int = 16000,
credentials_info: dict | None = None,
credentials_file: str | None = None,
keywords: List[tuple[str, float]] | None = None)
Expand source code
class STT(stt.STT):
    """Speech-to-text implementation backed by the Google Cloud Speech V2 API."""

    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = False,
        model: SpeechModels | str = "latest_long",
        location: str = "global",
        sample_rate: int = 16000,
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
        keywords: List[tuple[str, float]] | None = None,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or via Application Default Credentials as
        described in https://cloud.google.com/docs/authentication/application-default-credentials

        Args:
            languages (LanguageCode): A language code or list of language codes to recognize. Defaults to "en-US".
            detect_language (bool): Whether to detect the language of the audio. Defaults to True.
            interim_results (bool): Whether to return interim results. Defaults to True.
            punctuate (bool): Whether to punctuate the audio. Defaults to True.
            spoken_punctuation (bool): Whether to use spoken punctuation. Defaults to False.
            model (SpeechModels | str): The model to use for recognition. Defaults to "latest_long".
            location (str): The location to use for recognition. Defaults to "global".
            sample_rate (int): The sample rate of the audio. Defaults to 16000.
            credentials_info (dict, optional): The credentials info to use for recognition. Defaults to None.
            credentials_file (str, optional): The credentials file to use for recognition. Defaults to None.
            keywords (List[tuple[str, float]], optional): List of keywords to recognize. Defaults to None.

        Raises:
            ValueError: If no explicit credentials are given and Application
                Default Credentials are not available.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True)
        )

        self._location = location
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if credentials_file is None and credentials_info is None:
            # Fail fast when neither explicit nor default credentials exist.
            try:
                gauth_default()
            except DefaultCredentialsError as e:
                raise ValueError(
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                ) from e

        # A single language code is normalized to a one-element list.
        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            sample_rate=sample_rate,
            keywords=keywords,
        )
        # Weak references so live streams can be updated without being kept alive.
        self._streams = weakref.WeakSet[SpeechStream]()
        self._pool = utils.ConnectionPool[SpeechAsyncClient](
            max_session_duration=_max_session_duration,
            connect_cb=self._create_client,
        )

    async def _create_client(self) -> SpeechAsyncClient:
        """Build a `SpeechAsyncClient` from the configured credentials and location."""
        # Add support for passing a specific location that matches recognizer
        # see: https://cloud.google.com/speech-to-text/v2/docs/speech-to-text-supported-languages
        client_options = None
        if self._location != "global":
            client_options = ClientOptions(
                api_endpoint=f"{self._location}-speech.googleapis.com"
            )

        # Precedence: credentials_info, then credentials_file, then default credentials.
        if self._credentials_info:
            return SpeechAsyncClient.from_service_account_info(
                self._credentials_info,
                client_options=client_options,
            )
        if self._credentials_file:
            return SpeechAsyncClient.from_service_account_file(
                self._credentials_file,
                client_options=client_options,
            )
        return SpeechAsyncClient(
            client_options=client_options,
        )

    def _get_recognizer(self, client: SpeechAsyncClient) -> str:
        """Return the recognizer resource path for the client's project and the current location."""
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = client.transport._credentials.project_id  # type: ignore
        except AttributeError:
            # Credentials without a project_id attribute: ask google.auth instead.
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/{self._location}/recognizers/_"

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        """
        Return a copy of the options with `languages` normalized.

        An explicit `language` overrides the configured languages. When language
        detection is disabled, only the first language is kept.
        """
        config = dataclasses.replace(self._config)

        if language:
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning(
                    "multiple languages provided, but language detection is disabled"
                )
            config.languages = [config.languages[0]]

        return config

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: SpeechLanguages | str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """
        Recognize a complete audio buffer in a single (non-streaming) request.

        Raises:
            APITimeoutError: If the request exceeds its deadline.
            APIStatusError: If Google returns an API error.
            APIConnectionError: On any other failure.
        """
        options = self._sanitize_options(language=language)
        frame = rtc.combine_audio_frames(buffer)

        # Kept under a separate name so the STTOptions above is not shadowed.
        recognition_config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            adaptation=options.build_adaptation(),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=options.punctuate,
                enable_spoken_punctuation=options.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=options.model,
            language_codes=options.languages,
        )

        try:
            async with self._pool.connection() as client:
                raw = await client.recognize(
                    cloud_speech.RecognizeRequest(
                        recognizer=self._get_recognizer(client),
                        config=recognition_config,
                        content=frame.data.tobytes(),
                    ),
                    timeout=conn_options.timeout,
                )

                return _recognize_response_to_speech_event(raw)
        # DeadlineExceeded is handled before the more general GoogleAPICallError.
        except DeadlineExceeded as e:
            raise APITimeoutError() from e
        except GoogleAPICallError as e:
            raise APIStatusError(
                e.message,
                status_code=e.code or -1,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: SpeechLanguages | str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        """Create a `SpeechStream` sharing this instance's connection pool and register it for option updates."""
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            pool=self._pool,
            recognizer_cb=self._get_recognizer,
            config=config,
            conn_options=conn_options,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        languages: LanguageCode | None = None,
        detect_language: bool | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        spoken_punctuation: bool | None = None,
        model: SpeechModels | None = None,
        location: str | None = None,
        keywords: List[tuple[str, float]] | None = None,
    ):
        """
        Update recognition options on this instance and on every live stream.

        Arguments left as None are unchanged. Changing `location` invalidates
        the connection pool.
        """
        if languages is not None:
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if detect_language is not None:
            self._config.detect_language = detect_language
        if interim_results is not None:
            self._config.interim_results = interim_results
        if punctuate is not None:
            self._config.punctuate = punctuate
        if spoken_punctuation is not None:
            self._config.spoken_punctuation = spoken_punctuation
        if model is not None:
            self._config.model = model
        if location is not None:
            self._location = location
            # if location is changed, fetch a new client and recognizer as per the new location
            self._pool.invalidate()
        if keywords is not None:
            self._config.keywords = keywords

        # `location` is not forwarded: streams resolve it through the recognizer callback.
        for stream in self._streams:
            stream.update_options(
                languages=languages,
                detect_language=detect_language,
                interim_results=interim_results,
                punctuate=punctuate,
                spoken_punctuation=spoken_punctuation,
                model=model,
                keywords=keywords,
            )

    async def aclose(self) -> None:
        """Close the connection pool, then run the base-class cleanup."""
        await self._pool.aclose()
        await super().aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google STT.

Credentials must be provided, either by using the credentials_info dict, or reading from the file specified in credentials_file or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials

Args

languages : LanguageCode
List of language codes to recognize (default: "en-US").
detect_language : bool
Whether to detect the language of the audio (default: True).
interim_results : bool
Whether to return interim results (default: True).
punctuate : bool
Whether to punctuate the audio (default: True).
spoken_punctuation : bool
Whether to use spoken punctuation (default: False).
model : SpeechModels
The model to use for recognition (default: "latest_long").
location : str
The location to use for recognition (default: "global").
sample_rate : int
The sample rate of the audio (default: 16000).
credentials_info : dict
The credentials info to use for recognition (default: None).
credentials_file : str
The credentials file to use for recognition (default: None).
keywords : List[tuple[str, float]]
List of keywords to recognize (default: None).

Ancestors

Methods

def stream(self,
*,
language: SpeechLanguages | str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: SpeechLanguages | str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    """
    Open a streaming recognition session.

    The new stream shares this instance's connection pool and is registered
    in `self._streams` so later option updates can reach it.
    """
    sanitized = self._sanitize_options(language=language)
    speech_stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        config=sanitized,
        pool=self._pool,
        recognizer_cb=self._get_recognizer,
    )
    self._streams.add(speech_stream)
    return speech_stream
def update_options(self,
*,
languages: LanguageCode | None = None,
detect_language: bool | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
spoken_punctuation: bool | None = None,
model: SpeechModels | None = None,
location: str | None = None,
keywords: List[tuple[str, float]] | None = None)
Expand source code
def update_options(
    self,
    *,
    languages: LanguageCode | None = None,
    detect_language: bool | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    spoken_punctuation: bool | None = None,
    model: SpeechModels | None = None,
    location: str | None = None,
    keywords: List[tuple[str, float]] | None = None,
):
    """
    Update recognition options on this instance and on every live stream.

    Arguments left as None are unchanged. A single language code is wrapped
    in a list. Changing `location` invalidates the connection pool.
    """
    if isinstance(languages, str):
        languages = [languages]

    # These options are stored on the config and forwarded to streams as-is.
    overrides = {
        "languages": languages,
        "detect_language": detect_language,
        "interim_results": interim_results,
        "punctuate": punctuate,
        "spoken_punctuation": spoken_punctuation,
        "model": model,
        "keywords": keywords,
    }
    for option_name, option_value in overrides.items():
        if option_value is not None:
            setattr(self._config, option_name, option_value)

    if location is not None:
        self._location = location
        # if location is changed, fetch a new client and recognizer as per the new location
        self._pool.invalidate()

    for active_stream in self._streams:
        active_stream.update_options(**overrides)

Inherited members

class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
pool: utils.ConnectionPool[SpeechAsyncClient],
recognizer_cb: Callable[[SpeechAsyncClient], str],
config: STTOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    """Streaming recognition session against the Google Cloud Speech V2 API."""

    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        pool: utils.ConnectionPool[SpeechAsyncClient],
        recognizer_cb: Callable[[SpeechAsyncClient], str],
        config: STTOptions,
    ) -> None:
        """
        Args:
            stt: The owning STT instance.
            conn_options: Connection options passed to the base stream.
            pool: Connection pool that supplies `SpeechAsyncClient` instances.
            recognizer_cb: Callback returning the recognizer path for a client.
            config: Recognition options for this stream.
        """
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=config.sample_rate
        )

        self._pool = pool
        self._recognizer_cb = recognizer_cb
        self._config = config
        # Set whenever the gRPC stream must be re-created (option change or session limit).
        self._reconnect_event = asyncio.Event()
        self._session_connected_at: float = 0

    def update_options(
        self,
        *,
        languages: LanguageCode | None = None,
        detect_language: bool | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        spoken_punctuation: bool | None = None,
        model: SpeechModels | None = None,
        keywords: List[tuple[str, float]] | None = None,
    ):
        """
        Update this stream's options and request a reconnect.

        Arguments left as None are unchanged. The reconnect event is always
        set, so the next session is built from the current options.
        """
        if languages is not None:
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if detect_language is not None:
            self._config.detect_language = detect_language
        if interim_results is not None:
            self._config.interim_results = interim_results
        if punctuate is not None:
            self._config.punctuate = punctuate
        if spoken_punctuation is not None:
            self._config.spoken_punctuation = spoken_punctuation
        if model is not None:
            self._config.model = model
        if keywords is not None:
            self._config.keywords = keywords

        self._reconnect_event.set()

    async def _run(self) -> None:
        """
        Drive the streaming session, reconnecting when requested.

        Raises:
            APITimeoutError: If a request exceeds its deadline.
            APIStatusError: If Google returns an API error.
            APIConnectionError: On any other failure.
        """

        # google requires a async generator when calling streaming_recognize
        # this function basically convert the queue into a async generator
        async def input_generator(
            client: SpeechAsyncClient, should_stop: asyncio.Event
        ):
            try:
                # first request should contain the config
                yield cloud_speech.StreamingRecognizeRequest(
                    recognizer=self._recognizer_cb(client),
                    streaming_config=self._streaming_config,
                )

                async for frame in self._input_ch:
                    # when the stream is aborted due to reconnect, this input_generator
                    # needs to stop consuming frames
                    # when the generator stops, the previous gRPC stream will close
                    if should_stop.is_set():
                        return

                    if isinstance(frame, rtc.AudioFrame):
                        yield cloud_speech.StreamingRecognizeRequest(
                            audio=frame.data.tobytes()
                        )

            except Exception:
                logger.exception(
                    "an error occurred while streaming input to google STT"
                )

        async def process_stream(client: SpeechAsyncClient, stream):
            # Tracks whether START_OF_SPEECH was sent without a matching END_OF_SPEECH.
            has_started = False
            async for resp in stream:
                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                    )
                    has_started = True

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED
                ):
                    # A response without results has no transcript; skip it
                    # rather than failing on results[0].
                    if not resp.results:
                        continue

                    result = resp.results[0]
                    speech_data = _streaming_recognize_response_to_speech_data(resp)
                    if speech_data is None:
                        continue

                    if not result.is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                    else:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                        # The session limit is only checked after a final transcript.
                        if (
                            time.time() - self._session_connected_at
                            > _max_session_duration
                        ):
                            logger.debug(
                                "Google STT maximum connection time reached. Reconnecting..."
                            )
                            self._pool.remove(client)
                            if has_started:
                                # Close the open utterance before reconnecting.
                                self._event_ch.send_nowait(
                                    stt.SpeechEvent(
                                        type=stt.SpeechEventType.END_OF_SPEECH
                                    )
                                )
                                has_started = False
                            self._reconnect_event.set()
                            return

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                    )
                    has_started = False

        while True:
            try:
                async with self._pool.connection() as client:
                    # Rebuilt on every (re)connect so option updates take effect.
                    self._streaming_config = cloud_speech.StreamingRecognitionConfig(
                        config=cloud_speech.RecognitionConfig(
                            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                                sample_rate_hertz=self._config.sample_rate,
                                audio_channel_count=1,
                            ),
                            adaptation=self._config.build_adaptation(),
                            language_codes=self._config.languages,
                            model=self._config.model,
                            features=cloud_speech.RecognitionFeatures(
                                enable_automatic_punctuation=self._config.punctuate,
                                # Honor spoken_punctuation here too, as the
                                # non-streaming recognize path does.
                                enable_spoken_punctuation=self._config.spoken_punctuation,
                                enable_word_time_offsets=True,
                            ),
                        ),
                        streaming_features=cloud_speech.StreamingRecognitionFeatures(
                            interim_results=self._config.interim_results,
                        ),
                    )

                    should_stop = asyncio.Event()
                    stream = await client.streaming_recognize(
                        requests=input_generator(client, should_stop),
                    )
                    self._session_connected_at = time.time()

                    process_stream_task = asyncio.create_task(
                        process_stream(client, stream)
                    )
                    wait_reconnect_task = asyncio.create_task(
                        self._reconnect_event.wait()
                    )

                    try:
                        done, _ = await asyncio.wait(
                            [process_stream_task, wait_reconnect_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        for task in done:
                            if task != wait_reconnect_task:
                                # Re-raise any exception from the processing task.
                                task.result()
                        # Processing finished with no reconnect request: stop.
                        if wait_reconnect_task not in done:
                            break
                        self._reconnect_event.clear()
                    finally:
                        await utils.aio.gracefully_cancel(
                            process_stream_task, wait_reconnect_task
                        )
                        should_stop.set()
            # DeadlineExceeded is handled before the more general GoogleAPICallError.
            except DeadlineExceeded as e:
                raise APITimeoutError() from e
            except GoogleAPICallError as e:
                raise APIStatusError(
                    e.message,
                    status_code=e.code or -1,
                ) from e
            except Exception as e:
                raise APIConnectionError() from e

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

Methods

def update_options(self,
*,
languages: LanguageCode | None = None,
detect_language: bool | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
spoken_punctuation: bool | None = None,
model: SpeechModels | None = None,
keywords: List[tuple[str, float]] | None = None)
Expand source code
def update_options(
    self,
    *,
    languages: LanguageCode | None = None,
    detect_language: bool | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    spoken_punctuation: bool | None = None,
    model: SpeechModels | None = None,
    keywords: List[tuple[str, float]] | None = None,
):
    """
    Overwrite selected recognition settings and set the reconnect event.

    Only arguments that are not ``None`` are written to ``self._config``;
    every other setting keeps its current value. The reconnect event is set
    on every call, including calls that change nothing.

    Args:
        languages: Language code(s) to store. A bare string is wrapped in a
            single-element list before it is stored.
        detect_language: New value for the ``detect_language`` setting.
        interim_results: New value for the ``interim_results`` setting.
        punctuate: New value for the ``punctuate`` setting.
        spoken_punctuation: New value for the ``spoken_punctuation`` setting.
        model: New value for the ``model`` setting.
        keywords: New list of ``(str, float)`` tuples for the ``keywords`` setting.
    """
    # Normalise a single language code to the list form kept in the config.
    if isinstance(languages, str):
        languages = [languages]

    requested = {
        "languages": languages,
        "detect_language": detect_language,
        "interim_results": interim_results,
        "punctuate": punctuate,
        "spoken_punctuation": spoken_punctuation,
        "model": model,
        "keywords": keywords,
    }
    for option_name, new_value in requested.items():
        # None means "not supplied": leave that setting untouched.
        if new_value is not None:
            setattr(self._config, option_name, new_value)

    self._reconnect_event.set()

Inherited members

class TTS (*,
language: SpeechLanguages | str = 'en-US',
gender: Gender | str = 'neutral',
voice_name: str = '',
sample_rate: int = 24000,
pitch: int = 0,
effects_profile_id: str = '',
speaking_rate: float = 1.0,
credentials_info: dict | None = None,
credentials_file: str | None = None)
Expand source code
class TTS(tts.TTS):
    """Google Cloud Text-to-Speech backend producing non-streamed, mono OGG/Opus audio."""

    def __init__(
        self,
        *,
        language: SpeechLanguages | str = "en-US",
        gender: Gender | str = "neutral",
        voice_name: str = "",  # Not required
        sample_rate: int = 24000,
        pitch: int = 0,
        effects_profile_id: str = "",
        speaking_rate: float = 1.0,
        credentials_info: dict | None = None,
        credentials_file: str | None = None,
    ) -> None:
        """
        Set up a Google TTS instance.

        Credentials come from the ``credentials_info`` dict when it is given, otherwise
        from the file named by ``credentials_file``. When neither is supplied the client
        is created without explicit credentials, which relies on the
        ``GOOGLE_APPLICATION_CREDENTIALS`` environment variable.

        Args:
            language (SpeechLanguages | str, optional): Language code such as "en-US". Defaults to "en-US".
            gender (Gender | str, optional): Voice gender: "male", "female" or "neutral". Defaults to "neutral".
            voice_name (str, optional): Name of a specific voice. Defaults to an empty string.
            sample_rate (int, optional): Output sample rate in Hz. Defaults to 24000.
            pitch (int, optional): Pitch shift in semitones relative to the original pitch; the accepted range is -20.0 to 20.0. Defaults to 0.
            effects_profile_id (str): Optional identifier of an audio effects profile to apply to the synthesized speech.
            speaking_rate (float, optional): Speech speed. Defaults to 1.0.
            credentials_info (dict, optional): Google Cloud credentials as a dictionary. Defaults to None.
            credentials_file (str, optional): Path to a Google Cloud credentials JSON file. Defaults to None.
        """
        capabilities = tts.TTSCapabilities(streaming=False)
        super().__init__(
            capabilities=capabilities,
            sample_rate=sample_rate,
            num_channels=1,
        )

        # The API client is created on first use by _ensure_client().
        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        selected_voice = texttospeech.VoiceSelectionParams(
            name=voice_name,
            language_code=language,
            ssml_gender=_gender_from_str(gender),
        )
        output_config = texttospeech.AudioConfig(
            audio_encoding=texttospeech.AudioEncoding.OGG_OPUS,
            sample_rate_hertz=sample_rate,
            pitch=pitch,
            effects_profile_id=effects_profile_id,
            speaking_rate=speaking_rate,
        )
        self._opts = _TTSOptions(voice=selected_voice, audio_config=output_config)

    def update_options(
        self,
        *,
        language: SpeechLanguages | str = "en-US",
        gender: Gender | str = "neutral",
        voice_name: str = "",  # Not required
        speaking_rate: float = 1.0,
    ) -> None:
        """
        Replace the voice selection and the speaking rate.

        The voice selection is rebuilt from the arguments of this call, so any
        argument left out takes its default below rather than the value that
        was previously configured.

        Args:
            language (SpeechLanguages | str, optional): Language code such as "en-US". Defaults to "en-US".
            gender (Gender | str, optional): Voice gender: "male", "female" or "neutral". Defaults to "neutral".
            voice_name (str, optional): Name of a specific voice. Defaults to an empty string.
            speaking_rate (float, optional): Speech speed. Defaults to 1.0.
        """
        ssml_gender = _gender_from_str(gender)
        self._opts.voice = texttospeech.VoiceSelectionParams(
            name=voice_name,
            language_code=language,
            ssml_gender=ssml_gender,
        )
        # The rest of the audio config (encoding, sample rate, pitch, effects) is kept.
        self._opts.audio_config.speaking_rate = speaking_rate

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        """Return the cached async client, creating it on first use."""
        if self._client is not None:
            return self._client

        client_cls = texttospeech.TextToSpeechAsyncClient
        if self._credentials_info:
            # An explicit credentials dict takes precedence over a credentials file.
            self._client = client_cls.from_service_account_info(
                self._credentials_info
            )
        elif self._credentials_file:
            self._client = client_cls.from_service_account_file(
                self._credentials_file
            )
        else:
            # No explicit credentials were given to this instance.
            self._client = client_cls()

        assert self._client is not None
        return self._client

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        """
        Build a ``ChunkedStream`` for ``text``.

        Args:
            text (str): Text handed to the stream as its input.
            conn_options (APIConnectOptions, optional): Passed through to the stream unchanged. Defaults to None.

        Returns:
            ChunkedStream: Stream bound to this instance, its current options and its client.
        """
        stream_kwargs = dict(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            client=self._ensure_client(),
        )
        return ChunkedStream(**stream_kwargs)

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google TTS.

Credentials must be provided, either through the credentials_info dict, or by reading them from the file named by credentials_file or by the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
Specific voice name. Default is an empty string.
sample_rate : int, optional
Audio sample rate in Hz. Default is 24000.
pitch : float, optional
Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id : str
Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate : float, optional
Speed of speech. Default is 1.0.
credentials_info : dict, optional
Dictionary containing Google Cloud credentials. Default is None.
credentials_file : str, optional
Path to the Google Cloud credentials JSON file. Default is None.

Ancestors

Methods

def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.google.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    """
    Build a ``ChunkedStream`` for ``text``.

    Args:
        text (str): Text handed to the stream as its input.
        conn_options (APIConnectOptions, optional): Passed through to the stream unchanged. Defaults to None.

    Returns:
        ChunkedStream: Stream bound to this instance, its current options and its client.
    """
    stream_kwargs = dict(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        client=self._ensure_client(),
    )
    return ChunkedStream(**stream_kwargs)
def update_options(self,
*,
language: SpeechLanguages | str = 'en-US',
gender: Gender | str = 'neutral',
voice_name: str = '',
speaking_rate: float = 1.0) ‑> None
Expand source code
def update_options(
    self,
    *,
    language: SpeechLanguages | str = "en-US",
    gender: Gender | str = "neutral",
    voice_name: str = "",  # Not required
    speaking_rate: float = 1.0,
) -> None:
    """
    Replace the voice selection and the speaking rate.

    The voice selection is rebuilt from the arguments of this call, so any
    argument left out takes its default below rather than the value that
    was previously configured.

    Args:
        language (SpeechLanguages | str, optional): Language code such as "en-US". Defaults to "en-US".
        gender (Gender | str, optional): Voice gender: "male", "female" or "neutral". Defaults to "neutral".
        voice_name (str, optional): Name of a specific voice. Defaults to an empty string.
        speaking_rate (float, optional): Speech speed. Defaults to 1.0.
    """
    ssml_gender = _gender_from_str(gender)
    self._opts.voice = texttospeech.VoiceSelectionParams(
        name=voice_name,
        language_code=language,
        ssml_gender=ssml_gender,
    )
    # The rest of the audio config (encoding, sample rate, pitch, effects) is kept.
    self._opts.audio_config.speaking_rate = speaking_rate

Update the TTS options.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
Specific voice name. Default is an empty string.
speaking_rate : float, optional
Speed of speech. Default is 1.0.

Inherited members