Module livekit.plugins.resemble

Sub-modules

livekit.plugins.resemble.models

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
opts: _TTSOptions,
conn_options: APIConnectOptions,
api_key: str,
session: aiohttp.ClientSession)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize text into speech in one go using Resemble AI's REST API."""

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        opts: _TTSOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._opts, self._session, self._api_key = opts, session, api_key

    async def _run(self) -> None:
        request_id = utils.shortuuid()

        # Create request headers
        headers = {
            "Authorization": f"Bearer {self._api_key}",
            "Content-Type": "application/json",
            "Accept": "application/json",  # Expect JSON response
        }

        # Create request payload
        payload = {
            "voice_uuid": self._opts.voice_uuid,
            "data": self._input_text,
            "sample_rate": self._opts.sample_rate,
            "precision": "PCM_16",
        }

        decoder = utils.codecs.AudioStreamDecoder(
            sample_rate=self._opts.sample_rate,
            num_channels=NUM_CHANNELS,
        )

        try:
            async with self._session.post(
                RESEMBLE_REST_API_URL,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=self._conn_options.timeout,
                ),
            ) as response:
                response.raise_for_status()
                response_json = await response.json()

                # Check for success
                if not response_json.get("success", False):
                    issues = response_json.get("issues", ["Unknown error"])
                    error_msg = "; ".join(issues)
                    raise APIStatusError(
                        message=f"Resemble API returned failure: {error_msg}",
                        status_code=response.status,
                        request_id=request_id,
                        body=json.dumps(response_json),
                    )

                # Extract base64-encoded audio content
                audio_content_b64 = response_json.get("audio_content")
                if not audio_content_b64:
                    raise APIStatusError(
                        message="No audio content in response",
                        status_code=response.status,
                        request_id=request_id,
                        body=json.dumps(response_json),
                    )

                # Decode base64 to get raw audio bytes
                audio_bytes = base64.b64decode(audio_content_b64)
                decoder.push(audio_bytes)
                decoder.end_input()

                emitter = tts.SynthesizedAudioEmitter(
                    event_ch=self._event_ch,
                    request_id=request_id,
                )
                async for frame in decoder:
                    emitter.push(frame)
                emitter.flush()

        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=request_id,
                body=f"resemble api error: {str(e)}",
            ) from e
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientError as e:
            raise APIConnectionError(
                message=f"Resemble API connection error: {str(e)}",
            ) from e
        except Exception as e:
            raise APIConnectionError(f"Error during synthesis: {str(e)}") from e
        finally:
            await decoder.aclose()

Synthesize text into speech in one go using Resemble AI's REST API.

Ancestors

Inherited members

