Module livekit.plugins.telnyx.tts

Classes

class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    def __init__(
        self, *, tts: TTS, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> None:
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        self._segments_ch = utils.aio.Chan[str]()
        request_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
            mime_type="audio/pcm",
            stream=True,
        )

        async def _collect_segments() -> None:
            segment_text = ""
            async for input_data in self._input_ch:
                if isinstance(input_data, str):
                    segment_text += input_data
                elif isinstance(input_data, self._FlushSentinel):
                    if segment_text:
                        self._segments_ch.send_nowait(segment_text)
                        segment_text = ""
            self._segments_ch.close()

        async def _run_segments() -> None:
            async for text in self._segments_ch:
                await self._run_ws(text, output_emitter)

        tasks = [
            asyncio.create_task(_collect_segments()),
            asyncio.create_task(_run_segments()),
        ]
        try:
            await asyncio.gather(*tasks)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=request_id, body=None
            ) from None
        except APIConnectionError:
            raise
        except APIStatusError:
            raise
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    async def _run_ws(self, text: str, output_emitter: tts.AudioEmitter) -> None:
        segment_id = utils.shortuuid()
        output_emitter.start_segment(segment_id=segment_id)

        url = f"{self._tts._opts.base_url}?voice={self._tts._opts.voice}"
        headers = {"Authorization": f"Bearer {self._tts._opts.api_key}"}

        decoder = utils.codecs.AudioStreamDecoder(
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
            format="audio/mp3",
        )

        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            await ws.send_str(json.dumps({"text": " "}))
            self._mark_started()
            await ws.send_str(json.dumps({"text": text}))
            await ws.send_str(json.dumps({"text": ""}))

        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                        audio_data = data.get("audio")
                        if audio_data:
                            audio_bytes = base64.b64decode(audio_data)
                            if audio_bytes:
                                decoder.push(audio_bytes)
                    except json.JSONDecodeError:
                        logger.warning("Telnyx TTS: Received invalid JSON")

                elif msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error("Telnyx TTS WebSocket error: %s", ws.exception())
                    break

            decoder.end_input()

        async def decode_task() -> None:
            async for frame in decoder:
                output_emitter.push(frame.data.tobytes())

        try:
            ws = await asyncio.wait_for(
                self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
                self._conn_options.timeout,
            )
            async with ws:
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                    asyncio.create_task(decode_task()),
                ]
                try:
                    await asyncio.gather(*tasks)
                finally:
                    await utils.aio.gracefully_cancel(*tasks)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except (APIConnectionError, APIStatusError, APITimeoutError):
            raise
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            await decoder.aclose()
            output_emitter.end_segment()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.tts.tts.SynthesizeStream
  • abc.ABC
class TTS (*,
voice: str = 'Telnyx.NaturalHD.astra',
api_key: str | None = None,
base_url: str = 'wss://api.telnyx.com/v2/text-to-speech/speech',
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice: str = "Telnyx.NaturalHD.astra",
        api_key: str | None = None,
        base_url: str = TTS_ENDPOINT,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
        )

        self._opts = _TTSOptions(
            voice=voice,
            api_key=get_api_key(api_key),
            base_url=base_url,
        )
        self._session_manager = SessionManager(http_session)
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @property
    def model(self) -> str:
        return self._opts.voice

    @property
    def provider(self) -> str:
        return "telnyx"

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> tts.ChunkedStream:
        return self._synthesize_with_stream(text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._session_manager.close()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    return self._opts.voice

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    return "telnyx"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._session_manager.close()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> tts.ChunkedStream:
    return self._synthesize_with_stream(text, conn_options=conn_options)

Inherited members