Module livekit.plugins.google

Google AI plugin for LiveKit Agents

Supports Gemini, Cloud Speech-to-Text, and Cloud Text-to-Speech.

See https://docs.livekit.io/agents/integrations/stt/google/ for more information.
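
The plugin's LLM, STT, and TTS classes documented below plug directly into LiveKit Agents pipelines. As a minimal, hedged construction sketch (it assumes GOOGLE_API_KEY or Google Cloud credentials are already configured as described for each class):

from livekit.plugins import google

# Gemini LLM via the Gemini API (reads GOOGLE_API_KEY from the environment)
llm = google.LLM(model="gemini-2.0-flash-001", temperature=0.8)

# Cloud Speech-to-Text with streaming recognition
stt = google.STT(languages="en-US", model="latest_long", interim_results=True)

# Cloud Text-to-Speech (gender-based voice selection; a specific voice_name may also be set)
tts = google.TTS(language="en-US", gender="female", speaking_rate=1.0)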

Sub-modules

livekit.plugins.google.beta

Classes

class LLM (*,
model: ChatModels | str = 'gemini-2.0-flash-001',
api_key: NotGivenOr[str] = NOT_GIVEN,
vertexai: NotGivenOr[bool] = NOT_GIVEN,
project: NotGivenOr[str] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
top_k: NotGivenOr[float] = NOT_GIVEN,
presence_penalty: NotGivenOr[float] = NOT_GIVEN,
frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
thinking_config: NotGivenOr[types.ThinkingConfigOrDict] = NOT_GIVEN)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: ChatModels | str = "gemini-2.0-flash-001",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        vertexai: NotGivenOr[bool] = NOT_GIVEN,
        project: NotGivenOr[str] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        top_k: NotGivenOr[float] = NOT_GIVEN,
        presence_penalty: NotGivenOr[float] = NOT_GIVEN,
        frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        thinking_config: NotGivenOr[types.ThinkingConfigOrDict] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google GenAI LLM.

        Environment Requirements:
        - For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the service account key file or use any of the other Google Cloud auth methods.
        The Google Cloud project and location can be set via `project` and `location` arguments or the environment variables
        `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`. By default, the project is inferred from the service account key file,
        and the location defaults to "us-central1".
        - For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable.

        Args:
            model (ChatModels | str, optional): The model name to use. Defaults to "gemini-2.0-flash-001".
            api_key (str, optional): The API key for Google Gemini. If not provided, it attempts to read from the `GOOGLE_API_KEY` environment variable.
            vertexai (bool, optional): Whether to use VertexAI. If not provided, it attempts to read from the `GOOGLE_GENAI_USE_VERTEXAI` environment variable. Defaults to False.
            project (str, optional): The Google Cloud project to use (only for VertexAI). Defaults to None.
            location (str, optional): The location to use for VertexAI API requests. Defaults to "us-central1".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            top_k (float, optional): The top-k sampling value for response generation. Defaults to None.
            presence_penalty (float, optional): Penalizes the model for generating previously mentioned concepts. Defaults to None.
            frequency_penalty (float, optional): Penalizes the model for repeating words. Defaults to None.
            tool_choice (ToolChoice, optional): Controls how the model may use tools ("auto", "required", "none", or a specific function). Defaults to "auto".
            thinking_config (ThinkingConfigOrDict, optional): The thinking configuration for response generation. Defaults to None.
        """  # noqa: E501
        super().__init__()
        gcp_project = project if is_given(project) else os.environ.get("GOOGLE_CLOUD_PROJECT")
        gcp_location = (
            location
            if is_given(location)
            else os.environ.get("GOOGLE_CLOUD_LOCATION") or "us-central1"
        )
        use_vertexai = (
            vertexai
            if is_given(vertexai)
            else os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "0").lower() in ["true", "1"]
        )
        gemini_api_key = api_key if is_given(api_key) else os.environ.get("GOOGLE_API_KEY")

        if use_vertexai:
            if not gcp_project:
                _, gcp_project = default_async(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
            gemini_api_key = None  # VertexAI does not require an API key

        else:
            gcp_project = None
            gcp_location = None
            if not gemini_api_key:
                raise ValueError(
                    "API key is required for Google API either via api_key or GOOGLE_API_KEY environment variable"  # noqa: E501
                )

        # Validate thinking_config
        if is_given(thinking_config):
            _thinking_budget = None
            if isinstance(thinking_config, dict):
                _thinking_budget = thinking_config.get("thinking_budget")
            elif isinstance(thinking_config, types.ThinkingConfig):
                _thinking_budget = thinking_config.thinking_budget

            if _thinking_budget is not None:
                if not isinstance(_thinking_budget, int):
                    raise ValueError("thinking_budget inside thinking_config must be an integer")
                if not (0 <= _thinking_budget <= 24576):
                    raise ValueError(
                        "thinking_budget inside thinking_config must be between 0 and 24576"
                    )

        self._opts = _LLMOptions(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            vertexai=use_vertexai,
            project=project,
            location=location,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            top_k=top_k,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            thinking_config=thinking_config,
        )
        self._client = genai.Client(
            api_key=gemini_api_key,
            vertexai=use_vertexai,
            project=gcp_project,
            location=gcp_location,
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        response_format: NotGivenOr[
            types.SchemaUnion | type[llm_utils.ResponseFormatT]
        ] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        extra = {}

        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
        if is_given(tool_choice):
            gemini_tool_choice: types.ToolConfig
            if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[tool_choice["function"]["name"]],
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "required":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[get_function_info(fnc).name for fnc in tools]
                        if tools
                        else None,
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "auto":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="AUTO",
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "none":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="NONE",
                    )
                )
                extra["tool_config"] = gemini_tool_choice

        if is_given(response_format):
            extra["response_schema"] = to_response_format(response_format)
            extra["response_mime_type"] = "application/json"

        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature
        if is_given(self._opts.max_output_tokens):
            extra["max_output_tokens"] = self._opts.max_output_tokens
        if is_given(self._opts.top_p):
            extra["top_p"] = self._opts.top_p
        if is_given(self._opts.top_k):
            extra["top_k"] = self._opts.top_k
        if is_given(self._opts.presence_penalty):
            extra["presence_penalty"] = self._opts.presence_penalty
        if is_given(self._opts.frequency_penalty):
            extra["frequency_penalty"] = self._opts.frequency_penalty

        # Add the thinking config if one was provided
        if is_given(self._opts.thinking_config):
            extra["thinking_config"] = self._opts.thinking_config

        return LLMStream(
            self,
            client=self._client,
            model=self._opts.model,
            chat_ctx=chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google GenAI LLM.

Environment Requirements:

  • For VertexAI: Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the service account key file, or use any of the other Google Cloud auth methods. The Google Cloud project and location can be set via the project and location arguments or the environment variables GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION. By default, the project is inferred from the service account key file, and the location defaults to "us-central1".
  • For the Google Gemini API: Set the api_key argument or the GOOGLE_API_KEY environment variable.

Args

model : ChatModels | str, optional
The model name to use. Defaults to "gemini-2.0-flash-001".
api_key : str, optional
The API key for Google Gemini. If not provided, it attempts to read from the GOOGLE_API_KEY environment variable.
vertexai : bool, optional
Whether to use VertexAI. If not provided, it attempts to read from the GOOGLE_GENAI_USE_VERTEXAI environment variable. Defaults to False.
project : str, optional
The Google Cloud project to use (only for VertexAI). Defaults to None.
location : str, optional
The location to use for VertexAI API requests. Defaults to "us-central1".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
The nucleus sampling probability for response generation. Defaults to None.
top_k : float, optional
The top-k sampling value for response generation. Defaults to None.
presence_penalty : float, optional
Penalizes the model for generating previously mentioned concepts. Defaults to None.
frequency_penalty : float, optional
Penalizes the model for repeating words. Defaults to None.
tool_choice : ToolChoice, optional
Controls how the model may use tools ("auto", "required", "none", or a specific function). Defaults to "auto".
thinking_config : ThinkingConfigOrDict, optional
The thinking configuration for response generation. Defaults to None.
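
For illustration, a hedged sketch of the two authentication modes described above (the thinking budget is an arbitrary example within the allowed 0-24576 range):

from livekit.plugins import google
from google.genai import types

# Gemini API: pass api_key or set the GOOGLE_API_KEY environment variable
gemini_llm = google.LLM(
    model="gemini-2.0-flash-001",
    temperature=0.8,
    thinking_config=types.ThinkingConfig(thinking_budget=1024),  # example budget
)

# Vertex AI: uses Application Default Credentials; project/location may also
# come from GOOGLE_CLOUD_PROJECT / GOOGLE_CLOUD_LOCATION
vertex_llm = google.LLM(
    model="gemini-2.0-flash-001",
    vertexai=True,
    project="my-gcp-project",  # example project id
    location="us-central1",
)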

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[types.SchemaUnion | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> livekit.plugins.google.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    response_format: NotGivenOr[
        types.SchemaUnion | type[llm_utils.ResponseFormatT]
    ] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    extra = {}

    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
    if is_given(tool_choice):
        gemini_tool_choice: types.ToolConfig
        if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[tool_choice["function"]["name"]],
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "required":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[get_function_info(fnc).name for fnc in tools]
                    if tools
                    else None,
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "auto":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="AUTO",
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "none":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="NONE",
                )
            )
            extra["tool_config"] = gemini_tool_choice

    if is_given(response_format):
        extra["response_schema"] = to_response_format(response_format)
        extra["response_mime_type"] = "application/json"

    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature
    if is_given(self._opts.max_output_tokens):
        extra["max_output_tokens"] = self._opts.max_output_tokens
    if is_given(self._opts.top_p):
        extra["top_p"] = self._opts.top_p
    if is_given(self._opts.top_k):
        extra["top_k"] = self._opts.top_k
    if is_given(self._opts.presence_penalty):
        extra["presence_penalty"] = self._opts.presence_penalty
    if is_given(self._opts.frequency_penalty):
        extra["frequency_penalty"] = self._opts.frequency_penalty

    # Add the thinking config if one was provided
    if is_given(self._opts.thinking_config):
        extra["thinking_config"] = self._opts.thinking_config

    return LLMStream(
        self,
        client=self._client,
        model=self._opts.model,
        chat_ctx=chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
    )
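
A hedged usage sketch of chat(); the ChatContext and any tools come from livekit-agents and are only assumed here:

from livekit.agents import llm as agents_llm
from livekit.plugins import google

async def collect_reply(gemini: google.LLM, chat_ctx: agents_llm.ChatContext) -> None:
    # tool_choice mirrors the branches above: "auto", "required", "none",
    # or {"type": "function", "function": {"name": ...}} to force one tool
    stream = gemini.chat(chat_ctx=chat_ctx, tool_choice="auto")
    async for chunk in stream:
        # LLMStream is an async iterator of chat chunks; the chunk schema is
        # defined by livekit-agents, so it is only printed here
        print(chunk)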

Inherited members

class STT (*,
languages: LanguageCode = 'en-US',
detect_language: bool = True,
interim_results: bool = True,
punctuate: bool = True,
spoken_punctuation: bool = False,
model: SpeechModels | str = 'latest_long',
location: str = 'global',
sample_rate: int = 16000,
min_confidence_threshold: float = 0.65,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
use_streaming: NotGivenOr[bool] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = False,
        model: SpeechModels | str = "latest_long",
        location: str = "global",
        sample_rate: int = 16000,
        min_confidence_threshold: float = _default_min_confidence,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
        use_streaming: NotGivenOr[bool] = NOT_GIVEN,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or via Application Default Credentials as
        described in https://cloud.google.com/docs/authentication/application-default-credentials

        Args:
            languages(LanguageCode): list of language codes to recognize (default: "en-US")
            detect_language(bool): whether to detect the language of the audio (default: True)
            interim_results(bool): whether to return interim results (default: True)
            punctuate(bool): whether to punctuate the audio (default: True)
            spoken_punctuation(bool): whether to use spoken punctuation (default: False)
            model(SpeechModels): the model to use for recognition (default: "latest_long")
            location(str): the location to use for recognition (default: "global")
            sample_rate(int): the sample rate of the audio (default: 16000)
            min_confidence_threshold(float): minimum confidence threshold for recognition
                (default: 0.65)
            credentials_info(dict): the credentials info to use for recognition (default: None)
            credentials_file(str): the credentials file to use for recognition (default: None)
            keywords(List[tuple[str, float]]): list of keywords to recognize (default: None)
            use_streaming(bool): whether to use streaming for recognition (default: True)
        """
        if not is_given(use_streaming):
            use_streaming = True
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=use_streaming, interim_results=True)
        )

        self._location = location
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if not is_given(credentials_file) and not is_given(credentials_info):
            try:
                gauth_default()
            except DefaultCredentialsError:
                raise ValueError(
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                ) from None

        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            sample_rate=sample_rate,
            min_confidence_threshold=min_confidence_threshold,
            keywords=keywords,
        )
        self._streams = weakref.WeakSet[SpeechStream]()
        self._pool = utils.ConnectionPool[SpeechAsyncClient](
            max_session_duration=_max_session_duration,
            connect_cb=self._create_client,
        )

    async def _create_client(self) -> SpeechAsyncClient:
        # Add support for passing a specific location that matches recognizer
        # see: https://cloud.google.com/speech-to-text/v2/docs/speech-to-text-supported-languages
        client_options = None
        client: SpeechAsyncClient | None = None
        if self._location != "global":
            client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")
        if is_given(self._credentials_info):
            client = SpeechAsyncClient.from_service_account_info(
                self._credentials_info, client_options=client_options
            )
        elif is_given(self._credentials_file):
            client = SpeechAsyncClient.from_service_account_file(
                self._credentials_file, client_options=client_options
            )
        else:
            client = SpeechAsyncClient(client_options=client_options)
        assert client is not None
        return client

    def _get_recognizer(self, client: SpeechAsyncClient) -> str:
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = client.transport._credentials.project_id  # type: ignore
        except AttributeError:
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/{self._location}/recognizers/_"

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> STTOptions:
        config = dataclasses.replace(self._config)

        if is_given(language):
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning("multiple languages provided, but language detection is disabled")
            config.languages = [config.languages[0]]

        return config

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        frame = rtc.combine_audio_frames(buffer)

        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            adaptation=config.build_adaptation(),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=config.punctuate,
                enable_spoken_punctuation=config.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=config.model,
            language_codes=config.languages,
        )

        try:
            async with self._pool.connection() as client:
                raw = await client.recognize(
                    cloud_speech.RecognizeRequest(
                        recognizer=self._get_recognizer(client),
                        config=config,
                        content=frame.data.tobytes(),
                    ),
                    timeout=conn_options.timeout,
                )

                return _recognize_response_to_speech_event(raw)
        except DeadlineExceeded:
            raise APITimeoutError() from None
        except GoogleAPICallError as e:
            raise APIStatusError(f"{e.message} {e.details}", status_code=e.code or -1) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            pool=self._pool,
            recognizer_cb=self._get_recognizer,
            config=config,
            conn_options=conn_options,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(location):
            self._location = location
            # if location is changed, fetch a new client and recognizer as per the new location
            self._pool.invalidate()
        if is_given(keywords):
            self._config.keywords = keywords

        for stream in self._streams:
            stream.update_options(
                languages=languages,
                detect_language=detect_language,
                interim_results=interim_results,
                punctuate=punctuate,
                spoken_punctuation=spoken_punctuation,
                model=model,
                keywords=keywords,
            )

    async def aclose(self) -> None:
        await self._pool.aclose()
        await super().aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google STT.

Credentials must be provided, either by using the credentials_info dict, or reading from the file specified in credentials_file or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials

Args

languages (LanguageCode): list of language codes to recognize (default: "en-US")
detect_language (bool): whether to detect the language of the audio (default: True)
interim_results (bool): whether to return interim results (default: True)
punctuate (bool): whether to punctuate the audio (default: True)
spoken_punctuation (bool): whether to use spoken punctuation (default: False)
model (SpeechModels): the model to use for recognition (default: "latest_long")
location (str): the location to use for recognition (default: "global")
sample_rate (int): the sample rate of the audio (default: 16000)
min_confidence_threshold (float): minimum confidence threshold for recognition (default: 0.65)
credentials_info (dict): the credentials info to use for recognition (default: None)
credentials_file (str): the credentials file to use for recognition (default: None)
keywords (List[tuple[str, float]]): list of keywords to recognize (default: None)
use_streaming (bool): whether to use streaming for recognition (default: True)
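
A hedged construction sketch using the options above (the keyword boost and threshold are illustrative values):

from livekit.plugins import google

stt = google.STT(
    languages=["en-US", "es-US"],  # multiple languages with detection enabled
    detect_language=True,
    model="latest_long",
    min_confidence_threshold=0.65,
    keywords=[("LiveKit", 15.0)],  # (phrase, boost) pairs for speech adaptation
)

# Streaming recognition: push rtc.AudioFrame objects and consume SpeechEvents
speech_stream = stt.stream()
# speech_stream.push_frame(frame)          # feed audio from your source
# async for event in speech_stream: ...    # INTERIM/FINAL_TRANSCRIPT events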

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await self._pool.aclose()
    await super().aclose()

Close the STT and every stream/request associated with it

def stream(self,
*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = self._sanitize_options(language=language)
    stream = SpeechStream(
        stt=self,
        pool=self._pool,
        recognizer_cb=self._get_recognizer,
        config=config,
        conn_options=conn_options,
    )
    self._streams.add(stream)
    return stream
def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    location: NotGivenOr[str] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(location):
        self._location = location
        # if location is changed, fetch a new client and recognizer as per the new location
        self._pool.invalidate()
    if is_given(keywords):
        self._config.keywords = keywords

    for stream in self._streams:
        stream.update_options(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            keywords=keywords,
        )
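
Options can also be changed after construction; the change propagates to any active streams, and changing the location invalidates the pooled clients so new connections use the new endpoint. A brief sketch with example values:

stt.update_options(
    languages="fr-FR",
    model="latest_short",      # example model
    location="europe-west1",   # example region; triggers new clients
)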

Inherited members

class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
pool: utils.ConnectionPool[SpeechAsyncClient],
recognizer_cb: Callable[[SpeechAsyncClient], str],
config: STTOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        pool: utils.ConnectionPool[SpeechAsyncClient],
        recognizer_cb: Callable[[SpeechAsyncClient], str],
        config: STTOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=config.sample_rate)

        self._pool = pool
        self._recognizer_cb = recognizer_cb
        self._config = config
        self._reconnect_event = asyncio.Event()
        self._session_connected_at: float = 0

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(min_confidence_threshold):
            self._config.min_confidence_threshold = min_confidence_threshold
        if is_given(keywords):
            self._config.keywords = keywords

        self._reconnect_event.set()

    async def _run(self) -> None:
        # google requires an async generator when calling streaming_recognize
        # this function basically converts the input queue into an async generator
        async def input_generator(client: SpeechAsyncClient, should_stop: asyncio.Event):
            try:
                # first request should contain the config
                yield cloud_speech.StreamingRecognizeRequest(
                    recognizer=self._recognizer_cb(client),
                    streaming_config=self._streaming_config,
                )

                async for frame in self._input_ch:
                    # when the stream is aborted due to reconnect, this input_generator
                    # needs to stop consuming frames
                    # when the generator stops, the previous gRPC stream will close
                    if should_stop.is_set():
                        return

                    if isinstance(frame, rtc.AudioFrame):
                        yield cloud_speech.StreamingRecognizeRequest(audio=frame.data.tobytes())

            except Exception:
                logger.exception("an error occurred while streaming input to google STT")

        async def process_stream(client: SpeechAsyncClient, stream):
            has_started = False
            async for resp in stream:
                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                    )
                    has_started = True

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED  # noqa: E501
                ):
                    result = resp.results[0]
                    speech_data = _streaming_recognize_response_to_speech_data(
                        resp,
                        min_confidence_threshold=self._config.min_confidence_threshold,
                    )
                    if speech_data is None:
                        continue

                    if not result.is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                    else:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                        if time.time() - self._session_connected_at > _max_session_duration:
                            logger.debug(
                                "Google STT maximum connection time reached. Reconnecting..."
                            )
                            self._pool.remove(client)
                            if has_started:
                                self._event_ch.send_nowait(
                                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                                )
                                has_started = False
                            self._reconnect_event.set()
                            return

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                    )
                    has_started = False

        while True:
            try:
                async with self._pool.connection() as client:
                    self._streaming_config = cloud_speech.StreamingRecognitionConfig(
                        config=cloud_speech.RecognitionConfig(
                            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                                sample_rate_hertz=self._config.sample_rate,
                                audio_channel_count=1,
                            ),
                            adaptation=self._config.build_adaptation(),
                            language_codes=self._config.languages,
                            model=self._config.model,
                            features=cloud_speech.RecognitionFeatures(
                                enable_automatic_punctuation=self._config.punctuate,
                                enable_word_time_offsets=True,
                                enable_spoken_punctuation=self._config.spoken_punctuation,
                            ),
                        ),
                        streaming_features=cloud_speech.StreamingRecognitionFeatures(
                            interim_results=self._config.interim_results,
                        ),
                    )

                    should_stop = asyncio.Event()
                    stream = await client.streaming_recognize(
                        requests=input_generator(client, should_stop),
                    )
                    self._session_connected_at = time.time()

                    process_stream_task = asyncio.create_task(process_stream(client, stream))
                    wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                    try:
                        done, _ = await asyncio.wait(
                            [process_stream_task, wait_reconnect_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        for task in done:
                            if task != wait_reconnect_task:
                                task.result()
                        if wait_reconnect_task not in done:
                            break
                        self._reconnect_event.clear()
                    finally:
                        await utils.aio.gracefully_cancel(process_stream_task, wait_reconnect_task)
                        should_stop.set()
            except DeadlineExceeded:
                raise APITimeoutError() from None
            except GoogleAPICallError as e:
                if e.code == 409:
                    logger.debug("stream timed out, restarting.")
                else:
                    raise APIStatusError(
                        f"{e.message} {e.details}", status_code=e.code or -1
                    ) from e
            except Exception as e:
                raise APIConnectionError() from e

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(min_confidence_threshold):
        self._config.min_confidence_threshold = min_confidence_threshold
    if is_given(keywords):
        self._config.keywords = keywords

    self._reconnect_event.set()

class TTS (*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
gender: NotGivenOr[Gender | str] = NOT_GIVEN,
voice_name: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 24000,
pitch: int = 0,
effects_profile_id: str = '',
speaking_rate: float = 1.0,
location: str = 'global',
audio_encoding: texttospeech.AudioEncoding = texttospeech.AudioEncoding.PCM,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        gender: NotGivenOr[Gender | str] = NOT_GIVEN,
        voice_name: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 24000,
        pitch: int = 0,
        effects_profile_id: str = "",
        speaking_rate: float = 1.0,
        location: str = "global",
        audio_encoding: texttospeech.AudioEncoding = texttospeech.AudioEncoding.PCM,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google TTS.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or the ``GOOGLE_APPLICATION_CREDENTIALS``
        environment variable.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US".
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral".
            voice_name (str, optional): Specific voice name. Default is an empty string.
            sample_rate (int, optional): Audio sample rate in Hz. Default is 24000.
            location (str, optional): Location for the TTS client. Default is "global".
            pitch (float, optional): Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
            effects_profile_id (str): Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
            speaking_rate (float, optional): Speed of speech. Default is 1.0.
            credentials_info (dict, optional): Dictionary containing Google Cloud credentials. Default is None.
            credentials_file (str, optional): Path to the Google Cloud credentials JSON file. Default is None.
        """  # noqa: E501

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file
        self._location = location

        lang = language if is_given(language) else "en-US"
        ssml_gender = _gender_from_str("neutral" if not is_given(gender) else gender)
        name = "" if not is_given(voice_name) else voice_name

        voice_params = texttospeech.VoiceSelectionParams(
            name=name,
            language_code=lang,
            ssml_gender=ssml_gender,
        )

        self._opts = _TTSOptions(
            voice=voice_params,
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                sample_rate_hertz=sample_rate,
                pitch=pitch,
                effects_profile_id=effects_profile_id,
                speaking_rate=speaking_rate,
            ),
        )

    def update_options(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        gender: NotGivenOr[Gender | str] = NOT_GIVEN,
        voice_name: NotGivenOr[str] = NOT_GIVEN,
        speaking_rate: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the TTS options.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US").
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral").
            voice_name (str, optional): Specific voice name.
            speaking_rate (float, optional): Speed of speech.
        """  # noqa: E501
        params = {}
        if is_given(language):
            params["language_code"] = str(language)
        if is_given(gender):
            params["ssml_gender"] = _gender_from_str(str(gender))
        if is_given(voice_name):
            params["name"] = voice_name

        if params:
            self._opts.voice = texttospeech.VoiceSelectionParams(**params)

        if is_given(speaking_rate):
            self._opts.audio_config.speaking_rate = speaking_rate

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        api_endpoint = "texttospeech.googleapis.com"
        if self._location != "global":
            api_endpoint = f"{self._location}-texttospeech.googleapis.com"

        if self._client is None:
            if self._credentials_info:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_info(
                    self._credentials_info, client_options=ClientOptions(api_endpoint=api_endpoint)
                )

            elif self._credentials_file:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                    self._credentials_file, client_options=ClientOptions(api_endpoint=api_endpoint)
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient(
                    client_options=ClientOptions(api_endpoint=api_endpoint)
                )

        assert self._client is not None
        return self._client

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            client=self._ensure_client(),
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Google TTS.

Credentials must be provided, either by using the credentials_info dict, or reading from the file specified in credentials_file or the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
Specific voice name. Default is an empty string.
sample_rate : int, optional
Audio sample rate in Hz. Default is 24000.
location : str, optional
Location for the TTS client. Default is "global".
pitch : float, optional
Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id : str
Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate : float, optional
Speed of speech. Default is 1.0.
credentials_info : dict, optional
Dictionary containing Google Cloud credentials. Default is None.
credentials_file : str, optional
Path to the Google Cloud credentials JSON file. Default is None.
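
A hedged sketch combining construction and synthesis (the voice name is an example; any Cloud TTS voice matching the language can be used):

from livekit.plugins import google

tts = google.TTS(
    language="en-US",
    gender="female",
    voice_name="en-US-Standard-C",  # example voice
    speaking_rate=1.05,
)

# synthesize() returns a ChunkedStream of audio for the given text
audio_stream = tts.synthesize("Hello from LiveKit!")
# async for audio in audio_stream: ...  # consume synthesized audio chunks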

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        client=self._ensure_client(),
    )
def update_options(self,
*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
gender: NotGivenOr[Gender | str] = NOT_GIVEN,
voice_name: NotGivenOr[str] = NOT_GIVEN,
speaking_rate: NotGivenOr[float] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
    gender: NotGivenOr[Gender | str] = NOT_GIVEN,
    voice_name: NotGivenOr[str] = NOT_GIVEN,
    speaking_rate: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Update the TTS options.

    Args:
        language (SpeechLanguages | str, optional): Language code (e.g., "en-US").
        gender (Gender | str, optional): Voice gender ("male", "female", "neutral").
        voice_name (str, optional): Specific voice name.
        speaking_rate (float, optional): Speed of speech.
    """  # noqa: E501
    params = {}
    if is_given(language):
        params["language_code"] = str(language)
    if is_given(gender):
        params["ssml_gender"] = _gender_from_str(str(gender))
    if is_given(voice_name):
        params["name"] = voice_name

    if params:
        self._opts.voice = texttospeech.VoiceSelectionParams(**params)

    if is_given(speaking_rate):
        self._opts.audio_config.speaking_rate = speaking_rate

Update the TTS options.

Args

language : SpeechLanguages | str, optional
Language code (e.g., "en-US").
gender : Gender | str, optional
Voice gender ("male", "female", "neutral").
voice_name : str, optional
Specific voice name.
speaking_rate : float, optional
Speed of speech.
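
For example, switching to a different voice at runtime (values are illustrative):

tts.update_options(
    language="en-GB",
    voice_name="en-GB-Standard-A",  # example voice
    speaking_rate=0.95,
)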

Inherited members