Module livekit.plugins.asyncai

AsyncAI plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/asyncai/ for more information.
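
A minimal construction sketch (illustrative only; api_key can be omitted when the
ASYNCAI_API_KEY environment variable is set, and every other argument falls back to
the defaults documented below):

from livekit.plugins import asyncai

# Illustrative values; all arguments shown here are optional.
tts = asyncai.TTS(
    model="asyncflow_multilingual_v1.0",
    voice="e0f39dc4-f691-4e78-bba5-5c636692cc04",  # module default voice ID
    sample_rate=32000,
)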

Classes

class TTS (*,
api_key: str | None = None,
model: TTSModels | str = 'asyncflow_multilingual_v1.0',
language: str | None = None,
encoding: TTSEncoding = 'pcm_s16le',
voice: str = 'e0f39dc4-f691-4e78-bba5-5c636692cc04',
sample_rate: int = 32000,
http_session: aiohttp.ClientSession | None = None,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
base_url: str = 'https://api.async.ai')
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: TTSModels | str = "asyncflow_multilingual_v1.0",
        language: str | None = None,
        encoding: TTSEncoding = "pcm_s16le",
        voice: str = TTSDefaultVoiceId,
        sample_rate: int = 32000,
        http_session: aiohttp.ClientSession | None = None,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        base_url: str = "https://api.async.ai",
    ) -> None:
        """
        Create a new instance of Async TTS.

        See https://docs.async.ai/text-to-speech-websocket-3477526w0 for more details
            on the Async API.

        Args:
            model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
            language (str, optional): The language code for synthesis.
            encoding (TTSEncoding, optional): The audio encoding format. Defaults to "pcm_s16le".
            voice (str, optional): The voice ID.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 32000.
            api_key (str, optional): The Async API key. If not provided, it will be
                read from the ASYNCAI_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp
                ClientSession to use. If not provided, a new session will be created.
            tokenizer (tokenize.SentenceTokenizer, optional): The tokenizer to use. Defaults to `livekit.agents.tokenize.blingfire.SentenceTokenizer`.
            base_url (str, optional): The base URL for the Async API. Defaults to "https://api.async.ai".
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=1,
        )
        async_api_key = api_key or os.environ.get("ASYNCAI_API_KEY")
        if not async_api_key:
            raise ValueError("ASYNCAI_API_KEY must be set")

        self._opts = _TTSOptions(
            model=model,
            language=language,
            encoding=encoding,
            sample_rate=sample_rate,
            voice=voice,
            api_key=async_api_key,
            base_url=base_url,
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()

        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "AsyncAI"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        query = urlencode({API_AUTH_HEADER: self._opts.api_key, API_VERSION_HEADER: API_VERSION})
        url = self._opts.get_ws_url(f"/text_to_speech/websocket/ws?{query}")

        init_payload = {
            "model_id": self._opts.model,
            "voice": {"mode": "id", "id": self._opts.voice},
            "output_format": {
                "container": "raw",
                "encoding": self._opts.encoding,
                "sample_rate": self._opts.sample_rate,
            },
        }

        if self._opts.language is not None:
            init_payload["language"] = self._opts.language
        ws = await asyncio.wait_for(session.ws_connect(url), timeout)
        await ws.send_str(json.dumps(init_payload))
        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        self._pool.prewarm()

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        This method allows updating the TTS settings, including model type, language and voice.
        If any parameter is not provided, the existing value will be retained.

        Args:
            model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
            language (str, optional): The language code for synthesis.
            voice (str, optional): The voice ID.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        if is_given(voice):
            self._opts.voice = cast(Union[str, list[float]], voice)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        raise NotImplementedError("AsyncAI TTS supports streaming only; use tts.stream().")

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()

Streaming text-to-speech implementation backed by the AsyncAI WebSocket API.

Create a new instance of Async TTS.

See https://docs.async.ai/text-to-speech-websocket-3477526w0 for more details on the Async API.

Args

model : TTSModels, optional
The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
language : str, optional
The language code for synthesis.
encoding : TTSEncoding, optional
The audio encoding format. Defaults to "pcm_s16le".
voice : str, optional
The voice ID.
sample_rate : int, optional
The audio sample rate in Hz. Defaults to 32000.
api_key : str, optional
The Async API key. If not provided, it will be read from the ASYNCAI_API_KEY environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.
tokenizer : tokenize.SentenceTokenizer, optional
The tokenizer to use. Defaults to livekit.agents.tokenize.blingfire.SentenceTokenizer.
base_url : str, optional
The base URL for the Async API. Defaults to "https://api.async.ai".
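
As a hedged wiring sketch, the constructed instance can be handed to a LiveKit
AgentSession like any other TTS plugin (the AgentSession import and the rest of the
agent setup are assumptions about the surrounding application, not part of this module):

from livekit.agents import AgentSession
from livekit.plugins import asyncai

# Sketch only: STT/LLM/VAD components omitted; ASYNCAI_API_KEY assumed to be set.
session = AgentSession(
    tts=asyncai.TTS(language="en", encoding="pcm_s16le"),
)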

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "AsyncAI"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service
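
A short sketch of pre-warming right after construction, so the first synthesis request
does not pay the WebSocket connection cost (constructor arguments omitted for brevity):

tts = asyncai.TTS()  # requires ASYNCAI_API_KEY in the environment
tts.prewarm()        # asks the connection pool to open a WebSocket ahead of first use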

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.asyncai.tts.SynthesizeStream
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
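
A hedged usage sketch of the streaming path, assuming the push_text / flush / end_input /
async-iteration interface of the base livekit.agents SynthesizeStream; handle_frame is a
hypothetical consumer of the synthesized audio frames:

async def speak(tts: TTS) -> None:
    stream = tts.stream()
    stream.push_text("Hello from AsyncAI.")
    stream.flush()      # close out the current segment
    stream.end_input()  # signal that no more text will be pushed
    async for audio in stream:       # SynthesizedAudio events
        handle_frame(audio.frame)    # hypothetical frame consumer
    await stream.aclose()
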
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    raise NotImplementedError("AsyncAI TTS supports streaming only; use tts.stream().")
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    voice: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    This method allows updating the TTS settings, including model type, language and voice.
    If any parameter is not provided, the existing value will be retained.

    Args:
        model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
        language (str, optional): The language code for synthesis.
        voice (str, optional): The voice ID.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = language
    if is_given(voice):
        self._opts.voice = cast(Union[str, list[float]], voice)

Update the Text-to-Speech (TTS) configuration options.

This method allows updating the TTS settings, including model type, language and voice. If any parameter is not provided, the existing value will be retained.

Args

model : TTSModels, optional
The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
language : str, optional
The language code for synthesis. If not provided, the existing value is retained.
voice : str, optional
The voice ID.
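
A short illustrative sketch of updating options on a live instance; omitted fields keep
their current values (the voice ID shown is the module default, not a recommendation):

tts.update_options(
    language="es",
    voice="e0f39dc4-f691-4e78-bba5-5c636692cc04",
)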

Inherited members