Module livekit.plugins.telnyx.stt

Classes

class STT (*,
language: str = 'en',
transcription_engine: TranscriptionEngine = 'telnyx',
interim_results: bool = True,
api_key: str | None = None,
base_url: str = 'wss://api.telnyx.com/v2/speech-to-text/transcription',
sample_rate: int = 16000,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class STT(stt.STT):
    """Telnyx WebSocket-based speech-to-text plugin for LiveKit Agents."""

    def __init__(
        self,
        *,
        language: str = "en",
        transcription_engine: TranscriptionEngine = "telnyx",
        interim_results: bool = True,
        api_key: str | None = None,
        base_url: str = STT_ENDPOINT,
        sample_rate: int = SAMPLE_RATE,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        capabilities = stt.STTCapabilities(
            streaming=True,
            interim_results=interim_results,
        )
        super().__init__(capabilities=capabilities)

        self._opts = _STTOptions(
            api_key=get_api_key(api_key),
            language=LanguageCode(language),
            transcription_engine=transcription_engine,
            interim_results=interim_results,
            base_url=base_url,
            sample_rate=sample_rate,
        )
        self._session_manager = SessionManager(http_session)
        # WeakSet so closed/garbage-collected streams drop out automatically.
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def model(self) -> str:
        """Model identifier — the configured Telnyx transcription engine."""
        return self._opts.transcription_engine

    @property
    def provider(self) -> str:
        """Provider identifier for this STT implementation."""
        return "telnyx"

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Recognize a pre-recorded buffer by piping it through a temporary stream.

        Collects every FINAL_TRANSCRIPT produced by the stream and returns
        them concatenated as a single final speech event.
        """
        lang = LanguageCode(language) if is_given(language) else self._opts.language

        recognizer = self.stream(language=language, conn_options=conn_options)
        try:
            if isinstance(buffer, list):
                frames = buffer
            else:
                frames = [buffer]
            for frame in frames:
                recognizer.push_frame(frame)
            recognizer.end_input()

            pieces: list[str] = []
            async for ev in recognizer:
                if ev.type != stt.SpeechEventType.FINAL_TRANSCRIPT:
                    continue
                if ev.alternatives:
                    pieces.append(ev.alternatives[0].text)

            return stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                alternatives=[
                    stt.SpeechData(
                        language=lang,
                        text="".join(pieces),
                    )
                ],
            )
        finally:
            await recognizer.aclose()

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Open a new streaming recognition session and track it for cleanup."""
        if is_given(language):
            lang = LanguageCode(language)
        else:
            lang = self._opts.language
        new_stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            language=lang,
        )
        self._streams.add(new_stream)
        return new_stream

    async def aclose(self) -> None:
        """Close every open stream, then release the shared HTTP session."""
        for open_stream in tuple(self._streams):
            await open_stream.aclose()
        self._streams.clear()
        await self._session_manager.close()

Telnyx speech-to-text implementation for LiveKit Agents: streams audio to the Telnyx transcription WebSocket endpoint and emits interim and final transcript events.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier: the configured Telnyx transcription engine."""
    return self._opts.transcription_engine

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the STT provider identifier ("telnyx")."""
    return "telnyx"

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close every tracked stream, then release the shared HTTP session."""
    # Snapshot the WeakSet: aclose() may remove entries while we iterate.
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._session_manager.close()

Close the STT, and every stream/requests associated with it

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Open a new streaming recognition session.

    Args:
        language: Optional override for the language configured at construction.
        conn_options: Connection/retry options used when dialing the WebSocket.

    Returns:
        A SpeechStream registered with this STT instance so aclose() can reach it.
    """
    resolved_language = LanguageCode(language) if is_given(language) else self._opts.language
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        language=resolved_language,
    )
    self._streams.add(stream)
    return stream

Inherited members

