Module livekit.plugins.neuphonic

Neuphonic plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/neuphonic/ for more information.

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Synthesize chunked text using the SSE endpoint"""

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        try:
            async with self._tts._ensure_session().post(
                f"{self._opts.base_url}/sse/speak/{self._opts.lang_code}",
                headers={API_AUTH_HEADER: self._opts.api_key},
                json={
                    "text": self._input_text,
                    "voice_id": self._opts.voice_id,
                    "lang_code": self._opts.lang_code,
                    "encoding": "pcm_linear",
                    "sampling_rate": self._opts.sample_rate,
                    "speed": self._opts.speed,
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=self._conn_options.timeout,
                ),
                # large read_bufsize to avoid `ValueError: Chunk too big`
                read_bufsize=10 * 1024 * 1024,
            ) as resp:
                resp.raise_for_status()

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._opts.sample_rate,
                    num_channels=1,
                    mime_type="audio/pcm",
                )

                async for line in resp.content:
                    message = line.decode("utf-8")
                    if not message:
                        continue

                    parsed_message = _parse_sse_message(message)

                    if (
                        parsed_message is not None
                        and parsed_message.get("data", {}).get("audio") is not None
                    ):
                        audio_bytes = base64.b64decode(parsed_message["data"]["audio"])
                        output_emitter.push(audio_bytes)

                output_emitter.flush()
        except asyncio.TimeoutError:
            raise APITimeoutError() from None
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message, status_code=e.status, request_id=None, body=None
            ) from None
        except Exception as e:
            raise APIConnectionError() from e

Synthesize chunked text using the SSE endpoint

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
api_key: str | None = None,
lang_code: TTSLangCodes | str = 'en',
encoding: str = 'pcm_linear',
voice_id: str = '8e9c4bc8-3979-48ab-8626-df53befc2090',
speed: float | None = 1.0,
sample_rate: int = 22050,
http_session: aiohttp.ClientSession | None = None,
word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
base_url: str = 'https://api.neuphonic.com')
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        lang_code: TTSLangCodes | str = "en",
        encoding: str = "pcm_linear",
        voice_id: str = "8e9c4bc8-3979-48ab-8626-df53befc2090",
        speed: float | None = 1.0,
        sample_rate: int = 22050,
        http_session: aiohttp.ClientSession | None = None,
        word_tokenizer: NotGivenOr[tokenize.WordTokenizer] = NOT_GIVEN,
        tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        base_url: str = "https://api.neuphonic.com",
    ) -> None:
        """
        Create a new instance of NeuPhonic TTS.

        See https://docs.neuphonic.com for more details on the NeuPhonic API.

        Args:
            lang_code (TTSLangCodes | str, optional): The language code for synthesis. Defaults to "en".
            encoding (str, optional): The audio encoding format. Defaults to "pcm_linear".
            voice_id (str, optional): The voice ID for the desired voice.
            speed (float, optional): The audio playback speed. Defaults to 1.0.
            sample_rate (int, optional): The audio sample rate in Hz. Defaults to 22050.
            api_key (str, optional): The NeuPhonic API key. If not provided, it will be read from the NEUPHONIC_API_KEY environment variable.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
            word_tokenizer (tokenize.WordTokenizer, optional): The word tokenizer to use. Defaults to tokenize.basic.WordTokenizer().
            tokenizer (tokenize.SentenceTokenizer, optional): The sentence tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
            base_url (str, optional): The base URL for the NeuPhonic API. Defaults to "https://api.neuphonic.com".
        """  # noqa: E501

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=1,
        )
        neuphonic_api_key = api_key or os.environ.get("NEUPHONIC_API_KEY")
        if not neuphonic_api_key:
            raise ValueError("NEUPHONIC_API_KEY must be set")

        if not is_given(word_tokenizer):
            word_tokenizer = tokenize.basic.WordTokenizer(ignore_punctuation=False)

        self._opts = _TTSOptions(
            lang_code=lang_code,
            encoding=encoding,
            sample_rate=sample_rate,
            voice_id=voice_id,
            speed=speed,
            api_key=neuphonic_api_key,
            base_url=base_url,
            word_tokenizer=word_tokenizer,
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()
        self._sentence_tokenizer = (
            tokenizer if is_given(tokenizer) else tokenize.blingfire.SentenceTokenizer()
        )

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        session = self._ensure_session()
        url = self._opts.get_ws_url(
            f"/speak/en?api_key={self._opts.api_key}&speed={self._opts.speed}&lang_code={self._opts.lang_code}&sampling_rate={self._opts.sample_rate}&voice_id={self._opts.voice_id}"
        )

        headers = {API_AUTH_HEADER: self._opts.api_key}
        return await asyncio.wait_for(session.ws_connect(url, headers=headers), timeout)

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    @property
    def model(self) -> str:
        return "Octave"

    @property
    def provider(self) -> str:
        return "Neuphonic"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def prewarm(self) -> None:
        self._pool.prewarm()

    def update_options(
        self,
        *,
        lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        speed: NotGivenOr[float | None] = NOT_GIVEN,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        This allows updating the TTS settings, including lang_code, voice_id, and speed.
        If any parameter is not provided, the existing value will be retained.

        Args:
            lang_code (TTSLangCodes | str, optional): The language code for synthesis.
            voice_id (str, optional): The voice ID for the desired voice.
            speed (float, optional): The audio playback speed.
        """
        if is_given(lang_code):
            self._opts.lang_code = lang_code
        if is_given(voice_id):
            self._opts.voice_id = voice_id
        if is_given(speed):
            self._opts.speed = speed

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()

        self._streams.clear()
        await self._pool.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of NeuPhonic TTS.

