Module livekit.plugins.liveavatar.api

Classes

class LiveAvatarAPI (api_key: str,
*,
api_url: str = 'https://api.liveavatar.com/v1/sessions',
conn_options: livekit.agents.types.APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
session: aiohttp.client.ClientSession | None = None)
Expand source code
class LiveAvatarAPI:
    def __init__(
        self,
        api_key: str,
        *,
        api_url: str = DEFAULT_API_URL,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        session: Optional[aiohttp.ClientSession] = None,
    ) -> None:
        self._api_key = api_key or os.getenv("LIVEAVATAR_API_KEY")
        if self._api_key is None:
            raise LiveAvatarException("api_key or LIVEAVATAR_API_KEY must be set")

        self._api_url = api_url or DEFAULT_API_URL
        self._conn_options = conn_options
        self._session = session or aiohttp.ClientSession()

    def _ensure_http_session(self) -> aiohttp.ClientSession:
        if self._session is None:
            self._session = utils.http_context.http_session()
        return self._session

    async def create_streaming_session(
        self,
        *,
        livekit_url: str,
        livekit_token: str,
        room: rtc.Room,
        avatar_id: str,
    ) -> dict[str, Any]:
        """Create a new streaming session, return a session id"""

        livekit_config = {
            "livekit_room": room.name,
            "livekit_url": livekit_url,
            "livekit_client_token": livekit_token,
        }

        payload = {
            "mode": "CUSTOM",
            "avatar_id": avatar_id,
            "livekit_config": livekit_config,
        }

        self._headers = {
            "accept": "application/json",
            "content-type": "application/json",
            "X-API-KEY": self._api_key,
        }
        response_data = await self._post(endpoint="/token", payload=payload, headers=self._headers)
        return response_data

    async def start_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]:
        """Start the streaming session"""
        payload = {"session_id": session_id}
        headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"}
        response_data = await self._post(endpoint="/start", payload=payload, headers=headers)
        return response_data

    async def stop_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]:
        """Stop the streaming session"""
        payload = {
            "session_id": session_id,
            "reason": "USER_DISCONNECTED",
        }
        headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"}
        response_data = await self._post(endpoint="/stop", payload=payload, headers=headers)
        return response_data

    async def _post(
        self, *, endpoint: str, payload: dict[str, Any], headers: dict[str, Any]
    ) -> dict[str, Any]:
        url = self._api_url + endpoint
        for i in range(self._conn_options.max_retry):
            try:
                async with self._ensure_http_session().post(
                    url=url, headers=headers, json=payload
                ) as response:
                    if not response.ok:
                        text = await response.text()
                        raise APIStatusError(
                            f"Server returned an error for {url}: {response.status}",
                            status_code=response.status,
                            body=text,
                        )
                    return await response.json()  # type: ignore
            except (aiohttp.ClientError, asyncio.TimeoutError) as e:
                logger.warning(
                    f"API request to {url} failed on attempt {i}",
                    extra={"error": str(e)},
                )
            except Exception:
                logger.exception("failed to call LiveAvatar API")

            if i < self._conn_options.max_retry - 1:
                await asyncio.sleep(self._conn_options.retry_interval)

        raise APIConnectionError("Failed to call LiveAvatar API after all retries.")

Methods

async def create_streaming_session(self,
*,
livekit_url: str,
livekit_token: str,
room: Room,
avatar_id: str) ‑> dict[str, typing.Any]
Expand source code
async def create_streaming_session(
    self,
    *,
    livekit_url: str,
    livekit_token: str,
    room: rtc.Room,
    avatar_id: str,
) -> dict[str, Any]:
    """Create a new streaming session, return a session id"""

    livekit_config = {
        "livekit_room": room.name,
        "livekit_url": livekit_url,
        "livekit_client_token": livekit_token,
    }

    payload = {
        "mode": "CUSTOM",
        "avatar_id": avatar_id,
        "livekit_config": livekit_config,
    }

    self._headers = {
        "accept": "application/json",
        "content-type": "application/json",
        "X-API-KEY": self._api_key,
    }
    response_data = await self._post(endpoint="/token", payload=payload, headers=self._headers)
    return response_data

Create a new streaming session, return a session id

async def start_streaming_session(self, session_id: str, session_token: str) ‑> dict[str, typing.Any]
Expand source code
async def start_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]:
    """Start the streaming session"""
    payload = {"session_id": session_id}
    headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"}
    response_data = await self._post(endpoint="/start", payload=payload, headers=headers)
    return response_data

Start the streaming session

async def stop_streaming_session(self, session_id: str, session_token: str) ‑> dict[str, typing.Any]
Expand source code
async def stop_streaming_session(self, session_id: str, session_token: str) -> dict[str, Any]:
    """Stop the streaming session"""
    payload = {
        "session_id": session_id,
        "reason": "USER_DISCONNECTED",
    }
    headers = {"content-type": "application/json", "Authorization": f"Bearer {session_token}"}
    response_data = await self._post(endpoint="/stop", payload=payload, headers=headers)
    return response_data

Stop the streaming session

class LiveAvatarException (*args, **kwargs)
Expand source code
class LiveAvatarException(Exception):
    """Exception for LiveAvatar errors"""

Exception for LiveAvatar errors

Ancestors

  • builtins.Exception
  • builtins.BaseException