Module livekit.plugins.groq.tts

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions,
opts: _TTSOptions,
session: aiohttp.ClientSession,
segment_id: NotGivenOr[str])
class ChunkedStream(tts.ChunkedStream):
    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
        opts: _TTSOptions,
        session: aiohttp.ClientSession,
        segment_id: NotGivenOr[str],
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._opts = opts
        self._session = session
        self._segment_id = segment_id if is_given(segment_id) else None

    async def _run(self) -> None:
        request_id = utils.shortuuid()
        headers = {
            "Authorization": f"Bearer {self._opts.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self._opts.model,
            "voice": self._opts.voice,
            "input": self._input_text,
            "response_format": "wav",
        }

        decoder = utils.codecs.AudioStreamDecoder(
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
        )

        decode_task: asyncio.Task | None = None
        api_url = f"{self._opts.base_url}/audio/speech"
        try:
            async with self._session.post(api_url, headers=headers, json=payload) as response:
                if not response.content_type.startswith("audio"):
                    content = await response.text()
                    logger.error("Groq returned non-audio data: %s", content)
                    return

                async def _decode_loop():
                    try:
                        async for bytes_data, _ in response.content.iter_chunks():
                            decoder.push(bytes_data)
                    finally:
                        decoder.end_input()

                decode_task = asyncio.create_task(_decode_loop())
                emitter = tts.SynthesizedAudioEmitter(
                    event_ch=self._event_ch,
                    request_id=request_id,
                    segment_id=self._segment_id,
                )
                async for frame in decoder:
                    emitter.push(frame)
                emitter.flush()

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=request_id,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e
        finally:
            if decode_task:
                await utils.aio.gracefully_cancel(decode_task)
            await decoder.aclose()

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
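In practice this stream is not constructed directly; it is returned by TTS.synthesize(). A minimal consumption sketch, assuming the usual livekit-agents pattern where the stream is asynchronously iterated for synthesized-audio events (each assumed to carry a decoded frame) and closed with aclose():

import asyncio

import aiohttp

from livekit.plugins import groq


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        # GROQ_API_KEY is read from the environment when api_key is not passed.
        tts = groq.TTS(http_session=session)

        stream = tts.synthesize("Hello from Groq text-to-speech.")
        async for event in stream:  # each event is assumed to expose a decoded audio frame
            print("frame:", event.frame)
        await stream.aclose()


asyncio.run(main())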

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
base_url: NotGivenOr[str] = NOT_GIVEN,
model: TTSModels | str = 'playai-tts',
voice: TTSVoices | str = 'Arista-PlayAI',
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        model: TTSModels | str = "playai-tts",
        voice: TTSVoices | str = "Arista-PlayAI",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Groq TTS.

        If `api_key` is not provided, it will be read from the ``GROQ_API_KEY``
        environment variable.

        Args:
            base_url (str, optional): Custom API base URL. Defaults to the Groq API endpoint.
            model (TTSModels | str, optional): Model to use. Defaults to "playai-tts".
            voice (TTSVoices | str, optional): Voice to use. Defaults to "Arista-PlayAI".
            api_key (str, optional): Groq API key. Defaults to the ``GROQ_API_KEY``
                environment variable.
            http_session (aiohttp.ClientSession, optional): Existing HTTP session to reuse.
                If not provided, one is obtained from the shared http context.
        """

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=SAMPLE_RATE,
            num_channels=1,
        )

        self._session = http_session

        if not base_url:
            base_url = DEFAULT_BASE_URL

        groq_api_key = api_key if is_given(api_key) else os.getenv("GROQ_API_KEY")
        if not groq_api_key:
            raise ValueError("GROQ_API_KEY is not set")

        self._opts = _TTSOptions(
            model=model,
            voice=voice,
            api_key=groq_api_key,
            base_url=base_url,
        )

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    def update_options(
        self,
        *,
        model: NotGivenOr[TTSModels] = NOT_GIVEN,
        voice: NotGivenOr[TTSVoices] = NOT_GIVEN,
    ) -> None:
        """
        Update the TTS options.

        Args:
            model (TTSModels | str, optional): Model to use. If not given, the current model is kept.
            voice (TTSVoices | str, optional): Voice to use. If not given, the current voice is kept.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(voice):
            self._opts.voice = voice

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        segment_id: NotGivenOr[str] = NOT_GIVEN,
    ) -> ChunkedStream:
        return ChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options,
            opts=self._opts,
            session=self._ensure_session(),
            segment_id=segment_id,
        )


Create a new instance of Groq TTS.

If api_key is not provided, it will be read from the GROQ_API_KEY environment variable.

Args

base_url : str, optional
Custom API base URL. Defaults to the Groq API endpoint.
model : TTSModels | str, optional
Model to use. Defaults to "playai-tts".
voice : TTSVoices | str, optional
Voice to use. Defaults to "Arista-PlayAI".
api_key : str, optional
Groq API key. Defaults to the GROQ_API_KEY environment variable.
http_session : aiohttp.ClientSession, optional
Existing HTTP session to reuse. If not provided, one is obtained from the shared http context.
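A short construction sketch; the model and voice values below are simply the documented defaults written out explicitly:

import os

from livekit.plugins import groq

# Reads GROQ_API_KEY from the environment.
tts = groq.TTS()

# Or configure everything explicitly.
tts = groq.TTS(
    model="playai-tts",
    voice="Arista-PlayAI",
    api_key=os.environ["GROQ_API_KEY"],
)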

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
segment_id: NotGivenOr[str] = NOT_GIVEN) ‑> ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    segment_id: NotGivenOr[str] = NOT_GIVEN,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        session=self._ensure_session(),
        segment_id=segment_id,
    )
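Retry and timeout behavior can be tuned per call through conn_options. A brief sketch, assuming APIConnectOptions is importable from livekit.agents as elsewhere in the framework; the values are illustrative only, and the segment_id is a hypothetical label propagated to the emitted audio events:

from livekit.agents import APIConnectOptions

stream = tts.synthesize(
    "Testing one two three.",
    conn_options=APIConnectOptions(max_retry=1, retry_interval=1.0, timeout=30.0),
    segment_id="intro-segment",
)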
def update_options(self,
*,
model: NotGivenOr[TTSModels] = NOT_GIVEN,
voice: NotGivenOr[TTSVoices] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels] = NOT_GIVEN,
    voice: NotGivenOr[TTSVoices] = NOT_GIVEN,
) -> None:
    """
    Update the TTS options.

    Args:
        model (TTSModels | str, optional): Model to use. If not given, the current model is kept.
        voice (TTSVoices | str, optional): Voice to use. If not given, the current voice is kept.
    """
    if is_given(model):
        self._opts.model = model
    if is_given(voice):
        self._opts.voice = voice

Update the TTS options.

Args

model : TTSModels | str, optional
Model to use. If not given, the current model is kept.
voice : TTSVoices | str, optional
Voice to use. If not given, the current voice is kept.
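A brief sketch of switching settings at runtime; the voice name below is illustrative and assumes it is available for the selected model:

# Subsequent synthesize() calls pick up the new settings.
tts.update_options(voice="Fritz-PlayAI")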

Inherited members