Module livekit.plugins.deepgram
Classes
class AudioEnergyFilter (*, min_silence: float = 1.5, rms_threshold: float = 1.6e-05)
-
Expand source code
class AudioEnergyFilter:
    """Energy gate that labels successive audio frames as speech or silence.

    Each call to :meth:`update` measures the energy of one frame and advances
    a small state machine (``START`` -> ``SPEAKING`` -> ``END`` -> ``SILENCE``).
    After the last loud frame the filter keeps reporting ``SPEAKING`` until
    ``min_silence`` worth of quiet frame durations has been consumed.

    Note: the cooldown starts out full (``min_silence``), so quiet frames
    received right after construction are also reported as ``SPEAKING`` until
    that initial cooldown has elapsed.
    """

    class State(Enum):
        START = 0  # first loud frame after SILENCE/END
        SPEAKING = 1  # loud frame, or quiet frame still inside the cooldown
        SILENCE = 2  # quiet, cooldown exhausted, END already reported
        END = 3  # first quiet frame after the cooldown ran out

    def __init__(
        self, *, min_silence: float = 1.5, rms_threshold: float = MAGIC_NUMBER_THRESHOLD
    ):
        """Configure the gate.

        Args:
            min_silence: Amount of quiet audio (summed from ``frame.duration``)
                tolerated before speech is considered finished.
            rms_threshold: Energy level above which a frame counts as loud.
                It is compared against the *mean square* of the normalized
                samples (no square root is taken), despite the name.
        """
        self._rms_threshold = rms_threshold
        self._state = self.State.SILENCE
        self._cooldown_seconds = min_silence
        self._cooldown = min_silence

    def update(self, frame: rtc.AudioFrame) -> State:
        """Feed one frame into the gate and return the resulting state.

        ``frame.data`` is read as int16 samples and scaled by 1/32768 before
        the mean of the squared samples is compared with the threshold.
        """
        states = self.State
        samples = np.frombuffer(frame.data, dtype=np.int16).astype(np.float32) / 32768.0
        energy = np.mean(np.square(samples))

        if energy > self._rms_threshold:
            # loud frame: re-arm the cooldown and (re)enter speech
            self._cooldown = self._cooldown_seconds
            resuming = self._state in (states.SILENCE, states.END)
            self._state = states.START if resuming else states.SPEAKING
            return self._state

        if self._cooldown <= 0:
            # cooldown exhausted: step SPEAKING/START -> END -> SILENCE
            if self._state in (states.SPEAKING, states.START):
                self._state = states.END
            elif self._state == states.END:
                self._state = states.SILENCE
            return self._state

        # quiet frame inside the cooldown window: keep speaking
        self._cooldown -= frame.duration
        self._state = states.SPEAKING
        return self._state
Class variables
var State
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Methods
def update(self, frame: rtc.AudioFrame) ‑> State
-
Expand source code
def update(self, frame: rtc.AudioFrame) -> State:
    """Feed one frame into the gate and return the resulting state.

    ``frame.data`` is read as int16 samples and scaled by 1/32768; the mean
    of the squared samples (no square root is taken) is compared with
    ``self._rms_threshold``. A loud frame re-arms the cooldown; quiet frames
    consume it by ``frame.duration`` and are still reported as ``SPEAKING``
    until it reaches zero, after which the state steps to ``END`` and then
    ``SILENCE``.
    """
    states = self.State
    samples = np.frombuffer(frame.data, dtype=np.int16).astype(np.float32) / 32768.0
    energy = np.mean(np.square(samples))

    if energy > self._rms_threshold:
        # loud frame: re-arm the cooldown and (re)enter speech
        self._cooldown = self._cooldown_seconds
        resuming = self._state in (states.SILENCE, states.END)
        self._state = states.START if resuming else states.SPEAKING
        return self._state

    if self._cooldown <= 0:
        # cooldown exhausted: step SPEAKING/START -> END -> SILENCE
        if self._state in (states.SPEAKING, states.START):
            self._state = states.END
        elif self._state == states.END:
            self._state = states.SILENCE
        return self._state

    # quiet frame inside the cooldown window: keep speaking
    self._cooldown -= frame.duration
    self._state = states.SPEAKING
    return self._state
