Module livekit.plugins.did.api
Classes
class DIDAPI (api_key: str | livekit.agents.types.NotGiven = NOT_GIVEN,
api_url: str | livekit.agents.types.NotGiven = NOT_GIVEN,
*,
conn_options: livekit.agents.types.APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
session: aiohttp.client.ClientSession | None = None)
Expand source code
class DIDAPI:
    """Small asynchronous client for the D-ID HTTP API.

    The API key and base URL are resolved once in ``__init__``. Requests are
    sent as JSON ``POST`` calls through an ``aiohttp`` session and retried on
    connection-level failures according to ``conn_options``.
    """

    def __init__(
        self,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_url: NotGivenOr[str] = NOT_GIVEN,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Resolve credentials and endpoint, and pick the HTTP session.

        Args:
            api_key: D-ID API key. When not given, the ``DID_API_KEY``
                environment variable is used instead.
            api_url: Base URL of the API. When not given,
                ``DEFAULT_API_URL`` is used.
            conn_options: Supplies ``max_retry``, ``retry_interval`` and
                ``timeout`` for requests made by this client.
            session: Session to send requests through. When falsy, a new
                ``aiohttp.ClientSession`` is created here.
                NOTE(review): nothing in this class closes that session.

        Raises:
            DIDException: If no API key is given and ``DID_API_KEY`` is
                unset or empty.
        """
        if utils.is_given(api_key):
            resolved_key = api_key
        else:
            resolved_key = os.getenv("DID_API_KEY")
        # An empty string is rejected as well as a missing value.
        if not resolved_key:
            raise DIDException("DID_API_KEY must be set")
        self._api_key = resolved_key

        if utils.is_given(api_url):
            self._api_url = api_url
        else:
            self._api_url = DEFAULT_API_URL

        self._conn_options = conn_options
        self._session = session if session else aiohttp.ClientSession()

    async def join_session(
        self,
        *,
        agent_id: str,
        transport: dict[str, Any],
        audio_config: dict[str, Any],
    ) -> str:
        """Dispatch a D-ID avatar worker into the room.

        Posts ``transport`` and ``audio_config`` to the agent's
        ``sessions/join`` endpoint.

        Returns:
            The ``"id"`` field of the JSON response (the session id).
        """
        endpoint = f"v2/agents/{agent_id}/sessions/join"
        body: dict[str, Any] = {
            "transport": transport,
            "audio_config": audio_config,
        }
        session_info = await self._post(endpoint, body)
        return session_info["id"]  # type: ignore

    async def _post(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]:
        """POST ``payload`` as JSON to ``endpoint`` and return the decoded reply.

        Up to ``max_retry + 1`` attempts are made. Only ``aiohttp.ClientError``
        and ``asyncio.TimeoutError`` trigger a retry, with ``retry_interval``
        seconds of sleep between attempts. A non-OK HTTP status is raised
        immediately and is never retried.

        Raises:
            APIStatusError: The server answered with a non-OK status; carries
                the status code and the response text.
            APIConnectionError: Every attempt failed with a connection error
                or timeout.
        """
        url = f"{self._api_url}/{endpoint}"
        num_attempts = self._conn_options.max_retry + 1
        final_attempt = num_attempts - 1

        for attempt in range(num_attempts):
            request_headers = {
                "Content-Type": "application/json",
                "Authorization": f"Basic {self._api_key}",
            }
            # Only the socket-connect phase is given a timeout here.
            # NOTE(review): other phases are presumably unbounded - confirm
            # against the aiohttp ClientTimeout documentation.
            connect_timeout = aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout)
            try:
                async with self._session.post(
                    url,
                    headers=request_headers,
                    json=payload,
                    timeout=connect_timeout,
                ) as response:
                    if response.ok:
                        return await response.json()  # type: ignore
                    error_body = await response.text()
                    raise APIStatusError(
                        "D-ID API error",
                        status_code=response.status,
                        body=error_body,
                    )
            except APIStatusError:
                # HTTP-level failures go straight to the caller, no retry.
                raise
            except (aiohttp.ClientError, asyncio.TimeoutError) as exc:
                logger.warning(
                    f"D-ID API request failed (attempt {attempt + 1}/{num_attempts})",
                    extra={"error": str(exc)},
                )
                if attempt < final_attempt:
                    await asyncio.sleep(self._conn_options.retry_interval)
                    continue
                raise APIConnectionError(f"Failed to connect to D-ID API at {url}") from exc

        # Reached only when the loop body never ran (num_attempts <= 0).
        raise APIConnectionError(f"Failed to connect to D-ID API at {url}")


# Methods
async def join_session(self,
*,
agent_id: str,
transport: dict[str, typing.Any],
audio_config: dict[str, typing.Any]) -> str
Expand source code
async def join_session(
    self,
    *,
    agent_id: str,
    transport: dict[str, Any],
    audio_config: dict[str, Any],
) -> str:
    """Dispatch a D-ID avatar worker into the room.

    Posts ``transport`` and ``audio_config`` to the agent's
    ``sessions/join`` endpoint via ``self._post``.

    Returns:
        The ``"id"`` field of the response (the session id).
    """
    endpoint = f"v2/agents/{agent_id}/sessions/join"
    body: dict[str, Any] = {
        "transport": transport,
        "audio_config": audio_config,
    }
    session_info = await self._post(endpoint, body)
    return session_info["id"]  # type: ignore


# Dispatch a D-ID avatar worker into the room.
Returns the session id.