Module livekit.plugins.google

Sub-modules

livekit.plugins.google.beta

Classes

class LLM (*,
model: ChatModels | str = 'gemini-2.0-flash-001',
api_key: NotGivenOr[str] = NOT_GIVEN,
vertexai: NotGivenOr[bool] = False,
project: NotGivenOr[str] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
top_k: NotGivenOr[float] = NOT_GIVEN,
presence_penalty: NotGivenOr[float] = NOT_GIVEN,
frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: ChatModels | str = "gemini-2.0-flash-001",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        vertexai: NotGivenOr[bool] = False,
        project: NotGivenOr[str] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        top_k: NotGivenOr[float] = NOT_GIVEN,
        presence_penalty: NotGivenOr[float] = NOT_GIVEN,
        frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google GenAI LLM.

        Environment Requirements:
        - For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path of the service account key file.
        The Google Cloud project and location can be set via `project` and `location` arguments or the environment variables
        `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`. By default, the project is inferred from the service account key file,
        and the location defaults to "us-central1".
        - For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable.

        Args:
            model (ChatModels | str, optional): The model name to use. Defaults to "gemini-2.0-flash-001".
            api_key (str, optional): The API key for Google Gemini. If not provided, it attempts to read from the `GOOGLE_API_KEY` environment variable.
            vertexai (bool, optional): Whether to use VertexAI. Defaults to False.
            project (str, optional): The Google Cloud project to use (only for VertexAI). Defaults to None.
            location (str, optional): The location to use for VertexAI API requests. Default is "us-central1".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            top_k (float, optional): The top-k sampling value for response generation. Defaults to None.
            presence_penalty (float, optional): Penalizes the model for generating previously mentioned concepts. Defaults to None.
            frequency_penalty (float, optional): Penalizes the model for repeating words. Defaults to None.
            tool_choice (ToolChoice, optional): Specifies how the model may use the provided tools during response generation. Defaults to "auto".
        """  # noqa: E501
        super().__init__()
        gcp_project = project if is_given(project) else os.environ.get("GOOGLE_CLOUD_PROJECT")
        gcp_location = location if is_given(location) else os.environ.get("GOOGLE_CLOUD_LOCATION")
        gemini_api_key = api_key if is_given(api_key) else os.environ.get("GOOGLE_API_KEY")
        _gac = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
        if _gac is None:
            logger.warning(
                "`GOOGLE_APPLICATION_CREDENTIALS` environment variable is not set. please set it to the path of the service account key file. Otherwise, use any of the other Google Cloud auth methods."  # noqa: E501
            )

        if is_given(vertexai) and vertexai:
            if not gcp_project:
                _, gcp_project = default_async(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
            gemini_api_key = None  # VertexAI does not require an API key

        else:
            gcp_project = None
            gcp_location = None
            if not gemini_api_key:
                raise ValueError(
                    "API key is required for Google API either via api_key or GOOGLE_API_KEY environment variable"  # noqa: E501
                )

        self._opts = _LLMOptions(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            vertexai=vertexai,
            project=project,
            location=location,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            top_k=top_k,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
        )
        self._client = genai.Client(
            api_key=gemini_api_key,
            vertexai=is_given(vertexai) and vertexai,
            project=gcp_project,
            location=gcp_location,
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        extra = {}

        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
        if is_given(tool_choice):
            gemini_tool_choice: types.ToolConfig
            if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[tool_choice["function"]["name"]],
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "required":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[fnc.name for fnc in tools],
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "auto":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="AUTO",
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "none":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="NONE",
                    )
                )
                extra["tool_config"] = gemini_tool_choice

        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature
        if is_given(self._opts.max_output_tokens):
            extra["max_output_tokens"] = self._opts.max_output_tokens
        if is_given(self._opts.top_p):
            extra["top_p"] = self._opts.top_p
        if is_given(self._opts.top_k):
            extra["top_k"] = self._opts.top_k
        if is_given(self._opts.presence_penalty):
            extra["presence_penalty"] = self._opts.presence_penalty
        if is_given(self._opts.frequency_penalty):
            extra["frequency_penalty"] = self._opts.frequency_penalty

        return LLMStream(
            self,
            client=self._client,
            model=self._opts.model,
            chat_ctx=chat_ctx,
            tools=tools,
            conn_options=conn_options,
            extra_kwargs=extra,
        )


Create a new instance of Google GenAI LLM.

Environment Requirements:

- For VertexAI: Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the service account key file. The Google Cloud project and location can be set via the project and location arguments or the environment variables GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION. By default, the project is inferred from the service account key file, and the location defaults to "us-central1".
- For Google Gemini API: Set the api_key argument or the GOOGLE_API_KEY environment variable.

Args

model : ChatModels | str, optional
The model name to use. Defaults to "gemini-2.0-flash-001".
api_key : str, optional
The API key for Google Gemini. If not provided, it attempts to read from the GOOGLE_API_KEY environment variable.
vertexai : bool, optional
Whether to use VertexAI. Defaults to False.
project : str, optional
The Google Cloud project to use (only for VertexAI). Defaults to None.
location : str, optional
The location to use for VertexAI API requests. Default is "us-central1".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
The nucleus sampling probability for response generation. Defaults to None.
top_k : float, optional
The top-k sampling value for response generation. Defaults to None.
presence_penalty : float, optional
Penalizes the model for generating previously mentioned concepts. Defaults to None.
frequency_penalty : float, optional
Penalizes the model for repeating words. Defaults to None.
tool_choice : ToolChoice, optional
Specifies how the model may use the provided tools during response generation. Defaults to "auto".
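
Example (a minimal sketch; assumes the plugin is imported from livekit.plugins and credentials are configured as described above; the model name and project ID are placeholders):

from livekit.plugins import google

# Gemini API: reads the key from api_key or the GOOGLE_API_KEY environment variable
gemini_llm = google.LLM(model="gemini-2.0-flash-001", temperature=0.8)

# Vertex AI: no API key needed; uses application default credentials, and the
# project can be inferred from the service account key file when not given
vertex_llm = google.LLM(vertexai=True, project="my-gcp-project", location="us-central1")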

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> livekit.plugins.google.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    extra = {}

    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
    if is_given(tool_choice):
        gemini_tool_choice: types.ToolConfig
        if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[tool_choice["function"]["name"]],
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "required":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[fnc.name for fnc in tools],
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "auto":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="AUTO",
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "none":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="NONE",
                )
            )
            extra["tool_config"] = gemini_tool_choice

    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature
    if is_given(self._opts.max_output_tokens):
        extra["max_output_tokens"] = self._opts.max_output_tokens
    if is_given(self._opts.top_p):
        extra["top_p"] = self._opts.top_p
    if is_given(self._opts.top_k):
        extra["top_k"] = self._opts.top_k
    if is_given(self._opts.presence_penalty):
        extra["presence_penalty"] = self._opts.presence_penalty
    if is_given(self._opts.frequency_penalty):
        extra["frequency_penalty"] = self._opts.frequency_penalty

    return LLMStream(
        self,
        client=self._client,
        model=self._opts.model,
        chat_ctx=chat_ctx,
        tools=tools,
        conn_options=conn_options,
        extra_kwargs=extra,
    )
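
As the source above shows, tool_choice is translated into a google.genai tool config. A sketch of the mapping for the dict form (the function name is a hypothetical example):

from google.genai import types

# tool_choice={"type": "function", "function": {"name": "get_weather"}} becomes:
tool_config = types.ToolConfig(
    function_calling_config=types.FunctionCallingConfig(
        mode="ANY",
        allowed_function_names=["get_weather"],
    )
)
# "required" also uses mode="ANY" but allows every provided tool; "auto" and
# "none" map to mode="AUTO" and mode="NONE" with no name restriction.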


class STT (*,
languages: LanguageCode = 'en-US',
detect_language: bool = True,
interim_results: bool = True,
punctuate: bool = True,
spoken_punctuation: bool = False,
model: SpeechModels | str = 'latest_long',
location: str = 'global',
sample_rate: int = 16000,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = False,
        model: SpeechModels | str = "latest_long",
        location: str = "global",
        sample_rate: int = 16000,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or via Application Default Credentials as
        described in https://cloud.google.com/docs/authentication/application-default-credentials

        Args:
            languages (LanguageCode): list of language codes to recognize (default: "en-US")
            detect_language (bool): whether to detect the language of the audio (default: True)
            interim_results (bool): whether to return interim results (default: True)
            punctuate (bool): whether to punctuate the audio (default: True)
            spoken_punctuation (bool): whether to use spoken punctuation (default: False)
            model (SpeechModels): the model to use for recognition (default: "latest_long")
            location (str): the location to use for recognition (default: "global")
            sample_rate (int): the sample rate of the audio (default: 16000)
            credentials_info (dict): the credentials info to use for recognition (default: None)
            credentials_file (str): the credentials file to use for recognition (default: None)
            keywords (list[tuple[str, float]]): list of keywords to recognize (default: None)
        """
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))

        self._location = location
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if not is_given(credentials_file) and not is_given(credentials_info):
            try:
                gauth_default()
            except DefaultCredentialsError:
                raise ValueError(  # noqa: B904
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                )

        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            sample_rate=sample_rate,
            keywords=keywords,
        )
        self._streams = weakref.WeakSet[SpeechStream]()
        self._pool = utils.ConnectionPool[SpeechAsyncClient](
            max_session_duration=_max_session_duration,
            connect_cb=self._create_client,
        )

    async def _create_client(self) -> SpeechAsyncClient:
        # Add support for passing a specific location that matches recognizer
        # see: https://cloud.google.com/speech-to-text/v2/docs/speech-to-text-supported-languages
        client_options = None
        client: SpeechAsyncClient | None = None
        if self._location != "global":
            client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")
        if is_given(self._credentials_info):
            client = SpeechAsyncClient.from_service_account_info(
                self._credentials_info, client_options=client_options
            )
        elif is_given(self._credentials_file):
            client = SpeechAsyncClient.from_service_account_file(
                self._credentials_file, client_options=client_options
            )
        else:
            client = SpeechAsyncClient(client_options=client_options)
        assert client is not None
        return client

    def _get_recognizer(self, client: SpeechAsyncClient) -> str:
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = client.transport._credentials.project_id  # type: ignore
        except AttributeError:
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/{self._location}/recognizers/_"

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> STTOptions:
        config = dataclasses.replace(self._config)

        if is_given(language):
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning("multiple languages provided, but language detection is disabled")
            config.languages = [config.languages[0]]

        return config

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        frame = rtc.combine_audio_frames(buffer)

        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            adaptation=config.build_adaptation(),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=config.punctuate,
                enable_spoken_punctuation=config.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=config.model,
            language_codes=config.languages,
        )

        try:
            async with self._pool.connection() as client:
                raw = await client.recognize(
                    cloud_speech.RecognizeRequest(
                        recognizer=self._get_recognizer(client),
                        config=config,
                        content=frame.data.tobytes(),
                    ),
                    timeout=conn_options.timeout,
                )

                return _recognize_response_to_speech_event(raw)
        except DeadlineExceeded:
            raise APITimeoutError()  # noqa: B904
        except GoogleAPICallError as e:
            raise APIStatusError(  # noqa: B904
                e.message,
                status_code=e.code or -1,
            )
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            pool=self._pool,
            recognizer_cb=self._get_recognizer,
            config=config,
            conn_options=conn_options,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(location):
            self._location = location
            # if location is changed, fetch a new client and recognizer as per the new location
            self._pool.invalidate()
        if is_given(keywords):
            self._config.keywords = keywords

        for stream in self._streams:
            stream.update_options(
                languages=languages,
                detect_language=detect_language,
                interim_results=interim_results,
                punctuate=punctuate,
                spoken_punctuation=spoken_punctuation,
                model=model,
                keywords=keywords,
            )

    async def aclose(self) -> None:
        await self._pool.aclose()
        await super().aclose()


Create a new instance of Google STT.

Credentials must be provided, either by using the credentials_info dict, or reading from the file specified in credentials_file or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials

Args

languages : LanguageCode
list of language codes to recognize (default: "en-US")
detect_language : bool
whether to detect the language of the audio (default: True)
interim_results : bool
whether to return interim results (default: True)
punctuate : bool
whether to punctuate the audio (default: True)
spoken_punctuation : bool
whether to use spoken punctuation (default: False)
model : SpeechModels
the model to use for recognition (default: "latest_long")
location : str
the location to use for recognition (default: "global")
sample_rate : int
the sample rate of the audio (default: 16000)
credentials_info : dict
the credentials info to use for recognition (default: None)
credentials_file : str
the credentials file to use for recognition (default: None)
keywords : list[tuple[str, float]]
list of keywords to recognize (default: None)
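
Example (a minimal construction sketch under the credential setup described above; the language codes are placeholders, and a single string is also accepted and normalized to a list):

from livekit.plugins import google

stt_engine = google.STT(
    languages=["en-US", "hi-IN"],  # Google STT can accept multiple languages
    model="latest_long",
    sample_rate=16000,
)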

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await self._pool.aclose()
    await super().aclose()

Close the STT and every stream/request associated with it.
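
For example, at shutdown (assuming an async context):

await stt_engine.aclose()  # closes the pooled SpeechAsyncClient connections and all open streams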

def stream(self,
*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = self._sanitize_options(language=language)
    stream = SpeechStream(
        stt=self,
        pool=self._pool,
        recognizer_cb=self._get_recognizer,
        config=config,
        conn_options=conn_options,
    )
    self._streams.add(stream)
    return stream
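
A usage sketch; push_frame and async iteration are inherited from RecognizeStream, so treat the exact calls as assumptions rather than guarantees:

from livekit.agents import stt

speech_stream = stt_engine.stream(language="en-US")
speech_stream.push_frame(audio_frame)  # an rtc.AudioFrame from your audio source

async for event in speech_stream:
    if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
        print(event.alternatives[0].text)
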
def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    location: NotGivenOr[str] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(location):
        self._location = location
        # if location is changed, fetch a new client and recognizer as per the new location
        self._pool.invalidate()
    if is_given(keywords):
        self._config.keywords = keywords

    for stream in self._streams:
        stream.update_options(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            keywords=keywords,
        )
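
For instance, switching language and region at runtime (the region string is a placeholder); note from the source that changing location invalidates the pooled clients so new recognizers are created:

stt_engine.update_options(
    languages="fr-FR",        # a single code is normalized to ["fr-FR"]
    location="europe-west1",  # triggers self._pool.invalidate()
)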


class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
pool: utils.ConnectionPool[SpeechAsyncClient],
recognizer_cb: Callable[[SpeechAsyncClient], str],
config: STTOptions)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        pool: utils.ConnectionPool[SpeechAsyncClient],
        recognizer_cb: Callable[[SpeechAsyncClient], str],
        config: STTOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=config.sample_rate)

        self._pool = pool
        self._recognizer_cb = recognizer_cb
        self._config = config
        self._reconnect_event = asyncio.Event()
        self._session_connected_at: float = 0

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(keywords):
            self._config.keywords = keywords

        self._reconnect_event.set()

    async def _run(self) -> None:
        # google requires a async generator when calling streaming_recognize
        # this function basically convert the queue into a async generator
        async def input_generator(client: SpeechAsyncClient, should_stop: asyncio.Event):
            try:
                # first request should contain the config
                yield cloud_speech.StreamingRecognizeRequest(
                    recognizer=self._recognizer_cb(client),
                    streaming_config=self._streaming_config,
                )

                async for frame in self._input_ch:
                    # when the stream is aborted due to reconnect, this input_generator
                    # needs to stop consuming frames
                    # when the generator stops, the previous gRPC stream will close
                    if should_stop.is_set():
                        return

                    if isinstance(frame, rtc.AudioFrame):
                        yield cloud_speech.StreamingRecognizeRequest(audio=frame.data.tobytes())

            except Exception:
                logger.exception("an error occurred while streaming input to google STT")

        async def process_stream(client: SpeechAsyncClient, stream):
            has_started = False
            async for resp in stream:
                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                    )
                    has_started = True

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED  # noqa: E501
                ):
                    result = resp.results[0]
                    speech_data = _streaming_recognize_response_to_speech_data(resp)
                    if speech_data is None:
                        continue

                    if not result.is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                    else:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                        if time.time() - self._session_connected_at > _max_session_duration:
                            logger.debug(
                                "Google STT maximum connection time reached. Reconnecting..."
                            )
                            self._pool.remove(client)
                            if has_started:
                                self._event_ch.send_nowait(
                                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                                )
                                has_started = False
                            self._reconnect_event.set()
                            return

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                    )
                    has_started = False

        while True:
            try:
                async with self._pool.connection() as client:
                    self._streaming_config = cloud_speech.StreamingRecognitionConfig(
                        config=cloud_speech.RecognitionConfig(
                            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                                sample_rate_hertz=self._config.sample_rate,
                                audio_channel_count=1,
                            ),
                            adaptation=self._config.build_adaptation(),
                            language_codes=self._config.languages,
                            model=self._config.model,
                            features=cloud_speech.RecognitionFeatures(
                                enable_automatic_punctuation=self._config.punctuate,
                                enable_word_time_offsets=True,
                            ),
                        ),
                        streaming_features=cloud_speech.StreamingRecognitionFeatures(
                            interim_results=self._config.interim_results,
                        ),
                    )

                    should_stop = asyncio.Event()
                    stream = await client.streaming_recognize(
                        requests=input_generator(client, should_stop),
                    )
                    self._session_connected_at = time.time()

                    process_stream_task = asyncio.create_task(process_stream(client, stream))
                    wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                    try:
                        done, _ = await asyncio.wait(
                            [process_stream_task, wait_reconnect_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        for task in done:
                            if task != wait_reconnect_task:
                                task.result()
                        if wait_reconnect_task not in done:
                            break
                        self._reconnect_event.clear()
                    finally:
                        await utils.aio.gracefully_cancel(process_stream_task, wait_reconnect_task)
                        should_stop.set()
            except DeadlineExceeded:
                raise APITimeoutError()  # noqa: B904
            except GoogleAPICallError as e:
                raise APIStatusError(  # noqa: B904
                    e.message,
                    status_code=e.code or -1,
                )
            except Exception as e:
                raise APIConnectionError() from e


Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(keywords):
        self._config.keywords = keywords

    self._reconnect_event.set()
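
Calling this on a live stream sets the internal reconnect event, so the _run loop tears down the current gRPC stream and reconnects with the new config. For example (the model name is an example value):

speech_stream.update_options(punctuate=False, model="latest_short")
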
class TTS (*,
voice: NotGivenOr[texttospeech.VoiceSelectionParams] = NOT_GIVEN,
sample_rate: int = 24000,
pitch: int = 0,
effects_profile_id: str = '',
speaking_rate: float = 1.0,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice: NotGivenOr[texttospeech.VoiceSelectionParams] = NOT_GIVEN,
        sample_rate: int = 24000,
        pitch: int = 0,
        effects_profile_id: str = "",
        speaking_rate: float = 1.0,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google TTS.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or the ``GOOGLE_APPLICATION_CREDENTIALS``
        environment variable.

        Args:
            voice (texttospeech.VoiceSelectionParams, optional): Voice selection parameters.
            sample_rate (int, optional): Audio sample rate in Hz. Default is 24000.
            pitch (float, optional): Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
            effects_profile_id (str): Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
            speaking_rate (float, optional): Speed of speech. Default is 1.0.
            credentials_info (dict, optional): Dictionary containing Google Cloud credentials. Default is None.
            credentials_file (str, optional): Path to the Google Cloud credentials JSON file. Default is None.
        """  # noqa: E501

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if not is_given(voice):
            voice = texttospeech.VoiceSelectionParams(
                name="",
                language_code="en-US",
                ssml_gender=SsmlVoiceGender.NEUTRAL,
            )

        self._opts = _TTSOptions(
            voice=voice,
            audio_config=texttospeech.AudioConfig(
                audio_encoding=texttospeech.AudioEncoding.OGG_OPUS,
                sample_rate_hertz=sample_rate,
                pitch=pitch,
                effects_profile_id=effects_profile_id,
                speaking_rate=speaking_rate,
            ),
        )

    def update_options(
        self,
        *,
        voice: NotGivenOr[texttospeech.VoiceSelectionParams] = NOT_GIVEN,
        speaking_rate: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the TTS options.

        Args:
            voice (texttospeech.VoiceSelectionParams, optional): Voice selection parameters.
            speaking_rate (float, optional): Speed of speech.
        """  # noqa: E501
        if is_given(voice):
            self._opts.voice = voice
        if is_given(speaking_rate):
            self._opts.audio_config.speaking_rate = speaking_rate

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        if self._client is None:
            if self._credentials_info:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_info(
                    self._credentials_info
                )

            elif self._credentials_file:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                    self._credentials_file
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient()

        assert self._client is not None
        return self._client

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            client=self._ensure_client(),
        )


Create a new instance of Google TTS.

Credentials must be provided, either by using the credentials_info dict, or reading from the file specified in credentials_file or the GOOGLE_APPLICATION_CREDENTIALS environment variable.

Args

voice : texttospeech.VoiceSelectionParams, optional
Voice selection parameters.
sample_rate : int, optional
Audio sample rate in Hz. Default is 24000.
pitch : float, optional
Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id : str
Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate : float, optional
Speed of speech. Default is 1.0.
credentials_info : dict, optional
Dictionary containing Google Cloud credentials. Default is None.
credentials_file : str, optional
Path to the Google Cloud credentials JSON file. Default is None.
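
Example (a minimal sketch; the voice name is a placeholder, and when voice is omitted the plugin falls back to a neutral en-US voice as shown in the source):

from google.cloud import texttospeech
from livekit.plugins import google

voice = texttospeech.VoiceSelectionParams(
    language_code="en-US",
    name="en-US-Neural2-C",  # placeholder voice name
)
tts_engine = google.TTS(voice=voice, speaking_rate=1.1)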

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        client=self._ensure_client(),
    )
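
For example:

chunked = tts_engine.synthesize("Hello from LiveKit")
# returns a ChunkedStream; per the options above, audio is OGG_OPUS at the
# configured sample rate
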
def update_options(self,
*,
voice: NotGivenOr[texttospeech.VoiceSelectionParams] = NOT_GIVEN,
speaking_rate: NotGivenOr[float] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    voice: NotGivenOr[texttospeech.VoiceSelectionParams] = NOT_GIVEN,
    speaking_rate: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Update the TTS options.

    Args:
        voice (texttospeech.VoiceSelectionParams, optional): Voice selection parameters.
        speaking_rate (float, optional): Speed of speech.
    """  # noqa: E501
    if is_given(voice):
        self._opts.voice = voice
    if is_given(speaking_rate):
        self._opts.audio_config.speaking_rate = speaking_rate

Update the TTS options.

Args

voice : texttospeech.VoiceSelectionParams, optional
Voice selection parameters.
speaking_rate : float, optional
Speed of speech.
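
For example, to slow speech down without rebuilding the TTS instance:

tts_engine.update_options(speaking_rate=0.9)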
