Module livekit.plugins.runway.avatar

Classes

class AvatarSession (*,
avatar_id: NotGivenOr[str | None] = NOT_GIVEN,
preset_id: NotGivenOr[str | None] = NOT_GIVEN,
max_duration: NotGivenOr[int] = NOT_GIVEN,
api_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0))
Expand source code
class AvatarSession(BaseAvatarSession):
    """A Runway Characters avatar session.

    Creates a realtime session backed by Runway's avatar inference pipeline.
    The customer's LiveKit agent owns the conversational AI stack (STT, LLM, TTS);
    Runway provides the visual layer — audio in, avatar video out.

    Exactly one of ``avatar_id`` (a custom avatar) or ``preset_id`` (a Runway
    preset) must be supplied.
    """

    def __init__(
        self,
        *,
        avatar_id: NotGivenOr[str | None] = NOT_GIVEN,
        preset_id: NotGivenOr[str | None] = NOT_GIVEN,
        max_duration: NotGivenOr[int] = NOT_GIVEN,
        api_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        avatar_participant_identity: NotGivenOr[str] = NOT_GIVEN,
        avatar_participant_name: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> None:
        """Validate the avatar selection and resolve API credentials.

        Args:
            avatar_id: Id of a custom avatar (sent as ``{"type": "custom"}``).
            preset_id: Id of a Runway preset (sent as ``{"type": "runway-preset"}``).
            max_duration: When given, forwarded to the API as ``maxDuration``.
            api_url: Runway API base URL. Falls back to the ``RUNWAYML_BASE_URL``
                environment variable, then to ``DEFAULT_API_URL``.
            api_key: Runway API secret. Falls back to the ``RUNWAYML_API_SECRET``
                environment variable.
            avatar_participant_identity: Identity the avatar joins the room with.
            avatar_participant_name: Display name the avatar joins the room with.
            conn_options: Retry count, retry interval and timeout for API calls.

        Raises:
            RunwayException: If neither or both of ``avatar_id`` / ``preset_id``
                are provided, or if no API key can be resolved.
        """
        if not avatar_id and not preset_id:
            raise RunwayException("Either avatar_id or preset_id must be provided")
        if avatar_id and preset_id:
            raise RunwayException("Provide avatar_id or preset_id, not both")

        if avatar_id:
            self._avatar: dict[str, str] = {"type": "custom", "avatarId": str(avatar_id)}
        else:
            self._avatar = {"type": "runway-preset", "presetId": str(preset_id)}
        self._max_duration = max_duration

        self._api_url = api_url or os.getenv("RUNWAYML_BASE_URL", DEFAULT_API_URL)
        self._api_key = api_key or os.getenv("RUNWAYML_API_SECRET")
        # Falsy check (not `is None`): an empty RUNWAYML_API_SECRET would otherwise
        # pass validation and only fail later with an empty Bearer token.
        if not self._api_key:
            raise RunwayException(
                "api_key must be set either by passing it to AvatarSession or "
                "by setting the RUNWAYML_API_SECRET environment variable"
            )

        self._avatar_participant_identity = avatar_participant_identity or _AVATAR_AGENT_IDENTITY
        self._avatar_participant_name = avatar_participant_name or _AVATAR_AGENT_NAME
        self._http_session: aiohttp.ClientSession | None = None
        self._conn_options = conn_options
        self._room: rtc.Room | None = None
        self._realtime_session_id: str | None = None
        self._end_session_task: asyncio.Task[None] | None = None

    def _ensure_http_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching it from the http context on first use."""
        if self._http_session is None:
            self._http_session = utils.http_context.http_session()
        return self._http_session

    async def start(
        self,
        agent_session: AgentSession,
        room: rtc.Room,
        *,
        livekit_url: NotGivenOr[str] = NOT_GIVEN,
        livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
        livekit_api_secret: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Create the Runway session and route the agent's audio to the avatar.

        Args:
            agent_session: Agent session whose audio output is redirected to the
                avatar participant and whose ``close`` event ends the Runway session.
            room: Room the avatar participant is granted permission to join.
            livekit_url: LiveKit server URL; falls back to ``LIVEKIT_URL``.
            livekit_api_key: LiveKit API key; falls back to ``LIVEKIT_API_KEY``.
            livekit_api_secret: LiveKit API secret; falls back to ``LIVEKIT_API_SECRET``.

        Raises:
            RunwayException: If any LiveKit credential is missing.
            APIStatusError: If the Runway API rejects the request with a
                non-retryable status.
            APIConnectionError: If the session could not be created after all retries.
        """
        await super().start(agent_session, room)
        self._room = room

        livekit_url = livekit_url or (os.getenv("LIVEKIT_URL") or NOT_GIVEN)
        livekit_api_key = livekit_api_key or (os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN)
        livekit_api_secret = livekit_api_secret or (os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN)
        if not livekit_url or not livekit_api_key or not livekit_api_secret:
            raise RunwayException(
                "livekit_url, livekit_api_key, and livekit_api_secret must be set "
                "by arguments or environment variables"
            )

        job_ctx = get_job_context()
        self._local_participant_identity = job_ctx.local_participant_identity

        # Token the avatar uses to join the room; the attribute ties it to the
        # local agent participant it publishes on behalf of.
        livekit_token = (
            api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
            .with_kind("agent")
            .with_identity(self._avatar_participant_identity)
            .with_name(self._avatar_participant_name)
            .with_grants(api.VideoGrants(room_join=True, room=room.name))
            .with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: self._local_participant_identity})
            .to_jwt()
        )

        logger.debug("starting Runway avatar session")
        await self._create_session(livekit_url, livekit_token, room.name)

        @agent_session.on("close")
        def _on_agent_session_close(_: Any) -> None:
            self._ensure_end_session_task()

        agent_session.output.audio = DataStreamAudioOutput(
            room=room,
            destination_identity=self._avatar_participant_identity,
            wait_remote_track=rtc.TrackKind.KIND_VIDEO,
            sample_rate=SAMPLE_RATE,
        )

    async def _create_session(self, livekit_url: str, livekit_token: str, room_name: str) -> None:
        """POST the realtime-session request to Runway, retrying on failure.

        On success the returned session id (if any) is stored so the session can
        later be cancelled through the API.

        Raises:
            APIStatusError: Immediately, when the API answers with a non-retryable status.
            APIConnectionError: When every attempt failed; the last underlying
                error is attached as ``__cause__``.
        """
        body: dict[str, object] = {
            "model": "gwm1_avatars",
            "avatar": self._avatar,
            "livekit": {
                "url": livekit_url,
                "token": livekit_token,
                "roomName": room_name,
                "agentIdentity": self._local_participant_identity,
            },
        }

        if is_given(self._max_duration):
            body["maxDuration"] = self._max_duration

        # max_retry counts retries, so the initial attempt comes on top of it.
        # Looping over range(max_retry) would send no request at all for max_retry=0.
        max_attempts = self._conn_options.max_retry + 1
        last_error: Exception | None = None

        for attempt in range(max_attempts):
            try:
                async with self._ensure_http_session().post(
                    f"{self._api_url}/v1/realtime_sessions",
                    headers={
                        "Authorization": f"Bearer {self._api_key}",
                        "X-Runway-Version": API_VERSION,
                    },
                    json=body,
                    timeout=aiohttp.ClientTimeout(sock_connect=self._conn_options.timeout),
                ) as response:
                    if not response.ok:
                        text = await response.text()
                        raise APIStatusError(
                            "Runway API returned an error",
                            status_code=response.status,
                            body=text,
                        )
                    payload = await response.json()
                    session_id = payload.get("id") if isinstance(payload, dict) else None
                    if isinstance(session_id, str):
                        self._realtime_session_id = session_id
                    else:
                        # Without an id the API cancel fallback cannot work later on.
                        logger.warning(
                            "Runway API response did not include a realtime session id"
                        )
                    return

            except Exception as error:
                if isinstance(error, APIStatusError) and not error.retryable:
                    raise

                last_error = error
                if isinstance(error, APIConnectionError):
                    logger.warning(
                        "failed to call Runway avatar API",
                        extra={"error": str(error)},
                    )
                else:
                    logger.exception("failed to call Runway avatar API")

                # No point sleeping after the final attempt.
                if attempt < max_attempts - 1:
                    await asyncio.sleep(self._conn_options.retry_interval)

        raise APIConnectionError(
            "Failed to start Runway Avatar Session after all retries"
        ) from last_error

    def _ensure_end_session_task(self) -> asyncio.Task[None] | None:
        """Return the task that ends the Runway session, creating it at most once.

        Returns:
            The existing or newly created task, or ``None`` when no room has been
            recorded yet (``start`` was never called).
        """
        if self._end_session_task is not None:
            return self._end_session_task

        if self._room is None:
            return None

        self._end_session_task = asyncio.create_task(
            self._end_runway_realtime_session(self._room),
            name="runway_end_realtime_session",
        )
        return self._end_session_task

    async def _end_runway_realtime_session(self, room: rtc.Room) -> None:
        """End the Runway session, preferring END_CALL over an API cancel."""
        # Preferred path: data-channel END_CALL while the room is still connected.
        # The Runway worker handles this message and shuts the session down through
        # the normal "user ended call" lifecycle (COMPLETED, not CANCELLED).
        if room.isconnected():
            try:
                await room.local_participant.publish_data(
                    json.dumps({"type": "END_CALL"}).encode("utf-8"),
                    reliable=True,
                    destination_identities=[self._avatar_participant_identity],
                )
                logger.debug("sent Runway realtime session end call")
                return
            except Exception as exc:
                # Best effort: fall through to the API cancel below.
                logger.warning(
                    "error ending Runway realtime session via data channel",
                    extra={"error": str(exc)},
                )

        # Fallback for hard shutdowns where the room is already disconnected
        # (e.g. aclose() registered as a job shutdown callback runs after
        # room.disconnect()): cancel the session via API so we don't keep
        # billing until maxDuration.
        await self._cancel_runway_realtime_session()

    async def _cancel_runway_realtime_session(self) -> None:
        """Cancel the session with a DELETE request; failures are logged, never raised."""
        session_id = self._realtime_session_id
        if session_id is None:
            logger.warning("could not cancel Runway realtime session; no session id available")
            return

        try:
            async with self._ensure_http_session().delete(
                f"{self._api_url}/v1/realtime_sessions/{session_id}",
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "X-Runway-Version": API_VERSION,
                },
                timeout=aiohttp.ClientTimeout(total=self._conn_options.timeout),
            ) as response:
                if response.ok:
                    logger.debug(
                        "cancelled Runway realtime session",
                        extra={"session_id": session_id},
                    )
                else:
                    logger.warning(
                        "could not cancel Runway realtime session",
                        extra={
                            "session_id": session_id,
                            "status": response.status,
                            "body": await response.text(),
                        },
                    )
        except Exception as exc:
            # Shutdown path: swallow the error so closing never fails because of it.
            logger.warning(
                "error cancelling Runway realtime session",
                extra={"session_id": session_id, "error": str(exc)},
            )

    async def aclose(self) -> None:
        """End the Runway session (if one was started) and wait for that to finish."""
        if end_session_task := self._ensure_end_session_task():
            # shield: cancelling the caller must not cancel the shared end-session task.
            await asyncio.shield(end_session_task)

