Module livekit.plugins.rime

Rime plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/rime/ for more information.

Classes

class ChunkedStream (tts: TTS,
input_text: str,
opts: _TTSOptions,
api_key: str,
session: aiohttp.ClientSession,
conn_options: APIConnectOptions,
segment_id: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize using the chunked api endpoint.

    Each instance issues a single HTTP POST to the owning TTS's base URL,
    decodes the returned audio bytes and pushes the resulting frames to the
    stream's event channel.
    """

    def __init__(
        self,
        tts: TTS,
        input_text: str,
        opts: _TTSOptions,
        api_key: str,
        session: aiohttp.ClientSession,
        conn_options: APIConnectOptions,
        segment_id: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Store the request parameters for a later call to ``_run``.

        Args:
            tts: The TTS instance that created this stream.
            input_text: Text sent as the ``text`` field of the request payload.
            opts: Model, speaker and model-specific options.
            api_key: Key sent as a bearer token in the ``Authorization`` header.
            session: aiohttp session used to perform the POST.
            conn_options: Connection options; ``timeout`` is used as the
                connect timeout of the request.
            segment_id: Identifier attached to emitted audio. A short uuid is
                generated when it is not given.
        """
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._opts = opts
        self._session = session
        # Callers may pass an explicit None to mean "no segment id"; treat it
        # like NOT_GIVEN so emitted audio always carries a real identifier.
        has_segment_id = is_given(segment_id) and segment_id is not None
        self._segment_id = segment_id if has_segment_id else utils.shortuuid()
        self._api_key = api_key

    async def _run(self) -> None:
        """Send the synthesis request and emit the decoded audio frames.

        Raises:
            APITimeoutError: The request timed out.
            APIStatusError: The server answered with an HTTP error status.
            APIConnectionError: Any other failure while requesting or decoding.
        """
        request_id = utils.shortuuid()
        payload = {
            "speaker": self._opts.speaker,
            "text": self._input_text,
            "modelId": self._opts.model,
        }
        # Named ``audio_format`` to avoid shadowing the ``format`` builtin.
        audio_format = "mp3"
        if self._opts.model == "arcana":
            arcana_opts = self._opts.arcana_options
            if is_given(arcana_opts.repetition_penalty):
                payload["repetition_penalty"] = arcana_opts.repetition_penalty
            if is_given(arcana_opts.temperature):
                payload["temperature"] = arcana_opts.temperature
            if is_given(arcana_opts.top_p):
                payload["top_p"] = arcana_opts.top_p
            if is_given(arcana_opts.max_tokens):
                payload["max_tokens"] = arcana_opts.max_tokens
            # Arcana audio is requested and decoded as wav instead of mp3.
            audio_format = "wav"
        elif self._opts.model == "mistv2":
            mistv2_opts = self._opts.mistv2_options
            if is_given(mistv2_opts.lang):
                payload["lang"] = mistv2_opts.lang
            if is_given(mistv2_opts.sample_rate):
                payload["samplingRate"] = mistv2_opts.sample_rate
            if is_given(mistv2_opts.speed_alpha):
                payload["speedAlpha"] = mistv2_opts.speed_alpha
            if is_given(mistv2_opts.reduce_latency):
                payload["reduceLatency"] = mistv2_opts.reduce_latency
            if is_given(mistv2_opts.pause_between_brackets):
                payload["pauseBetweenBrackets"] = mistv2_opts.pause_between_brackets
            if is_given(mistv2_opts.phonemize_between_brackets):
                payload["phonemizeBetweenBrackets"] = mistv2_opts.phonemize_between_brackets

        headers = {
            "accept": f"audio/{audio_format}",
            "Authorization": f"Bearer {self._api_key}",
            "content-type": "application/json",
        }
        decoder = utils.codecs.AudioStreamDecoder(
            sample_rate=self._tts.sample_rate,
            num_channels=NUM_CHANNELS,
            format=audio_format,
        )

        decode_task: asyncio.Task | None = None
        try:
            async with self._session.post(
                self._tts._base_url,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(connect=self._conn_options.timeout, total=30),
            ) as response:
                # Without this call aiohttp never raises ClientResponseError for
                # 4xx/5xx answers, so the APIStatusError handler below would be
                # unreachable and HTTP failures would go unreported to the caller.
                response.raise_for_status()

                if not response.content_type.startswith("audio"):
                    content = await response.text()
                    logger.error("Rime returned non-audio data: %s", content)
                    return

                async def _decode_loop():
                    # Feed the raw response bytes to the decoder; always signal
                    # end of input so the frame iteration below can terminate.
                    try:
                        async for bytes_data, _ in response.content.iter_chunks():
                            decoder.push(bytes_data)
                    finally:
                        decoder.end_input()

                decode_task = asyncio.create_task(_decode_loop())
                emitter = tts.SynthesizedAudioEmitter(
                    event_ch=self._event_ch,
                    request_id=request_id,
                    segment_id=self._segment_id,
                )

                async for frame in decoder:
                    emitter.push(frame)
                emitter.flush()

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=request_id,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            # Stop the byte-feeding task (if it was started) before closing the decoder.
            if decode_task:
                await utils.aio.gracefully_cancel(decode_task)
            await decoder.aclose()

Synthesize using the chunked api endpoint

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
base_url: str = 'https://users.rime.ai/v1/rime-tts',
model: TTSModels | str = 'arcana',
speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
max_tokens: NotGivenOr[int] = NOT_GIVEN,
lang: TTSLangs | str = 'eng',
sample_rate: int = 22050,
speed_alpha: NotGivenOr[float] = NOT_GIVEN,
reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    """Rime text-to-speech, synthesized through the chunked (non-streaming) HTTP endpoint."""

    def __init__(
        self,
        *,
        base_url: str = "https://users.rime.ai/v1/rime-tts",
        model: TTSModels | str = "arcana",
        speaker: NotGivenOr[ArcanaVoices | str] = NOT_GIVEN,
        # Arcana options
        repetition_penalty: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        max_tokens: NotGivenOr[int] = NOT_GIVEN,
        # Mistv2 options
        lang: TTSLangs | str = "eng",
        sample_rate: int = 22050,
        speed_alpha: NotGivenOr[float] = NOT_GIVEN,
        reduce_latency: NotGivenOr[bool] = NOT_GIVEN,
        pause_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        phonemize_between_brackets: NotGivenOr[bool] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Create a Rime TTS instance.

        Args:
            base_url: URL the synthesis request is posted to.
            model: Model identifier. ``"arcana"`` and ``"mistv2"`` select which
                group of model-specific options below is stored.
            speaker: Voice to use. Defaults to ``"cove"`` for ``"mistv2"`` and
                ``"astra"`` for any other model.
            repetition_penalty: Arcana option, stored only when model is ``"arcana"``.
            temperature: Arcana option, stored only when model is ``"arcana"``.
            top_p: Arcana option, stored only when model is ``"arcana"``.
            max_tokens: Arcana option, stored only when model is ``"arcana"``.
            lang: Mistv2 option, stored only when model is ``"mistv2"``.
            sample_rate: Sample rate reported by this TTS; also stored as a
                Mistv2 option when model is ``"mistv2"``.
            speed_alpha: Mistv2 option, stored only when model is ``"mistv2"``.
            reduce_latency: Mistv2 option, stored only when model is ``"mistv2"``.
            pause_between_brackets: Mistv2 option, stored only when model is ``"mistv2"``.
            phonemize_between_brackets: Mistv2 option, stored only when model is ``"mistv2"``.
            api_key: Rime API key. Falls back to the ``RIME_API_KEY``
                environment variable when not given.
            http_session: Session to reuse. When ``None``, one is obtained from
                ``utils.http_context`` on first use.

        Raises:
            ValueError: No API key was passed and ``RIME_API_KEY`` is unset or empty.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )
        self._api_key = api_key if is_given(api_key) else os.environ.get("RIME_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Rime API key is required, either as argument or set RIME_API_KEY environmental variable"  # noqa: E501
            )

        if not is_given(speaker):
            # Pick the per-model default voice.
            speaker = "cove" if model == "mistv2" else "astra"

        self._opts = _TTSOptions(
            model=model,
            speaker=speaker,
        )
        # Only the option group matching the chosen model is populated.
        if model == "arcana":
            self._opts.arcana_options = _ArcanaOptions(
                repetition_penalty=repetition_penalty,
                temperature=temperature,
                top_p=top_p,
                max_tokens=max_tokens,
            )
        elif model == "mistv2":
            self._opts.mistv2_options = _Mistv2Options(
                lang=lang,
                sample_rate=sample_rate,
                speed_alpha=speed_alpha,
                reduce_latency=reduce_latency,
                pause_between_brackets=pause_between_brackets,
                phonemize_between_brackets=phonemize_between_brackets,
            )
        self._session = http_session
        self._base_url = base_url

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from ``utils.http_context`` if none is set."""
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        segment_id: NotGivenOr[str] = NOT_GIVEN,
    ) -> ChunkedStream:
        """Create a ``ChunkedStream`` that synthesizes ``text``.

        Args:
            text: Text to synthesize.
            conn_options: Connection options forwarded to the stream.
            segment_id: Optional identifier for the emitted audio segment.

        Returns:
            A ``ChunkedStream`` bound to this TTS's options, session and API key.
        """
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
            # Forward the value untouched: ChunkedStream generates an id when it
            # receives NOT_GIVEN, whereas converting to None here would be
            # treated as a given value and leave the segment without an id.
            segment_id=segment_id,
            api_key=self._api_key,
        )

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        speaker: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Replace the stored model and/or speaker; arguments left as NOT_GIVEN are ignored.

        NOTE(review): the arcana/mistv2 option groups are only built in
        ``__init__`` for the model chosen there. Switching ``model`` here does
        not create the option group for the new model; confirm that
        ``_TTSOptions`` provides usable defaults for that case.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(speaker):
            self._opts.speaker = speaker

Rime text-to-speech implementation for LiveKit Agents. Audio is synthesized through Rime's chunked (non-streaming) HTTP endpoint.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
segment_id: NotGivenOr[str] = NOT_GIVEN) ‑> livekit.plugins.rime.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    segment_id: NotGivenOr[str] = NOT_GIVEN,
) -> ChunkedStream:
    """Create a ``ChunkedStream`` that synthesizes ``text``.

    Args:
        text: Text to synthesize.
        conn_options: Connection options forwarded to the stream.
        segment_id: Optional identifier for the emitted audio segment.

    Returns:
        A ``ChunkedStream`` bound to this TTS's options, session and API key.
    """
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        session=self._ensure_session(),
        # Forward the value untouched: passing None instead of NOT_GIVEN would
        # count as a given value and suppress the stream's generated segment id.
        segment_id=segment_id,
        api_key=self._api_key,
    )
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
speaker: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    speaker: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Overwrite the stored model and/or speaker.

    Arguments left as NOT_GIVEN keep their current value.
    """
    # Same order as the fields are declared in the signature: model, then speaker.
    pending = (("model", model), ("speaker", speaker))
    for field_name, new_value in pending:
        if is_given(new_value):
            setattr(self._opts, field_name, new_value)

Inherited members