class SynthesizeStream (*,
tts: TTS,
opts: _TTSOptions,
pool: utils.ConnectionPool[aiohttp.ClientWebSocketResponse],
api_key: str)
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    """Stream-based text-to-speech synthesis using Resemble AI WebSocket API.


    This implementation connects to Resemble's WebSocket API for real-time streaming
    synthesis. Note that this requires a Business plan subscription with Resemble AI.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        opts: _TTSOptions,
        pool: utils.ConnectionPool[aiohttp.ClientWebSocketResponse],
        api_key: str,
    ):
        super().__init__(tts=tts)
        self._opts, self._pool, self._api_key = opts, pool, api_key

    async def _run(self) -> None:
        request_id = utils.shortuuid()
        self._segments_ch = utils.aio.Chan[tokenize.SentenceStream]()

        @utils.log_exceptions(logger=logger)
        async def _tokenize_input():
            """tokenize text from the input_ch to words"""
            input_stream = None
            async for input in self._input_ch:
                if isinstance(input, str):
                    if input_stream is None:
                        # new segment (after flush for e.g)
                        input_stream = self._opts.tokenizer.stream()
                        self._segments_ch.send_nowait(input_stream)
                    input_stream.push_text(input)
                elif isinstance(input, self._FlushSentinel):
                    if input_stream is not None:
                        input_stream.end_input()
                    input_stream = None
            if input_stream is not None:
                input_stream.end_input()
            self._segments_ch.close()

        @utils.log_exceptions(logger=logger)
        async def _process_segments():
            async for input_stream in self._segments_ch:
                await self._run_ws(input_stream)

        tasks = [
            asyncio.create_task(_tokenize_input()),
            asyncio.create_task(_process_segments()),
        ]
        try:
            await asyncio.gather(*tasks)
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=request_id,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    async def _run_ws(
        self,
        input_stream: tokenize.SentenceStream,
    ) -> None:
        async with self._pool.connection() as ws:
            segment_id = utils.shortuuid()
            decoder = utils.codecs.AudioStreamDecoder(
                sample_rate=self._opts.sample_rate,
                num_channels=NUM_CHANNELS,
            )
            index_lock = asyncio.Lock()
            current_index = 0
            pending_requests = set()

            @utils.log_exceptions(logger=logger)
            async def _send_task(ws: aiohttp.ClientWebSocketResponse):
                nonlocal current_index
                index = 0
                async for data in input_stream:
                    payload = {
                        "voice_uuid": self._opts.voice_uuid,
                        "data": data.token,
                        "request_id": index,
                        "sample_rate": self._opts.sample_rate,
                        "precision": "PCM_16",
                        "output_format": "mp3",
                    }
                    async with index_lock:
                        pending_requests.add(index)
                    index += 1
                    current_index = index
                    await ws.send_str(json.dumps(payload))

            @utils.log_exceptions(logger=logger)
            async def _emit_task():
                emitter = tts.SynthesizedAudioEmitter(
                    event_ch=self._event_ch,
                    request_id=str(current_index),
                    segment_id=segment_id,
                )
                async for frame in decoder:
                    emitter.push(frame)
                emitter.flush()

            @utils.log_exceptions(logger=logger)
            async def _recv_task(ws: aiohttp.ClientWebSocketResponse):
                while True:
                    msg = await ws.receive()
                    if msg.type in (
                        aiohttp.WSMsgType.CLOSED,
                        aiohttp.WSMsgType.CLOSE,
                        aiohttp.WSMsgType.CLOSING,
                    ):
                        raise APIStatusError(
                            "Resemble connection closed unexpectedly",
                            request_id=str(current_index),
                        )

                    if msg.type != aiohttp.WSMsgType.TEXT:
                        logger.warning("Unexpected Resemble message type %s", msg.type)
                        continue

                    data = json.loads(msg.data)

                    if data.get("type") == "audio":
                        if data.get("audio_content", None):
                            b64data = base64.b64decode(data["audio_content"])
                            decoder.push(b64data)

                    elif data.get("type") == "audio_end":
                        async with index_lock:
                            index = data["request_id"]
                            pending_requests.remove(index)
                            if not pending_requests:
                                decoder.end_input()
                                break  # we are not going to receive any more audio
                    else:
                        logger.error("Unexpected Resemble message %s", data)

            tasks = [
                asyncio.create_task(_send_task(ws)),
                asyncio.create_task(_recv_task(ws)),
                asyncio.create_task(_emit_task()),
            ]

            try:
                await asyncio.gather(*tasks)
            except asyncio.TimeoutError as e:
                raise APITimeoutError() from e
            except aiohttp.ClientResponseError as e:
                raise APIStatusError(
                    message=e.message,
                    status_code=e.status,
                    request_id=str(current_index),
                    body=None,
                ) from e
            except Exception as e:
                raise APIConnectionError() from e
            finally:
                await utils.aio.gracefully_cancel(*tasks)

Stream-based text-to-speech synthesis using Resemble AI WebSocket API.

This implementation connects to Resemble's WebSocket API for real-time streaming synthesis. Note that this requires a Business plan subscription with Resemble AI.

Ancestors

Inherited members

class TTS (*,
api_key: str | None = None,
voice_uuid: str | None = None,
tokenizer: tokenize.SentenceTokenizer | None = None,
sample_rate: int = 44100,
http_session: aiohttp.ClientSession | None = None,
use_streaming: bool = True)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        voice_uuid: str | None = None,
        tokenizer: tokenize.SentenceTokenizer | None = None,
        sample_rate: int = 44100,
        http_session: aiohttp.ClientSession | None = None,
        use_streaming: bool = True,
    ) -> None:
        """
        Create a new instance of the Resemble TTS.

        See https://docs.app.resemble.ai/docs/text_to_speech/ for more documentation on all of these options.

        Args:
            voice_uuid (str, optional): The voice UUID for the desired voice. Defaults to None.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 44100.
            api_key (str | None, optional): The Resemble API key. If not provided, it will be read from the RESEMBLE_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
            tokenizer (tokenize.SentenceTokenizer, optional): The tokenizer to use. Defaults to tokenize.SentenceTokenizer().
            use_streaming (bool, optional): Whether to use streaming or not. Defaults to True.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=use_streaming),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        api_key = api_key or os.environ.get("RESEMBLE_API_KEY")
        if not api_key:
            raise ValueError(
                "Resemble API key is required, either as argument or set RESEMBLE_API_KEY environment variable"
            )
        self._api_key = api_key

        if tokenizer is None:
            tokenizer = tokenize.basic.SentenceTokenizer(
                min_sentence_len=BUFFERED_WORDS_COUNT
            )

        if voice_uuid is None:
            voice_uuid = DEFAULT_VOICE_UUID

        self._opts = _TTSOptions(
            voice_uuid=voice_uuid,
            sample_rate=sample_rate,
            tokenizer=tokenizer,
        )

        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
        )

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()

        return await asyncio.wait_for(
            session.ws_connect(
                RESEMBLE_WEBSOCKET_URL,
                headers={"Authorization": f"Bearer {self._api_key}"},
            ),
            self._conn_options.timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse):
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice_uuid: str | None = None,
        sample_rate: int | None = None,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        Args:
            voice_uuid (str, optional): The voice UUID for the desired voice.
            sample_rate (int, optional): The audio sample rate in Hz.
        """  # noqa: E501
        self._opts.voice_uuid = voice_uuid or self._opts.voice_uuid
        self._opts.sample_rate = sample_rate or self._opts.sample_rate

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options or DEFAULT_API_CONNECT_OPTIONS,
            opts=self._opts,
            api_key=self._api_key,
            session=self._ensure_session(),
        )

    def stream(
        self, *, conn_options: Optional[APIConnectOptions] = None
    ) -> SynthesizeStream:
        stream = SynthesizeStream(
            tts=self,
            pool=self._pool,
            opts=self._opts,
            api_key=self._api_key,
        )
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
        await super().aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of the Resemble TTS.

See https://docs.app.resemble.ai/docs/text_to_speech/ for more documentation on all of these options.

Args

voice_uuid : str, optional
The voice UUID for the desired voice. Defaults to None.
sample_rate : int, optional
The audio sample rate in Hz. Defaults to 44100.
api_key : str | None, optional
The Resemble API key. If not provided, it will be read from the RESEMBLE_API_KEY environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.
tokenizer : tokenize.SentenceTokenizer, optional
The tokenizer to use. Defaults to tokenize.SentenceTokenizer().
use_streaming : bool, optional
Whether to use streaming or not. Defaults to True.

Ancestors

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._pool.aclose()
    await super().aclose()
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.resemble.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: Optional[APIConnectOptions] = None
) -> SynthesizeStream:
    stream = SynthesizeStream(
        tts=self,
        pool=self._pool,
        opts=self._opts,
        api_key=self._api_key,
    )
    self._streams.add(stream)
    return stream
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.resemble.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options or DEFAULT_API_CONNECT_OPTIONS,
        opts=self._opts,
        api_key=self._api_key,
        session=self._ensure_session(),
    )
def update_options(self, *, voice_uuid: str | None = None, sample_rate: int | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice_uuid: str | None = None,
    sample_rate: int | None = None,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    Args:
        voice_uuid (str, optional): The voice UUID for the desired voice.
        sample_rate (int, optional): The audio sample rate in Hz.
    """  # noqa: E501
    self._opts.voice_uuid = voice_uuid or self._opts.voice_uuid
    self._opts.sample_rate = sample_rate or self._opts.sample_rate

Update the Text-to-Speech (TTS) configuration options.

Args

voice_uuid : str, optional
The voice UUID for the desired voice.
sample_rate : int, optional
The audio sample rate in Hz.

Inherited members