Module livekit.plugins.rime
Rime plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/tts/rime/ for more information.
Classes
class ChunkedStream (tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize using the chunked api endpoint"""

    def __init__(self, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        # Snapshot the TTS options at creation time so a later update_options()
        # does not change the top-level model/speaker of this request.
        # NOTE(review): `replace(...)` copies only the top level; the nested
        # arcana/coda/mist option objects are still shared with the TTS.
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """POST the text to Rime and push the returned PCM bytes to ``output_emitter``.

        Raises:
            APITimeoutError: the request timed out.
            APIStatusError: Rime answered with an HTTP error status, or with a
                non-audio body.
            APIConnectionError: any other failure while talking to Rime.
        """
        payload: dict = {
            "speaker": self._opts.speaker,
            "text": self._input_text,
            "modelId": self._opts.model,
            **_model_params(self._opts),
        }
        mime_type = "audio/pcm"

        # Only the options object of the active model contributes its sample
        # rate (and, for mistv2, reduceLatency) to the request body.
        if self._opts.model == "arcana" and self._opts.arcana_options is not None:
            if is_given(self._opts.arcana_options.sample_rate):
                payload["samplingRate"] = self._opts.arcana_options.sample_rate
        elif self._opts.model == "coda" and self._opts.coda_options is not None:
            if is_given(self._opts.coda_options.sample_rate):
                payload["samplingRate"] = self._opts.coda_options.sample_rate
        elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
            mist_opts = self._opts.mist_options
            if is_given(mist_opts.sample_rate):
                payload["samplingRate"] = mist_opts.sample_rate
            if self._opts.model == "mistv2" and is_given(mist_opts.reduce_latency):
                payload["reduceLatency"] = mist_opts.reduce_latency

        # The PCM returned is at the rate we asked for. Label the frames with that
        # rate instead of the TTS-wide default; otherwise, after
        # update_options(sample_rate=...), audio would play at the wrong speed/pitch.
        sample_rate = payload.get("samplingRate", self._tts.sample_rate)

        try:
            async with self._tts._ensure_session().post(
                self._tts._base_url,
                headers={
                    "accept": mime_type,
                    "Authorization": f"Bearer {self._tts._api_key}",
                    "content-type": "application/json",
                },
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=self._tts._total_timeout, sock_connect=self._conn_options.timeout
                ),
            ) as resp:
                resp.raise_for_status()
                if not resp.content_type.startswith("audio"):
                    content = await resp.text()
                    logger.error("Rime returned non-audio data: %s", content)
                    # Surface the failure instead of silently returning without
                    # audio, so the framework's error/retry handling applies.
                    raise APIStatusError(
                        message="Rime returned non-audio data",
                        status_code=resp.status,
                        request_id=None,
                        body=content,
                    )

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=mime_type,
                )
                async for data, _ in resp.content.iter_chunks():
                    output_emitter.push(data)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except APIStatusError:
            # Already a typed API error (non-audio body); don't re-wrap it as a
            # connection error below.
            raise
        except Exception as e:
            raise APIConnectionError() from e
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class TTS (*,
base_url: NotGivenOr[str] = NOT_GIVEN,
model: TTSModels | str = 'arcana',
speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
lang: TTSLangs | str = 'eng',
repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
sample_rate: int = 22050,
speed_alpha: NotGivenOr[float] = NOT_GIVEN,
reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
use_websocket: bool = False,
segment: NotGivenOr[str] = NOT_GIVEN,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN)-
Expand source code
class TTS(tts.TTS):
    """Rime text-to-speech.

    The transport is chosen at construction time:

    * HTTP (``use_websocket=False``, the default): one-shot requests through
      :meth:`synthesize`, which returns a :class:`ChunkedStream`.
    * WebSocket (``use_websocket=True``, or a ``ws://``/``wss://`` ``base_url``):
      incremental synthesis through :meth:`stream`, over pooled connections.

    Per-model options are stored separately for ``arcana``, ``coda`` and the
    mist models. Parameters that do not apply to the active model are ignored.
    """

    def __init__(
        self,
        *,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        model: TTSModels | str = "arcana",
        speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
        lang: TTSLangs | str = "eng",
        # Arcana options (max_tokens is also used by Coda)
        repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        # Output sample rate (all models)
        sample_rate: int = 22050,
        # Mist options
        speed_alpha: NotGivenOr[float] = NOT_GIVEN,
        reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
        pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        use_websocket: bool = False,
        segment: NotGivenOr[str] = NOT_GIVEN,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
    ) -> None:
        """Create a Rime TTS.

        Args:
            base_url: Endpoint override. A ``ws://``/``wss://`` URL enables WebSocket mode.
            model: Rime model id (``"arcana"``, ``"coda"`` or a mist model).
            speaker: Voice name. The default depends on the model.
            lang: Language code stored in the per-model options.
            repetition_penalty, temperature, top_p: Arcana-only parameters.
            max_tokens: Arcana/Coda parameter.
            sample_rate: Output sample rate.
            speed_alpha, reduce_latency, pause_between_brackets,
                phonemize_between_brackets: Mist-only parameters.
            api_key: Rime API key. Falls back to the ``RIME_API_KEY`` environment variable.
            http_session: Session to reuse. Otherwise one is taken from the agents HTTP context.
            use_websocket: Use the streaming WebSocket transport.
            segment: WebSocket segmentation mode. Defaults to ``"bySentence"``.
            tokenizer: Sentence tokenizer. Defaults to the blingfire tokenizer.

        Raises:
            ValueError: if no API key is provided.
        """
        if is_given(base_url):
            # Infer streaming mode from URL prefix; an explicit use_websocket=True still wins.
            use_websocket = use_websocket or base_url.startswith(("ws://", "wss://"))
            resolved_base_url = base_url
        else:
            resolved_base_url = RIME_WS_BASE_URL if use_websocket else RIME_BASE_URL

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=use_websocket,
                aligned_transcript=use_websocket,
            ),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )
        self._api_key = api_key if is_given(api_key) else os.environ.get("RIME_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Rime API key is required, either as argument or set RIME_API_KEY environmental variable"  # noqa: E501
            )

        if not is_given(speaker):
            if _is_mist_model(model):
                speaker = DefaultMistVoice
            elif model == "coda":
                speaker = DefaultCodaVoice
            else:
                speaker = "astra"

        self._opts = _TTSOptions(
            model=model,
            speaker=speaker,
        )
        # Only the options object for the selected model is populated.
        if model == "arcana":
            self._opts.arcana_options = _ArcanaOptions(
                repetition_penalty=repetition_penalty,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                lang=lang,
                sample_rate=sample_rate,
            )
        elif model == "coda":
            self._opts.coda_options = _CodaOptions(
                max_tokens=max_tokens,
                lang=lang,
                sample_rate=sample_rate,
            )
        elif _is_mist_model(model):
            self._opts.mist_options = _MistOptions(
                lang=lang,
                sample_rate=sample_rate,
                speed_alpha=speed_alpha,
                reduce_latency=reduce_latency,
                pause_between_brackets=pause_between_brackets,
                phonemize_between_brackets=phonemize_between_brackets,
            )

        self._session = http_session
        self._base_url = resolved_base_url
        self._use_websocket = use_websocket
        self._segment = segment if is_given(segment) else "bySentence"
        self._total_timeout = _timeout_for_model(model)
        self._streams: weakref.WeakSet[SynthesizeStream] = weakref.WeakSet()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "Rime"

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared agents HTTP session.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def _ws_url(self) -> str:
        """Build the WebSocket URL; all synthesis options travel as query parameters."""
        params: dict[str, object] = {
            "speaker": self._opts.speaker,
            "modelId": self._opts.model,
            "audioFormat": "pcm",
            "samplingRate": self._sample_rate,
            "segment": self._segment,
            **_model_params(self._opts),
        }
        # urlencode would render booleans as "True"/"False"; send lowercase instead.
        encoded = {
            k: ("true" if v else "false") if isinstance(v, bool) else v
            for k, v in params.items()
        }
        return f"{self._base_url}/ws3?{urlencode(encoded)}"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        return await asyncio.wait_for(
            session.ws_connect(
                self._ws_url(), headers={"Authorization": f"Bearer {self._api_key}"}
            ),
            timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Best-effort graceful close: send EOS, wait up to 1s for a reply, then close."""
        try:
            await ws.send_str(json.dumps({"operation": "eos"}))
            try:
                await asyncio.wait_for(ws.receive(), timeout=1.0)
            except asyncio.TimeoutError:
                pass
        except Exception as e:
            logger.warning("Error during Rime WS close sequence: %s", e)
        finally:
            await ws.close()

    def prewarm(self) -> None:
        if self._use_websocket:
            self._pool.prewarm()

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        if not self._use_websocket:
            raise RuntimeError(
                "Rime TTS streaming requires use_websocket=True at construction time"
            )
        s = SynthesizeStream(tts=self, conn_options=conn_options)
        # Tracked so aclose() can shut down still-open streams.
        self._streams.add(s)
        return s

    async def aclose(self) -> None:
        for s in list(self._streams):
            await s.aclose()
        self._streams.clear()
        await self._pool.aclose()

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        if self._use_websocket:
            raise RuntimeError(
                "Rime TTS one-shot synthesize requires use_websocket=False at construction time"
            )
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        speaker: NotGivenOr[str] = NOT_GIVEN,
        lang: NotGivenOr[TTSLangs | str] = NOT_GIVEN,
        # Arcana parameters
        repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        # Mistv2 parameters
        speed_alpha: NotGivenOr[float] = NOT_GIVEN,
        reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
        pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Update synthesis options in place.

        ``model`` is applied first, so per-model parameters passed in the same
        call target the new model. Parameters that do not apply to the active
        model are ignored. ``sample_rate`` also updates :attr:`sample_rate`, so
        the rate requested from Rime and the rate reported for the audio stay in
        sync. In WebSocket mode, pooled connections are invalidated when the
        resulting connection URL changes.
        """
        # WS URL is bound at pool connect; invalidate if any URL-affecting param changed.
        prev_ws_url = self._ws_url() if self._use_websocket else None

        if is_given(base_url):
            self._base_url = base_url

        if is_given(model):
            self._opts.model = model
            self._total_timeout = _timeout_for_model(model)
            # Options created for a model we switch to start from the current
            # output rate. Otherwise no samplingRate would be sent, and Rime's
            # server-side default may differ from the rate we report.
            if model == "arcana" and self._opts.arcana_options is None:
                self._opts.arcana_options = _ArcanaOptions(sample_rate=self._sample_rate)
            elif model == "coda" and self._opts.coda_options is None:
                self._opts.coda_options = _CodaOptions(sample_rate=self._sample_rate)
            elif _is_mist_model(model) and self._opts.mist_options is None:
                self._opts.mist_options = _MistOptions(sample_rate=self._sample_rate)

        if is_given(speaker):
            self._opts.speaker = speaker

        if is_given(sample_rate):
            # Keep the TTS-wide rate (used in the WS URL and to label audio) in
            # step with the per-model rate requested from Rime.
            self._sample_rate = sample_rate

        if self._opts.model == "arcana" and self._opts.arcana_options is not None:
            if is_given(repetition_penalty):
                self._opts.arcana_options.repetition_penalty = repetition_penalty
            if is_given(temperature):
                self._opts.arcana_options.temperature = temperature
            if is_given(top_p):
                self._opts.arcana_options.top_p = top_p
            if is_given(max_tokens):
                self._opts.arcana_options.max_tokens = max_tokens
            if is_given(lang):
                self._opts.arcana_options.lang = lang
            if is_given(sample_rate):
                self._opts.arcana_options.sample_rate = sample_rate
        elif self._opts.model == "coda" and self._opts.coda_options is not None:
            if is_given(max_tokens):
                self._opts.coda_options.max_tokens = max_tokens
            if is_given(lang):
                self._opts.coda_options.lang = lang
            if is_given(sample_rate):
                self._opts.coda_options.sample_rate = sample_rate
        elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
            if is_given(lang):
                self._opts.mist_options.lang = lang
            if is_given(sample_rate):
                self._opts.mist_options.sample_rate = sample_rate
            if is_given(speed_alpha):
                self._opts.mist_options.speed_alpha = speed_alpha
            if is_given(reduce_latency):
                self._opts.mist_options.reduce_latency = reduce_latency
            if is_given(pause_between_brackets):
                self._opts.mist_options.pause_between_brackets = pause_between_brackets
            if is_given(phonemize_between_brackets):
                self._opts.mist_options.phonemize_between_brackets = phonemize_between_brackets

        if prev_ws_url is not None and self._ws_url() != prev_ws_url:
            self._pool.invalidate()
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance.

    For Rime this is the ``modelId`` sent with every request.
    """
    opts = self._opts
    return opts.model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance.

    Always ``"Rime"`` for this plugin.
    """
    name = "Rime"
    return name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) -> None:
    """Close every tracked synthesis stream, then shut down the WebSocket pool."""
    # Iterate over a copy so the set is not iterated while streams close.
    pending = list(self._streams)
    for open_stream in pending:
        await open_stream.aclose()
    self._streams.clear()
    await self._pool.aclose()


def prewarm(self) -> None:
    """Pre-warm connection to the TTS service.

    No-op unless the TTS was built in WebSocket mode.
    """
    if not self._use_websocket:
        return
    self._pool.prewarm()
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Open a streaming (WebSocket) synthesis session.

    Raises:
        RuntimeError: if the TTS was constructed without WebSocket mode.
    """
    if self._use_websocket:
        new_stream = SynthesizeStream(tts=self, conn_options=conn_options)
        # Track the stream so aclose() can shut it down later.
        self._streams.add(new_stream)
        return new_stream
    raise RuntimeError(
        "Rime TTS streaming requires use_websocket=True at construction time"
    )
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    """Synthesize ``text`` in one HTTP request.

    Raises:
        RuntimeError: if the TTS was constructed in WebSocket mode.
    """
    if self._use_websocket:
        raise RuntimeError(
            "Rime TTS one-shot synthesize requires use_websocket=False at construction time"
        )
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)


def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    speaker: NotGivenOr[str] = NOT_GIVEN,
    lang: NotGivenOr[TTSLangs | str] = NOT_GIVEN,
    # Arcana parameters
    repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    top_p: NotGivenOr[float] = NOT_GIVEN,
    max_tokens: NotGivenOr[int] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    # Mistv2 parameters
    speed_alpha: NotGivenOr[float] = NOT_GIVEN,
    reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
    pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
    phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
    base_url: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Update synthesis options in place.

    ``model`` is applied first, so per-model parameters passed in the same call
    target the new model. Parameters that do not apply to the active model are
    ignored. ``sample_rate`` also updates the TTS-wide sample rate, so the rate
    requested from Rime and the rate reported for the audio stay in sync. In
    WebSocket mode, pooled connections are invalidated when the resulting
    connection URL changes.
    """
    # WS URL is bound at pool connect; invalidate if any URL-affecting param changed.
    prev_ws_url = self._ws_url() if self._use_websocket else None

    if is_given(base_url):
        self._base_url = base_url

    if is_given(model):
        self._opts.model = model
        self._total_timeout = _timeout_for_model(model)
        # Options created for a model we switch to start from the current
        # output rate. Otherwise no samplingRate would be sent, and Rime's
        # server-side default may differ from the rate we report.
        if model == "arcana" and self._opts.arcana_options is None:
            self._opts.arcana_options = _ArcanaOptions(sample_rate=self._sample_rate)
        elif model == "coda" and self._opts.coda_options is None:
            self._opts.coda_options = _CodaOptions(sample_rate=self._sample_rate)
        elif _is_mist_model(model) and self._opts.mist_options is None:
            self._opts.mist_options = _MistOptions(sample_rate=self._sample_rate)

    if is_given(speaker):
        self._opts.speaker = speaker

    if is_given(sample_rate):
        # Keep the TTS-wide rate (used in the WS URL and to label audio) in
        # step with the per-model rate requested from Rime.
        self._sample_rate = sample_rate

    if self._opts.model == "arcana" and self._opts.arcana_options is not None:
        if is_given(repetition_penalty):
            self._opts.arcana_options.repetition_penalty = repetition_penalty
        if is_given(temperature):
            self._opts.arcana_options.temperature = temperature
        if is_given(top_p):
            self._opts.arcana_options.top_p = top_p
        if is_given(max_tokens):
            self._opts.arcana_options.max_tokens = max_tokens
        if is_given(lang):
            self._opts.arcana_options.lang = lang
        if is_given(sample_rate):
            self._opts.arcana_options.sample_rate = sample_rate
    elif self._opts.model == "coda" and self._opts.coda_options is not None:
        if is_given(max_tokens):
            self._opts.coda_options.max_tokens = max_tokens
        if is_given(lang):
            self._opts.coda_options.lang = lang
        if is_given(sample_rate):
            self._opts.coda_options.sample_rate = sample_rate
    elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
        if is_given(lang):
            self._opts.mist_options.lang = lang
        if is_given(sample_rate):
            self._opts.mist_options.sample_rate = sample_rate
        if is_given(speed_alpha):
            self._opts.mist_options.speed_alpha = speed_alpha
        if is_given(reduce_latency):
            self._opts.mist_options.reduce_latency = reduce_latency
        if is_given(pause_between_brackets):
            self._opts.mist_options.pause_between_brackets = pause_between_brackets
        if is_given(phonemize_between_brackets):
            self._opts.mist_options.phonemize_between_brackets = phonemize_between_brackets

    if prev_ws_url is not None and self._ws_url() != prev_ws_url:
        self._pool.invalidate()
Inherited members