class STT (*,
model: DeepgramModels | str = 'nova-2-general',
language: DeepgramLanguages | str = 'en-US',
detect_language: bool = False,
interim_results: bool = True,
punctuate: bool = True,
smart_format: bool = True,
sample_rate: int = 16000,
no_delay: bool = True,
endpointing_ms: int = 25,
filler_words: bool = True,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool = False,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.deepgram.com/v1/listen',
energy_filter: AudioEnergyFilter | bool = False,
numerals: bool = False,
mip_opt_out: bool = False)-
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        model: DeepgramModels | str = "nova-2-general",
        language: DeepgramLanguages | str = "en-US",
        detect_language: bool = False,
        interim_results: bool = True,
        punctuate: bool = True,
        smart_format: bool = True,
        sample_rate: int = 16000,
        no_delay: bool = True,
        endpointing_ms: int = 25,
        # enable filler words by default to improve turn detector accuracy
        filler_words: bool = True,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool = False,
        api_key: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = BASE_URL,
        energy_filter: AudioEnergyFilter | bool = False,
        numerals: bool = False,
        mip_opt_out: bool = False,
    ) -> None:
        """Create a new instance of Deepgram STT.

        Args:
            model: The Deepgram model to use for speech recognition. Defaults to "nova-2-general".
            language: The language code for recognition. Defaults to "en-US".
            detect_language: Whether to enable automatic language detection. Defaults to False.
            interim_results: Whether to return interim (non-final) transcription results. Defaults to True.
            punctuate: Whether to add punctuations to the transcription. Defaults to True. Turn detector will work better with punctuations.
            smart_format: Whether to apply smart formatting to numbers, dates, etc. Defaults to True.
            sample_rate: The sample rate of the audio in Hz. Defaults to 16000.
            no_delay: When smart_format is used, ensures it does not wait for sequence to be complete before returning results. Defaults to True.
            endpointing_ms: Time in milliseconds of silence to consider end of speech. Set to 0 to disable. Defaults to 25.
            filler_words: Whether to include filler words (um, uh, etc.) in transcription. Defaults to True.
            keywords: List of tuples containing keywords and their boost values for improved recognition.
                Each tuple should be (keyword: str, boost: float). Defaults to None.
                `keywords` does not work with Nova-3 models. Use `keyterms` instead.
            keyterms: List of key terms to improve recognition accuracy. Defaults to None.
                `keyterms` is supported by Nova-3 models.
            profanity_filter: Whether to filter profanity from the transcription. Defaults to False.
            api_key: Your Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY environment variable.
            http_session: Optional aiohttp ClientSession to use for requests.
            base_url: The base URL for Deepgram API. Defaults to "https://api.deepgram.com/v1/listen".
            energy_filter: Audio energy filter configuration for voice activity detection. Can be a boolean or AudioEnergyFilter instance. Defaults to False.
            numerals: Whether to include numerals in the transcription. Defaults to False.
            mip_opt_out: Whether to opt out of the Deepgram Model Improvement Program. Defaults to False.

        Raises:
            ValueError: If no API key is provided or found in environment variables.

        Note:
            The api_key must be set either through the constructor argument or by setting
            the DEEPGRAM_API_KEY environmental variable.
        """

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True, interim_results=interim_results
            )
        )
        self._base_url = base_url

        api_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        # reject an empty key as well as a missing one (same rule as the TTS
        # class); an empty token could never authenticate anyway
        if not api_key:
            raise ValueError("Deepgram API key is required")

        model = _validate_model(model, language)

        self._api_key = api_key

        self._opts = STTOptions(
            language=language,
            detect_language=detect_language,
            interim_results=interim_results,
            punctuate=punctuate,
            model=model,
            smart_format=smart_format,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            sample_rate=sample_rate,
            num_channels=1,
            keywords=keywords or [],
            keyterms=keyterms or [],
            profanity_filter=profanity_filter,
            energy_filter=energy_filter,
            numerals=numerals,
            mip_opt_out=mip_opt_out,
        )
        self._session = http_session
        # weak references: streams drop out of the set once nothing else holds them
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, lazily taking the shared one if none was given."""
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: DeepgramLanguages | str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """Transcribe a complete audio buffer with a single HTTP POST.

        The buffer is combined into one WAV payload and sent to ``base_url``.

        Raises:
            APITimeoutError: If the request times out.
            APIStatusError: If aiohttp raises a ``ClientResponseError``.
            APIConnectionError: For any other failure.
        """
        config = self._sanitize_options(language=language)

        recognize_config = {
            "model": str(config.model),
            "punctuate": config.punctuate,
            "detect_language": config.detect_language,
            "smart_format": config.smart_format,
            "keywords": self._opts.keywords,
            "profanity_filter": config.profanity_filter,
            "numerals": config.numerals,
        }
        if config.language:
            recognize_config["language"] = config.language

        try:
            async with self._ensure_session().post(
                url=_to_deepgram_url(recognize_config, self._base_url, websocket=False),
                data=rtc.combine_audio_frames(buffer).to_wav_bytes(),
                headers={
                    "Authorization": f"Token {self._api_key}",
                    "Accept": "application/json",
                    "Content-Type": "audio/wav",
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                # NOTE(review): the status code is not checked here, so the
                # ClientResponseError branch below presumably relies on the
                # session being configured to raise for status -- confirm.
                return prerecorded_transcription_to_speech_event(
                    config.language,
                    await res.json(),
                )

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def stream(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        """Open a streaming recognition session and register it for option updates."""
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
            base_url=self._base_url,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        model: DeepgramModels | str | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool | None = None,
        numerals: bool | None = None,
        mip_opt_out: bool | None = None,
    ):
        """Update recognition options for future streams and all live streams.

        Every argument left as ``None`` keeps its current value. The same
        arguments are forwarded to each stream that is still alive.
        """
        if language is not None:
            self._opts.language = language
        if model is not None:
            # NOTE(review): when only `model` is passed, validation runs
            # against language=None rather than the configured language --
            # confirm this is what _validate_model expects.
            self._opts.model = _validate_model(model, language)

        plain_updates = {
            "interim_results": interim_results,
            "punctuate": punctuate,
            "smart_format": smart_format,
            "sample_rate": sample_rate,
            "no_delay": no_delay,
            "endpointing_ms": endpointing_ms,
            "filler_words": filler_words,
            "keywords": keywords,
            "keyterms": keyterms,
            "profanity_filter": profanity_filter,
            # previously missing: `numerals` was forwarded to live streams but
            # never stored here, so streams created later ignored it
            "numerals": numerals,
            "mip_opt_out": mip_opt_out,
        }
        for name, value in plain_updates.items():
            if value is not None:
                setattr(self._opts, name, value)

        for stream in self._streams:
            stream.update_options(
                language=language,
                model=model,
                interim_results=interim_results,
                punctuate=punctuate,
                smart_format=smart_format,
                sample_rate=sample_rate,
                no_delay=no_delay,
                endpointing_ms=endpointing_ms,
                filler_words=filler_words,
                keywords=keywords,
                keyterms=keyterms,
                profanity_filter=profanity_filter,
                numerals=numerals,
                mip_opt_out=mip_opt_out,
            )

    def _sanitize_options(self, *, language: str | None = None) -> STTOptions:
        """Return a copy of the options with the effective language applied.

        An explicit ``language`` overrides the configured one; when language
        detection is enabled the language is cleared instead.
        """
        config = dataclasses.replace(self._opts)
        config.language = language or config.language

        if config.detect_language:
            config.language = None

        return config
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Deepgram STT.
Args
model
- The Deepgram model to use for speech recognition. Defaults to "nova-2-general".
language
- The language code for recognition. Defaults to "en-US".
detect_language
- Whether to enable automatic language detection. Defaults to False.
interim_results
- Whether to return interim (non-final) transcription results. Defaults to True.
punctuate
- Whether to add punctuations to the transcription. Defaults to True. Turn detector will work better with punctuations.
smart_format
- Whether to apply smart formatting to numbers, dates, etc. Defaults to True.
sample_rate
- The sample rate of the audio in Hz. Defaults to 16000.
no_delay
- When smart_format is used, ensures it does not wait for sequence to be complete before returning results. Defaults to True.
endpointing_ms
- Time in milliseconds of silence to consider end of speech. Set to 0 to disable. Defaults to 25.
filler_words
- Whether to include filler words (um, uh, etc.) in transcription. Defaults to True.
keywords
- List of tuples containing keywords and their boost values for improved recognition.
Each tuple should be (keyword: str, boost: float). Defaults to None.
`keywords` does not work with Nova-3 models. Use `keyterms` instead.
keyterms
- List of key terms to improve recognition accuracy. Defaults to None.
`keyterms` is supported by Nova-3 models.
profanity_filter
- Whether to filter profanity from the transcription. Defaults to False.
api_key
- Your Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY environment variable.
http_session
- Optional aiohttp ClientSession to use for requests.
base_url
- The base URL for Deepgram API. Defaults to "https://api.deepgram.com/v1/listen".
energy_filter
- Audio energy filter configuration for voice activity detection. Can be a boolean or AudioEnergyFilter instance. Defaults to False.
numerals
- Whether to include numerals in the transcription. Defaults to False.
mip_opt_out
- Whether to opt out of the Deepgram Model Improvement Program. Defaults to False.
Raises
ValueError
- If no API key is provided or found in environment variables.
Note
The api_key must be set either through the constructor argument or by setting the DEEPGRAM_API_KEY environmental variable.
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self,
*,
language: DeepgramLanguages | str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.deepgram.stt.SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: DeepgramLanguages | str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    """Open a streaming recognition session.

    The stream receives a sanitized copy of the current options (with
    ``language`` applied when given) and is added to ``self._streams`` so
    that later option updates can reach it.
    """
    new_stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=self._sanitize_options(language=language),
        api_key=self._api_key,
        http_session=self._ensure_session(),
        base_url=self._base_url,
    )
    self._streams.add(new_stream)
    return new_stream
def update_options(self,
*,
language: DeepgramLanguages | str | None = None,
model: DeepgramModels | str | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool | None = None,
numerals: bool | None = None,
mip_opt_out: bool | None = None)-
Expand source code
def update_options(
    self,
    *,
    language: DeepgramLanguages | str | None = None,
    model: DeepgramModels | str | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    smart_format: bool | None = None,
    sample_rate: int | None = None,
    no_delay: bool | None = None,
    endpointing_ms: int | None = None,
    filler_words: bool | None = None,
    keywords: list[Tuple[str, float]] | None = None,
    keyterms: list[str] | None = None,
    profanity_filter: bool | None = None,
    numerals: bool | None = None,
    mip_opt_out: bool | None = None,
):
    """Update recognition options for future streams and all live streams.

    Every argument left as ``None`` keeps its current value. The same
    arguments are forwarded to each stream that is still alive.
    """
    if language is not None:
        self._opts.language = language
    if model is not None:
        # NOTE(review): when only `model` is passed, validation runs against
        # language=None rather than the configured language -- confirm this
        # is what _validate_model expects.
        self._opts.model = _validate_model(model, language)

    plain_updates = {
        "interim_results": interim_results,
        "punctuate": punctuate,
        "smart_format": smart_format,
        "sample_rate": sample_rate,
        "no_delay": no_delay,
        "endpointing_ms": endpointing_ms,
        "filler_words": filler_words,
        "keywords": keywords,
        "keyterms": keyterms,
        "profanity_filter": profanity_filter,
        # previously missing: `numerals` was forwarded to live streams but
        # never stored here, so streams created later ignored it
        "numerals": numerals,
        "mip_opt_out": mip_opt_out,
    }
    for name, value in plain_updates.items():
        if value is not None:
            setattr(self._opts, name, value)

    for stream in self._streams:
        stream.update_options(
            language=language,
            model=model,
            interim_results=interim_results,
            punctuate=punctuate,
            smart_format=smart_format,
            sample_rate=sample_rate,
            no_delay=no_delay,
            endpointing_ms=endpointing_ms,
            filler_words=filler_words,
            keywords=keywords,
            keyterms=keyterms,
            profanity_filter=profanity_filter,
            numerals=numerals,
            mip_opt_out=mip_opt_out,
        )
Inherited members
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession,
base_url: str)-
Expand source code
class SpeechStream(stt.SpeechStream):
    """Streaming recognition session backed by a Deepgram websocket.

    Audio pushed into the stream is re-chunked, optionally gated by an
    ``AudioEnergyFilter``, and sent over the websocket; JSON messages coming
    back are turned into ``stt.SpeechEvent`` objects on the event channel.
    Calling :meth:`update_options` makes the run loop reconnect so the new
    options are sent with a fresh connection.
    """

    # control messages sent as text frames on the websocket
    _KEEPALIVE_MSG: str = json.dumps({"type": "KeepAlive"})
    _CLOSE_MSG: str = json.dumps({"type": "CloseStream"})
    _FINALIZE_MSG: str = json.dumps({"type": "Finalize"})

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
        base_url: str,
    ) -> None:
        """Set up the stream state; no connection is opened here.

        Raises:
            ValueError: If ``opts.detect_language`` is set and ``opts.language``
                is None.
        """
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )

        if opts.detect_language and opts.language is None:
            raise ValueError("language detection is not supported in streaming mode")

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._base_url = base_url
        # True between an emitted START_OF_SPEECH and the matching END_OF_SPEECH
        self._speaking = False
        # accumulates sent audio duration and reports it via
        # _on_audio_duration_report (configured with duration=5.0)
        self._audio_duration_collector = PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

        # energy_filter may be an instance (used as-is) or a truthy flag
        # (a default-configured filter is created)
        self._audio_energy_filter: Optional[AudioEnergyFilter] = None
        if opts.energy_filter:
            if isinstance(opts.energy_filter, AudioEnergyFilter):
                self._audio_energy_filter = opts.energy_filter
            else:
                self._audio_energy_filter = AudioEnergyFilter()

        self._request_id = ""
        # set by update_options to make _run tear down and reconnect
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        language: DeepgramLanguages | str | None = None,
        model: DeepgramModels | str | None = None,
        interim_results: bool | None = None,
        punctuate: bool | None = None,
        smart_format: bool | None = None,
        sample_rate: int | None = None,
        no_delay: bool | None = None,
        endpointing_ms: int | None = None,
        filler_words: bool | None = None,
        keywords: list[Tuple[str, float]] | None = None,
        keyterms: list[str] | None = None,
        profanity_filter: bool | None = None,
        numerals: bool | None = None,
        mip_opt_out: bool | None = None,
    ):
        """Apply every non-None option to this stream and request a reconnect.

        The reconnect event is set unconditionally, even when no option was
        actually changed.
        """
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.model = _validate_model(model, language)
        if interim_results is not None:
            self._opts.interim_results = interim_results
        if punctuate is not None:
            self._opts.punctuate = punctuate
        if smart_format is not None:
            self._opts.smart_format = smart_format
        if sample_rate is not None:
            self._opts.sample_rate = sample_rate
        if no_delay is not None:
            self._opts.no_delay = no_delay
        if endpointing_ms is not None:
            self._opts.endpointing_ms = endpointing_ms
        if filler_words is not None:
            self._opts.filler_words = filler_words
        if keywords is not None:
            self._opts.keywords = keywords
        if keyterms is not None:
            self._opts.keyterms = keyterms
        if profanity_filter is not None:
            self._opts.profanity_filter = profanity_filter
        if numerals is not None:
            self._opts.numerals = numerals
        if mip_opt_out is not None:
            self._opts.mip_opt_out = mip_opt_out

        self._reconnect_event.set()

    async def _run(self) -> None:
        """Main loop: connect, run send/recv/keepalive tasks, reconnect on demand.

        The loop ends when the task group finishes without a reconnect having
        been requested; exceptions raised by the tasks propagate to the caller.
        """
        # shared with the nested tasks: set once CloseStream has been sent so
        # recv_task treats the following close frame as expected
        closing_ws = False

        async def keepalive_task(ws: aiohttp.ClientWebSocketResponse):
            # if we want to keep the connection alive even if no audio is sent,
            # Deepgram expects a keepalive message.
            # https://developers.deepgram.com/reference/listen-live#stream-keepalive
            try:
                while True:
                    await ws.send_str(SpeechStream._KEEPALIVE_MSG)
                    await asyncio.sleep(5)
            except Exception:
                # best effort: any send failure simply stops the keepalive loop
                return

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            # forward audio to deepgram in chunks of 50ms
            samples_50ms = self._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=self._opts.num_channels,
                samples_per_channel=samples_50ms,
            )

            has_ended = False
            # most recent frame classified as SILENCE, replayed ahead of the
            # next speech frame
            last_frame: Optional[rtc.AudioFrame] = None
            async for data in self._input_ch:
                frames: list[rtc.AudioFrame] = []
                if isinstance(data, rtc.AudioFrame):
                    state = self._check_energy_state(data)
                    if state in (
                        AudioEnergyFilter.State.START,
                        AudioEnergyFilter.State.SPEAKING,
                    ):
                        if last_frame:
                            frames.extend(
                                audio_bstream.write(last_frame.data.tobytes())
                            )
                            last_frame = None
                        frames.extend(audio_bstream.write(data.data.tobytes()))
                    elif state == AudioEnergyFilter.State.END:
                        # no need to buffer as we have cooldown period
                        frames.extend(audio_bstream.flush())
                        has_ended = True
                    elif state == AudioEnergyFilter.State.SILENCE:
                        # buffer the last silence frame, since it could contain beginning of speech
                        # TODO: improve accuracy by using a ring buffer with longer window
                        last_frame = data
                elif isinstance(data, self._FlushSentinel):
                    frames.extend(audio_bstream.flush())
                    has_ended = True

                for frame in frames:
                    self._audio_duration_collector.push(frame.duration)
                    await ws.send_bytes(frame.data.tobytes())

                    # after the end of a segment, report usage and ask
                    # Deepgram to finalize what it has so far
                    if has_ended:
                        self._audio_duration_collector.flush()
                        await ws.send_str(SpeechStream._FINALIZE_MSG)
                        has_ended = False

            # tell deepgram we are done sending audio/inputs
            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_MSG)

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(
                        message="deepgram connection closed unexpectedly"
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected deepgram message type %s", msg.type)
                    continue

                try:
                    self._process_stream_event(json.loads(msg.data))
                except Exception:
                    # a malformed message is logged and skipped, not fatal
                    logger.exception("failed to process deepgram message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                    asyncio.create_task(keepalive_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    # wake up when either the task group finishes/fails or a
                    # reconnect is requested
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore

                    # propagate exceptions from completed tasks
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    # finished without a reconnect request: we are done
                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open the websocket, encoding the current options as query parameters.

        The connection attempt is bounded by ``self._conn_options.timeout``.
        """
        live_config: dict[str, Any] = {
            "model": self._opts.model,
            "punctuate": self._opts.punctuate,
            "smart_format": self._opts.smart_format,
            "no_delay": self._opts.no_delay,
            "interim_results": self._opts.interim_results,
            "encoding": "linear16",
            "vad_events": True,
            "sample_rate": self._opts.sample_rate,
            "channels": self._opts.num_channels,
            # endpointing_ms == 0 is sent as False (endpointing disabled)
            "endpointing": False
            if self._opts.endpointing_ms == 0
            else self._opts.endpointing_ms,
            "filler_words": self._opts.filler_words,
            "profanity_filter": self._opts.profanity_filter,
            "numerals": self._opts.numerals,
            "mip_opt_out": self._opts.mip_opt_out,
        }
        if self._opts.keywords:
            live_config["keywords"] = self._opts.keywords
        if self._opts.keyterms:
            # the query param is `keyterm`
            # See: https://developers.deepgram.com/docs/keyterm
            live_config["keyterm"] = self._opts.keyterms

        if self._opts.language:
            live_config["language"] = self._opts.language

        ws = await asyncio.wait_for(
            self._session.ws_connect(
                _to_deepgram_url(live_config, base_url=self._base_url, websocket=True),
                headers={"Authorization": f"Token {self._api_key}"},
            ),
            self._conn_options.timeout,
        )
        return ws

    def _check_energy_state(self, frame: rtc.AudioFrame) -> AudioEnergyFilter.State:
        """Classify a frame; without an energy filter every frame is SPEAKING."""
        if self._audio_energy_filter:
            return self._audio_energy_filter.update(frame)
        return AudioEnergyFilter.State.SPEAKING

    def _on_audio_duration_report(self, duration: float) -> None:
        """Emit a RECOGNITION_USAGE event carrying the reported audio duration."""
        usage_event = stt.SpeechEvent(
            type=stt.SpeechEventType.RECOGNITION_USAGE,
            request_id=self._request_id,
            alternatives=[],
            recognition_usage=stt.RecognitionUsage(audio_duration=duration),
        )
        self._event_ch.send_nowait(usage_event)

    def _process_stream_event(self, data: dict) -> None:
        """Translate one decoded Deepgram message into speech events.

        Handles the "SpeechStarted", "Results" and "Metadata" message types;
        anything else is logged as unexpected.
        """
        assert self._opts.language is not None

        if data["type"] == "SpeechStarted":
            # This is a normal case. Deepgram's SpeechStarted events
            # are not correlated with speech_final or utterance end.
            # It's possible that we receive two in a row without an endpoint
            # It's also possible we receive a transcript without a SpeechStarted event.
            if self._speaking:
                return

            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        # see this page:
        # https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final
        # for more information about the different types of events
        elif data["type"] == "Results":
            metadata = data["metadata"]
            request_id = metadata["request_id"]
            is_final_transcript = data["is_final"]
            is_endpoint = data["speech_final"]
            self._request_id = request_id

            alts = live_transcription_to_speech_data(self._opts.language, data)
            # If, for some reason, we didn't get a SpeechStarted event but we got
            # a transcript with text, we should start speaking. It's rare but has
            # been observed.
            if len(alts) > 0 and alts[0].text:
                if not self._speaking:
                    self._speaking = True
                    start_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.START_OF_SPEECH
                    )
                    self._event_ch.send_nowait(start_event)

                if is_final_transcript:
                    final_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(final_event)
                else:
                    interim_event = stt.SpeechEvent(
                        type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                        request_id=request_id,
                        alternatives=alts,
                    )
                    self._event_ch.send_nowait(interim_event)

            # if we receive an endpoint, only end the speech if
            # we either had a SpeechStarted event or we have a seen
            # a non-empty transcript (deepgram doesn't have a SpeechEnded event)
            if is_endpoint and self._speaking:
                self._speaking = False
                self._event_ch.send_nowait(
                    stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                )

        elif data["type"] == "Metadata":
            pass  # metadata is too noisy
        else:
            logger.warning("received unexpected message from deepgram %s", data)
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
language: DeepgramLanguages | str | None = None,
model: DeepgramModels | str | None = None,
interim_results: bool | None = None,
punctuate: bool | None = None,
smart_format: bool | None = None,
sample_rate: int | None = None,
no_delay: bool | None = None,
endpointing_ms: int | None = None,
filler_words: bool | None = None,
keywords: list[Tuple[str, float]] | None = None,
keyterms: list[str] | None = None,
profanity_filter: bool | None = None,
numerals: bool | None = None,
mip_opt_out: bool | None = None)-
Expand source code
def update_options(
    self,
    *,
    language: DeepgramLanguages | str | None = None,
    model: DeepgramModels | str | None = None,
    interim_results: bool | None = None,
    punctuate: bool | None = None,
    smart_format: bool | None = None,
    sample_rate: int | None = None,
    no_delay: bool | None = None,
    endpointing_ms: int | None = None,
    filler_words: bool | None = None,
    keywords: list[Tuple[str, float]] | None = None,
    keyterms: list[str] | None = None,
    profanity_filter: bool | None = None,
    numerals: bool | None = None,
    mip_opt_out: bool | None = None,
):
    """Apply every non-None option to this stream and request a reconnect.

    The reconnect event is set unconditionally, even when no option was
    actually changed.
    """
    if language is not None:
        self._opts.language = language
    if model is not None:
        # model is validated together with the language passed to this call
        self._opts.model = _validate_model(model, language)

    # remaining options are copied verbatim onto the options object
    verbatim = (
        ("interim_results", interim_results),
        ("punctuate", punctuate),
        ("smart_format", smart_format),
        ("sample_rate", sample_rate),
        ("no_delay", no_delay),
        ("endpointing_ms", endpointing_ms),
        ("filler_words", filler_words),
        ("keywords", keywords),
        ("keyterms", keyterms),
        ("profanity_filter", profanity_filter),
        ("numerals", numerals),
        ("mip_opt_out", mip_opt_out),
    )
    for attr, new_value in verbatim:
        if new_value is not None:
            setattr(self._opts, attr, new_value)

    self._reconnect_event.set()
