Module livekit.agents.stt

Classes

class AvailabilityChangedEvent (stt: STT,
available: bool)
Expand source code
@dataclass
class AvailabilityChangedEvent:
    stt: STT
    available: bool

AvailabilityChangedEvent(stt: 'STT', available: 'bool')

Instance variables

var available : bool
var stt : livekit.agents.stt.stt.STT
class FallbackAdapter (stt: list[STT],
*,
vad: VAD | None = None,
attempt_timeout: float = 10.0,
max_retry_per_stt: int = 1,
retry_interval: float = 5)
Expand source code
class FallbackAdapter(
    STT[Literal["stt_availability_changed"]],
):
    def __init__(
        self,
        stt: list[STT],
        *,
        vad: VAD | None = None,
        attempt_timeout: float = 10.0,
        max_retry_per_stt: int = 1,
        retry_interval: float = 5,
    ) -> None:
        if len(stt) < 1:
            raise ValueError("At least one STT instance must be provided.")

        non_streaming_stt = [t for t in stt if not t.capabilities.streaming]
        if non_streaming_stt:
            if vad is None:
                labels = ", ".join(t.label for t in non_streaming_stt)
                raise ValueError(
                    f"STTs do not support streaming: {labels}. "
                    "Provide a VAD to enable stt.StreamAdapter automatically "
                    "or wrap them with stt.StreamAdapter before using this adapter."
                )
            from ..stt import StreamAdapter

            stt = [
                StreamAdapter(stt=t, vad=vad) if not t.capabilities.streaming else t for t in stt
            ]

        super().__init__(
            capabilities=STTCapabilities(
                streaming=True,
                interim_results=all(t.capabilities.interim_results for t in stt),
            )
        )

        self._stt_instances = stt
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_stt = max_retry_per_stt
        self._retry_interval = retry_interval

        self._status: list[_STTStatus] = [
            _STTStatus(
                available=True,
                recovering_synthesize_task=None,
                recovering_stream_task=None,
            )
            for _ in self._stt_instances
        ]

    async def _try_recognize(
        self,
        *,
        stt: STT,
        buffer: utils.AudioBuffer,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
        recovering: bool = False,
    ) -> SpeechEvent:
        try:
            return await stt.recognize(
                buffer,
                language=language,
                conn_options=dataclasses.replace(
                    conn_options,
                    max_retry=self._max_retry_per_stt,
                    timeout=self._attempt_timeout,
                    retry_interval=self._retry_interval,
                ),
            )
        except asyncio.TimeoutError:
            if recovering:
                logger.warning(f"{stt.label} recovery timed out", extra={"streamed": False})
                raise

            logger.warning(
                f"{stt.label} timed out, switching to next STT",
                extra={"streamed": False},
            )

            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{stt.label} recovery failed",
                    exc_info=e,
                    extra={"streamed": False},
                )
                raise

            logger.warning(
                f"{stt.label} failed, switching to next STT",
                exc_info=e,
                extra={"streamed": False},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{stt.label} recovery unexpected error", extra={"streamed": False}
                )
                raise

            logger.exception(
                f"{stt.label} unexpected error, switching to next STT",
                extra={"streamed": False},
            )
            raise

    def _try_recovery(
        self,
        *,
        stt: STT,
        buffer: utils.AudioBuffer,
        language: NotGivenOr[str],
        conn_options: APIConnectOptions,
    ) -> None:
        stt_status = self._status[self._stt_instances.index(stt)]
        if (
            stt_status.recovering_synthesize_task is None
            or stt_status.recovering_synthesize_task.done()
        ):

            async def _recover_stt_task(stt: STT) -> None:
                try:
                    await self._try_recognize(
                        stt=stt,
                        buffer=buffer,
                        language=language,
                        conn_options=conn_options,
                        recovering=True,
                    )

                    stt_status.available = True
                    logger.info(f"{stt.label} recovered")
                    self.emit(
                        "stt_availability_changed",
                        AvailabilityChangedEvent(stt=stt, available=True),
                    )
                except Exception:
                    return

            stt_status.recovering_synthesize_task = asyncio.create_task(_recover_stt_task(stt))

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> SpeechEvent:
        start_time = time.time()

        all_failed = all(not stt_status.available for stt_status in self._status)
        if all_failed:
            logger.error("all STTs are unavailable, retrying..")

        for i, stt in enumerate(self._stt_instances):
            stt_status = self._status[i]
            if stt_status.available or all_failed:
                try:
                    return await self._try_recognize(
                        stt=stt,
                        buffer=buffer,
                        language=language,
                        conn_options=conn_options,
                        recovering=False,
                    )
                except Exception:  # exceptions already logged inside _try_recognize
                    if stt_status.available:
                        stt_status.available = False
                        self.emit(
                            "stt_availability_changed",
                            AvailabilityChangedEvent(stt=stt, available=False),
                        )

            self._try_recovery(stt=stt, buffer=buffer, language=language, conn_options=conn_options)

        raise APIConnectionError(
            f"all STTs failed ({[stt.label for stt in self._stt_instances]}) after {time.time() - start_time} seconds"  # noqa: E501
        )

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    ) -> SpeechEvent:
        return await super().recognize(buffer, language=language, conn_options=conn_options)

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    ) -> RecognizeStream:
        return FallbackRecognizeStream(stt=self, language=language, conn_options=conn_options)

    async def aclose(self) -> None:
        for stt_status in self._status:
            if stt_status.recovering_synthesize_task is not None:
                await aio.cancel_and_wait(stt_status.recovering_synthesize_task)

            if stt_status.recovering_stream_task is not None:
                await aio.cancel_and_wait(stt_status.recovering_stream_task)

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for stt_status in self._status:
        if stt_status.recovering_synthesize_task is not None:
            await aio.cancel_and_wait(stt_status.recovering_synthesize_task)

        if stt_status.recovering_stream_task is not None:
            await aio.cancel_and_wait(stt_status.recovering_stream_task)

