Module livekit.plugins.google
Google AI plugin for LiveKit Agents
Supports Gemini, Cloud Speech-to-Text, and Cloud Text-to-Speech.
See https://docs.livekit.io/agents/integrations/stt/google/ for more information.
Sub-modules
livekit.plugins.google.beta
Classes
class LLM (*,
model: ChatModels | str = 'gemini-2.0-flash-001',
api_key: NotGivenOr[str] = NOT_GIVEN,
vertexai: NotGivenOr[bool] = NOT_GIVEN,
project: NotGivenOr[str] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
top_k: NotGivenOr[float] = NOT_GIVEN,
presence_penalty: NotGivenOr[float] = NOT_GIVEN,
frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
thinking_config: NotGivenOr[types.ThinkingConfigOrDict] = NOT_GIVEN)
Expand source code
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: ChatModels | str = "gemini-2.0-flash-001",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        vertexai: NotGivenOr[bool] = NOT_GIVEN,
        project: NotGivenOr[str] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        top_k: NotGivenOr[float] = NOT_GIVEN,
        presence_penalty: NotGivenOr[float] = NOT_GIVEN,
        frequency_penalty: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        thinking_config: NotGivenOr[types.ThinkingConfigOrDict] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google GenAI LLM.

        Environment Requirements:
        - For VertexAI: Set the `GOOGLE_APPLICATION_CREDENTIALS` environment variable to the path
          of the service account key file or use any of the other Google Cloud auth methods.
          The Google Cloud project and location can be set via `project` and `location` arguments
          or the environment variables `GOOGLE_CLOUD_PROJECT` and `GOOGLE_CLOUD_LOCATION`.
          By default, the project is inferred from the service account key file, and the location
          defaults to "us-central1".
        - For Google Gemini API: Set the `api_key` argument or the `GOOGLE_API_KEY` environment variable.

        Args:
            model (ChatModels | str, optional): The model name to use. Defaults to "gemini-2.0-flash-001".
            api_key (str, optional): The API key for Google Gemini. If not provided, it attempts to read from the `GOOGLE_API_KEY` environment variable.
            vertexai (bool, optional): Whether to use VertexAI. If not provided, it attempts to read from the `GOOGLE_GENAI_USE_VERTEXAI` environment variable. Defaults to False.
            project (str, optional): The Google Cloud project to use (only for VertexAI). Defaults to None.
            location (str, optional): The location to use for VertexAI API requests. Default value is "us-central1".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            top_k (float, optional): The top-k sampling value for response generation. Defaults to None.
            presence_penalty (float, optional): Penalizes the model for generating previously mentioned concepts. Defaults to None.
            frequency_penalty (float, optional): Penalizes the model for repeating words. Defaults to None.
            tool_choice (ToolChoice, optional): Specifies whether to use tools during response generation. Defaults to "auto".
            thinking_config (ThinkingConfigOrDict, optional): The thinking configuration for response generation. Defaults to None.
        """  # noqa: E501
        super().__init__()
        gcp_project = project if is_given(project) else os.environ.get("GOOGLE_CLOUD_PROJECT")
        gcp_location = (
            location
            if is_given(location)
            else os.environ.get("GOOGLE_CLOUD_LOCATION") or "us-central1"
        )
        use_vertexai = (
            vertexai
            if is_given(vertexai)
            else os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "0").lower() in ["true", "1"]
        )
        gemini_api_key = api_key if is_given(api_key) else os.environ.get("GOOGLE_API_KEY")

        if use_vertexai:
            if not gcp_project:
                _, gcp_project = default_async(
                    scopes=["https://www.googleapis.com/auth/cloud-platform"]
                )
            gemini_api_key = None  # VertexAI does not require an API key
        else:
            gcp_project = None
            gcp_location = None
            if not gemini_api_key:
                raise ValueError(
                    "API key is required for Google API either via api_key or GOOGLE_API_KEY environment variable"  # noqa: E501
                )

        # Validate thinking_config
        if is_given(thinking_config):
            _thinking_budget = None
            if isinstance(thinking_config, dict):
                _thinking_budget = thinking_config.get("thinking_budget")
            elif isinstance(thinking_config, types.ThinkingConfig):
                _thinking_budget = thinking_config.thinking_budget
            if _thinking_budget is not None:
                if not isinstance(_thinking_budget, int):
                    raise ValueError("thinking_budget inside thinking_config must be an integer")
                if not (0 <= _thinking_budget <= 24576):
                    raise ValueError(
                        "thinking_budget inside thinking_config must be between 0 and 24576"
                    )

        self._opts = _LLMOptions(
            model=model,
            temperature=temperature,
            tool_choice=tool_choice,
            vertexai=use_vertexai,
            project=project,
            location=location,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            top_k=top_k,
            presence_penalty=presence_penalty,
            frequency_penalty=frequency_penalty,
            thinking_config=thinking_config,
        )
        self._client = genai.Client(
            api_key=gemini_api_key,
            vertexai=use_vertexai,
            project=gcp_project,
            location=gcp_location,
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        response_format: NotGivenOr[
            types.SchemaUnion | type[llm_utils.ResponseFormatT]
        ] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        extra = {}
        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
        if is_given(tool_choice):
            gemini_tool_choice: types.ToolConfig
            if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[tool_choice["function"]["name"]],
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "required":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="ANY",
                        allowed_function_names=[get_function_info(fnc).name for fnc in tools]
                        if tools
                        else None,
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "auto":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="AUTO",
                    )
                )
                extra["tool_config"] = gemini_tool_choice
            elif tool_choice == "none":
                gemini_tool_choice = types.ToolConfig(
                    function_calling_config=types.FunctionCallingConfig(
                        mode="NONE",
                    )
                )
                extra["tool_config"] = gemini_tool_choice

        if is_given(response_format):
            extra["response_schema"] = to_response_format(response_format)
            extra["response_mime_type"] = "application/json"

        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature
        if is_given(self._opts.max_output_tokens):
            extra["max_output_tokens"] = self._opts.max_output_tokens
        if is_given(self._opts.top_p):
            extra["top_p"] = self._opts.top_p
        if is_given(self._opts.top_k):
            extra["top_k"] = self._opts.top_k
        if is_given(self._opts.presence_penalty):
            extra["presence_penalty"] = self._opts.presence_penalty
        if is_given(self._opts.frequency_penalty):
            extra["frequency_penalty"] = self._opts.frequency_penalty

        # Add thinking config if thinking_budget is provided
        if is_given(self._opts.thinking_config):
            extra["thinking_config"] = self._opts.thinking_config

        return LLMStream(
            self,
            client=self._client,
            model=self._opts.model,
            chat_ctx=chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
        )
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Google GenAI LLM.
Environment Requirements:
- For VertexAI: Set the GOOGLE_APPLICATION_CREDENTIALS environment variable to the path of the service account key file, or use any of the other Google Cloud auth methods. The Google Cloud project and location can be set via the project and location arguments or the environment variables GOOGLE_CLOUD_PROJECT and GOOGLE_CLOUD_LOCATION. By default, the project is inferred from the service account key file, and the location defaults to "us-central1".
- For the Google Gemini API: Set the api_key argument or the GOOGLE_API_KEY environment variable.
Args
model : ChatModels | str, optional
    The model name to use. Defaults to "gemini-2.0-flash-001".
api_key : str, optional
    The API key for Google Gemini. If not provided, it attempts to read from the GOOGLE_API_KEY environment variable.
vertexai : bool, optional
    Whether to use VertexAI. If not provided, it attempts to read from the GOOGLE_GENAI_USE_VERTEXAI environment variable. Defaults to False.
project : str, optional
    The Google Cloud project to use (only for VertexAI). Defaults to None.
location : str, optional
    The location to use for VertexAI API requests. Default value is "us-central1".
temperature : float, optional
    Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
    Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
    The nucleus sampling probability for response generation. Defaults to None.
top_k : float, optional
    The top-k sampling value for response generation. Defaults to None.
presence_penalty : float, optional
    Penalizes the model for generating previously mentioned concepts. Defaults to None.
frequency_penalty : float, optional
    Penalizes the model for repeating words. Defaults to None.
tool_choice : ToolChoice, optional
    Specifies whether to use tools during response generation. Defaults to "auto".
thinking_config : ThinkingConfigOrDict, optional
    The thinking configuration for response generation. Defaults to None.
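A minimal construction sketch under the two auth modes described above; the "my-gcp-project" value is a placeholder, and the plugin import path follows the standard LiveKit Agents convention:

    from livekit.plugins import google

    # Gemini API: api_key falls back to the GOOGLE_API_KEY environment variable
    gemini_llm = google.LLM(
        model="gemini-2.0-flash-001",
        temperature=0.8,
    )

    # Vertex AI: uses Application Default Credentials; project/location may also
    # come from GOOGLE_CLOUD_PROJECT / GOOGLE_CLOUD_LOCATION
    vertex_llm = google.LLM(
        vertexai=True,
        project="my-gcp-project",
        location="us-central1",
    )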
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self,
*,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[types.SchemaUnion | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> livekit.plugins.google.llm.LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    response_format: NotGivenOr[
        types.SchemaUnion | type[llm_utils.ResponseFormatT]
    ] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    extra = {}
    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice
    if is_given(tool_choice):
        gemini_tool_choice: types.ToolConfig
        if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[tool_choice["function"]["name"]],
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "required":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="ANY",
                    allowed_function_names=[get_function_info(fnc).name for fnc in tools]
                    if tools
                    else None,
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "auto":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="AUTO",
                )
            )
            extra["tool_config"] = gemini_tool_choice
        elif tool_choice == "none":
            gemini_tool_choice = types.ToolConfig(
                function_calling_config=types.FunctionCallingConfig(
                    mode="NONE",
                )
            )
            extra["tool_config"] = gemini_tool_choice

    if is_given(response_format):
        extra["response_schema"] = to_response_format(response_format)
        extra["response_mime_type"] = "application/json"

    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature
    if is_given(self._opts.max_output_tokens):
        extra["max_output_tokens"] = self._opts.max_output_tokens
    if is_given(self._opts.top_p):
        extra["top_p"] = self._opts.top_p
    if is_given(self._opts.top_k):
        extra["top_k"] = self._opts.top_k
    if is_given(self._opts.presence_penalty):
        extra["presence_penalty"] = self._opts.presence_penalty
    if is_given(self._opts.frequency_penalty):
        extra["frequency_penalty"] = self._opts.frequency_penalty

    # Add thinking config if thinking_budget is provided
    if is_given(self._opts.thinking_config):
        extra["thinking_config"] = self._opts.thinking_config

    return LLMStream(
        self,
        client=self._client,
        model=self._opts.model,
        chat_ctx=chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
    )
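A hedged consumption sketch: chat() returns an LLMStream, which is consumed as an async iterator of chat chunks. The ChatContext/ChatChunk field names (add_message, delta.content) follow recent livekit-agents releases; verify them against the version you have installed.

    import asyncio
    from livekit.agents import llm
    from livekit.plugins import google

    async def main() -> None:
        gemini = google.LLM(model="gemini-2.0-flash-001")
        chat_ctx = llm.ChatContext()
        chat_ctx.add_message(role="user", content="Say hello in one sentence.")

        parts: list[str] = []
        async with gemini.chat(chat_ctx=chat_ctx) as stream:
            async for chunk in stream:
                # tool-call chunks carry no text delta
                if chunk.delta and chunk.delta.content:
                    parts.append(chunk.delta.content)
        print("".join(parts))

    asyncio.run(main())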
Inherited members
class STT (*,
languages: LanguageCode = 'en-US',
detect_language: bool = True,
interim_results: bool = True,
punctuate: bool = True,
spoken_punctuation: bool = False,
model: SpeechModels | str = 'latest_long',
location: str = 'global',
sample_rate: int = 16000,
min_confidence_threshold: float = 0.65,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
use_streaming: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        languages: LanguageCode = "en-US",  # Google STT can accept multiple languages
        detect_language: bool = True,
        interim_results: bool = True,
        punctuate: bool = True,
        spoken_punctuation: bool = False,
        model: SpeechModels | str = "latest_long",
        location: str = "global",
        sample_rate: int = 16000,
        min_confidence_threshold: float = _default_min_confidence,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
        use_streaming: NotGivenOr[bool] = NOT_GIVEN,
    ):
        """
        Create a new instance of Google STT.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or via Application Default Credentials as
        described in https://cloud.google.com/docs/authentication/application-default-credentials

        args:
            languages(LanguageCode): list of language codes to recognize (default: "en-US")
            detect_language(bool): whether to detect the language of the audio (default: True)
            interim_results(bool): whether to return interim results (default: True)
            punctuate(bool): whether to punctuate the audio (default: True)
            spoken_punctuation(bool): whether to use spoken punctuation (default: False)
            model(SpeechModels): the model to use for recognition default: "latest_long"
            location(str): the location to use for recognition default: "global"
            sample_rate(int): the sample rate of the audio default: 16000
            min_confidence_threshold(float): minimum confidence threshold for recognition
                (default: 0.65)
            credentials_info(dict): the credentials info to use for recognition (default: None)
            credentials_file(str): the credentials file to use for recognition (default: None)
            keywords(List[tuple[str, float]]): list of keywords to recognize (default: None)
            use_streaming(bool): whether to use streaming for recognition (default: True)
        """
        if not is_given(use_streaming):
            use_streaming = True
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=use_streaming, interim_results=True)
        )

        self._location = location
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file

        if not is_given(credentials_file) and not is_given(credentials_info):
            try:
                gauth_default()
            except DefaultCredentialsError:
                raise ValueError(
                    "Application default credentials must be available "
                    "when using Google STT without explicitly passing "
                    "credentials through credentials_info or credentials_file."
                ) from None

        if isinstance(languages, str):
            languages = [languages]

        self._config = STTOptions(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            sample_rate=sample_rate,
            min_confidence_threshold=min_confidence_threshold,
            keywords=keywords,
        )
        self._streams = weakref.WeakSet[SpeechStream]()
        self._pool = utils.ConnectionPool[SpeechAsyncClient](
            max_session_duration=_max_session_duration,
            connect_cb=self._create_client,
        )

    async def _create_client(self) -> SpeechAsyncClient:
        # Add support for passing a specific location that matches recognizer
        # see: https://cloud.google.com/speech-to-text/v2/docs/speech-to-text-supported-languages
        client_options = None
        client: SpeechAsyncClient | None = None
        if self._location != "global":
            client_options = ClientOptions(api_endpoint=f"{self._location}-speech.googleapis.com")
        if is_given(self._credentials_info):
            client = SpeechAsyncClient.from_service_account_info(
                self._credentials_info, client_options=client_options
            )
        elif is_given(self._credentials_file):
            client = SpeechAsyncClient.from_service_account_file(
                self._credentials_file, client_options=client_options
            )
        else:
            client = SpeechAsyncClient(client_options=client_options)
        assert client is not None
        return client

    def _get_recognizer(self, client: SpeechAsyncClient) -> str:
        # TODO(theomonnom): should we use recognizers?
        # recognizers may improve latency https://cloud.google.com/speech-to-text/v2/docs/recognizers#understand_recognizers

        # TODO(theomonnom): find a better way to access the project_id
        try:
            project_id = client.transport._credentials.project_id  # type: ignore
        except AttributeError:
            from google.auth import default as ga_default

            _, project_id = ga_default()
        return f"projects/{project_id}/locations/{self._location}/recognizers/_"

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> STTOptions:
        config = dataclasses.replace(self._config)
        if is_given(language):
            config.languages = [language]

        if not isinstance(config.languages, list):
            config.languages = [config.languages]
        elif not config.detect_language:
            if len(config.languages) > 1:
                logger.warning("multiple languages provided, but language detection is disabled")
            config.languages = [config.languages[0]]

        return config

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)
        frame = rtc.combine_audio_frames(buffer)

        config = cloud_speech.RecognitionConfig(
            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=frame.sample_rate,
                audio_channel_count=frame.num_channels,
            ),
            adaptation=config.build_adaptation(),
            features=cloud_speech.RecognitionFeatures(
                enable_automatic_punctuation=config.punctuate,
                enable_spoken_punctuation=config.spoken_punctuation,
                enable_word_time_offsets=True,
            ),
            model=config.model,
            language_codes=config.languages,
        )

        try:
            async with self._pool.connection() as client:
                raw = await client.recognize(
                    cloud_speech.RecognizeRequest(
                        recognizer=self._get_recognizer(client),
                        config=config,
                        content=frame.data.tobytes(),
                    ),
                    timeout=conn_options.timeout,
                )
                return _recognize_response_to_speech_event(raw)
        except DeadlineExceeded:
            raise APITimeoutError() from None
        except GoogleAPICallError as e:
            raise APIStatusError(f"{e.message} {e.details}", status_code=e.code or -1) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            pool=self._pool,
            recognizer_cb=self._get_recognizer,
            config=config,
            conn_options=conn_options,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        location: NotGivenOr[str] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(location):
            self._location = location
            # if location is changed, fetch a new client and recognizer as per the new location
            self._pool.invalidate()
        if is_given(keywords):
            self._config.keywords = keywords

        for stream in self._streams:
            stream.update_options(
                languages=languages,
                detect_language=detect_language,
                interim_results=interim_results,
                punctuate=punctuate,
                spoken_punctuation=spoken_punctuation,
                model=model,
                keywords=keywords,
            )

    async def aclose(self) -> None:
        await self._pool.aclose()
        await super().aclose()
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Google STT.
Credentials must be provided, either by using the credentials_info dict, by reading from the file specified in credentials_file, or via Application Default Credentials as described in https://cloud.google.com/docs/authentication/application-default-credentials
Args
languages : LanguageCode, optional
    List of language codes to recognize. Default: "en-US".
detect_language : bool, optional
    Whether to detect the language of the audio. Default: True.
interim_results : bool, optional
    Whether to return interim results. Default: True.
punctuate : bool, optional
    Whether to punctuate the audio. Default: True.
spoken_punctuation : bool, optional
    Whether to use spoken punctuation. Default: False.
model : SpeechModels, optional
    The model to use for recognition. Default: "latest_long".
location : str, optional
    The location to use for recognition. Default: "global".
sample_rate : int, optional
    The sample rate of the audio. Default: 16000.
min_confidence_threshold : float, optional
    Minimum confidence threshold for recognition. Default: 0.65.
credentials_info : dict, optional
    The credentials info to use for recognition. Default: None.
credentials_file : str, optional
    The credentials file to use for recognition. Default: None.
keywords : list[tuple[str, float]], optional
    List of keywords to recognize. Default: None.
use_streaming : bool, optional
    Whether to use streaming for recognition. Default: True.
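A construction sketch under the credential assumptions above; the keyword phrase and boost value are placeholders:

    from livekit.plugins import google

    stt_engine = google.STT(
        languages=["en-US", "es-US"],
        detect_language=True,
        model="latest_long",
        min_confidence_threshold=0.65,
        # each keyword is a (phrase, boost) pair; values here are illustrative
        keywords=[("LiveKit", 15.0)],
    )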
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None: await self._pool.aclose() await super().aclose()
Close the STT and every stream/request associated with it.
def stream(self,
*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = self._sanitize_options(language=language)
    stream = SpeechStream(
        stt=self,
        pool=self._pool,
        recognizer_cb=self._get_recognizer,
        config=config,
        conn_options=conn_options,
    )
    self._streams.add(stream)
    return stream
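A rough consumption loop, assuming audio_frames is an async source of rtc.AudioFrame at the configured sample rate; the push_frame/end_input and SpeechEvent names follow livekit-agents' stt module:

    import asyncio
    from livekit.agents import stt

    async def transcribe(stt_engine, audio_frames) -> None:
        stream = stt_engine.stream()

        async def feed() -> None:
            async for frame in audio_frames:  # 16 kHz mono rtc.AudioFrame
                stream.push_frame(frame)
            stream.end_input()

        feeder = asyncio.create_task(feed())
        async for event in stream:
            if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(event.alternatives[0].text)
        await feeder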
def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
location: NotGivenOr[str] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
Expand source code
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    location: NotGivenOr[str] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(location):
        self._location = location
        # if location is changed, fetch a new client and recognizer as per the new location
        self._pool.invalidate()
    if is_given(keywords):
        self._config.keywords = keywords

    for stream in self._streams:
        stream.update_options(
            languages=languages,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            spoken_punctuation=spoken_punctuation,
            model=model,
            keywords=keywords,
        )
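As the source shows, options can be changed on a live instance and are propagated to open streams; changing location invalidates the pooled client so subsequent connections use the regional endpoint. A small sketch (values are examples):

    # switch recognition language and region mid-session
    stt_engine.update_options(
        languages=["de-DE"],
        location="europe-west3",  # forces the connection pool to reconnect
    )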
Inherited members
class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
pool: utils.ConnectionPool[SpeechAsyncClient],
recognizer_cb: Callable[[SpeechAsyncClient], str],
config: STTOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        pool: utils.ConnectionPool[SpeechAsyncClient],
        recognizer_cb: Callable[[SpeechAsyncClient], str],
        config: STTOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=config.sample_rate)

        self._pool = pool
        self._recognizer_cb = recognizer_cb
        self._config = config
        self._reconnect_event = asyncio.Event()
        self._session_connected_at: float = 0

    def update_options(
        self,
        *,
        languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
        detect_language: NotGivenOr[bool] = NOT_GIVEN,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        punctuate: NotGivenOr[bool] = NOT_GIVEN,
        spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
        model: NotGivenOr[SpeechModels] = NOT_GIVEN,
        min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
        keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
    ):
        if is_given(languages):
            if isinstance(languages, str):
                languages = [languages]
            self._config.languages = languages
        if is_given(detect_language):
            self._config.detect_language = detect_language
        if is_given(interim_results):
            self._config.interim_results = interim_results
        if is_given(punctuate):
            self._config.punctuate = punctuate
        if is_given(spoken_punctuation):
            self._config.spoken_punctuation = spoken_punctuation
        if is_given(model):
            self._config.model = model
        if is_given(min_confidence_threshold):
            self._config.min_confidence_threshold = min_confidence_threshold
        if is_given(keywords):
            self._config.keywords = keywords

        self._reconnect_event.set()

    async def _run(self) -> None:
        # google requires a async generator when calling streaming_recognize
        # this function basically convert the queue into a async generator
        async def input_generator(client: SpeechAsyncClient, should_stop: asyncio.Event):
            try:
                # first request should contain the config
                yield cloud_speech.StreamingRecognizeRequest(
                    recognizer=self._recognizer_cb(client),
                    streaming_config=self._streaming_config,
                )

                async for frame in self._input_ch:
                    # when the stream is aborted due to reconnect, this input_generator
                    # needs to stop consuming frames
                    # when the generator stops, the previous gRPC stream will close
                    if should_stop.is_set():
                        return

                    if isinstance(frame, rtc.AudioFrame):
                        yield cloud_speech.StreamingRecognizeRequest(audio=frame.data.tobytes())

            except Exception:
                logger.exception("an error occurred while streaming input to google STT")

        async def process_stream(client: SpeechAsyncClient, stream):
            has_started = False
            async for resp in stream:
                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_BEGIN
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                    )
                    has_started = True

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_EVENT_TYPE_UNSPECIFIED  # noqa: E501
                ):
                    result = resp.results[0]
                    speech_data = _streaming_recognize_response_to_speech_data(
                        resp,
                        min_confidence_threshold=self._config.min_confidence_threshold,
                    )
                    if speech_data is None:
                        continue

                    if not result.is_final:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                    else:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                                alternatives=[speech_data],
                            )
                        )
                        if time.time() - self._session_connected_at > _max_session_duration:
                            logger.debug(
                                "Google STT maximum connection time reached. Reconnecting..."
                            )
                            self._pool.remove(client)
                            if has_started:
                                self._event_ch.send_nowait(
                                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                                )
                                has_started = False
                            self._reconnect_event.set()
                            return

                if (
                    resp.speech_event_type
                    == cloud_speech.StreamingRecognizeResponse.SpeechEventType.SPEECH_ACTIVITY_END
                ):
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                    )
                    has_started = False

        while True:
            try:
                async with self._pool.connection() as client:
                    self._streaming_config = cloud_speech.StreamingRecognitionConfig(
                        config=cloud_speech.RecognitionConfig(
                            explicit_decoding_config=cloud_speech.ExplicitDecodingConfig(
                                encoding=cloud_speech.ExplicitDecodingConfig.AudioEncoding.LINEAR16,
                                sample_rate_hertz=self._config.sample_rate,
                                audio_channel_count=1,
                            ),
                            adaptation=self._config.build_adaptation(),
                            language_codes=self._config.languages,
                            model=self._config.model,
                            features=cloud_speech.RecognitionFeatures(
                                enable_automatic_punctuation=self._config.punctuate,
                                enable_word_time_offsets=True,
                                enable_spoken_punctuation=self._config.spoken_punctuation,
                            ),
                        ),
                        streaming_features=cloud_speech.StreamingRecognitionFeatures(
                            interim_results=self._config.interim_results,
                        ),
                    )

                    should_stop = asyncio.Event()
                    stream = await client.streaming_recognize(
                        requests=input_generator(client, should_stop),
                    )
                    self._session_connected_at = time.time()

                    process_stream_task = asyncio.create_task(process_stream(client, stream))
                    wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                    try:
                        done, _ = await asyncio.wait(
                            [process_stream_task, wait_reconnect_task],
                            return_when=asyncio.FIRST_COMPLETED,
                        )
                        for task in done:
                            if task != wait_reconnect_task:
                                task.result()
                        if wait_reconnect_task not in done:
                            break
                        self._reconnect_event.clear()
                    finally:
                        await utils.aio.gracefully_cancel(process_stream_task, wait_reconnect_task)
                        should_stop.set()
            except DeadlineExceeded:
                raise APITimeoutError() from None
            except GoogleAPICallError as e:
                if e.code == 409:
                    logger.debug("stream timed out, restarting.")
                else:
                    raise APIStatusError(
                        f"{e.message} {e.details}", status_code=e.code or -1
                    ) from e
            except Exception as e:
                raise APIConnectionError() from e
Helper class that provides a standard way to create an ABC using inheritance.
Args
sample_rate : int or None, optional
    The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
punctuate: NotGivenOr[bool] = NOT_GIVEN,
spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
model: NotGivenOr[SpeechModels] = NOT_GIVEN,
min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN)
Expand source code
def update_options(
    self,
    *,
    languages: NotGivenOr[LanguageCode] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    punctuate: NotGivenOr[bool] = NOT_GIVEN,
    spoken_punctuation: NotGivenOr[bool] = NOT_GIVEN,
    model: NotGivenOr[SpeechModels] = NOT_GIVEN,
    min_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
    keywords: NotGivenOr[list[tuple[str, float]]] = NOT_GIVEN,
):
    if is_given(languages):
        if isinstance(languages, str):
            languages = [languages]
        self._config.languages = languages
    if is_given(detect_language):
        self._config.detect_language = detect_language
    if is_given(interim_results):
        self._config.interim_results = interim_results
    if is_given(punctuate):
        self._config.punctuate = punctuate
    if is_given(spoken_punctuation):
        self._config.spoken_punctuation = spoken_punctuation
    if is_given(model):
        self._config.model = model
    if is_given(min_confidence_threshold):
        self._config.min_confidence_threshold = min_confidence_threshold
    if is_given(keywords):
        self._config.keywords = keywords

    self._reconnect_event.set()
class TTS (*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
gender: NotGivenOr[Gender | str] = NOT_GIVEN,
voice_name: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 24000,
pitch: int = 0,
effects_profile_id: str = '',
speaking_rate: float = 1.0,
location: str = 'global',
audio_encoding: texttospeech.AudioEncoding = texttospeech.AudioEncoding.PCM,
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        gender: NotGivenOr[Gender | str] = NOT_GIVEN,
        voice_name: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 24000,
        pitch: int = 0,
        effects_profile_id: str = "",
        speaking_rate: float = 1.0,
        location: str = "global",
        audio_encoding: texttospeech.AudioEncoding = texttospeech.AudioEncoding.PCM,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,
        credentials_file: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Google TTS.

        Credentials must be provided, either by using the ``credentials_info`` dict, or reading
        from the file specified in ``credentials_file`` or the ``GOOGLE_APPLICATION_CREDENTIALS``
        environment variable.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US"). Default is "en-US".
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral"). Default is "neutral".
            voice_name (str, optional): Specific voice name. Default is an empty string.
            sample_rate (int, optional): Audio sample rate in Hz. Default is 24000.
            location (str, optional): Location for the TTS client. Default is "global".
            pitch (float, optional): Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
            effects_profile_id (str): Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
            speaking_rate (float, optional): Speed of speech. Default is 1.0.
            credentials_info (dict, optional): Dictionary containing Google Cloud credentials. Default is None.
            credentials_file (str, optional): Path to the Google Cloud credentials JSON file. Default is None.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        self._client: texttospeech.TextToSpeechAsyncClient | None = None
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file
        self._location = location

        lang = language if is_given(language) else "en-US"
        ssml_gender = _gender_from_str("neutral" if not is_given(gender) else gender)
        name = "" if not is_given(voice_name) else voice_name

        voice_params = texttospeech.VoiceSelectionParams(
            name=name,
            language_code=lang,
            ssml_gender=ssml_gender,
        )

        self._opts = _TTSOptions(
            voice=voice_params,
            audio_config=texttospeech.AudioConfig(
                audio_encoding=audio_encoding,
                sample_rate_hertz=sample_rate,
                pitch=pitch,
                effects_profile_id=effects_profile_id,
                speaking_rate=speaking_rate,
            ),
        )

    def update_options(
        self,
        *,
        language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
        gender: NotGivenOr[Gender | str] = NOT_GIVEN,
        voice_name: NotGivenOr[str] = NOT_GIVEN,
        speaking_rate: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the TTS options.

        Args:
            language (SpeechLanguages | str, optional): Language code (e.g., "en-US").
            gender (Gender | str, optional): Voice gender ("male", "female", "neutral").
            voice_name (str, optional): Specific voice name.
            speaking_rate (float, optional): Speed of speech.
        """  # noqa: E501
        params = {}
        if is_given(language):
            params["language_code"] = str(language)
        if is_given(gender):
            params["ssml_gender"] = _gender_from_str(str(gender))
        if is_given(voice_name):
            params["name"] = voice_name
        if params:
            self._opts.voice = texttospeech.VoiceSelectionParams(**params)
        if is_given(speaking_rate):
            self._opts.audio_config.speaking_rate = speaking_rate

    def _ensure_client(self) -> texttospeech.TextToSpeechAsyncClient:
        api_endpoint = "texttospeech.googleapis.com"
        if self._location != "global":
            api_endpoint = f"{self._location}-texttospeech.googleapis.com"

        if self._client is None:
            if self._credentials_info:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_info(
                    self._credentials_info, client_options=ClientOptions(api_endpoint=api_endpoint)
                )
            elif self._credentials_file:
                self._client = texttospeech.TextToSpeechAsyncClient.from_service_account_file(
                    self._credentials_file, client_options=ClientOptions(api_endpoint=api_endpoint)
                )
            else:
                self._client = texttospeech.TextToSpeechAsyncClient(
                    client_options=ClientOptions(api_endpoint=api_endpoint)
                )

        assert self._client is not None
        return self._client

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            client=self._ensure_client(),
        )
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Google TTS.
Credentials must be provided, either by using the
credentials_info
dict, or reading from the file specified incredentials_file
or theGOOGLE_APPLICATION_CREDENTIALS
environmental variable.Args
language : SpeechLanguages | str, optional
    Language code (e.g., "en-US"). Default is "en-US".
gender : Gender | str, optional
    Voice gender ("male", "female", "neutral"). Default is "neutral".
voice_name : str, optional
    Specific voice name. Default is an empty string.
sample_rate : int, optional
    Audio sample rate in Hz. Default is 24000.
location : str, optional
    Location for the TTS client. Default is "global".
pitch : float, optional
    Speaking pitch, ranging from -20.0 to 20.0 semitones relative to the original pitch. Default is 0.
effects_profile_id : str
    Optional identifier for selecting audio effects profiles to apply to the synthesized speech.
speaking_rate : float, optional
    Speed of speech. Default is 1.0.
credentials_info : dict, optional
    Dictionary containing Google Cloud credentials. Default is None.
credentials_file : str, optional
    Path to the Google Cloud credentials JSON file. Default is None.
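A construction sketch under the credential assumptions above; the voice name is an example Cloud Text-to-Speech voice, not a plugin default:

    from livekit.plugins import google

    tts_engine = google.TTS(
        language="en-US",
        gender="female",
        voice_name="en-US-Standard-C",  # example voice; pick any Cloud TTS voice
        speaking_rate=1.1,
    )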
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.google.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        client=self._ensure_client(),
    )
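synthesize() returns a ChunkedStream that yields synthesized audio chunks. A consumption sketch; the SynthesizedAudio.frame field follows livekit-agents' tts module, so verify against your installed version:

    async def speak(tts_engine) -> list:
        frames = []
        async for audio in tts_engine.synthesize("Hello from LiveKit!"):
            frames.append(audio.frame)  # rtc.AudioFrame chunks, 24 kHz mono by default
        return frames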
def update_options(self,
*,
language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
gender: NotGivenOr[Gender | str] = NOT_GIVEN,
voice_name: NotGivenOr[str] = NOT_GIVEN,
speaking_rate: NotGivenOr[float] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    language: NotGivenOr[SpeechLanguages | str] = NOT_GIVEN,
    gender: NotGivenOr[Gender | str] = NOT_GIVEN,
    voice_name: NotGivenOr[str] = NOT_GIVEN,
    speaking_rate: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Update the TTS options.

    Args:
        language (SpeechLanguages | str, optional): Language code (e.g., "en-US").
        gender (Gender | str, optional): Voice gender ("male", "female", "neutral").
        voice_name (str, optional): Specific voice name.
        speaking_rate (float, optional): Speed of speech.
    """  # noqa: E501
    params = {}
    if is_given(language):
        params["language_code"] = str(language)
    if is_given(gender):
        params["ssml_gender"] = _gender_from_str(str(gender))
    if is_given(voice_name):
        params["name"] = voice_name
    if params:
        self._opts.voice = texttospeech.VoiceSelectionParams(**params)
    if is_given(speaking_rate):
        self._opts.audio_config.speaking_rate = speaking_rate
Update the TTS options.
Args
language : SpeechLanguages | str, optional
    Language code (e.g., "en-US").
gender : Gender | str, optional
    Voice gender ("male", "female", "neutral").
voice_name : str, optional
    Specific voice name.
speaking_rate : float, optional
    Speed of speech.
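New options apply to subsequent synthesize() calls; a one-line sketch with example values:

    tts_engine.update_options(language="en-GB", gender="male", speaking_rate=0.95)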
Inherited members