Module livekit.plugins.aws.stt
Classes
class STT (*,
region: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 48000,
language: str = 'en-US',
encoding: str = 'pcm',
vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
session_id: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
number_of_channels: NotGivenOr[int] = NOT_GIVEN,
enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
language_model_name: NotGivenOr[str] = NOT_GIVEN,
session: aioboto3.Session | None = None,
refresh_interval: NotGivenOr[int] = NOT_GIVEN)
    class STT(stt.STT):
        def __init__(
            self,
            *,
            region: NotGivenOr[str] = NOT_GIVEN,
            api_key: NotGivenOr[str] = NOT_GIVEN,
            api_secret: NotGivenOr[str] = NOT_GIVEN,
            sample_rate: int = 48000,
            language: str = "en-US",
            encoding: str = "pcm",
            vocabulary_name: NotGivenOr[str] = NOT_GIVEN,
            session_id: NotGivenOr[str] = NOT_GIVEN,
            vocab_filter_method: NotGivenOr[str] = NOT_GIVEN,
            vocab_filter_name: NotGivenOr[str] = NOT_GIVEN,
            show_speaker_label: NotGivenOr[bool] = NOT_GIVEN,
            enable_channel_identification: NotGivenOr[bool] = NOT_GIVEN,
            number_of_channels: NotGivenOr[int] = NOT_GIVEN,
            enable_partial_results_stabilization: NotGivenOr[bool] = NOT_GIVEN,
            partial_results_stability: NotGivenOr[str] = NOT_GIVEN,
            language_model_name: NotGivenOr[str] = NOT_GIVEN,
            session: aioboto3.Session | None = None,
            refresh_interval: NotGivenOr[int] = NOT_GIVEN,
        ):
            super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))
            self._region = region if is_given(region) else DEFAULT_REGION
            self._session = session or get_aws_async_session(
                api_key=api_key if is_given(api_key) else None,
                api_secret=api_secret if is_given(api_secret) else None,
                region=self._region,
            )
            self._config = STTOptions(
                language=language,
                sample_rate=sample_rate,
                encoding=encoding,
                vocabulary_name=vocabulary_name,
                session_id=session_id,
                vocab_filter_method=vocab_filter_method,
                vocab_filter_name=vocab_filter_name,
                show_speaker_label=show_speaker_label,
                enable_channel_identification=enable_channel_identification,
                number_of_channels=number_of_channels,
                enable_partial_results_stabilization=enable_partial_results_stabilization,
                partial_results_stability=partial_results_stability,
                language_model_name=language_model_name,
            )
            self._pool = utils.ConnectionPool[TranscribeStreamingClient](
                connect_cb=self._create_client,
                max_session_duration=refresh_interval if is_given(refresh_interval) else REFRESH_INTERVAL,
            )

        async def _create_client(self) -> TranscribeStreamingClient:
            creds = await self._session.get_credentials()
            frozen_credentials = await creds.get_frozen_credentials()
            return TranscribeStreamingClient(
                region=self._region,
                credential_resolver=StaticCredentialResolver(
                    access_key_id=frozen_credentials.access_key,
                    secret_access_key=frozen_credentials.secret_key,
                    session_token=frozen_credentials.token,
                ),
            )

        async def aclose(self) -> None:
            await self._pool.aclose()
            await super().aclose()

        async def _recognize_impl(
            self,
            buffer: utils.AudioBuffer,
            *,
            language: NotGivenOr[str] = NOT_GIVEN,
            conn_options: APIConnectOptions,
        ) -> stt.SpeechEvent:
            raise NotImplementedError("Amazon Transcribe does not support single frame recognition")

        def stream(
            self,
            *,
            language: NotGivenOr[str] = NOT_GIVEN,
            conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        ) -> SpeechStream:
            return SpeechStream(
                stt=self,
                pool=self._pool,
                conn_options=conn_options,
                opts=self._config,
            )

Speech-to-text plugin backed by Amazon Transcribe's streaming API. Transcription runs over a pooled streaming connection; single-frame (non-streaming) recognition is not supported.
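A minimal construction sketch (the region and tuning values below are placeholders, not required settings; credentials may also come from the environment or a passed-in aioboto3.Session):

```python
from livekit.plugins.aws.stt import STT

# Sketch: with no explicit api_key/api_secret, credentials resolve through
# the usual AWS chain (environment variables, shared config, instance role).
transcriber = STT(
    region="us-east-1",
    language="en-US",
    sample_rate=48000,
    enable_partial_results_stabilization=True,
    partial_results_stability="high",  # Transcribe stability level: low/medium/high
)
```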
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
    async def aclose(self) -> None:
        await self._pool.aclose()
        await super().aclose()

Close the STT and every stream/request associated with it.
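A typical lifecycle sketch, closing the STT once all work is done:

```python
import asyncio

from livekit.plugins.aws.stt import STT

async def main() -> None:
    transcriber = STT()  # region/credentials resolved from the environment
    try:
        ...  # create and consume streams via transcriber.stream()
    finally:
        # Drains the pooled Transcribe clients and closes remaining streams.
        await transcriber.aclose()

asyncio.run(main())
```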
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> SpeechStream
    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        return SpeechStream(
            stt=self,
            pool=self._pool,
            conn_options=conn_options,
            opts=self._config,
        )
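A usage sketch, assuming the base RecognizeStream interface (push_frame, end_input, async iteration) and some async source of rtc.AudioFrame (audio_frames below is a placeholder):

```python
import asyncio

from livekit.agents import stt
from livekit.plugins.aws.stt import STT

async def transcribe(transcriber: STT, audio_frames) -> None:
    stream = transcriber.stream()

    async def feed() -> None:
        # audio_frames: any async iterable of rtc.AudioFrame at the
        # configured sample rate (placeholder for your audio source).
        async for frame in audio_frames:
            stream.push_frame(frame)
        stream.end_input()  # signals that no more audio is coming

    feed_task = asyncio.create_task(feed())
    async for event in stream:
        if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await feed_task
```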
class STTOptions (sample_rate: int,
language: str,
encoding: str,
vocabulary_name: NotGivenOr[str],
session_id: NotGivenOr[str],
vocab_filter_method: NotGivenOr[str],
vocab_filter_name: NotGivenOr[str],
show_speaker_label: NotGivenOr[bool],
enable_channel_identification: NotGivenOr[bool],
number_of_channels: NotGivenOr[int],
enable_partial_results_stabilization: NotGivenOr[bool],
partial_results_stability: NotGivenOr[str],
language_model_name: NotGivenOr[str])
    @dataclass
    class STTOptions:
        sample_rate: int
        language: str
        encoding: str
        vocabulary_name: NotGivenOr[str]
        session_id: NotGivenOr[str]
        vocab_filter_method: NotGivenOr[str]
        vocab_filter_name: NotGivenOr[str]
        show_speaker_label: NotGivenOr[bool]
        enable_channel_identification: NotGivenOr[bool]
        number_of_channels: NotGivenOr[int]
        enable_partial_results_stabilization: NotGivenOr[bool]
        partial_results_stability: NotGivenOr[str]
        language_model_name: NotGivenOr[str]

Options forwarded to Amazon Transcribe's streaming transcription API. Fields left as NOT_GIVEN are omitted from the request.
Instance variables
var enable_channel_identification : bool | livekit.agents.types.NotGiven
var enable_partial_results_stabilization : bool | livekit.agents.types.NotGiven
var encoding : str
var language : str
var language_model_name : str | livekit.agents.types.NotGiven
var number_of_channels : int | livekit.agents.types.NotGiven
var partial_results_stability : str | livekit.agents.types.NotGiven
var sample_rate : int
var session_id : str | livekit.agents.types.NotGiven
var show_speaker_label : bool | livekit.agents.types.NotGiven
var vocab_filter_method : str | livekit.agents.types.NotGiven
var vocab_filter_name : str | livekit.agents.types.NotGiven
var vocabulary_name : str | livekit.agents.types.NotGiven
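STTOptions is normally assembled by STT.__init__ from its keyword arguments; a direct construction sketch, assuming NOT_GIVEN is importable from livekit.agents.types as the field types above suggest:

```python
from livekit.agents.types import NOT_GIVEN
from livekit.plugins.aws.stt import STTOptions

# Sketch only: STT.__init__ builds this record for you. Fields left as
# NOT_GIVEN are filtered out before the request reaches Transcribe
# (see SpeechStream._run below).
opts = STTOptions(
    sample_rate=48000,
    language="en-US",
    encoding="pcm",
    vocabulary_name=NOT_GIVEN,
    session_id=NOT_GIVEN,
    vocab_filter_method=NOT_GIVEN,
    vocab_filter_name=NOT_GIVEN,
    show_speaker_label=NOT_GIVEN,
    enable_channel_identification=NOT_GIVEN,
    number_of_channels=NOT_GIVEN,
    enable_partial_results_stabilization=NOT_GIVEN,
    partial_results_stability=NOT_GIVEN,
    language_model_name=NOT_GIVEN,
)
```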
class SpeechStream (stt: STT,
opts: STTOptions,
pool: utils.ConnectionPool[TranscribeStreamingClient],
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
    class SpeechStream(stt.SpeechStream):
        def __init__(
            self,
            stt: STT,
            opts: STTOptions,
            pool: utils.ConnectionPool[TranscribeStreamingClient],
            conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        ) -> None:
            super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
            self._opts = opts
            self._pool = pool

        async def _run(self) -> None:
            async with self._pool.connection() as client:
                live_config = {
                    "language_code": self._opts.language,
                    "media_sample_rate_hz": self._opts.sample_rate,
                    "media_encoding": self._opts.encoding,
                    "vocabulary_name": self._opts.vocabulary_name,
                    "session_id": self._opts.session_id,
                    "vocab_filter_method": self._opts.vocab_filter_method,
                    "vocab_filter_name": self._opts.vocab_filter_name,
                    "show_speaker_label": self._opts.show_speaker_label,
                    "enable_channel_identification": self._opts.enable_channel_identification,
                    "number_of_channels": self._opts.number_of_channels,
                    "enable_partial_results_stabilization": self._opts.enable_partial_results_stabilization,  # noqa: E501
                    "partial_results_stability": self._opts.partial_results_stability,
                    "language_model_name": self._opts.language_model_name,
                }
                filtered_config = {k: v for k, v in live_config.items() if v and is_given(v)}
                stream = await client.start_stream_transcription(**filtered_config)

                @utils.log_exceptions(logger=logger)
                async def input_generator():
                    async for frame in self._input_ch:
                        if isinstance(frame, rtc.AudioFrame):
                            await stream.input_stream.send_audio_event(audio_chunk=frame.data.tobytes())
                    await stream.input_stream.end_stream()

                @utils.log_exceptions(logger=logger)
                async def handle_transcript_events():
                    async for event in stream.output_stream:
                        if isinstance(event, TranscriptEvent):
                            self._process_transcript_event(event)

                tasks = [
                    asyncio.create_task(input_generator()),
                    asyncio.create_task(handle_transcript_events()),
                ]
                try:
                    await asyncio.gather(*tasks)
                finally:
                    await utils.aio.gracefully_cancel(*tasks)

        def _process_transcript_event(self, transcript_event: TranscriptEvent):
            stream = transcript_event.transcript.results
            for resp in stream:
                if resp.start_time and resp.start_time == 0.0:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
                    )

                if resp.end_time and resp.end_time > 0.0:
                    if resp.is_partial:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                                alternatives=[_streaming_recognize_response_to_speech_data(resp)],
                            )
                        )
                    else:
                        self._event_ch.send_nowait(
                            stt.SpeechEvent(
                                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                                alternatives=[_streaming_recognize_response_to_speech_data(resp)],
                            )
                        )

                if not resp.is_partial:
                    self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))

Bidirectional transcription stream: audio frames pushed into the stream are forwarded to Amazon Transcribe, and interim/final results are emitted back as speech events.
Args:
    sample_rate : int or None, optional
        The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
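The stream maps Transcribe results onto the speech-event lifecycle seen in _process_transcript_event above (START_OF_SPEECH, INTERIM_TRANSCRIPT, FINAL_TRANSCRIPT, END_OF_SPEECH); a consumption sketch:

```python
from livekit.agents import stt
from livekit.plugins.aws.stt import SpeechStream

async def consume(stream: SpeechStream) -> None:
    async for event in stream:
        if event.type == stt.SpeechEventType.START_OF_SPEECH:
            print("speech started")
        elif event.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            # Partial hypothesis; text may change until the final result.
            print("interim:", event.alternatives[0].text)
        elif event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", event.alternatives[0].text)
        elif event.type == stt.SpeechEventType.END_OF_SPEECH:
            print("speech ended")
```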
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC