Module livekit.agents.stt.stream_adapter

Classes

class StreamAdapter (*, stt: STT, vad: VAD)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class StreamAdapter(STT):
    def __init__(self, *, stt: STT, vad: VAD) -> None:
        super().__init__(
            capabilities=STTCapabilities(streaming=True, interim_results=False)
        )
        self._vad = vad
        self._stt = stt

    @property
    def wrapped_stt(self) -> STT:
        return self._stt

    async def recognize(
        self, buffer: utils.AudioBuffer, *, language: str | None = None
    ):
        return await self._stt.recognize(buffer=buffer, language=language)

    def stream(self, *, language: str | None = None) -> SpeechStream:
        return StreamAdapterWrapper(self._vad, self._stt, language=language)

Ancestors

Instance variables

prop wrapped_stt : STT
Expand source code
@property
def wrapped_stt(self) -> STT:
    return self._stt

Methods

async def recognize(self, buffer: utils.AudioBuffer, *, language: str | None = None)
def stream(self, *, language: str | None = None) ‑> SpeechStream

Inherited members

class StreamAdapterWrapper (vad: VAD, stt: STT, *args: Any, **kwargs: Any)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class StreamAdapterWrapper(SpeechStream):
    def __init__(self, vad: VAD, stt: STT, *args: Any, **kwargs: Any) -> None:
        super().__init__()
        self._vad = vad
        self._stt = stt
        self._vad_stream = self._vad.stream()
        self._args = args
        self._kwargs = kwargs

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        async def _forward_input():
            """forward input to vad"""
            async for input in self._input_ch:
                if isinstance(input, self._FlushSentinel):
                    self._vad_stream.flush()
                    continue
                self._vad_stream.push_frame(input)

            self._vad_stream.end_input()

        async def _recognize():
            """recognize speech from vad"""
            async for event in self._vad_stream:
                if event.type == VADEventType.START_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(SpeechEventType.START_OF_SPEECH)
                    )
                elif event.type == VADEventType.END_OF_SPEECH:
                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.END_OF_SPEECH,
                        )
                    )

                    merged_frames = utils.merge_frames(event.frames)
                    t_event = await self._stt.recognize(
                        buffer=merged_frames, *self._args, **self._kwargs
                    )

                    if len(t_event.alternatives) == 0:
                        continue
                    elif not t_event.alternatives[0].text:
                        continue

                    self._event_ch.send_nowait(
                        SpeechEvent(
                            type=SpeechEventType.FINAL_TRANSCRIPT,
                            alternatives=[t_event.alternatives[0]],
                        )
                    )

        tasks = [
            asyncio.create_task(_forward_input(), name="forward_input"),
            asyncio.create_task(_recognize(), name="recognize"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

Ancestors

Inherited members