Module livekit.plugins.camb.tts
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(tts.ChunkedStream):
    """Non-streamed synthesis request against the Camb.ai ``/tts-stream`` endpoint.

    Used by the non-streamed synthesize API; the provider supports chunked
    HTTP responses, which are forwarded piecewise to the ``AudioEmitter``.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._tts: TTS = tts
        # Snapshot the options so later TTS.update_options() calls do not
        # affect a request already in flight.
        self._opts = replace(tts._opts)

    async def _run(self, output_emitter: tts.AudioEmitter) -> None:
        """Perform one HTTP POST and stream the audio chunks into *output_emitter*.

        Raises:
            APIStatusError: non-200 response from the Camb.ai API.
            APITimeoutError: the overall or connect timeout elapsed.
            APIConnectionError: any other transport-level failure.
        """
        logger.debug(
            f"Camb.ai TTS request: voice_id={self._opts.voice_id}, "
            f"model={self._opts.speech_model}, text_length={len(self._input_text)}"
        )

        # Determine MIME type based on output format
        if self._opts.output_format in ("pcm_s16le", "pcm_s32le"):
            mime_type = "audio/pcm"
        elif self._opts.output_format == "wav":
            mime_type = "audio/wav"
        elif self._opts.output_format == "flac":
            mime_type = "audio/flac"
        else:  # adts
            mime_type = "audio/aac"

        # Build request payload
        payload: dict = {
            "text": self._input_text,
            "voice_id": self._opts.voice_id,
            "language": self._opts.language,
            "speech_model": self._opts.speech_model,
            "enhance_named_entities_pronunciation": self._opts.enhance_named_entities,
            "output_configuration": {
                "format": self._opts.output_format,
            },
        }
        # Only attach instructions when set; the API rejects empty strings
        # is not shown here, so we simply omit falsy values.
        if self._opts.user_instructions:
            payload["user_instructions"] = self._opts.user_instructions

        try:
            headers: dict[str, str] = {"Content-Type": "application/json"}
            if self._tts._api_key:
                headers[API_KEY_HEADER] = self._tts._api_key

            # NOTE(review): total timeout is hardcoded to 60s while only the
            # connect timeout honors conn_options.timeout — confirm intended.
            async with self._tts._ensure_session().post(
                f"{self._tts._base_url}/tts-stream",
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(
                    total=60,
                    sock_connect=self._conn_options.timeout,
                ),
            ) as resp:
                if resp.status != 200:
                    content = await resp.text()
                    raise APIStatusError(
                        "Camb.ai TTS failed",
                        status_code=resp.status,
                        request_id=resp.headers.get("x-request-id"),
                        body=content,
                    )

                # Initialize the emitter only after a successful status so a
                # failed request never starts an audio stream downstream.
                output_emitter.initialize(
                    request_id=utils.shortuuid(),
                    sample_rate=self._tts._sample_rate,
                    num_channels=NUM_CHANNELS,
                    mime_type=mime_type,
                )

                # Forward raw HTTP chunks as they arrive (second tuple item is
                # aiohttp's end-of-chunk flag, unused here).
                async for data, _ in resp.content.iter_chunks():
                    output_emitter.push(data)

                output_emitter.flush()
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            # Re-raise our own API errors untouched; wrap anything else so
            # callers only ever see the livekit API exception hierarchy.
            if isinstance(e, (APIStatusError, APIConnectionError, APITimeoutError)):
                raise
            raise APIConnectionError() from e
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class TTS (*,
api_key: str | None = None,
base_url: str = 'https://client.camb.ai/apis',
credentials_info: NotGivenOr[dict] = NOT_GIVEN,
credentials_file: NotGivenOr[str] = NOT_GIVEN,
voice_id: int = 147320,
language: str = 'en-us',
model: SpeechModel = 'mars-flash',
user_instructions: str | None = None,
output_format: OutputFormat = 'pcm_s16le',
enhance_named_entities: bool = False,
sample_rate: int | None = None,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        base_url: str = API_BASE_URL,
        credentials_info: NotGivenOr[dict] = NOT_GIVEN,  # Future Vertex AI
        credentials_file: NotGivenOr[str] = NOT_GIVEN,  # Future Vertex AI
        voice_id: int = DEFAULT_VOICE_ID,
        language: str = DEFAULT_LANGUAGE,
        model: SpeechModel = DEFAULT_MODEL,
        user_instructions: str | None = None,
        output_format: OutputFormat = DEFAULT_OUTPUT_FORMAT,
        enhance_named_entities: bool = False,
        sample_rate: int | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of Camb.ai TTS.

        ``api_key`` must be set to your Camb.ai API key, either using the argument
        or by setting the ``CAMB_API_KEY`` environment variable.

        Args:
            api_key: Camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
            base_url: Camb.ai API base URL.
            credentials_info: GCP credentials dict for Vertex AI (future support).
            credentials_file: GCP credentials file path for Vertex AI (future support).
            voice_id: Voice ID to use. Use list_voices() to discover available voices.
            language: BCP-47 locale (e.g., 'en-us', 'fr-fr').
            model: MARS model to use ('mars-flash', 'mars-pro', 'mars-instruct').
            user_instructions: Style/tone guidance (3-1000 chars, requires mars-instruct).
            output_format: Audio output format (default: 'pcm_s16le').
            enhance_named_entities: Enhanced pronunciation for named entities.
            sample_rate: Audio sample rate in Hz. If None, auto-detected from model.
            http_session: Optional aiohttp.ClientSession to reuse.

        Raises:
            ValueError: if no API key is provided and CAMB_API_KEY is unset.
        """
        # Explicit sample_rate wins; otherwise look up the model's native
        # rate, defaulting to 22050 Hz for unknown models.
        resolved_sample_rate = sample_rate or MODEL_SAMPLE_RATES.get(model, 22050)
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=resolved_sample_rate,
            num_channels=NUM_CHANNELS,
        )
        self._api_key = api_key or os.environ.get("CAMB_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Camb.ai API key must be provided via api_key parameter or "
                "CAMB_API_KEY environment variable"
            )
        # Vertex AI auth is accepted but not wired up yet — warn loudly so
        # users know requests still go through the plain API key.
        if is_given(credentials_info) or is_given(credentials_file):
            logger.warning("Vertex AI credentials provided but not yet implemented - using API key")
        self._credentials_info = credentials_info
        self._credentials_file = credentials_file
        self._base_url = base_url
        # May be None; _ensure_session() lazily falls back to the shared
        # http_context session on first use.
        self._session = http_session
        self._opts = _TTSOptions(
            voice_id=voice_id,
            language=language,
            speech_model=model,
            output_format=output_format,
            user_instructions=user_instructions,
            enhance_named_entities=enhance_named_entities,
        )

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily resolve the shared session so construction stays cheap and
        # works outside a running job context.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    @property
    def model(self) -> str:
        # Current MARS speech model identifier (e.g. 'mars-flash').
        return self._opts.speech_model

    @property
    def provider(self) -> str:
        return "Camb.ai"

    def update_options(
        self,
        *,
        voice_id: int | None = None,
        language: str | None = None,
        model: SpeechModel | None = None,
        user_instructions: str | None = None,
    ) -> None:
        """Update TTS options dynamically.

        Only the arguments that are not ``None`` are applied; others keep
        their current values. Changes affect subsequent synthesize() calls
        (in-flight streams hold a snapshot of the options).
        """
        if voice_id is not None:
            self._opts.voice_id = voice_id
        if language is not None:
            self._opts.language = language
        if model is not None:
            self._opts.speech_model = model
            # NOTE(review): switching models overwrites the sample rate even
            # when the user passed an explicit sample_rate at construction,
            # and writes the base class's _sample_rate directly — confirm
            # this matches the livekit.agents TTS base-class contract.
            self._sample_rate = MODEL_SAMPLE_RATES.get(model, 22050)
        if user_instructions is not None:
            self._opts.user_instructions = user_instructions

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Synthesize *text* in one request, returning a chunked audio stream."""
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    async def aclose(self) -> None:
        # Nothing to release: the aiohttp session is either caller-owned or
        # managed by the shared http_context.
        pass
Create a new instance of Camb.ai TTS.
``api_key`` must be set to your Camb.ai API key, either using the argument or by setting the ``CAMB_API_KEY`` environment variable.

Args
api_key- Camb.ai API key. If not provided, reads from CAMB_API_KEY env var.
base_url- Camb.ai API base URL.
credentials_info- GCP credentials dict for Vertex AI (future support).
credentials_file- GCP credentials file path for Vertex AI (future support).
voice_id- Voice ID to use. Use list_voices() to discover available voices.
language- BCP-47 locale (e.g., 'en-us', 'fr-fr').
model- MARS model to use ('mars-flash', 'mars-pro', 'mars-instruct').
user_instructions- Style/tone guidance (3-1000 chars, requires mars-instruct).
output_format- Audio output format (default: 'pcm_s16le').
enhance_named_entities- Enhanced pronunciation for named entities.
sample_rate- Audio sample rate in Hz. If None, auto-detected from model.
http_session- Optional aiohttp.ClientSession to reuse.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property def model(self) -> str: return self._opts.speech_model

Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property def provider(self) -> str: return "Camb.ai"

Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: pass

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> ChunkedStream: return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

def update_options(self,
*,
voice_id: int | None = None,
language: str | None = None,
model: SpeechModel | None = None,
user_instructions: str | None = None) ‑> None-
Expand source code
def update_options( self, *, voice_id: int | None = None, language: str | None = None, model: SpeechModel | None = None, user_instructions: str | None = None, ) -> None: """Update TTS options dynamically.""" if voice_id is not None: self._opts.voice_id = voice_id if language is not None: self._opts.language = language if model is not None: self._opts.speech_model = model self._sample_rate = MODEL_SAMPLE_RATES.get(model, 22050) if user_instructions is not None: self._opts.user_instructions = user_instructions

Update TTS options dynamically.
Inherited members