See https://docs.neuphonic.com for more details on the NeuPhonic API.

Args

lang_code : TTSLangCodes | str, optional
The language code for synthesis. Defaults to "en".
encoding : str, optional
The audio encoding format. Defaults to "pcm_linear".
voice_id : str, optional
The voice ID for the desired voice.
speed : float, optional
The audio playback speed. Defaults to 1.0.
sample_rate : int, optional
The audio sample rate in Hz. Defaults to 22050.
api_key : str, optional
The NeuPhonic API key. If not provided, it will be read from the NEUPHONIC_API_KEY environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.
word_tokenizer : tokenize.WordTokenizer, optional
The word tokenizer to use. Defaults to tokenize.basic.WordTokenizer().
tokenizer : tokenize.SentenceTokenizer, optional
The sentence tokenizer to use. Defaults to tokenize.blingfire.SentenceTokenizer().
base_url : str, optional
The base URL for the NeuPhonic API. Defaults to "https://api.neuphonic.com".

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    return "Octave"

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    return "Neuphonic"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()

    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    self._pool.prewarm()

Pre-warm connection to the TTS service

def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.neuphonic.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
voice_id: NotGivenOr[str] = NOT_GIVEN,
speed: NotGivenOr[float | None] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    lang_code: NotGivenOr[TTSLangCodes | str] = NOT_GIVEN,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    speed: NotGivenOr[float | None] = NOT_GIVEN,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    This allows updating the TTS settings, including lang_code, voice_id, and speed.
    If any parameter is not provided, the existing value will be retained.

    Args:
        lang_code (TTSLangCodes | str, optional): The language code for synthesis.
        voice_id (str, optional): The voice ID for the desired voice.
        speed (float, optional): The audio playback speed.
    """
    if is_given(lang_code):
        self._opts.lang_code = lang_code
    if is_given(voice_id):
        self._opts.voice_id = voice_id
    if is_given(speed):
        self._opts.speed = speed

Update the Text-to-Speech (TTS) configuration options.

This allows updating the TTS settings, including lang_code, voice_id, and speed. If any parameter is not provided, the existing value will be retained.

Args

lang_code : TTSLangCodes | str, optional
The language code for synthesis.
voice_id : str, optional
The voice ID for the desired voice.
speed : float, optional
The audio playback speed.

Inherited members