Module livekit.plugins.neuphonic

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
opts: _TTSOptions,
session: aiohttp.ClientSession,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """One-shot synthesis of a complete text through Neuphonic's SSE endpoint."""

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        opts: _TTSOptions,
        session: aiohttp.ClientSession,
        conn_options: APIConnectOptions,
    ) -> None:
        """
        Store the synthesis options and the HTTP session used by ``_run``.

        Args:
            tts (TTS): The TTS instance that created this stream.
            input_text (str): The text to synthesize.
            opts (_TTSOptions): Options used to build the request (URL, payload, auth).
            session (aiohttp.ClientSession): The session the POST request is sent with.
            conn_options (APIConnectOptions): Connection options; the timeout is used
                as the socket connect timeout of the request.
        """
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._opts = opts
        self._session = session

    async def _run(self) -> None:
        """
        POST the input text to the SSE endpoint and emit the decoded audio frames.

        Every non-empty response line is parsed with ``_parse_sse_message``; when the
        parsed message carries ``data.audio`` it is base64-decoded and re-framed
        through an ``AudioByteStream`` before being pushed to the emitter.

        Raises:
            APITimeoutError: If the request times out.
            APIStatusError: If the server answers with an error status.
            APIConnectionError: For any other failure during the request.
        """
        request_id = utils.shortuuid()
        frame_buffer = utils.audio.AudioByteStream(
            sample_rate=self._opts.sampling_rate, num_channels=NUM_CHANNELS
        )

        # model_params is spread after "text", so its keys take precedence on a clash.
        payload = {"text": self._input_text, **self._opts.model_params}
        auth_headers = {AUTHORIZATION_HEADER: self._opts.api_key}

        try:
            async with self._session.post(
                f"https://{self._opts.base_url}/sse/speak/{self._opts.lang_code}",
                headers=auth_headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=self._conn_options.timeout,
                ),
                # large read_bufsize to avoid `ValueError: Chunk too big`
                read_bufsize=10 * 1024 * 1024,
            ) as response:
                response.raise_for_status()
                emitter = tts.SynthesizedAudioEmitter(
                    event_ch=self._event_ch,
                    request_id=request_id,
                )

                async for raw_line in response.content:
                    text_line = raw_line.decode("utf-8").strip()
                    if not text_line:
                        continue

                    event = _parse_sse_message(text_line)
                    if event is None:
                        continue

                    encoded_audio = event.get("data", {}).get("audio")
                    if encoded_audio is None:
                        continue

                    for frame in frame_buffer.write(base64.b64decode(encoded_audio)):
                        emitter.push(frame)

                # Emit whatever is still buffered once the response is exhausted.
                for frame in frame_buffer.flush():
                    emitter.push(frame)
                emitter.flush()
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

Synthesize chunked text using the SSE endpoint

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
model: TTSModels | str = 'neu_hq',
voice_id: NotGivenOr[str] = NOT_GIVEN,
lang_code: TTSLangCodes | str = 'en',
encoding: TTSEncodings | str = 'pcm_linear',
speed: float = 1.0,
sample_rate: int = 22050,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
base_url: str = 'api.neuphonic.com')
Expand source code
class TTS(tts.TTS):
    """
    Neuphonic text-to-speech.

    ``synthesize`` returns a ``ChunkedStream`` that uses the HTTP session, while
    ``stream`` returns a ``SynthesizeStream`` backed by a pool of WebSocket connections.
    """

    def __init__(
        self,
        *,
        model: TTSModels | str = "neu_hq",
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        lang_code: TTSLangCodes | str = "en",
        encoding: TTSEncodings | str = "pcm_linear",
        speed: float = 1.0,
        sample_rate: int = 22050,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        base_url: str = API_BASE_URL,
    ) -> None:
        """
        Create a new instance of the Neuphonic TTS.

        See https://docs.neuphonic.com for more documentation on all of these options, or go to https://app.neuphonic.com/ to test out different options.

        Args:
            model (TTSModels | str, optional): The Neuphonic model to use. Defaults to "neu_hq".
            voice_id (str, optional): The voice ID for the desired voice. Defaults to NOT_GIVEN.
            lang_code (TTSLangCodes | str, optional): The language code for synthesis. Defaults to "en".
            encoding (TTSEncodings | str, optional): The audio encoding format. Defaults to "pcm_linear".
            speed (float, optional): The audio playback speed. Defaults to 1.0.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 22050.
            api_key (str, optional): The Neuphonic API key. If not provided, it will be read from the NEUPHONIC_API_TOKEN environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a session is obtained lazily from utils.http_context.
            base_url (str, optional): The base URL (host, without scheme) for the Neuphonic API. Defaults to API_BASE_URL.

        Raises:
            ValueError: If no API key is given and NEUPHONIC_API_TOKEN is unset or empty.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=NUM_CHANNELS,
        )

        neuphonic_api_key = api_key if is_given(api_key) else os.environ.get("NEUPHONIC_API_TOKEN")
        if not neuphonic_api_key:
            raise ValueError("API key must be provided or set in NEUPHONIC_API_TOKEN")

        self._opts = _TTSOptions(
            model=model,
            voice_id=voice_id,
            lang_code=lang_code,
            encoding=encoding,
            speed=speed,
            sampling_rate=sample_rate,
            api_key=neuphonic_api_key,
            base_url=base_url,
        )

        # May be None here; _ensure_session() fills it in on first use.
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=90,
            mark_refreshed_on_get=True,
        )
        # Weak references, so tracking a stream here does not keep it alive.
        self._streams = weakref.WeakSet[SynthesizeStream]()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to the speak endpoint using the current options (pool connect callback)."""  # noqa: E501
        session = self._ensure_session()
        url = f"wss://{self._opts.base_url}/speak/{self._opts.lang_code}{self._opts.get_query_param_string()}"

        # NOTE(review): self._conn_options is not assigned in this class; presumably it is
        # provided by the tts.TTS base class - confirm.
        return await asyncio.wait_for(
            session.ws_connect(url, headers={AUTHORIZATION_HEADER: self._opts.api_key}),
            self._conn_options.timeout,
        )

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a pooled WebSocket (pool close callback)."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from utils.http_context if none is set yet."""
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        """Ask the connection pool to pre-warm itself."""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncodings | str] = NOT_GIVEN,
        speed: NotGivenOr[float] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        This method allows updating the TTS settings, including model type, voice_id, lang_code,
        encoding, speed and sample_rate. If any parameter is not provided, the existing value will be
        retained. The connection pool is invalidated only when at least one value actually changes,
        so a no-op call does not discard pooled connections.

        Args:
            model (TTSModels | str, optional): The Neuphonic model to use.
            voice_id (str, optional): The voice ID for the desired voice.
            lang_code (TTSLangCodes | str, optional): The language code for synthesis.
            encoding (TTSEncodings | str, optional): The audio encoding format.
            speed (float, optional): The audio playback speed.
            sample_rate (int, optional): The audio sample rate in Hz.
        """  # noqa: E501
        changed = False
        if is_given(model) and model != self._opts.model:
            self._opts.model = model
            changed = True
        if is_given(voice_id) and voice_id != self._opts.voice_id:
            self._opts.voice_id = voice_id
            changed = True
        if is_given(lang_code) and lang_code != self._opts.lang_code:
            self._opts.lang_code = lang_code
            changed = True
        if is_given(encoding) and encoding != self._opts.encoding:
            self._opts.encoding = encoding
            changed = True
        if is_given(speed) and speed != self._opts.speed:
            self._opts.speed = speed
            changed = True
        if is_given(sample_rate) and sample_rate != self._opts.sampling_rate:
            # NOTE(review): only the options object is updated; the sample_rate handed to
            # super().__init__ is left as is - confirm whether it should follow.
            self._opts.sampling_rate = sample_rate
            changed = True

        if changed:
            # The WebSocket URL is built from these options in _connect_ws, so connections
            # opened with the previous values must not be reused.
            self._pool.invalidate()

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """
        Create a ChunkedStream that synthesizes ``text`` in a single request.

        Args:
            text (str): The text to synthesize.
            conn_options (APIConnectOptions, optional): Connection options passed to the stream.

        Returns:
            ChunkedStream: The stream bound to this TTS instance, its options and HTTP session.
        """
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
        )

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """
        Create a SynthesizeStream that shares this instance's connection pool and options.

        The stream is tracked so that ``aclose`` can close it later.

        Returns:
            SynthesizeStream: The newly created stream.
        """
        # NOTE(review): conn_options is accepted but not forwarded to SynthesizeStream -
        # confirm whether that is intentional.
        stream = SynthesizeStream(
            tts=self,
            pool=self._pool,
            opts=self._opts,
        )
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        """Close every tracked stream, then the connection pool, then the base class."""
        # Iterate over a snapshot: the weak set may change while each close is awaited.
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
        await super().aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of the Neuphonic TTS.

See https://docs.neuphonic.com for more documentation on all of these options, or go to https://app.neuphonic.com/ to test out different options.

Args

model : TTSModels | str, optional
The Neuphonic model to use. Defaults to "neu_hq".
voice_id : str, optional
The voice ID for the desired voice. Defaults to NOT_GIVEN.
lang_code : TTSLangCodes | str, optional
The language code for synthesis. Defaults to "en".
encoding : TTSEncodings | str, optional
The audio encoding format. Defaults to "pcm_linear".
speed : float, optional
The audio playback speed. Defaults to 1.0.
sample_rate : int, optional
The audio sample rate in Hz. Defaults to 22050.
api_key : str | None, optional
The Neuphonic API key. If not provided, it will be read from the NEUPHONIC_API_TOKEN environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.
base_url : str, optional
The base URL for the Neuphonic API. Defaults to "api.neuphonic.com".

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close every tracked stream, then the connection pool, then the base class."""
    # Take a snapshot first so the collection can change while each close is awaited.
    open_streams = list(self._streams)
    for active in open_streams:
        await active.aclose()
    self._streams.clear()

    await self._pool.aclose()
    await super().aclose()
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Ask the connection pool to pre-warm itself."""
    pool = self._pool
    pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """
    Create a SynthesizeStream that shares this instance's connection pool and options.

    The new stream is added to ``self._streams`` before it is returned.
    """
    # NOTE(review): conn_options is accepted but not forwarded to SynthesizeStream -
    # confirm whether that is intentional.
    new_stream = SynthesizeStream(tts=self, pool=self._pool, opts=self._opts)
    self._streams.add(new_stream)
    return new_stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    """
    Create a ChunkedStream that synthesizes ``text`` in a single request.

    Args:
        text (str): The text to synthesize.
        conn_options (APIConnectOptions, optional): Connection options passed to the stream.

    Returns:
        ChunkedStream: The stream bound to this instance, its options and HTTP session.
    """
    http_session = self._ensure_session()
    return ChunkedStream(
        tts=self,
        input_text=text,
        opts=self._opts,
        session=http_session,
        conn_options=conn_options,
    )
def update_options(self,
*,
model: NotGivenOr[TTSModels] = NOT_GIVEN,
voice_id: NotGivenOr[str] = NOT_GIVEN,
lang_code: NotGivenOr[TTSLangCodes] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncodings] = NOT_GIVEN,
speed: NotGivenOr[float] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
    encoding: NotGivenOr[TTSEncodings | str] = NOT_GIVEN,
    speed: NotGivenOr[float] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    This method allows updating the TTS settings, including model type, voice_id, lang_code,
    encoding, speed and sample_rate. If any parameter is not provided, the existing value will be
    retained. The connection pool is invalidated only when at least one value actually changes,
    so a no-op call does not discard pooled connections.

    Args:
        model (TTSModels | str, optional): The Neuphonic model to use.
        voice_id (str, optional): The voice ID for the desired voice.
        lang_code (TTSLangCodes | str, optional): The language code for synthesis.
        encoding (TTSEncodings | str, optional): The audio encoding format.
        speed (float, optional): The audio playback speed.
        sample_rate (int, optional): The audio sample rate in Hz.
    """  # noqa: E501
    changed = False
    if is_given(model) and model != self._opts.model:
        self._opts.model = model
        changed = True
    if is_given(voice_id) and voice_id != self._opts.voice_id:
        self._opts.voice_id = voice_id
        changed = True
    if is_given(lang_code) and lang_code != self._opts.lang_code:
        self._opts.lang_code = lang_code
        changed = True
    if is_given(encoding) and encoding != self._opts.encoding:
        self._opts.encoding = encoding
        changed = True
    if is_given(speed) and speed != self._opts.speed:
        self._opts.speed = speed
        changed = True
    if is_given(sample_rate) and sample_rate != self._opts.sampling_rate:
        self._opts.sampling_rate = sample_rate
        changed = True

    if changed:
        # Connections opened with the previous option values must not be reused.
        self._pool.invalidate()

Update the Text-to-Speech (TTS) configuration options.

This method allows updating the TTS settings, including model type, voice_id, lang_code, encoding, speed and sample_rate. If any parameter is not provided, the existing value will be retained.

Args

model : TTSModels | str, optional
The Neuphonic model to use.
voice_id : str, optional
The voice ID for the desired voice.
lang_code : TTSLangCodes | str, optional
The language code for synthesis.
encoding : TTSEncodings | str, optional
The audio encoding format.
speed : float, optional
The audio playback speed.
sample_rate : int, optional
The audio sample rate in Hz.

Inherited members