Module livekit.plugins.tavus.api
Classes
class TavusAPI (api_key: str | livekit.agents.types.NotGiven = NOT_GIVEN,
api_url: str | livekit.agents.types.NotGiven = NOT_GIVEN,
*,
conn_options: livekit.agents.types.APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
session: aiohttp.client.ClientSession | None = None)-
Expand source code
class TavusAPI: def __init__( self, api_key: NotGivenOr[str] = NOT_GIVEN, api_url: NotGivenOr[str] = NOT_GIVEN, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, session: Optional[aiohttp.ClientSession] = None, ) -> None: tavus_api_key = api_key or os.getenv("TAVUS_API_KEY") if tavus_api_key is None: raise TavusException("TAVUS_API_KEY must be set") self._api_key = tavus_api_key self._api_url = api_url or DEFAULT_API_URL self._conn_options = conn_options self._session = session or aiohttp.ClientSession() async def create_conversation( self, *, replica_id: NotGivenOr[str] = NOT_GIVEN, persona_id: NotGivenOr[str] = NOT_GIVEN, properties: NotGivenOr[dict[str, Any]] = NOT_GIVEN, extra_payload: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> str: replica_id = replica_id or (os.getenv("TAVUS_REPLICA_ID") or NOT_GIVEN) if not replica_id: raise TavusException("TAVUS_REPLICA_ID must be set") persona_id = persona_id or (os.getenv("TAVUS_PERSONA_ID") or NOT_GIVEN) if not persona_id: # create a persona if not provided persona_id = await self.create_persona() properties = properties or {} payload = { "replica_id": replica_id, "persona_id": persona_id, "properties": properties, } if utils.is_given(extra_payload): payload.update(extra_payload) if "conversation_name" not in payload: payload["conversation_name"] = utils.shortuuid("lk_conversation_") response_data = await self._post("conversations", payload) return response_data["conversation_id"] # type: ignore async def create_persona( self, name: NotGivenOr[str] = NOT_GIVEN, *, extra_payload: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> str: name = name or utils.shortuuid("lk_persona_") payload = { "persona_name": name, "pipeline_mode": "echo", "layers": { "transport": {"transport_type": "livekit"}, }, } if utils.is_given(extra_payload): payload.update(extra_payload) response_data = await self._post("personas", payload) return response_data["persona_id"] # type: ignore async def _post(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]: """ Make a POST request to the Tavus API with retry logic. Args: endpoint: API endpoint path (without leading slash) payload: JSON payload for the request Returns: Response data as a dictionary Raises: APIConnectionError: If the request fails after all retries """ for i in range(self._conn_options.max_retry): try: async with self._session.post( f"{self._api_url}/{endpoint}", headers={ "Content-Type": "application/json", "x-api-key": self._api_key, }, json=payload, timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout), ) as response: if not response.ok: text = await response.text() raise APIStatusError( "Server returned an error", status_code=response.status, body=text ) return await response.json() # type: ignore except Exception as e: if isinstance(e, APIConnectionError): logger.warning("failed to call tavus api", extra={"error": str(e)}) else: logger.exception("failed to call tavus api") if i < self._conn_options.max_retry - 1: await asyncio.sleep(self._conn_options.retry_interval) raise APIConnectionError("Failed to call Tavus API after all retries")
Methods
async def create_conversation(self,
*,
replica_id: str | livekit.agents.types.NotGiven = NOT_GIVEN,
persona_id: str | livekit.agents.types.NotGiven = NOT_GIVEN,
properties: dict[str, typing.Any] | livekit.agents.types.NotGiven = NOT_GIVEN,
extra_payload: dict[str, typing.Any] | livekit.agents.types.NotGiven = NOT_GIVEN) ‑> str-
Expand source code
async def create_conversation( self, *, replica_id: NotGivenOr[str] = NOT_GIVEN, persona_id: NotGivenOr[str] = NOT_GIVEN, properties: NotGivenOr[dict[str, Any]] = NOT_GIVEN, extra_payload: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> str: replica_id = replica_id or (os.getenv("TAVUS_REPLICA_ID") or NOT_GIVEN) if not replica_id: raise TavusException("TAVUS_REPLICA_ID must be set") persona_id = persona_id or (os.getenv("TAVUS_PERSONA_ID") or NOT_GIVEN) if not persona_id: # create a persona if not provided persona_id = await self.create_persona() properties = properties or {} payload = { "replica_id": replica_id, "persona_id": persona_id, "properties": properties, } if utils.is_given(extra_payload): payload.update(extra_payload) if "conversation_name" not in payload: payload["conversation_name"] = utils.shortuuid("lk_conversation_") response_data = await self._post("conversations", payload) return response_data["conversation_id"] # type: ignore
async def create_persona(self,
name: str | livekit.agents.types.NotGiven = NOT_GIVEN,
*,
extra_payload: dict[str, typing.Any] | livekit.agents.types.NotGiven = NOT_GIVEN) ‑> str-
Expand source code
async def create_persona( self, name: NotGivenOr[str] = NOT_GIVEN, *, extra_payload: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> str: name = name or utils.shortuuid("lk_persona_") payload = { "persona_name": name, "pipeline_mode": "echo", "layers": { "transport": {"transport_type": "livekit"}, }, } if utils.is_given(extra_payload): payload.update(extra_payload) response_data = await self._post("personas", payload) return response_data["persona_id"] # type: ignore
class TavusException (*args, **kwargs)
-
Expand source code
class TavusException(Exception): """Exception for Tavus errors"""
Exception for Tavus errors
Ancestors
- builtins.Exception
- builtins.BaseException