Module livekit.plugins.avatartalk.api

Classes

class AvatarTalkAPI (api_url: str | livekit.agents.types.NotGiven = NOT_GIVEN,
api_key: str | livekit.agents.types.NotGiven = NOT_GIVEN)
Expand source code
class AvatarTalkAPI:
    def __init__(
        self,
        api_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
    ):
        self._api_url = api_url or DEFAULT_API_URL
        avatartalk_api_key = api_key or os.getenv("AVATARTALK_API_KEY")
        if avatartalk_api_key is None:
            raise AvatarTalkException("AVATARTALK_API_KEY must be set")

        self._api_key = avatartalk_api_key
        self._headers = {"Authorization": f"Bearer {self._api_key}"}

    async def _request(self, method: str, path: str, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(
                method, f"{self._api_url}{path}", headers=self._headers, **kwargs
            ) as response:
                if response.ok:
                    return await response.json()
                else:
                    r = await response.json()
                    raise AvatarTalkException(f"API request failed: {response.status} {r}")

    async def start_session(
        self,
        livekit_url: str,
        avatar: str,
        emotion: str,
        room_name: str,
        livekit_listener_token: str,
        livekit_room_token: str,
        agent_identity: str,
    ) -> dict[str, Any]:
        return await self._request(
            "POST",
            "/livekit/create-session",
            json={
                "livekit_url": livekit_url,
                "avatar": avatar,
                "emotion": emotion,
                "room_name": room_name,
                "room_token": livekit_room_token,
                "listener_token": livekit_listener_token,
                "agent_identity": agent_identity,
            },
        )

    async def stop_session(self, task_id: str) -> dict[str, Any]:
        return await self._request("DELETE", f"/livekit/delete-session/{task_id}")

Methods

async def start_session(self,
livekit_url: str,
avatar: str,
emotion: str,
room_name: str,
livekit_listener_token: str,
livekit_room_token: str,
agent_identity: str) ‑> dict[str, typing.Any]
Expand source code
async def start_session(
    self,
    livekit_url: str,
    avatar: str,
    emotion: str,
    room_name: str,
    livekit_listener_token: str,
    livekit_room_token: str,
    agent_identity: str,
) -> dict[str, Any]:
    return await self._request(
        "POST",
        "/livekit/create-session",
        json={
            "livekit_url": livekit_url,
            "avatar": avatar,
            "emotion": emotion,
            "room_name": room_name,
            "room_token": livekit_room_token,
            "listener_token": livekit_listener_token,
            "agent_identity": agent_identity,
        },
    )
async def stop_session(self, task_id: str) ‑> dict[str, typing.Any]
Expand source code
async def stop_session(self, task_id: str) -> dict[str, Any]:
    return await self._request("DELETE", f"/livekit/delete-session/{task_id}")
class AvatarTalkException (*args, **kwargs)
Expand source code
class AvatarTalkException(Exception):
    """Exception for AvatarTalkAPI errors"""

Exception for AvatarTalkAPI errors

Ancestors

  • builtins.Exception
  • builtins.BaseException