Module livekit.agents.inference.tts

Classes

class CartesiaOptions (*args, **kwargs)
Expand source code
class CartesiaOptions(TypedDict, total=False):
    duration: float  # max duration of audio in seconds
    speed: Literal["slow", "normal", "fast"]  # default: not specified

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Class variables

var duration : float
var speed : Literal['slow', 'normal', 'fast']
class ElevenlabsOptions (*args, **kwargs)
Expand source code
class ElevenlabsOptions(TypedDict, total=False):
    inactivity_timeout: int  # default: 60
    apply_text_normalization: Literal["auto", "off", "on"]  # default: "auto"

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict

Class variables

var apply_text_normalization : Literal['auto', 'off', 'on']
var inactivity_timeout : int
class InworldOptions (*args, **kwargs)
Expand source code
class InworldOptions(TypedDict, total=False):
    pass

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict
class RimeOptions (*args, **kwargs)
Expand source code
class RimeOptions(TypedDict, total=False):
    pass

dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)

Ancestors

  • builtins.dict
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    """Streamed API using websockets"""

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts

        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        request_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._opts.sample_rate,
            num_channels=1,
            stream=True,
            mime_type="audio/pcm",
        )

        sent_tokenizer_stream = tokenize.basic.SentenceTokenizer().stream()

        async def _input_task() -> None:
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    sent_tokenizer_stream.flush()
                    continue
                sent_tokenizer_stream.push_text(data)

            sent_tokenizer_stream.end_input()

        async def _sentence_stream_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            base_pkt = {
                "type": "input_transcript",
            }
            async for ev in sent_tokenizer_stream:
                token_pkt = base_pkt.copy()
                token_pkt["transcript"] = ev.token + " "
                self._mark_started()
                await ws.send_str(json.dumps(token_pkt))

            end_pkt = {
                "type": "session.flush",
            }
            await ws.send_str(json.dumps(end_pkt))

        async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            current_session_id: str | None = None
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError(
                        "Gateway connection closed unexpectedly", request_id=request_id
                    )

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected Gateway message type %s", msg.type)
                    continue

                data: dict[str, Any] = json.loads(msg.data)
                session_id = data.get("session_id")
                if current_session_id is None and session_id is not None:
                    current_session_id = session_id
                    output_emitter.start_segment(segment_id=session_id)

                if data.get("type") == "session.created":
                    pass
                elif data.get("type") == "output_audio":
                    b64data = base64.b64decode(data["audio"])
                    output_emitter.push(b64data)
                elif data.get("type") == "done":
                    output_emitter.end_input()
                    break
                elif data.get("type") == "error":
                    raise APIError(f"LiveKit TTS returned error: {msg.data}")
                else:
                    logger.warning("unexpected message %s", data)

        try:
            async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
                tasks = [
                    asyncio.create_task(_input_task()),
                    asyncio.create_task(_sentence_stream_task(ws)),
                    asyncio.create_task(_recv_task(ws)),
                ]

                try:
                    await asyncio.gather(*tasks)
                finally:
                    await sent_tokenizer_stream.aclose()
                    await utils.aio.gracefully_cancel(*tasks)

        except asyncio.TimeoutError:
            raise APITimeoutError() from None

        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None

        except Exception as e:
            raise APIConnectionError() from e

Streamed API using websockets

Ancestors

  • livekit.agents.tts.tts.SynthesizeStream
  • abc.ABC
class TTS (model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions] = NOT_GIVEN)
Expand source code
class TTS(tts.TTS):
    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: RimeModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[RimeOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: InworldModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[InworldOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: NotGivenOr[str] = NOT_GIVEN,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,  # TODO: add a default model
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference TTS

        Args:
            model (TTSModels | str, optional): TTS model to use, in "provider/model[:voice_id]" format
            voice (str, optional): Voice to use, use a default one if not provided
            language (str, optional): Language of the TTS model.
            encoding (TTSEncoding, optional): Encoding of the TTS model.
            sample_rate (int, optional): Sample rate of the TTS model.
            base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable.
            api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable.
            api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model.
        """
        sample_rate = sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True, aligned_transcript=False),
            sample_rate=sample_rate,
            num_channels=1,
        )

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )

        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        # read voice id from the model if provided: "provider/model:voice_id"
        if is_given(model) and (idx := model.rfind(":")) != -1:
            if is_given(voice) and voice != model[idx + 1 :]:
                logger.warning(
                    "`voice` is provided via both argument and model, using the one from the argument",
                    extra={"voice": voice, "model": model},
                )
            else:
                voice = model[idx + 1 :]
            model = model[:idx]

        self._opts = _TTSOptions(
            model=model,
            voice=voice,
            language=language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        base_url = self._opts.base_url
        if base_url.startswith(("http://", "https://")):
            base_url = base_url.replace("http", "ws", 1)

        headers = {
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}",
        }
        ws = None
        try:
            ws = await asyncio.wait_for(
                session.ws_connect(f"{base_url}/tts", headers=headers), timeout
            )
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            if isinstance(e, aiohttp.ClientResponseError) and e.status == 429:
                raise APIStatusError("LiveKit TTS quota exceeded", status_code=e.status) from e
            raise APIConnectionError("failed to connect to LiveKit TTS") from e

        params = {
            "type": "session.create",
            "sample_rate": str(self._opts.sample_rate),
            "encoding": self._opts.encoding,
            "extra": self._opts.extra_kwargs,
        }

        if self._opts.voice:
            params["voice"] = self._opts.voice
        if self._opts.model:
            params["model"] = self._opts.model
        if self._opts.language:
            params["language"] = self._opts.language

        try:
            await ws.send_str(json.dumps(params))
        except Exception as e:
            await ws.close()
            raise APIConnectionError("failed to send session.create message to LiveKit TTS") from e

        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            voice (str, optional): Voice.
            model (TTSModels | str, optional): TTS model to use.
            language (str, optional): Language code for the TTS model.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        raise NotImplementedError("ChunkedStream is not implemented")

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Livekit Cloud Inference TTS

Args

model : TTSModels | str, optional
TTS model to use, in "provider/model[:voice_id]" format
voice : str, optional
Voice to use, use a default one if not provided
language : str, optional
Language of the TTS model.
encoding : TTSEncoding, optional
Encoding of the TTS model.
sample_rate : int, optional
Sample rate of the TTS model.
base_url : str, optional
LIVEKIT_URL, if not provided, read from environment variable.
api_key : str, optional
LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret : str, optional
LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session : aiohttp.ClientSession, optional
HTTP session to use.
extra_kwargs : dict, optional
Extra kwargs to pass to the TTS model.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    raise NotImplementedError("ChunkedStream is not implemented")
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """
    Args:
        voice (str, optional): Voice.
        model (TTSModels | str, optional): TTS model to use.
        language (str, optional): Language code for the TTS model.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language

Args

voice : str, optional
Voice.
model : TTSModels | str, optional
TTS model to use.
language : str, optional
Language code for the TTS model.

Inherited members