Close the STT, and every stream/requests associated with it

async def recognize(self,
buffer: AudioBuffer,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.SpeechEvent
Expand source code
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
) -> SpeechEvent:
    return await super().recognize(buffer, language=language, conn_options=conn_options)
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.RecognizeStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
) -> RecognizeStream:
    return FallbackRecognizeStream(stt=self, language=language, conn_options=conn_options)

Inherited members

class RecognitionUsage (audio_duration: float)
Expand source code
@dataclass
class RecognitionUsage:
    audio_duration: float

RecognitionUsage(audio_duration: 'float')

Instance variables

var audio_duration : float
class RecognizeStream (*,
stt: STT,
conn_options: APIConnectOptions,
sample_rate: NotGivenOr[int] = NOT_GIVEN)
Expand source code
class RecognizeStream(ABC):
    class _FlushSentinel:
        """Sentinel to mark when it was flushed"""

        pass

    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
    ):
        """
        Args:
        sample_rate : int or None, optional
            The desired sample rate for the audio input.
            If specified, the audio input will be automatically resampled to match
            the given sample rate before being processed for Speech-to-Text.
            If not provided (None), the input will retain its original sample rate.
        """
        self._stt = stt
        self._conn_options = conn_options
        self._input_ch = aio.Chan[Union[rtc.AudioFrame, RecognizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SpeechEvent]()

        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="STT._metrics_task"
        )

        self._num_retries = 0
        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._needed_sr = sample_rate if is_given(sample_rate) else None
        self._pushed_sr = 0
        self._resampler: rtc.AudioResampler | None = None

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        max_retries = self._conn_options.max_retry

        while self._num_retries <= max_retries:
            try:
                return await self._run()
            except APIError as e:
                if max_retries == 0:
                    self._emit_error(e, recoverable=False)
                    raise
                elif self._num_retries == max_retries:
                    self._emit_error(e, recoverable=False)
                    raise APIConnectionError(
                        f"failed to recognize speech after {self._num_retries} attempts",
                    ) from e
                else:
                    self._emit_error(e, recoverable=True)

                    retry_interval = self._conn_options._interval_for_retry(self._num_retries)
                    logger.warning(
                        f"failed to recognize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._stt._label,
                            "attempt": self._num_retries,
                            "streamed": True,
                        },
                    )
                    await asyncio.sleep(retry_interval)

                self._num_retries += 1

            except Exception as e:
                self._emit_error(e, recoverable=False)
                raise

    def _emit_error(self, api_error: Exception, recoverable: bool) -> None:
        self._stt.emit(
            "error",
            STTError(
                timestamp=time.time(),
                label=self._stt._label,
                error=api_error,
                recoverable=recoverable,
            ),
        )

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SpeechEvent]) -> None:
        """Task used to collect metrics"""

        async for ev in event_aiter:
            if ev.type == SpeechEventType.RECOGNITION_USAGE:
                assert ev.recognition_usage is not None, (
                    "recognition_usage must be provided for RECOGNITION_USAGE event"
                )

                stt_metrics = STTMetrics(
                    request_id=ev.request_id,
                    timestamp=time.time(),
                    duration=0.0,
                    label=self._stt._label,
                    audio_duration=ev.recognition_usage.audio_duration,
                    streamed=True,
                )

                self._stt.emit("metrics_collected", stt_metrics)
            elif ev.type == SpeechEventType.FINAL_TRANSCRIPT:
                # reset the retry count after a successful recognition
                self._num_retries = 0

    def push_frame(self, frame: rtc.AudioFrame) -> None:
        """Push audio to be recognized"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._pushed_sr and self._pushed_sr != frame.sample_rate:
            raise ValueError("the sample rate of the input frames must be consistent")

        self._pushed_sr = frame.sample_rate

        if self._needed_sr and self._needed_sr != frame.sample_rate:
            if not self._resampler:
                self._resampler = rtc.AudioResampler(
                    frame.sample_rate,
                    self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )

        if self._resampler:
            frames = self._resampler.push(frame)
            for frame in frames:
                self._input_ch.send_nowait(frame)
        else:
            self._input_ch.send_nowait(frame)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._resampler:
            for frame in self._resampler.flush():
                self._input_ch.send_nowait(frame)

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more audio will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close ths stream immediately"""
        self._input_ch.close()
        await aio.cancel_and_wait(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    async def __anext__(self) -> SpeechEvent:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc  # noqa: B904

            raise StopAsyncIteration from None

        return val

    def __aiter__(self) -> AsyncIterator[SpeechEvent]:
        return self

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __aenter__(self) -> RecognizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • abc.ABC

Subclasses

  • livekit.agents.stt.fallback_adapter.FallbackRecognizeStream
  • livekit.agents.stt.stream_adapter.StreamAdapterWrapper
  • livekit.plugins.assemblyai.stt.SpeechStream
  • livekit.plugins.aws.stt.SpeechStream
  • livekit.plugins.azure.stt.SpeechStream
  • livekit.plugins.baseten.stt.SpeechStream
  • livekit.plugins.cartesia.stt.SpeechStream
  • livekit.plugins.deepgram.stt.SpeechStream
  • livekit.plugins.gladia.stt.SpeechStream
  • livekit.plugins.google.stt.SpeechStream
  • livekit.plugins.openai.stt.SpeechStream
  • livekit.plugins.speechmatics.stt.SpeechStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close ths stream immediately"""
    self._input_ch.close()
    await aio.cancel_and_wait(self._task)

    if self._metrics_task is not None:
        await self._metrics_task

Close ths stream immediately

def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more audio will be pushed"""
    self.flush()
    self._input_ch.close()

Mark the end of input, no more audio will be pushed

def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._resampler:
        for frame in self._resampler.flush():
            self._input_ch.send_nowait(frame)

    self._input_ch.send_nowait(self._FlushSentinel())

Mark the end of the current segment

def push_frame(self, frame: rtc.AudioFrame) ‑> None
Expand source code
def push_frame(self, frame: rtc.AudioFrame) -> None:
    """Push audio to be recognized"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._pushed_sr and self._pushed_sr != frame.sample_rate:
        raise ValueError("the sample rate of the input frames must be consistent")

    self._pushed_sr = frame.sample_rate

    if self._needed_sr and self._needed_sr != frame.sample_rate:
        if not self._resampler:
            self._resampler = rtc.AudioResampler(
                frame.sample_rate,
                self._needed_sr,
                quality=rtc.AudioResamplerQuality.HIGH,
            )

    if self._resampler:
        frames = self._resampler.push(frame)
        for frame in frames:
            self._input_ch.send_nowait(frame)
    else:
        self._input_ch.send_nowait(frame)

Push audio to be recognized

class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
sample_rate: NotGivenOr[int] = NOT_GIVEN)
Expand source code
class RecognizeStream(ABC):
    class _FlushSentinel:
        """Sentinel to mark when it was flushed"""

        pass

    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
    ):
        """
        Args:
        sample_rate : int or None, optional
            The desired sample rate for the audio input.
            If specified, the audio input will be automatically resampled to match
            the given sample rate before being processed for Speech-to-Text.
            If not provided (None), the input will retain its original sample rate.
        """
        self._stt = stt
        self._conn_options = conn_options
        self._input_ch = aio.Chan[Union[rtc.AudioFrame, RecognizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SpeechEvent]()

        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="STT._metrics_task"
        )

        self._num_retries = 0
        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._needed_sr = sample_rate if is_given(sample_rate) else None
        self._pushed_sr = 0
        self._resampler: rtc.AudioResampler | None = None

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        max_retries = self._conn_options.max_retry

        while self._num_retries <= max_retries:
            try:
                return await self._run()
            except APIError as e:
                if max_retries == 0:
                    self._emit_error(e, recoverable=False)
                    raise
                elif self._num_retries == max_retries:
                    self._emit_error(e, recoverable=False)
                    raise APIConnectionError(
                        f"failed to recognize speech after {self._num_retries} attempts",
                    ) from e
                else:
                    self._emit_error(e, recoverable=True)

                    retry_interval = self._conn_options._interval_for_retry(self._num_retries)
                    logger.warning(
                        f"failed to recognize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._stt._label,
                            "attempt": self._num_retries,
                            "streamed": True,
                        },
                    )
                    await asyncio.sleep(retry_interval)

                self._num_retries += 1

            except Exception as e:
                self._emit_error(e, recoverable=False)
                raise

    def _emit_error(self, api_error: Exception, recoverable: bool) -> None:
        self._stt.emit(
            "error",
            STTError(
                timestamp=time.time(),
                label=self._stt._label,
                error=api_error,
                recoverable=recoverable,
            ),
        )

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SpeechEvent]) -> None:
        """Task used to collect metrics"""

        async for ev in event_aiter:
            if ev.type == SpeechEventType.RECOGNITION_USAGE:
                assert ev.recognition_usage is not None, (
                    "recognition_usage must be provided for RECOGNITION_USAGE event"
                )

                stt_metrics = STTMetrics(
                    request_id=ev.request_id,
                    timestamp=time.time(),
                    duration=0.0,
                    label=self._stt._label,
                    audio_duration=ev.recognition_usage.audio_duration,
                    streamed=True,
                )

                self._stt.emit("metrics_collected", stt_metrics)
            elif ev.type == SpeechEventType.FINAL_TRANSCRIPT:
                # reset the retry count after a successful recognition
                self._num_retries = 0

    def push_frame(self, frame: rtc.AudioFrame) -> None:
        """Push audio to be recognized"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._pushed_sr and self._pushed_sr != frame.sample_rate:
            raise ValueError("the sample rate of the input frames must be consistent")

        self._pushed_sr = frame.sample_rate

        if self._needed_sr and self._needed_sr != frame.sample_rate:
            if not self._resampler:
                self._resampler = rtc.AudioResampler(
                    frame.sample_rate,
                    self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )

        if self._resampler:
            frames = self._resampler.push(frame)
            for frame in frames:
                self._input_ch.send_nowait(frame)
        else:
            self._input_ch.send_nowait(frame)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._resampler:
            for frame in self._resampler.flush():
                self._input_ch.send_nowait(frame)

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more audio will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close ths stream immediately"""
        self._input_ch.close()
        await aio.cancel_and_wait(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    async def __anext__(self) -> SpeechEvent:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc  # noqa: B904

            raise StopAsyncIteration from None

        return val

    def __aiter__(self) -> AsyncIterator[SpeechEvent]:
        return self

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __aenter__(self) -> RecognizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • abc.ABC

Subclasses

  • livekit.agents.stt.fallback_adapter.FallbackRecognizeStream
  • livekit.agents.stt.stream_adapter.StreamAdapterWrapper
  • livekit.plugins.assemblyai.stt.SpeechStream
  • livekit.plugins.aws.stt.SpeechStream
  • livekit.plugins.azure.stt.SpeechStream
  • livekit.plugins.baseten.stt.SpeechStream
  • livekit.plugins.cartesia.stt.SpeechStream
  • livekit.plugins.deepgram.stt.SpeechStream
  • livekit.plugins.gladia.stt.SpeechStream
  • livekit.plugins.google.stt.SpeechStream
  • livekit.plugins.openai.stt.SpeechStream
  • livekit.plugins.speechmatics.stt.SpeechStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close ths stream immediately"""
    self._input_ch.close()
    await aio.cancel_and_wait(self._task)

    if self._metrics_task is not None:
        await self._metrics_task

Close ths stream immediately

def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more audio will be pushed"""
    self.flush()
    self._input_ch.close()

Mark the end of input, no more audio will be pushed

def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._resampler:
        for frame in self._resampler.flush():
            self._input_ch.send_nowait(frame)

    self._input_ch.send_nowait(self._FlushSentinel())

Mark the end of the current segment

def push_frame(self, frame: rtc.AudioFrame) ‑> None
Expand source code
def push_frame(self, frame: rtc.AudioFrame) -> None:
    """Push audio to be recognized"""
    self._check_input_not_ended()
    self._check_not_closed()

    if self._pushed_sr and self._pushed_sr != frame.sample_rate:
        raise ValueError("the sample rate of the input frames must be consistent")

    self._pushed_sr = frame.sample_rate

    if self._needed_sr and self._needed_sr != frame.sample_rate:
        if not self._resampler:
            self._resampler = rtc.AudioResampler(
                frame.sample_rate,
                self._needed_sr,
                quality=rtc.AudioResamplerQuality.HIGH,
            )

    if self._resampler:
        frames = self._resampler.push(frame)
        for frame in frames:
            self._input_ch.send_nowait(frame)
    else:
        self._input_ch.send_nowait(frame)

Push audio to be recognized

class STT (*,
capabilities: STTCapabilities)
Expand source code
class STT(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected", "error"], TEvent]],
    Generic[TEvent],
):
    def __init__(self, *, capabilities: STTCapabilities) -> None:
        super().__init__()
        self._capabilities = capabilities
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @property
    def capabilities(self) -> STTCapabilities:
        return self._capabilities

    @abstractmethod
    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> SpeechEvent: ...

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechEvent:
        for i in range(conn_options.max_retry + 1):
            try:
                start_time = time.perf_counter()
                event = await self._recognize_impl(
                    buffer, language=language, conn_options=conn_options
                )
                duration = time.perf_counter() - start_time
                stt_metrics = STTMetrics(
                    request_id=event.request_id,
                    timestamp=time.time(),
                    duration=duration,
                    label=self._label,
                    audio_duration=calculate_audio_duration(buffer),
                    streamed=False,
                )
                self.emit("metrics_collected", stt_metrics)
                return event

            except APIError as e:
                retry_interval = conn_options._interval_for_retry(i)
                if conn_options.max_retry == 0:
                    self._emit_error(e, recoverable=False)
                    raise
                elif i == conn_options.max_retry:
                    self._emit_error(e, recoverable=False)
                    raise APIConnectionError(
                        f"failed to recognize speech after {conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    self._emit_error(e, recoverable=True)
                    logger.warning(
                        f"failed to recognize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._label,
                            "attempt": i + 1,
                            "streamed": False,
                        },
                    )

                await asyncio.sleep(retry_interval)

            except Exception as e:
                self._emit_error(e, recoverable=False)
                raise

        raise RuntimeError("unreachable")

    def _emit_error(self, api_error: Exception, recoverable: bool) -> None:
        self.emit(
            "error",
            STTError(
                timestamp=time.time(),
                label=self._label,
                error=api_error,
                recoverable=recoverable,
            ),
        )

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> RecognizeStream:
        raise NotImplementedError(
            "streaming is not supported by this STT, please use a different STT or use a StreamAdapter"  # noqa: E501
        )

    async def aclose(self) -> None:
        """Close the STT, and every stream/requests associated with it"""
        ...

    async def __aenter__(self) -> STT:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

    def prewarm(self) -> None:
        """Pre-warm connection to the STT service"""
        pass

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Subclasses

  • livekit.agents.stt.fallback_adapter.FallbackAdapter
  • livekit.agents.stt.stream_adapter.StreamAdapter
  • livekit.plugins.assemblyai.stt.STT
  • livekit.plugins.aws.stt.STT
  • livekit.plugins.azure.stt.STT
  • livekit.plugins.baseten.stt.STT
  • livekit.plugins.cartesia.stt.STT
  • livekit.plugins.clova.stt.STT
  • livekit.plugins.deepgram.stt.STT
  • livekit.plugins.elevenlabs.stt.STT
  • livekit.plugins.fal.stt.WizperSTT
  • livekit.plugins.gladia.stt.STT
  • livekit.plugins.google.stt.STT
  • livekit.plugins.mistralai.stt.STT
  • livekit.plugins.openai.stt.STT
  • livekit.plugins.sarvam.stt.STT
  • livekit.plugins.speechmatics.stt.STT
  • livekit.plugins.spitch.stt.STT

Instance variables

prop capabilitiesSTTCapabilities
Expand source code
@property
def capabilities(self) -> STTCapabilities:
    return self._capabilities
prop label : str
Expand source code
@property
def label(self) -> str:
    return self._label

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the STT, and every stream/requests associated with it"""
    ...

Close the STT, and every stream/requests associated with it

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the STT service"""
    pass

Pre-warm connection to the STT service

async def recognize(self,
buffer: AudioBuffer,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.SpeechEvent
Expand source code
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechEvent:
    for i in range(conn_options.max_retry + 1):
        try:
            start_time = time.perf_counter()
            event = await self._recognize_impl(
                buffer, language=language, conn_options=conn_options
            )
            duration = time.perf_counter() - start_time
            stt_metrics = STTMetrics(
                request_id=event.request_id,
                timestamp=time.time(),
                duration=duration,
                label=self._label,
                audio_duration=calculate_audio_duration(buffer),
                streamed=False,
            )
            self.emit("metrics_collected", stt_metrics)
            return event

        except APIError as e:
            retry_interval = conn_options._interval_for_retry(i)
            if conn_options.max_retry == 0:
                self._emit_error(e, recoverable=False)
                raise
            elif i == conn_options.max_retry:
                self._emit_error(e, recoverable=False)
                raise APIConnectionError(
                    f"failed to recognize speech after {conn_options.max_retry + 1} attempts",
                ) from e
            else:
                self._emit_error(e, recoverable=True)
                logger.warning(
                    f"failed to recognize speech, retrying in {retry_interval}s",
                    exc_info=e,
                    extra={
                        "tts": self._label,
                        "attempt": i + 1,
                        "streamed": False,
                    },
                )

            await asyncio.sleep(retry_interval)

        except Exception as e:
            self._emit_error(e, recoverable=False)
            raise

    raise RuntimeError("unreachable")
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.RecognizeStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> RecognizeStream:
    raise NotImplementedError(
        "streaming is not supported by this STT, please use a different STT or use a StreamAdapter"  # noqa: E501
    )

Inherited members

class STTCapabilities (streaming: bool, interim_results: bool)
Expand source code
@dataclass
class STTCapabilities:
    streaming: bool
    interim_results: bool

STTCapabilities(streaming: 'bool', interim_results: 'bool')

Instance variables

var interim_results : bool
var streaming : bool
class STTError (**data: Any)
Expand source code
class STTError(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    type: Literal["stt_error"] = "stt_error"
    timestamp: float
    label: str
    error: Exception = Field(..., exclude=True)
    recoverable: bool

Usage docs: https://docs.pydantic.dev/2.10/concepts/models/

A base class for creating Pydantic models.

Attributes

__class_vars__
The names of the class variables defined on the model.
__private_attributes__
Metadata about the private attributes of the model.
__signature__
The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__
Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__
The core schema of the model.
__pydantic_custom_init__
Whether the model has a custom __init__ function.
__pydantic_decorators__
Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__
Metadata for generic models; contains data used for a similar purpose to args, origin, parameters in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__
Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__
The name of the post-init method for the model, if defined.
__pydantic_root_model__
Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__
The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__
The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__
A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__
A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__
A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__
The names of fields explicitly set during instantiation.
__pydantic_private__
Values of private attributes set on the model instance.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var error : Exception
var label : str
var model_config
var recoverable : bool
var timestamp : float
var type : Literal['stt_error']
class SpeechData (language: str,
text: str,
start_time: float = 0.0,
end_time: float = 0.0,
confidence: float = 0.0,
speaker_id: str | None = None)
Expand source code
@dataclass
class SpeechData:
    language: str
    text: str
    start_time: float = 0.0
    end_time: float = 0.0
    confidence: float = 0.0  # [0, 1]
    speaker_id: str | None = None

SpeechData(language: 'str', text: 'str', start_time: 'float' = 0.0, end_time: 'float' = 0.0, confidence: 'float' = 0.0, speaker_id: 'str | None' = None)

Instance variables

var confidence : float
var end_time : float
var language : str
var speaker_id : str | None
var start_time : float
var text : str
class SpeechEvent (type: SpeechEventType,
request_id: str = '',
alternatives: list[SpeechData] = <factory>,
recognition_usage: RecognitionUsage | None = None)
Expand source code
@dataclass
class SpeechEvent:
    type: SpeechEventType
    request_id: str = ""
    alternatives: list[SpeechData] = field(default_factory=list)
    recognition_usage: RecognitionUsage | None = None

SpeechEvent(type: 'SpeechEventType', request_id: 'str' = '', alternatives: 'list[SpeechData]' = , recognition_usage: 'RecognitionUsage | None' = None)

Instance variables

var alternatives : list[livekit.agents.stt.stt.SpeechData]
var recognition_usage : livekit.agents.stt.stt.RecognitionUsage | None
var request_id : str
var type : livekit.agents.stt.stt.SpeechEventType
class SpeechEventType (*args, **kwds)
Expand source code
@unique
class SpeechEventType(str, Enum):
    START_OF_SPEECH = "start_of_speech"
    """indicate the start of speech
    if the STT doesn't support this event, this will be emitted as the same time as the first INTERIM_TRANSCRIPT"""  # noqa: E501
    INTERIM_TRANSCRIPT = "interim_transcript"
    """interim transcript, useful for real-time transcription"""
    FINAL_TRANSCRIPT = "final_transcript"
    """final transcript, emitted when the STT is confident enough that a certain
    portion of speech will not change"""
    RECOGNITION_USAGE = "recognition_usage"
    """usage event, emitted periodically to indicate usage metrics"""
    END_OF_SPEECH = "end_of_speech"
    """indicate the end of speech, emitted when the user stops speaking"""

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var END_OF_SPEECH

indicate the end of speech, emitted when the user stops speaking

var FINAL_TRANSCRIPT

final transcript, emitted when the STT is confident enough that a certain portion of speech will not change

var INTERIM_TRANSCRIPT

interim transcript, useful for real-time transcription

var RECOGNITION_USAGE

usage event, emitted periodically to indicate usage metrics

var START_OF_SPEECH

indicate the start of speech if the STT doesn't support this event, this will be emitted as the same time as the first INTERIM_TRANSCRIPT

class StreamAdapter (*,
stt: STT,
vad: VAD)
Expand source code
class StreamAdapter(STT):
    def __init__(self, *, stt: STT, vad: VAD) -> None:
        super().__init__(capabilities=STTCapabilities(streaming=True, interim_results=False))
        self._vad = vad
        self._stt = stt

        @self._stt.on("metrics_collected")
        def _forward_metrics(*args: Any, **kwargs: Any) -> None:
            self.emit("metrics_collected", *args, **kwargs)

    @property
    def wrapped_stt(self) -> STT:
        return self._stt

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechEvent:
        return await self._stt.recognize(
            buffer=buffer, language=language, conn_options=conn_options
        )

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> RecognizeStream:
        return StreamAdapterWrapper(
            self,
            vad=self._vad,
            wrapped_stt=self._stt,
            language=language,
            conn_options=conn_options,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop wrapped_sttSTT
Expand source code
@property
def wrapped_stt(self) -> STT:
    return self._stt

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.RecognizeStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> RecognizeStream:
    return StreamAdapterWrapper(
        self,
        vad=self._vad,
        wrapped_stt=self._stt,
        language=language,
        conn_options=conn_options,
    )

Inherited members

class StreamAdapterWrapper (stt: STT,
*,
vad: VAD,
wrapped_stt: STT,
language: NotGivenOr[str],
conn_options: APIConnectOptions)
Expand source code
class StreamAdapterWrapper(RecognizeStream):
    def __init__(
        self,
        stt: STT,
        *,
        vad: VAD,
        wrapped_stt: STT,
        language: NotGivenOr[str],
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(stt=stt, conn_options=DEFAULT_STREAM_ADAPTER_API_CONNECT_OPTIONS)
        self._vad = vad
        self._wrapped_stt = wrapped_stt
        self._wrapped_stt_conn_options = conn_options
        self._language = language

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SpeechEvent]) -> None:
        pass  # do nothing

    async def _run(self) -> None:
        vad_stream = self._vad.stream()

        async def _forward_input() -> None:
            """forward input to vad"""
            async for input in self._input_ch:
                if isinstance(input, self._FlushSentinel):
                    vad_stream.flush()
                    continue
                vad_stream.push_frame(input)

            vad_stream.end_input()

        async def _recognize() -> None:
            """recognize speech from vad"""
            async for event in vad_stream:
                if event.type == VADEventType.START_OF_SPEECH:
                    self._event_ch.send_nowait(SpeechEvent(SpeechEventType.START_OF_SPEECH))
                elif event.type == VADEventType.END_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.END_OF_SPEECH,
                        )
                    )

                    merged_frames = utils.merge_frames(event.frames)
                    t_event = await self._wrapped_stt.recognize(
                        buffer=merged_frames,
                        language=self._language,
                        conn_options=self._wrapped_stt_conn_options,
                    )

                    if len(t_event.alternatives) == 0:
                        continue
                    elif not t_event.alternatives[0].text:
                        continue

                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[t_event.alternatives[0]],
                        )
                    )

        tasks = [
            asyncio.create_task(_forward_input(), name="forward_input"),
            asyncio.create_task(_recognize(), name="recognize"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.cancel_and_wait(*tasks)
            await vad_stream.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC