Module livekit.plugins.liveavatar
LiveAvatar avatar plugin for LiveKit Agents
Provides an interactive LiveAvatar avatar integration, similar to the Tavus avatar plugin.
Sub-modules
livekit.plugins.liveavatar.api
livekit.plugins.liveavatar.avatar
livekit.plugins.liveavatar.log
livekit.plugins.liveavatar.version
Classes
class AvatarSession (*,
avatar_id: NotGivenOr[str] = NOT_GIVEN,
api_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
Expand source code
class AvatarSession:
    """A LiveAvatar avatar session.

    Routes an ``AgentSession``'s audio output to a LiveAvatar streaming session.
    Agent audio is converted to mono at ``SAMPLE_RATE``, base64-encoded and sent
    over the LiveAvatar websocket as ``agent.speak`` events. Agent state changes
    are mirrored as ``agent.speak_end`` / ``agent.start_listening`` /
    ``agent.stop_listening`` events, and interruptions as ``agent.interrupt``.
    """

    def __init__(
        self,
        *,
        avatar_id: NotGivenOr[str] = NOT_GIVEN,
        api_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
        avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        """Configure the session; no room or websocket I/O happens until ``start``.

        Args:
            avatar_id: LiveAvatar avatar to stream. Falls back to the
                ``LIVEAVATAR_AVATAR_ID`` environment variable.
            api_url: LiveAvatar API base URL, forwarded to ``LiveAvatarAPI``.
            api_key: LiveAvatar API key, forwarded to ``LiveAvatarAPI``.
            avatar_participant_identity: identity the avatar joins the room with
                (defaults to ``_AVATAR_AGENT_IDENTITY``).
            avatar_participant_name: display name of the avatar participant
                (defaults to ``_AVATAR_AGENT_NAME``).
            conn_options: retry/timeout options for the LiveAvatar API client.
        """
        self._avatar_id = avatar_id or os.getenv("LIVEAVATAR_AVATAR_ID")
        self._session_id: str | None = None
        self._session_token: str | None = None
        self._api = LiveAvatarAPI(api_key=api_key, api_url=api_url, conn_options=conn_options)
        self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY
        self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME
        # Short-lived helper tasks (clear-buffer handling); cancelled on shutdown.
        self._tasks: set[asyncio.Task[Any]] = set()
        # Assigned here so the attribute exists even before start() is called.
        self._main_atask: asyncio.Task | None = None
        self._audio_resampler: rtc.AudioResampler | None = None
        self._session_data = None
        # Outbound websocket messages; drained by _send_task, closed when the
        # agent session closes.
        self._msg_ch = utils.aio.Chan[dict]()
        self._audio_playing = False
        # Sum of the ``duration`` of frames sent since the last interruption.
        self._playback_position = 0.0

    async def start(
        self,
        agent_session: AgentSession,
        room: rtc.Room,
        *,
        livekit_url: NotGivenOr[str] = NOT_GIVEN,
        livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
        livekit_api_secret: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Create and start the LiveAvatar session and route agent audio to it.

        Missing LiveKit credentials are read from ``LIVEKIT_URL``,
        ``LIVEKIT_API_KEY`` and ``LIVEKIT_API_SECRET``.

        Raises:
            LiveAvatarException: if a LiveKit credential is missing, or the local
                participant identity cannot be determined.
        """
        self._agent_session = agent_session
        self._room = room
        livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN)
        livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN)
        livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN)
        if not livekit_url or not livekit_api_key or not livekit_api_secret:
            raise LiveAvatarException(
                "livekit_url, livekit_api_key, and livekit_api_secret must be set"
            )

        try:
            job_ctx = get_job_context()
            self._local_participant_identity = job_ctx.token_claims().identity
        except RuntimeError as e:
            # Outside a job context, fall back to the connected room's identity.
            if not room.isconnected():
                raise LiveAvatarException("failed to get local participant identity") from e
            self._local_participant_identity = room.local_participant.identity

        livekit_token = (
            api.AccessToken(
                api_key=livekit_api_key,
                api_secret=livekit_api_secret,
            )
            .with_kind("agent")
            .with_identity(self._avatar_participant_identity)
            .with_name(self._avatar_participant_name)
            .with_grants(api.VideoGrants(room_join=True, room=self._room.name))
            # The avatar publishes on behalf of the local agent participant.
            .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: self._local_participant_identity})
            .to_jwt()
        )

        logger.debug("starting avatar session")
        session_config_data = await self._api.create_streaming_session(
            livekit_url=livekit_url,
            livekit_token=livekit_token,
            room=self._room,
            avatar_id=self._avatar_id,
        )
        self._session_id = session_config_data["data"]["session_id"]
        self._session_token = session_config_data["data"]["session_token"]
        logger.info(f"LiveAvatar session created: {self._session_id}")

        session_start_data = await self._api.start_streaming_session(
            self._session_id, self._session_token
        )
        self._ws_url = session_start_data["data"]["ws_url"]
        logger.info("LiveAvatar streaming session started")

        @self._agent_session.on("agent_state_changed")
        def on_agent_state_changed(ev):
            if ev.old_state == "speaking" and ev.new_state == "listening":
                self.send_event({"type": "agent.speak_end", "event_id": str(uuid.uuid4())})
                self.send_event({"type": "agent.start_listening", "event_id": str(uuid.uuid4())})
            if ev.new_state == "idle":
                self.send_event({"type": "agent.stop_listening", "event_id": str(uuid.uuid4())})

        @self._agent_session.on("close")
        def on_agent_session_close(ev):
            # Closing the channel ends _send_task, which then closes the websocket.
            self._msg_ch.close()

        self._audio_buffer = QueueAudioOutput(sample_rate=SAMPLE_RATE)
        await self._audio_buffer.start()
        self._audio_buffer.on("clear_buffer", self._on_clear_buffer)
        agent_session.output.audio = self._audio_buffer
        self._main_atask = asyncio.create_task(self._main_task(), name="AvatarSession._main_task")

    def _on_clear_buffer(self) -> None:
        """Handle an interruption: notify playback end and tell the avatar to stop."""

        @utils.log_exceptions(logger=logger)
        async def _handle_clear_buffer(audio_playing: bool) -> None:
            if audio_playing:
                notify_task = self._audio_buffer.notify_playback_finished(
                    playback_position=self._playback_position,
                    interrupted=True,
                )
                self.send_event({"type": "agent.interrupt", "event_id": str(uuid.uuid4())})
                self._playback_position = 0.0
                # notify_playback_finished may or may not return an awaitable.
                if asyncio.iscoroutine(notify_task):
                    await notify_task

        # Capture the flag before it is reset below.
        clear_buffer_task = asyncio.create_task(_handle_clear_buffer(self._audio_playing))
        self._tasks.add(clear_buffer_task)
        clear_buffer_task.add_done_callback(self._tasks.discard)
        self._audio_playing = False

    def _resample_audio(self, frame: rtc.AudioFrame) -> Iterator[rtc.AudioFrame]:
        """Yield ``frame`` converted to ``SAMPLE_RATE`` mono (unchanged if already so)."""
        if self._audio_resampler:
            # Input rate changed: drop the resampler so a new one is built below.
            if frame.sample_rate != self._audio_resampler._input_rate:
                self._audio_resampler = None
        if self._audio_resampler is None and (
            frame.sample_rate != SAMPLE_RATE or frame.num_channels != 1
        ):
            self._audio_resampler = rtc.AudioResampler(
                input_rate=frame.sample_rate,
                output_rate=SAMPLE_RATE,
                num_channels=1,
            )
        if self._audio_resampler:
            yield from self._audio_resampler.push(frame)
        else:
            yield frame

    def send_event(self, msg: dict) -> None:
        """Queue ``msg`` for the websocket; silently dropped once the channel is closed."""
        with contextlib.suppress(utils.aio.channel.ChanClosed):
            self._msg_ch.send_nowait(msg)

    async def _main_task(self) -> None:
        """Run the websocket I/O loops until one finishes, then shut everything down."""
        try:
            ws_conn = await self._api._ensure_http_session().ws_connect(url=self._ws_url)
        except BaseException:
            # start() already started the remote streaming session. Release it and
            # the audio output instead of leaking them when the websocket can't open.
            await self._cleanup()
            raise

        closing = False
        ping_interval = utils.aio.interval(KEEP_ALIVE_INTERVAL)

        async def _forward_audio() -> None:
            # NOTE(review): only rtc.AudioFrame items are forwarded. Any other item
            # yielded by the buffer is ignored, so _audio_playing and
            # _playback_position are reset only by _on_clear_buffer. Confirm
            # whether uninterrupted playback completion should be reported here.
            async for audio_frame in self._audio_buffer:
                if isinstance(audio_frame, rtc.AudioFrame):
                    if not self._audio_playing:
                        self._audio_playing = True
                    for resampled_frame in self._resample_audio(audio_frame):
                        data = resampled_frame.data.tobytes()
                        encoded_audio = base64.b64encode(data).decode("utf-8")
                        msg = {
                            "type": "agent.speak",
                            "event_id": str(uuid.uuid4()),
                            "audio": encoded_audio,
                        }
                        self.send_event(msg)
                        self._playback_position += resampled_frame.duration

        async def _keep_alive_task() -> None:
            try:
                while True:
                    await ping_interval.tick()
                    if closing:
                        break
                    msg = {
                        "type": "session.keep_alive",
                        "event_id": str(uuid.uuid4()),
                    }
                    self.send_event(msg)
            except asyncio.CancelledError:
                return

        @utils.log_exceptions(logger=logger)
        async def _send_task() -> None:
            nonlocal closing
            async for msg in self._msg_ch:
                try:
                    await ws_conn.send_json(data=msg)
                    # Restart the keep-alive timer after each successful send.
                    ping_interval.reset()
                except Exception:
                    # Stop sending rather than crash, but don't hide the failure.
                    logger.warning("failed to send message to LiveAvatar", exc_info=True)
                    break
            # Mark the close as deliberate so _recv_task returns instead of raising.
            closing = True
            await ws_conn.close()

        @utils.log_exceptions(logger=logger)
        async def _recv_task() -> None:
            # Inbound payloads are not processed; only close frames matter here.
            while True:
                msg = await ws_conn.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing:
                        return
                    raise APIConnectionError(message="LiveAvatar connection closed unexpectedly.")

        io_tasks = [
            asyncio.create_task(_forward_audio(), name="_forward_audio_task"),
            asyncio.create_task(_send_task(), name="_send_task"),
            asyncio.create_task(_recv_task(), name="_recv_task"),
            asyncio.create_task(_keep_alive_task(), name="_keep_alive_task"),
        ]
        try:
            done, _ = await asyncio.wait(io_tasks, return_when=asyncio.FIRST_COMPLETED)
            # Re-raise the first failure, if any.
            for task in done:
                task.result()
        finally:
            await utils.aio.cancel_and_wait(*io_tasks)
            await self._cleanup()
            await ws_conn.close()

    async def _cleanup(self) -> None:
        """Cancel helper tasks, stop the remote session (best effort), close audio output."""
        await utils.aio.cancel_and_wait(*self._tasks)
        try:
            if self._session_id and self._session_token:
                data = await self._api.stop_streaming_session(
                    self._session_id, self._session_token
                )
                if data["code"] <= 200:
                    logger.info(f"LiveAvatar session stopped: {self._session_id}")
        except Exception as e:
            logger.warning(f"Failed to stop LiveAvatar session: {e}", exc_info=True)
        await self._audio_buffer.aclose()
Methods
def send_event(self, msg: dict) -> None
Expand source code
# send_event: put `msg` on self._msg_ch, the outbound channel that _send_task
# forwards to the LiveAvatar websocket via send_json. If the channel has already
# been closed (ChanClosed), the message is dropped without raising.
def send_event(self, msg: dict) -> None: with contextlib.suppress(utils.aio.channel.ChanClosed): self._msg_ch.send_nowait(msg) async def start(self,
agent_session: AgentSession,
room: rtc.Room,
*,
livekit_url: NotGivenOr[str] = NOT_GIVEN,
livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
livekit_api_secret: NotGivenOr[str] = NOT_GIVEN) -> None
Expand source code
async def start(
    self,
    agent_session: AgentSession,
    room: rtc.Room,
    *,
    livekit_url: NotGivenOr[str] = NOT_GIVEN,
    livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
    livekit_api_secret: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Launch the LiveAvatar stream for ``agent_session`` inside ``room``.

    Missing LiveKit credentials are taken from ``LIVEKIT_URL``,
    ``LIVEKIT_API_KEY`` and ``LIVEKIT_API_SECRET``. Steps:

    1. Mint a token for the avatar participant.
    2. Create and start a LiveAvatar streaming session.
    3. Mirror agent state changes to the avatar.
    4. Replace the agent's audio output with a ``QueueAudioOutput`` that
       ``_main_task`` drains.

    Raises:
        LiveAvatarException: if a LiveKit credential is still missing, or the
            local participant identity cannot be determined.
    """
    self._agent_session = agent_session
    self._room = room

    # Explicit argument first, then the environment; NOT_GIVEN when neither is set.
    url = livekit_url or os.getenv("LIVEKIT_URL") or NOT_GIVEN
    key = livekit_api_key or os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN
    secret = livekit_api_secret or os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN
    if not (url and key and secret):
        raise LiveAvatarException(
            "livekit_url, livekit_api_key, and livekit_api_secret must be set"
        )

    try:
        identity = get_job_context().token_claims().identity
    except RuntimeError as err:
        # No job context: only a connected room can tell us who we are.
        if not room.isconnected():
            raise LiveAvatarException("failed to get local participant identity") from err
        identity = room.local_participant.identity
    self._local_participant_identity = identity

    grants = api.VideoGrants(room_join=True, room=self._room.name)
    builder = api.AccessToken(api_key=key, api_secret=secret)
    builder = (
        builder.with_kind("agent")
        .with_identity(self._avatar_participant_identity)
        .with_name(self._avatar_participant_name)
        .with_grants(grants)
        .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: identity})
    )
    livekit_token = builder.to_jwt()

    logger.debug("starting avatar session")
    created = await self._api.create_streaming_session(
        livekit_url=url,
        livekit_token=livekit_token,
        room=self._room,
        avatar_id=self._avatar_id,
    )
    session_info = created["data"]
    self._session_id = session_info["session_id"]
    self._session_token = session_info["session_token"]
    logger.info(f"LiveAvatar session created: {self._session_id}")

    started = await self._api.start_streaming_session(self._session_id, self._session_token)
    self._ws_url = started["data"]["ws_url"]
    logger.info("LiveAvatar streaming session started")

    def _notify(event_type: str) -> None:
        self.send_event({"type": event_type, "event_id": str(uuid.uuid4())})

    @self._agent_session.on("agent_state_changed")
    def _mirror_agent_state(ev):
        if ev.old_state == "speaking" and ev.new_state == "listening":
            _notify("agent.speak_end")
            _notify("agent.start_listening")
        if ev.new_state == "idle":
            _notify("agent.stop_listening")

    @self._agent_session.on("close")
    def _on_session_closed(ev):
        self._msg_ch.close()

    audio_out = QueueAudioOutput(sample_rate=SAMPLE_RATE)
    self._audio_buffer = audio_out
    await audio_out.start()
    audio_out.on("clear_buffer", self._on_clear_buffer)
    agent_session.output.audio = audio_out

    self._main_atask = asyncio.create_task(self._main_task(), name="AvatarSession._main_task")
class LiveAvatarException (*args, **kwargs)
Expand source code
class LiveAvatarException(Exception):
    """Raised when a LiveAvatar operation fails."""
Ancestors
- builtins.Exception
- builtins.BaseException