Module livekit.plugins.trugen
TruGen.AI plugin for LiveKit Agents
Classes
class AvatarSession (*,
avatar_id: NotGivenOr[str | None] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))-
Expand source code
class AvatarSession: """TruGen Realtime Avatar Session""" def __init__( self, *, avatar_id: NotGivenOr[str | None] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN, avatar_participant_name: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> None: self._avatar_id = ( _DEFAULT_AVATAR_ID if avatar_id is NOT_GIVEN or avatar_id is None else avatar_id ) self._api_key = os.getenv("TRUGEN_API_KEY") if api_key is NOT_GIVEN else api_key if not self._api_key: raise TrugenException( "The api_key not found; set this by passing api_key to the client or " "by setting the TRUGEN_API_KEY environment variable" ) if avatar_participant_identity is NOT_GIVEN or avatar_participant_identity is None: self._avatar_participant_identity = _AVATAR_AGENT_IDENTITY else: self._avatar_participant_identity = str(avatar_participant_identity) if avatar_participant_name is NOT_GIVEN or avatar_participant_name is None: self._avatar_participant_name = _AVATAR_AGENT_NAME else: self._avatar_participant_name = str(avatar_participant_name) self._http_session: aiohttp.ClientSession | None = None self._conn_options = conn_options def _ensure_http_session(self) -> aiohttp.ClientSession: if self._http_session is None: self._http_session = utils.http_context.http_session() return self._http_session async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: if livekit_url is NOT_GIVEN: livekit_url = os.getenv("LIVEKIT_URL") or NOT_GIVEN if livekit_api_key is NOT_GIVEN: livekit_api_key = os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN if livekit_api_secret is NOT_GIVEN: livekit_api_secret = os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN if ( livekit_url is NOT_GIVEN or livekit_api_key is NOT_GIVEN or livekit_api_secret is NOT_GIVEN or not livekit_url or not livekit_api_key or not livekit_api_secret ): raise TrugenException( "livekit_url, livekit_api_key, and livekit_api_secret not found; " "either pass them as arguments here or set environment variables." ) job_ctx = get_job_context() local_participant_identity = job_ctx.local_participant_identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) logger.debug("Starting Realtime Avatar Session") await self._start_session(livekit_url, livekit_token) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, wait_remote_track=rtc.TrackKind.KIND_VIDEO, ) async def _start_session(self, livekit_url: str, livekit_token: str) -> None: assert self._api_key is not None api_key = self._api_key assert isinstance(api_key, str) for i in range(self._conn_options.max_retry + 1): try: async with self._ensure_http_session().post( f"{_BASE_API_URL}/v1/sessions", headers={ "x-api-key": api_key, }, json={ "avatar_id": self._avatar_id, "livekit_url": livekit_url, "livekit_token": livekit_token, }, timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout), ) as response: if not response.ok: text = await response.text() raise APIStatusError( "Server returned an error", status_code=response.status, body=text ) return except Exception as e: if isinstance(e, APIStatusError): logger.warning( "API Error; Unable to trigger TruGen.AI API backend.", extra={"status_code": e.status_code, "body": e.body}, ) if not e.retryable: raise else: logger.warning( "API Error; Unable to trigger TruGen.AI API backend.", extra={"error": str(e)}, ) if i < self._conn_options.max_retry: await asyncio.sleep(self._conn_options.retry_interval) raise APIConnectionError("Max retries exhausted; Unable to start TruGen.AI Avatar Session.")TruGen Realtime Avatar Session
Methods
async def start(self,
agent_session: AgentSession,
room: rtc.Room,
*,
livekit_url: NotGivenOr[str] = NOT_GIVEN,
livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
livekit_api_secret: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: if livekit_url is NOT_GIVEN: livekit_url = os.getenv("LIVEKIT_URL") or NOT_GIVEN if livekit_api_key is NOT_GIVEN: livekit_api_key = os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN if livekit_api_secret is NOT_GIVEN: livekit_api_secret = os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN if ( livekit_url is NOT_GIVEN or livekit_api_key is NOT_GIVEN or livekit_api_secret is NOT_GIVEN or not livekit_url or not livekit_api_key or not livekit_api_secret ): raise TrugenException( "livekit_url, livekit_api_key, and livekit_api_secret not found; " "either pass them as arguments here or set environment variables." ) job_ctx = get_job_context() local_participant_identity = job_ctx.local_participant_identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) logger.debug("Starting Realtime Avatar Session") await self._start_session(livekit_url, livekit_token) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, wait_remote_track=rtc.TrackKind.KIND_VIDEO, )
class TrugenException (*args, **kwargs)-
Expand source code
class TrugenException(Exception): """Exception for TruGen.AI errors"""Exception for TruGen.AI errors
Ancestors
- builtins.Exception
- builtins.BaseException