Module livekit.plugins.lemonslice.avatar
Classes
class AvatarSession (*,
agent_id: NotGivenOr[str] = NOT_GIVEN,
agent_image_url: NotGivenOr[str] = NOT_GIVEN,
agent_prompt: NotGivenOr[str] = NOT_GIVEN,
idle_timeout: NotGivenOr[int] = NOT_GIVEN,
api_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))-
Expand source code
class AvatarSession: """A LemonSlice avatar session""" def __init__( self, *, agent_id: NotGivenOr[str] = NOT_GIVEN, agent_image_url: NotGivenOr[str] = NOT_GIVEN, agent_prompt: NotGivenOr[str] = NOT_GIVEN, idle_timeout: NotGivenOr[int] = NOT_GIVEN, api_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN, avatar_participant_name: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> None: self._agent_id = agent_id self._agent_image_url = agent_image_url self._agent_prompt = agent_prompt self._idle_timeout = idle_timeout self._api_url = api_url self._api_key = api_key self._http_session: aiohttp.ClientSession | None = None self._conn_options = conn_options self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise LemonSliceException( "livekit_url, livekit_api_key, and livekit_api_secret must be set " "by arguments or environment variables" ) job_ctx = get_job_context() local_participant_identity = job_ctx.local_participant_identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) async with LemonSliceAPI( api_url=self._api_url, api_key=self._api_key, conn_options=self._conn_options, session=self._http_session, ) as lemonslice_api: await lemonslice_api.start_agent_session( agent_id=self._agent_id, agent_image_url=self._agent_image_url, agent_prompt=self._agent_prompt, idle_timeout=self._idle_timeout, livekit_url=livekit_url, livekit_token=livekit_token, ) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, sample_rate=SAMPLE_RATE, wait_remote_track=rtc.TrackKind.KIND_VIDEO, clear_buffer_timeout=None, )A LemonSlice avatar session
Methods
async def start(self,
agent_session: AgentSession,
room: rtc.Room,
*,
livekit_url: NotGivenOr[str] = NOT_GIVEN,
livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
livekit_api_secret: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
async def start( self, agent_session: AgentSession, room: rtc.Room, *, livekit_url: NotGivenOr[str] = NOT_GIVEN, livekit_api_key: NotGivenOr[str] = NOT_GIVEN, livekit_api_secret: NotGivenOr[str] = NOT_GIVEN, ) -> None: livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN) livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN) livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN) if not livekit_url or not livekit_api_key or not livekit_api_secret: raise LemonSliceException( "livekit_url, livekit_api_key, and livekit_api_secret must be set " "by arguments or environment variables" ) job_ctx = get_job_context() local_participant_identity = job_ctx.local_participant_identity livekit_token = ( api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret) .with_kind("agent") .with_identity(self._avatar_participant_identity) .with_name(self._avatar_participant_name) .with_grants(api.VideoGrants(room_join=True, room=room.name)) # allow the avatar agent to publish audio and video on behalf of your local agent .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: local_participant_identity}) .to_jwt() ) async with LemonSliceAPI( api_url=self._api_url, api_key=self._api_key, conn_options=self._conn_options, session=self._http_session, ) as lemonslice_api: await lemonslice_api.start_agent_session( agent_id=self._agent_id, agent_image_url=self._agent_image_url, agent_prompt=self._agent_prompt, idle_timeout=self._idle_timeout, livekit_url=livekit_url, livekit_token=livekit_token, ) agent_session.output.audio = DataStreamAudioOutput( room=room, destination_identity=self._avatar_participant_identity, sample_rate=SAMPLE_RATE, wait_remote_track=rtc.TrackKind.KIND_VIDEO, clear_buffer_timeout=None, )