Module livekit.plugins.respeecher
Respeecher plugin for LiveKit Agents
Voice cloning and synthesis plugin for LiveKit Agents using Respeecher API.
Functions
async def list_voices(*,
model: TTSModels | str = '/public/tts/en-rt',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: str = 'https://api.respeecher.com/v1',
http_session: aiohttp.ClientSession | None = None) ‑> list[livekit.plugins.respeecher.models.Voice]-
Expand source code
async def list_voices( *, model: TTSModels | str = "/public/tts/en-rt", api_key: NotGivenOr[str] = NOT_GIVEN, base_url: str = API_BASE_URL, http_session: aiohttp.ClientSession | None = None, ) -> list[Voice]: """List available voices for the given Respeecher model. Args: model: The Respeecher TTS model whose voices should be listed. api_key: Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable. base_url: The base URL for the Respeecher API. http_session: Optional aiohttp session to use for the request. """ resolved_api_key = api_key if is_given(api_key) else os.environ.get("RESPEECHER_API_KEY") if not resolved_api_key: raise ValueError("RESPEECHER_API_KEY must be set") session = http_session or utils.http_context.http_session() async with session.get( f"{base_url}{model}/voices", headers={ API_AUTH_HEADER: resolved_api_key, API_VERSION_HEADER: API_VERSION, }, ) as resp: resp.raise_for_status() data = await resp.json() voices = [Voice(voice_data) for voice_data in data] if not voices: raise APIError("No voices are available") return voicesList available voices for the given Respeecher model.
Args
model- The Respeecher TTS model whose voices should be listed.
api_key- Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
base_url- The base URL for the Respeecher API.
http_session- Optional aiohttp session to use for the request.
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(tts.ChunkedStream): """Synthesize text using Respeecher HTTPS endpoint""" def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None: super().__init__(tts=tts, input_text=input_text, conn_options=conn_options) self._tts: TTS = tts self._opts = replace(tts._opts) async def _run(self, output_emitter: tts.AudioEmitter) -> None: """Run the TTS synthesis""" json_data = { "transcript": self._input_text, "voice": { "id": self._opts.voice_id, }, "output_format": { "sample_rate": self._opts.sample_rate, "encoding": self._opts.encoding, }, } if is_given(self._opts.voice_settings) and self._opts.voice_settings.sampling_params: json_data["voice"]["sampling_params"] = self._opts.voice_settings.sampling_params # type: ignore[index] http_url = f"{self._opts.base_url}{self._opts.model}/tts/bytes" try: async with self._tts._ensure_session().post( http_url, headers={ API_AUTH_HEADER: self._opts.api_key, API_VERSION_HEADER: API_VERSION, "Content-Type": "application/json", }, json=json_data, timeout=aiohttp.ClientTimeout(total=30, sock_connect=self._conn_options.timeout), ) as resp: resp.raise_for_status() output_emitter.initialize( request_id=utils.shortuuid(), sample_rate=self._opts.sample_rate, num_channels=1, # /tts/bytes returns WAV-wrapped PCM; the WebSocket stream returns raw PCM. mime_type="audio/wav", ) async for data, _ in resp.content.iter_chunks(): output_emitter.push(data) output_emitter.flush() except asyncio.TimeoutError: raise APITimeoutError() from None except aiohttp.ClientResponseError as e: raise APIStatusError( message=e.message, status_code=e.status, request_id=None, body=None ) from None except Exception as e: raise APIConnectionError() from eSynthesize text using Respeecher HTTPS endpoint
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)-
Expand source code
class SynthesizeStream(tts.SynthesizeStream): """Streamed API using WebSocket for real-time synthesis""" def __init__(self, *, tts: TTS, conn_options: APIConnectOptions): super().__init__(tts=tts, conn_options=conn_options) self._tts: TTS = tts self._opts = replace(tts._opts) async def aclose(self) -> None: await super().aclose() async def _run(self, output_emitter: tts.AudioEmitter) -> None: context_id = utils.shortuuid() output_emitter.initialize( request_id=context_id, sample_rate=self._opts.sample_rate, num_channels=1, stream=True, mime_type="audio/pcm", ) output_emitter.start_segment(segment_id=context_id) sent_tokenizer_stream = self._tts._sentence_tokenizer.stream() input_ended = False def _voice_payload() -> dict[str, object]: voice: dict[str, object] = {"id": self._opts.voice_id} if is_given(self._opts.voice_settings) and self._opts.voice_settings.sampling_params: voice["sampling_params"] = self._opts.voice_settings.sampling_params return voice async def _input_task() -> None: async for data in self._input_ch: if isinstance(data, self._FlushSentinel): sent_tokenizer_stream.flush() continue sent_tokenizer_stream.push_text(data) sent_tokenizer_stream.end_input() async def _sentence_stream_task(ws: aiohttp.ClientWebSocketResponse) -> None: nonlocal input_ended output_format = { "encoding": self._opts.encoding, "sample_rate": self._opts.sample_rate, } async for sent in sent_tokenizer_stream: self._mark_started() await ws.send_str( json.dumps( { "context_id": context_id, "transcript": sent.token if sent.token else " ", "voice": _voice_payload(), "continue": True, "output_format": output_format, } ) ) await ws.send_str( json.dumps( { "context_id": context_id, "transcript": " ", "voice": _voice_payload(), "continue": False, "output_format": output_format, } ) ) input_ended = True async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None: while True: msg = await ws.receive(timeout=self._conn_options.timeout) if msg.type in ( aiohttp.WSMsgType.CLOSED, 
aiohttp.WSMsgType.CLOSE, aiohttp.WSMsgType.CLOSING, ): raise APIStatusError( "Respeecher connection closed unexpectedly", request_id=context_id ) if msg.type == aiohttp.WSMsgType.ERROR: raise APIConnectionError( f"Respeecher WebSocket transport error: {ws.exception()}" ) if msg.type != aiohttp.WSMsgType.TEXT: logger.warning("Unexpected Respeecher message type %s", msg.type) continue data = json.loads(msg.data) if data.get("context_id") != context_id: logger.warning( "Received a message with context_id=%s instead of expected %s", data.get("context_id"), context_id, ) continue if data.get("type") == "error": raise APIError(f"Respeecher returned error: {data.get('error')}") if data.get("type") == "chunk": audio_data = base64.b64decode(data["data"]) output_emitter.push(audio_data) elif data.get("type") == "done": if input_ended: break try: async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws: tasks = [ asyncio.create_task(_input_task()), asyncio.create_task(_sentence_stream_task(ws)), asyncio.create_task(_recv_task(ws)), ] try: await asyncio.gather(*tasks) finally: await sent_tokenizer_stream.aclose() await utils.aio.gracefully_cancel(*tasks) except asyncio.TimeoutError: raise APITimeoutError() from None except aiohttp.ClientResponseError as e: raise APIStatusError( message=e.message, status_code=e.status, request_id=None, body=None ) from None except APIError: raise except Exception as e: raise APIConnectionError() from e finally: output_emitter.end_segment()Streamed API using WebSocket for real-time synthesis
Ancestors
- livekit.agents.tts.tts.SynthesizeStream
- abc.ABC
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: await super().aclose()
Close this stream immediately.
class TTS (*,
voice_id: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
model: TTSModels | str = '/public/tts/en-rt',
encoding: TTSEncoding = 'pcm_s16le',
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
sample_rate: int = 24000,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'https://api.respeecher.com/v1')-
Expand source code
class TTS(tts.TTS): def __init__( self, *, voice_id: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, model: TTSModels | str = "/public/tts/en-rt", encoding: TTSEncoding = "pcm_s16le", voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN, sample_rate: int = 24000, tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, base_url: str = API_BASE_URL, ) -> None: """ Create a new instance of Respeecher TTS. Args: voice_id: ID of the voice to use. If not provided, a model-specific default is used (see `DEFAULT_VOICES`). Each model exposes a different set of voices; call the module-level `list_voices()` helper to discover the IDs available for the chosen model. api_key: Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable. model: The Respeecher TTS model to use. encoding: Audio encoding format. voice_settings: Optional voice settings including sampling parameters. sample_rate: Audio sample rate in Hz. http_session: Optional aiohttp session to use for requests. base_url: The base URL for the Respeecher API. """ super().__init__( capabilities=tts.TTSCapabilities( streaming=True, aligned_transcript=False, ), sample_rate=sample_rate, num_channels=1, ) respeecher_api_key = api_key if is_given(api_key) else os.environ.get("RESPEECHER_API_KEY") if not respeecher_api_key: raise ValueError("RESPEECHER_API_KEY must be set") resolved_voice_id = voice_id if is_given(voice_id) else DEFAULT_VOICES.get(model) if not resolved_voice_id: raise ValueError( f"voice_id is required for model {model!r} (no default voice is configured); " "pass voice_id explicitly or use one of the supported models." 
) self._opts = _TTSOptions( model=model, encoding=encoding, sample_rate=sample_rate, voice_id=resolved_voice_id, voice_settings=voice_settings, api_key=respeecher_api_key, base_url=base_url, ) self._session = http_session self._streams = weakref.WeakSet[SynthesizeStream]() self._sentence_tokenizer = ( tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer() ) self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse]( connect_cb=self._connect_ws, close_cb=self._close_ws, ) self._retired_pools: list[utils.ConnectionPool[aiohttp.ClientWebSocketResponse]] = [] @property def model(self) -> str: return self._opts.model @property def provider(self) -> str: return "Respeecher" async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse: session = self._ensure_session() # WebSocket protocol does not support custom headers, using query parameter ws_url = self._opts.base_url.replace("https://", "wss://").replace("http://", "ws://") if not ws_url.startswith("wss://"): logger.error("Insecure WebSocket connection detected, wss:// required") raise APIConnectionError("Secure WebSocket connection (wss://) required") full_ws_url = f"{ws_url}{self._opts.model}/tts/websocket?api_key={self._opts.api_key}&source={API_VERSION_HEADER}&version={API_VERSION}" return await asyncio.wait_for(session.ws_connect(full_ws_url), timeout) async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None: await ws.close() def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session def update_options( self, *, voice_id: NotGivenOr[str] = NOT_GIVEN, voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, ) -> None: """Update TTS options""" if is_given(model) and model != self._opts.model: self._opts.model = model # The model is baked into the WebSocket URL, so existing pooled # connections can't serve the new model. 
Retire the old pool # (letting any in-flight stream finish using its connection) and # route new requests through a fresh pool. The retired pool is # closed during aclose(). self._retired_pools.append(self._pool) self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse]( connect_cb=self._connect_ws, close_cb=self._close_ws, ) if is_given(voice_id): self._opts.voice_id = voice_id if is_given(voice_settings): self._opts.voice_settings = voice_settings def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> ChunkedStream: return ChunkedStream(tts=self, input_text=text, conn_options=conn_options) def prewarm(self) -> None: self._pool.prewarm() def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear() await self._pool.aclose() for pool in self._retired_pools: await pool.aclose() self._retired_pools.clear()Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Respeecher TTS.
Args
voice_id- ID of the voice to use. If not provided, a model-specific default is used (see DEFAULT_VOICES). Each model exposes a different set of voices; call the module-level list_voices() helper to discover the IDs available for the chosen model.
api_key- Respeecher API key. If not provided, uses RESPEECHER_API_KEY env variable.
model- The Respeecher TTS model to use.
encoding- Audio encoding format.
voice_settings- Optional voice settings including sampling parameters.
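sample_rate- Audio sample rate in Hz.
tokenizer- Sentence tokenizer used to split streamed input text into sentences before synthesis. If not provided, a blingfire SentenceTokenizer is used.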
http_session- Optional aiohttp session to use for requests.
base_url- The base URL for the Respeecher API.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property def model(self) -> str: return self._opts.model
Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property def provider(self) -> str: return "Respeecher"
Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear() await self._pool.aclose() for pool in self._retired_pools: await pool.aclose() self._retired_pools.clear()
def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None: self._pool.prewarm()
Pre-warm connection to the TTS service.
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.respeecher.tts.SynthesizeStream-
Expand source code
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.respeecher.tts.ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> ChunkedStream: return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
voice_id: NotGivenOr[str] = NOT_GIVEN,
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options( self, *, voice_id: NotGivenOr[str] = NOT_GIVEN, voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, ) -> None: """Update TTS options""" if is_given(model) and model != self._opts.model: self._opts.model = model # The model is baked into the WebSocket URL, so existing pooled # connections can't serve the new model. Retire the old pool # (letting any in-flight stream finish using its connection) and # route new requests through a fresh pool. The retired pool is # closed during aclose(). self._retired_pools.append(self._pool) self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse]( connect_cb=self._connect_ws, close_cb=self._close_ws, ) if is_given(voice_id): self._opts.voice_id = voice_id if is_given(voice_settings): self._opts.voice_settings = voice_settingsUpdate TTS options
Inherited members