Module livekit.agents.stt.stream_adapter
Classes
class StreamAdapter (*, stt: STT, vad: VAD)
-
Expand source code
class StreamAdapter(STT): def __init__(self, *, stt: STT, vad: VAD) -> None: super().__init__( capabilities=STTCapabilities(streaming=True, interim_results=False) ) self._vad = vad self._stt = stt @self._stt.on("metrics_collected") def _forward_metrics(*args, **kwargs): self.emit("metrics_collected", *args, **kwargs) @property def wrapped_stt(self) -> STT: return self._stt async def _recognize_impl( self, buffer: utils.AudioBuffer, *, language: str | None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ): return await self._stt.recognize( buffer=buffer, language=language, conn_options=conn_options ) def stream( self, *, language: str | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> RecognizeStream: return StreamAdapterWrapper( self, vad=self._vad, wrapped_stt=self._stt, language=language, conn_options=conn_options, )
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop wrapped_stt : STT
-
Expand source code
@property def wrapped_stt(self) -> STT: return self._stt
Methods
def stream(self,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> RecognizeStream-
Expand source code
def stream( self, *, language: str | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> RecognizeStream: return StreamAdapterWrapper( self, vad=self._vad, wrapped_stt=self._stt, language=language, conn_options=conn_options, )
Inherited members
class StreamAdapterWrapper (stt: STT,
*,
vad: VAD,
wrapped_stt: STT,
language: str | None,
conn_options: APIConnectOptions)-
Expand source code
class StreamAdapterWrapper(RecognizeStream): def __init__( self, stt: STT, *, vad: VAD, wrapped_stt: STT, language: str | None, conn_options: APIConnectOptions, ) -> None: super().__init__(stt=stt, conn_options=conn_options) self._vad = vad self._wrapped_stt = wrapped_stt self._vad_stream = self._vad.stream() self._language = language async def _metrics_monitor_task( self, event_aiter: AsyncIterable[SpeechEvent] ) -> None: pass # do nothing async def _run(self) -> None: async def _forward_input(): """forward input to vad""" async for input in self._input_ch: if isinstance(input, self._FlushSentinel): self._vad_stream.flush() continue self._vad_stream.push_frame(input) self._vad_stream.end_input() async def _recognize(): """recognize speech from vad""" async for event in self._vad_stream: if event.type == VADEventType.START_OF_SPEECH: self._event_ch.send_nowait( SpeechEvent(SpeechEventType.START_OF_SPEECH) ) elif event.type == VADEventType.END_OF_SPEECH: self._event_ch.send_nowait( SpeechEvent( type=SpeechEventType.END_OF_SPEECH, ) ) merged_frames = utils.merge_frames(event.frames) t_event = await self._wrapped_stt.recognize( buffer=merged_frames, language=self._language, conn_options=self._conn_options, ) if len(t_event.alternatives) == 0: continue elif not t_event.alternatives[0].text: continue self._event_ch.send_nowait( SpeechEvent( type=SpeechEventType.FINAL_TRANSCRIPT, alternatives=[t_event.alternatives[0]], ) ) tasks = [ asyncio.create_task(_forward_input(), name="forward_input"), asyncio.create_task(_recognize(), name="recognize"), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks)
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- RecognizeStream
- abc.ABC
Inherited members