Module livekit.plugins.rime
Classes
class ChunkedStream (tts: TTS,
input_text: str,
opts: _TTSOptions,
api_key: str,
session: aiohttp.ClientSession,
conn_options: APIConnectOptions,
segment_id: NotGivenOr[str] = NOT_GIVEN)-
Expand source code
class ChunkedStream(tts.ChunkedStream): """Synthesize using the chunked api endpoint""" def __init__( self, tts: TTS, input_text: str, opts: _TTSOptions, api_key: str, session: aiohttp.ClientSession, conn_options: APIConnectOptions, segment_id: NotGivenOr[str] = NOT_GIVEN, ) -> None: super().__init__(tts=tts, input_text=input_text, conn_options=conn_options) self._opts = opts self._session = session self._segment_id = segment_id if is_given(segment_id) else utils.shortuuid() self._api_key = api_key async def _run(self) -> None: request_id = utils.shortuuid() headers = { "accept": "audio/mp3", "Authorization": f"Bearer {self._api_key}", "content-type": "application/json", } payload = { "speaker": self._opts.speaker, "text": self._input_text, "modelId": self._opts.model, "samplingRate": self._opts.sample_rate, "speedAlpha": self._opts.speed_alpha, "reduceLatency": self._opts.reduce_latency, "pauseBetweenBrackets": self._opts.pause_between_brackets, "phonemizeBetweenBrackets": self._opts.phonemize_between_brackets, } decoder = utils.codecs.AudioStreamDecoder( sample_rate=self._opts.sample_rate, num_channels=NUM_CHANNELS, ) decode_task: asyncio.Task | None = None try: async with self._session.post( DEFAULT_API_URL, headers=headers, json=payload ) as response: if not response.content_type.startswith("audio"): content = await response.text() logger.error("Rime returned non-audio data: %s", content) return async def _decode_loop(): try: async for bytes_data, _ in response.content.iter_chunks(): decoder.push(bytes_data) finally: decoder.end_input() decode_task = asyncio.create_task(_decode_loop()) emitter = tts.SynthesizedAudioEmitter( event_ch=self._event_ch, request_id=request_id, segment_id=self._segment_id, ) async for frame in decoder: emitter.push(frame) emitter.flush() except asyncio.TimeoutError as e: raise APITimeoutError() from e except aiohttp.ClientResponseError as e: raise APIStatusError( message=e.message, status_code=e.status, request_id=request_id, body=None, ) from e except Exception as e: raise APIConnectionError() from e finally: if decode_task: await utils.aio.gracefully_cancel(decode_task) await decoder.aclose()
Synthesize using the chunked api endpoint
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class TTS (*,
model: TTSModels | str = 'mist',
speaker: str = 'lagoon',
sample_rate: int = 22050,
speed_alpha: float = 1.0,
reduce_latency: bool = False,
pause_between_brackets: bool = False,
phonemize_between_brackets: bool = False,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS): def __init__( self, *, model: TTSModels | str = "mist", speaker: str = "lagoon", sample_rate: int = 22050, speed_alpha: float = 1.0, reduce_latency: bool = False, pause_between_brackets: bool = False, phonemize_between_brackets: bool = False, api_key: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, ) -> None: super().__init__( capabilities=tts.TTSCapabilities( streaming=False, ), sample_rate=sample_rate, num_channels=NUM_CHANNELS, ) self._api_key = api_key if is_given(api_key) else os.environ.get("RIME_API_KEY") if not self._api_key: raise ValueError( "Rime API key is required, either as argument or set RIME_API_KEY environmental variable" # noqa: E501 ) self._opts = _TTSOptions( model=model, speaker=speaker, sample_rate=sample_rate, speed_alpha=speed_alpha, reduce_latency=reduce_latency, pause_between_brackets=pause_between_brackets, phonemize_between_brackets=phonemize_between_brackets, ) self._session = http_session def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, segment_id: NotGivenOr[str] = NOT_GIVEN, ) -> ChunkedStream: return ChunkedStream( tts=self, input_text=text, conn_options=conn_options, opts=self._opts, session=self._ensure_session(), segment_id=segment_id if is_given(segment_id) else None, api_key=self._api_key, ) def update_options( self, *, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, speaker: NotGivenOr[str] = NOT_GIVEN, ) -> None: if is_given(model): self._opts.model = model if is_given(speaker): self._opts.speaker = speaker
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
segment_id: NotGivenOr[str] = NOT_GIVEN) ‑> livekit.plugins.rime.tts.ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, segment_id: NotGivenOr[str] = NOT_GIVEN, ) -> ChunkedStream: return ChunkedStream( tts=self, input_text=text, conn_options=conn_options, opts=self._opts, session=self._ensure_session(), segment_id=segment_id if is_given(segment_id) else None, api_key=self._api_key, )
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
speaker: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options( self, *, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, speaker: NotGivenOr[str] = NOT_GIVEN, ) -> None: if is_given(model): self._opts.model = model if is_given(speaker): self._opts.speaker = speaker
Inherited members