Inherited members
class TTS (*,
model: str = 'aura-asteria-en',
encoding: str = 'linear16',
sample_rate: int = 24000,
api_key: str | None = None,
base_url: str = 'https://api.deepgram.com/v1/speak',
word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS):
    """Deepgram text-to-speech client with pooled websocket connections."""

    def __init__(
        self,
        *,
        model: str = "aura-asteria-en",
        encoding: str = "linear16",
        sample_rate: int = 24000,
        api_key: str | None = None,
        base_url: str = BASE_URL,
        word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(
            ignore_punctuation=False
        ),
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Deepgram TTS.

        Args:
            model (str): TTS model to use. Defaults to "aura-asteria-en".
            encoding (str): Audio encoding to use. Defaults to "linear16".
            sample_rate (int): Sample rate of audio. Defaults to 24000.
            api_key (str): Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY in environment.
            base_url (str): Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
            word_tokenizer (tokenize.WordTokenizer): Tokenizer for processing text. Defaults to basic WordTokenizer.
            http_session (aiohttp.ClientSession): Optional aiohttp session to use for requests.

        Raises:
            ValueError: If no API key is given and DEEPGRAM_API_KEY is unset or empty.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        resolved_key = api_key or os.environ.get("DEEPGRAM_API_KEY")
        if not resolved_key:
            raise ValueError(
                "Deepgram API key required. Set DEEPGRAM_API_KEY or provide api_key."
            )

        self._api_key = resolved_key
        self._base_url = base_url
        self._session = http_session
        self._opts = _TTSOptions(
            model=model,
            encoding=encoding,
            sample_rate=sample_rate,
            word_tokenizer=word_tokenizer,
        )
        # weak references: streams drop out of the set once nothing else holds them
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=3600,  # 1 hour
            mark_refreshed_on_get=False,
        )

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open a websocket configured with the current encoding, model and sample rate."""
        http = self._ensure_session()
        query = {
            "encoding": self._opts.encoding,
            "model": self._opts.model,
            "sample_rate": self._opts.sample_rate,
        }
        # NOTE(review): `_conn_options` is not assigned in this class;
        # presumably the base class provides it -- confirm.
        return await asyncio.wait_for(
            http.ws_connect(
                _to_deepgram_url(query, self._base_url, websocket=True),
                headers={"Authorization": f"Token {self._api_key}"},
            ),
            self._conn_options.timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse):
        """Close a pooled websocket."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, lazily taking the shared one if none was given."""
        if self._session:
            return self._session
        self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        model: str | None = None,
        sample_rate: int | None = None,
    ) -> None:
        """
        Update the synthesis options; arguments left as None are unchanged.

        Args:
            model (str): TTS model to use.
            sample_rate (int): Sample rate of audio.
        """
        for attr, new_value in (("model", model), ("sample_rate", sample_rate)):
            if new_value is not None:
                setattr(self._opts, attr, new_value)

        # deepgram sets options upon connection, so we need to invalidate the pool
        # to get a new connection with the updated options
        self._pool.invalidate()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        """Create a ChunkedStream that synthesizes ``text`` with the current options."""
        chunked = ChunkedStream(
            tts=self,
            input_text=text,
            base_url=self._base_url,
            api_key=self._api_key,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
        )
        return chunked

    def stream(
        self, *, conn_options: Optional[APIConnectOptions] = None
    ) -> "SynthesizeStream":
        """Create a SynthesizeStream on the shared connection pool and track it."""
        # NOTE(review): `conn_options` is accepted but not used here.
        synth_stream = SynthesizeStream(
            tts=self,
            pool=self._pool,
            opts=self._opts,
        )
        self._streams.add(synth_stream)
        return synth_stream

    def prewarm(self) -> None:
        """Ask the connection pool to prewarm."""
        self._pool.prewarm()

    async def aclose(self) -> None:
        """Close tracked streams, then the connection pool, then the base class."""
        for open_stream in tuple(self._streams):
            await open_stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
        await super().aclose()
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Deepgram TTS.
Args
model
:str
- TTS model to use. Defaults to "aura-asteria-en".
encoding
:str
- Audio encoding to use. Defaults to "linear16".
sample_rate
:int
- Sample rate of audio. Defaults to 24000.
api_key
:str
- Deepgram API key. If not provided, will look for DEEPGRAM_API_KEY in environment.
base_url
:str
- Base URL for Deepgram TTS API. Defaults to "https://api.deepgram.com/v1/speak"
word_tokenizer
:tokenize.WordTokenizer
- Tokenizer for processing text. Defaults to basic WordTokenizer.
http_session
:aiohttp.ClientSession
- Optional aiohttp session to use for requests.
Ancestors
- TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close tracked streams, then the connection pool, then the base class.

    The stream set is snapshotted first so it is not iterated while the
    streams are being closed.
    """
    for open_stream in tuple(self._streams):
        await open_stream.aclose()
    self._streams.clear()
    await self._pool.aclose()
    await super().aclose()
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.deepgram.tts.SynthesizeStream
-
Expand source code
def stream(
    self, *, conn_options: Optional[APIConnectOptions] = None
) -> "SynthesizeStream":
    """Create a SynthesizeStream on the shared connection pool and track it.

    The new stream is added to ``self._streams`` before being returned.
    """
    # NOTE(review): `conn_options` is accepted but not used here.
    synth_stream = SynthesizeStream(
        tts=self,
        pool=self._pool,
        opts=self._opts,
    )
    self._streams.add(synth_stream)
    return synth_stream
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.deepgram.tts.ChunkedStream
-
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    """Create a ChunkedStream that synthesizes ``text`` with the current options.

    The stream is given this instance's base URL, API key, options and HTTP
    session, plus ``conn_options`` as passed in.
    """
    chunked = ChunkedStream(
        tts=self,
        input_text=text,
        base_url=self._base_url,
        api_key=self._api_key,
        conn_options=conn_options,
        opts=self._opts,
        session=self._ensure_session(),
    )
    return chunked
def update_options(self, *, model: str | None = None, sample_rate: int | None = None) ‑> None
-
Expand source code
def update_options(
    self,
    *,
    model: str | None = None,
    sample_rate: int | None = None,
) -> None:
    """
    Update the synthesis options; arguments left as None are unchanged.

    The connection pool is invalidated on every call, even when nothing
    was changed.

    Args:
        model (str): TTS model to use.
        sample_rate (int): Sample rate of audio.
    """
    for attr, new_value in (("model", model), ("sample_rate", sample_rate)):
        if new_value is not None:
            setattr(self._opts, attr, new_value)

    # deepgram sets options upon connection, so we need to invalidate the pool
    # to get a new connection with the updated options
    self._pool.invalidate()
args: model (str): TTS model to use. sample_rate (int): Sample rate of audio.
Inherited members