Module livekit.agents.stt

Sub-modules

livekit.agents.stt.stream_adapter
livekit.agents.stt.stt

Classes

class RecognitionUsage (audio_duration: float)

RecognitionUsage(audio_duration: 'float')

Expand source code
@dataclass
class RecognitionUsage:
    audio_duration: float

Class variables

var audio_duration : float
class STT (*, capabilities: STTCapabilities)

Abstract base class for speech-to-text implementations. Subclasses implement _recognize_impl() (and optionally stream()); the base class wraps recognize() with timing and emits a metrics_collected event for each request.

Expand source code
class STT(ABC, rtc.EventEmitter[Literal["metrics_collected"]]):
    def __init__(self, *, capabilities: STTCapabilities) -> None:
        super().__init__()
        self._capabilities = capabilities
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def capabilities(self) -> STTCapabilities:
        return self._capabilities

    @abstractmethod
    async def _recognize_impl(
        self, buffer: AudioBuffer, *, language: str | None = None
    ) -> SpeechEvent: ...

    async def recognize(
        self, buffer: AudioBuffer, *, language: str | None = None
    ) -> SpeechEvent:
        start_time = time.perf_counter()
        event = await self._recognize_impl(buffer, language=language)
        duration = time.perf_counter() - start_time
        stt_metrics = STTMetrics(
            request_id=event.request_id,
            timestamp=time.time(),
            duration=duration,
            label=self._label,
            audio_duration=calculate_audio_duration(buffer),
            streamed=False,
            error=None,
        )
        self.emit("metrics_collected", stt_metrics)
        return event

    def stream(self, *, language: str | None = None) -> "SpeechStream":
        raise NotImplementedError(
            "streaming is not supported by this STT, please use a different STT or use a StreamAdapter"
        )

    async def aclose(self) -> None:
        """Close the STT, and every stream/requests associated with it"""
        ...

Ancestors

  • abc.ABC
  • rtc.EventEmitter

Subclasses

  • StreamAdapter
  • livekit.plugins.azure.stt.STT
  • livekit.plugins.deepgram.stt.STT
  • livekit.plugins.google.stt.STT
  • livekit.plugins.openai.stt.STT

Instance variables

prop capabilities : STTCapabilities
Expand source code
@property
def capabilities(self) -> STTCapabilities:
    return self._capabilities

Methods

async def aclose(self) ‑> None

Close the STT and every stream/request associated with it

async def recognize(self, buffer: AudioBuffer, *, language: str | None = None) ‑> SpeechEvent
def stream(self, *, language: str | None = None) ‑> SpeechStream
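
As an illustration, a minimal non-streaming STT only needs to supply its capabilities and override _recognize_impl(); the base class's recognize() adds timing and emits metrics_collected automatically. A sketch, where my_transcribe is a hypothetical backend call, not part of this module:

from livekit.agents import stt, utils


async def my_transcribe(buffer: utils.AudioBuffer) -> str:
    # hypothetical recognizer call; replace with a real backend
    return "hello world"


class MySTT(stt.STT):
    def __init__(self) -> None:
        # non-streaming: callers use recognize() or wrap this STT in a StreamAdapter
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=False, interim_results=False)
        )

    async def _recognize_impl(
        self, buffer: utils.AudioBuffer, *, language: str | None = None
    ) -> stt.SpeechEvent:
        text = await my_transcribe(buffer)
        return stt.SpeechEvent(
            type=stt.SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[stt.SpeechData(language=language or "", text=text)],
        )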

Inherited members

class STTCapabilities (streaming: bool, interim_results: bool)

STTCapabilities(streaming: 'bool', interim_results: 'bool')

Expand source code
@dataclass
class STTCapabilities:
    streaming: bool
    interim_results: bool

Class variables

var interim_results : bool
var streaming : bool
class SpeechData (language: str, text: str, start_time: float = 0.0, end_time: float = 0.0, confidence: float = 0.0)

SpeechData(language: 'str', text: 'str', start_time: 'float' = 0.0, end_time: 'float' = 0.0, confidence: 'float' = 0.0)

Expand source code
@dataclass
class SpeechData:
    language: str
    text: str
    start_time: float = 0.0
    end_time: float = 0.0
    confidence: float = 0.0  # [0, 1]

Class variables

var confidence : float
var end_time : float
var language : str
var start_time : float
var text : str
class SpeechEvent (type: SpeechEventType, request_id: str = '', alternatives: List[SpeechData] = <factory>, recognition_usage: RecognitionUsage | None = None)

SpeechEvent(type: 'SpeechEventType', request_id: 'str' = '', alternatives: 'List[SpeechData]' = <factory>, recognition_usage: 'RecognitionUsage | None' = None)

Expand source code
@dataclass
class SpeechEvent:
    type: SpeechEventType
    request_id: str = ""
    alternatives: List[SpeechData] = field(default_factory=list)
    recognition_usage: RecognitionUsage | None = None

Class variables

var alternatives : List[SpeechData]
var recognition_usage : RecognitionUsage | None
var request_id : str
var type : SpeechEventType
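
A final transcript is conventionally delivered as a single-alternative event, with the best hypothesis at alternatives[0] (as the StreamAdapter below does). A sketch constructing one by hand; the request_id value is made up:

from livekit.agents import stt

event = stt.SpeechEvent(
    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
    request_id="req-123",  # hypothetical id, normally set by the provider
    alternatives=[stt.SpeechData(language="en", text="hello", confidence=0.92)],
)

best = event.alternatives[0]  # best hypothesis first
print(best.text, best.confidence)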
class SpeechEventType (*args, **kwds)

String-valued enumeration of the event types emitted during speech recognition (the enum subclasses str, so members compare equal to their string values).

Expand source code
@unique
class SpeechEventType(str, Enum):
    START_OF_SPEECH = "start_of_speech"
    """indicate the start of speech
    if the STT doesn't support this event, this will be emitted as the same time as the first INTERIM_TRANSCRIPT"""
    INTERIM_TRANSCRIPT = "interim_transcript"
    """interim transcript, useful for real-time transcription"""
    FINAL_TRANSCRIPT = "final_transcript"
    """final transcript, emitted when the STT is confident enough that a certain
    portion of speech will not change"""
    RECOGNITION_USAGE = "recognition_usage"
    """usage event, emitted periodically to indicate usage metrics"""
    END_OF_SPEECH = "end_of_speech"
    """indicate the end of speech, emitted when the user stops speaking"""

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var END_OF_SPEECH

indicate the end of speech, emitted when the user stops speaking

var FINAL_TRANSCRIPT

final transcript, emitted when the STT is confident enough that a certain portion of speech will not change

var INTERIM_TRANSCRIPT

interim transcript, useful for real-time transcription

var RECOGNITION_USAGE

usage event, emitted periodically to indicate usage metrics

var START_OF_SPEECH

indicate the start of speech; if the STT doesn't support this event, this will be emitted at the same time as the first INTERIM_TRANSCRIPT
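
Since the members are plain strings, a consumer can dispatch on them while iterating a SpeechStream. A minimal sketch, assuming stream is an already-created SpeechStream:

from livekit.agents import stt


async def consume(stream: stt.SpeechStream) -> None:
    async for ev in stream:
        if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print("interim:", ev.alternatives[0].text)
        elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", ev.alternatives[0].text)
        elif ev.type == stt.SpeechEventType.END_OF_SPEECH:
            print("user stopped speaking")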

class SpeechStream (stt: STT, *, sample_rate: int | None = None)

Abstract base class for a streaming speech-to-text session. Audio is pushed in with push_frame(), and recognition results are consumed by async-iterating the stream as SpeechEvents.

Args:
    sample_rate : int or None, optional
        The desired sample rate for the audio input. If specified, the audio
        input will be automatically resampled to match the given sample rate
        before being processed for Speech-to-Text. If not provided (None), the
        input will retain its original sample rate.

Expand source code
class SpeechStream(ABC):
    class _FlushSentinel:
        """Sentinel to mark when it was flushed"""

        pass

    def __init__(self, stt: STT, *, sample_rate: int | None = None):
        """
        Args:
        sample_rate : int or None, optional
            The desired sample rate for the audio input.
            If specified, the audio input will be automatically resampled to match
            the given sample rate before being processed for Speech-to-Text.
            If not provided (None), the input will retain its original sample rate.
        """
        self._stt = stt
        self._input_ch = aio.Chan[Union[rtc.AudioFrame, SpeechStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SpeechEvent]()

        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="STT._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._needed_sr = sample_rate
        self._pushed_sr = 0
        self._resampler: rtc.AudioResampler | None = None

    @abstractmethod
    async def _main_task(self) -> None: ...

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SpeechEvent]
    ) -> None:
        """Task used to collect metrics"""

        start_time = time.perf_counter()

        async for ev in event_aiter:
            if ev.type == SpeechEventType.RECOGNITION_USAGE:
                assert (
                    ev.recognition_usage is not None
                ), "recognition_usage must be provided for RECOGNITION_USAGE event"

                duration = time.perf_counter() - start_time
                stt_metrics = STTMetrics(
                    request_id=ev.request_id,
                    timestamp=time.time(),
                    duration=duration,
                    label=self._stt._label,
                    audio_duration=ev.recognition_usage.audio_duration,
                    streamed=True,
                    error=None,
                )

                self._stt.emit("metrics_collected", stt_metrics)

    def push_frame(self, frame: rtc.AudioFrame) -> None:
        """Push audio to be recognized"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._pushed_sr and self._pushed_sr != frame.sample_rate:
            raise ValueError("the sample rate of the input frames must be consistent")

        self._pushed_sr = frame.sample_rate

        if self._needed_sr and self._needed_sr != frame.sample_rate:
            if not self._resampler:
                self._resampler = rtc.AudioResampler(
                    frame.sample_rate,
                    self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )

        if self._resampler:
            for frame in self._resampler.push(frame):
                self._input_ch.send_nowait(frame)
        else:
            self._input_ch.send_nowait(frame)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        self._check_input_not_ended()
        self._check_not_closed()

        if self._resampler:
            for frame in self._resampler.flush():
                self._input_ch.send_nowait(frame)

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close ths stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    async def __anext__(self) -> SpeechEvent:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if self._task.done() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SpeechEvent]:
        return self

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

Ancestors

  • abc.ABC

Subclasses

  • StreamAdapterWrapper
  • livekit.plugins.azure.stt.SpeechStream
  • livekit.plugins.deepgram.stt.SpeechStream
  • livekit.plugins.google.stt.SpeechStream

Methods

async def aclose(self) ‑> None

Close this stream immediately

def end_input(self) ‑> None

Mark the end of input; no more audio will be pushed

def flush(self) ‑> None

Mark the end of the current segment

def push_frame(self, frame: rtc.AudioFrame) ‑> None

Push audio to be recognized
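
Putting the lifecycle together: audio is pushed frame by frame, flush() marks segment boundaries, end_input() signals that no more audio is coming, and events are consumed concurrently by async iteration. A sketch, assuming frames is a list of rtc.AudioFrame from some audio source:

import asyncio

from livekit import rtc
from livekit.agents import stt


async def transcribe(my_stt: stt.STT, frames: list[rtc.AudioFrame]) -> None:
    # raises NotImplementedError unless the STT is streaming-capable
    stream = my_stt.stream()

    async def producer() -> None:
        for frame in frames:
            stream.push_frame(frame)
        stream.flush()      # end of the current segment
        stream.end_input()  # no more audio will be pushed

    async def consumer() -> None:
        async for ev in stream:
            if ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(ev.alternatives[0].text)

    try:
        await asyncio.gather(producer(), consumer())
    finally:
        await stream.aclose()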

class StreamAdapter (*, stt: STT, vad: VAD)

Adapts a non-streaming STT to the streaming interface: a VAD splits the incoming audio into speech segments, and each completed segment is recognized with the wrapped STT.

Expand source code
class StreamAdapter(STT):
    def __init__(self, *, stt: STT, vad: VAD) -> None:
        super().__init__(
            capabilities=STTCapabilities(streaming=True, interim_results=False)
        )
        self._vad = vad
        self._stt = stt

        @self._stt.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    @property
    def wrapped_stt(self) -> STT:
        return self._stt

    async def _recognize_impl(
        self, buffer: utils.AudioBuffer, *, language: str | None = None
    ):
        return await self._stt.recognize(buffer=buffer, language=language)

    def stream(self, *, language: str | None = None) -> SpeechStream:
        return StreamAdapterWrapper(
            self, vad=self._vad, wrapped_stt=self._stt, language=language
        )

Ancestors

  • STT
  • abc.ABC
  • rtc.EventEmitter

Instance variables

prop wrapped_stt : STT
Expand source code
@property
def wrapped_stt(self) -> STT:
    return self._stt

Methods

def stream(self, *, language: str | None = None) ‑> SpeechStream

Inherited members
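
In practice the adapter is how a recognize()-only STT is used in a streaming pipeline: the VAD detects speech segments and each completed segment is recognized in one shot. A sketch, assuming the openai and silero plugins are installed:

from livekit.agents import stt
from livekit.plugins import openai, silero

# openai.STT is non-streaming; the adapter reports streaming=True but
# interim_results=False, since each segment is recognized only once complete
adapter = stt.StreamAdapter(stt=openai.STT(), vad=silero.VAD.load())
stream = adapter.stream()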

class StreamAdapterWrapper (stt: STT, *, vad: VAD, wrapped_stt: STT, language: str | None)

SpeechStream implementation returned by StreamAdapter.stream(): it forwards pushed audio to the VAD and emits START_OF_SPEECH, FINAL_TRANSCRIPT, and END_OF_SPEECH events for each detected speech segment.


Expand source code
class StreamAdapterWrapper(SpeechStream):
    def __init__(
        self, stt: STT, *, vad: VAD, wrapped_stt: STT, language: str | None
    ) -> None:
        super().__init__(stt)
        self._vad = vad
        self._wrapped_stt = wrapped_stt
        self._vad_stream = self._vad.stream()
        self._language = language

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SpeechEvent]
    ) -> None:
        pass  # do nothing

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        async def _forward_input():
            """forward input to vad"""
            async for input in self._input_ch:
                if isinstance(input, self._FlushSentinel):
                    self._vad_stream.flush()
                    continue
                self._vad_stream.push_frame(input)

            self._vad_stream.end_input()

        async def _recognize():
            """recognize speech from vad"""
            async for event in self._vad_stream:
                if event.type == VADEventType.START_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(SpeechEventType.START_OF_SPEECH)
                    )
                elif event.type == VADEventType.END_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.END_OF_SPEECH,
                        )
                    )

                    merged_frames = utils.merge_frames(event.frames)
                    t_event = await self._wrapped_stt.recognize(
                        buffer=merged_frames, language=self._language
                    )

                    if len(t_event.alternatives) == 0:
                        continue
                    elif not t_event.alternatives[0].text:
                        continue

                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[t_event.alternatives[0]],
                        )
                    )

        tasks = [
            asyncio.create_task(_forward_input(), name="forward_input"),
            asyncio.create_task(_recognize(), name="recognize"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

Ancestors

  • SpeechStream
  • abc.ABC

Inherited members