Module livekit.plugins.aws

Sub-modules

livekit.plugins.aws.llm
livekit.plugins.aws.log
livekit.plugins.aws.models
livekit.plugins.aws.stt
livekit.plugins.aws.tts
livekit.plugins.aws.version

Classes

class ChunkedStream (*,
tts: TTS,
text: str,
conn_options: Optional[APIConnectOptions] = None,
opts: _TTSOptions,
get_client: Callable[[], Any])
class ChunkedStream(tts.ChunkedStream):
    def __init__(
        self,
        *,
        tts: TTS,
        text: str,
        conn_options: Optional[APIConnectOptions] = None,
        opts: _TTSOptions,
        get_client: Callable[[], Any],
    ) -> None:
        super().__init__(tts=tts, input_text=text, conn_options=conn_options)
        self._opts = opts
        self._get_client = get_client
        self._segment_id = utils.shortuuid()

    async def _run(self):
        request_id = utils.shortuuid()

        try:
            async with self._get_client() as client:
                params = {
                    "Text": self._input_text,
                    "OutputFormat": "mp3",
                    "Engine": self._opts.speech_engine,
                    "VoiceId": self._opts.voice,
                    "TextType": "text",
                    "SampleRate": str(self._opts.sample_rate),
                    "LanguageCode": self._opts.language,
                }
                response = await client.synthesize_speech(**_strip_nones(params))
                if "AudioStream" in response:
                    decoder = utils.codecs.AudioStreamDecoder(
                        sample_rate=self._opts.sample_rate,
                        num_channels=1,
                    )

                    # Create a task to push data to the decoder
                    async def push_data():
                        try:
                            async with response["AudioStream"] as resp:
                                async for data, _ in resp.content.iter_chunks():
                                    decoder.push(data)
                        finally:
                            decoder.end_input()

                    # Start pushing data to the decoder
                    push_task = asyncio.create_task(push_data())

                    try:
                        # Create emitter and process decoded frames
                        emitter = tts.SynthesizedAudioEmitter(
                            event_ch=self._event_ch,
                            request_id=request_id,
                            segment_id=self._segment_id,
                        )
                        async for frame in decoder:
                            emitter.push(frame)
                        emitter.flush()
                        await push_task
                    finally:
                        await utils.aio.gracefully_cancel(push_task)

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=request_id,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
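
A minimal consumption sketch (an assumption, not from the source; it presumes the class is re-exported at the package root, as is conventional for LiveKit plugins, and that iterating the stream yields SynthesizedAudio events per the livekit-agents v0.x API):

import asyncio

from livekit.plugins import aws


async def main() -> None:
    polly = aws.TTS()  # credentials fall back to AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY
    stream = polly.synthesize("Hello from Polly!")
    async for audio in stream:  # SynthesizedAudio events
        frame = audio.frame  # decoded PCM rtc.AudioFrame
        print(f"received {frame.samples_per_channel} samples")
    await stream.aclose()


asyncio.run(main())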

Ancestors

Inherited members

class LLM (*,
model: TEXT_MODEL | str = 'anthropic.claude-3-5-sonnet-20240620-v1:0',
api_key: str | None = None,
api_secret: str | None = None,
region: str = 'us-east-1',
temperature: float = 0.8,
max_output_tokens: int | None = None,
top_p: float | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']]" = 'auto',
additional_request_fields: dict[str, Any] | None = None)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: TEXT_MODEL | str = "anthropic.claude-3-5-sonnet-20240620-v1:0",
        api_key: str | None = None,
        api_secret: str | None = None,
        region: str = "us-east-1",
        temperature: float = 0.8,
        max_output_tokens: int | None = None,
        top_p: float | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] = "auto",
        additional_request_fields: dict[str, Any] | None = None,
    ) -> None:
        """
        Create a new instance of AWS Bedrock LLM.

        ``api_key`` and ``api_secret`` must be set to your AWS access key ID and secret access key, either via the arguments or by setting the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables.

        See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html for more details on the AWS Bedrock Runtime API.

        Args:
            model (TEXT_MODEL, optional): Model or inference profile ARN to use (https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-use.html). Defaults to 'anthropic.claude-3-5-sonnet-20240620-v1:0'.
            api_key (str, optional): AWS access key ID.
            api_secret (str, optional): AWS secret access key.
            region (str, optional): The region to use for AWS API requests. Defaults to "us-east-1".
            temperature (float, optional): Sampling temperature for response generation. Defaults to 0.8.
            max_output_tokens (int, optional): Maximum number of tokens to generate in the output. Defaults to None.
            top_p (float, optional): The nucleus sampling probability for response generation. Defaults to None.
            tool_choice (ToolChoice or Literal["auto", "required", "none"], optional): Specifies whether to use tools during response generation. Defaults to "auto".
            additional_request_fields (dict[str, Any], optional): Additional request fields to send to the AWS Bedrock Converse API. Defaults to None.
        """
        super().__init__(
            capabilities=LLMCapabilities(
                supports_choices_on_int=True,
                requires_persistent_functions=True,
            )
        )
        self._api_key, self._api_secret = _get_aws_credentials(
            api_key, api_secret, region
        )

        self._model = model or os.environ.get("BEDROCK_INFERENCE_PROFILE_ARN")
        if not self._model:
            raise ValueError(
                "model or inference profile arn must be set using the argument or by setting the BEDROCK_INFERENCE_PROFILE_ARN environment variable."
            )
        self._opts = LLMOptions(
            model=self._model,
            temperature=temperature,
            tool_choice=tool_choice,
            max_output_tokens=max_output_tokens,
            top_p=top_p,
            additional_request_fields=additional_request_fields,
        )
        self._region = region
        self._running_fncs: MutableSet[asyncio.Task[Any]] = set()

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream":
        if tool_choice is None:
            tool_choice = self._opts.tool_choice

        if temperature is None:
            temperature = self._opts.temperature

        return LLMStream(
            self,
            model=self._opts.model,
            aws_access_key_id=self._api_key,
            aws_secret_access_key=self._api_secret,
            region_name=self._region,
            max_output_tokens=self._opts.max_output_tokens,
            top_p=self._opts.top_p,
            additional_request_fields=self._opts.additional_request_fields,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            conn_options=conn_options,
            temperature=temperature,
            tool_choice=tool_choice,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of AWS Bedrock LLM.

api_key and api_secret must be set to your AWS access key ID and secret access key, either via the arguments or by setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

See https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-runtime/client/converse_stream.html for more details on the AWS Bedrock Runtime API.

Args

model : TEXT_MODEL, optional
Model or inference profile ARN to use (https://docs.aws.amazon.com/bedrock/latest/userguide/inference-profiles-use.html). Defaults to 'anthropic.claude-3-5-sonnet-20240620-v1:0'.
api_key : str, optional
AWS access key ID.
api_secret : str, optional
AWS secret access key.
region : str, optional
The region to use for AWS API requests. Defaults to "us-east-1".
temperature : float, optional
Sampling temperature for response generation. Defaults to 0.8.
max_output_tokens : int, optional
Maximum number of tokens to generate in the output. Defaults to None.
top_p : float, optional
The nucleus sampling probability for response generation. Defaults to None.
tool_choice : ToolChoice or Literal["auto", "required", "none"], optional
Specifies whether to use tools during response generation. Defaults to "auto".
additional_request_fields : dict[str, Any], optional
Additional request fields to send to the AWS Bedrock Converse API. Defaults to None.
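
For orientation, a minimal usage sketch (an assumption, not from the source; the ChatContext helpers follow the livekit-agents v0.x API and may differ between versions):

import asyncio

from livekit.agents import llm
from livekit.plugins import aws

bedrock_llm = aws.LLM(
    model="anthropic.claude-3-5-sonnet-20240620-v1:0",
    region="us-east-1",  # api_key/api_secret fall back to the AWS_* env vars
    temperature=0.8,
)


async def main() -> None:
    ctx = llm.ChatContext().append(role="user", text="Say hello in one sentence.")
    stream = bedrock_llm.chat(chat_ctx=ctx)
    async for chunk in stream:  # ChatChunk deltas
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
    await stream.aclose()


asyncio.run(main())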

Ancestors

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
fnc_ctx: llm.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> LLMStream
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: llm.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    if tool_choice is None:
        tool_choice = self._opts.tool_choice

    if temperature is None:
        temperature = self._opts.temperature

    return LLMStream(
        self,
        model=self._opts.model,
        aws_access_key_id=self._api_key,
        aws_secret_access_key=self._api_secret,
        region_name=self._region,
        max_output_tokens=self._opts.max_output_tokens,
        top_p=self._opts.top_p,
        additional_request_fields=self._opts.additional_request_fields,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        conn_options=conn_options,
        temperature=temperature,
        tool_choice=tool_choice,
    )
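
Per-call arguments fall back to the constructor defaults when left as None, so a single turn can be tuned without rebuilding the client. A sketch, reusing bedrock_llm and ctx from the example above:

stream = bedrock_llm.chat(
    chat_ctx=ctx,
    temperature=0.2,     # overrides the instance default of 0.8 for this call only
    tool_choice="none",  # disable tool use for this turn
)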

Inherited members

class STT (*,
speech_region: str = 'us-east-1',
api_key: str | None = None,
api_secret: str | None = None,
sample_rate: int = 48000,
language: str = 'en-US',
encoding: str = 'pcm',
vocabulary_name: Optional[str] = None,
session_id: Optional[str] = None,
vocab_filter_method: Optional[str] = None,
vocab_filter_name: Optional[str] = None,
show_speaker_label: Optional[bool] = None,
enable_channel_identification: Optional[bool] = None,
number_of_channels: Optional[int] = None,
enable_partial_results_stabilization: Optional[bool] = None,
partial_results_stability: Optional[str] = None,
language_model_name: Optional[str] = None)
class STT(stt.STT):
    def __init__(
        self,
        *,
        speech_region: str = "us-east-1",
        api_key: str | None = None,
        api_secret: str | None = None,
        sample_rate: int = 48000,
        language: str = "en-US",
        encoding: str = "pcm",
        vocabulary_name: Optional[str] = None,
        session_id: Optional[str] = None,
        vocab_filter_method: Optional[str] = None,
        vocab_filter_name: Optional[str] = None,
        show_speaker_label: Optional[bool] = None,
        enable_channel_identification: Optional[bool] = None,
        number_of_channels: Optional[int] = None,
        enable_partial_results_stabilization: Optional[bool] = None,
        partial_results_stability: Optional[str] = None,
        language_model_name: Optional[str] = None,
    ):
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True)
        )

        self._api_key, self._api_secret = _get_aws_credentials(
            api_key, api_secret, speech_region
        )
        self._config = STTOptions(
            speech_region=speech_region,
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            vocabulary_name=vocabulary_name,
            session_id=session_id,
            vocab_filter_method=vocab_filter_method,
            vocab_filter_name=vocab_filter_name,
            show_speaker_label=show_speaker_label,
            enable_channel_identification=enable_channel_identification,
            number_of_channels=number_of_channels,
            enable_partial_results_stabilization=enable_partial_results_stabilization,
            partial_results_stability=partial_results_stability,
            language_model_name=language_model_name,
        )

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError(
            "Amazon Transcribe does not support single frame recognition"
        )

    def stream(
        self,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        return SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=self._config,
        )

Helper class that provides a standard way to create an ABC using inheritance.
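
A minimal usage sketch (assumed, not from the source); only the streaming path is supported here, since _recognize_impl raises NotImplementedError for single-buffer recognition:

from livekit.agents import stt
from livekit.plugins import aws

transcribe = aws.STT(
    speech_region="us-east-1",
    sample_rate=48000,
    language="en-US",
)
stream = transcribe.stream()


async def print_transcripts() -> None:
    # Audio is supplied separately via stream.push_frame(...); see SpeechStream below.
    async for event in stream:
        if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)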

Ancestors

Methods

def stream(self,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream
def stream(
    self,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    return SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=self._config,
    )

Inherited members

class SpeechStream (stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )
        self._opts = opts
        self._client = TranscribeStreamingClient(region=self._opts.speech_region)

    async def _run(self) -> None:
        stream = await self._client.start_stream_transcription(
            language_code=self._opts.language,
            media_sample_rate_hz=self._opts.sample_rate,
            media_encoding=self._opts.encoding,
            vocabulary_name=self._opts.vocabulary_name,
            session_id=self._opts.session_id,
            vocab_filter_method=self._opts.vocab_filter_method,
            vocab_filter_name=self._opts.vocab_filter_name,
            show_speaker_label=self._opts.show_speaker_label,
            enable_channel_identification=self._opts.enable_channel_identification,
            number_of_channels=self._opts.number_of_channels,
            enable_partial_results_stabilization=self._opts.enable_partial_results_stabilization,
            partial_results_stability=self._opts.partial_results_stability,
            language_model_name=self._opts.language_model_name,
        )

        @utils.log_exceptions(logger=logger)
        async def input_generator():
            async for frame in self._input_ch:
                if isinstance(frame, rtc.AudioFrame):
                    await stream.input_stream.send_audio_event(
                        audio_chunk=frame.data.tobytes()
                    )
            await stream.input_stream.end_stream()

        @utils.log_exceptions(logger=logger)
        async def handle_transcript_events():
            async for event in stream.output_stream:
                if isinstance(event, TranscriptEvent):
                    self._process_transcript_event(event)

        tasks = [
            asyncio.create_task(input_generator()),
            asyncio.create_task(handle_transcript_events()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    def _process_transcript_event(self, transcript_event: TranscriptEvent):
        stream = transcript_event.transcript.results
        for resp in stream:
            if resp.start_time is not None and resp.start_time == 0.0:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                )

            if resp.end_time and resp.end_time > 0.0:
                if resp.is_partial:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                            alternatives=[
                                _streaming_recognize_response_to_speech_data(resp)
                            ],
                        )
                    )

                else:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[
                                _streaming_recognize_response_to_speech_data(resp)
                            ],
                        )
                    )

            if not resp.is_partial:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

Helper class that provides a standard way to create an ABC using inheritance.

Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
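
A sketch of feeding audio into the stream; push_frame and end_input are inherited from the stt.SpeechStream base class in livekit-agents, and the frame constructor comes from the livekit rtc package:

from livekit import rtc
from livekit.plugins import aws

stream = aws.STT().stream()

samples = 960  # 20 ms at the default 48 kHz sample rate
frame = rtc.AudioFrame(
    data=b"\x00\x00" * samples,  # 16-bit mono PCM silence
    sample_rate=48000,
    num_channels=1,
    samples_per_channel=samples,
)
stream.push_frame(frame)
stream.end_input()  # closes the input side; the Transcribe session then ends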

Ancestors

Inherited members

class TTS (*,
voice: str | None = 'Ruth',
language: TTS_LANGUAGE | str | None = None,
speech_engine: TTS_SPEECH_ENGINE = 'generative',
sample_rate: int = 16000,
speech_region: str = 'us-east-1',
api_key: str | None = None,
api_secret: str | None = None,
session: AioSession | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice: str | None = DEFAULT_VOICE,
        language: TTS_LANGUAGE | str | None = None,
        speech_engine: TTS_SPEECH_ENGINE = DEFAULT_SPEECH_ENGINE,
        sample_rate: int = DEFAULT_SAMPLE_RATE,
        speech_region: str = DEFAULT_SPEECH_REGION,
        api_key: str | None = None,
        api_secret: str | None = None,
        session: AioSession | None = None,
    ) -> None:
        """
        Create a new instance of AWS Polly TTS.

        ``api_key`` and ``api_secret`` must be set to your AWS access key ID and secret access key, either via the arguments or by setting the
        ``AWS_ACCESS_KEY_ID`` and ``AWS_SECRET_ACCESS_KEY`` environment variables.

        See https://docs.aws.amazon.com/polly/latest/dg/API_SynthesizeSpeech.html for more details on the AWS Polly TTS API.

        Args:
            voice (str, optional): Voice ID to use for the synthesis. Defaults to "Ruth".
            language (TTS_LANGUAGE, optional): Language code for the Synthesize Speech request. Only necessary when using a bilingual voice, such as Aditi, which can speak either Indian English (en-IN) or Hindi (hi-IN).
            sample_rate (int, optional): The audio frequency specified in Hz. Defaults to 16000.
            speech_engine (TTS_SPEECH_ENGINE, optional): The engine to use for the synthesis. Defaults to "generative".
            speech_region (str, optional): The region to use for the synthesis. Defaults to "us-east-1".
            api_key (str, optional): AWS access key ID.
            api_secret (str, optional): AWS secret access key.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=TTS_NUM_CHANNELS,
        )

        self._api_key, self._api_secret = _get_aws_credentials(
            api_key, api_secret, speech_region
        )

        self._opts = _TTSOptions(
            voice=voice,
            speech_engine=speech_engine,
            speech_region=speech_region,
            language=language,
            sample_rate=sample_rate,
        )
        self._session = session or get_session()

    def _get_client(self):
        return self._session.create_client(
            "polly",
            region_name=self._opts.speech_region,
            aws_access_key_id=self._api_key,
            aws_secret_access_key=self._api_secret,
        )

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        return ChunkedStream(
            tts=self,
            text=text,
            conn_options=conn_options,
            opts=self._opts,
            get_client=self._get_client,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of AWS Polly TTS.

api_key and api_secret must be set to your AWS access key ID and secret access key, either via the arguments or by setting the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

See https://docs.aws.amazon.com/polly/latest/dg/API_SynthesizeSpeech.html for more details on the AWS Polly TTS API.

Args

voice : str, optional
Voice ID to use for the synthesis. Defaults to "Ruth".
language : TTS_LANGUAGE, optional
Language code for the Synthesize Speech request. Only necessary when using a bilingual voice, such as Aditi, which can speak either Indian English (en-IN) or Hindi (hi-IN).
sample_rate : int, optional
The audio frequency specified in Hz. Defaults to 16000.
speech_engine : TTS_SPEECH_ENGINE, optional
The engine to use for the synthesis. Defaults to "generative".
speech_region : str, optional
The region to use for the synthesis. Defaults to "us-east-1".
api_key : str, optional
AWS access key ID.
api_secret : str, optional
AWS secret access key.
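
A short construction sketch using the documented defaults (an assumption that the class is re-exported at the package root; credentials fall back to the AWS_* environment variables):

from livekit.plugins import aws

polly = aws.TTS(
    voice="Ruth",
    speech_engine="generative",
    sample_rate=16000,
    speech_region="us-east-1",
)
stream = polly.synthesize("Text to speak")  # returns a ChunkedStream (see above)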

Ancestors

Methods

def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    return ChunkedStream(
        tts=self,
        text=text,
        conn_options=conn_options,
        opts=self._opts,
        get_client=self._get_client,
    )

Inherited members