Module livekit.plugins.tavus
Tavus virtual avatar plugin for LiveKit Agents
See https://docs.livekit.io/agents/integrations/avatar/tavus/ for more information.
Sub-modules
livekit.plugins.tavus.api
livekit.plugins.tavus.avatar
livekit.plugins.tavus.log
livekit.plugins.tavus.version
Classes
class AvatarSession (*,
replica_id: NotGivenOr[str] = NOT_GIVEN,
persona_id: NotGivenOr[str] = NOT_GIVEN,
api_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))-
Expand source code
class AvatarSession: """A Tavus avatar session""" def __init__( self, *, replica_id: NotGivenOr[str] = NOT_GIVEN, persona_id: NotGivenOr[str] = NOT_GIVEN, api_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN, avatar_participant_name: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> None: self._http_session: aiohttp.ClientSession | None = None self._conn_options = conn_options self.conversation_id: str | None = None self._persona_id = persona_id self._replica_id = replica_id self._api = TavusAPI( api_url=api_url, api_key=api_key, conn_options=conn_options, session=self._ensure_http_session(), ) self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME def _ensure_http_session(self) -> aiohttp.ClientSession: if self._http_session is None: self._http_session = utils.http_context.http_session() return self._http_session async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise TavusException( "livekit_url, livekit_api_key, and livekit_api_secret must be set " "by arguments or environment variables" ) try: job_ctx = get_job_context() decoded = job_ctx.decode_token() local_participant_identity = decoded["sub"] except (RuntimeError, KeyError): if not room.isconnected(): raise TavusException( "local participant identity not found in token, and room is not connected" ) from None local_participant_identity = room.local_participant.identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) logger.debug("starting avatar session") self.conversation_id = await self._api.create_conversation( persona_id=self._persona_id, replica_id=self._replica_id, properties={"livekit_ws_url": livekit_url, "livekit_room_token": livekit_token}, ) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, sample_rate=SAMPLE_RATE, wait_remote_track=rtc.TrackKind.KIND_VIDEO, )
A Tavus avatar session
Methods
async def start(self,
agent_session: AgentSession,
room: rtc.Room,
*,
livekit_url: NotGivenOr[str] = NOT_GIVEN,
livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
livekit_api_secret: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise TavusException( "livekit_url, livekit_api_key, and livekit_api_secret must be set " "by arguments or environment variables" ) try: job_ctx = get_job_context() decoded = job_ctx.decode_token() local_participant_identity = decoded["sub"] except (RuntimeError, KeyError): if not room.isconnected(): raise TavusException( "local participant identity not found in token, and room is not connected" ) from None local_participant_identity = room.local_participant.identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) logger.debug("starting avatar session") self.conversation_id = await self._api.create_conversation( persona_id=self._persona_id, replica_id=self._replica_id, properties={"livekit_ws_url": livekit_url, "livekit_room_token": livekit_token}, ) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, sample_rate=SAMPLE_RATE, wait_remote_track=rtc.TrackKind.KIND_VIDEO, )
class TavusException (*args, **kwargs)
-
Expand source code
class TavusException(Exception): """Exception for Tavus errors"""
Exception for Tavus errors
Ancestors
- builtins.Exception
- builtins.BaseException