Module livekit.plugins.speechmatics
Speechmatics STT plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/stt/speechmatics/ for more information.
Classes
class STT (*,
transcription_config: NotGivenOr[TranscriptionConfig] = NOT_GIVEN,
connection_settings: NotGivenOr[ConnectionSettings] = NOT_GIVEN,
audio_settings: NotGivenOr[AudioSettings] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_headers: NotGivenOr[dict] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        transcription_config: NotGivenOr[TranscriptionConfig] = NOT_GIVEN,
        connection_settings: NotGivenOr[ConnectionSettings] = NOT_GIVEN,
        audio_settings: NotGivenOr[AudioSettings] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_headers: NotGivenOr[dict] = NOT_GIVEN,
    ):
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
            ),
        )
        if not is_given(transcription_config):
            transcription_config = TranscriptionConfig(  # noqa: B008
                language="en",
                operating_point="enhanced",
                enable_partials=True,
                max_delay=0.7,
            )
        if not is_given(connection_settings):
            connection_settings = ConnectionSettings(  # noqa: B008
                url="wss://eu2.rt.speechmatics.com/v2",
            )
        if not is_given(audio_settings):
            audio_settings = AudioSettings()  # noqa: B008
        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._extra_headers = extra_headers or {}
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Not implemented")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = dataclasses.replace(self._audio_settings)
        if is_given(language):
            config.language = language
        stream = SpeechStream(
            stt=self,
            transcription_config=self._transcription_config,
            audio_settings=config,
            connection_settings=self._connection_settings,
            conn_options=conn_options,
            http_session=self.session,
            extra_headers=self._extra_headers,
        )
        self._streams.add(stream)
        return stream
Speechmatics streaming speech-to-text (STT) engine for LiveKit Agents.
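A minimal construction sketch, not taken from the plugin docs: the import location of the config dataclasses is an assumption and may differ between plugin versions, and the values shown simply restate the defaults applied in __init__ above.

from livekit.plugins import speechmatics
# Assumed import location for the config dataclasses; adjust to your installed version.
from livekit.plugins.speechmatics.types import TranscriptionConfig, ConnectionSettings

# Omitted arguments fall back to the defaults shown in __init__ above:
# English, "enhanced" operating point, partials enabled, 0.7 s max delay,
# and the wss://eu2.rt.speechmatics.com/v2 endpoint.
stt_engine = speechmatics.STT(
    transcription_config=TranscriptionConfig(
        language="en",
        operating_point="enhanced",
        enable_partials=True,
        max_delay=0.7,
    ),
    connection_settings=ConnectionSettings(url="wss://eu2.rt.speechmatics.com/v2"),
)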
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop session : aiohttp.ClientSession
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.speechmatics.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = dataclasses.replace(self._audio_settings)
    if is_given(language):
        config.language = language
    stream = SpeechStream(
        stt=self,
        transcription_config=self._transcription_config,
        audio_settings=config,
        connection_settings=self._connection_settings,
        conn_options=conn_options,
        http_session=self.session,
        extra_headers=self._extra_headers,
    )
    self._streams.add(stream)
    return stream
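A rough sketch of driving the resulting stream by hand. In practice the agent framework pushes frames for you; push_frame and end_input come from the base RecognizeStream API, and the surrounding asyncio wiring and the frames iterable are illustrative assumptions.

import asyncio
from livekit.plugins import speechmatics

async def transcribe(stt_engine: speechmatics.STT, frames) -> None:
    stream = stt_engine.stream(language="en")

    async def feed() -> None:
        # frames is assumed to be an iterable of rtc.AudioFrame objects.
        for frame in frames:
            stream.push_frame(frame)
        stream.end_input()  # signal that no more audio will arrive

    feeder = asyncio.create_task(feed())
    async for event in stream:
        # INTERIM_TRANSCRIPT / FINAL_TRANSCRIPT events carry alternatives;
        # RECOGNITION_USAGE events report accumulated audio duration.
        print(event.type, [alt.text for alt in event.alternatives])
    await feeder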
Inherited members
class SpeechStream (*,
stt: STT,
transcription_config: TranscriptionConfig,
audio_settings: AudioSettings,
connection_settings: ConnectionSettings,
conn_options: APIConnectOptions,
http_session: aiohttp.ClientSession,
extra_headers: dict)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        transcription_config: TranscriptionConfig,
        audio_settings: AudioSettings,
        connection_settings: ConnectionSettings,
        conn_options: APIConnectOptions,
        http_session: aiohttp.ClientSession,
        extra_headers: dict,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=audio_settings.sample_rate)
        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._session = http_session
        self._extra_headers = extra_headers
        self._speech_duration: float = 0

        self._reconnect_event = asyncio.Event()
        self._recognition_started = asyncio.Event()
        self._seq_no = 0

    async def _run(self):
        closing_ws = False

        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            start_recognition_msg = {
                "message": ClientMessageType.StartRecognition,
                "audio_format": self._audio_settings.asdict(),
                "transcription_config": self._transcription_config.asdict(),
            }
            await ws.send_str(json.dumps(start_recognition_msg))

            await self._recognition_started.wait()

            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._audio_settings.sample_rate,
                num_channels=1,
            )

            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    self._seq_no += 1
                    self._speech_duration += frame.duration
                    await ws.send_bytes(frame.data.tobytes())

            closing_ws = True
            await ws.send_str(
                json.dumps(
                    {
                        "message": ClientMessageType.EndOfStream,
                        "last_seq_no": self._seq_no,
                    }
                )
            )

        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(message="Speechmatics connection closed unexpectedly")

                try:
                    data = json.loads(msg.data)
                    self._process_stream_event(data, closing_ws)
                except Exception:
                    logger.exception("failed to process Speechmatics message")

        ws: aiohttp.ClientWebSocketResponse | None = None
        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        [tasks_group, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    await tasks_group
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        api_key = self._connection_settings.api_key or os.environ.get("SPEECHMATICS_API_KEY")
        if api_key is None:
            raise ValueError(
                "Speechmatics API key is required. "
                "Pass one in via ConnectionSettings.api_key parameter, "
                "or set `SPEECHMATICS_API_KEY` environment variable"
            )
        if self._connection_settings.get_access_token:
            api_key = await get_access_token(api_key)
        headers = {
            "Authorization": f"Bearer {api_key}",
            **self._extra_headers,
        }
        url = sanitize_url(self._connection_settings.url, self._transcription_config.language)
        return await self._session.ws_connect(
            url,
            ssl=self._connection_settings.ssl_context,
            headers=headers,
        )

    def _process_stream_event(self, data: dict, closing_ws: bool) -> None:
        message_type = data["message"]
        if message_type == ServerMessageType.RecognitionStarted:
            self._recognition_started.set()
        elif message_type == ServerMessageType.AddPartialTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                interim_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(interim_event)
        elif message_type == ServerMessageType.AddTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                final_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(final_event)
            if self._speech_duration > 0:
                usage_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                    alternatives=[],
                    recognition_usage=stt.RecognitionUsage(audio_duration=self._speech_duration),
                )
                self._event_ch.send_nowait(usage_event)
                self._speech_duration = 0
        elif message_type == ServerMessageType.EndOfTranscript:
            if closing_ws:
                pass
            else:
                raise Exception("Speechmatics connection closed unexpectedly")
Streaming speech recognition session over the Speechmatics real-time WebSocket API, created via STT.stream().
Args
- sample_rate (int or None, optional): The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
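For orientation, the handshake performed in _run corresponds to the Speechmatics real-time API exchange sketched below. The message names come from the source above; the audio_format and transcription_config field values are illustrative placeholders, not necessarily the plugin's actual defaults.

# Sent by send_task before any audio, built from AudioSettings.asdict()
# and TranscriptionConfig.asdict(); field values here are only examples.
start_recognition = {
    "message": "StartRecognition",
    "audio_format": {"type": "raw", "encoding": "pcm_s16le", "sample_rate": 16000},
    "transcription_config": {"language": "en", "operating_point": "enhanced",
                             "enable_partials": True, "max_delay": 0.7},
}

# After the server answers with {"message": "RecognitionStarted", ...}, audio
# frames go out as binary WebSocket messages. The session is finished with:
end_of_stream = {"message": "EndOfStream", "last_seq_no": 42}

# recv_task then turns AddPartialTranscript / AddTranscript messages into interim
# and final SpeechEvents until the server sends EndOfTranscript, which is only
# expected while the stream is closing.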
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC