Module livekit.plugins.neuphonic
Neuphonic plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/tts/neuphonic/ for more information.
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize chunked text using the SSE endpoint"""

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ) -> None:
        """Bind the stream to its parent TTS and take its own options object.

        Args:
            tts: The TTS instance that owns this stream.
            input_text: The text to synthesize.
            conn_options: Connection options forwarded to the base class.
        """
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        # replace() with no overrides -- presumably dataclasses.replace, giving
        # this request its own copy of the parent's options (TODO confirm).
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """POST the input text to the SSE endpoint and push decoded audio.

        Args:
            output_emitter: Emitter that receives the decoded audio bytes.

        Raises:
            APITimeoutError: If an ``asyncio.TimeoutError`` is raised.
            APIStatusError: If aiohttp raises ``ClientResponseError``.
            APIConnectionError: For any other exception.
        """
        opts = self._opts
        try:
            session = self._tts._ensure_session()
            endpoint = f"{opts.base_url}/sse/speak/{opts.lang_code}"
            auth_headers = {API_AUTH_HEADER: opts.api_key}
            payload = {
                "text": self._input_text,
                "voice_id": opts.voice_id,
                "lang_code": opts.lang_code,
                "encoding": "pcm_linear",
                "sampling_rate": opts.sample_rate,
                "speed": opts.speed,
            }
            request_timeout = aiohttp.ClientTimeout(
                total=30,
                sock_connect=self._conn_options.timeout,
            )

            async with session.post(
                endpoint,
                headers=auth_headers,
                json=payload,
                timeout=request_timeout,
                # large read_bufsize to avoid `ValueError: Chunk too big`
                read_bufsize=10 * 1024 * 1024,
            ) as resp:
                resp.raise_for_status()

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=opts.sample_rate,
                    num_channels=1,
                    mime_type="audio/pcm",
                )

                async for raw_line in resp.content:
                    text_line = raw_line.decode("utf-8")
                    if not text_line:
                        continue

                    event = _parse_sse_message(text_line)
                    if event is None:
                        continue

                    # Only events that carry an audio payload are forwarded.
                    encoded_audio = event.get("data", {}).get("audio")
                    if encoded_audio is None:
                        continue

                    output_emitter.push(base64.b64decode(encoded_audio))

                output_emitter.flush()
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as err:
            raise APIStatusError(
                message=err.message, status_code=err.status, request_id=None, body=None
            ) from None
        except Exception as err:
            raise APIConnectionError() from err
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class TTS (*,
api_key: str | None = None,
lang_code: TTSLangCodes | str = 'en',
encoding: str = 'pcm_linear',
voice_id: str = '8e9c4bc8-3979-48ab-8626-df53befc2090',
speed: float | None = 1.0,
sample_rate: int = 22050,
http_session: aiohttp.ClientSession | None = None,
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
base_url: str = 'https://api.neuphonic.com')-
Expand source code
class TTS(tts.TTS):
    """NeuPhonic text-to-speech plugin.

    ``synthesize()`` returns a ``ChunkedStream`` for a single text;
    ``stream()`` returns a ``SynthesizeStream``. WebSocket connections are
    obtained through a connection pool created in ``__init__``.
    """

    def __init__(
        self,
        *,
        api_key: str | None = None,
        lang_code: TTSLangCodes | str = "en",
        encoding: str = "pcm_linear",
        voice_id: str = "8e9c4bc8-3979-48ab-8626-df53befc2090",
        speed: float | None = 1.0,
        sample_rate: int = 22050,
        http_session: aiohttp.ClientSession | None = None,
        word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        base_url: str = "https://api.neuphonic.com",
    ) -> None:
        """
        Create a new instance of NeuPhonic TTS.

        See https://docs.neuphonic.com for more details on the NeuPhonic API.

        Args:
            lang_code (TTSLangCodes | str, optional): The language code for synthesis. Defaults to "en".
            encoding (str, optional): The audio encoding format. Defaults to "pcm_linear".
            voice_id (str, optional): The voice ID for the desired voice.
            speed (float, optional): The audio playback speed. Defaults to 1.0.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 22050.
            api_key (str, optional): The NeuPhonic API key. If not provided, it will be read from the NEUPHONIC_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
            word_tokenizer (tokenize.WordTokenizer, optional): The word tokenizer to use. Defaults to tokenize.basic.WordTokenizer().
            tokenizer (tokenize.SentenceTokenizer, optional): The sentence tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
            base_url (str, optional): The base URL for the NeuPhonic API. Defaults to "https://api.neuphonic.com".

        Raises:
            ValueError: If no API key is passed and NEUPHONIC_API_KEY is not set.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=1,
        )
        neuphonic_api_key = api_key or os.environ.get("NEUPHONIC_API_KEY")
        if not neuphonic_api_key:
            raise ValueError("NEUPHONIC_API_KEY must be set")

        if not is_given(word_tokenizer):
            word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)

        self._opts = _TTSOptions(
            lang_code=lang_code,
            encoding=encoding,
            sample_rate=sample_rate,
            voice_id=voice_id,
            speed=speed,
            api_key=neuphonic_api_key,
            base_url=base_url,
            word_tokenizer=word_tokenizer,
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        # Weak references: streams are tracked only so aclose() can close them.
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to the speak endpoint (connect callback of the pool).

        Args:
            timeout: Maximum number of seconds to wait for the connection.

        Returns:
            The connected WebSocket response.

        Raises:
            asyncio.TimeoutError: If connecting takes longer than ``timeout``.
        """
        from urllib.parse import urlencode

        session = self._ensure_session()
        query_params = {
            "api_key": self._opts.api_key,
            "speed": self._opts.speed,
            "lang_code": self._opts.lang_code,
            "sampling_rate": self._opts.sample_rate,
            "voice_id": self._opts.voice_id,
        }
        # Percent-encode every value, and leave out unset ones: `speed` may be
        # None and must not be sent as the literal text "None".
        query = urlencode(
            {name: value for name, value in query_params.items() if value is not None}
        )
        # NOTE(review): the path segment is fixed to "en" while the SSE endpoint
        # uses the configured lang_code in its path -- confirm this is intended.
        url = self._opts.get_ws_url(f"/speak/en?{query}")
        headers = {API_AUTH_HEADER: self._opts.api_key}
        return await asyncio.wait_for(session.ws_connect(url, headers=headers), timeout)

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a pooled WebSocket (close callback of the pool)."""
        await ws.close()

    @property
    def model(self) -> str:
        """The model name reported by this TTS: always "Octave"."""
        return "Octave"

    @property
    def provider(self) -> str:
        """The provider name reported by this TTS: always "Neuphonic"."""
        return "Neuphonic"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from the http context if unset."""
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        """Ask the connection pool to pre-warm."""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        speed: NotGivenOr[float | None] = NOT_GIVEN,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        This allows updating the TTS settings, including lang_code, voice_id, and speed.
        If any parameter is not provided, the existing value will be retained.

        Args:
            lang_code (TTSLangCodes | str, optional): The language code for synthesis.
            voice_id (str, optional): The voice ID for the desired voice.
            speed (float, optional): The audio playback speed.
        """
        # NOTE(review): _connect_ws puts these values in the WebSocket URL, so
        # sockets already held by the pool were opened with the previous
        # settings -- confirm whether the pool should be refreshed here.
        if is_given(lang_code):
            self._opts.lang_code = lang_code
        if is_given(voice_id):
            self._opts.voice_id = voice_id
        if is_given(speed):
            self._opts.speed = speed

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Create a ChunkedStream that synthesizes ``text``.

        Args:
            text: The text to synthesize.
            conn_options: Connection options passed to the stream.

        Returns:
            A new ChunkedStream bound to this TTS.
        """
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a SynthesizeStream and register it for cleanup in aclose().

        Args:
            conn_options: Connection options passed to the stream.

        Returns:
            A new SynthesizeStream bound to this TTS.
        """
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        """Close every tracked stream, then close the connection pool."""
        # Iterate over a snapshot: the WeakSet may change while awaiting.
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()
Create a new instance of NeuPhonic TTS.
See https://docs.neuphonic.com for more details on the NeuPhonic API.
Args
lang_code (TTSLangCodes | str, optional): The language code for synthesis. Defaults to "en".
encoding (str, optional): The audio encoding format. Defaults to "pcm_linear".
voice_id (str, optional): The voice ID for the desired voice.
speed (float, optional): The audio playback speed. Defaults to 1.0.
sample_rate (int, optional): The audio sample rate in Hz. Defaults to 22050.
api_key (str, optional): The NeuPhonic API key. If not provided, it will be read from the NEUPHONIC_API_KEY environment variable.
http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
word_tokenizer (tokenize.WordTokenizer, optional): The word tokenizer to use. Defaults to tokenize.basic.WordTokenizer().
tokenizer (tokenize.SentenceTokenizer, optional): The sentence tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
base_url (str, optional): The base URL for the NeuPhonic API. Defaults to "https://api.neuphonic.com".
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance."""
    model_name = "Octave"
    return model_name
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance."""
    provider_name = "Neuphonic"
    return provider_name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear() await self._pool.aclose()
def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service"""
    pool = self._pool
    pool.prewarm()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.SynthesizeStream-
Expand source code
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> ChunkedStream: return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
voice_id: NotGivenOr[str] = NOT_GIVEN,
speed: NotGivenOr[float | None] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    speed: NotGivenOr[float | None] = NOT_GIVEN,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    Only the options that are explicitly given are overwritten; every other
    option keeps its current value.

    Args:
        lang_code (TTSLangCodes | str, optional): The language code for synthesis.
        voice_id (str, optional): The voice ID for the desired voice.
        speed (float, optional): The audio playback speed.
    """
    requested = (
        ("lang_code", lang_code),
        ("voice_id", voice_id),
        ("speed", speed),
    )
    for option_name, new_value in requested:
        # Arguments left at NOT_GIVEN are skipped, so they stay unchanged.
        if is_given(new_value):
            setattr(self._opts, option_name, new_value)
This allows updating the TTS settings, including lang_code, voice_id, and speed. If any parameter is not provided, the existing value will be retained.
Args
lang_code (TTSLangCodes | str, optional): The language code for synthesis.
voice_id (str, optional): The voice ID for the desired voice.
speed (float, optional): The audio playback speed.
Inherited members