Module livekit.plugins.asyncai
AsyncAI plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/tts/asyncai/ for more information.
Classes
class TTS (*,
api_key: str | None = None,
model: TTSModels | str = 'asyncflow_multilingual_v1.0',
language: str | None = None,
encoding: TTSEncoding = 'pcm_s16le',
voice: str = 'e0f39dc4-f691-4e78-bba5-5c636692cc04',
sample_rate: int = 32000,
http_session: aiohttp.ClientSession | None = None,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
base_url: str = 'https://api.async.ai')
Expand source code
class TTS(tts.TTS):
    """AsyncAI streaming text-to-speech for LiveKit Agents (websocket-based)."""

    def __init__(
        self,
        *,
        api_key: str | None = None,
        model: TTSModels | str = "asyncflow_multilingual_v1.0",
        language: str | None = None,
        encoding: TTSEncoding = "pcm_s16le",
        voice: str = TTSDefaultVoiceId,
        sample_rate: int = 32000,
        http_session: aiohttp.ClientSession | None = None,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        base_url: str = "https://api.async.ai",
    ) -> None:
        """
        Create a new instance of Async TTS.

        See https://docs.async.ai/text-to-speech-websocket-3477526w0 for more
        details on the Async API.

        Args:
            model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
            language (str, optional): The language code for synthesis. If None, no language is sent in the init payload.
            encoding (TTSEncoding, optional): The audio encoding format. Defaults to "pcm_s16le".
            voice (str, optional): The voice ID.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 32000.
            api_key (str, optional): The Async API key. If not provided, it will be read from the ASYNCAI_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
            tokenizer (tokenize.SentenceTokenizer, optional): The tokenizer to use. Defaults to `livekit.agents.tokenize.blingfire.SentenceTokenizer`.
            base_url (str, optional): The base URL for the Async API. Defaults to "https://api.async.ai".

        Raises:
            ValueError: If no API key is provided and ASYNCAI_API_KEY is unset.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=1,
        )
        async_api_key = api_key or os.environ.get("ASYNCAI_API_KEY")
        if not async_api_key:
            raise ValueError("ASYNCAI_API_KEY must be set")

        self._opts = _TTSOptions(
            model=model,
            language=language,
            encoding=encoding,
            sample_rate=sample_rate,
            voice=voice,
            api_key=async_api_key,
            base_url=base_url,
        )
        self._session = http_session
        # Reusable websocket pool; sessions are recycled after 300s and their
        # TTL refreshed on checkout so active connections are not torn down.
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        # Weak references only: streams unregister themselves via GC, while
        # aclose() still closes any that are live.
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )

    @property
    def model(self) -> str:
        """The configured Async TTS model identifier."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """The provider name for this TTS instance."""
        return "AsyncAI"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a websocket to the Async API and send the session init payload.

        Auth and API version are passed as query-string parameters; the first
        message on the socket configures model, voice, and output format.
        """
        session = self._ensure_session()
        query = urlencode({API_AUTH_HEADER: self._opts.api_key, API_VERSION_HEADER: API_VERSION})
        url = self._opts.get_ws_url(f"/text_to_speech/websocket/ws?{query}")
        init_payload = {
            "model_id": self._opts.model,
            "voice": {"mode": "id", "id": self._opts.voice},
            "output_format": {
                "container": "raw",
                "encoding": self._opts.encoding,
                "sample_rate": self._opts.sample_rate,
            },
        }
        # Only include "language" when explicitly configured; the API applies
        # its own default otherwise.
        if self._opts.language is not None:
            init_payload["language"] = self._opts.language
        ws = await asyncio.wait_for(session.ws_connect(url), timeout)
        await ws.send_str(json.dumps(init_payload))
        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a pooled websocket (ConnectionPool close callback)."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the configured HTTP session, lazily falling back to the
        shared session from the agents' http context."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def prewarm(self) -> None:
        """Pre-warm a connection to the TTS service."""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        This method allows updating the TTS settings, including model type,
        language and voice. If any parameter is not provided, the existing
        value will be retained.

        Args:
            model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
            language (str, optional): The language code for synthesis.
            voice (str, optional): The voice ID.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        if is_given(voice):
            # `voice` is a plain str here; the previous
            # cast(Union[str, list[float]], ...) was misleading and unneeded.
            self._opts.voice = voice

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a new streaming synthesis session and track it for cleanup."""
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        """Chunked (non-streaming) synthesis is not supported by this plugin."""
        raise NotImplementedError("AsyncAI TTS supports streaming only; use tts.stream().")

    async def aclose(self) -> None:
        """Close all live streams and drain the websocket connection pool."""
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
Create a new instance of Async TTS.
See https://docs.async.ai/text-to-speech-websocket-3477526w0 for more details on the Async API.
Args
model:TTSModels, optional- The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
language:str, optional- The language code for synthesis.
encoding:TTSEncoding, optional- The audio encoding format. Defaults to "pcm_s16le".
voice:str, optional- The voice ID.
sample_rate:int, optional- The audio sample rate in Hz. Defaults to 32000.
api_key:str, optional- The Async API key. If not provided, it will be read from the ASYNCAI_API_KEY environment variable.
http_session:aiohttp.ClientSession | None, optional- An existing aiohttp ClientSession to use. If not provided, a new session will be created.
tokenizer:tokenize.SentenceTokenizer, optional- The tokenizer to use. Defaults to livekit.agents.tokenize.blingfire.SentenceTokenizer.
base_url:str, optional- The base URL for the Async API. Defaults to "https://api.async.ai".
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    return "AsyncAI"

Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._pool.aclose()

def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.asyncai.tts.SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    raise NotImplementedError("AsyncAI TTS supports streaming only; use tts.stream().")

def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    voice: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    This method allows updating the TTS settings, including model type,
    language and voice. If any parameter is not provided, the existing
    value will be retained.

    Args:
        model (TTSModels, optional): The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
        language (str, optional): The language code for synthesis. Defaults to "en".
        voice (str, optional): The voice ID.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = language
    if is_given(voice):
        self._opts.voice = cast(Union[str, list[float]], voice)

Update the Text-to-Speech (TTS) configuration options.
This method allows updating the TTS settings, including model type, language and voice. If any parameter is not provided, the existing value will be retained.
Args
model:TTSModels, optional- The Async TTS model to use. Defaults to "asyncflow_multilingual_v1.0".
language:str, optional- The language code for synthesis. Defaults to "en".
voice:str, optional- The voice ID.
Inherited members