Module livekit.agents.stt
Sub-modules
livekit.agents.stt.fallback_adapter
livekit.agents.stt.stream_adapter
livekit.agents.stt.stt
Classes
class AvailabilityChangedEvent (stt: STT,
available: bool)
-
@dataclass
class AvailabilityChangedEvent:
    stt: STT
    available: bool
AvailabilityChangedEvent(stt: 'STT', available: 'bool')
Class variables
var available : bool
var stt : STT
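FallbackAdapter emits this event on its "stt_availability_changed" channel whenever a wrapped STT instance goes down or recovers. A minimal listener sketch (the fallback variable is assumed to be an existing FallbackAdapter):

from livekit.agents import stt

def on_availability_changed(ev: stt.AvailabilityChangedEvent) -> None:
    # ev.stt is the instance that changed state; ev.available is its new state
    state = "recovered" if ev.available else "went down"
    print(f"{ev.stt.label} {state}")

fallback.on("stt_availability_changed", on_availability_changed)  # fallback: a FallbackAdapter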
class FallbackAdapter (stt: list[STT],
*,
attempt_timeout: float = 10.0,
max_retry_per_stt: int = 1,
retry_interval: float = 5)
-
class FallbackAdapter(
    STT[Literal["stt_availability_changed"]],
):
    def __init__(
        self,
        stt: list[STT],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_stt: int = 1,
        retry_interval: float = 5,
    ) -> None:
        if len(stt) < 1:
            raise ValueError("At least one STT instance must be provided.")

        super().__init__(
            capabilities=STTCapabilities(
                streaming=all(t.capabilities.streaming for t in stt),
                interim_results=all(t.capabilities.interim_results for t in stt),
            )
        )

        self._stt_instances = stt
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_stt = max_retry_per_stt
        self._retry_interval = retry_interval

        self._status: list[_STTStatus] = [
            _STTStatus(
                available=True,
                recovering_synthesize_task=None,
                recovering_stream_task=None,
            )
            for _ in self._stt_instances
        ]

    async def _try_recognize(
        self,
        *,
        stt: STT,
        buffer: utils.AudioBuffer,
        language: str | None = None,
        conn_options: APIConnectOptions,
        recovering: bool = False,
    ) -> SpeechEvent:
        try:
            return await stt.recognize(
                buffer,
                language=language,
                conn_options=dataclasses.replace(
                    conn_options,
                    max_retry=self._max_retry_per_stt,
                    timeout=self._attempt_timeout,
                    retry_interval=self._retry_interval,
                ),
            )
        except asyncio.TimeoutError:
            if recovering:
                logger.warning(
                    f"{stt.label} recovery timed out", extra={"streamed": False}
                )
                raise

            logger.warning(
                f"{stt.label} timed out, switching to next STT",
                extra={"streamed": False},
            )
            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{stt.label} recovery failed",
                    exc_info=e,
                    extra={"streamed": False},
                )
                raise

            logger.warning(
                f"{stt.label} failed, switching to next STT",
                exc_info=e,
                extra={"streamed": False},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{stt.label} recovery unexpected error", extra={"streamed": False}
                )
                raise

            logger.exception(
                f"{stt.label} unexpected error, switching to next STT",
                extra={"streamed": False},
            )
            raise

    def _try_recovery(
        self,
        *,
        stt: STT,
        buffer: utils.AudioBuffer,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> None:
        stt_status = self._status[self._stt_instances.index(stt)]
        if (
            stt_status.recovering_synthesize_task is None
            or stt_status.recovering_synthesize_task.done()
        ):

            async def _recover_stt_task(stt: STT) -> None:
                try:
                    await self._try_recognize(
                        stt=stt,
                        buffer=buffer,
                        language=language,
                        conn_options=conn_options,
                        recovering=True,
                    )

                    stt_status.available = True
                    logger.info(f"{stt.label} recovered")
                    self.emit(
                        "stt_availability_changed",
                        AvailabilityChangedEvent(stt=stt, available=True),
                    )
                except Exception:
                    return

            stt_status.recovering_synthesize_task = asyncio.create_task(
                _recover_stt_task(stt)
            )

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ):
        start_time = time.time()

        all_failed = all(not stt_status.available for stt_status in self._status)
        if all_failed:
            logger.error("all STTs are unavailable, retrying..")

        for i, stt in enumerate(self._stt_instances):
            stt_status = self._status[i]
            if stt_status.available or all_failed:
                try:
                    return await self._try_recognize(
                        stt=stt,
                        buffer=buffer,
                        language=language,
                        conn_options=conn_options,
                        recovering=False,
                    )
                except Exception:  # exceptions already logged inside _try_recognize
                    if stt_status.available:
                        stt_status.available = False
                        self.emit(
                            "stt_availability_changed",
                            AvailabilityChangedEvent(stt=stt, available=False),
                        )

            self._try_recovery(
                stt=stt, buffer=buffer, language=language, conn_options=conn_options
            )

        raise APIConnectionError(
            "all STTs failed (%s) after %s seconds"
            % (
                [stt.label for stt in self._stt_instances],
                time.time() - start_time,
            )
        )

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    ) -> SpeechEvent:
        return await super().recognize(
            buffer, language=language, conn_options=conn_options
        )

    def stream(
        self,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    ) -> RecognizeStream:
        return FallbackRecognizeStream(
            stt=self, language=language, conn_options=conn_options
        )

    async def aclose(self) -> None:
        for stt_status in self._status:
            if stt_status.recovering_synthesize_task is not None:
                await aio.gracefully_cancel(stt_status.recovering_synthesize_task)

            if stt_status.recovering_stream_task is not None:
                await aio.gracefully_cancel(stt_status.recovering_stream_task)
An STT that wraps multiple STT instances and automatically falls back to the next available one when the active instance fails, retrying failed instances in the background and emitting "stt_availability_changed" as they go down or recover.
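A construction sketch, assuming the deepgram and google plugin packages are installed and configured (any STT implementations work):

from livekit.agents import stt
from livekit.plugins import deepgram, google

fallback = stt.FallbackAdapter(
    [
        deepgram.STT(),  # tried first
        google.STT(),    # used when the previous instance is unavailable
    ],
    attempt_timeout=10.0,  # per-attempt timeout before switching
    max_retry_per_stt=1,   # retries per instance before falling back
)
# fallback.recognize(...) and fallback.stream() now behave like a single STT.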
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def recognize(self,
buffer: AudioBuffer,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=5.0, timeout=10.0)) -> SpeechEvent
-
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
) -> SpeechEvent:
    return await super().recognize(
        buffer, language=language, conn_options=conn_options
    )
def stream(self,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=5.0, timeout=10.0)) -> RecognizeStream
-
def stream(
    self,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
) -> RecognizeStream:
    return FallbackRecognizeStream(
        stt=self, language=language, conn_options=conn_options
    )
Inherited members
class RecognitionUsage (audio_duration: float)
-
@dataclass
class RecognitionUsage:
    audio_duration: float
RecognitionUsage(audio_duration: 'float')
Class variables
var audio_duration : float
class RecognizeStream (*,
stt: STT,
conn_options: APIConnectOptions,
sample_rate: int | None = None)
-
class RecognizeStream(ABC):
    class _FlushSentinel:
        """Sentinel to mark when it was flushed"""

        pass

    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        sample_rate: int | None = None,
    ):
        """
        Args:
            sample_rate : int or None, optional
                The desired sample rate for the audio input.
                If specified, the audio input will be automatically resampled
                to match the given sample rate before being processed for
                Speech-to-Text. If not provided (None), the input will retain
                its original sample rate.
        """
        self._stt = stt
        self._conn_options = conn_options
        self._input_ch = aio.Chan[
            Union[rtc.AudioFrame, RecognizeStream._FlushSentinel]
        ]()
        self._event_ch = aio.Chan[SpeechEvent]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="STT._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._needed_sr = sample_rate
        self._pushed_sr = 0
        self._resampler: rtc.AudioResampler | None = None

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to recognize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to recognize speech, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._stt._label,
                            "attempt": i + 1,
                            "streamed": True,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SpeechEvent]
    ) -> None:
        """Task used to collect metrics"""
        start_time = time.perf_counter()

        async for ev in event_aiter:
            if ev.type == SpeechEventType.RECOGNITION_USAGE:
                assert (
                    ev.recognition_usage is not None
                ), "recognition_usage must be provided for RECOGNITION_USAGE event"

                duration = time.perf_counter() - start_time
                stt_metrics = STTMetrics(
                    request_id=ev.request_id,
                    timestamp=time.time(),
                    duration=duration,
                    label=self._stt._label,
                    audio_duration=ev.recognition_usage.audio_duration,
                    streamed=True,
                    error=None,
                )

                self._stt.emit("metrics_collected", stt_metrics)

    def push_frame(self, frame: rtc.AudioFrame) -> None:
        """Push audio to be recognized"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._pushed_sr and self._pushed_sr != frame.sample_rate:
            raise ValueError("the sample rate of the input frames must be consistent")

        self._pushed_sr = frame.sample_rate

        if self._needed_sr and self._needed_sr != frame.sample_rate:
            if not self._resampler:
                self._resampler = rtc.AudioResampler(
                    frame.sample_rate,
                    self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )

        if self._resampler:
            for frame in self._resampler.push(frame):
                self._input_ch.send_nowait(frame)
        else:
            self._input_ch.send_nowait(frame)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._resampler:
            for frame in self._resampler.flush():
                self._input_ch.send_nowait(frame)

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more audio will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    async def __anext__(self) -> SpeechEvent:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SpeechEvent]:
        return self

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __aenter__(self) -> RecognizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Abstract base class for streaming speech recognition: audio frames are pushed into the stream, and SpeechEvents are consumed from it as an async iterator.
Args:
sample_rate : int or None, optional
    The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
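A consumption sketch, assuming an STT with streaming support and a hypothetical async iterator frame_source yielding rtc.AudioFrame objects:

import asyncio

from livekit.agents import stt

async def transcribe(speech_to_text: stt.STT, frame_source) -> None:
    async with speech_to_text.stream() as stream:
        async def _push() -> None:
            async for frame in frame_source:  # hypothetical audio source
                stream.push_frame(frame)
            stream.end_input()  # flush and close the input side

        push_task = asyncio.create_task(_push())
        async for event in stream:  # SpeechEvent objects
            if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(event.alternatives[0].text)
        await push_task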
Ancestors
- abc.ABC
Subclasses
- FallbackRecognizeStream
- StreamAdapterWrapper
- SpeechStream
- livekit.plugins.azure.stt.SpeechStream
- livekit.plugins.deepgram.stt.SpeechStream
- livekit.plugins.google.stt.SpeechStream
Methods
async def aclose(self) ‑> None
-
async def aclose(self) -> None:
    """Close this stream immediately"""
    self._input_ch.close()
    await aio.gracefully_cancel(self._task)

    if self._metrics_task is not None:
        await self._metrics_task
Close this stream immediately
def end_input(self) ‑> None
-
def end_input(self) -> None:
    """Mark the end of input, no more audio will be pushed"""
    self.flush()
    self._input_ch.close()
Mark the end of input, no more audio will be pushed
def flush(self) ‑> None
-
def flush(self) -> None:
    """Mark the end of the current segment"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._resampler:
        for frame in self._resampler.flush():
            self._input_ch.send_nowait(frame)

    self._input_ch.send_nowait(self._FlushSentinel())
Mark the end of the current segment
def push_frame(self, frame: rtc.AudioFrame) ‑> None
-
def push_frame(self, frame: rtc.AudioFrame) -> None:
    """Push audio to be recognized"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._pushed_sr and self._pushed_sr != frame.sample_rate:
        raise ValueError("the sample rate of the input frames must be consistent")

    self._pushed_sr = frame.sample_rate

    if self._needed_sr and self._needed_sr != frame.sample_rate:
        if not self._resampler:
            self._resampler = rtc.AudioResampler(
                frame.sample_rate,
                self._needed_sr,
                quality=rtc.AudioResamplerQuality.HIGH,
            )

    if self._resampler:
        for frame in self._resampler.push(frame):
            self._input_ch.send_nowait(frame)
    else:
        self._input_ch.send_nowait(frame)
Push audio to be recognized
class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
sample_rate: int | None = None)
-
Alias of RecognizeStream. See RecognizeStream above for the source, constructor arguments, methods, and subclasses.
class STT (*,
capabilities: STTCapabilities)
-
class STT(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    def __init__(self, *, capabilities: STTCapabilities) -> None:
        super().__init__()
        self._capabilities = capabilities
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @property
    def capabilities(self) -> STTCapabilities:
        return self._capabilities

    @abstractmethod
    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> SpeechEvent: ...

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechEvent:
        for i in range(conn_options.max_retry + 1):
            try:
                start_time = time.perf_counter()
                event = await self._recognize_impl(
                    buffer, language=language, conn_options=conn_options
                )
                duration = time.perf_counter() - start_time
                stt_metrics = STTMetrics(
                    request_id=event.request_id,
                    timestamp=time.time(),
                    duration=duration,
                    label=self._label,
                    audio_duration=calculate_audio_duration(buffer),
                    streamed=False,
                    error=None,
                )
                self.emit("metrics_collected", stt_metrics)
                return event
            except APIError as e:
                if conn_options.max_retry == 0:
                    raise
                elif i == conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to recognize speech after {conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to recognize speech, retrying in {conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._label,
                            "attempt": i + 1,
                            "streamed": False,
                        },
                    )

                await asyncio.sleep(conn_options.retry_interval)

        raise RuntimeError("unreachable")

    def stream(
        self,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "RecognizeStream":
        raise NotImplementedError(
            "streaming is not supported by this STT, please use a different STT or use a StreamAdapter"
        )

    async def aclose(self) -> None:
        """Close the STT and every stream/request associated with it"""
        ...

    async def __aenter__(self) -> STT:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Abstract base class for Speech-to-Text implementations. Subclasses must implement _recognize_impl for batch recognition and may override stream for real-time transcription; all implementations emit "metrics_collected" events.
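A skeletal subclass sketch showing the minimum a batch-only implementation provides; call_my_backend is a hypothetical coroutine standing in for the actual recognition request, and the import paths follow the module layout used in this documentation:

from livekit.agents import stt
from livekit.agents.types import APIConnectOptions
from livekit.agents.utils import AudioBuffer

class MySTT(stt.STT):
    def __init__(self) -> None:
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=False, interim_results=False)
        )

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        text = await call_my_backend(buffer, language)  # hypothetical backend call
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[stt.SpeechData(language=language or "en", text=text)],
        )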
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- FallbackAdapter
- StreamAdapter
- STT
- livekit.plugins.azure.stt.STT
- livekit.plugins.deepgram.stt.STT
- WizperSTT
- livekit.plugins.google.stt.STT
- livekit.plugins.openai.stt.STT
Instance variables
prop capabilities : STTCapabilities
-
@property
def capabilities(self) -> STTCapabilities:
    return self._capabilities
prop label : str
-
@property
def label(self) -> str:
    return self._label
Methods
async def aclose(self) ‑> None
-
async def aclose(self) -> None:
    """Close the STT and every stream/request associated with it"""
    ...
Close the STT and every stream/request associated with it
async def recognize(self,
buffer: AudioBuffer,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) -> SpeechEvent
-
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechEvent:
    for i in range(conn_options.max_retry + 1):
        try:
            start_time = time.perf_counter()
            event = await self._recognize_impl(
                buffer, language=language, conn_options=conn_options
            )
            duration = time.perf_counter() - start_time
            stt_metrics = STTMetrics(
                request_id=event.request_id,
                timestamp=time.time(),
                duration=duration,
                label=self._label,
                audio_duration=calculate_audio_duration(buffer),
                streamed=False,
                error=None,
            )
            self.emit("metrics_collected", stt_metrics)
            return event
        except APIError as e:
            if conn_options.max_retry == 0:
                raise
            elif i == conn_options.max_retry:
                raise APIConnectionError(
                    f"failed to recognize speech after {conn_options.max_retry + 1} attempts",
                ) from e
            else:
                logger.warning(
                    f"failed to recognize speech, retrying in {conn_options.retry_interval}s",
                    exc_info=e,
                    extra={
                        "tts": self._label,
                        "attempt": i + 1,
                        "streamed": False,
                    },
                )

            await asyncio.sleep(conn_options.retry_interval)

    raise RuntimeError("unreachable")
def stream(self,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) -> RecognizeStream
-
def stream(
    self,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "RecognizeStream":
    raise NotImplementedError(
        "streaming is not supported by this STT, please use a different STT or use a StreamAdapter"
    )
Inherited members
class STTCapabilities (streaming: bool, interim_results: bool)
-
@dataclass
class STTCapabilities:
    streaming: bool
    interim_results: bool
STTCapabilities(streaming: 'bool', interim_results: 'bool')
Class variables
var interim_results : bool
var streaming : bool
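Capabilities determine whether stream() can be called directly. A small branching sketch that falls back to StreamAdapter (documented below) when native streaming is missing; the vad argument is assumed to be a loaded VAD instance:

from livekit.agents import stt
from livekit.agents.vad import VAD

def make_stream(speech_to_text: stt.STT, vad: VAD) -> stt.RecognizeStream:
    if speech_to_text.capabilities.streaming:
        return speech_to_text.stream()
    # no native streaming: segment with a VAD and recognize each segment in batch mode
    return stt.StreamAdapter(stt=speech_to_text, vad=vad).stream()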
class SpeechData (language: str,
text: str,
start_time: float = 0.0,
end_time: float = 0.0,
confidence: float = 0.0)
-
@dataclass
class SpeechData:
    language: str
    text: str
    start_time: float = 0.0
    end_time: float = 0.0
    confidence: float = 0.0  # [0, 1]
SpeechData(language: 'str', text: 'str', start_time: 'float' = 0.0, end_time: 'float' = 0.0, confidence: 'float' = 0.0)
Class variables
var confidence : float
var end_time : float
var language : str
var start_time : float
var text : str
class SpeechEvent (type: SpeechEventType,
request_id: str = '',
alternatives: List[SpeechData] = <factory>,
recognition_usage: RecognitionUsage | None = None)
-
@dataclass
class SpeechEvent:
    type: SpeechEventType
    request_id: str = ""
    alternatives: List[SpeechData] = field(default_factory=list)
    recognition_usage: RecognitionUsage | None = None
SpeechEvent(type: 'SpeechEventType', request_id: 'str' = '', alternatives: 'List[SpeechData]' = <factory>, recognition_usage: 'RecognitionUsage | None' = None)
Class variables
var alternatives : List[SpeechData]
var recognition_usage : RecognitionUsage | None
var request_id : str
var type : SpeechEventType
class SpeechEventType (*args, **kwds)
-
@unique
class SpeechEventType(str, Enum):
    START_OF_SPEECH = "start_of_speech"
    """indicate the start of speech
    if the STT doesn't support this event, this will be emitted at the same time
    as the first INTERIM_TRANSCRIPT"""
    INTERIM_TRANSCRIPT = "interim_transcript"
    """interim transcript, useful for real-time transcription"""
    FINAL_TRANSCRIPT = "final_transcript"
    """final transcript, emitted when the STT is confident enough that a certain
    portion of speech will not change"""
    RECOGNITION_USAGE = "recognition_usage"
    """usage event, emitted periodically to indicate usage metrics"""
    END_OF_SPEECH = "end_of_speech"
    """indicate the end of speech, emitted when the user stops speaking"""
Ancestors
- builtins.str
- enum.Enum
Class variables
var END_OF_SPEECH
-
indicate the end of speech, emitted when the user stops speaking
var FINAL_TRANSCRIPT
-
final transcript, emitted when the STT is confident enough that a certain portion of speech will not change
var INTERIM_TRANSCRIPT
-
interim transcript, useful for real-time transcription
var RECOGNITION_USAGE
-
usage event, emitted periodically to indicate usage metrics
var START_OF_SPEECH
-
indicate the start of speech; if the STT doesn't support this event, it will be emitted at the same time as the first INTERIM_TRANSCRIPT
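A dispatch sketch over the event types; events is assumed to be any async iterator of SpeechEvent, such as a RecognizeStream:

from livekit.agents import stt

async def print_transcripts(events) -> None:
    async for ev in events:
        if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print("partial:", ev.alternatives[0].text)
        elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", ev.alternatives[0].text)
        elif ev.type == stt.SpeechEventType.RECOGNITION_USAGE:
            assert ev.recognition_usage is not None
            print("audio duration:", ev.recognition_usage.audio_duration, "s")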
class StreamAdapter (*,
stt: STT,
vad: VAD)
-
class StreamAdapter(STT):
    def __init__(self, *, stt: STT, vad: VAD) -> None:
        super().__init__(
            capabilities=STTCapabilities(streaming=True, interim_results=False)
        )
        self._vad = vad
        self._stt = stt

        @self._stt.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    @property
    def wrapped_stt(self) -> STT:
        return self._stt

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ):
        return await self._stt.recognize(
            buffer=buffer, language=language, conn_options=conn_options
        )

    def stream(
        self,
        *,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> RecognizeStream:
        return StreamAdapterWrapper(
            self,
            vad=self._vad,
            wrapped_stt=self._stt,
            language=language,
            conn_options=conn_options,
        )
An adapter that adds streaming support to a non-streaming STT by using a VAD to split the input into speech segments, each of which is recognized in batch mode.
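A wiring sketch, assuming the silero VAD plugin and the openai STT plugin are installed (any VAD and any non-streaming STT work):

from livekit.agents import stt
from livekit.plugins import openai, silero

# openai.STT() only supports batch recognition; the adapter gives it a
# streaming interface by segmenting the audio with the VAD.
adapter = stt.StreamAdapter(
    stt=openai.STT(),
    vad=silero.VAD.load(),
)
stream = adapter.stream()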
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop wrapped_stt : STT
-
@property
def wrapped_stt(self) -> STT:
    return self._stt
Methods
def stream(self,
*,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) -> RecognizeStream
-
def stream(
    self,
    *,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> RecognizeStream:
    return StreamAdapterWrapper(
        self,
        vad=self._vad,
        wrapped_stt=self._stt,
        language=language,
        conn_options=conn_options,
    )
Inherited members
class StreamAdapterWrapper (stt: STT,
*,
vad: VAD,
wrapped_stt: STT,
language: str | None,
conn_options: APIConnectOptions)
-
class StreamAdapterWrapper(RecognizeStream):
    def __init__(
        self,
        stt: STT,
        *,
        vad: VAD,
        wrapped_stt: STT,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options)
        self._vad = vad
        self._wrapped_stt = wrapped_stt
        self._vad_stream = self._vad.stream()
        self._language = language

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SpeechEvent]
    ) -> None:
        pass  # do nothing

    async def _run(self) -> None:
        async def _forward_input():
            """forward input to vad"""
            async for input in self._input_ch:
                if isinstance(input, self._FlushSentinel):
                    self._vad_stream.flush()
                    continue
                self._vad_stream.push_frame(input)

            self._vad_stream.end_input()

        async def _recognize():
            """recognize speech from vad"""
            async for event in self._vad_stream:
                if event.type == VADEventType.START_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(SpeechEventType.START_OF_SPEECH)
                    )
                elif event.type == VADEventType.END_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.END_OF_SPEECH,
                        )
                    )

                    merged_frames = utils.merge_frames(event.frames)
                    t_event = await self._wrapped_stt.recognize(
                        buffer=merged_frames,
                        language=self._language,
                        conn_options=self._conn_options,
                    )

                    if len(t_event.alternatives) == 0:
                        continue
                    elif not t_event.alternatives[0].text:
                        continue

                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[t_event.alternatives[0]],
                        )
                    )

        tasks = [
            asyncio.create_task(_forward_input(), name="forward_input"),
            asyncio.create_task(_recognize(), name="recognize"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)
The RecognizeStream implementation returned by StreamAdapter.stream(). It forwards pushed audio to the VAD and runs batch recognition on each detected speech segment.
Args:
sample_rate : int or None, optional
    The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- RecognizeStream
- abc.ABC
Inherited members