class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions,
language: LanguageCode)
Expand source code
class SpeechStream(stt.RecognizeStream):
    """Streaming recognition session over a Telnyx STT WebSocket.

    Audio frames pushed into the stream are re-chunked, prefixed with a
    streaming WAV header, and sent to the Telnyx endpoint; transcript JSON
    messages received from the server are converted into speech events.
    """

    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions,
        language: LanguageCode,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=stt._opts.sample_rate)
        self._stt: STT = stt
        self._language = language
        # True between START_OF_SPEECH and END_OF_SPEECH emissions.
        self._speaking = False

    async def _run(self) -> None:
        """Connect the WebSocket and run the send/receive loops until done."""
        closing_ws = False

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            # Telnyx expects a WAV header before the raw PCM chunks.
            wav_header = _create_streaming_wav_header(self._stt._opts.sample_rate, NUM_CHANNELS)
            await ws.send_bytes(wav_header)

            # 50 ms chunks (sample_rate / 20 samples per channel).
            samples_per_chunk = self._stt._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._stt._opts.sample_rate,
                num_channels=NUM_CHANNELS,
                samples_per_channel=samples_per_chunk,
            )

            async for data in self._input_ch:
                if isinstance(data, rtc.AudioFrame):
                    for frame in audio_bstream.write(data.data.tobytes()):
                        await ws.send_bytes(frame.data.tobytes())
                elif isinstance(data, self._FlushSentinel):
                    for frame in audio_bstream.flush():
                        await ws.send_bytes(frame.data.tobytes())

            # Drain any partially-filled chunk once the input channel closes.
            for frame in audio_bstream.flush():
                await ws.send_bytes(frame.data.tobytes())

            # Don't close the WS here — let recv_task read the final
            # transcript before the server closes the connection.
            closing_ws = True

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    # A close after we finished sending is the expected
                    # end-of-session; anything earlier is an error.
                    if closing_ws:
                        return
                    raise APIStatusError(message="Telnyx STT WebSocket closed unexpectedly")

                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                        logger.debug(
                            "Telnyx STT received: is_final=%s, has_transcript=%s",
                            data.get("is_final"),
                            bool(data.get("transcript")),
                        )
                        self._process_stream_event(data)
                    except Exception:
                        # Keep the session alive on a single malformed message.
                        logger.exception("Failed to process Telnyx STT message")
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("Telnyx STT WebSocket error: %s", ws.exception())

        ws: aiohttp.ClientWebSocketResponse | None = None
        try:
            ws = await self._connect_ws()
            tasks = [
                asyncio.create_task(send_task(ws)),
                asyncio.create_task(recv_task(ws)),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                await utils.aio.gracefully_cancel(*tasks)
        finally:
            if ws is not None:
                await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open the Telnyx STT WebSocket, raising APIConnectionError on failure."""
        from urllib.parse import urlencode

        opts = self._stt._opts
        # urlencode percent-escapes parameter values; the previous manual
        # f-string join would produce a malformed URL if a value contained
        # reserved characters such as '&', '=', or spaces.
        query_string = urlencode(
            {
                "transcription_engine": opts.transcription_engine,
                "language": self._language,
                "input_format": "wav",
            }
        )
        url = f"{opts.base_url}?{query_string}"
        headers = {"Authorization": f"Bearer {opts.api_key}"}

        try:
            ws = await asyncio.wait_for(
                self._stt._session_manager.ensure_session().ws_connect(url, headers=headers),
                self._conn_options.timeout,
            )
            logger.debug("Established Telnyx STT WebSocket connection")
            return ws
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("Failed to connect to Telnyx STT") from e

    def _process_stream_event(self, data: dict) -> None:
        """Translate one Telnyx transcript message into speech events.

        Emits START_OF_SPEECH on the first non-empty transcript, then either
        an INTERIM_TRANSCRIPT or a FINAL_TRANSCRIPT followed by END_OF_SPEECH
        depending on the message's ``is_final`` flag. Empty transcripts are
        ignored.
        """
        transcript = data.get("transcript", "")
        is_final = data.get("is_final", False)

        if not transcript:
            return

        if not self._speaking:
            self._speaking = True
            self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH))

        alternatives = [
            stt.SpeechData(
                language=self._language,
                text=transcript,
                confidence=data.get("confidence", 0.0),
            )
        ]

        if is_final:
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=alternatives,
                )
            )
            self._speaking = False
            self._event_ch.send_nowait(stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH))
        else:
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=alternatives,
                )
            )
A single streaming recognition session over a Telnyx STT WebSocket: pushed audio frames are sent as WAV-framed chunks, and transcript messages from the server are surfaced as speech events.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC