Module livekit.plugins.aws.stt

Classes

class STT (*,
region: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 48000,
language: str = 'en-US',
encoding: str = 'pcm',
vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
session_id: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
number_of_channels: NotGivenOr[int] = NOT_GIVEN,
enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
language_model_name: NotGivenOr[str] = NOT_GIVEN,
session: aioboto3.Session | None = None,
refresh_interval: NotGivenOr[int] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        region: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 48000,
        language: str = "en-US",
        encoding: str = "pcm",
        vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
        session_id: NotGivenOr[str] = NOT_GIVEN,
        vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
        vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
        show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
        enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
        number_of_channels: NotGivenOr[int] = NOT_GIVEN,
        enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
        partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
        language_model_name: NotGivenOr[str] = NOT_GIVEN,
        session: aioboto3.Session | None = None,
        refresh_interval: NotGivenOr[int] = NOT_GIVEN,
    ):
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))
        self._region = region if is_given(region) else DEFAULT_REGION
        self._session = session or get_aws_async_session(
            api_key=api_key if is_given(api_key) else None,
            api_secret=api_secret if is_given(api_secret) else None,
            region=self._region,
        )

        self._config = STTOptions(
            language=language,
            sample_rate=sample_rate,
            encoding=encoding,
            vocabulary_name=vocabulary_name,
            session_id=session_id,
            vocab_filter_method=vocab_filter_method,
            vocab_filter_name=vocab_filter_name,
            show_speaker_label=show_speaker_label,
            enable_channel_identification=enable_channel_identification,
            number_of_channels=number_of_channels,
            enable_partial_results_stabilization=enable_partial_results_stabilization,
            partial_results_stability=partial_results_stability,
            language_model_name=language_model_name,
        )
        self._pool = utils.ConnectionPool[TranscribeStreamingClient](
            connect_cb=self._create_client,
            max_session_duration=refresh_interval
            if is_given(refresh_interval)
            else REFRESH_INTERVAL,
        )

    async def _create_client(self) -> TranscribeStreamingClient:
        creds = await self._session.get_credentials()
        frozen_credentials = await creds.get_frozen_credentials()
        return TranscribeStreamingClient(
            region=self._region,
            credential_resolver=StaticCredentialResolver(
                access_key_id=frozen_credentials.access_key,
                secret_access_key=frozen_credentials.secret_key,
                session_token=frozen_credentials.token,
            ),
        )

    async def aclose(self) -> None:
        await self._pool.aclose()
        await super().aclose()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Amazon Transcribe does not support single frame recognition")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        return SpeechStream(
            stt=self,
            pool=self._pool,
            conn_options=conn_options,
            opts=self._config,
        )

Speech-to-text implementation backed by Amazon Transcribe streaming. Connections to Transcribe are pooled and periodically refreshed, and the resulting streams emit both interim and final transcripts.
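A minimal construction sketch; parameter values are illustrative, and it assumes the livekit-plugins-aws package is installed and re-exports STT from the package root, as other LiveKit plugins do:

from livekit.plugins import aws  # assumes livekit-plugins-aws re-exports STT at the package root

# When api_key/api_secret are omitted, the underlying aioboto3 session typically
# falls back to the standard AWS credential chain (environment variables,
# shared config files, or an instance profile).
transcriber = aws.STT(
    region="us-west-2",                        # illustrative region
    language="en-US",
    sample_rate=48000,
    enable_partial_results_stabilization=True,
    partial_results_stability="high",          # a valid Amazon Transcribe stability level
)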

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await self._pool.aclose()
    await super().aclose()

Close the STT and every stream/request associated with it.

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    return SpeechStream(
        stt=self,
        pool=self._pool,
        conn_options=conn_options,
        opts=self._config,
    )
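
A sketch of a typical consumption loop; it assumes the push_frame/end_input/async-iteration interface of the base RecognizeStream from livekit-agents, and the frames argument stands in for audio captured elsewhere:

from livekit import rtc
from livekit.agents import stt as agents_stt
from livekit.plugins import aws  # assumes livekit-plugins-aws is installed


async def transcribe(frames: list[rtc.AudioFrame]) -> None:
    transcriber = aws.STT(region="us-west-2")  # illustrative region
    speech_stream = transcriber.stream()
    try:
        for frame in frames:
            speech_stream.push_frame(frame)  # queue audio for Transcribe
        speech_stream.end_input()            # signal that no more audio will arrive

        async for event in speech_stream:
            if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(event.alternatives[0].text)
    finally:
        await speech_stream.aclose()
        await transcriber.aclose()  # drains the pooled Transcribe connections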

Inherited members

class STTOptions (sample_rate: int,
language: str,
encoding: str,
vocabulary_name: NotGivenOr[str],
session_id: NotGivenOr[str],
vocab_filter_method: NotGivenOr[str],
vocab_filter_name: NotGivenOr[str],
show_speaker_label: NotGivenOr[bool],
enable_channel_identification: NotGivenOr[bool],
number_of_channels: NotGivenOr[int],
enable_partial_results_stabilization: NotGivenOr[bool],
partial_results_stability: NotGivenOr[str],
language_model_name: NotGivenOr[str])
@dataclass
class STTOptions:
    sample_rate: int
    language: str
    encoding: str
    vocabulary_name: NotGivenOr[str]
    session_id: NotGivenOr[str]
    vocab_filter_method: NotGivenOr[str]
    vocab_filter_name: NotGivenOr[str]
    show_speaker_label: NotGivenOr[bool]
    enable_channel_identification: NotGivenOr[bool]
    number_of_channels: NotGivenOr[int]
    enable_partial_results_stabilization: NotGivenOr[bool]
    partial_results_stability: NotGivenOr[str]
    language_model_name: NotGivenOr[str]


Instance variables

var enable_channel_identification : bool | livekit.agents.types.NotGiven
var enable_partial_results_stabilization : bool | livekit.agents.types.NotGiven
var encoding : str
var language : str
var language_model_name : str | livekit.agents.types.NotGiven
var number_of_channels : int | livekit.agents.types.NotGiven
var partial_results_stability : str | livekit.agents.types.NotGiven
var sample_rate : int
var session_id : str | livekit.agents.types.NotGiven
var show_speaker_label : bool | livekit.agents.types.NotGiven
var vocab_filter_method : str | livekit.agents.types.NotGiven
var vocab_filter_name : str | livekit.agents.types.NotGiven
var vocabulary_name : str | livekit.agents.types.NotGiven
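
Options left as NOT_GIVEN are never forwarded to Amazon Transcribe: SpeechStream._run assembles a request dict and drops unset or falsy entries before calling start_stream_transcription. A standalone illustration of that filtering rule, using a stand-in sentinel rather than the real NOT_GIVEN object:

NOT_GIVEN = object()  # stand-in for livekit.agents.types.NOT_GIVEN

live_config = {
    "language_code": "en-US",
    "media_sample_rate_hz": 48000,
    "media_encoding": "pcm",
    "vocabulary_name": NOT_GIVEN,     # left unset -> dropped from the request
    "show_speaker_label": NOT_GIVEN,  # left unset -> dropped from the request
}

# Mirrors the filtered_config comprehension in SpeechStream._run.
filtered_config = {k: v for k, v in live_config.items() if v is not NOT_GIVEN and v}
print(filtered_config)
# {'language_code': 'en-US', 'media_sample_rate_hz': 48000, 'media_encoding': 'pcm'}
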
class SpeechStream (stt: STT,
opts: STTOptions,
pool: utils.ConnectionPool[TranscribeStreamingClient],
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        stt: STT,
        opts: STTOptions,
        pool: utils.ConnectionPool[TranscribeStreamingClient],
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._pool = pool

    async def _run(self) -> None:
        async with self._pool.connection() as client:
            live_config = {
                "language_code": self._opts.language,
                "media_sample_rate_hz": self._opts.sample_rate,
                "media_encoding": self._opts.encoding,
                "vocabulary_name": self._opts.vocabulary_name,
                "session_id": self._opts.session_id,
                "vocab_filter_method": self._opts.vocab_filter_method,
                "vocab_filter_name": self._opts.vocab_filter_name,
                "show_speaker_label": self._opts.show_speaker_label,
                "enable_channel_identification": self._opts.enable_channel_identification,
                "number_of_channels": self._opts.number_of_channels,
                "enable_partial_results_stabilization": self._opts.enable_partial_results_stabilization,  # noqa: E501
                "partial_results_stability": self._opts.partial_results_stability,
                "language_model_name": self._opts.language_model_name,
            }
            filtered_config = {k: v for k, v in live_config.items() if v and is_given(v)}
            stream = await client.start_stream_transcription(**filtered_config)

            @utils.log_exceptions(logger=logger)
            async def input_generator():
                async for frame in self._input_ch:
                    if isinstance(frame, rtc.AudioFrame):
                        await stream.input_stream.send_audio_event(audio_chunk=frame.data.tobytes())
                await stream.input_stream.end_stream()

            @utils.log_exceptions(logger=logger)
            async def handle_transcript_events():
                async for event in stream.output_stream:
                    if isinstance(event, TranscriptEvent):
                        self._process_transcript_event(event)

            tasks = [
                asyncio.create_task(input_generator()),
                asyncio.create_task(handle_transcript_events()),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                await utils.aio.gracefully_cancel(*tasks)

    def _process_transcript_event(self, transcript_event: TranscriptEvent):
        stream = transcript_event.transcript.results
        for resp in stream:
            if resp.start_time and resp.start_time == 0.0:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                )

            if resp.end_time and resp.end_time > 0.0:
                if resp.is_partial:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                            alternatives=[_streaming_recognize_response_to_speech_data(resp)],
                        )
                    )

                else:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(
                            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[_streaming_recognize_response_to_speech_data(resp)],
                        )
                    )

            if not resp.is_partial:
                self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))

Streaming recognition session that forwards audio frames to an Amazon Transcribe streaming transcription and converts the returned transcript events into LiveKit speech events.

Args

sample_rate : int or None, optional
    The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
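Per the note above, a frame pushed at a different rate is resampled to the configured sample rate before it reaches Transcribe. A small sketch constructing a 16 kHz frame for a stream opened at the default 48 kHz, assuming the AudioFrame constructor from the livekit Python SDK:

from livekit import rtc

# 20 ms of silence: 16 kHz, mono, 16-bit PCM -> 320 samples.
frame = rtc.AudioFrame(
    data=b"\x00\x00" * 320,
    sample_rate=16000,
    num_channels=1,
    samples_per_channel=320,
)
# speech_stream.push_frame(frame)  # speech_stream from STT.stream(); resampled to 48 kHz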

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC
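
The handler above emits START_OF_SPEECH, INTERIM_TRANSCRIPT, FINAL_TRANSCRIPT, and END_OF_SPEECH events based on each result's timing and is_partial flag. A sketch of a consumer that distinguishes all four; speech_stream is assumed to come from STT.stream():

from livekit.agents import stt as agents_stt


async def consume(speech_stream) -> None:
    async for event in speech_stream:
        if event.type == agents_stt.SpeechEventType.START_OF_SPEECH:
            print("speech started")
        elif event.type == agents_stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print("interim:", event.alternatives[0].text)
        elif event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", event.alternatives[0].text)
        elif event.type == agents_stt.SpeechEventType.END_OF_SPEECH:
            print("speech ended")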