Module livekit.plugins.deepgram

Classes

class AudioEnergyFilter (*, min_silence: float = 1.5, rms_threshold: float = 1.6e-05)
class AudioEnergyFilter:
    class State(Enum):
        START = 0
        SPEAKING = 1
        SILENCE = 2
        END = 3

    def __init__(
        self, *, min_silence: float = 1.5, rms_threshold: float = MAGIC_NUMBER_THRESHOLD
    ):
        self._cooldown_seconds = min_silence
        self._cooldown = min_silence
        self._state = self.State.SILENCE
        self._rms_threshold = rms_threshold

    def update(self, frame: rtc.AudioFrame) -> State:
        arr = np.frombuffer(frame.data, dtype=np.int16)
        float_arr = arr.astype(np.float32) / 32768.0
        # mean-square energy of the normalized samples (no square root is
        # taken); the default threshold is calibrated against this value
        rms = np.mean(np.square(float_arr))

        if rms > self._rms_threshold:
            self._cooldown = self._cooldown_seconds
            if self._state in (self.State.SILENCE, self.State.END):
                self._state = self.State.START
            else:
                self._state = self.State.SPEAKING
        else:
            if self._cooldown <= 0:
                if self._state in (self.State.SPEAKING, self.State.START):
                    self._state = self.State.END
                elif self._state == self.State.END:
                    self._state = self.State.SILENCE
            else:
                # keep speaking during cooldown
                self._cooldown -= frame.duration
                self._state = self.State.SPEAKING

        return self._state
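
A minimal usage sketch (illustrative, not from the source; the frame iterable and websocket are assumed): feed consecutive rtc.AudioFrame objects to update() and gate downstream processing on the returned state.

energy_filter = AudioEnergyFilter(min_silence=1.0, rms_threshold=1.6e-05)

async def forward_speech(frames, ws):
    for frame in frames:  # frame: rtc.AudioFrame, e.g. 10 ms of 16 kHz mono
        state = energy_filter.update(frame)
        if state in (
            AudioEnergyFilter.State.START,
            AudioEnergyFilter.State.SPEAKING,
        ):
            await ws.send_bytes(frame.data.tobytes())  # forward speech audio
        elif state == AudioEnergyFilter.State.END:
            pass  # speech just ended: flush buffers / finalize here
        # State.SILENCE frames are simply dropped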

Class variables

var State

An enumeration of the filter's detection states: START (speech onset), SPEAKING (ongoing speech), SILENCE (no speech), and END (speech offset).

Methods

def update(self, frame: rtc.AudioFrame) ‑> State
class STT (*,
model: DeepgramModels | str = 'nova-2-general',
language: DeepgramLanguages | str = 'en-US',
detect_language: bool = False,
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 16000,
no_delay: bool = True,
endpointing_ms: int = 25,
filler_words: bool = True,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool = False,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.deepgram.com/v1/listen',
energy_filter: AudioEnergyFilter | bool = False,
numerals: bool = False,
mip_opt_out: bool = False)
class STT(stt.STT):
    def __init__(
        self,
        *,
        model: DeepgramModels | str = "nova-2-general",
        language: DeepgramLanguages | str = "en-US",
        detect_language: bool = False,
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 16000,
        no_delay: bool = True,
        endpointing_ms: int = 25,
        # enable filler words by default to improve turn detector accuracy
        filler_words: bool = True,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool = False,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = BASE_URL,
        energy_filter: AudioEnergyFilter | bool = False,
        numerals: bool = False,
        mip_opt_out: bool = False,
    ) -> None:
        """Create a new instance of Deepgram STT.

        Args:
            model: The Deepgram model to use for speech recognition. Defaults to "nova-2-general".
            language: The language code for recognition. Defaults to "en-US".
            detect_language: Whether to enable automatic language detection. Defaults to False.
            interim_results: Whether to return interim (non-final) transcription results. Defaults to True.
            punctuate: Whether to add punctuation to the transcription. Defaults to True. The turn detector works better with punctuation.
            smart_format: Whether to apply smart formatting to numbers, dates, etc. Defaults to True.
            sample_rate: The sample rate of the audio in Hz. Defaults to 16000.
            no_delay: When smart_format is enabled, return results immediately instead of waiting for a complete sequence. Defaults to True.
            endpointing_ms: Duration of silence, in milliseconds, after which speech is considered ended. Set to 0 to disable. Defaults to 25.
            filler_words: Whether to include filler words (um, uh, etc.) in transcription. Defaults to True.
            keywords: List of tuples containing keywords and their boost values for improved recognition.
                     Each tuple should be (keyword: str, boost: float). Defaults to None.
                     `keywords` does not work with Nova-3 models. Use `keyterms` instead.
            keyterms: List of key terms to improve recognition accuracy. Defaults to None.
                     `keyterms` is supported by Nova-3 models.
            profanity_filter: Whether to filter profanity from the transcription. Defaults to False.
            api_key: Your Deepgram API key. If not provided, the DEEPGRAM_API_KEY environment variable is used.
            http_session: Optional aiohttp ClientSession to use for requests.
            base_url: The base URL for Deepgram API. Defaults to "https://api.deepgram.com/v1/listen".
            energy_filter: Audio energy filter configuration for voice activity detection.
                         Can be a boolean or AudioEnergyFilter instance. Defaults to False.
            numerals: Whether to convert written numbers to numerical format in the transcription (e.g. "nine" to "9"). Defaults to False.
            mip_opt_out: Whether to opt out of the Deepgram Model Improvement Program. Defaults to False.

        Raises:
            ValueError: If no API key is provided or found in environment variables.

        Note:
            The api_key must be set either through the constructor argument or by setting
            the DEEPGRAM_API_KEY environment variable.
        """

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True, interim_results=interim_results
            )
        )
        self._base_url = base_url

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if api_key is None:
            raise ValueError("Deepgram API key is required")

        model = _validate_model(model, language)

        self._api_key = api_key

        self._opts = STTOptions(
            language=language,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            model=model,
            smart_format=smart_format,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            sample_rate=sample_rate,
            num_channels=1,
            keywords=keywords or [],
            keyterms=keyterms or [],
            profanity_filter=profanity_filter,
            energy_filter=energy_filter,
            numerals=numerals,
            mip_opt_out=mip_opt_out,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: DeepgramLanguages | str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)

        recognize_config = {
            "model": str(config.model),
            "punctuate": config.punctuate,
            "detect_language": config.detect_language,
            "smart_format": config.smart_format,
            "keywords": self._opts.keywords,
            "profanity_filter": config.profanity_filter,
            "numerals": config.numerals,
        }
        if config.language:
            recognize_config["language"] = config.language

        try:
            async with self._ensure_session().post(
                url=_to_deepgram_url(recognize_config, self._base_url, websocket=False),
                data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
                headers={
                    "Authorization": f"Token {self._api_key}",
                    "Accept": "application/json",
                    "Content-Type": "audio/wav",
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                return prerecorded_transcription_to_speech_event(
                    config.language,
                    await res.json(),
                )

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
            base_url=self._base_url,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        model: DeepgramModels | str | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool | None = None,
        numerals: bool | None = None,
        mip_opt_out: bool | None = None,
    ):
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.model = _validate_model(model, language)
        if interim_results is not None:
            self._opts.interim_results = interim_results
        if punctuate is not None:
            self._opts.punctuate = punctuate
        if smart_format is not None:
            self._opts.smart_format = smart_format
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if no_delay is not None:
            self._opts.no_delay = no_delay
        if endpointing_ms is not None:
            self._opts.endpointing_ms = endpointing_ms
        if filler_words is not None:
            self._opts.filler_words = filler_words
        if keywords is not None:
            self._opts.keywords = keywords
        if keyterms is not None:
            self._opts.keyterms = keyterms
        if profanity_filter is not None:
            self._opts.profanity_filter = profanity_filter
        if numerals is not None:
            self._opts.numerals = numerals
        if mip_opt_out is not None:
            self._opts.mip_opt_out = mip_opt_out

        for stream in self._streams:
            stream.update_options(
                language=language,
                model=model,
                interim_results=interim_results,
                punctuate=punctuate,
                smart_format=smart_format,
                sample_rate=sample_rate,
                no_delay=no_delay,
                endpointing_ms=endpointing_ms,
                filler_words=filler_words,
                keywords=keywords,
                keyterms=keyterms,
                profanity_filter=profanity_filter,
                numerals=numerals,
                mip_opt_out=mip_opt_out,
            )

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        config = dataclasses.replace(self._opts)
        config.language = language or config.language

        if config.detect_language:
            config.language = None

        return config

Create a new instance of Deepgram STT.

Args

model
The Deepgram model to use for speech recognition. Defaults to "nova-2-general".
language
The language code for recognition. Defaults to "en-US".
detect_language
Whether to enable automatic language detection. Defaults to False.
interim_results
Whether to return interim (non-final) transcription results. Defaults to True.
punctuate
Whether to add punctuation to the transcription. Defaults to True. The turn detector works better with punctuation.
smart_format
Whether to apply smart formatting to numbers, dates, etc. Defaults to True.
sample_rate
The sample rate of the audio in Hz. Defaults to 16000.
no_delay
When smart_format is enabled, return results immediately instead of waiting for a complete sequence. Defaults to True.
endpointing_ms
Duration of silence, in milliseconds, after which speech is considered ended. Set to 0 to disable. Defaults to 25.
filler_words
Whether to include filler words (um, uh, etc.) in transcription. Defaults to True.
keywords
List of tuples containing keywords and their boost values for improved recognition. Each tuple should be (keyword: str, boost: float). Defaults to None. keywords does not work with Nova-3 models. Use keyterms instead.
keyterms
List of key terms to improve recognition accuracy. Defaults to None. keyterms is supported by Nova-3 models.
profanity_filter
Whether to filter profanity from the transcription. Defaults to False.
api_key
Your Deepgram API key. If not provided, the DEEPGRAM_API_KEY environment variable is used.
http_session
Optional aiohttp ClientSession to use for requests.
base_url
The base URL for Deepgram API. Defaults to "https://api.deepgram.com/v1/listen".
energy_filter
Audio energy filter configuration for voice activity detection. Can be a boolean or AudioEnergyFilter instance. Defaults to False.
numerals
Whether to convert written numbers to numerical format in the transcription (e.g. "nine" to "9"). Defaults to False.
mip_opt_out
Whether to opt out of the Deepgram Model Improvement Program. Defaults to False.

Raises

ValueError
If no API key is provided or found in environment variables.

Note

The api_key must be set either through the constructor argument or by setting the DEEPGRAM_API_KEY environment variable.
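
A construction sketch (illustrative values; assumes the DEEPGRAM_API_KEY environment variable is exported):

from livekit.plugins import deepgram

stt_engine = deepgram.STT(
    model="nova-3",                    # illustrative model name
    language="en-US",
    keyterms=["LiveKit", "Deepgram"],  # Nova-3 term boosting
    # keywords=[("LiveKit", 1.5)],     # Nova-2 equivalent, with boost values
    endpointing_ms=25,
)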

Methods

def stream(self,
*,
language: DeepgramLanguages | str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.deepgram.stt.SpeechStream
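
A streaming sketch (hypothetical helper; assumes an async iterator of rtc.AudioFrame and the event types from livekit.agents.stt):

import asyncio

from livekit.agents import stt as agents_stt

async def transcribe(stt_engine, audio_frames):
    stream = stt_engine.stream(language="en-US")

    async def push():
        async for frame in audio_frames:
            stream.push_frame(frame)
        stream.end_input()  # flush and ask deepgram to finalize

    push_task = asyncio.create_task(push())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await push_task
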
def update_options(self,
*,
language: DeepgramLanguages | str | None = None,
model: DeepgramModels | str | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool | None = None,
numerals: bool | None = None,
mip_opt_out: bool | None = None)


class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str)
class SpeechStream(stt.SpeechStream):
    _KEEPALIVE_MSG: str = json.dumps({"type": "KeepAlive"})
    _CLOSE_MSG: str = json.dumps({"type": "CloseStream"})
    _FINALIZE_MSG: str = json.dumps({"type": "Finalize"})

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
        base_url: str,
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )

        if opts.detect_language and opts.language is None:
            raise ValueError("language detection is not supported in streaming mode")

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._base_url = base_url
        self._speaking = False
        self._audio_duration_collector = PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

        self._audio_energy_filter: Optional[AudioEnergyFilter] = None
        if opts.energy_filter:
            if isinstance(opts.energy_filter, AudioEnergyFilter):
                self._audio_energy_filter = opts.energy_filter
            else:
                self._audio_energy_filter = AudioEnergyFilter()

        self._request_id = ""
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        model: DeepgramModels | str | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool | None = None,
        numerals: bool | None = None,
        mip_opt_out: bool | None = None,
    ):
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.model = _validate_model(model, language)
        if interim_results is not None:
            self._opts.interim_results = interim_results
        if punctuate is not None:
            self._opts.punctuate = punctuate
        if smart_format is not None:
            self._opts.smart_format = smart_format
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if no_delay is not None:
            self._opts.no_delay = no_delay
        if endpointing_ms is not None:
            self._opts.endpointing_ms = endpointing_ms
        if filler_words is not None:
            self._opts.filler_words = filler_words
        if keywords is not None:
            self._opts.keywords = keywords
        if keyterms is not None:
            self._opts.keyterms = keyterms
        if profanity_filter is not None:
            self._opts.profanity_filter = profanity_filter
        if numerals is not None:
            self._opts.numerals = numerals
        if mip_opt_out is not None:
            self._opts.mip_opt_out = mip_opt_out

        self._reconnect_event.set()

    async def _run(self) -> None:
        closing_ws = False

        async def keepalive_task(ws: aiohttp.ClientWebSocketResponse):
            # to keep the connection alive when no audio is being sent,
            # Deepgram expects a periodic KeepAlive message
            # https://developers.deepgram.com/reference/listen-live#stream-keepalive
            try:
                while True:
                    await ws.send_str(SpeechStream._KEEPALIVE_MSG)
                    await asyncio.sleep(5)
            except Exception:
                return

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            # forward audio to deepgram in chunks of 50ms
            samples_50ms = self._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=self._opts.num_channels,
                samples_per_channel=samples_50ms,
            )

            has_ended = False
            last_frame: Optional[rtc.AudioFrame] = None
            async for data in self._input_ch:
                frames: list[rtc.AudioFrame] = []
                if isinstance(data, rtc.AudioFrame):
                    state = self._check_energy_state(data)
                    if state in (
                        AudioEnergyFilter.State.START,
                        AudioEnergyFilter.State.SPEAKING,
                    ):
                        if last_frame:
                            frames.extend(
                                audio_bstream.write(last_frame.data.tobytes())
                            )
                            last_frame = None
                        frames.extend(audio_bstream.write(data.data.tobytes()))
                    elif state == AudioEnergyFilter.State.END:
                        # no need to buffer as we have cooldown period
                        frames.extend(audio_bstream.flush())
                        has_ended = True
                    elif state == AudioEnergyFilter.State.SILENCE:
                        # buffer the last silence frame, since it could contain the beginning of speech
                        # TODO: improve accuracy by using a ring buffer with longer window
                        last_frame = data
                elif isinstance(data, self._FlushSentinel):
                    frames.extend(audio_bstream.flush())
                    has_ended = True

                for frame in frames:
                    self._audio_duration_collector.push(frame.duration)
                    await ws.send_bytes(frame.data.tobytes())

                    if has_ended:
                        self._audio_duration_collector.flush()
                        await ws.send_str(SpeechStream._FINALIZE_MSG)
                        has_ended = False

            # tell deepgram we are done sending audio/inputs
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_MSG)

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(
                        message="deepgram connection closed unexpectedly"
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected deepgram message type %s", msg.type)
                    continue

                try:
                    self._process_stream_event(json.loads(msg.data))
                except Exception:
                    logger.exception("failed to process deepgram message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                    asyncio.create_task(keepalive_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore

                    # propagate exceptions from completed tasks
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        live_config: dict[str, Any] = {
            "model": self._opts.model,
            "punctuate": self._opts.punctuate,
            "smart_format": self._opts.smart_format,
            "no_delay": self._opts.no_delay,
            "interim_results": self._opts.interim_results,
            "encoding": "linear16",
            "vad_events": True,
            "sample_rate": self._opts.sample_rate,
            "channels": self._opts.num_channels,
            "endpointing": False
            if self._opts.endpointing_ms == 0
            else self._opts.endpointing_ms,
            "filler_words": self._opts.filler_words,
            "profanity_filter": self._opts.profanity_filter,
            "numerals": self._opts.numerals,
            "mip_opt_out": self._opts.mip_opt_out,
        }
        if self._opts.keywords:
            live_config["keywords"] = self._opts.keywords
        if self._opts.keyterms:
            # the query param is `keyterm`
            # See: https://developers.deepgram.com/docs/keyterm
            live_config["keyterm"] = self._opts.keyterms

        if self._opts.language:
            live_config["language"] = self._opts.language

        ws = await asyncio.wait_for(
            self._session.ws_connect(
                _to_deepgram_url(live_config, base_url=self._base_url, websocket=True),
                headers={"Authorization": f"Token {self._api_key}"},
            ),
            self._conn_options.timeout,
        )
        return ws

    def _check_energy_state(self, frame: rtc.AudioFrame) -> AudioEnergyFilter.State:
        if self._audio_energy_filter:
            return self._audio_energy_filter.update(frame)
        return AudioEnergyFilter.State.SPEAKING

    def _on_audio_duration_report(self, duration: float) -> None:
        usage_event = stt.SpeechEvent(
            type=stt.SpeechEventType.RECOGNITION_USAGE,
            request_id=self._request_id,
            alternatives=[],
            recognition_usage=stt.RecognitionUsage(audio_duration=duration),
        )
        self._event_ch.send_nowait(usage_event)

    def _process_stream_event(self, data: dict) -> None:
        assert self._opts.language is not None

        if data["type"] == "SpeechStarted":
            # This is a normal case. Deepgram's SpeechStarted events
            # are not correlated with speech_final or utterance end.
            # It's possible that we receive two in a row without an endpoint
            # It's also possible we receive a transcript without a SpeechStarted event.
            if self._speaking:
                return

            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        # see this page:
        # https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
        # for more information about the different types of events
        elif data["type"] == "Results":
            metadata = data["metadata"]
            request_id = metadata["request_id"]
            is_final_transcript = data["is_final"]
            is_endpoint = data["speech_final"]
            self._request_id = request_id

            alts = live_transcription_to_speech_data(self._opts.language, data)
            # If, for some reason, we didn't get a SpeechStarted event but we got
            # a transcript with text, we should start speaking. It's rare but has
            # been observed.
            if len(alts) > 0 and alts[0].text:
                if not self._speaking:
                    self._speaking = True
                    start_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.START_OF_SPEECH
                    )
                    self._event_ch.send_nowait(start_event)

                if is_final_transcript:
                    final_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(final_event)
                else:
                    interim_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(interim_event)

            # if we receive an endpoint, only end the speech if we either
            # had a SpeechStarted event or have seen a non-empty transcript
            # (deepgram doesn't have a SpeechEnded event)
            if is_endpoint and self._speaking:
                self._speaking = False
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

        elif data["type"] == "Metadata":
            pass  # metadata is too noisy
        else:
            logger.warning("received unexpected message from deepgram %s", data)

Args

sample_rate : int or None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for speech-to-text. If not provided (None), the input will retain its original sample rate.
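
A consumption sketch for the events this stream emits (the event types below are the ones produced in _process_stream_event; handler bodies are illustrative):

from livekit.agents import stt as agents_stt

async def handle_events(stream):
    async for ev in stream:
        if ev.type == agents_stt.SpeechEventType.START_OF_SPEECH:
            print("speech started")
        elif ev.type == agents_stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print("interim:", ev.alternatives[0].text)
        elif ev.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", ev.alternatives[0].text)
        elif ev.type == agents_stt.SpeechEventType.END_OF_SPEECH:
            print("speech ended")
        # RECOGNITION_USAGE events also arrive, reporting audio duration
        # in 5-second batches (see _on_audio_duration_report)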

Methods

def update_options(self,
*,
language: DeepgramLanguages | str | None = None,
model: DeepgramModels | str | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool | None = None,
numerals: bool | None = None,
mip_opt_out: bool | None = None)


class TTS (*,
model: str = 'aura-asteria-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
api_key: str | None = None,
base_url: str = 'https://api.deepgram.com/v1/speak',
word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False),
http_session: aiohttp.ClientSession | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        model: str = "aura-asteria-en",
        encoding: str = "linear16",
        sample_rate: int = 24000,
        api_key: str | None = None,
        base_url: str = BASE_URL,
        word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
            ignore_punctuation=False
        ),
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Deepgram TTS.

        Args:
            model (str): TTS model to use. Defaults to "aura-asteria-en".
            encoding (str): Audio encoding to use. Defaults to "linear16".
            sample_rate (int): Sample rate of audio. Defaults to 24000.
            api_key (str): Deepgram API key. If not provided, the DEEPGRAM_API_KEY environment variable is used.
            base_url (str): Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak".
            word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
            http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.

        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if not api_key:
            raise ValueError(
                "Deepgram API key required. Set DEEPGRAM_API_KEY or provide api_key."
            )

        self._opts = _TTSOptions(
            model=model,
            encoding=encoding,
            sample_rate=sample_rate,
            word_tokenizer=word_tokenizer,
        )
        self._session = http_session
        self._api_key = api_key
        self._base_url = base_url
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        config = {
            "encoding": self._opts.encoding,
            "model": self._opts.model,
            "sample_rate": self._opts.sample_rate,
        }
        return await asyncio.wait_for(
            session.ws_connect(
                _to_deepgram_url(config, self._base_url, websocket=True),
                headers={"Authorization": f"Token {self._api_key}"},
            ),
            self._conn_options.timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse):
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        model: str | None = None,
        sample_rate: int | None = None,
    ) -> None:
        """
        args:
            model (str): TTS model to use.
            sample_rate (int): Sample rate of audio.
        """
        if model is not None:
            self._opts.model = model
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        # deepgram sets options upon connection, so we need to invalidate the pool
        # to get a new connection with the updated options
        self._pool.invalidate()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        return ChunkedStream(
            tts=self,
            input_text=text,
            base_url=self._base_url,
            api_key=self._api_key,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
        )

    def stream(
        self, *, conn_options: Optional[APIConnectOptions] = None
    ) -> "SynthesizeStream":
        stream = SynthesizeStream(
            tts=self,
            pool=self._pool,
            opts=self._opts,
        )
        self._streams.add(stream)
        return stream

    def prewarm(self) -> None:
        self._pool.prewarm()

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
        await super().aclose()

Create a new instance of Deepgram TTS.

Args

model : str
TTS model to use. Defaults to "aura-asteria-en".
encoding : str
Audio encoding to use. Defaults to "linear16".
sample_rate : int
Sample rate of audio. Defaults to 24000.
api_key : str
Deepgram API key. If not provided, the DEEPGRAM_API_KEY environment variable is used.
base_url : str
Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak".
word_tokenizer : tokenize.WordTokenizer
Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session : aiohttp.ClientSession
Optional aiohttp session to use for requests.
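
A synthesis sketch (assumes DEEPGRAM_API_KEY is set; audio routing is elided):

from livekit.plugins import deepgram

tts_engine = deepgram.TTS(model="aura-asteria-en", sample_rate=24000)

async def speak(text: str):
    # one-shot HTTP synthesis; audio chunks arrive as they are generated
    async for audio in tts_engine.synthesize(text):
        ...  # forward audio.frame to an audio source / track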

Methods

async def aclose(self) ‑> None
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.deepgram.tts.SynthesizeStream
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.deepgram.tts.ChunkedStream
def update_options(self, *, model: str | None = None, sample_rate: int | None = None) ‑> None

Args

model : str
TTS model to use.
sample_rate : int
Sample rate of audio.
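
Because options are fixed when the pooled WebSocket is opened, an update invalidates the pool; prewarm() can then open a fresh connection ahead of the next request ("aura-luna-en" is an illustrative voice):

tts_engine.update_options(model="aura-luna-en")  # invalidates the pooled connection
tts_engine.prewarm()  # optionally reconnect before the next synthesis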
