Module livekit.plugins.inworld
Inworld plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/tts/inworld/ and https://docs.livekit.io/agents/models/stt/inworld/ for more information.
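For orientation, a minimal sketch of wiring the plugin into an agent (assuming the standard LiveKit Agents 1.x entrypoint pattern and INWORLD_API_KEY set in the environment; an LLM would normally be supplied as well):

from livekit import agents
from livekit.plugins import inworld

async def entrypoint(ctx: agents.JobContext) -> None:
    # STT and TTS both read INWORLD_API_KEY from the environment by default.
    session = agents.AgentSession(
        stt=inworld.STT(),
        tts=inworld.TTS(voice="Ashley"),  # "Ashley" is the documented default voice
        # llm=...  (any LiveKit-supported LLM)
    )
    await session.start(
        room=ctx.room,
        agent=agents.Agent(instructions="You are a helpful voice assistant."),
    )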
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
class ChunkedStream(tts.ChunkedStream):
    def __init__(self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        try:
            audio_config: dict[str, Any] = {
                "audioEncoding": self._opts.encoding,
                "bitrate": self._opts.bit_rate,
                "sampleRateHertz": self._opts.sample_rate,
                "temperature": self._opts.temperature,
                "speakingRate": self._opts.speaking_rate,
            }
            body_params: dict[str, Any] = {
                "text": self._input_text,
                "voiceId": self._opts.voice,
                "modelId": self._opts.model,
                "audioConfig": audio_config,
            }
            if utils.is_given(self._opts.timestamp_type):
                body_params["timestampType"] = self._opts.timestamp_type
            if utils.is_given(self._opts.text_normalization):
                body_params["applyTextNormalization"] = self._opts.text_normalization
            body_params["timestampTransportStrategy"] = self._opts.timestamp_transport_strategy

            x_request_id = str(uuid.uuid4())
            async with self._tts._ensure_session().post(
                urljoin(self._tts._base_url, "/tts/v1/voice:stream"),
                headers={
                    "Authorization": self._tts._authorization,
                    "X-User-Agent": USER_AGENT,
                    "X-Request-Id": x_request_id,
                },
                json=body_params,
                timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout),
                # large read_bufsize to avoid `ValueError: Chunk too big`
                read_bufsize=10 * 1024 * 1024,
            ) as resp:
                resp.raise_for_status()

                request_id = utils.shortuuid()
                output_emitter.initialize(
                    request_id=request_id,
                    sample_rate=self._opts.sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=self._opts.mime_type,
                )

                async for raw_line in resp.content:
                    line = raw_line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        logger.warning("failed to parse Inworld response line: %s", line)
                        continue

                    if result := data.get("result"):
                        # Handle timestamp info if present
                        if timestamp_info := result.get("timestampInfo"):
                            timed_strings = _parse_timestamp_info(timestamp_info)
                            if timed_strings:
                                output_emitter.push_timed_transcript(timed_strings)
                        if audio_content := result.get("audioContent"):
                            output_emitter.push(base64.b64decode(audio_content))
                            if self._opts.encoding == "PCM":
                                output_emitter.flush()
                    elif error := data.get("error"):
                        raise APIStatusError(
                            message=error.get("message"),
                            status_code=error.get("code"),
                            request_id=x_request_id,
                            body=None,
                        )
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=x_request_id, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
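ChunkedStream is not normally constructed directly; TTS.synthesize() (documented below) returns one. A minimal consumption sketch, assuming a tts_engine built as shown later on this page:

tts_engine = inworld.TTS()
stream = tts_engine.synthesize("Hello from Inworld!")  # returns a ChunkedStream
frames = []
async for audio in stream:  # yields SynthesizedAudio chunks as they arrive
    frames.append(audio.frame)  # audio.frame is an rtc.AudioFrame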
class STT (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
num_channels: NotGivenOr[int] = NOT_GIVEN,
enable_voice_profile: bool = True,
voice_profile_top_n: int = 1,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
min_end_of_turn_silence_when_confident: int = 200,
end_of_turn_confidence_threshold: float = 0.3,
base_url: str = 'https://api.inworld.ai/',
http_session: aiohttp.ClientSession | None = None)-
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        num_channels: NotGivenOr[int] = NOT_GIVEN,
        enable_voice_profile: bool = True,
        voice_profile_top_n: int = 1,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        min_end_of_turn_silence_when_confident: int = 200,
        end_of_turn_confidence_threshold: float = 0.3,
        base_url: str = DEFAULT_API_URL,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Create a new instance of Inworld STT.

        Args:
            api_key: Inworld API key. If not provided, reads from INWORLD_API_KEY env var.
            model: STT model identifier. Any model string supported by the Inworld STT API
                is accepted (e.g. "inworld/inworld-stt-1",
                "assemblyai/universal-streaming-multilingual", "soniox/stt-rt-v4").
                Defaults to "inworld/inworld-stt-1".
            language: Language code. Defaults to "en-US".
            sample_rate: Audio sample rate in Hz. Defaults to 16000.
            num_channels: Number of audio channels. Defaults to 1.
            enable_voice_profile: Enable voice profiling (age, gender, emotion, accent).
                Defaults to True.
            voice_profile_top_n: Number of top voice profile results per category.
            vad_threshold: VAD sensitivity threshold.
            min_end_of_turn_silence_when_confident: Minimum silence (ms) before end-of-turn
                when confidence is high. Defaults to 200.
            end_of_turn_confidence_threshold: Confidence threshold for end-of-turn detection.
                Lower values trigger end-of-turn more eagerly. Defaults to 0.3.
            base_url: Base URL for the Inworld API. Defaults to "https://api.inworld.ai/".
            http_session: Optional aiohttp.ClientSession to use.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True, interim_results=True, offline_recognize=False
            ),
        )
        api_key = api_key if utils.is_given(api_key) else os.getenv("INWORLD_API_KEY", "")
        if not api_key:
            raise ValueError("Inworld API key required. Set INWORLD_API_KEY or provide api_key.")
        self._authorization = f"Basic {api_key}"
        self._base_url = base_url
        self._http_session = http_session
        self._streams: weakref.WeakSet[SpeechStream] = weakref.WeakSet()
        self._opts = _STTOptions(
            model=model if utils.is_given(model) else DEFAULT_MODEL,
            language=language if utils.is_given(language) else DEFAULT_LANGUAGE,
            sample_rate=sample_rate if utils.is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            num_channels=num_channels if utils.is_given(num_channels) else DEFAULT_NUM_CHANNELS,
            enable_voice_profile=enable_voice_profile,
            voice_profile_top_n=voice_profile_top_n,
            vad_threshold=vad_threshold,
            min_end_of_turn_silence_when_confident=min_end_of_turn_silence_when_confident,
            end_of_turn_confidence_threshold=end_of_turn_confidence_threshold,
        )

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "Inworld"

    def update_options(
        self,
        *,
        model: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        enable_voice_profile: NotGivenOr[bool] = NOT_GIVEN,
        voice_profile_top_n: NotGivenOr[int] = NOT_GIVEN,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        min_end_of_turn_silence_when_confident: NotGivenOr[int] = NOT_GIVEN,
        end_of_turn_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """Update STT options. Changes apply to new streams only.

        Args:
            model: STT model identifier (e.g. "inworld/inworld-stt-1",
                "assemblyai/universal-streaming-multilingual"). Any model string is accepted.
            language: Language code (e.g. "en-US").
            enable_voice_profile: Enable voice profiling.
            voice_profile_top_n: Number of top voice profile results.
            vad_threshold: VAD sensitivity threshold.
            min_end_of_turn_silence_when_confident: Min silence (ms) for end-of-turn.
            end_of_turn_confidence_threshold: Confidence threshold for end-of-turn.
        """
        if utils.is_given(model):
            self._opts.model = model
        if utils.is_given(language):
            self._opts.language = language
        if utils.is_given(enable_voice_profile):
            self._opts.enable_voice_profile = enable_voice_profile
        if utils.is_given(voice_profile_top_n):
            self._opts.voice_profile_top_n = voice_profile_top_n
        if utils.is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if utils.is_given(min_end_of_turn_silence_when_confident):
            self._opts.min_end_of_turn_silence_when_confident = (
                min_end_of_turn_silence_when_confident
            )
        if utils.is_given(end_of_turn_confidence_threshold):
            self._opts.end_of_turn_confidence_threshold = end_of_turn_confidence_threshold

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._http_session:
            self._http_session = utils.http_context.http_session()
        return self._http_session

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError(
            "Inworld STT does not support batch recognition — use streaming via stream()"
        )

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            language=language if utils.is_given(language) else self._opts.language,
        )
        self._streams.add(stream)
        return stream
Create a new instance of Inworld STT.
Args
api_key- Inworld API key. If not provided, reads from INWORLD_API_KEY env var.
model- STT model identifier. Any model string supported by the Inworld STT API is accepted (e.g. "inworld/inworld-stt-1", "assemblyai/universal-streaming-multilingual", "soniox/stt-rt-v4"). Defaults to "inworld/inworld-stt-1".
language- Language code. Defaults to "en-US".
sample_rate- Audio sample rate in Hz. Defaults to 16000.
num_channels- Number of audio channels. Defaults to 1.
enable_voice_profile- Enable voice profiling (age, gender, emotion, accent). Defaults to True.
voice_profile_top_n- Number of top voice profile results per category.
vad_threshold- VAD sensitivity threshold.
min_end_of_turn_silence_when_confident- Minimum silence (ms) before end-of-turn when confidence is high. Defaults to 200.
end_of_turn_confidence_threshold- Confidence threshold for end-of-turn detection. Lower values trigger end-of-turn more eagerly. Defaults to 0.3.
base_url- Base URL for the Inworld API. Defaults to "https://api.inworld.ai/".
http_session- Optional aiohttp.ClientSession to use.
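A minimal construction sketch; every argument shown is optional and falls back to its documented default:

stt_engine = inworld.STT(
    model="inworld/inworld-stt-1",         # default model
    language="en-US",
    enable_voice_profile=True,             # age/gender/emotion/accent metadata
    end_of_turn_confidence_threshold=0.3,
)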
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this STT instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
@property
def provider(self) -> str:
    return "Inworld"

Get the provider name/identifier for this STT instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.inworld.stt.SpeechStream-
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        language=language if utils.is_given(language) else self._opts.language,
    )
    self._streams.add(stream)
    return stream

def update_options(self,
*,
model: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
enable_voice_profile: NotGivenOr[bool] = NOT_GIVEN,
voice_profile_top_n: NotGivenOr[int] = NOT_GIVEN,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
min_end_of_turn_silence_when_confident: NotGivenOr[int] = NOT_GIVEN,
end_of_turn_confidence_threshold: NotGivenOr[float] = NOT_GIVEN) ‑> None-
def update_options(
    self,
    *,
    model: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    enable_voice_profile: NotGivenOr[bool] = NOT_GIVEN,
    voice_profile_top_n: NotGivenOr[int] = NOT_GIVEN,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    min_end_of_turn_silence_when_confident: NotGivenOr[int] = NOT_GIVEN,
    end_of_turn_confidence_threshold: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """Update STT options. Changes apply to new streams only.

    Args:
        model: STT model identifier (e.g. "inworld/inworld-stt-1",
            "assemblyai/universal-streaming-multilingual"). Any model string is accepted.
        language: Language code (e.g. "en-US").
        enable_voice_profile: Enable voice profiling.
        voice_profile_top_n: Number of top voice profile results.
        vad_threshold: VAD sensitivity threshold.
        min_end_of_turn_silence_when_confident: Min silence (ms) for end-of-turn.
        end_of_turn_confidence_threshold: Confidence threshold for end-of-turn.
    """
    if utils.is_given(model):
        self._opts.model = model
    if utils.is_given(language):
        self._opts.language = language
    if utils.is_given(enable_voice_profile):
        self._opts.enable_voice_profile = enable_voice_profile
    if utils.is_given(voice_profile_top_n):
        self._opts.voice_profile_top_n = voice_profile_top_n
    if utils.is_given(vad_threshold):
        self._opts.vad_threshold = vad_threshold
    if utils.is_given(min_end_of_turn_silence_when_confident):
        self._opts.min_end_of_turn_silence_when_confident = (
            min_end_of_turn_silence_when_confident
        )
    if utils.is_given(end_of_turn_confidence_threshold):
        self._opts.end_of_turn_confidence_threshold = end_of_turn_confidence_threshold

Update STT options. Changes apply to new streams only.
Args
model- STT model identifier (e.g. "inworld/inworld-stt-1", "assemblyai/universal-streaming-multilingual"). Any model string is accepted.
language- Language code (e.g. "en-US").
enable_voice_profile- Enable voice profiling.
voice_profile_top_n- Number of top voice profile results.
vad_threshold- VAD sensitivity threshold.
min_end_of_turn_silence_when_confident- Min silence (ms) for end-of-turn.
end_of_turn_confidence_threshold- Confidence threshold for end-of-turn.
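A short sketch of changing options at runtime; per the docstring, streams opened before the call keep their original options:

stt_engine.update_options(
    model="assemblyai/universal-streaming-multilingual",
    language="es-ES",
)
# only streams created after this call pick up the new settings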
Inherited members
class SpeechStream (*,
stt: STT,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
language: str = 'en-US')-
class SpeechStream(stt.SpeechStream):
    def __init__(
        self,
        *,
        stt: STT,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        language: str = DEFAULT_LANGUAGE,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=stt._opts.sample_rate)
        self._stt: STT = stt
        self._language = language
        self._ws: aiohttp.ClientWebSocketResponse | None = None
        self._reconnect_event = asyncio.Event()
        self._speaking = False
        self._request_id = ""
        self._audio_duration_collector: PeriodicCollector[float] = PeriodicCollector(
            callback=self._on_audio_duration_report,
            duration=5.0,
        )

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._stt._http_session:
            self._stt._http_session = utils.http_context.http_session()
        return self._stt._http_session

    def _build_transcribe_config(self) -> dict:
        opts = self._stt._opts
        config: dict = {
            "modelId": opts.model,
            "audioEncoding": "LINEAR16",
            "sampleRateHertz": opts.sample_rate,
            "numberOfChannels": opts.num_channels,
            "language": self._language,
        }
        if opts.enable_voice_profile:
            config["voiceProfileConfig"] = {
                "enableVoiceProfile": True,
                "topN": opts.voice_profile_top_n,
            }
        config["endOfTurnConfidenceThreshold"] = opts.end_of_turn_confidence_threshold
        inworld_v1_config: dict = {
            "minEndOfTurnSilenceWhenConfident": opts.min_end_of_turn_silence_when_confident,
        }
        if utils.is_given(opts.vad_threshold):
            inworld_v1_config["vadThreshold"] = opts.vad_threshold
        config["inworldSttV1Config"] = inworld_v1_config
        return config

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        ws_url = self._stt._base_url.replace("https://", "wss://").replace("http://", "ws://")
        ws_url = ws_url.rstrip("/") + "/" + WS_ENDPOINT
        ws = await asyncio.wait_for(
            self._ensure_session().ws_connect(
                ws_url,
                headers={"Authorization": self._stt._authorization},
            ),
            timeout=self._conn_options.timeout,
        )
        self._request_id = utils.shortuuid()
        await ws.send_str(json.dumps({"transcribeConfig": self._build_transcribe_config()}))
        logger.debug("Inworld STT WebSocket connection established")
        return ws

    async def _run(self) -> None:
        while True:
            try:
                ws = await self._connect_ws()
                self._ws = ws
                tasks = [
                    asyncio.create_task(self._send_audio_task()),
                    asyncio.create_task(self._recv_messages_task()),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                tasks_group: asyncio.Future[Any] = asyncio.gather(*tasks)
                try:
                    done, _ = await asyncio.wait(
                        [tasks_group, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()
                    if wait_reconnect_task not in done:
                        break
                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    tasks_group.cancel()
                    tasks_group.exception()
            except asyncio.TimeoutError as e:
                logger.error(f"Timeout during Inworld STT connection: {e}")
                raise APITimeoutError("Timeout connecting to Inworld STT API") from e
            except aiohttp.ClientResponseError as e:
                logger.error(f"Inworld STT status error: {e.status} {e.message}")
                raise APIStatusError(
                    message=e.message, status_code=e.status, request_id=None, body=None
                ) from e
            except aiohttp.ClientError as e:
                logger.error(f"Inworld STT connection error: {e}")
                raise APIConnectionError(f"Inworld STT connection error: {e}") from e
            except Exception as e:
                logger.exception(f"Unexpected error in Inworld STT: {e}")
                raise APIConnectionError(f"Unexpected error: {e}") from e
            finally:
                if self._ws is not None:
                    await self._ws.close()
                    self._ws = None

    async def _send_audio_task(self) -> None:
        if not self._ws:
            return
        async for data in self._input_ch:
            if isinstance(data, self._FlushSentinel):
                self._audio_duration_collector.flush()
                try:
                    await self._ws.send_str(json.dumps({"endTurn": {}}))
                except Exception as e:
                    logger.error(f"Error sending endTurn: {e}")
                    break
            elif isinstance(data, rtc.AudioFrame):
                self._audio_duration_collector.push(data.duration)
                pcm_bytes = data.data.tobytes()
                audio_b64 = base64.b64encode(pcm_bytes).decode()
                try:
                    await self._ws.send_str(json.dumps({"audioChunk": {"content": audio_b64}}))
                except Exception as e:
                    logger.error(f"Error sending audio chunk: {e}")
                    break
        self._audio_duration_collector.flush()
        # Input channel closed — tell the server to close the stream
        if self._ws:
            try:
                await self._ws.send_str(json.dumps({"closeStream": {}}))
            except Exception:
                pass

    def _on_audio_duration_report(self, duration: float) -> None:
        self._event_ch.send_nowait(
            stt.SpeechEvent(
                type=SpeechEventType.RECOGNITION_USAGE,
                request_id=self._request_id,
                alternatives=[],
                recognition_usage=stt.RecognitionUsage(audio_duration=duration),
            )
        )

    async def _recv_messages_task(self) -> None:
        if not self._ws:
            return
        try:
            async for msg in self._ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    try:
                        data = json.loads(msg.data)
                    except json.JSONDecodeError:
                        continue
                    self._process_stream_event(data)
                elif msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    return
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    logger.error(f"Inworld STT WebSocket error: {self._ws.exception()}")
                    return
        except aiohttp.ClientError as e:
            logger.error(f"WebSocket error while receiving: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected error receiving messages: {e}")
            raise

    def _process_stream_event(self, data: dict) -> None:
        result = data.get("result", {})
        if "speechStarted" in result and not self._speaking:
            self._speaking = True
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=SpeechEventType.START_OF_SPEECH,
                    request_id=self._request_id,
                )
            )
            return
        t = result.get("transcription", {})
        if not t:
            return
        text = t.get("transcript", "")
        is_final = t.get("isFinal", False)
        voice_profile = t.get("voiceProfile")
        # An empty-text final (VAD false positive, unrecognizable noise) must still
        # reach the END_OF_SPEECH emission below — otherwise _speaking stays True
        # and subsequent speechStarted events are ignored, wedging the stream.
        if not text and not is_final:
            return
        if text:
            event_type = (
                SpeechEventType.FINAL_TRANSCRIPT
                if is_final
                else SpeechEventType.INTERIM_TRANSCRIPT
            )
            metadata = None
            if voice_profile:
                metadata = {"voice_profile": voice_profile}
                if is_final:
                    logger.info(f"Inworld voice profile: {voice_profile}")
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=event_type,
                    request_id=self._request_id,
                    alternatives=[
                        stt.SpeechData(
                            text=text,
                            language=LanguageCode(self._language),
                            metadata=metadata,
                        )
                    ],
                )
            )
        if is_final and self._speaking:
            self._speaking = False
            self._event_ch.send_nowait(
                stt.SpeechEvent(
                    type=SpeechEventType.END_OF_SPEECH,
                    request_id=self._request_id,
                )
            )
Args
sample_rate:int or None, optional- The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
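A sketch of driving a SpeechStream directly; AgentSession normally does this for you. Here `frame` stands in for an rtc.AudioFrame from your own capture source, and `stt` refers to `livekit.agents.stt`:

stream = stt_engine.stream()
stream.push_frame(frame)  # feed 16 kHz mono PCM audio
stream.end_input()
async for event in stream:
    if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
        print(event.alternatives[0].text)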
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)-
class SynthesizeStream(tts.SynthesizeStream):
    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        request_id = utils.shortuuid()
        sent_tokenizer_stream = self._tts._sentence_tokenizer.stream()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._opts.sample_rate,
            num_channels=NUM_CHANNELS,
            mime_type=self._opts.mime_type,
            stream=True,
        )
        pool = await self._tts._get_pool()
        context_id, waiter, connection = await pool.acquire_context(
            emitter=output_emitter,
            opts=self._opts,
            timeout=self._conn_options.timeout,
        )

        async def _input_task() -> None:
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    sent_tokenizer_stream.flush()
                    continue
                sent_tokenizer_stream.push_text(data)
            sent_tokenizer_stream.end_input()

        async def _send_task() -> None:
            async for ev in sent_tokenizer_stream:
                text = ev.token
                # Chunk to stay within Inworld's 1000 char limit
                for i in range(0, len(text), 1000):
                    connection.send_text(context_id, text[i : i + 1000])
                self._mark_started()
            connection.flush_context(context_id)
            connection.close_context(context_id)

        tasks = [
            asyncio.create_task(_input_task()),
            asyncio.create_task(_send_task()),
        ]
        try:
            await asyncio.wait_for(waiter, timeout=self._conn_options.timeout + 60)
        except asyncio.TimeoutError:
            connection.close_context(context_id)
            raise APITimeoutError() from None
        except asyncio.CancelledError:
            connection.close_context(context_id)
            raise
        except APIError:
            raise
        except Exception as e:
            logger.error("Inworld stream error", extra={"context_id": context_id, "error": e})
            connection.close_context(context_id)
            raise APIConnectionError() from e
        finally:
            await utils.aio.gracefully_cancel(*tasks)
            await sent_tokenizer_stream.aclose()
            output_emitter.end_input()
Ancestors
- livekit.agents.tts.tts.SynthesizeStream
- abc.ABC
class TTS (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[Encoding] = NOT_GIVEN,
bit_rate: NotGivenOr[int] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
speaking_rate: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
timestamp_type: NotGivenOr[TimestampType] = NOT_GIVEN,
text_normalization: NotGivenOr[TextNormalization] = NOT_GIVEN,
timestamp_transport_strategy: NotGivenOr[TimestampTransportStrategy] = NOT_GIVEN,
buffer_char_threshold: NotGivenOr[int] = NOT_GIVEN,
max_buffer_delay_ms: NotGivenOr[int] = NOT_GIVEN,
base_url: str = 'https://api.inworld.ai/',
ws_url: str = 'wss://api.inworld.ai/',
http_session: aiohttp.ClientSession | None = None,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
retain_format: NotGivenOr[bool] = NOT_GIVEN,
max_connections: int = 20,
idle_connection_timeout: float = 300.0)-
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[Encoding] = NOT_GIVEN,
        bit_rate: NotGivenOr[int] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        speaking_rate: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        timestamp_type: NotGivenOr[TimestampType] = NOT_GIVEN,
        text_normalization: NotGivenOr[TextNormalization] = NOT_GIVEN,
        timestamp_transport_strategy: NotGivenOr[TimestampTransportStrategy] = NOT_GIVEN,
        buffer_char_threshold: NotGivenOr[int] = NOT_GIVEN,
        max_buffer_delay_ms: NotGivenOr[int] = NOT_GIVEN,
        base_url: str = DEFAULT_URL,
        ws_url: str = DEFAULT_WS_URL,
        http_session: aiohttp.ClientSession | None = None,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        retain_format: NotGivenOr[bool] = NOT_GIVEN,
        max_connections: int = DEFAULT_MAX_CONNECTIONS,
        idle_connection_timeout: float = DEFAULT_IDLE_CONNECTION_TIMEOUT,
    ) -> None:
        """Create a new instance of Inworld TTS.

        Args:
            api_key (str, optional): The Inworld API key. If not provided, it will be
                read from the INWORLD_API_KEY environment variable.
            voice (str, optional): The voice to use. Defaults to "Ashley".
            model (str, optional): The Inworld model to use. Defaults to "inworld-tts-1.5-max".
            encoding (str, optional): The encoding to use. Defaults to "PCM".
            bit_rate (int, optional): Bits per second of the audio. Defaults to 64000.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 24000.
            speaking_rate (float, optional): The speed of the voice, in the range [0.5, 1.5].
                Defaults to 1.0.
            temperature (float, optional): Determines the degree of randomness when sampling
                audio tokens to generate the response. Range (0, 2]. Defaults to 1.0.
            timestamp_type (str, optional): Controls timestamp metadata returned with the
                audio. Use "WORD" for word-level timestamps or "CHARACTER" for
                character-level. Useful for karaoke-style captions, word highlighting,
                and lipsync.
            text_normalization (str, optional): Controls text normalization. When "ON",
                numbers, dates, and abbreviations are expanded (e.g., "Dr." -> "Doctor").
                When "OFF", text is read exactly as written. Defaults to automatic.
            timestamp_transport_strategy (str, optional): Controls how timestamp info is
                transported relative to audio data. "SYNC" returns timestamps in the same
                message as audio data. "ASYNC" allows timestamps to return in trailing
                messages after the audio data. Defaults to "ASYNC".
            buffer_char_threshold (int, optional): For streaming, the minimum number of
                characters in the buffer that automatically triggers audio generation.
                Defaults to 1000.
            max_buffer_delay_ms (int, optional): For streaming, the maximum time in ms to
                buffer before starting generation. Defaults to 3000.
            base_url (str, optional): The base URL for the Inworld TTS API.
                Defaults to "https://api.inworld.ai/".
            ws_url (str, optional): The WebSocket URL for streaming TTS.
                Defaults to "wss://api.inworld.ai/".
            http_session (aiohttp.ClientSession, optional): The HTTP session to use.
            tokenizer (tokenize.SentenceTokenizer, optional): The tokenizer to use for
                streaming. Defaults to `livekit.agents.tokenize.blingfire.SentenceTokenizer`.
            retain_format (bool, optional): Whether to retain the format of the text when
                tokenizing. Defaults to True.
            max_connections (int, optional): Maximum number of concurrent WebSocket
                connections. Each connection supports up to 5 concurrent synthesis
                streams. Defaults to 20.
            idle_connection_timeout (float, optional): Time in seconds after which idle
                connections are closed. Defaults to 300 (5 minutes).
        """
        if not is_given(sample_rate):
            sample_rate = DEFAULT_SAMPLE_RATE
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=True,
                aligned_transcript=is_given(timestamp_type)
                and timestamp_type != "TIMESTAMP_TYPE_UNSPECIFIED",
            ),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )
        key = api_key if is_given(api_key) else os.getenv("INWORLD_API_KEY")
        if not key:
            raise ValueError(
                "Inworld API key is required, either as argument or set"
                " INWORLD_API_KEY environment variable"
            )
        self._authorization = f"Basic {key}"
        self._base_url = base_url
        self._ws_url = ws_url
        self._session = http_session
        if is_given(encoding):
            _validate_str_param(encoding, "encoding", Encoding)
        if is_given(timestamp_type):
            _validate_str_param(timestamp_type, "timestamp_type", TimestampType)
        if is_given(text_normalization):
            text_normalization = _resolve_text_normalization(text_normalization)
        if is_given(timestamp_transport_strategy):
            _validate_str_param(
                timestamp_transport_strategy,
                "timestamp_transport_strategy",
                TimestampTransportStrategy,
            )
        self._opts = _TTSOptions(
            voice=voice if is_given(voice) else DEFAULT_VOICE,
            model=model if is_given(model) else DEFAULT_MODEL,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            bit_rate=bit_rate if is_given(bit_rate) else DEFAULT_BIT_RATE,
            sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            speaking_rate=speaking_rate if is_given(speaking_rate) else DEFAULT_SPEAKING_RATE,
            temperature=temperature if is_given(temperature) else DEFAULT_TEMPERATURE,
            timestamp_type=timestamp_type,
            text_normalization=text_normalization,
            timestamp_transport_strategy=timestamp_transport_strategy
            if is_given(timestamp_transport_strategy)
            else DEFAULT_TIMESTAMP_TRANSPORT_STRATEGY,
            buffer_char_threshold=buffer_char_threshold
            if is_given(buffer_char_threshold)
            else DEFAULT_BUFFER_CHAR_THRESHOLD,
            max_buffer_delay_ms=max_buffer_delay_ms
            if is_given(max_buffer_delay_ms)
            else DEFAULT_MAX_BUFFER_DELAY_MS,
        )
        self._max_connections = max_connections
        self._idle_connection_timeout = idle_connection_timeout
        self._pool: _ConnectionPool | None = None
        self._pool_lock = asyncio.Lock()
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._sentence_tokenizer = (
            tokenizer
            if is_given(tokenizer)
            else tokenize.blingfire.SentenceTokenizer(
                retain_format=retain_format if is_given(retain_format) else True
            )
        )

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "Inworld"

    async def _get_pool(self) -> _ConnectionPool:
        """Get the connection pool, creating if needed."""
        async with self._pool_lock:
            if self._pool is None or self._pool._closed:
                self._pool = _ConnectionPool(
                    session=self._ensure_session(),
                    ws_url=self._ws_url,
                    authorization=self._authorization,
                    max_connections=self._max_connections,
                    idle_timeout=self._idle_connection_timeout,
                )
            return self._pool

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[Encoding] = NOT_GIVEN,
        bit_rate: NotGivenOr[int] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        speaking_rate: NotGivenOr[float] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        timestamp_type: NotGivenOr[TimestampType] = NOT_GIVEN,
        text_normalization: NotGivenOr[TextNormalization] = NOT_GIVEN,
        timestamp_transport_strategy: NotGivenOr[TimestampTransportStrategy] = NOT_GIVEN,
        buffer_char_threshold: NotGivenOr[int] = NOT_GIVEN,
        max_buffer_delay_ms: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Update the TTS configuration options.

        Args:
            voice (str, optional): The voice to use.
            model (str, optional): The Inworld model to use.
            encoding (str, optional): The encoding to use.
            bit_rate (int, optional): Bits per second of the audio.
            sample_rate (int, optional): The audio sample rate in Hz.
            speaking_rate (float, optional): The speed of the voice.
            temperature (float, optional): Determines the degree of randomness when
                sampling audio tokens to generate the response.
            timestamp_type (str, optional): Controls timestamp metadata ("WORD" or "CHARACTER").
            text_normalization (str, optional): Controls text normalization ("ON" or "OFF").
            timestamp_transport_strategy (str, optional): Controls timestamp transport
                strategy ("SYNC" or "ASYNC").
            buffer_char_threshold (int, optional): For streaming, min characters before triggering.
            max_buffer_delay_ms (int, optional): For streaming, max time to buffer.
        """
        if is_given(voice):
            self._opts.voice = voice
        if is_given(model):
            self._opts.model = model
        if is_given(encoding):
            _validate_str_param(encoding, "encoding", Encoding)
            self._opts.encoding = encoding
        if is_given(bit_rate):
            self._opts.bit_rate = bit_rate
        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate
        if is_given(speaking_rate):
            self._opts.speaking_rate = speaking_rate
        if is_given(temperature):
            self._opts.temperature = temperature
        if is_given(timestamp_type):
            _validate_str_param(timestamp_type, "timestamp_type", TimestampType)
            self._opts.timestamp_type = timestamp_type
        if is_given(text_normalization):
            self._opts.text_normalization = _resolve_text_normalization(text_normalization)
        if is_given(timestamp_transport_strategy):
            _validate_str_param(
                timestamp_transport_strategy,
                "timestamp_transport_strategy",
                TimestampTransportStrategy,
            )
            self._opts.timestamp_transport_strategy = timestamp_transport_strategy
        if is_given(buffer_char_threshold):
            self._opts.buffer_char_threshold = buffer_char_threshold
        if is_given(max_buffer_delay_ms):
            self._opts.max_buffer_delay_ms = max_buffer_delay_ms

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def prewarm(self) -> None:
        asyncio.create_task(self._prewarm_impl())

    async def _prewarm_impl(self) -> None:
        # Just ensure the pool is created - first acquire will establish a connection
        await self._get_pool()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> tts.ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        if self._pool:
            await self._pool.aclose()
            self._pool = None

    async def list_voices(self, language: str | None = None) -> list[dict[str, Any]]:
        """List all available voices in the workspace associated with the API key.

        Args:
            language (str, optional): ISO 639-1 language code to filter voices
                (e.g., 'en', 'es', 'fr').
        """
        url = urljoin(self._base_url, "tts/v1/voices")
        params = {}
        if language:
            params["filter"] = f"language={language}"
        async with self._ensure_session().get(
            url,
            headers={
                "Authorization": self._authorization,
                "X-User-Agent": USER_AGENT,
                "X-Request-Id": str(uuid.uuid4()),
            },
            params=params,
        ) as resp:
            if not resp.ok:
                error_body = await resp.json()
                raise APIStatusError(
                    message=error_body.get("message"),
                    status_code=resp.status,
                    request_id=None,
                    body=None,
                )
            data = await resp.json()
            return cast(list[dict[str, Any]], data.get("voices", []))
Create a new instance of Inworld TTS.
Args
api_key:str, optional- The Inworld API key. If not provided, it will be read from the INWORLD_API_KEY environment variable.
voice:str, optional- The voice to use. Defaults to "Ashley".
model:str, optional- The Inworld model to use. Defaults to "inworld-tts-1.5-max".
encoding:str, optional- The encoding to use. Defaults to "PCM".
bit_rate:int, optional- Bits per second of the audio. Defaults to 64000.
sample_rate:int, optional- The audio sample rate in Hz. Defaults to 24000.
speaking_rate:float, optional- The speed of the voice, in the range [0.5, 1.5]. Defaults to 1.0.
temperature:float, optional- Determines the degree of randomness when sampling audio tokens to generate the response. Range (0, 2]. Defaults to 1.0.
timestamp_type:str, optional- Controls timestamp metadata returned with the audio. Use "WORD" for word-level timestamps or "CHARACTER" for character-level. Useful for karaoke-style captions, word highlighting, and lipsync.
text_normalization:str, optional- Controls text normalization. When "ON", numbers, dates, and abbreviations are expanded (e.g., "Dr." -> "Doctor"). When "OFF", text is read exactly as written. Defaults to automatic.
timestamp_transport_strategy:str, optional- Controls how timestamp info is transported relative to audio data. "SYNC" returns timestamps in the same message as audio data. "ASYNC" allows timestamps to return in trailing messages after the audio data. Defaults to "ASYNC".
buffer_char_threshold:int, optional- For streaming, the minimum number of characters in the buffer that automatically triggers audio generation. Defaults to 1000.
max_buffer_delay_ms:int, optional- For streaming, the maximum time in ms to buffer before starting generation. Defaults to 3000.
base_url:str, optional- The base URL for the Inworld TTS API. Defaults to "https://api.inworld.ai/".
ws_url:str, optional- The WebSocket URL for streaming TTS. Defaults to "wss://api.inworld.ai/".
http_session:aiohttp.ClientSession, optional- The HTTP session to use.
tokenizer:tokenize.SentenceTokenizer, optional- The tokenizer to use for streaming. Defaults to livekit.agents.tokenize.blingfire.SentenceTokenizer.
retain_format:bool, optional- Whether to retain the format of the text when tokenizing. Defaults to True.
max_connections:int, optional- Maximum number of concurrent WebSocket connections. Each connection supports up to 5 concurrent synthesis streams. Defaults to 20.
idle_connection_timeout:float, optional- Time in seconds after which idle connections are closed. Defaults to 300 (5 minutes).
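A minimal construction sketch using only documented arguments and defaults:

tts_engine = inworld.TTS(
    voice="Ashley",
    model="inworld-tts-1.5-max",
    sample_rate=24000,
    timestamp_type="WORD",  # word-level timestamps for captions/lipsync
    speaking_rate=1.0,      # valid range [0.5, 1.5]
)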
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
@property
def provider(self) -> str:
    return "Inworld"

Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    if self._pool:
        await self._pool.aclose()
        self._pool = None

async def list_voices(self, language: str | None = None) ‑> list[dict[str, typing.Any]]-
async def list_voices(self, language: str | None = None) -> list[dict[str, Any]]:
    """List all available voices in the workspace associated with the API key.

    Args:
        language (str, optional): ISO 639-1 language code to filter voices
            (e.g., 'en', 'es', 'fr').
    """
    url = urljoin(self._base_url, "tts/v1/voices")
    params = {}
    if language:
        params["filter"] = f"language={language}"
    async with self._ensure_session().get(
        url,
        headers={
            "Authorization": self._authorization,
            "X-User-Agent": USER_AGENT,
            "X-Request-Id": str(uuid.uuid4()),
        },
        params=params,
    ) as resp:
        if not resp.ok:
            error_body = await resp.json()
            raise APIStatusError(
                message=error_body.get("message"),
                status_code=resp.status,
                request_id=None,
                body=None,
            )
        data = await resp.json()
        return cast(list[dict[str, Any]], data.get("voices", []))

List all available voices in the workspace associated with the API key.
Args
language:str, optional- ISO 639-1 language code to filter voices (e.g., 'en', 'es', 'fr').
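A short usage sketch; each entry is a raw API dict, and the exact keys are not documented on this page, so inspect an entry before relying on specific fields:

voices = await tts_engine.list_voices(language="en")
for v in voices:
    print(v)  # inspect the returned voice dict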
def prewarm(self) ‑> None-
def prewarm(self) -> None:
    asyncio.create_task(self._prewarm_impl())

Pre-warm the connection to the TTS service.
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.inworld.tts.SynthesizeStream-
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> tts.ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[Encoding] = NOT_GIVEN,
bit_rate: NotGivenOr[int] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
speaking_rate: NotGivenOr[float] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
timestamp_type: NotGivenOr[TimestampType] = NOT_GIVEN,
text_normalization: NotGivenOr[TextNormalization] = NOT_GIVEN,
timestamp_transport_strategy: NotGivenOr[TimestampTransportStrategy] = NOT_GIVEN,
buffer_char_threshold: NotGivenOr[int] = NOT_GIVEN,
max_buffer_delay_ms: NotGivenOr[int] = NOT_GIVEN) ‑> None-
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[str] = NOT_GIVEN,
    encoding: NotGivenOr[Encoding] = NOT_GIVEN,
    bit_rate: NotGivenOr[int] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    speaking_rate: NotGivenOr[float] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    timestamp_type: NotGivenOr[TimestampType] = NOT_GIVEN,
    text_normalization: NotGivenOr[TextNormalization] = NOT_GIVEN,
    timestamp_transport_strategy: NotGivenOr[TimestampTransportStrategy] = NOT_GIVEN,
    buffer_char_threshold: NotGivenOr[int] = NOT_GIVEN,
    max_buffer_delay_ms: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Update the TTS configuration options.

    Args:
        voice (str, optional): The voice to use.
        model (str, optional): The Inworld model to use.
        encoding (str, optional): The encoding to use.
        bit_rate (int, optional): Bits per second of the audio.
        sample_rate (int, optional): The audio sample rate in Hz.
        speaking_rate (float, optional): The speed of the voice.
        temperature (float, optional): Determines the degree of randomness when
            sampling audio tokens to generate the response.
        timestamp_type (str, optional): Controls timestamp metadata ("WORD" or "CHARACTER").
        text_normalization (str, optional): Controls text normalization ("ON" or "OFF").
        timestamp_transport_strategy (str, optional): Controls timestamp transport
            strategy ("SYNC" or "ASYNC").
        buffer_char_threshold (int, optional): For streaming, min characters before triggering.
        max_buffer_delay_ms (int, optional): For streaming, max time to buffer.
    """
    if is_given(voice):
        self._opts.voice = voice
    if is_given(model):
        self._opts.model = model
    if is_given(encoding):
        _validate_str_param(encoding, "encoding", Encoding)
        self._opts.encoding = encoding
    if is_given(bit_rate):
        self._opts.bit_rate = bit_rate
    if is_given(sample_rate):
        self._opts.sample_rate = sample_rate
    if is_given(speaking_rate):
        self._opts.speaking_rate = speaking_rate
    if is_given(temperature):
        self._opts.temperature = temperature
    if is_given(timestamp_type):
        _validate_str_param(timestamp_type, "timestamp_type", TimestampType)
        self._opts.timestamp_type = timestamp_type
    if is_given(text_normalization):
        self._opts.text_normalization = _resolve_text_normalization(text_normalization)
    if is_given(timestamp_transport_strategy):
        _validate_str_param(
            timestamp_transport_strategy,
            "timestamp_transport_strategy",
            TimestampTransportStrategy,
        )
        self._opts.timestamp_transport_strategy = timestamp_transport_strategy
    if is_given(buffer_char_threshold):
        self._opts.buffer_char_threshold = buffer_char_threshold
    if is_given(max_buffer_delay_ms):
        self._opts.max_buffer_delay_ms = max_buffer_delay_ms

Update the TTS configuration options.
Args
voice:str, optional- The voice to use.
model:str, optional- The Inworld model to use.
encoding:str, optional- The encoding to use.
bit_rate:int, optional- Bits per second of the audio.
sample_rate:int, optional- The audio sample rate in Hz.
speaking_rate:float, optional- The speed of the voice.
temperature:float, optional- Determines the degree of randomness when sampling audio tokens to generate the response.
timestamp_type:str, optional- Controls timestamp metadata ("WORD" or "CHARACTER").
text_normalization:str, optional- Controls text normalization ("ON" or "OFF").
timestamp_transport_strategy:str, optional- Controls timestamp transport strategy ("SYNC" or "ASYNC").
buffer_char_threshold:int, optional- For streaming, min characters before triggering.
max_buffer_delay_ms:int, optional- For streaming, max time to buffer.
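A short sketch of adjusting options on an existing instance, using only documented arguments and their stated ranges:

tts_engine.update_options(
    voice="Ashley",
    speaking_rate=0.9,  # must stay within [0.5, 1.5]
    temperature=0.8,    # valid range (0, 2]
)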
Inherited members