Module livekit.plugins.aws

AWS plugin for LiveKit Agents

Support for AWS AI services including Bedrock, Polly, and Transcribe, plus optional support for Amazon Nova Sonic.

See https://docs.livekit.io/agents/integrations/aws/ for more information.
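
A minimal usage sketch (assuming the standard LiveKit Agents AgentSession API; the model and voice values below are illustrative):

from livekit.agents import AgentSession
from livekit.plugins import aws

# Credentials resolve from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY (or any
# other boto3 credential source) unless passed explicitly.
session = AgentSession(
    llm=aws.LLM(model="anthropic.claude-3-5-sonnet-20240620-v1:0"),
    stt=aws.STT(language="en-US"),
    tts=aws.TTS(voice="Ruth", speech_engine="generative"),
)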

Sub-modules

livekit.plugins.aws.experimental

Classes

class ChunkedStream (*,
tts: TTS,
text: str,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
class ChunkedStream(tts.ChunkedStream):
    def __init__(
        self, *, tts: TTS, text: str, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> None:
        super().__init__(tts=tts, input_text=text, conn_options=conn_options)
        self._tts = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        try:
            config = AioConfig(
                connect_timeout=self._conn_options.timeout,
                read_timeout=10,
                retries={"mode": "standard", "total_max_attempts": 1},
            )
            async with self._tts._session.client("polly", config=config) as client:  # type: ignore
                response = await client.synthesize_speech(
                    **_strip_nones(
                        {
                            "Text": self._input_text,
                            "OutputFormat": "mp3",
                            "Engine": self._opts.speech_engine,
                            "VoiceId": self._opts.voice,
                            "TextType": self._opts.text_type,
                            "SampleRate": str(self._opts.sample_rate),
                            "LanguageCode": self._opts.language,
                        }
                    )
                )

                if "AudioStream" in response:
                    output_emitter.initialize(
                        request_id=response["ResponseMetadata"]["RequestId"],
                        sample_rate=self._opts.sample_rate,
                        num_channels=1,
                        mime_type="audio/mp3",
                    )

                    async with response["AudioStream"] as resp:
                        async for data, _ in resp.content.iter_chunks():
                            output_emitter.push(data)
        except botocore.exceptions.ConnectTimeoutError:
            raise APITimeoutError() from None
        except Exception as e:
            raise APIConnectionError() from e

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
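
A hedged sketch of consuming a ChunkedStream (assuming the base class's async-iterator interface, which yields SynthesizedAudio events carrying an rtc.AudioFrame):

stream = tts.synthesize("Hello from Polly")  # returns a ChunkedStream

async for audio in stream:
    play(audio.frame)  # play() is a hypothetical consumer of rtc.AudioFrame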

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class LLM (*,
model: NotGivenOr[str] = 'anthropic.claude-3-5-sonnet-20240620-v1:0',
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
region: NotGivenOr[str] = 'us-east-1',
temperature: NotGivenOr[float] = NOT_GIVEN,
max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
additional_request_fields: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
cache_system: bool = False,
cache_tools: bool = False,
session: aioboto3.Session | None = None)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: NotGivenOr[str] = DEFAULT_TEXT_MODEL,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        region: NotGivenOr[str] = "us-east-1",
        temperature: NotGivenOr[float] = NOT_GIVEN,
        max_output_tokens: NotGivenOr[int] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        additional_request_fields: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
        cache_system: bool = False,
        cache_tools: bool = False,
        session: aioboto3.Session | None = None,
    ) -> None:
        """
        Create a new instance of AWS Bedrock LLM.

        ``api_key`` and ``api_secret`` must be set to your AWS access key ID and secret access key, either using the arguments or by setting the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables.

        See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html for more details on the AWS Bedrock Runtime API.

        Args:
            model (str, optional): Model or inference profile ARN to use (https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-use.html).
                Defaults to 'anthropic.claude-3-5-sonnet-20240620-v1:0'.
            api_key (str, optional): AWS access key ID.
            api_secret (str, optional): AWS secret access key.
            region (str, optional): The region to use for AWS API requests. Default value is "us-east-1".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            tool_choice (ToolChoice, optional): Specifies whether to use tools during response generation. Defaults to "auto".
            additional_request_fields (dict[str, Any], optional): Additional request fields to send to the AWS Bedrock Converse API. Defaults to None.
            cache_system (bool, optional): Caches system messages to reduce token usage. Defaults to False.
            cache_tools (bool, optional): Caches tool definitions to reduce token usage. Defaults to False.
            session (aioboto3.Session, optional): Optional aioboto3 session to use.
        """  # noqa: E501
        super().__init__()

        self._session = session or aioboto3.Session(
            aws_access_key_id=api_key if is_given(api_key) else None,
            aws_secret_access_key=api_secret if is_given(api_secret) else None,
            region_name=region if is_given(region) else None,
        )

        bedrock_model = (
            model if is_given(model) else os.environ.get("BEDROCK_INFERENCE_PROFILE_ARN")
        )
        if not bedrock_model:
            raise ValueError(
                "model or inference profile arn must be set using the argument or by setting the BEDROCK_INFERENCE_PROFILE_ARN environment variable."  # noqa: E501
            )
        self._opts = _LLMOptions(
            model=bedrock_model,
            temperature=temperature,
            tool_choice=tool_choice,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            additional_request_fields=additional_request_fields,
            cache_system=cache_system,
            cache_tools=cache_tools,
        )

    @property
    def model(self) -> str:
        return self._opts.model

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[FunctionTool | RawFunctionTool] | None = None,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        opts: dict[str, Any] = {}
        extra_kwargs = extra_kwargs if is_given(extra_kwargs) else {}

        if is_given(self._opts.model):
            opts["modelId"] = self._opts.model

        def _get_tool_config() -> dict[str, Any] | None:
            nonlocal tool_choice

            if not tools:
                return None

            tools_list = to_fnc_ctx(tools)
            if self._opts.cache_tools:
                tools_list.append({"cachePoint": {"type": "default"}})

            tool_config: dict[str, Any] = {"tools": tools_list}
            tool_choice = (
                cast(ToolChoice, tool_choice) if is_given(tool_choice) else self._opts.tool_choice
            )
            if is_given(tool_choice):
                if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
                    tool_config["toolChoice"] = {"tool": {"name": tool_choice["function"]["name"]}}
                elif tool_choice == "required":
                    tool_config["toolChoice"] = {"any": {}}
                elif tool_choice == "auto":
                    tool_config["toolChoice"] = {"auto": {}}
                else:
                    return None

            return tool_config

        tool_config = _get_tool_config()
        if tool_config:
            opts["toolConfig"] = tool_config
        messages, extra_data = chat_ctx.to_provider_format(format="aws")
        opts["messages"] = messages
        if extra_data.system_messages:
            system_messages: list[dict[str, str | dict]] = [
                {"text": content} for content in extra_data.system_messages
            ]
            if self._opts.cache_system:
                system_messages.append({"cachePoint": {"type": "default"}})
            opts["system"] = system_messages

        inference_config: dict[str, Any] = {}
        if is_given(self._opts.max_output_tokens):
            inference_config["maxTokens"] = self._opts.max_output_tokens
        temperature = temperature if is_given(temperature) else self._opts.temperature
        if is_given(temperature):
            inference_config["temperature"] = temperature
        if is_given(self._opts.top_p):
            inference_config["topP"] = self._opts.top_p

        opts["inferenceConfig"] = inference_config
        if is_given(self._opts.additional_request_fields):
            opts["additionalModelRequestFields"] = self._opts.additional_request_fields

        return LLMStream(
            self,
            chat_ctx=chat_ctx,
            tools=tools or [],
            session=self._session,
            conn_options=conn_options,
            extra_kwargs=opts,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of AWS Bedrock LLM.

api_key and api_secret must be set to your AWS access key ID and secret access key, either using the arguments or by setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html for more details on the AWS Bedrock Runtime API.

Args

model : str, optional
Model or inference profile ARN to use (https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-use.html). Defaults to 'anthropic.claude-3-5-sonnet-20240620-v1:0'.
api_key : str, optional
AWS access key ID.
api_secret : str, optional
AWS secret access key.
region : str, optional
The region to use for AWS API requests. Default value is "us-east-1".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
The nucleus sampling probability for response generation. Defaults to None.
tool_choice : ToolChoice, optional
Specifies whether to use tools during response generation. Defaults to "auto".
additional_request_fields : dict[str, Any], optional
Additional request fields to send to the AWS Bedrock Converse API. Defaults to None.
cache_system : bool, optional
Caches system messages to reduce token usage. Defaults to False.
cache_tools : bool, optional
Caches tool definitions to reduce token usage. Defaults to False.
session : aioboto3.Session, optional
Optional aioboto3 session to use.
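
A hedged construction example (values are placeholders; credentials may instead come from the environment):

from livekit.plugins import aws

llm = aws.LLM(
    model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    region="us-east-1",
    temperature=0.8,
    max_output_tokens=1024,
    cache_system=True,  # appends a cachePoint entry after system messages
    cache_tools=True,   # appends a cachePoint entry after tool definitions
)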

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this LLM instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

Methods

def chat(self,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool | RawFunctionTool] | None = None,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> livekit.plugins.aws.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[FunctionTool | RawFunctionTool] | None = None,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    opts: dict[str, Any] = {}
    extra_kwargs = extra_kwargs if is_given(extra_kwargs) else {}

    if is_given(self._opts.model):
        opts["modelId"] = self._opts.model

    def _get_tool_config() -> dict[str, Any] | None:
        nonlocal tool_choice

        if not tools:
            return None

        tools_list = to_fnc_ctx(tools)
        if self._opts.cache_tools:
            tools_list.append({"cachePoint": {"type": "default"}})

        tool_config: dict[str, Any] = {"tools": tools_list}
        tool_choice = (
            cast(ToolChoice, tool_choice) if is_given(tool_choice) else self._opts.tool_choice
        )
        if is_given(tool_choice):
            if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
                tool_config["toolChoice"] = {"tool": {"name": tool_choice["function"]["name"]}}
            elif tool_choice == "required":
                tool_config["toolChoice"] = {"any": {}}
            elif tool_choice == "auto":
                tool_config["toolChoice"] = {"auto": {}}
            else:
                return None

        return tool_config

    tool_config = _get_tool_config()
    if tool_config:
        opts["toolConfig"] = tool_config
    messages, extra_data = chat_ctx.to_provider_format(format="aws")
    opts["messages"] = messages
    if extra_data.system_messages:
        system_messages: list[dict[str, str | dict]] = [
            {"text": content} for content in extra_data.system_messages
        ]
        if self._opts.cache_system:
            system_messages.append({"cachePoint": {"type": "default"}})
        opts["system"] = system_messages

    inference_config: dict[str, Any] = {}
    if is_given(self._opts.max_output_tokens):
        inference_config["maxTokens"] = self._opts.max_output_tokens
    temperature = temperature if is_given(temperature) else self._opts.temperature
    if is_given(temperature):
        inference_config["temperature"] = temperature
    if is_given(self._opts.top_p):
        inference_config["topP"] = self._opts.top_p

    opts["inferenceConfig"] = inference_config
    if is_given(self._opts.additional_request_fields):
        opts["additionalModelRequestFields"] = self._opts.additional_request_fields

    return LLMStream(
        self,
        chat_ctx=chat_ctx,
        tools=tools or [],
        session=self._session,
        conn_options=conn_options,
        extra_kwargs=opts,
    )
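
A sketch of invoking chat() directly (assuming the ChatContext construction API and the async-iterator interface of LLMStream shown below; normally the agent framework drives this for you):

from livekit.agents.llm import ChatContext

chat_ctx = ChatContext.empty()
chat_ctx.add_message(role="user", content="What's the weather in Seattle?")

async with llm.chat(chat_ctx=chat_ctx) as stream:
    async for chunk in stream:
        # each chunk is assumed to carry an incremental delta with text content
        if chunk.delta and chunk.delta.content:
            print(chunk.delta.content, end="", flush=True)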

Inherited members

class STT (*,
region: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 24000,
language: str = 'en-US',
encoding: str = 'pcm',
vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
session_id: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
number_of_channels: NotGivenOr[int] = NOT_GIVEN,
enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
language_model_name: NotGivenOr[str] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        region: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 24000,
        language: str = "en-US",
        encoding: str = "pcm",
        vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
        session_id: NotGivenOr[str] = NOT_GIVEN,
        vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
        vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
        show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
        enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
        number_of_channels: NotGivenOr[int] = NOT_GIVEN,
        enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
        partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
        language_model_name: NotGivenOr[str] = NOT_GIVEN,
    ):
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))

        if not is_given(region):
            region = os.getenv("AWS_REGION") or DEFAULT_REGION

        self._config = STTOptions(
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            vocabulary_name=vocabulary_name,
            session_id=session_id,
            vocab_filter_method=vocab_filter_method,
            vocab_filter_name=vocab_filter_name,
            show_speaker_label=show_speaker_label,
            enable_channel_identification=enable_channel_identification,
            number_of_channels=number_of_channels,
            enable_partial_results_stabilization=enable_partial_results_stabilization,
            partial_results_stability=partial_results_stability,
            language_model_name=language_model_name,
            region=region,
        )

    async def aclose(self) -> None:
        await super().aclose()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Amazon Transcribe does not support single frame recognition")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        return SpeechStream(stt=self, conn_options=conn_options, opts=self._config)

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await super().aclose()

Close the STT and every stream/request associated with it.

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.aws.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    return SpeechStream(stt=self, conn_options=conn_options, opts=self._config)
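
A hedged sketch of streaming recognition (assuming the base RecognizeStream interface with push_frame() and async event iteration):

from livekit.agents import stt as agents_stt
from livekit.plugins import aws

transcriber = aws.STT(language="en-US", sample_rate=24000)
stream = transcriber.stream()

async def consume() -> None:
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)

# audio is fed from elsewhere as rtc.AudioFrame objects:
# stream.push_frame(frame)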

Inherited members

class SpeechStream (stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts

    async def _run(self) -> None:
        while True:
            client = TranscribeStreamingClient(
                region=self._opts.region,
                credential_resolver=AwsCrtCredentialResolver(None),  # type: ignore
            )

            live_config = {
                "language_code": self._opts.language,
                "media_sample_rate_hz": self._opts.sample_rate,
                "media_encoding": self._opts.encoding,
                "vocabulary_name": self._opts.vocabulary_name,
                "session_id": self._opts.session_id,
                "vocab_filter_method": self._opts.vocab_filter_method,
                "vocab_filter_name": self._opts.vocab_filter_name,
                "show_speaker_label": self._opts.show_speaker_label,
                "enable_channel_identification": self._opts.enable_channel_identification,
                "number_of_channels": self._opts.number_of_channels,
                "enable_partial_results_stabilization": self._opts.enable_partial_results_stabilization,  # noqa: E501
                "partial_results_stability": self._opts.partial_results_stability,
                "language_model_name": self._opts.language_model_name,
            }
            filtered_config = {k: v for k, v in live_config.items() if v and is_given(v)}
            stream = await client.start_stream_transcription(**filtered_config)  # type: ignore

            async def input_generator(stream: StartStreamTranscriptionEventStream) -> None:
                async for frame in self._input_ch:
                    if isinstance(frame, rtc.AudioFrame):
                        await stream.input_stream.send_audio_event(audio_chunk=frame.data.tobytes())
                await stream.input_stream.end_stream()  # type: ignore

            async def handle_transcript_events(stream: StartStreamTranscriptionEventStream) -> None:
                async for event in stream.output_stream:
                    if isinstance(event, TranscriptEvent):
                        self._process_transcript_event(event)

            tasks = [
                asyncio.create_task(input_generator(stream)),
                asyncio.create_task(handle_transcript_events(stream)),
            ]
            try:
                await asyncio.gather(*tasks)
            except BadRequestException as e:
                if e.message and e.message.startswith("Your request timed out"):
                    # AWS times out after 15s of inactivity. This tends to happen
                    # at the end of the session, when the input is gone, so we
                    # ignore it and treat it as a silent retry.
                    logger.info("restarting transcribe session")
                    continue
                else:
                    raise e
            finally:
                await utils.aio.gracefully_cancel(*tasks)

    def _process_transcript_event(self, transcript_event: TranscriptEvent) -> None:
        stream = transcript_event.transcript.results
        for resp in stream:
            if resp.start_time and resp.start_time == 0.0:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                )

            if resp.end_time and resp.end_time > 0.0:
                if resp.is_partial:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                            alternatives=[self._streaming_recognize_response_to_speech_data(resp)],
                        )
                    )

                else:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[self._streaming_recognize_response_to_speech_data(resp)],
                        )
                    )

            if not resp.is_partial:
                self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))

    def _streaming_recognize_response_to_speech_data(self, resp: Result) -> stt.SpeechData:
        confidence = 0.0
        if resp.alternatives and (items := resp.alternatives[0].items):
            confidence = items[0].confidence or 0.0

        return stt.SpeechData(
            language=resp.language_code or self._opts.language,
            start_time=resp.start_time if resp.start_time is not None else 0.0,
            end_time=resp.end_time if resp.end_time is not None else 0.0,
            text=resp.alternatives[0].transcript if resp.alternatives else "",
            confidence=confidence,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
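
For example, a hedged sketch of the resampling behavior described above (assuming the base class resamples pushed frames and the rtc.AudioFrame constructor shown):

from livekit import rtc

# stream created from an STT configured with sample_rate=24000
stream = transcriber.stream()

# a 10 ms mono frame captured at 48 kHz; 480 samples of int16 silence
frame = rtc.AudioFrame(
    data=b"\x00\x00" * 480,
    sample_rate=48000,
    num_channels=1,
    samples_per_channel=480,
)
stream.push_frame(frame)  # resampled to 24 kHz before being sent to Transcribe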

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC
class TTS (*,
voice: str = 'Ruth',
language: NotGivenOr[TTSLanguages | str] = NOT_GIVEN,
speech_engine: TTSSpeechEngine = 'generative',
text_type: TTSTextType = 'text',
sample_rate: int = 16000,
region: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
session: aioboto3.Session | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice: str = "Ruth",
        language: NotGivenOr[TTSLanguages | str] = NOT_GIVEN,
        speech_engine: TTSSpeechEngine = "generative",
        text_type: TTSTextType = "text",
        sample_rate: int = 16000,
        region: str | None = None,
        api_key: str | None = None,
        api_secret: str | None = None,
        session: aioboto3.Session | None = None,
    ) -> None:
        """
        Create a new instance of AWS Polly TTS.

        ``api_key`` and ``api_secret`` must be set to your AWS access key ID and secret access key, either using the arguments or by setting the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables.

        See https://docs.aws.amazon.com/polly/latest/dg/API_SynthesizeSpeech.html for more details on the AWS Polly TTS.

        Args:
            voice (TTSModels, optional): Voice ID to use for the synthesis. Defaults to "Ruth".
            language (TTSLanguages, optional): Language code for the Synthesize Speech request. This is only necessary if using a bilingual voice, such as Aditi, which can be used for either Indian English (en-IN) or Hindi (hi-IN).
            speech_engine (TTSSpeechEngine, optional): The engine to use for the synthesis. Defaults to "generative".
            text_type (TTSTextType, optional): Type of text to synthesize. Use "ssml" for SSML-enhanced text. Defaults to "text".
            sample_rate (int, optional): The audio frequency specified in Hz. Defaults to 16000.
            region (str, optional): The region to use for the synthesis. Defaults to "us-east-1".
            api_key (str, optional): AWS access key ID.
            api_secret (str, optional): AWS secret access key.
            session (aioboto3.Session, optional): Optional aioboto3 session to use.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )
        self._session = session or aioboto3.Session(
            aws_access_key_id=api_key if is_given(api_key) else None,
            aws_secret_access_key=api_secret if is_given(api_secret) else None,
            region_name=region if is_given(region) else None,
        )

        self._opts = _TTSOptions(
            voice=voice,
            speech_engine=speech_engine,
            text_type=text_type,
            region=region or None,
            language=language or None,
            sample_rate=sample_rate,
        )

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, text=text, conn_options=conn_options)

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        speech_engine: NotGivenOr[TTSSpeechEngine] = NOT_GIVEN,
        text_type: NotGivenOr[TTSTextType] = NOT_GIVEN,
    ) -> None:
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language
        if is_given(speech_engine):
            self._opts.speech_engine = cast(TTSSpeechEngine, speech_engine)
        if is_given(text_type):
            self._opts.text_type = cast(TTSTextType, text_type)

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of AWS Polly TTS.

api_key and api_secret must be set to your AWS access key ID and secret access key, either using the arguments or by setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

See https://docs.aws.amazon.com/polly/latest/dg/API_SynthesizeSpeech.html for more details on the AWS Polly TTS.

Args

voice : TTSModels, optional
Voice ID to use for the synthesis. Defaults to "Ruth".
language : TTSLanguages, optional
Language code for the Synthesize Speech request. This is only necessary if using a bilingual voice, such as Aditi, which can be used for either Indian English (en-IN) or Hindi (hi-IN).
speech_engine : TTSSpeechEngine, optional
The engine to use for the synthesis. Defaults to "generative".
text_type : TTSTextType, optional
Type of text to synthesize. Use "ssml" for SSML-enhanced text. Defaults to "text".
sample_rate : int, optional
The audio frequency specified in Hz. Defaults to 16000.
region : str, optional
The region to use for the synthesis. Defaults to "us-east-1".
api_key : str, optional
AWS access key ID.
api_secret : str, optional
AWS secret access key.
session : aioboto3.Session, optional
Optional aioboto3 session to use.
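
A hedged construction example (values illustrative; credentials and region may also come from the environment):

from livekit.plugins import aws

tts = aws.TTS(
    voice="Ruth",
    speech_engine="generative",
    sample_rate=16000,
    region="us-east-1",
)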

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.aws.tts.ChunkedStream
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    return ChunkedStream(tts=self, text=text, conn_options=conn_options)

def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
speech_engine: NotGivenOr[TTSSpeechEngine] = NOT_GIVEN,
text_type: NotGivenOr[TTSTextType] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    speech_engine: NotGivenOr[TTSSpeechEngine] = NOT_GIVEN,
    text_type: NotGivenOr[TTSTextType] = NOT_GIVEN,
) -> None:
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language
    if is_given(speech_engine):
        self._opts.speech_engine = cast(TTSSpeechEngine, speech_engine)
    if is_given(text_type):
        self._opts.text_type = cast(TTSTextType, text_type)
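
For example, switching voices mid-session (a sketch; later synthesize() calls pick up the updated options):

tts.update_options(voice="Matthew", speech_engine="neural")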

Inherited members