Module livekit.plugins.respeecher

Respeecher plugin for LiveKit Agents

Voice cloning and synthesis plugin for LiveKit Agents using the Respeecher API.

Functions

async def list_voices(*,
model: TTSModels | str = '/public/tts/en-rt',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: str = 'https://api.respeecher.com/v1',
http_session: aiohttp.ClientSession | None = None) ‑> list[livekit.plugins.respeecher.models.Voice]
Expand source code
async def list_voices(
    *,
    model: TTSModels | str = "/public/tts/en-rt",
    api_key: NotGivenOr[str] = NOT_GIVEN,
    base_url: str = API_BASE_URL,
    http_session: aiohttp.ClientSession | None = None,
) -> list[Voice]:
    """List available voices for the given Respeecher model.

    Args:
        model: The Respeecher TTS model whose voices should be listed. It is
            appended directly to ``base_url`` when building the request URL.
        api_key: Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
        base_url: The base URL for the Respeecher API.
        http_session: Optional aiohttp session to use for the request. When
            omitted, the session from ``utils.http_context`` is used.

    Returns:
        One ``Voice`` built from each entry of the JSON response body.

    Raises:
        ValueError: If no API key is given and RESPEECHER_API_KEY is unset or empty.
        aiohttp.ClientResponseError: If the HTTP response carries an error status.
        APIError: If the response lists no voices.
    """
    key = api_key if is_given(api_key) else os.environ.get("RESPEECHER_API_KEY")
    if not key:
        raise ValueError("RESPEECHER_API_KEY must be set")

    client = http_session if http_session else utils.http_context.http_session()
    voices_url = f"{base_url}{model}/voices"
    request_headers = {
        API_AUTH_HEADER: key,
        API_VERSION_HEADER: API_VERSION,
    }

    async with client.get(voices_url, headers=request_headers) as response:
        response.raise_for_status()
        payload = await response.json()
        result = list(map(Voice, payload))
        if not result:
            raise APIError("No voices are available")
        return result

List available voices for the given Respeecher model.

Args

model
The Respeecher TTS model whose voices should be listed.
api_key
Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
base_url
The base URL for the Respeecher API.
http_session
Optional aiohttp session to use for the request.

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize text using the Respeecher HTTPS endpoint.

    The whole input text is sent in one POST to ``{base_url}{model}/tts/bytes``,
    and the response body is streamed into the audio emitter as it arrives.
    """

    def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        # Copy the options so later TTS.update_options() calls (which reassign
        # fields on tts._opts) do not change the parameters of this request.
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Run the TTS synthesis.

        Raises:
            APITimeoutError: If the request times out.
            APIStatusError: If Respeecher responds with an HTTP error status.
            APIError: Re-raised unchanged if one is raised while processing.
            APIConnectionError: For any other failure.
        """
        voice: dict[str, object] = {"id": self._opts.voice_id}
        if is_given(self._opts.voice_settings) and self._opts.voice_settings.sampling_params:
            voice["sampling_params"] = self._opts.voice_settings.sampling_params

        json_data = {
            "transcript": self._input_text,
            "voice": voice,
            "output_format": {
                "sample_rate": self._opts.sample_rate,
                "encoding": self._opts.encoding,
            },
        }

        http_url = f"{self._opts.base_url}{self._opts.model}/tts/bytes"

        try:
            async with self._tts._ensure_session().post(
                http_url,
                headers={
                    API_AUTH_HEADER: self._opts.api_key,
                    API_VERSION_HEADER: API_VERSION,
                    "Content-Type": "application/json",
                },
                json=json_data,
                # The overall request budget is fixed at 30 s; only the socket
                # connect phase follows the caller's conn_options.timeout.
                timeout=aiohttp.ClientTimeout(total=30, sock_connect=self._conn_options.timeout),
            ) as resp:
                resp.raise_for_status()

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._opts.sample_rate,
                    num_channels=1,
                    # /tts/bytes returns WAV-wrapped PCM; the WebSocket stream returns raw PCM.
                    mime_type="audio/wav",
                )

                async for data, _ in resp.content.iter_chunks():
                    output_emitter.push(data)

                output_emitter.flush()
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except APIError:
            # Already classified; don't mask it as a generic connection error.
            raise
        except Exception as e:
            raise APIConnectionError() from e

Synthesize text using the Respeecher HTTPS endpoint

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    """Streamed API using WebSocket for real-time synthesis.

    Text pushed into the stream is split into sentences by the TTS's sentence
    tokenizer and sent over a pooled WebSocket connection. Audio chunks received
    for this stream's context are emitted as a single raw-PCM segment.
    """

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts
        # Copy the options so later TTS.update_options() calls (which reassign
        # fields on tts._opts) do not affect a stream that is already running.
        self._opts = replace(tts._opts)

    async def aclose(self) -> None:
        """Close this stream immediately (delegates to the base class)."""
        await super().aclose()

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Drive one streaming session over a pooled WebSocket connection.

        Three tasks run concurrently: forwarding input text into the sentence
        tokenizer, sending tokenized sentences to Respeecher, and receiving audio
        chunks until the final context reports "done".
        """
        context_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=context_id,
            sample_rate=self._opts.sample_rate,
            num_channels=1,
            stream=True,
            mime_type="audio/pcm",
        )
        output_emitter.start_segment(segment_id=context_id)

        sent_tokenizer_stream = self._tts._sentence_tokenizer.stream()
        # Set once the closing ("continue": False) message has been sent; a "done"
        # message only ends the receive loop after that point.
        input_ended = False

        def _voice_payload() -> dict[str, object]:
            voice: dict[str, object] = {"id": self._opts.voice_id}
            if is_given(self._opts.voice_settings) and self._opts.voice_settings.sampling_params:
                voice["sampling_params"] = self._opts.voice_settings.sampling_params
            return voice

        async def _input_task() -> None:
            # Forward text and flush requests from the input channel to the tokenizer.
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    sent_tokenizer_stream.flush()
                    continue
                sent_tokenizer_stream.push_text(data)
            sent_tokenizer_stream.end_input()

        async def _sentence_stream_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal input_ended
            output_format = {
                "encoding": self._opts.encoding,
                "sample_rate": self._opts.sample_rate,
            }
            async for sent in sent_tokenizer_stream:
                self._mark_started()
                await ws.send_str(
                    json.dumps(
                        {
                            "context_id": context_id,
                            # Empty tokens are sent as a single space (presumably the
                            # API rejects empty transcripts - TODO confirm).
                            "transcript": sent.token if sent.token else " ",
                            "voice": _voice_payload(),
                            "continue": True,
                            "output_format": output_format,
                        }
                    )
                )

            # Close the context so Respeecher finishes and reports "done".
            await ws.send_str(
                json.dumps(
                    {
                        "context_id": context_id,
                        "transcript": " ",
                        "voice": _voice_payload(),
                        "continue": False,
                        "output_format": output_format,
                    }
                )
            )
            input_ended = True

        async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            while True:
                msg = await ws.receive(timeout=self._conn_options.timeout)
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError(
                        "Respeecher connection closed unexpectedly", request_id=context_id
                    )

                if msg.type == aiohttp.WSMsgType.ERROR:
                    raise APIConnectionError(
                        f"Respeecher WebSocket transport error: {ws.exception()}"
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("Unexpected Respeecher message type %s", msg.type)
                    continue

                data = json.loads(msg.data)
                msg_type = data.get("type")
                msg_context_id = data.get("context_id")

                # An error that carries no context_id would otherwise be dropped by
                # the context filter below, leaving this stream waiting until
                # ws.receive() times out. Treat it as applying to this connection.
                if msg_type == "error" and msg_context_id is None:
                    raise APIError(f"Respeecher returned error: {data.get('error')}")

                if msg_context_id != context_id:
                    logger.warning(
                        "Received a message with context_id=%s instead of expected %s",
                        msg_context_id,
                        context_id,
                    )
                    continue

                if msg_type == "error":
                    raise APIError(f"Respeecher returned error: {data.get('error')}")

                if msg_type == "chunk":
                    audio_data = base64.b64decode(data["data"])
                    output_emitter.push(audio_data)

                elif msg_type == "done":
                    if input_ended:
                        break

        try:
            async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
                tasks = [
                    asyncio.create_task(_input_task()),
                    asyncio.create_task(_sentence_stream_task(ws)),
                    asyncio.create_task(_recv_task(ws)),
                ]

                try:
                    await asyncio.gather(*tasks)
                finally:
                    await sent_tokenizer_stream.aclose()
                    await utils.aio.gracefully_cancel(*tasks)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except APIError:
            raise
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            output_emitter.end_segment()

Streamed API using WebSocket for real-time synthesis

Ancestors

  • livekit.agents.tts.tts.SynthesizeStream
  • abc.ABC

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close this stream immediately by delegating to the base-class implementation."""
    parent_close = super().aclose
    await parent_close()

Close this stream immediately

class TTS (*,
voice_id: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
model: TTSModels | str = '/public/tts/en-rt',
encoding: TTSEncoding = 'pcm_s16le',
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
sample_rate: int = 24000,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.respeecher.com/v1')
Expand source code
class TTS(tts.TTS):
    """Respeecher text-to-speech for LiveKit Agents.

    Supports one-shot synthesis over HTTPS (``synthesize``) and streaming
    synthesis over a pooled WebSocket connection (``stream``).
    """

    def __init__(
        self,
        *,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        model: TTSModels | str = "/public/tts/en-rt",
        encoding: TTSEncoding = "pcm_s16le",
        voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
        sample_rate: int = 24000,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = API_BASE_URL,
    ) -> None:
        """
        Create a new instance of Respeecher TTS.

        Args:
            voice_id: ID of the voice to use. If not provided, a model-specific default is used (see `DEFAULT_VOICES`). Each model exposes a different set of voices; call the module-level `list_voices()` helper to discover the IDs available for the chosen model.
            api_key: Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
            model: The Respeecher TTS model to use (a path appended to ``base_url``).
            encoding: Audio encoding format.
            voice_settings: Optional voice settings including sampling parameters.
            sample_rate: Audio sample rate in Hz.
            tokenizer: Sentence tokenizer used to split streamed text before it is
                sent over the WebSocket. Defaults to ``tokenize.blingfire.SentenceTokenizer()``.
            http_session: Optional aiohttp session to use for requests.
            base_url: The base URL for the Respeecher API. Streaming requires an
                https:// URL, because the derived WebSocket URL must be wss://.

        Raises:
            ValueError: If no API key is available, or if no voice_id is given and
                ``DEFAULT_VOICES`` has no entry for ``model``.
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=True,
                aligned_transcript=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        respeecher_api_key = api_key if is_given(api_key) else os.environ.get("RESPEECHER_API_KEY")
        if not respeecher_api_key:
            raise ValueError("RESPEECHER_API_KEY must be set")

        resolved_voice_id = voice_id if is_given(voice_id) else DEFAULT_VOICES.get(model)
        if not resolved_voice_id:
            raise ValueError(
                f"voice_id is required for model {model!r} (no default voice is configured); "
                "pass voice_id explicitly or use one of the supported models."
            )

        self._opts = _TTSOptions(
            model=model,
            encoding=encoding,
            sample_rate=sample_rate,
            voice_id=resolved_voice_id,
            voice_settings=voice_settings,
            api_key=respeecher_api_key,
            base_url=base_url,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
        )
        # Pools replaced by update_options(model=...); they are kept open so
        # in-flight streams can finish and are closed in aclose().
        self._retired_pools: list[utils.ConnectionPool[aiohttp.ClientWebSocketResponse]] = []

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "Respeecher"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to the Respeecher streaming endpoint for the current model."""
        from urllib.parse import urlencode

        session = self._ensure_session()
        base_url = self._opts.base_url
        # Map only the URL scheme; str.replace() would also rewrite any later
        # occurrence of "https://" or "http://" inside the URL.
        if base_url.startswith("https://"):
            ws_url = "wss://" + base_url[len("https://") :]
        elif base_url.startswith("http://"):
            ws_url = "ws://" + base_url[len("http://") :]
        else:
            ws_url = base_url
        if not ws_url.startswith("wss://"):
            logger.error("Insecure WebSocket connection detected, wss:// required")
            raise APIConnectionError("Secure WebSocket connection (wss://) required")

        # Credentials and client identification are sent as query parameters on
        # the WebSocket URL rather than as headers. Percent-encode them so an API
        # key containing characters such as "&", "+" or "=" cannot corrupt the query.
        query = urlencode(
            {
                "api_key": self._opts.api_key,
                "source": API_VERSION_HEADER,
                "version": API_VERSION,
            }
        )
        full_ws_url = f"{ws_url}{self._opts.model}/tts/websocket?{query}"
        return await asyncio.wait_for(session.ws_connect(full_ws_url), timeout)

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared session from utils.http_context.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    ) -> None:
        """Update TTS options"""
        if is_given(model) and model != self._opts.model:
            self._opts.model = model
            # The model is baked into the WebSocket URL, so existing pooled
            # connections can't serve the new model. Retire the old pool
            # (letting any in-flight stream finish using its connection) and
            # route new requests through a fresh pool. The retired pool is
            # closed during aclose().
            self._retired_pools.append(self._pool)
            self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
                connect_cb=self._connect_ws,
                close_cb=self._close_ws,
            )

        if is_given(voice_id):
            self._opts.voice_id = voice_id
        if is_given(voice_settings):
            self._opts.voice_settings = voice_settings

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def prewarm(self) -> None:
        self._pool.prewarm()

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        """Close all open streams, then the active and any retired connection pools."""
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()
        for pool in self._retired_pools:
            await pool.aclose()
        self._retired_pools.clear()

Respeecher text-to-speech for LiveKit Agents, supporting one-shot synthesis over HTTPS (synthesize) and streaming synthesis over a pooled WebSocket connection (stream).

Create a new instance of Respeecher TTS.

Args

voice_id
ID of the voice to use. If not provided, a model-specific default is used (see DEFAULT_VOICES). Each model exposes a different set of voices; call the module-level list_voices() helper to discover the IDs available for the chosen model.
api_key
Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
model
The Respeecher TTS model to use.
encoding
Audio encoding format.
voice_settings
Optional voice settings including sampling parameters.
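sample_rate
Audio sample rate in Hz.
tokenizer
Sentence tokenizer used to split streamed text before it is sent. Defaults to tokenize.blingfire.SentenceTokenizer().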
http_session
Optional aiohttp session to use for requests.
base_url
The base URL for the Respeecher API.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Identifier of the configured Respeecher model, read from the current options."""
    options = self._opts
    return options.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Name of the speech provider backing this TTS."""
    name = "Respeecher"
    return name

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close every tracked stream, then the active and retired connection pools."""
    open_streams = tuple(self._streams)
    for s in open_streams:
        await s.aclose()
    self._streams.clear()

    await self._pool.aclose()
    retired = self._retired_pools
    for old_pool in retired:
        await old_pool.aclose()
    retired.clear()
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service by asking the current pool to prewarm."""
    pool = self._pool
    pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.respeecher.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a WebSocket-backed SynthesizeStream and register it in self._streams."""
    new_stream = SynthesizeStream(tts=self, conn_options=conn_options)
    tracked = self._streams
    tracked.add(new_stream)
    return new_stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.respeecher.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    """Create a one-shot ChunkedStream that synthesizes ``text`` in a single request."""
    chunked = ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
    return chunked
def update_options(self,
*,
voice_id: NotGivenOr[str] = NOT_GIVEN,
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
) -> None:
    """Update TTS options.

    Only arguments that are given are applied. Changing the model also swaps in
    a fresh WebSocket connection pool, because the model is part of the
    WebSocket URL. The previous pool is kept in ``self._retired_pools`` rather
    than closed here.
    """
    model_changed = is_given(model) and model != self._opts.model
    if model_changed:
        self._opts.model = model
        # Keep the old pool alive so in-flight streams can finish on their
        # existing connections; new requests go through the fresh pool.
        previous_pool = self._pool
        self._retired_pools.append(previous_pool)
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
        )

    for field_name, value in (("voice_id", voice_id), ("voice_settings", voice_settings)):
        if is_given(value):
            setattr(self._opts, field_name, value)

Update TTS options

Inherited members