Module livekit.plugins.deepgram

Classes

class STT (*, model: DeepgramModels = 'nova-2-general', language: DeepgramLanguages = 'en-US', detect_language: bool = False, interim_results: bool = True, punctuate: bool = True, smart_format: bool = True, no_delay: bool = True, endpointing_ms: int = 25, filler_words: bool = False, keywords: list[Tuple[str, float]] = [], profanity_filter: bool = False, api_key: str | None = None, http_session: aiohttp.ClientSession | None = None)

Speech-to-text (STT) implementation backed by Deepgram's transcription API.

Create a new instance of Deepgram STT.

api_key must be set to your Deepgram API key, either using the argument or by setting the DEEPGRAM_API_KEY environment variable.
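
For example, a minimal construction sketch (the values shown are the defaults, and DEEPGRAM_API_KEY is assumed to be exported in the environment):

from livekit.plugins import deepgram

# reads DEEPGRAM_API_KEY from the environment since api_key is omitted
deepgram_stt = deepgram.STT(
    model="nova-2-general",
    language="en-US",
    endpointing_ms=25,
)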

Source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        model: DeepgramModels = "nova-2-general",
        language: DeepgramLanguages = "en-US",
        detect_language: bool = False,
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        no_delay: bool = True,
        endpointing_ms: int = 25,
        filler_words: bool = False,
        keywords: list[Tuple[str, float]] = [],
        profanity_filter: bool = False,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Deepgram STT.

        ``api_key`` must be set to your Deepgram API key, either using the argument or by setting
        the ``DEEPGRAM_API_KEY`` environment variable.
        """

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True, interim_results=interim_results
            )
        )

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if api_key is None:
            raise ValueError("Deepgram API key is required")

        if language not in ("en-US", "en") and model in (
            "nova-2-meeting",
            "nova-2-phonecall",
            "nova-2-finance",
            "nova-2-conversationalai",
            "nova-2-voicemail",
            "nova-2-video",
            "nova-2-medical",
            "nova-2-drivethru",
            "nova-2-automotive",
        ):
            logger.warning(
                f"{model} does not support language {language}, falling back to nova-2-general"
            )
            model = "nova-2-general"

        self._api_key = api_key

        self._opts = STTOptions(
            language=language,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            model=model,
            smart_format=smart_format,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            sample_rate=48000,
            num_channels=1,
            keywords=keywords,
            profanity_filter=profanity_filter,
        )
        self._session = http_session

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            # fall back to the shared session from the http context when
            # no session was provided at construction time
            self._session = utils.http_context.http_session()

        return self._session

    async def recognize(
        self, buffer: AudioBuffer, *, language: DeepgramLanguages | str | None = None
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)

        recognize_config = {
            "model": str(config.model),
            "punctuate": config.punctuate,
            "detect_language": config.detect_language,
            "smart_format": config.smart_format,
            "keywords": self._opts.keywords,
            "profanity_filter": config.profanity_filter,
        }
        if config.language:
            recognize_config["language"] = config.language

        buffer = merge_frames(buffer)
        io_buffer = io.BytesIO()
        with wave.open(io_buffer, "wb") as wav:
            wav.setnchannels(buffer.num_channels)
            wav.setsampwidth(2)  # 16-bit
            wav.setframerate(buffer.sample_rate)
            wav.writeframes(buffer.data)

        data = io_buffer.getvalue()

        async with self._ensure_session().post(
            url=_to_deepgram_url(recognize_config),
            data=data,
            headers={
                "Authorization": f"Token {self._api_key}",
                "Accept": "application/json",
                "Content-Type": "audio/wav",
            },
        ) as res:
            return prerecorded_transcription_to_speech_event(
                config.language, await res.json()
            )

    def stream(
        self, *, language: DeepgramLanguages | str | None = None
    ) -> "SpeechStream":
        config = self._sanitize_options(language=language)
        return SpeechStream(config, self._api_key, self._ensure_session())

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        config = dataclasses.replace(self._opts)
        config.language = language or config.language

        if config.detect_language:
            # when language detection is enabled, a fixed language must not
            # be sent to Deepgram
            config.language = None

        return config

Ancestors

livekit.agents.stt.STT
abc.ABC

Methods

async def recognize(self, buffer: AudioBuffer, *, language: DeepgramLanguages | str | None = None) -> stt.SpeechEvent
def stream(self, *, language: DeepgramLanguages | str | None = None) -> SpeechStream
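
A hedged one-shot sketch of recognize(): here `frames` stands in for an AudioBuffer (a single rtc.AudioFrame or a list of frames) captured elsewhere, and an explicit aiohttp session is passed since no agent http context is assumed:

import aiohttp

from livekit.plugins import deepgram


async def transcribe_once(frames) -> str:
    async with aiohttp.ClientSession() as session:
        stt_impl = deepgram.STT(http_session=session)
        # one-shot recognition over the buffered audio; assumes at least
        # one alternative is returned
        event = await stt_impl.recognize(frames)
        return event.alternatives[0].text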

class SpeechStream (opts: STTOptions, api_key: str, http_session: aiohttp.ClientSession, max_retry: int = 32)

Streaming speech recognition session over Deepgram's live transcription websocket. Instances are normally obtained by calling STT.stream() rather than constructed directly.
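
A minimal streaming sketch; it assumes an async iterable of rtc.AudioFrame (e.g. an rtc.AudioStream over a subscribed track) and relies on push_frame, end_input, and async iteration from the livekit.agents.stt.SpeechStream base class. Running inside an agent job provides the shared http session; otherwise pass http_session= to STT():

import asyncio

from livekit.agents import stt
from livekit.plugins import deepgram


async def stream_transcribe(audio_frames) -> None:
    speech_stream = deepgram.STT().stream()

    async def push() -> None:
        # forward captured frames, then signal that input is done
        async for frame in audio_frames:
            speech_stream.push_frame(frame)
        speech_stream.end_input()

    push_task = asyncio.create_task(push())
    async for event in speech_stream:
        if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await push_task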

Source code
class SpeechStream(stt.SpeechStream):
    _KEEPALIVE_MSG: str = json.dumps({"type": "KeepAlive"})
    _CLOSE_MSG: str = json.dumps({"type": "CloseStream"})

    def __init__(
        self,
        opts: STTOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
        max_retry: int = 32,
    ) -> None:
        super().__init__()

        if opts.detect_language and opts.language is None:
            raise ValueError("language detection is not supported in streaming mode")

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._speaking = False
        self._max_retry = max_retry
        self._audio_energy_filter = BasicAudioEnergyFilter(cooldown_seconds=1)

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        await self._run(self._max_retry)

    async def _run(self, max_retry: int) -> None:
        """
        Run a websocket connection to Deepgram, reconnecting whenever
        something goes wrong.
        """

        retry_count = 0
        while self._input_ch.qsize() or not self._input_ch.closed:
            try:
                live_config = {
                    "model": self._opts.model,
                    "punctuate": self._opts.punctuate,
                    "smart_format": self._opts.smart_format,
                    "no_delay": self._opts.no_delay,
                    "interim_results": self._opts.interim_results,
                    "encoding": "linear16",
                    "vad_events": True,
                    "sample_rate": self._opts.sample_rate,
                    "channels": self._opts.num_channels,
                    "endpointing": False
                    if self._opts.endpointing_ms == 0
                    else self._opts.endpointing_ms,
                    "filler_words": self._opts.filler_words,
                    "keywords": self._opts.keywords,
                    "profanity_filter": self._opts.profanity_filter,
                }

                if self._opts.language:
                    live_config["language"] = self._opts.language

                headers = {"Authorization": f"Token {self._api_key}"}
                ws = await self._session.ws_connect(
                    _to_deepgram_url(live_config, websocket=True), headers=headers
                )
                retry_count = 0  # connected successfully, reset the retry_count

                await self._run_ws(ws)
            except Exception as e:
                if self._session.closed:
                    break

                if retry_count >= max_retry:
                    logger.exception(
                        f"failed to connect to deepgram after {max_retry} tries"
                    )
                    break

                retry_delay = min(retry_count * 2, 10)  # max 10s
                retry_count += 1  # increment after calculating the delay, the first retry should happen directly

                logger.warning(
                    f"deepgram connection failed, retrying in {retry_delay}s",
                    exc_info=e,
                )
                await asyncio.sleep(retry_delay)

    async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """This method could throw ws errors, these are handled inside the _run method"""

        closing_ws = False

        async def keepalive_task():
            # Deepgram expects a periodic KeepAlive message to keep the
            # connection open when no audio is being sent.
            # https://developers.deepgram.com/reference/listen-live#stream-keepalive
            try:
                while True:
                    await ws.send_str(SpeechStream._KEEPALIVE_MSG)
                    await asyncio.sleep(5)
            except Exception:
                return

        async def send_task():
            nonlocal closing_ws

            # forward audio to deepgram in chunks of 100ms
            samples_100ms = self._opts.sample_rate // 10
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=self._opts.num_channels,
                samples_per_channel=samples_100ms,
            )

            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    has_audio = self._audio_energy_filter.push_frame(frame)
                    if has_audio:
                        await ws.send_bytes(frame.data.tobytes())

            # tell deepgram we are done sending audio/inputs
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_MSG)

        async def recv_task():
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise Exception("deepgram connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected deepgram message type %s", msg.type)
                    continue

                try:
                    self._process_stream_event(json.loads(msg.data))
                except Exception:
                    logger.exception("failed to process deepgram message")

        tasks = [
            asyncio.create_task(send_task()),
            asyncio.create_task(recv_task()),
            asyncio.create_task(keepalive_task()),
        ]

        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    def _process_stream_event(self, data: dict) -> None:
        assert self._opts.language is not None

        if data["type"] == "SpeechStarted":
            # This is a normal case. Deepgram's SpeechStarted events
            # are not correlated with speech_final or utterance end.
            # It's possible that we receive two in a row without an endpoint.
            # It's also possible we receive a transcript without a SpeechStarted event.
            if self._speaking:
                return

            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        # see this page:
        # https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
        # for more information about the different types of events
        elif data["type"] == "Results":
            is_final_transcript = data["is_final"]
            is_endpoint = data["speech_final"]

            alts = live_transcription_to_speech_data(self._opts.language, data)
            # If, for some reason, we didn't get a SpeechStarted event but we got
            # a transcript with text, we should start speaking. It's rare but has
            # been observed.
            if len(alts) > 0 and alts[0].text:
                if not self._speaking:
                    self._speaking = True
                    start_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.START_OF_SPEECH
                    )
                    self._event_ch.send_nowait(start_event)

                if is_final_transcript:
                    final_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=alts
                    )
                    self._event_ch.send_nowait(final_event)
                else:
                    interim_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.INTERIM_TRANSCRIPT, alternatives=alts
                    )
                    self._event_ch.send_nowait(interim_event)

            # if we receive an endpoint, only end the speech if
            # we either had a SpeechStarted event or have seen
            # a non-empty transcript (deepgram doesn't have a SpeechEnded event)
            if is_endpoint and self._speaking:
                self._speaking = False
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

        elif data["type"] == "Metadata":
            pass  # metadata is too noisy
        else:
            logger.warning("received unexpected message from deepgram %s", data)

Ancestors

livekit.agents.stt.SpeechStream
abc.ABC
