Module livekit.plugins.speechmatics.stt
Functions
def live_transcription_to_speech_data(data: dict) -> List[SpeechData]
def live_transcription_to_speech_data(data: dict) -> List[stt.SpeechData]:
    speech_data: List[stt.SpeechData] = []
    for result in data.get("results", []):
        start_time, end_time, is_eos = (
            result.get("start_time", 0),
            result.get("end_time", 0),
            result.get("is_eos", False),
        )
        for alt in result.get("alternatives", []):
            content, confidence, language = (
                alt.get("content", "").strip(),
                alt.get("confidence", 1.0),
                alt.get("language", "en"),
            )
            if not content:
                continue

            # append punctuation to the previous result
            if is_eos and speech_data:
                speech_data[-1].text += content
            elif speech_data and start_time == speech_data[-1].end_time:
                speech_data[-1].text += " " + content
            else:
                speech_data.append(
                    stt.SpeechData(language, content, start_time, end_time, confidence)
                )
    return speech_data
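As a sketch of how this helper behaves, consider a hand-written payload in the shape the function reads. Only the keys it accesses are shown; the values, and the assumption that punctuation arrives as a separate is_eos result, are illustrative rather than taken from a real server response:

# Hypothetical payload for illustration; field names mirror what the
# helper reads, values are made up.
payload = {
    "message": "AddTranscript",
    "results": [
        {
            "start_time": 0.0,
            "end_time": 0.4,
            "alternatives": [
                {"content": "Hello", "confidence": 0.98, "language": "en"},
            ],
        },
        {
            "start_time": 0.4,
            "end_time": 0.4,
            "is_eos": True,  # end-of-sentence punctuation token
            "alternatives": [
                {"content": ".", "confidence": 1.0, "language": "en"},
            ],
        },
    ],
}

speech_data = live_transcription_to_speech_data(payload)
# One SpeechData with text "Hello.": because is_eos is set, the punctuation
# is appended to the previous entry instead of producing a new one.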
Classes
class STT (*,
transcription_config: TranscriptionConfig = TranscriptionConfig(language='en', operating_point='enhanced', output_locale=None, diarization=None, additional_vocab=None, punctuation_overrides=None, enable_entities=None, max_delay=0.7, max_delay_mode=None, streaming_mode=None, enable_partials=True),
connection_settings: ConnectionSettings = ConnectionSettings(url='wss://eu2.rt.speechmatics.com/v2', ssl_context=<ssl.SSLContext object>, api_key=None, get_access_token=True),
audio_settings: AudioSettings = AudioSettings(encoding='pcm_s16le', sample_rate=16000),
http_session: Optional[aiohttp.ClientSession] = None,
extra_headers: Optional[Dict] = None)
class STT(stt.STT):
    def __init__(
        self,
        *,
        transcription_config: TranscriptionConfig = TranscriptionConfig(
            language="en",
            operating_point="enhanced",
            enable_partials=True,
            max_delay=0.7,
        ),
        connection_settings: ConnectionSettings = ConnectionSettings(
            url="wss://eu2.rt.speechmatics.com/v2",
        ),
        audio_settings: AudioSettings = AudioSettings(),
        http_session: Optional[aiohttp.ClientSession] = None,
        extra_headers: Optional[Dict] = None,
    ):
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
            ),
        )
        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._extra_headers = extra_headers or {}
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Not implemented")

    def stream(
        self,
        *,
        language: Optional[str] = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        config = dataclasses.replace(self._audio_settings)
        stream = SpeechStream(
            stt=self,
            transcription_config=self._transcription_config,
            audio_settings=config,
            connection_settings=self._connection_settings,
            conn_options=conn_options,
            http_session=self.session,
            extra_headers=self._extra_headers,
        )
        self._streams.add(stream)
        return stream
Speechmatics streaming Speech-to-Text engine for LiveKit agents. Instances create streaming recognition sessions via stream(); one-shot recognition (_recognize_impl) is not implemented.
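A minimal construction sketch, using the defaults shown in the signature above. The import paths are an assumption, not part of this module's documented surface, and the config values are illustrative:

# Sketch only: exact import locations are assumed; STT and
# TranscriptionConfig are the types documented on this page.
from livekit.plugins.speechmatics.stt import STT
from livekit.plugins.speechmatics.types import TranscriptionConfig  # assumed module

stt_engine = STT(
    transcription_config=TranscriptionConfig(
        language="en",
        operating_point="enhanced",
        enable_partials=True,
        max_delay=0.7,
    ),
    # If ConnectionSettings.api_key is unset, SpeechStream._connect_ws
    # falls back to the SPEECHMATICS_API_KEY environment variable.
)
stream = stt_engine.stream()  # returns a SpeechStream; see below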
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop session : aiohttp.ClientSession
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session
Methods
def stream(self,
*,
language: Optional[str] = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> SpeechStream
def stream(
    self,
    *,
    language: Optional[str] = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    config = dataclasses.replace(self._audio_settings)
    stream = SpeechStream(
        stt=self,
        transcription_config=self._transcription_config,
        audio_settings=config,
        connection_settings=self._connection_settings,
        conn_options=conn_options,
        http_session=self.session,
        extra_headers=self._extra_headers,
    )
    self._streams.add(stream)
    return stream
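A hedged sketch of driving the returned stream: audio frames are pushed in and transcription events are read out by async iteration. push_frame, end_input, and the event types come from the livekit-agents stt base classes; the surrounding setup is illustrative:

import asyncio

from livekit import rtc
from livekit.agents import stt as base_stt

async def transcribe(stt_engine, frames: list[rtc.AudioFrame]) -> None:
    stream = stt_engine.stream()

    async def consume() -> None:
        # Events are INTERIM_TRANSCRIPT, FINAL_TRANSCRIPT, and
        # RECOGNITION_USAGE (emitted from _process_stream_event, below).
        async for event in stream:
            if event.type == base_stt.SpeechEventType.FINAL_TRANSCRIPT:
                print(event.alternatives[0].text)

    consumer = asyncio.create_task(consume())
    for frame in frames:
        stream.push_frame(frame)  # resampled internally to AudioSettings.sample_rate
    stream.end_input()
    await consumer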
Inherited members
class SpeechStream (*,
stt: STT,
transcription_config: TranscriptionConfig,
audio_settings: AudioSettings,
connection_settings: ConnectionSettings,
conn_options: APIConnectOptions,
http_session: aiohttp.ClientSession,
extra_headers: Optional[Dict] = None)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        transcription_config: TranscriptionConfig,
        audio_settings: AudioSettings,
        connection_settings: ConnectionSettings,
        conn_options: APIConnectOptions,
        http_session: aiohttp.ClientSession,
        extra_headers: Optional[Dict] = None,
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=audio_settings.sample_rate
        )
        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._session = http_session
        self._extra_headers = extra_headers or {}
        self._speech_duration: float = 0
        self._reconnect_event = asyncio.Event()
        self._recognition_started = asyncio.Event()
        self._seq_no = 0

    async def _run(self):
        closing_ws = False

        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            start_recognition_msg = {
                "message": ClientMessageType.StartRecognition,
                "audio_format": self._audio_settings.asdict(),
                "transcription_config": self._transcription_config.asdict(),
            }
            await ws.send_str(json.dumps(start_recognition_msg))
            await self._recognition_started.wait()

            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._audio_settings.sample_rate,
                num_channels=1,
            )

            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    self._seq_no += 1
                    self._speech_duration += frame.duration
                    await ws.send_bytes(frame.data.tobytes())

            closing_ws = True
            await ws.send_str(
                json.dumps(
                    {
                        "message": ClientMessageType.EndOfStream,
                        "last_seq_no": self._seq_no,
                    }
                )
            )

        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(
                        message="Speechmatics connection closed unexpectedly"
                    )

                try:
                    data = json.loads(msg.data)
                    self._process_stream_event(data, closing_ws)
                except Exception:
                    logger.exception("failed to process Speechmatics message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        api_key = self._connection_settings.api_key or os.environ.get(
            "SPEECHMATICS_API_KEY"
        )
        if api_key is None:
            raise ValueError(
                "Speechmatics API key is required. "
                "Pass one in via ConnectionSettings.api_key parameter, "
                "or set `SPEECHMATICS_API_KEY` environment variable"
            )
        if self._connection_settings.get_access_token:
            api_key = await get_access_token(api_key)
        headers = {
            "Authorization": f"Bearer {api_key}",
            **self._extra_headers,
        }
        url = sanitize_url(
            self._connection_settings.url, self._transcription_config.language
        )
        return await self._session.ws_connect(
            url,
            ssl=self._connection_settings.ssl_context,
            headers=headers,
        )

    def _process_stream_event(self, data: dict, closing_ws: bool) -> None:
        message_type = data["message"]
        if message_type == ServerMessageType.RecognitionStarted:
            self._recognition_started.set()
        elif message_type == ServerMessageType.AddPartialTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                interim_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(interim_event)
        elif message_type == ServerMessageType.AddTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                final_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(final_event)
            if self._speech_duration > 0:
                usage_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                    alternatives=[],
                    recognition_usage=stt.RecognitionUsage(
                        audio_duration=self._speech_duration
                    ),
                )
                self._event_ch.send_nowait(usage_event)
                self._speech_duration = 0
        elif message_type == ServerMessageType.EndOfTranscript:
            if closing_ws:
                pass
            else:
                raise Exception("Speechmatics connection closed unexpectedly")
A single streaming recognition session over the Speechmatics real-time WebSocket API. Audio pushed into the stream is forwarded to the service, and interim and final transcripts are emitted back as SpeechEvents.
Args:
    sample_rate : int or None, optional
        The desired sample rate for the audio input. If specified, the audio
        input will be automatically resampled to match the given sample rate
        before being processed for Speech-to-Text. If not provided (None),
        the input will retain its original sample rate.
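For orientation, the first message send_task writes on connect looks roughly like the sketch below, built from the defaults shown earlier. The real dicts come from AudioSettings.asdict() and TranscriptionConfig.asdict() and may carry additional keys, so treat this as an approximation of the handshake, not a normative schema:

# Approximation of the StartRecognition handshake built by send_task().
start_recognition_msg = {
    "message": "StartRecognition",  # ClientMessageType.StartRecognition
    "audio_format": {"encoding": "pcm_s16le", "sample_rate": 16000},
    "transcription_config": {
        "language": "en",
        "operating_point": "enhanced",
        "enable_partials": True,
        "max_delay": 0.7,
    },
}
# The server replies with RecognitionStarted, after which raw audio bytes are
# streamed up and AddPartialTranscript / AddTranscript messages flow back;
# EndOfStream / EndOfTranscript close the exchange (see _run above).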
Ancestors
- RecognizeStream
- abc.ABC
Inherited members