Module livekit.plugins.deepgram
Classes
class AudioEnergyFilter (*, min_silence: float = 1.5, rms_threshold: float = 1.6e-05)
-
Expand source code
class AudioEnergyFilter: class State(Enum): START = 0 SPEAKING = 1 SILENCE = 2 END = 3 def __init__( self, *, min_silence: float = 1.5, rms_threshold: float = MAGIC_NUMBER_THRESHOLD ): self._cooldown_seconds = min_silence self._cooldown = min_silence self._state = self.State.SILENCE self._rms_threshold = rms_threshold def update(self, frame: rtc.AudioFrame) -> State: arr = np.frombuffer(frame.data, dtype=np.int16) float_arr = arr.astype(np.float32) / 32768.0 rms = np.mean(np.square(float_arr)) if rms > self._rms_threshold: self._cooldown = self._cooldown_seconds if self._state in (self.State.SILENCE, self.State.END): self._state = self.State.START else: self._state = self.State.SPEAKING else: if self._cooldown <= 0: if self._state in (self.State.SPEAKING, self.State.START): self._state = self.State.END elif self._state == self.State.END: self._state = self.State.SILENCE else: # keep speaking during cooldown self._cooldown -= frame.duration self._state = self.State.SPEAKING return self._state
Class variables
var State
-
Create a collection of name/value pairs.
Example enumeration:
>>> class Color(Enum): ... RED = 1 ... BLUE = 2 ... GREEN = 3
Access them by:
- attribute access:
Color.RED
- value lookup:
Color(1)
- name lookup:
Color['RED']
Enumerations can be iterated over, and know how many members they have:
>>> len(Color) 3
>>> list(Color) [<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]
Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.
Methods
def update(self, frame: rtc.AudioFrame)
class STT (*, model: DeepgramModels = 'nova-2-general', language: DeepgramLanguages = 'en-US', detect_language: bool = False, interim_results: bool = True, punctuate: bool = True, smart_format: bool = True, sample_rate: int = 16000, no_delay: bool = True, endpointing_ms: int = 25, filler_words: bool = False, keywords: list[Tuple[str, float]] = [], profanity_filter: bool = False, api_key: str | None = None, http_session: aiohttp.ClientSession | None = None, energy_filter: AudioEnergyFilter | bool = False)
-
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Deepgram STT.
api_key
must be set to your Deepgram API key, either using the argument or by setting theDEEPGRAM_API_KEY
environmental variable.Expand source code
class STT(stt.STT): def __init__( self, *, model: DeepgramModels = "nova-2-general", language: DeepgramLanguages = "en-US", detect_language: bool = False, interim_results: bool = True, punctuate: bool = True, smart_format: bool = True, sample_rate: int = 16000, no_delay: bool = True, endpointing_ms: int = 25, filler_words: bool = False, keywords: list[Tuple[str, float]] = [], profanity_filter: bool = False, api_key: str | None = None, http_session: aiohttp.ClientSession | None = None, energy_filter: AudioEnergyFilter | bool = False, ) -> None: """ Create a new instance of Deepgram STT. ``api_key`` must be set to your Deepgram API key, either using the argument or by setting the ``DEEPGRAM_API_KEY`` environmental variable. """ super().__init__( capabilities=stt.STTCapabilities( streaming=True, interim_results=interim_results ) ) api_key = api_key or os.environ.get("DEEPGRAM_API_KEY") if api_key is None: raise ValueError("Deepgram API key is required") if language not in ("en-US", "en") and model in ( "nova-2-meeting", "nova-2-phonecall", "nova-2-finance", "nova-2-conversationalai", "nova-2-voicemail", "nova-2-video", "nova-2-medical", "nova-2-drivethru", "nova-2-automotive", ): logger.warning( f"{model} does not support language {language}, falling back to nova-2-general" ) model = "nova-2-general" self._api_key = api_key self._opts = STTOptions( language=language, detect_language=detect_language, interim_results=interim_results, punctuate=punctuate, model=model, smart_format=smart_format, no_delay=no_delay, endpointing_ms=endpointing_ms, filler_words=filler_words, sample_rate=sample_rate, num_channels=1, keywords=keywords, profanity_filter=profanity_filter, energy_filter=energy_filter, ) self._session = http_session def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session async def _recognize_impl( self, buffer: AudioBuffer, *, language: DeepgramLanguages | str | None = None ) -> stt.SpeechEvent: config = self._sanitize_options(language=language) recognize_config = { "model": str(config.model), "punctuate": config.punctuate, "detect_language": config.detect_language, "smart_format": config.smart_format, "keywords": self._opts.keywords, "profanity_filter": config.profanity_filter, } if config.language: recognize_config["language"] = config.language buffer = merge_frames(buffer) io_buffer = io.BytesIO() with wave.open(io_buffer, "wb") as wav: wav.setnchannels(buffer.num_channels) wav.setsampwidth(2) # 16-bit wav.setframerate(buffer.sample_rate) wav.writeframes(buffer.data) data = io_buffer.getvalue() try: async with self._ensure_session().post( url=_to_deepgram_url(recognize_config), data=data, headers={ "Authorization": f"Token {self._api_key}", "Accept": "application/json", "Content-Type": "audio/wav", }, ) as res: return prerecorded_transcription_to_speech_event( config.language, await res.json(), ) except asyncio.TimeoutError as e: raise APITimeoutError() from e except aiohttp.ClientResponseError as e: raise APIStatusError( message=e.message, status_code=e.status, request_id=None, body=None, ) from e except Exception as e: raise APIConnectionError() from e def stream( self, *, language: DeepgramLanguages | str | None = None ) -> "SpeechStream": config = self._sanitize_options(language=language) return SpeechStream(self, config, self._api_key, self._ensure_session()) def _sanitize_options(self, *, language: str | None = None) -> STTOptions: config = dataclasses.replace(self._opts) config.language = language or config.language if config.detect_language: config.language = None return config
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self, *, language: DeepgramLanguages | str | None = None) ‑> livekit.plugins.deepgram.stt.SpeechStream
Inherited members
class SpeechStream (stt: STT, opts: STTOptions, api_key: str, http_session: aiohttp.ClientSession, max_retry: int = 32)
-
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Expand source code
class SpeechStream(stt.SpeechStream): _KEEPALIVE_MSG: str = json.dumps({"type": "KeepAlive"}) _CLOSE_MSG: str = json.dumps({"type": "CloseStream"}) _FINALIZE_MSG: str = json.dumps({"type": "Finalize"}) def __init__( self, stt: STT, opts: STTOptions, api_key: str, http_session: aiohttp.ClientSession, max_retry: int = 32, ) -> None: super().__init__(stt, sample_rate=opts.sample_rate) if opts.detect_language and opts.language is None: raise ValueError("language detection is not supported in streaming mode") self._opts = opts self._api_key = api_key self._session = http_session self._speaking = False self._max_retry = max_retry self._audio_duration_collector = metrics.PeriodicCollector( callback=self._on_audio_duration_report, duration=5.0, ) self._audio_energy_filter: Optional[AudioEnergyFilter] = None if opts.energy_filter: if isinstance(opts.energy_filter, AudioEnergyFilter): self._audio_energy_filter = opts.energy_filter else: self._audio_energy_filter = AudioEnergyFilter() self._pushed_audio_duration = 0.0 self._request_id = "" @utils.log_exceptions(logger=logger) async def _main_task(self) -> None: await self._run(self._max_retry) async def _run(self, max_retry: int) -> None: """ Run a single websocket connection to Deepgram and make sure to reconnect when something went wrong. """ retry_count = 0 while self._input_ch.qsize() or not self._input_ch.closed: try: live_config = { "model": self._opts.model, "punctuate": self._opts.punctuate, "smart_format": self._opts.smart_format, "no_delay": self._opts.no_delay, "interim_results": self._opts.interim_results, "encoding": "linear16", "vad_events": True, "sample_rate": self._opts.sample_rate, "channels": self._opts.num_channels, "endpointing": False if self._opts.endpointing_ms == 0 else self._opts.endpointing_ms, "filler_words": self._opts.filler_words, "keywords": self._opts.keywords, "profanity_filter": self._opts.profanity_filter, } if self._opts.language: live_config["language"] = self._opts.language headers = {"Authorization": f"Token {self._api_key}"} ws = await self._session.ws_connect( _to_deepgram_url(live_config, websocket=True), headers=headers ) retry_count = 0 # connected successfully, reset the retry_count await self._run_ws(ws) except Exception as e: if self._session.closed: break if retry_count >= max_retry: logger.exception( f"failed to connect to deepgram after {max_retry} tries" ) break retry_delay = min(retry_count * 2, 10) # max 10s retry_count += 1 # increment after calculating the delay, the first retry should happen directly logger.warning( f"deepgram connection failed, retrying in {retry_delay}s", exc_info=e, ) await asyncio.sleep(retry_delay) async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None: """This method could throw ws errors, these are handled inside the _run method""" closing_ws = False async def keepalive_task(): # if we want to keep the connection alive even if no audio is sent, # Deepgram expects a keepalive message. # https://developers.deepgram.com/reference/listen-live#stream-keepalive try: while True: await ws.send_str(SpeechStream._KEEPALIVE_MSG) await asyncio.sleep(5) except Exception: return async def send_task(): nonlocal closing_ws # forward audio to deepgram in chunks of 50ms samples_50ms = self._opts.sample_rate // 20 audio_bstream = utils.audio.AudioByteStream( sample_rate=self._opts.sample_rate, num_channels=self._opts.num_channels, samples_per_channel=samples_50ms, ) has_ended = False last_frame: Optional[rtc.AudioFrame] = None async for data in self._input_ch: frames: list[rtc.AudioFrame] = [] if isinstance(data, rtc.AudioFrame): state = self._check_energy_state(data) if state in ( AudioEnergyFilter.State.START, AudioEnergyFilter.State.SPEAKING, ): if last_frame: frames.extend( audio_bstream.write(last_frame.data.tobytes()) ) last_frame = None frames.extend(audio_bstream.write(data.data.tobytes())) elif state == AudioEnergyFilter.State.END: # no need to buffer as we have cooldown period frames = audio_bstream.flush() has_ended = True elif state == AudioEnergyFilter.State.SILENCE: # buffer the last silence frame, since it could contain beginning of speech # TODO: improve accuracy by using a ring buffer with longer window last_frame = data elif isinstance(data, self._FlushSentinel): frames = audio_bstream.flush() has_ended = True for frame in frames: self._audio_duration_collector.push(frame.duration) await ws.send_bytes(frame.data.tobytes()) if has_ended: self._audio_duration_collector.flush() await ws.send_str(SpeechStream._FINALIZE_MSG) has_ended = False # tell deepgram we are done sending audio/inputs closing_ws = True await ws.send_str(SpeechStream._CLOSE_MSG) async def recv_task(): nonlocal closing_ws while True: msg = await ws.receive() if msg.type in ( aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, ): if closing_ws: # close is expected, see SpeechStream.aclose return # this will trigger a reconnection, see the _run loop raise Exception("deepgram connection closed unexpectedly") if msg.type != aiohttp.WSMsgType.TEXT: logger.warning("unexpected deepgram message type %s", msg.type) continue try: self._process_stream_event(json.loads(msg.data)) except Exception: logger.exception("failed to process deepgram message") tasks = [ asyncio.create_task(send_task()), asyncio.create_task(recv_task()), asyncio.create_task(keepalive_task()), ] try: await asyncio.gather(*tasks) finally: await utils.aio.gracefully_cancel(*tasks) def _check_energy_state(self, frame: rtc.AudioFrame) -> AudioEnergyFilter.State: if self._audio_energy_filter: return self._audio_energy_filter.update(frame) return AudioEnergyFilter.State.SPEAKING def _on_audio_duration_report(self, duration: float) -> None: usage_event = stt.SpeechEvent( type=stt.SpeechEventType.RECOGNITION_USAGE, request_id=self._request_id, alternatives=[], recognition_usage=stt.RecognitionUsage(audio_duration=duration), ) self._event_ch.send_nowait(usage_event) def _process_stream_event(self, data: dict) -> None: assert self._opts.language is not None if data["type"] == "SpeechStarted": # This is a normal case. Deepgram's SpeechStarted events # are not correlated with speech_final or utterance end. # It's possible that we receive two in a row without an endpoint # It's also possible we receive a transcript without a SpeechStarted event. if self._speaking: return self._speaking = True start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH) self._event_ch.send_nowait(start_event) # see this page: # https://developers.deepgram.com/docs/understand-endpointing-interim-results#using-endpointing-speech_final # for more information about the different types of events elif data["type"] == "Results": metadata = data["metadata"] request_id = metadata["request_id"] is_final_transcript = data["is_final"] is_endpoint = data["speech_final"] self._request_id = request_id alts = live_transcription_to_speech_data(self._opts.language, data) # If, for some reason, we didn't get a SpeechStarted event but we got # a transcript with text, we should start speaking. It's rare but has # been observed. if len(alts) > 0 and alts[0].text: if not self._speaking: self._speaking = True start_event = stt.SpeechEvent( type=stt.SpeechEventType.START_OF_SPEECH ) self._event_ch.send_nowait(start_event) if is_final_transcript: final_event = stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, request_id=request_id, alternatives=alts, ) self._event_ch.send_nowait(final_event) else: interim_event = stt.SpeechEvent( type=stt.SpeechEventType.INTERIM_TRANSCRIPT, request_id=request_id, alternatives=alts, ) self._event_ch.send_nowait(interim_event) # if we receive an endpoint, only end the speech if # we either had a SpeechStarted event or we have a seen # a non-empty transcript (deepgram doesn't have a SpeechEnded event) if is_endpoint and self._speaking: self._speaking = False self._event_ch.send_nowait( stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH) ) elif data["type"] == "Metadata": pass # metadata is too noisy else: logger.warning("received unexpected message from deepgram %s", data)
Ancestors
- SpeechStream
- abc.ABC
Inherited members