A Runway Characters avatar session.

Creates a realtime session backed by Runway's avatar inference pipeline. The customer's LiveKit agent owns the conversational AI stack (STT, LLM, TTS); Runway provides the visual layer — audio in, avatar video out.

Ancestors

  • livekit.agents.voice.avatar._types.AvatarSession

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """End the Runway session, if any, and wait until that has completed.

    Nothing is awaited when no end-session task can be obtained.
    """
    pending = self._ensure_end_session_task()
    if not pending:
        return
    # shield keeps the end-session task running even if this caller is cancelled.
    await asyncio.shield(pending)
async def start(self,
agent_session: AgentSession,
room: rtc.Room,
*,
livekit_url: NotGivenOr[str] = NOT_GIVEN,
livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
livekit_api_secret: NotGivenOr[str] = NOT_GIVEN) ‑> None
Expand source code
async def start(
    self,
    agent_session: AgentSession,
    room: rtc.Room,
    *,
    livekit_url: NotGivenOr[str] = NOT_GIVEN,
    livekit_api_key: NotGivenOr[str] = NOT_GIVEN,
    livekit_api_secret: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Create the Runway session and redirect the agent's audio to the avatar.

    Args:
        agent_session: Agent session whose audio output is replaced and whose
            ``close`` event triggers ending the Runway session.
        room: Room the avatar participant is allowed to join.
        livekit_url: LiveKit server URL; falls back to ``LIVEKIT_URL``.
        livekit_api_key: LiveKit API key; falls back to ``LIVEKIT_API_KEY``.
        livekit_api_secret: LiveKit API secret; falls back to ``LIVEKIT_API_SECRET``.

    Raises:
        RunwayException: If any of the three LiveKit settings is missing.
    """
    await super().start(agent_session, room)
    self._room = room

    # Explicit arguments win; the environment is consulted only when one is missing.
    if not livekit_url:
        livekit_url = os.getenv("LIVEKIT_URL") or NOT_GIVEN
    if not livekit_api_key:
        livekit_api_key = os.getenv("LIVEKIT_API_KEY") or NOT_GIVEN
    if not livekit_api_secret:
        livekit_api_secret = os.getenv("LIVEKIT_API_SECRET") or NOT_GIVEN

    if not (livekit_url and livekit_api_key and livekit_api_secret):
        raise RunwayException(
            "livekit_url, livekit_api_key, and livekit_api_secret must be set "
            "by arguments or environment variables"
        )

    own_identity = get_job_context().local_participant_identity
    self._local_participant_identity = own_identity

    # Build the join token for the avatar participant one step at a time.
    token = api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
    token = token.with_kind("agent")
    token = token.with_identity(self._avatar_participant_identity)
    token = token.with_name(self._avatar_participant_name)
    token = token.with_grants(api.VideoGrants(room_join=True, room=room.name))
    token = token.with_attributes({ATTRIBUTE_PUBLISH_ON_BEHALF: self._local_participant_identity})
    avatar_jwt = token.to_jwt()

    logger.debug("starting Runway avatar session")
    await self._create_session(livekit_url, avatar_jwt, room.name)

    @agent_session.on("close")
    def _end_on_close(_: Any) -> None:
        # Closing the agent session schedules the Runway session teardown.
        self._ensure_end_session_task()

    avatar_audio_sink = DataStreamAudioOutput(
        room=room,
        destination_identity=self._avatar_participant_identity,
        wait_remote_track=rtc.TrackKind.KIND_VIDEO,
        sample_rate=SAMPLE_RATE,
    )
    agent_session.output.audio = avatar_audio_sink
class RunwayException (*args, **kwargs)
Expand source code
class RunwayException(Exception):
    """Raised when the Runway avatar plugin is misconfigured or cannot proceed."""

Exception for Runway errors

Ancestors

  • builtins.Exception
  • builtins.BaseException