Module livekit.plugins.speechmatics

Speechmatics STT plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/stt/speechmatics/ for more information.
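
Typical usage is to pass an STT instance into an agent session. A minimal sketch, assuming SPEECHMATICS_API_KEY is exported in the environment and the standard LiveKit Agents setup; the commented slots are placeholders for the rest of your pipeline:

# Minimal quick-start sketch (assumptions noted above).
from livekit.agents import AgentSession
from livekit.plugins import speechmatics

session = AgentSession(
    stt=speechmatics.STT(),  # defaults: en, enhanced operating point, partials on, 0.7 s max delay
    # llm=..., tts=..., vad=... as required by your agent
)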

Classes

class STT (*,
transcription_config: NotGivenOr[TranscriptionConfig] = NOT_GIVEN,
connection_settings: NotGivenOr[ConnectionSettings] = NOT_GIVEN,
audio_settings: NotGivenOr[AudioSettings] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_headers: NotGivenOr[dict] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        transcription_config: NotGivenOr[TranscriptionConfig] = NOT_GIVEN,
        connection_settings: NotGivenOr[ConnectionSettings] = NOT_GIVEN,
        audio_settings: NotGivenOr[AudioSettings] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_headers: NotGivenOr[dict] = NOT_GIVEN,
    ):
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
            ),
        )
        if not is_given(transcription_config):
            transcription_config = TranscriptionConfig(  # noqa: B008
                language="en",
                operating_point="enhanced",
                enable_partials=True,
                max_delay=0.7,
            )
        if not is_given(connection_settings):
            connection_settings = ConnectionSettings(  # noqa: B008
                url="wss://eu2.rt.speechmatics.com/v2",
            )
        if not is_given(audio_settings):
            audio_settings = AudioSettings()  # noqa: B008

        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._extra_headers = extra_headers or {}
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Not implemented")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = dataclasses.replace(self._audio_settings)
        if is_given(language):
            config.language = language
        stream = SpeechStream(
            stt=self,
            transcription_config=self._transcription_config,
            audio_settings=config,
            connection_settings=self._connection_settings,
            conn_options=conn_options,
            http_session=self.session,
            extra_headers=self._extra_headers,
        )
        self._streams.add(stream)
        return stream

Speechmatics speech-to-text for LiveKit Agents. The engine is streaming-only with interim results: the one-shot recognize path raises NotImplementedError, so audio must be transcribed through stream().
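
A construction sketch with explicit settings. The keyword values mirror the defaults in __init__ above; importing the config types from the plugin's types module is an assumption about the package layout:

# Sketch: constructing the STT with explicit settings (import path assumed).
from livekit.plugins import speechmatics
from livekit.plugins.speechmatics.types import ConnectionSettings, TranscriptionConfig

stt = speechmatics.STT(
    transcription_config=TranscriptionConfig(
        language="en",
        operating_point="enhanced",
        enable_partials=True,
        max_delay=0.7,
    ),
    connection_settings=ConnectionSettings(url="wss://eu2.rt.speechmatics.com/v2"),
)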

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop session : aiohttp.ClientSession

The aiohttp.ClientSession used for the Speechmatics WebSocket connection. If none was supplied to the constructor, one is created lazily from the shared HTTP context.
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.speechmatics.stt.SpeechStream

Create a new streaming recognition session. If language is given, it is applied to this stream's copy of the audio settings; everything else is taken from the STT instance.
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = dataclasses.replace(self._audio_settings)
    if is_given(language):
        config.language = language
    stream = SpeechStream(
        stt=self,
        transcription_config=self._transcription_config,
        audio_settings=config,
        connection_settings=self._connection_settings,
        conn_options=conn_options,
        http_session=self.session,
        extra_headers=self._extra_headers,
    )
    self._streams.add(stream)
    return stream
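
A consumption sketch for the returned stream. push_frame, end_input, and async iteration come from the RecognizeStream base class; frames stands in for a hypothetical async source of rtc.AudioFrame objects:

# Sketch: feeding audio into a stream and reading transcription events.
import asyncio

from livekit.agents import stt as agents_stt

async def transcribe(stt: agents_stt.STT, frames) -> None:
    stream = stt.stream()

    async def push() -> None:
        async for frame in frames:  # `frames`: hypothetical audio source
            stream.push_frame(frame)
        stream.end_input()  # triggers the EndOfStream message in send_task

    push_task = asyncio.create_task(push())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.INTERIM_TRANSCRIPT:
            print("partial:", event.alternatives[0].text)
        elif event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", event.alternatives[0].text)
    await push_task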

class SpeechStream (*,
stt: STT,
transcription_config: TranscriptionConfig,
audio_settings: AudioSettings,
connection_settings: ConnectionSettings,
conn_options: APIConnectOptions,
http_session: aiohttp.ClientSession,
extra_headers: dict)
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        transcription_config: TranscriptionConfig,
        audio_settings: AudioSettings,
        connection_settings: ConnectionSettings,
        conn_options: APIConnectOptions,
        http_session: aiohttp.ClientSession,
        extra_headers: dict,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=audio_settings.sample_rate)
        self._transcription_config = transcription_config
        self._audio_settings = audio_settings
        self._connection_settings = connection_settings
        self._session = http_session
        self._extra_headers = extra_headers
        self._speech_duration: float = 0

        self._reconnect_event = asyncio.Event()
        self._recognition_started = asyncio.Event()
        self._seq_no = 0

    async def _run(self):
        closing_ws = False

        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            start_recognition_msg = {
                "message": ClientMessageType.StartRecognition,
                "audio_format": self._audio_settings.asdict(),
                "transcription_config": self._transcription_config.asdict(),
            }
            await ws.send_str(json.dumps(start_recognition_msg))

            await self._recognition_started.wait()

            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._audio_settings.sample_rate,
                num_channels=1,
            )

            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    self._seq_no += 1
                    self._speech_duration += frame.duration
                    await ws.send_bytes(frame.data.tobytes())

            closing_ws = True
            await ws.send_str(
                json.dumps(
                    {
                        "message": ClientMessageType.EndOfStream,
                        "last_seq_no": self._seq_no,
                    }
                )
            )

        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise APIStatusError(message="Speechmatics connection closed unexpectedly")

                try:
                    data = json.loads(msg.data)
                    self._process_stream_event(data, closing_ws)
                except Exception:
                    logger.exception("failed to process Speechmatics message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [tasks_group, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    await tasks_group
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        api_key = self._connection_settings.api_key or os.environ.get("SPEECHMATICS_API_KEY")
        if api_key is None:
            raise ValueError(
                "Speechmatics API key is required. "
                "Pass one in via ConnectionSettings.api_key parameter, "
                "or set `SPEECHMATICS_API_KEY` environment variable"
            )
        if self._connection_settings.get_access_token:
            api_key = await get_access_token(api_key)
        headers = {
            "Authorization": f"Bearer {api_key}",
            **self._extra_headers,
        }
        url = sanitize_url(self._connection_settings.url, self._transcription_config.language)
        return await self._session.ws_connect(
            url,
            ssl=self._connection_settings.ssl_context,
            headers=headers,
        )

    def _process_stream_event(self, data: dict, closing_ws: bool) -> None:
        message_type = data["message"]

        if message_type == ServerMessageType.RecognitionStarted:
            self._recognition_started.set()

        elif message_type == ServerMessageType.AddPartialTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                interim_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(interim_event)

        elif message_type == ServerMessageType.AddTranscript:
            alts = live_transcription_to_speech_data(data)
            if len(alts) > 0 and alts[0].text:
                final_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(final_event)

            if self._speech_duration > 0:
                usage_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                    alternatives=[],
                    recognition_usage=stt.RecognitionUsage(audio_duration=self._speech_duration),
                )
                self._event_ch.send_nowait(usage_event)
                self._speech_duration = 0

        elif message_type == ServerMessageType.EndOfTranscript:
            if closing_ws:
                pass
            else:
                raise Exception("Speechmatics connection closed unexpectedly")

Streaming recognition session backed by the Speechmatics real-time WebSocket API. Instances are created via STT.stream() rather than constructed directly; the stream sends audio frames, reconnects when the connection drops unexpectedly, and emits interim, final, and usage events.

Args

sample_rate : int or None, optional
    The desired sample rate for the audio input. If specified, the audio input is automatically resampled to match the given sample rate before being processed for speech-to-text. If not provided (None), the input retains its original sample rate.
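
Besides transcripts, the stream emits a RECOGNITION_USAGE event after each final transcript, carrying the audio duration pushed since the previous one (see _process_stream_event above). A minimal metering sketch:

# Sketch: summing billed audio duration across RECOGNITION_USAGE events.
from livekit.agents import stt as agents_stt

async def meter_usage(stream) -> float:
    total = 0.0
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.RECOGNITION_USAGE:
            total += event.recognition_usage.audio_duration
    return total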

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC