Module livekit.plugins.deepgram

Classes

class AudioEnergyFilter (*, min_silence: float = 1.5, rms_threshold: float = 1.6e-05)
Expand source code
class AudioEnergyFilter:
    class State(Enum):
        START = 0
        SPEAKING = 1
        SILENCE = 2
        END = 3

    def __init__(
        self, *, min_silence: float = 1.5, rms_threshold: float = MAGIC_NUMBER_THRESHOLD
    ):
        self._cooldown_seconds = min_silence
        self._cooldown = min_silence
        self._state = self.State.SILENCE
        self._rms_threshold = rms_threshold

    def update(self, frame: rtc.AudioFrame) -> State:
        arr = np.frombuffer(frame.data, dtype=np.int16)
        float_arr = arr.astype(np.float32) / 32768.0
        rms = np.mean(np.square(float_arr))

        if rms > self._rms_threshold:
            self._cooldown = self._cooldown_seconds
            if self._state in (self.State.SILENCE, self.State.END):
                self._state = self.State.START
            else:
                self._state = self.State.SPEAKING
        else:
            if self._cooldown <= 0:
                if self._state in (self.State.SPEAKING, self.State.START):
                    self._state = self.State.END
                elif self._state == self.State.END:
                    self._state = self.State.SILENCE
            else:
                # keep speaking during cooldown
                self._cooldown -= frame.duration
                self._state = self.State.SPEAKING

        return self._state

Class variables

var State

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Methods

def update(self, frame: rtc.AudioFrame) ‑> State
Expand source code
def update(self, frame: rtc.AudioFrame) -> State:
    arr = np.frombuffer(frame.data, dtype=np.int16)
    float_arr = arr.astype(np.float32) / 32768.0
    rms = np.mean(np.square(float_arr))

    if rms > self._rms_threshold:
        self._cooldown = self._cooldown_seconds
        if self._state in (self.State.SILENCE, self.State.END):
            self._state = self.State.START
        else:
            self._state = self.State.SPEAKING
    else:
        if self._cooldown <= 0:
            if self._state in (self.State.SPEAKING, self.State.START):
                self._state = self.State.END
            elif self._state == self.State.END:
                self._state = self.State.SILENCE
        else:
            # keep speaking during cooldown
            self._cooldown -= frame.duration
            self._state = self.State.SPEAKING

    return self._state
class STT (*,
model: DeepgramModels = 'nova-2-general',
language: DeepgramLanguages = 'en-US',
detect_language: bool = False,
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 16000,
no_delay: bool = True,
endpointing_ms: int = 25,
filler_words: bool = True,
keywords: list[Tuple[str, float]] = [],
profanity_filter: bool = False,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.deepgram.com/v1/listen',
energy_filter: AudioEnergyFilter | bool = False)
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        model: DeepgramModels = "nova-2-general",
        language: DeepgramLanguages = "en-US",
        detect_language: bool = False,
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 16000,
        no_delay: bool = True,
        endpointing_ms: int = 25,
        # enable filler words by default to improve turn detector accuracy
        filler_words: bool = True,
        keywords: list[Tuple[str, float]] = [],
        profanity_filter: bool = False,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = BASE_URL,
        energy_filter: AudioEnergyFilter | bool = False,
    ) -> None:
        """
        Create a new instance of Deepgram STT.

        ``api_key`` must be set to your Deepgram API key, either using the argument or by setting
        the ``DEEPGRAM_API_KEY`` environmental variable.
        """

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True, interim_results=interim_results
            )
        )
        self._base_url = base_url

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if api_key is None:
            raise ValueError("Deepgram API key is required")

        model = _validate_model(model, language)

        self._api_key = api_key

        self._opts = STTOptions(
            language=language,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            model=model,
            smart_format=smart_format,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            sample_rate=sample_rate,
            num_channels=1,
            keywords=keywords,
            profanity_filter=profanity_filter,
            energy_filter=energy_filter,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: DeepgramLanguages | str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        config = self._sanitize_options(language=language)

        recognize_config = {
            "model": str(config.model),
            "punctuate": config.punctuate,
            "detect_language": config.detect_language,
            "smart_format": config.smart_format,
            "keywords": self._opts.keywords,
            "profanity_filter": config.profanity_filter,
        }
        if config.language:
            recognize_config["language"] = config.language

        try:
            async with self._ensure_session().post(
                url=_to_deepgram_url(recognize_config, self._base_url, websocket=False),
                data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
                headers={
                    "Authorization": f"Token {self._api_key}",
                    "Accept": "application/json",
                    "Content-Type": "audio/wav",
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                return prerecorded_transcription_to_speech_event(
                    config.language,
                    await res.json(),
                )

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
            base_url=self._base_url,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        language: DeepgramLanguages | None = None,
        model: DeepgramModels | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        profanity_filter: bool | None = None,
    ):
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.model = _validate_model(model, language)
        if interim_results is not None:
            self._opts.interim_results = interim_results
        if punctuate is not None:
            self._opts.punctuate = punctuate
        if smart_format is not None:
            self._opts.smart_format = smart_format
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if no_delay is not None:
            self._opts.no_delay = no_delay
        if endpointing_ms is not None:
            self._opts.endpointing_ms = endpointing_ms
        if filler_words is not None:
            self._opts.filler_words = filler_words
        if keywords is not None:
            self._opts.keywords = keywords
        if profanity_filter is not None:
            self._opts.profanity_filter = profanity_filter

        for stream in self._streams:
            stream.update_options(
                language=language,
                model=model,
                interim_results=interim_results,
                punctuate=punctuate,
                smart_format=smart_format,
                sample_rate=sample_rate,
                no_delay=no_delay,
                endpointing_ms=endpointing_ms,
                filler_words=filler_words,
                keywords=keywords,
                profanity_filter=profanity_filter,
            )

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        config = dataclasses.replace(self._opts)
        config.language = language or config.language

        if config.detect_language:
            config.language = None

        return config

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Deepgram STT.

api_key must be set to your Deepgram API key, either using the argument or by setting the DEEPGRAM_API_KEY environmental variable.

Ancestors

Methods

def stream(self,
*,
language: DeepgramLanguages | str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> livekit.plugins.deepgram.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: DeepgramLanguages | str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    config = self._sanitize_options(language=language)
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        api_key=self._api_key,
        http_session=self._ensure_session(),
        base_url=self._base_url,
    )
    self._streams.add(stream)
    return stream
def update_options(self,
*,
language: DeepgramLanguages | None = None,
model: DeepgramModels | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
profanity_filter: bool | None = None)
Expand source code
def update_options(
    self,
    *,
    language: DeepgramLanguages | None = None,
    model: DeepgramModels | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    smart_format: bool | None = None,
    sample_rate: int | None = None,
    no_delay: bool | None = None,
    endpointing_ms: int | None = None,
    filler_words: bool | None = None,
    keywords: list[Tuple[str, float]] | None = None,
    profanity_filter: bool | None = None,
):
    if language is not None:
        self._opts.language = language
    if model is not None:
        self._opts.model = _validate_model(model, language)
    if interim_results is not None:
        self._opts.interim_results = interim_results
    if punctuate is not None:
        self._opts.punctuate = punctuate
    if smart_format is not None:
        self._opts.smart_format = smart_format
    if sample_rate is not None:
        self._opts.sample_rate = sample_rate
    if no_delay is not None:
        self._opts.no_delay = no_delay
    if endpointing_ms is not None:
        self._opts.endpointing_ms = endpointing_ms
    if filler_words is not None:
        self._opts.filler_words = filler_words
    if keywords is not None:
        self._opts.keywords = keywords
    if profanity_filter is not None:
        self._opts.profanity_filter = profanity_filter

    for stream in self._streams:
        stream.update_options(
            language=language,
            model=model,
            interim_results=interim_results,
            punctuate=punctuate,
            smart_format=smart_format,
            sample_rate=sample_rate,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            keywords=keywords,
            profanity_filter=profanity_filter,
        )

Inherited members

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str)
Expand source code
class SpeechStream(stt.SpeechStream):
    _KEEPALIVE_MSG: str = json.dumps({"type": "KeepAlive"})
    _CLOSE_MSG: str = json.dumps({"type": "CloseStream"})
    _FINALIZE_MSG: str = json.dumps({"type": "Finalize"})

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
        base_url: str,
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )

        if opts.detect_language and opts.language is None:
            raise ValueError("language detection is not supported in streaming mode")

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._base_url = base_url
        self._speaking = False
        self._audio_duration_collector = PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

        self._audio_energy_filter: Optional[AudioEnergyFilter] = None
        if opts.energy_filter:
            if isinstance(opts.energy_filter, AudioEnergyFilter):
                self._audio_energy_filter = opts.energy_filter
            else:
                self._audio_energy_filter = AudioEnergyFilter()

        self._pushed_audio_duration = 0.0
        self._request_id = ""
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        language: DeepgramLanguages | None = None,
        model: DeepgramModels | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        profanity_filter: bool | None = None,
    ):
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.model = _validate_model(model, language)
        if interim_results is not None:
            self._opts.interim_results = interim_results
        if punctuate is not None:
            self._opts.punctuate = punctuate
        if smart_format is not None:
            self._opts.smart_format = smart_format
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if no_delay is not None:
            self._opts.no_delay = no_delay
        if endpointing_ms is not None:
            self._opts.endpointing_ms = endpointing_ms
        if filler_words is not None:
            self._opts.filler_words = filler_words
        if keywords is not None:
            self._opts.keywords = keywords
        if profanity_filter is not None:
            self._opts.profanity_filter = profanity_filter

        self._reconnect_event.set()

    async def _run(self) -> None:
        closing_ws = False

        async def keepalive_task(ws: aiohttp.ClientWebSocketResponse):
            # if we want to keep the connection alive even if no audio is sent,
            # Deepgram expects a keepalive message.
            # https://developers.deepgram.com/reference/listen-live#stream-keepalive
            try:
                while True:
                    await ws.send_str(SpeechStream._KEEPALIVE_MSG)
                    await asyncio.sleep(5)
            except Exception:
                return

        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            # forward audio to deepgram in chunks of 50ms
            samples_50ms = self._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=self._opts.num_channels,
                samples_per_channel=samples_50ms,
            )

            has_ended = False
            last_frame: Optional[rtc.AudioFrame] = None
            async for data in self._input_ch:
                frames: list[rtc.AudioFrame] = []
                if isinstance(data, rtc.AudioFrame):
                    state = self._check_energy_state(data)
                    if state in (
                        AudioEnergyFilter.State.START,
                        AudioEnergyFilter.State.SPEAKING,
                    ):
                        if last_frame:
                            frames.extend(
                                audio_bstream.write(last_frame.data.tobytes())
                            )
                            last_frame = None
                        frames.extend(audio_bstream.write(data.data.tobytes()))
                    elif state == AudioEnergyFilter.State.END:
                        # no need to buffer as we have cooldown period
                        frames = audio_bstream.flush()
                        has_ended = True
                    elif state == AudioEnergyFilter.State.SILENCE:
                        # buffer the last silence frame, since it could contain beginning of speech
                        # TODO: improve accuracy by using a ring buffer with longer window
                        last_frame = data
                elif isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                    has_ended = True

                for frame in frames:
                    self._audio_duration_collector.push(frame.duration)
                    await ws.send_bytes(frame.data.tobytes())

                    if has_ended:
                        self._audio_duration_collector.flush()
                        await ws.send_str(SpeechStream._FINALIZE_MSG)
                        has_ended = False

            # tell deepgram we are done sending audio/inputs
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_MSG)

        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(
                        message="deepgram connection closed unexpectedly"
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected deepgram message type %s", msg.type)
                    continue

                try:
                    self._process_stream_event(json.loads(msg.data))
                except Exception:
                    logger.exception("failed to process deepgram message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                    asyncio.create_task(keepalive_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore

                    # propagate exceptions from completed tasks
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        live_config = {
            "model": self._opts.model,
            "punctuate": self._opts.punctuate,
            "smart_format": self._opts.smart_format,
            "no_delay": self._opts.no_delay,
            "interim_results": self._opts.interim_results,
            "encoding": "linear16",
            "vad_events": True,
            "sample_rate": self._opts.sample_rate,
            "channels": self._opts.num_channels,
            "endpointing": False
            if self._opts.endpointing_ms == 0
            else self._opts.endpointing_ms,
            "filler_words": self._opts.filler_words,
            "keywords": self._opts.keywords,
            "profanity_filter": self._opts.profanity_filter,
        }

        if self._opts.language:
            live_config["language"] = self._opts.language

        ws = await asyncio.wait_for(
            self._session.ws_connect(
                _to_deepgram_url(live_config, base_url=self._base_url, websocket=True),
                headers={"Authorization": f"Token {self._api_key}"},
            ),
            self._conn_options.timeout,
        )
        return ws

    def _check_energy_state(self, frame: rtc.AudioFrame) -> AudioEnergyFilter.State:
        if self._audio_energy_filter:
            return self._audio_energy_filter.update(frame)
        return AudioEnergyFilter.State.SPEAKING

    def _on_audio_duration_report(self, duration: float) -> None:
        usage_event = stt.SpeechEvent(
            type=stt.SpeechEventType.RECOGNITION_USAGE,
            request_id=self._request_id,
            alternatives=[],
            recognition_usage=stt.RecognitionUsage(audio_duration=duration),
        )
        self._event_ch.send_nowait(usage_event)

    def _process_stream_event(self, data: dict) -> None:
        assert self._opts.language is not None

        if data["type"] == "SpeechStarted":
            # This is a normal case. Deepgram's SpeechStarted events
            # are not correlated with speech_final or utterance end.
            # It's possible that we receive two in a row without an endpoint
            # It's also possible we receive a transcript without a SpeechStarted event.
            if self._speaking:
                return

            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        # see this page:
        # https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
        # for more information about the different types of events
        elif data["type"] == "Results":
            metadata = data["metadata"]
            request_id = metadata["request_id"]
            is_final_transcript = data["is_final"]
            is_endpoint = data["speech_final"]
            self._request_id = request_id

            alts = live_transcription_to_speech_data(self._opts.language, data)
            # If, for some reason, we didn't get a SpeechStarted event but we got
            # a transcript with text, we should start speaking. It's rare but has
            # been observed.
            if len(alts) > 0 and alts[0].text:
                if not self._speaking:
                    self._speaking = True
                    start_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.START_OF_SPEECH
                    )
                    self._event_ch.send_nowait(start_event)

                if is_final_transcript:
                    final_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(final_event)
                else:
                    interim_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(interim_event)

            # if we receive an endpoint, only end the speech if
            # we either had a SpeechStarted event or we have a seen
            # a non-empty transcript (deepgram doesn't have a SpeechEnded event)
            if is_endpoint and self._speaking:
                self._speaking = False
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

        elif data["type"] == "Metadata":
            pass  # metadata is too noisy
        else:
            logger.warning("received unexpected message from deepgram %s", data)

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

Methods

def update_options(self,
*,
language: DeepgramLanguages | None = None,
model: DeepgramModels | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
profanity_filter: bool | None = None)
Expand source code
def update_options(
    self,
    *,
    language: DeepgramLanguages | None = None,
    model: DeepgramModels | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    smart_format: bool | None = None,
    sample_rate: int | None = None,
    no_delay: bool | None = None,
    endpointing_ms: int | None = None,
    filler_words: bool | None = None,
    keywords: list[Tuple[str, float]] | None = None,
    profanity_filter: bool | None = None,
):
    if language is not None:
        self._opts.language = language
    if model is not None:
        self._opts.model = _validate_model(model, language)
    if interim_results is not None:
        self._opts.interim_results = interim_results
    if punctuate is not None:
        self._opts.punctuate = punctuate
    if smart_format is not None:
        self._opts.smart_format = smart_format
    if sample_rate is not None:
        self._opts.sample_rate = sample_rate
    if no_delay is not None:
        self._opts.no_delay = no_delay
    if endpointing_ms is not None:
        self._opts.endpointing_ms = endpointing_ms
    if filler_words is not None:
        self._opts.filler_words = filler_words
    if keywords is not None:
        self._opts.keywords = keywords
    if profanity_filter is not None:
        self._opts.profanity_filter = profanity_filter

    self._reconnect_event.set()

Inherited members

class TTS (*,
model: str = 'aura-asteria-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
api_key: str | None = None,
base_url: str = 'https://api.deepgram.com/v1/speak',
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        model: str = "aura-asteria-en",
        encoding: str = "linear16",
        sample_rate: int = 24000,
        api_key: str | None = None,
        base_url: str = BASE_URL,
        word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
            ignore_punctuation=False
        ),
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Deepgram TTS.

        Args:
            model (str): TTS model to use. Defaults to "aura-asteria-en".
            encoding (str): Audio encoding to use. Defaults to "linear16".
            sample_rate (int): Sample rate of audio. Defaults to 24000.
            api_key (str): Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY in environment.
            base_url (str): Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
            word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
            http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.

        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if not api_key:
            raise ValueError(
                "Deepgram API key required. Set DEEPGRAM_API_KEY or provide api_key."
            )

        self._opts = _TTSOptions(
            model=model,
            encoding=encoding,
            sample_rate=sample_rate,
            word_tokenizer=word_tokenizer,
        )
        self._session = http_session
        self._api_key = api_key
        self._base_url = base_url
        self._streams = weakref.WeakSet[SynthesizeStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        model: str | None = None,
        sample_rate: int | None = None,
    ) -> None:
        """
        args:
            model (str): TTS model to use.
            sample_rate (int): Sample rate of audio.
        """
        if model is not None:
            self._opts.model = model
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        for stream in self._streams:
            stream.update_options(
                model=model,
                sample_rate=sample_rate,
            )

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "ChunkedStream":
        return ChunkedStream(
            tts=self,
            input_text=text,
            base_url=self._base_url,
            api_key=self._api_key,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
        )

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> "SynthesizeStream":
        stream = SynthesizeStream(
            tts=self,
            conn_options=conn_options,
            base_url=self._base_url,
            api_key=self._api_key,
            opts=self._opts,
            session=self._ensure_session(),
        )
        self._streams.add(stream)
        return stream

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Deepgram TTS.

Args

model : str
TTS model to use. Defaults to "aura-asteria-en".
encoding : str
Audio encoding to use. Defaults to "linear16".
sample_rate : int
Sample rate of audio. Defaults to 24000.
api_key : str
Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY in environment.
base_url : str
Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
word_tokenizer : tokenize.WordTokenizer
Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session : aiohttp.ClientSession
Optional aiohttp session to use for requests.

Ancestors

Methods

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> livekit.plugins.deepgram.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> "SynthesizeStream":
    stream = SynthesizeStream(
        tts=self,
        conn_options=conn_options,
        base_url=self._base_url,
        api_key=self._api_key,
        opts=self._opts,
        session=self._ensure_session(),
    )
    self._streams.add(stream)
    return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> livekit.plugins.deepgram.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "ChunkedStream":
    return ChunkedStream(
        tts=self,
        input_text=text,
        base_url=self._base_url,
        api_key=self._api_key,
        conn_options=conn_options,
        opts=self._opts,
        session=self._ensure_session(),
    )
def update_options(self, *, model: str | None = None, sample_rate: int | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: str | None = None,
    sample_rate: int | None = None,
) -> None:
    """
    args:
        model (str): TTS model to use.
        sample_rate (int): Sample rate of audio.
    """
    if model is not None:
        self._opts.model = model
    if sample_rate is not None:
        self._opts.sample_rate = sample_rate
    for stream in self._streams:
        stream.update_options(
            model=model,
            sample_rate=sample_rate,
        )

args: model (str): TTS model to use. sample_rate (int): Sample rate of audio.

Inherited members