Module livekit.agents.inference.tts
Classes
class CartesiaOptions (*args, **kwargs)
-
Expand source code
class CartesiaOptions(TypedDict, total=False):
    """Extra options accepted for Cartesia models; every key is optional (total=False)."""

    # max duration of audio in seconds
    duration: float
    # default: not specified
    speed: Literal["slow", "normal", "fast"]
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var duration : float
var speed : Literal['slow', 'normal', 'fast']
class ElevenlabsOptions (*args, **kwargs)
-
Expand source code
class ElevenlabsOptions(TypedDict, total=False):
    """Extra options accepted for ElevenLabs models; every key is optional (total=False)."""

    # default: 60
    inactivity_timeout: int
    # default: "auto"
    apply_text_normalization: Literal["auto", "off", "on"]
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var apply_text_normalization : Literal['auto', 'off', 'on']
var inactivity_timeout : int
class InworldOptions (*args, **kwargs)
-
Expand source code
class InworldOptions(TypedDict, total=False):
    """Extra options for Inworld models; no keys are declared at present."""
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
class RimeOptions (*args, **kwargs)
-
Expand source code
class RimeOptions(TypedDict, total=False):
    """Extra options for Rime models; no keys are declared at present."""
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)-
Expand source code
class SynthesizeStream(tts.SynthesizeStream):
    """Streamed API using websockets"""

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts
        # Work on a copy of the parent's options (dataclasses-style `replace` with no
        # overrides), so this stream keeps its own options object.
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Stream input text to the gateway sentence by sentence and emit returned audio.

        Three tasks run concurrently over one pooled websocket:
        ``_input_task`` feeds the sentence tokenizer, ``_sentence_stream_task`` sends
        each tokenized sentence, and ``_recv_task`` decodes incoming messages and pushes
        audio into ``output_emitter``.

        Raises:
            APITimeoutError: on ``asyncio.TimeoutError``.
            APIStatusError: on ``aiohttp.ClientResponseError``.
            APIConnectionError: on any other ``Exception``.
        """
        request_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._opts.sample_rate,
            num_channels=1,
            stream=True,
            mime_type="audio/pcm",
        )

        sent_tokenizer_stream = tokenize.basic.SentenceTokenizer().stream()

        async def _input_task() -> None:
            # Forward pushed text into the tokenizer; a flush sentinel flushes the
            # tokenizer instead of being pushed as text.
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    sent_tokenizer_stream.flush()
                    continue
                sent_tokenizer_stream.push_text(data)
            # Input channel exhausted: let the tokenizer finish its stream.
            sent_tokenizer_stream.end_input()

        async def _sentence_stream_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            base_pkt = {
                "type": "input_transcript",
            }
            async for ev in sent_tokenizer_stream:
                token_pkt = base_pkt.copy()
                # A trailing space is appended to every token before sending.
                token_pkt["transcript"] = ev.token + " "
                self._mark_started()
                await ws.send_str(json.dumps(token_pkt))

            # All sentences sent: send a "session.flush" packet.
            end_pkt = {
                "type": "session.flush",
            }
            await ws.send_str(json.dumps(end_pkt))

        async def _recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            current_session_id: str | None = None
            while True:
                msg = await ws.receive()
                # Any close-type frame before a "done" message is treated as a failure.
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    raise APIStatusError(
                        "Gateway connection closed unexpectedly", request_id=request_id
                    )

                # Only TEXT frames are parsed; anything else is logged and skipped.
                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected Gateway message type %s", msg.type)
                    continue

                data: dict[str, Any] = json.loads(msg.data)
                session_id = data.get("session_id")
                # Start the output segment once, on the first message carrying a session id.
                if current_session_id is None and session_id is not None:
                    current_session_id = session_id
                    output_emitter.start_segment(segment_id=session_id)

                if data.get("type") == "session.created":
                    pass
                elif data.get("type") == "output_audio":
                    # The "audio" field is base64 text; the decoded bytes are pushed as-is.
                    b64data = base64.b64decode(data["audio"])
                    output_emitter.push(b64data)
                elif data.get("type") == "done":
                    output_emitter.end_input()
                    break
                elif data.get("type") == "error":
                    raise APIError(f"LiveKit TTS returned error: {msg.data}")
                else:
                    logger.warning("unexpected message %s", data)

        try:
            async with self._tts._pool.connection(timeout=self._conn_options.timeout) as ws:
                tasks = [
                    asyncio.create_task(_input_task()),
                    asyncio.create_task(_sentence_stream_task(ws)),
                    asyncio.create_task(_recv_task(ws)),
                ]

                try:
                    await asyncio.gather(*tasks)
                finally:
                    # Runs on success and on failure: close the tokenizer stream and
                    # cancel the tasks.
                    await sent_tokenizer_stream.aclose()
                    await utils.aio.gracefully_cancel(*tasks)

        except asyncio.TimeoutError:
            raise APITimeoutError() from None

        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None

        # NOTE(review): this handler presumably also catches the APIStatusError / APIError
        # raised inside _recv_task (assuming they derive from Exception) and re-wraps them
        # as APIConnectionError — confirm that is intended.
        except Exception as e:
            raise APIConnectionError() from e
Streamed API using websockets
Ancestors
- livekit.agents.tts.tts.SynthesizeStream
- abc.ABC
class TTS (model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions] = NOT_GIVEN)-
Expand source code
class TTS(tts.TTS):
    """LiveKit Cloud Inference TTS client that streams audio over pooled websockets."""

    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: RimeModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[RimeOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: InworldModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[InworldOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: NotGivenOr[str] = NOT_GIVEN,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,  # TODO: add a default model
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference TTS

        Args:
            model (TTSModels | str, optional): TTS model to use, in "provider/model[:voice_id]" format
            voice (str, optional): Voice to use, use a default one if not provided
            language (str, optional): Language of the TTS model.
            encoding (TTSEncoding, optional): Encoding of the TTS model.
            sample_rate (int, optional): Sample rate of the TTS model.
            base_url (str, optional): Base URL of the inference service. If not provided,
                read from the LIVEKIT_INFERENCE_URL environment variable, falling back to
                DEFAULT_BASE_URL.
            api_key (str, optional): API key. If not provided, read from the
                LIVEKIT_INFERENCE_API_KEY environment variable, then LIVEKIT_API_KEY.
            api_secret (str, optional): API secret. If not provided, read from the
                LIVEKIT_INFERENCE_API_SECRET environment variable, then LIVEKIT_API_SECRET.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model.

        Raises:
            ValueError: If no API key or no API secret can be resolved.
        """
        sample_rate = sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True, aligned_transcript=False),
            sample_rate=sample_rate,
            num_channels=1,
        )

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )

        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        # read voice id from the model if provided: "provider/model:voice_id"
        if is_given(model) and (idx := model.rfind(":")) != -1:
            if is_given(voice) and voice != model[idx + 1 :]:
                logger.warning(
                    "`voice` is provided via both argument and model, using the one from the argument",
                    extra={"voice": voice, "model": model},
                )
            else:
                voice = model[idx + 1 :]
            # The ":voice_id" suffix is always stripped from the model name.
            model = model[:idx]

        self._opts = _TTSOptions(
            model=model,
            voice=voice,
            language=language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            # Copy so later mutation of the caller's mapping does not leak in.
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a websocket to ``{base_url}/tts`` and send the ``session.create`` message.

        Args:
            timeout: Maximum number of seconds to wait for the websocket handshake.

        Returns:
            The connected websocket, with ``session.create`` already sent.

        Raises:
            APIStatusError: If the handshake is rejected with HTTP 429.
            APIConnectionError: If connecting fails or times out, or if the
                ``session.create`` message cannot be sent.
            aiohttp.ClientResponseError: For handshake rejections other than 429
                (propagated unchanged).
        """
        session = self._ensure_session()
        base_url = self._opts.base_url
        if base_url.startswith(("http://", "https://")):
            # Replacing only the first "http" maps http:// -> ws:// and https:// -> wss://.
            base_url = base_url.replace("http", "ws", 1)

        headers = {
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}",
        }
        try:
            ws = await asyncio.wait_for(
                session.ws_connect(f"{base_url}/tts", headers=headers), timeout
            )
        except aiohttp.ClientResponseError as e:
            # Handshake rejections surface as ClientResponseError, which the connector /
            # timeout handler below never matches; handle it here so the 429 mapping is
            # actually reachable. Other statuses propagate unchanged, as before.
            if e.status == 429:
                raise APIStatusError("LiveKit TTS quota exceeded", status_code=e.status) from e
            raise
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("failed to connect to LiveKit TTS") from e

        params = {
            "type": "session.create",
            "sample_rate": str(self._opts.sample_rate),
            "encoding": self._opts.encoding,
            "extra": self._opts.extra_kwargs,
        }

        # Optional fields are only included when they are set (truthy).
        if self._opts.voice:
            params["voice"] = self._opts.voice
        if self._opts.model:
            params["model"] = self._opts.model
        if self._opts.language:
            params["language"] = self._opts.language

        try:
            await ws.send_str(json.dumps(params))
        except Exception as e:
            # Do not leak the socket if the session could not be configured.
            await ws.close()
            raise APIConnectionError("failed to send session.create message to LiveKit TTS") from e

        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a websocket; used as the pool's close callback."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, lazily taking one from ``utils.http_context`` if unset."""
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service"""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Update the options used for subsequently created sessions.

        Args:
            voice (str, optional): Voice.
            model (TTSModels | str, optional): TTS model to use.
            language (str, optional): Language code for the TTS model.
        """
        updated = False
        if is_given(model):
            self._opts.model = model
            updated = True
        if is_given(voice):
            self._opts.voice = voice
            updated = True
        if is_given(language):
            self._opts.language = language
            updated = True

        if updated:
            # voice/model/language are only sent in `session.create` at connect time,
            # so pooled websockets would keep the old settings; drop them.
            self._pool.invalidate()

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        """Not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("ChunkedStream is not implemented")

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a ``SynthesizeStream`` for this TTS and track it in ``self._streams``."""
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        """Close all tracked streams, then the connection pool."""
        # Iterate over a copy of the set while awaiting each close.
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()
Helper class that provides a standard way to create an ABC using inheritance.
Livekit Cloud Inference TTS
Args
model
:TTSModels | str
, optional- TTS model to use, in "provider/model[:voice_id]" format
voice
:str
, optional- Voice to use, use a default one if not provided
language
:str
, optional- Language of the TTS model.
encoding
:TTSEncoding
, optional- Encoding of the TTS model.
sample_rate
:int
, optional- Sample rate of the TTS model.
base_url
:str
, optional- Base URL of the inference service. If not provided, read from the LIVEKIT_INFERENCE_URL environment variable, falling back to the built-in default URL.
api_key
:str
, optional- API key. If not provided, read from the LIVEKIT_INFERENCE_API_KEY environment variable, then from LIVEKIT_API_KEY.
api_secret
:str
, optional- API secret. If not provided, read from the LIVEKIT_INFERENCE_API_SECRET environment variable, then from LIVEKIT_API_SECRET.
http_session
:aiohttp.ClientSession
, optional- HTTP session to use.
extra_kwargs
:dict
, optional- Extra kwargs to pass to the TTS model.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close every tracked stream, clear the tracking set, then close the connection pool."""
    # Take a snapshot first so the loop walks a fixed sequence while awaiting.
    tracked = list(self._streams)
    for open_stream in tracked:
        await open_stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
-
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service"""
    pool = self._pool
    pool.prewarm()
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a ``SynthesizeStream`` bound to this TTS, track it, and return it.

    Args:
        conn_options: Connection options forwarded to the new stream.

    Returns:
        The newly created stream, also added to ``self._streams``.
    """
    created = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(created)
    return created
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    """Non-streaming synthesis entry point; not supported.

    Raises:
        NotImplementedError: Always.
    """
    reason = "ChunkedStream is not implemented"
    raise NotImplementedError(reason)
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Update the options used for subsequently created sessions.

    Args:
        voice (str, optional): Voice.
        model (TTSModels | str, optional): TTS model to use.
        language (str, optional): Language code for the TTS model.
    """
    updated = False
    if is_given(model):
        self._opts.model = model
        updated = True
    if is_given(voice):
        self._opts.voice = voice
        updated = True
    if is_given(language):
        self._opts.language = language
        updated = True

    if updated:
        # voice/model/language are only sent in `session.create` at connect time,
        # so pooled websockets would keep the old settings; drop them.
        self._pool.invalidate()
Args
voice
:str
, optional- Voice.
model
:TTSModels | str
, optional- TTS model to use.
language
:str
, optional- Language code for the TTS model.
Inherited members