Module livekit.plugins.resemble

Resemble plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/resemble/ for more information.
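
A minimal usage sketch (assuming RESEMBLE_API_KEY is set in the environment; the voice UUID below is a placeholder):

from livekit.plugins import resemble

# Reads the API key from RESEMBLE_API_KEY; pass api_key=... to override.
tts = resemble.TTS(
    voice_uuid="your-voice-uuid",  # placeholder: replace with a real Resemble voice UUID
    sample_rate=44100,
)

The resulting instance can then be used anywhere livekit.agents expects a TTS, for example as an agent session's tts option.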

Sub-modules

livekit.plugins.resemble.models

Classes

class ChunkedStream(*, tts: TTS, input_text: str, conn_options: APIConnectOptions)
class ChunkedStream(tts.ChunkedStream):
    """Synthesize text into speech in one go using Resemble AI's REST API."""

    def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        try:
            async with self._tts._ensure_session().post(
                RESEMBLE_REST_API_URL,
                headers={
                    "Authorization": f"Bearer {self._tts._api_key}",
                    "Content-Type": "application/json",
                    "Accept": "application/json",
                },
                json={
                    "voice_uuid": self._opts.voice_uuid,
                    "data": self._input_text,
                    "sample_rate": self._opts.sample_rate,
                    "precision": "PCM_16",
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=self._conn_options.timeout,
                ),
            ) as resp:
                resp.raise_for_status()
                response_json = await resp.json()

                if not response_json.get("success", False):
                    issues = response_json.get("issues", ["Unknown error"])
                    error_msg = "; ".join(issues)
                    raise APIError(
                        message=f"Resemble API returned failure: {error_msg}",
                        body=json.dumps(response_json),
                    )

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._opts.sample_rate,
                    num_channels=1,
                    mime_type="audio/wav",
                )

                audio_b64 = response_json["audio_content"]
                audio_bytes = base64.b64decode(audio_b64)

                output_emitter.push(audio_bytes)
                output_emitter.flush()

        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e

Synthesize text into speech in one go using Resemble AI's REST API.
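
ChunkedStream instances are normally obtained from TTS.synthesize() rather than constructed directly. A hedged sketch of consuming one, assuming the base class yields synthesized-audio events with a .frame attribute when iterated (as in other livekit-agents TTS plugins):

import asyncio

from livekit.plugins import resemble


async def synthesize_once() -> None:
    tts = resemble.TTS()  # reads RESEMBLE_API_KEY from the environment
    stream = tts.synthesize("Hello from Resemble.")
    frames = []
    try:
        # Assumption: each yielded event carries one audio frame of the WAV response.
        async for event in stream:
            frames.append(event.frame)
    finally:
        await stream.aclose()
    print(f"received {len(frames)} audio frames")


asyncio.run(synthesize_once())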

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class SynthesizeStream(*, tts: TTS, conn_options: APIConnectOptions)
class SynthesizeStream(tts.SynthesizeStream):
    """Stream-based text-to-speech synthesis using Resemble AI WebSocket API.


    This implementation connects to Resemble's WebSocket API for real-time streaming
    synthesis. Note that this requires a Business plan subscription with Resemble AI.
    """

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)
        self._segments_ch = utils.aio.Chan[tokenize.SentenceStream]()

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        request_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._opts.sample_rate,
            num_channels=1,
            stream=True,
            mime_type="audio/mp3",
        )

        async def _tokenize_input() -> None:
            """tokenize text from the input_ch to words"""
            input_stream = None
            async for text in self._input_ch:
                if isinstance(text, str):
                    if input_stream is None:
                        # start a new segment (e.g. after a flush)
                        input_stream = self._opts.tokenizer.stream()
                        self._segments_ch.send_nowait(input_stream)
                    input_stream.push_text(text)
                elif isinstance(text, self._FlushSentinel):
                    if input_stream is not None:
                        input_stream.end_input()
                    input_stream = None

            if input_stream is not None:
                input_stream.end_input()

            self._segments_ch.close()

        async def _process_segments() -> None:
            async for input_stream in self._segments_ch:
                await self._run_ws(input_stream, output_emitter)

        tasks = [
            asyncio.create_task(_tokenize_input()),
            asyncio.create_task(_process_segments()),
        ]
        try:
            await asyncio.gather(*tasks)
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=request_id, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    async def _run_ws(
        self, input_stream: tokenize.SentenceStream, output_emitter: tts.AudioEmitter
    ) -> None:
        segment_id = utils.shortuuid()
        output_emitter.start_segment(segment_id=segment_id)

        last_index = 0
        input_ended = False

        async def _send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal input_ended, last_index
            async for data in input_stream:
                last_index += 1
                payload = {
                    "voice_uuid": self._opts.voice_uuid,
                    "data": data.token,
                    "request_id": last_index,
                    "sample_rate": self._opts.sample_rate,
                    "precision": "PCM_16",
                    "output_format": "mp3",
                }
                self._mark_started()
                await ws.send_str(json.dumps(payload))

            input_ended = True

        async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError("Resemble connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("Unexpected Resemble message type %s", msg.type)
                    continue

                data = json.loads(msg.data)
                if data.get("type") == "audio":
                    if data.get("audio_content", None):
                        b64data = base64.b64decode(data["audio_content"])
                        output_emitter.push(b64data)

                elif data.get("type") == "audio_end":
                    index = data["request_id"]
                    if index == last_index and input_ended:
                        output_emitter.end_segment()
                        break
                else:
                    logger.error("Unexpected Resemble message %s", data)

        async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
            tasks = [
                asyncio.create_task(_send_task(ws)),
                asyncio.create_task(_recv_task(ws)),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                await utils.aio.gracefully_cancel(*tasks)

Stream-based text-to-speech synthesis using the Resemble AI WebSocket API.

This implementation connects to Resemble's WebSocket API for real-time streaming synthesis. Note that this requires a Business plan subscription with Resemble AI.
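
SynthesizeStream instances are obtained from TTS.stream(). A hedged sketch, assuming the base class accepts text via push_text()/flush()/end_input() and yields synthesized-audio events when iterated (the usual livekit-agents streaming TTS interface):

from livekit.plugins import resemble


async def stream_speech() -> None:
    tts = resemble.TTS()  # WebSocket streaming requires a Resemble Business plan
    stream = tts.stream()

    stream.push_text("Hello, ")
    stream.push_text("this sentence is synthesized while it streams in.")
    stream.flush()      # closes the current segment
    stream.end_input()  # no more text will be pushed

    frame_count = 0
    # Assumption: iteration yields audio events until the stream is exhausted.
    async for _event in stream:
        frame_count += 1
    print(f"received {frame_count} audio frames")

    await stream.aclose()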

Ancestors

  • livekit.agents.tts.tts.SynthesizeStream
  • abc.ABC
class TTS(*,
          api_key: str | None = None,
          voice_uuid: str | None = None,
          tokenizer: tokenize.SentenceTokenizer | None = None,
          sample_rate: int = 44100,
          http_session: aiohttp.ClientSession | None = None,
          use_streaming: bool = True)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        voice_uuid: str | None = None,
        tokenizer: tokenize.SentenceTokenizer | None = None,
        sample_rate: int = 44100,
        http_session: aiohttp.ClientSession | None = None,
        use_streaming: bool = True,
    ) -> None:
        """
        Create a new instance of the Resemble TTS.

        See https://docs.app.resemble.ai/docs/text_to_speech/ for more documentation on all of these options.

        Args:
            voice_uuid (str, optional): The voice UUID for the desired voice. Defaults to None.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 44100.
            api_key (str | None, optional): The Resemble API key. If not provided, it will be read from the RESEMBLE_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
            tokenizer (tokenize.SentenceTokenizer, optional): The tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
            use_streaming (bool, optional): Whether to use streaming or not. Defaults to True.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=use_streaming),
            sample_rate=sample_rate,
            num_channels=1,
        )

        api_key = api_key or os.environ.get("RESEMBLE_API_KEY")
        if not api_key:
            raise ValueError(
                "Resemble API key is required, either as argument or set RESEMBLE_API_KEY"
                " environment variable"
            )
        self._api_key = api_key

        if tokenizer is None:
            tokenizer = tokenize.blingfire.SentenceTokenizer()

        if voice_uuid is None:
            voice_uuid = DEFAULT_VOICE_UUID

        self._opts = _TTSOptions(
            voice_uuid=voice_uuid,
            sample_rate=sample_rate,
            tokenizer=tokenizer,
        )

        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        return await asyncio.wait_for(
            self._ensure_session().ws_connect(
                RESEMBLE_WEBSOCKET_URL,
                headers={"Authorization": f"Bearer {self._api_key}"},
            ),
            timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice_uuid: str | None = None,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        Args:
            voice_uuid (str, optional): The voice UUID for the desired voice.
        """  # noqa: E501
        self._opts.voice_uuid = voice_uuid or self._opts.voice_uuid

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()


Create a new instance of the Resemble TTS.

See https://docs.app.resemble.ai/docs/text_to_speech/ for more documentation on all of these options.

Args

voice_uuid : str, optional
The voice UUID for the desired voice. Defaults to None, in which case the plugin's default voice is used.
sample_rate : int, optional
The audio sample rate in Hz. Defaults to 44100.
api_key : str | None, optional
The Resemble API key. If not provided, it will be read from the RESEMBLE_API_KEY environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.
tokenizer : tokenize.SentenceTokenizer, optional
The tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
use_streaming : bool, optional
Whether to use streaming or not. Defaults to True.
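
For reference, a construction sketch covering the documented options (the key, voice UUID, and session are placeholders; tokenize refers to livekit.agents.tokenize, matching the source above):

import aiohttp

from livekit.agents import tokenize
from livekit.plugins import resemble


def build_tts(session: aiohttp.ClientSession) -> resemble.TTS:
    return resemble.TTS(
        api_key="rsm_placeholder_key",                     # placeholder; omit to read RESEMBLE_API_KEY
        voice_uuid="your-voice-uuid",                      # placeholder voice UUID
        sample_rate=44100,
        tokenizer=tokenize.blingfire.SentenceTokenizer(),  # same as the plugin default
        http_session=session,                              # reuse an existing aiohttp session
        use_streaming=True,                                # False advertises non-streaming (REST-only) capability
    )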

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) -> None
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) -> None
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service
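
A short sketch of warming the pool ahead of the first streaming request:

from livekit.plugins import resemble

tts = resemble.TTS()  # reads RESEMBLE_API_KEY from the environment
tts.prewarm()         # asks the WebSocket connection pool to open a connection before the first stream() call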

def stream(self, *,
           conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.resemble.tts.SynthesizeStream
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
def synthesize(self, text: str, *,
               conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.resemble.tts.ChunkedStream
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self, *, voice_uuid: str | None = None) -> None
def update_options(
    self,
    *,
    voice_uuid: str | None = None,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    Args:
        voice_uuid (str, optional): The voice UUID for the desired voice.
    """  # noqa: E501
    self._opts.voice_uuid = voice_uuid or self._opts.voice_uuid

Update the Text-to-Speech (TTS) configuration options.

Args

voice_uuid : str, optional
The voice UUID for the desired voice.
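
A sketch of switching voices on an existing instance (the UUIDs are placeholders). Per the source above, each stream copies the options when it is created, so the new voice applies to synthesize()/stream() calls made after the update:

from livekit.plugins import resemble

tts = resemble.TTS(voice_uuid="voice-uuid-a")   # placeholder UUID
tts.update_options(voice_uuid="voice-uuid-b")   # later synthesize()/stream() calls use voice-uuid-b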

Inherited members