Module livekit.plugins.rime

Rime plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/rime/ for more information.

Classes

class ChunkedStream (tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize using the chunked api endpoint"""

    def __init__(self, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        payload: dict = {
            "speaker": self._opts.speaker,
            "text": self._input_text,
            "modelId": self._opts.model,
            **_model_params(self._opts),
        }
        format = "audio/pcm"
        if self._opts.model == "arcana" and self._opts.arcana_options is not None:
            if is_given(self._opts.arcana_options.sample_rate):
                payload["samplingRate"] = self._opts.arcana_options.sample_rate
        elif self._opts.model == "coda" and self._opts.coda_options is not None:
            if is_given(self._opts.coda_options.sample_rate):
                payload["samplingRate"] = self._opts.coda_options.sample_rate
        elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
            mist_opts = self._opts.mist_options
            if is_given(mist_opts.sample_rate):
                payload["samplingRate"] = mist_opts.sample_rate
            if self._opts.model == "mistv2" and is_given(mist_opts.reduce_latency):
                payload["reduceLatency"] = mist_opts.reduce_latency

        try:
            async with self._tts._ensure_session().post(
                self._tts._base_url,
                headers={
                    "accept": format,
                    "Authorization": f"Bearer {self._tts._api_key}",
                    "content-type": "application/json",
                },
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=self._tts._total_timeout, sock_connect=self._conn_options.timeout
                ),
            ) as resp:
                resp.raise_for_status()

                if not resp.content_type.startswith("audio"):
                    content = await resp.text()
                    logger.error("Rime returned non-audio data: %s", content)
                    return

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._tts.sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=format,
                )

                async for data, _ in resp.content.iter_chunks():
                    output_emitter.push(data)

        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e

Synthesize using the chunked api endpoint

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
base_url: NotGivenOr[str] = NOT_GIVEN,
model: TTSModels | str = 'arcana',
speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
lang: TTSLangs | str = 'eng',
repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
sample_rate: int = 22050,
speed_alpha: NotGivenOr[float] = NOT_GIVEN,
reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
use_websocket: bool = False,
segment: NotGivenOr[str] = NOT_GIVEN,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        model: TTSModels | str = "arcana",
        speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
        lang: TTSLangs | str = "eng",
        # Arcana options
        repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        # Mistv2 options
        sample_rate: int = 22050,
        speed_alpha: NotGivenOr[float] = NOT_GIVEN,
        reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
        pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        use_websocket: bool = False,
        segment: NotGivenOr[str] = NOT_GIVEN,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
    ) -> None:
        if is_given(base_url):
            # Infer streaming mode from URL prefix; an explicit use_websocket=True still wins.
            use_websocket = use_websocket or base_url.startswith(("ws://", "wss://"))
            resolved_base_url = base_url
        else:
            resolved_base_url = RIME_WS_BASE_URL if use_websocket else RIME_BASE_URL

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=use_websocket,
                aligned_transcript=use_websocket,
            ),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )
        self._api_key = api_key if is_given(api_key) else os.environ.get("RIME_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Rime API key is required, either as argument or set RIME_API_KEY environmental variable"  # noqa: E501
            )

        if not is_given(speaker):
            if _is_mist_model(model):
                speaker = DefaultMistVoice
            elif model == "coda":
                speaker = DefaultCodaVoice
            else:
                speaker = "astra"

        self._opts = _TTSOptions(
            model=model,
            speaker=speaker,
        )
        if model == "arcana":
            self._opts.arcana_options = _ArcanaOptions(
                repetition_penalty=repetition_penalty,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
                lang=lang,
                sample_rate=sample_rate,
            )
        elif model == "coda":
            self._opts.coda_options = _CodaOptions(
                max_tokens=max_tokens,
                lang=lang,
                sample_rate=sample_rate,
            )
        elif _is_mist_model(model):
            self._opts.mist_options = _MistOptions(
                lang=lang,
                sample_rate=sample_rate,
                speed_alpha=speed_alpha,
                reduce_latency=reduce_latency,
                pause_between_brackets=pause_between_brackets,
                phonemize_between_brackets=phonemize_between_brackets,
            )
        self._session = http_session
        self._base_url = resolved_base_url
        self._use_websocket = use_websocket
        self._segment = segment if is_given(segment) else "bySentence"

        self._total_timeout = _timeout_for_model(model)

        self._streams: weakref.WeakSet[SynthesizeStream] = weakref.WeakSet()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "Rime"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def _ws_url(self) -> str:
        params: dict[str, object] = {
            "speaker": self._opts.speaker,
            "modelId": self._opts.model,
            "audioFormat": "pcm",
            "samplingRate": self._sample_rate,
            "segment": self._segment,
            **_model_params(self._opts),
        }
        encoded = {
            k: ("true" if v else "false") if isinstance(v, bool) else v for k, v in params.items()
        }
        return f"{self._base_url}/ws3?{urlencode(encoded)}"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        return await asyncio.wait_for(
            session.ws_connect(
                self._ws_url(), headers={"Authorization": f"Bearer {self._api_key}"}
            ),
            timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        try:
            await ws.send_str(json.dumps({"operation": "eos"}))
            try:
                await asyncio.wait_for(ws.receive(), timeout=1.0)
            except asyncio.TimeoutError:
                pass
        except Exception as e:
            logger.warning(f"Error during Rime WS close sequence: {e}")
        finally:
            await ws.close()

    def prewarm(self) -> None:
        if self._use_websocket:
            self._pool.prewarm()

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        if not self._use_websocket:
            raise RuntimeError(
                "Rime TTS streaming requires use_websocket=True at construction time"
            )
        s = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(s)
        return s

    async def aclose(self) -> None:
        for s in list(self._streams):
            await s.aclose()
        self._streams.clear()
        await self._pool.aclose()

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        if self._use_websocket:
            raise RuntimeError(
                "Rime TTS one-shot synthesize requires use_websocket=False at construction time"
            )
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        speaker: NotGivenOr[str] = NOT_GIVEN,
        lang: NotGivenOr[TTSLangs | str] = NOT_GIVEN,
        # Arcana parameters
        repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        # Mistv2 parameters
        speed_alpha: NotGivenOr[float] = NOT_GIVEN,
        reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
        pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        # WS URL is bound at pool connect; invalidate if any URL-affecting param changed.
        prev_ws_url = self._ws_url() if self._use_websocket else None
        if is_given(base_url):
            self._base_url = base_url
        if is_given(model):
            self._opts.model = model
            self._total_timeout = _timeout_for_model(model)

            if model == "arcana" and self._opts.arcana_options is None:
                self._opts.arcana_options = _ArcanaOptions()
            elif model == "coda" and self._opts.coda_options is None:
                self._opts.coda_options = _CodaOptions()
            elif _is_mist_model(model) and self._opts.mist_options is None:
                self._opts.mist_options = _MistOptions()

        if is_given(speaker):
            self._opts.speaker = speaker

        if self._opts.model == "arcana" and self._opts.arcana_options is not None:
            if is_given(repetition_penalty):
                self._opts.arcana_options.repetition_penalty = repetition_penalty
            if is_given(temperature):
                self._opts.arcana_options.temperature = temperature
            if is_given(top_p):
                self._opts.arcana_options.top_p = top_p
            if is_given(max_tokens):
                self._opts.arcana_options.max_tokens = max_tokens
            if is_given(lang):
                self._opts.arcana_options.lang = lang
            if is_given(sample_rate):
                self._opts.arcana_options.sample_rate = sample_rate

        elif self._opts.model == "coda" and self._opts.coda_options is not None:
            if is_given(max_tokens):
                self._opts.coda_options.max_tokens = max_tokens
            if is_given(lang):
                self._opts.coda_options.lang = lang
            if is_given(sample_rate):
                self._opts.coda_options.sample_rate = sample_rate

        elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
            if is_given(lang):
                self._opts.mist_options.lang = lang
            if is_given(sample_rate):
                self._opts.mist_options.sample_rate = sample_rate
            if is_given(speed_alpha):
                self._opts.mist_options.speed_alpha = speed_alpha
            if is_given(reduce_latency):
                self._opts.mist_options.reduce_latency = reduce_latency
            if is_given(pause_between_brackets):
                self._opts.mist_options.pause_between_brackets = pause_between_brackets
            if is_given(phonemize_between_brackets):
                self._opts.mist_options.phonemize_between_brackets = phonemize_between_brackets

        if prev_ws_url is not None and self._ws_url() != prev_ws_url:
            self._pool.invalidate()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    return "Rime"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for s in list(self._streams):
        await s.aclose()
    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    if self._use_websocket:
        self._pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.rime.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    if not self._use_websocket:
        raise RuntimeError(
            "Rime TTS streaming requires use_websocket=True at construction time"
        )
    s = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(s)
    return s
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.rime.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    if self._use_websocket:
        raise RuntimeError(
            "Rime TTS one-shot synthesize requires use_websocket=False at construction time"
        )
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
speaker: NotGivenOr[str] = NOT_GIVEN,
lang: NotGivenOr[TTSLangs | str] = NOT_GIVEN,
repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
speed_alpha: NotGivenOr[float] = NOT_GIVEN,
reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    speaker: NotGivenOr[str] = NOT_GIVEN,
    lang: NotGivenOr[TTSLangs | str] = NOT_GIVEN,
    # Arcana parameters
    repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    top_p: NotGivenOr[float] = NOT_GIVEN,
    max_tokens: NotGivenOr[int] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    # Mistv2 parameters
    speed_alpha: NotGivenOr[float] = NOT_GIVEN,
    reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
    pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
    phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
    base_url: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    # WS URL is bound at pool connect; invalidate if any URL-affecting param changed.
    prev_ws_url = self._ws_url() if self._use_websocket else None
    if is_given(base_url):
        self._base_url = base_url
    if is_given(model):
        self._opts.model = model
        self._total_timeout = _timeout_for_model(model)

        if model == "arcana" and self._opts.arcana_options is None:
            self._opts.arcana_options = _ArcanaOptions()
        elif model == "coda" and self._opts.coda_options is None:
            self._opts.coda_options = _CodaOptions()
        elif _is_mist_model(model) and self._opts.mist_options is None:
            self._opts.mist_options = _MistOptions()

    if is_given(speaker):
        self._opts.speaker = speaker

    if self._opts.model == "arcana" and self._opts.arcana_options is not None:
        if is_given(repetition_penalty):
            self._opts.arcana_options.repetition_penalty = repetition_penalty
        if is_given(temperature):
            self._opts.arcana_options.temperature = temperature
        if is_given(top_p):
            self._opts.arcana_options.top_p = top_p
        if is_given(max_tokens):
            self._opts.arcana_options.max_tokens = max_tokens
        if is_given(lang):
            self._opts.arcana_options.lang = lang
        if is_given(sample_rate):
            self._opts.arcana_options.sample_rate = sample_rate

    elif self._opts.model == "coda" and self._opts.coda_options is not None:
        if is_given(max_tokens):
            self._opts.coda_options.max_tokens = max_tokens
        if is_given(lang):
            self._opts.coda_options.lang = lang
        if is_given(sample_rate):
            self._opts.coda_options.sample_rate = sample_rate

    elif _is_mist_model(self._opts.model) and self._opts.mist_options is not None:
        if is_given(lang):
            self._opts.mist_options.lang = lang
        if is_given(sample_rate):
            self._opts.mist_options.sample_rate = sample_rate
        if is_given(speed_alpha):
            self._opts.mist_options.speed_alpha = speed_alpha
        if is_given(reduce_latency):
            self._opts.mist_options.reduce_latency = reduce_latency
        if is_given(pause_between_brackets):
            self._opts.mist_options.pause_between_brackets = pause_between_brackets
        if is_given(phonemize_between_brackets):
            self._opts.mist_options.phonemize_between_brackets = phonemize_between_brackets

    if prev_ws_url is not None and self._ws_url() != prev_ws_url:
        self._pool.invalidate()

Inherited members