Module livekit.plugins.avatartalk
AvatarTalk virtual avatar plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/avatar/avatartalk/ for more information.
Sub-modules
livekit.plugins.avatartalk.apilivekit.plugins.avatartalk.avatarlivekit.plugins.avatartalk.loglivekit.plugins.avatartalk.version
Classes
class AvatarSession (*,
api_url: str | livekit.agents.types.NotGiven = NOT_GIVEN,
api_secret: str | livekit.agents.types.NotGiven = NOT_GIVEN,
avatar: str | livekit.agents.types.NotGiven | None = NOT_GIVEN,
emotion: str | livekit.agents.types.NotGiven | None = NOT_GIVEN,
avatar_participant_identity: str | livekit.agents.types.NotGiven | None = NOT_GIVEN,
avatar_participant_name: str | livekit.agents.types.NotGiven | None = NOT_GIVEN)-
Expand source code
class AvatarSession: """AvatarTalkAPI avatar session""" def __init__( self, *, api_url: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, avatar: NotGivenOr[str | None] = NOT_GIVEN, emotion: NotGivenOr[str | None] = NOT_GIVEN, avatar_participant_identity: NotGivenOr[str | None] = NOT_GIVEN, avatar_participant_name: NotGivenOr[str | None] = NOT_GIVEN, ): self._avatartalk_api = AvatarTalkAPI(api_url, api_secret) self._avatar = avatar or (os.getenv("AVATARTALK_AVATAR") or DEFAULT_AVATAR_NAME) self._emotion = emotion or (os.getenv("AVATARTALK_EMOTION") or DEFAULT_AVATAR_EMOTION) self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME self._agent_track = None def __generate_lk_token( self, livekit_api_key: str, livekit_api_secret: str, room: rtc.Room, participant_identity: str, participant_name: str, as_agent: bool = True, local_participant_identity: Optional[str] = None, ): token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_identity(participant_identity) .with_name(participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) ) if as_agent: token = token.with_kind("agent") if local_participant_identity is not None: token = token.with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) return token.to_jwt() async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str | None] = NOT_GIVEN, livekit_api_key: NotGivenOr[str | None] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str | None] = NOT_GIVEN, ): livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise AvatarTalkException( "livekit_url, livekit_api_key, and livekit_api_secret must be set by arguments or environment variables" ) session_task_mapping = {} try: job_ctx = get_job_context() local_participant_identity = job_ctx.token_claims().identity async def _shutdown_session(): if room.name not in session_task_mapping: return await self._avatartalk_api.stop_session(session_task_mapping[room.name]) job_ctx.add_shutdown_callback(_shutdown_session) except (RuntimeError, KeyError): if not room.isconnected(): raise AvatarTalkException( "local participant identity not found in token, and room is not connected" ) from None local_participant_identity = room.local_participant.identity livekit_token = self.__generate_lk_token( livekit_api_key, livekit_api_secret, room, self._avatar_participant_identity, self._avatar_participant_name, as_agent=True, local_participant_identity=local_participant_identity, ) livekit_listener_token = self.__generate_lk_token( livekit_api_key, livekit_api_secret, room, "listener", "listener", as_agent=False, local_participant_identity=None, ) logger.debug( "Starting Avatartalk agent session", extra={"avatar": self._avatar, "room_name": room.name}, ) try: resp = await self._avatartalk_api.start_session( livekit_url=livekit_url, avatar=self._avatar, emotion=self._emotion, room_name=room.name, livekit_listener_token=livekit_listener_token, livekit_room_token=livekit_token, agent_identity=local_participant_identity, ) self.conversation_id = resp["task_id"] logger.debug( "Avatartalk agent session started", extra={ "avatar": self._avatar, "emotion": self._emotion, "room_name": room.name, "task_id": self.conversation_id, }, ) session_task_mapping[room.name] = self.conversation_id agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity="listener", sample_rate=SAMPLE_RATE, # wait_remote_track=rtc.TrackKind.KIND_VIDEO, ) except AvatarTalkException as e: logger.error(e)AvatarTalkAPI avatar session
Methods
async def start(self,
agent_session: livekit.agents.voice.agent_session.AgentSession,
room: Room,
*,
livekit_url: str | livekit.agents.types.NotGiven | None = NOT_GIVEN,
livekit_api_key: str | livekit.agents.types.NotGiven | None = NOT_GIVEN,
livekit_api_secret: str | livekit.agents.types.NotGiven | None = NOT_GIVEN)-
Expand source code
async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str | None] = NOT_GIVEN, livekit_api_key: NotGivenOr[str | None] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str | None] = NOT_GIVEN, ): livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise AvatarTalkException( "livekit_url, livekit_api_key, and livekit_api_secret must be set by arguments or environment variables" ) session_task_mapping = {} try: job_ctx = get_job_context() local_participant_identity = job_ctx.token_claims().identity async def _shutdown_session(): if room.name not in session_task_mapping: return await self._avatartalk_api.stop_session(session_task_mapping[room.name]) job_ctx.add_shutdown_callback(_shutdown_session) except (RuntimeError, KeyError): if not room.isconnected(): raise AvatarTalkException( "local participant identity not found in token, and room is not connected" ) from None local_participant_identity = room.local_participant.identity livekit_token = self.__generate_lk_token( livekit_api_key, livekit_api_secret, room, self._avatar_participant_identity, self._avatar_participant_name, as_agent=True, local_participant_identity=local_participant_identity, ) livekit_listener_token = self.__generate_lk_token( livekit_api_key, livekit_api_secret, room, "listener", "listener", as_agent=False, local_participant_identity=None, ) logger.debug( "Starting Avatartalk agent session", extra={"avatar": self._avatar, "room_name": room.name}, ) try: resp = await self._avatartalk_api.start_session( livekit_url=livekit_url, avatar=self._avatar, emotion=self._emotion, room_name=room.name, livekit_listener_token=livekit_listener_token, livekit_room_token=livekit_token, agent_identity=local_participant_identity, ) self.conversation_id = resp["task_id"] logger.debug( "Avatartalk agent session started", extra={ "avatar": self._avatar, "emotion": self._emotion, "room_name": room.name, "task_id": self.conversation_id, }, ) session_task_mapping[room.name] = self.conversation_id agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity="listener", sample_rate=SAMPLE_RATE, # wait_remote_track=rtc.TrackKind.KIND_VIDEO, ) except AvatarTalkException as e: logger.error(e)
class AvatarTalkException (*args, **kwargs)-
Expand source code
class AvatarTalkException(Exception): """Exception for AvatarTalkAPI errors"""Exception for AvatarTalkAPI errors
Ancestors
- builtins.Exception
- builtins.BaseException