Module livekit.plugins.camb.tts

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(tts.ChunkedStream):
    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        logger.debug(
            f"Camb.ai TTS request: voice_id={self._opts.voice_id}, "
            f"model={self._opts.speech_model}, text_length={len(self._input_text)}"
        )

        # Determine MIME type based on output format
        if self._opts.output_format in ("pcm_s16le", "pcm_s32le"):
            mime_type = "audio/pcm"
        elif self._opts.output_format == "wav":
            mime_type = "audio/wav"
        elif self._opts.output_format == "flac":
            mime_type = "audio/flac"
        else:  # adts
            mime_type = "audio/aac"

        # Build request payload
        payload: dict = {
            "text": self._input_text,
            "voice_id": self._opts.voice_id,
            "language": self._opts.language,
            "speech_model": self._opts.speech_model,
            "enhance_named_entities_pronunciation": self._opts.enhance_named_entities,
            "output_configuration": {
                "format": self._opts.output_format,
            },
        }
        if self._opts.user_instructions:
            payload["user_instructions"] = self._opts.user_instructions

        try:
            headers: dict[str, str] = {"Content-Type": "application/json"}
            if self._tts._api_key:
                headers[API_KEY_HEADER] = self._tts._api_key

            async with self._tts._ensure_session().post(
                f"{self._tts._base_url}/tts-stream",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=60,
                    sock_connect=self._conn_options.timeout,
                ),
            ) as resp:
                if resp.status != 200:
                    content = await resp.text()
                    raise APIStatusError(
                        "Camb.ai TTS failed",
                        status_code=resp.status,
                        request_id=resp.headers.get("x-request-id"),
                        body=content,
                    )

                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._tts._sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=mime_type,
                )

                async for data, _ in resp.content.iter_chunks():
                    output_emitter.push(data)

                output_emitter.flush()

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            if isinstance(e, (APIStatusError, APIConnectionError, APITimeoutError)):
                raise
            raise APIConnectionError() from e

Used by the non-streamed synthesize API, some providers support chunked http responses

Ancestors

  • livekit.agents.tts.tts.ChunkedStream
  • abc.ABC
class TTS (*,
api_key: str | None = None,
base_url: str = 'https://client.camb.ai/apis',
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN,
voice_id: int = 147320,
language: str = 'en-us',
model: SpeechModel = 'mars-flash',
user_instructions: str | None = None,
output_format: OutputFormat = 'pcm_s16le',
enhance_named_entities: bool = False,
sample_rate: int | None = None,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        base_url: str = API_BASE_URL,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,  # Future Vertex AI
        credentials_file: NotGivenOr[str] = NOT_GIVEN,  # Future Vertex AI
        voice_id: int = DEFAULT_VOICE_ID,
        language: str = DEFAULT_LANGUAGE,
        model: SpeechModel = DEFAULT_MODEL,
        user_instructions: str | None = None,
        output_format: OutputFormat = DEFAULT_OUTPUT_FORMAT,
        enhance_named_entities: bool = False,
        sample_rate: int | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Camb.ai TTS.

        ``api_key`` must be set to your Camb.ai API key, either using the argument or by
        setting the ``CAMB_API_KEY`` environmental variable.

        Args:
            api_key: Camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
            base_url: Camb.ai API base URL.
            credentials_info: GCP credentials dict for Vertex AI (future support).
            credentials_file: GCP credentials file path for Vertex AI (future support).
            voice_id: Voice ID to use. Use list_voices() to discover available voices.
            language: BCP-47 locale (e.g., 'en-us', 'fr-fr').
            model: MARS model to use ('mars-flash', 'mars-pro', 'mars-instruct').
            user_instructions: Style/tone guidance (3-1000 chars, requires mars-instruct).
            output_format: Audio output format (default: 'pcm_s16le').
            enhance_named_entities: Enhanced pronunciation for named entities.
            sample_rate: Audio sample rate in Hz. If None, auto-detected from model.
            http_session: Optional aiohttp.ClientSession to reuse.
        """
        resolved_sample_rate = sample_rate or MODEL_SAMPLE_RATES.get(model, 22050)

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=resolved_sample_rate,
            num_channels=NUM_CHANNELS,
        )

        self._api_key = api_key or os.environ.get("CAMB_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Camb.ai API key must be provided via api_key parameter or "
                "CAMB_API_KEY environment variable"
            )

        if is_given(credentials_info) or is_given(credentials_file):
            logger.warning("Vertex AI credentials provided but not yet implemented - using API key")

        self._credentials_info = credentials_info
        self._credentials_file = credentials_file
        self._base_url = base_url
        self._session = http_session

        self._opts = _TTSOptions(
            voice_id=voice_id,
            language=language,
            speech_model=model,
            output_format=output_format,
            user_instructions=user_instructions,
            enhance_named_entities=enhance_named_entities,
        )

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    @property
    def model(self) -> str:
        return self._opts.speech_model

    @property
    def provider(self) -> str:
        return "Camb.ai"

    def update_options(
        self,
        *,
        voice_id: int | None = None,
        language: str | None = None,
        model: SpeechModel | None = None,
        user_instructions: str | None = None,
    ) -> None:
        """Update TTS options dynamically."""
        if voice_id is not None:
            self._opts.voice_id = voice_id
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.speech_model = model
            self._sample_rate = MODEL_SAMPLE_RATES.get(model, 22050)
        if user_instructions is not None:
            self._opts.user_instructions = user_instructions

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    async def aclose(self) -> None:
        pass

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Camb.ai TTS.

api_key must be set to your Camb.ai API key, either using the argument or by setting the CAMB_API_KEY environmental variable.

Args

api_key
Camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
base_url
Camb.ai API base URL.
credentials_info
GCP credentials dict for Vertex AI (future support).
credentials_file
GCP credentials file path for Vertex AI (future support).
voice_id
Voice ID to use. Use list_voices() to discover available voices.
language
BCP-47 locale (e.g., 'en-us', 'fr-fr').
model
MARS model to use ('mars-flash', 'mars-pro', 'mars-instruct').
user_instructions
Style/tone guidance (3-1000 chars, requires mars-instruct).
output_format
Audio output format (default: 'pcm_s16le').
enhance_named_entities
Enhanced pronunciation for named entities.
sample_rate
Audio sample rate in Hz. If None, auto-detected from model.
http_session
Optional aiohttp.ClientSession to reuse.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    return self._opts.speech_model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    return "Camb.ai"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    pass
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
voice_id: int | None = None,
language: str | None = None,
model: SpeechModel | None = None,
user_instructions: str | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice_id: int | None = None,
    language: str | None = None,
    model: SpeechModel | None = None,
    user_instructions: str | None = None,
) -> None:
    """Update TTS options dynamically."""
    if voice_id is not None:
        self._opts.voice_id = voice_id
    if language is not None:
        self._opts.language = language
    if model is not None:
        self._opts.speech_model = model
        self._sample_rate = MODEL_SAMPLE_RATES.get(model, 22050)
    if user_instructions is not None:
        self._opts.user_instructions = user_instructions

Update TTS options dynamically.

Inherited members