Module livekit.api

LiveKit Server APIs for Python

pip install livekit-api

Manage rooms, participants, egress, ingress, SIP, and Agent dispatch.

The primary entry point is LiveKitAPI.

See https://docs.livekit.io/reference/server/server-apis for more information.

Sub-modules

livekit.api.access_token
livekit.api.agent_dispatch_service
livekit.api.egress_service
livekit.api.ingress_service
livekit.api.livekit_api
livekit.api.room_service
livekit.api.sip_service
livekit.api.twirp_client
livekit.api.version
livekit.api.webhook

Classes

class AccessToken (api_key: str | None = None, api_secret: str | None = None)
Expand source code
class AccessToken:
    """Fluent builder for a signed LiveKit access token (JWT).

    Each ``with_*`` method stores one value and returns the same instance so
    calls can be chained; ``to_jwt()`` serializes the collected claims and
    signs them with the API secret using HS256.
    """

    ParticipantKind = Literal["standard", "egress", "ingress", "sip", "agent"]

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
    ) -> None:
        """Create a token builder.

        Args:
            api_key: API key, used as the ``iss`` claim (read from the
                `LIVEKIT_API_KEY` environment variable if not provided)
            api_secret: API secret used to sign the token (read from the
                `LIVEKIT_API_SECRET` environment variable if not provided)

        Raises:
            ValueError: if either value is still empty after the environment
                fallback.
        """
        api_key = api_key or os.getenv("LIVEKIT_API_KEY")
        api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET")

        if not api_key or not api_secret:
            raise ValueError("api_key and api_secret must be set")

        self.api_key = api_key  # iss
        self.api_secret = api_secret
        self.claims = Claims()

        # default jwt claims
        self.identity = ""  # sub
        self.ttl = DEFAULT_TTL  # exp

    def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken":
        """Set how long the token stays valid after it is issued."""
        self.ttl = ttl
        return self

    def with_grants(self, grants: VideoGrants) -> "AccessToken":
        """Set the video grants stored in the token's claims."""
        self.claims.video = grants
        return self

    def with_sip_grants(self, grants: SIPGrants) -> "AccessToken":
        """Set the SIP grants stored in the token's claims."""
        self.claims.sip = grants
        return self

    def with_identity(self, identity: str) -> "AccessToken":
        """Set the identity, emitted as the ``sub`` claim."""
        self.identity = identity
        return self

    def with_kind(self, kind: ParticipantKind) -> "AccessToken":
        """Set the participant kind stored in the token's claims."""
        self.claims.kind = kind
        return self

    def with_name(self, name: str) -> "AccessToken":
        """Set the name stored in the token's claims."""
        self.claims.name = name
        return self

    def with_metadata(self, metadata: str) -> "AccessToken":
        """Set the metadata string stored in the token's claims."""
        self.claims.metadata = metadata
        return self

    def with_attributes(self, attributes: dict[str, str]) -> "AccessToken":
        """Set the attributes mapping stored in the token's claims."""
        self.claims.attributes = attributes
        return self

    def with_sha256(self, sha256: str) -> "AccessToken":
        """Set the sha256 value stored in the token's claims."""
        self.claims.sha256 = sha256
        return self

    def with_room_preset(self, preset: str) -> "AccessToken":
        """Set the room preset stored in the token's claims."""
        self.claims.room_preset = preset
        return self

    def with_room_config(self, config: RoomConfiguration) -> "AccessToken":
        """Set the room configuration stored in the token's claims."""
        self.claims.room_config = config
        return self

    def to_jwt(self) -> str:
        """Serialize the claims and return the HS256-signed JWT.

        Raises:
            ValueError: if the video grants request ``room_join`` but the
                identity or the room name is empty.
        """
        video = self.claims.video
        if video and video.room_join and (not self.identity or not video.room):
            raise ValueError("identity and room must be set when joining a room")

        # we want to exclude None values from the token
        jwt_claims = self.claims.asdict()

        # Read the clock once so `nbf` and `exp` are derived from the same
        # instant and `exp - nbf` matches the configured TTL.
        now = datetime.datetime.now(datetime.timezone.utc)
        jwt_claims.update(
            {
                "sub": self.identity,
                "iss": self.api_key,
                "nbf": calendar.timegm(now.utctimetuple()),
                "exp": calendar.timegm((now + self.ttl).utctimetuple()),
            }
        )
        return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")

Class variables

var ParticipantKind

Methods

def to_jwt(self) ‑> str
Expand source code
def to_jwt(self) -> str:
    """Serialize the claims and return the HS256-signed JWT.

    Raises:
        ValueError: if the video grants request ``room_join`` but the
            identity or the room name is empty.
    """
    video = self.claims.video
    if video and video.room_join and (not self.identity or not video.room):
        raise ValueError("identity and room must be set when joining a room")

    # we want to exclude None values from the token
    jwt_claims = self.claims.asdict()

    # Read the clock once so `nbf` and `exp` are derived from the same
    # instant and `exp - nbf` matches the configured TTL.
    now = datetime.datetime.now(datetime.timezone.utc)
    jwt_claims.update(
        {
            "sub": self.identity,
            "iss": self.api_key,
            "nbf": calendar.timegm(now.utctimetuple()),
            "exp": calendar.timegm((now + self.ttl).utctimetuple()),
        }
    )
    return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")
def with_attributes(self, attributes: dict[str, str]) ‑> AccessToken
Expand source code
def with_attributes(self, attributes: dict[str, str]) -> "AccessToken":
    """Store `attributes` on the token's claims and return the token for chaining."""
    claims = self.claims
    claims.attributes = attributes
    return self
def with_grants(self, grants: VideoGrants) ‑> AccessToken
Expand source code
def with_grants(self, grants: VideoGrants) -> "AccessToken":
    """Store `grants` as the video grants on the token's claims; returns the token."""
    claims = self.claims
    claims.video = grants
    return self
def with_identity(self, identity: str) ‑> AccessToken
Expand source code
def with_identity(self, identity: str) -> "AccessToken":
    """Set the identity on this token and return the token for chaining."""
    self.identity = identity
    return self
def with_kind(self, kind: Literal['standard', 'egress', 'ingress', 'sip', 'agent']) ‑> AccessToken
Expand source code
def with_kind(self, kind: ParticipantKind) -> "AccessToken":
    """Store `kind` on the token's claims and return the token for chaining."""
    claims = self.claims
    claims.kind = kind
    return self
def with_metadata(self, metadata: str) ‑> AccessToken
Expand source code
def with_metadata(self, metadata: str) -> "AccessToken":
    """Store `metadata` on the token's claims and return the token for chaining."""
    claims = self.claims
    claims.metadata = metadata
    return self
def with_name(self, name: str) ‑> AccessToken
Expand source code
def with_name(self, name: str) -> "AccessToken":
    """Store `name` on the token's claims and return the token for chaining."""
    claims = self.claims
    claims.name = name
    return self
def with_room_config(self, config: room.RoomConfiguration) ‑> AccessToken
Expand source code
def with_room_config(self, config: RoomConfiguration) -> "AccessToken":
    """Store `config` as the room configuration on the token's claims; returns the token."""
    claims = self.claims
    claims.room_config = config
    return self
def with_room_preset(self, preset: str) ‑> AccessToken
Expand source code
def with_room_preset(self, preset: str) -> "AccessToken":
    """Store `preset` as the room preset on the token's claims; returns the token."""
    claims = self.claims
    claims.room_preset = preset
    return self
def with_sha256(self, sha256: str) ‑> AccessToken
Expand source code
def with_sha256(self, sha256: str) -> "AccessToken":
    """Store `sha256` on the token's claims and return the token for chaining."""
    claims = self.claims
    claims.sha256 = sha256
    return self
def with_sip_grants(self, grants: SIPGrants) ‑> AccessToken
Expand source code
def with_sip_grants(self, grants: SIPGrants) -> "AccessToken":
    """Store `grants` as the SIP grants on the token's claims; returns the token."""
    claims = self.claims
    claims.sip = grants
    return self
def with_ttl(self, ttl: datetime.timedelta) ‑> AccessToken
Expand source code
def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken":
    """Set the token's time-to-live and return the token for chaining."""
    self.ttl = ttl
    return self
class LiveKitAPI (url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
*,
timeout: aiohttp.client.ClientTimeout = ClientTimeout(total=60, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5))
Expand source code
class LiveKitAPI:
    """LiveKit Server API Client

    Main entrypoint of the package: one instance gives access to every
    service client.

    Usage:

    ```python
    from livekit import api
    lkapi = api.LiveKitAPI()
    rooms = await lkapi.room.list_rooms(api.proto_room.ListRoomsRequest(names=['test-room']))
    ```
    """

    def __init__(
        self,
        url: Optional[str] = None,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        *,
        timeout: aiohttp.ClientTimeout = aiohttp.ClientTimeout(total=60),  # 60 seconds
    ):
        """Build the client and all of its service objects.

        Args:
            url: LiveKit server URL (falls back to the `LIVEKIT_URL` environment variable)
            api_key: API key (falls back to the `LIVEKIT_API_KEY` environment variable)
            api_secret: API secret (falls back to the `LIVEKIT_API_SECRET` environment variable)
            timeout: Request timeout (default: 60 seconds)

        Raises:
            ValueError: when the URL, or the key/secret pair, is still missing
                after the environment fallback.
        """
        url = url or os.getenv("LIVEKIT_URL")
        api_key = api_key or os.getenv("LIVEKIT_API_KEY")
        api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET")

        if not url:
            raise ValueError("url must be set")

        if not (api_key and api_secret):
            raise ValueError("api_key and api_secret must be set")

        # Every service shares one HTTP session and the same credentials.
        session = aiohttp.ClientSession(timeout=timeout)
        shared = (session, url, api_key, api_secret)

        self._session = session
        self._room = RoomService(*shared)
        self._ingress = IngressService(*shared)
        self._egress = EgressService(*shared)
        self._sip = SipService(*shared)
        self._agent_dispatch = AgentDispatchService(*shared)

    @property
    def agent_dispatch(self) -> AgentDispatchService:
        """The AgentDispatchService instance owned by this client"""
        return self._agent_dispatch

    @property
    def room(self) -> RoomService:
        """The RoomService instance owned by this client"""
        return self._room

    @property
    def ingress(self) -> IngressService:
        """The IngressService instance owned by this client"""
        return self._ingress

    @property
    def egress(self) -> EgressService:
        """The EgressService instance owned by this client"""
        return self._egress

    @property
    def sip(self) -> SipService:
        """The SipService instance owned by this client"""
        return self._sip

    async def aclose(self):
        """Close the API client

        Closes the shared HTTP session. Call it before the application exits
        or once the client is no longer needed."""
        await self._session.close()

    async def __aenter__(self):
        """@private

        Enables `async with`; yields the client itself."""
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """@private

        Enables `async with`; closes the client on exit."""
        await self.aclose()

LiveKit Server API Client

This class is the main entrypoint, which exposes all services.

Usage:

from livekit import api
lkapi = api.LiveKitAPI()
rooms = await lkapi.room.list_rooms(api.proto_room.ListRoomsRequest(names=['test-room']))

Create a new LiveKitAPI instance.

Args

url
LiveKit server URL (read from LIVEKIT_URL environment variable if not provided)
api_key
API key (read from LIVEKIT_API_KEY environment variable if not provided)
api_secret
API secret (read from LIVEKIT_API_SECRET environment variable if not provided)
timeout
Request timeout (default: 60 seconds)

Instance variables

prop agent_dispatch : AgentDispatchService
Expand source code
@property
def agent_dispatch(self) -> AgentDispatchService:
    """Instance of the AgentDispatchService

    Read-only accessor for the object stored in `self._agent_dispatch`.
    """
    return self._agent_dispatch

Instance of the AgentDispatchService

prop egress : EgressService
Expand source code
@property
def egress(self) -> EgressService:
    """Instance of the EgressService

    Read-only accessor for the object stored in `self._egress`.
    """
    return self._egress

Instance of the EgressService

prop ingress : IngressService
Expand source code
@property
def ingress(self) -> IngressService:
    """Instance of the IngressService

    Read-only accessor for the object stored in `self._ingress`.
    """
    return self._ingress

Instance of the IngressService

prop room : RoomService
Expand source code
@property
def room(self) -> RoomService:
    """Instance of the RoomService

    Read-only accessor for the object stored in `self._room`.
    """
    return self._room

Instance of the RoomService

prop sip : SipService
Expand source code
@property
def sip(self) -> SipService:
    """Instance of the SipService

    Read-only accessor for the object stored in `self._sip`.
    """
    return self._sip

Instance of the SipService

Methods

async def aclose(self)
Expand source code
async def aclose(self) -> None:
    """Close the API client

    Call this before your application exits or when the API client is no longer needed."""
    # Awaits close() on the session object held in self._session.
    await self._session.close()

Close the API client

Call this before your application exits or when the API client is no longer needed.

class SIPGrants (admin: bool = False, call: bool = False)
Expand source code
@dataclasses.dataclass
class SIPGrants:
    """SIP permission flags; both default to False (not granted)."""

    # manage sip resources
    admin: bool = False
    # make outbound calls
    call: bool = False

SIPGrants(admin: bool = False, call: bool = False)

Class variables

var admin : bool
var call : bool
class TokenVerifier (api_key: str | None = None,
api_secret: str | None = None,
*,
leeway: datetime.timedelta = datetime.timedelta(seconds=60))
Expand source code
class TokenVerifier:
    """Validates LiveKit access tokens and rebuilds their `Claims`."""

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        *,
        leeway: datetime.timedelta = DEFAULT_LEEWAY,
    ) -> None:
        """Create a verifier.

        Args:
            api_key: expected token issuer (falls back to `LIVEKIT_API_KEY`)
            api_secret: secret used to check the signature (falls back to
                `LIVEKIT_API_SECRET`)
            leeway: time tolerance handed to the JWT decoder

        Raises:
            ValueError: when key or secret is still empty after the
                environment fallback.
        """
        resolved_key = api_key or os.getenv("LIVEKIT_API_KEY")
        resolved_secret = api_secret or os.getenv("LIVEKIT_API_SECRET")

        if not (resolved_key and resolved_secret):
            raise ValueError("api_key and api_secret must be set")

        self.api_key = resolved_key
        self.api_secret = resolved_secret
        self._leeway = leeway

    @staticmethod
    def _grants_from(raw, grant_cls):
        """Build `grant_cls` from a claim mapping with camelCase keys.

        Keys are converted with `camel_to_snake`; those that are not fields of
        the dataclass are dropped.
        """
        known_fields = grant_cls.__dataclass_fields__
        kwargs = {}
        for key, value in raw.items():
            field_name = camel_to_snake(key)
            if field_name in known_fields:
                kwargs[field_name] = value
        return grant_cls(**kwargs)

    def verify(self, token: str) -> Claims:
        """Decode `token` and return its claims.

        The token is decoded with HS256 using the API secret; the issuer must
        match the API key and the configured leeway (in seconds) is passed to
        the decoder. Errors raised by `jwt.decode` propagate to the caller.
        """
        payload = jwt.decode(
            token,
            self.api_secret,
            issuer=self.api_key,
            algorithms=["HS256"],
            leeway=self._leeway.total_seconds(),
        )

        video = self._grants_from(payload.get("video", {}), VideoGrants)
        sip = self._grants_from(payload.get("sip", {}), SIPGrants)

        result = Claims(
            identity=payload.get("sub", ""),
            name=payload.get("name", ""),
            video=video,
            sip=sip,
            attributes=payload.get("attributes", {}),
            metadata=payload.get("metadata", ""),
            sha256=payload.get("sha256", ""),
        )

        # Room preset / config are only copied over when present and truthy.
        preset = payload.get("roomPreset")
        if preset:
            result.room_preset = preset

        raw_config = payload.get("roomConfig")
        if raw_config:
            result.room_config = ParseDict(
                raw_config,
                RoomConfiguration(),
                ignore_unknown_fields=True,
            )

        return result

Methods

def verify(self, token: str) ‑> Claims
Expand source code
def verify(self, token: str) -> Claims:
    """Decode `token` and return its claims.

    The token is decoded with HS256 using the API secret; the issuer must
    match the API key and the configured leeway (in seconds) is passed to
    the decoder. Errors raised by `jwt.decode` propagate to the caller.
    """

    def grants_from(raw, grant_cls):
        # Convert camelCase keys and keep only the dataclass's own fields.
        known_fields = grant_cls.__dataclass_fields__
        kwargs = {}
        for key, value in raw.items():
            field_name = camel_to_snake(key)
            if field_name in known_fields:
                kwargs[field_name] = value
        return grant_cls(**kwargs)

    payload = jwt.decode(
        token,
        self.api_secret,
        issuer=self.api_key,
        algorithms=["HS256"],
        leeway=self._leeway.total_seconds(),
    )

    video = grants_from(payload.get("video", {}), VideoGrants)
    sip = grants_from(payload.get("sip", {}), SIPGrants)

    result = Claims(
        identity=payload.get("sub", ""),
        name=payload.get("name", ""),
        video=video,
        sip=sip,
        attributes=payload.get("attributes", {}),
        metadata=payload.get("metadata", ""),
        sha256=payload.get("sha256", ""),
    )

    # Room preset / config are only copied over when present and truthy.
    preset = payload.get("roomPreset")
    if preset:
        result.room_preset = preset

    raw_config = payload.get("roomConfig")
    if raw_config:
        result.room_config = ParseDict(
            raw_config,
            RoomConfiguration(),
            ignore_unknown_fields=True,
        )

    return result
class VideoGrants (room_create: bool | None = None,
room_list: bool | None = None,
room_record: bool | None = None,
room_admin: bool | None = None,
room_join: bool | None = None,
room: str = '',
can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_publish_sources: List[str] | None = None,
can_update_own_metadata: bool | None = None,
ingress_admin: bool | None = None,
hidden: bool | None = None,
recorder: bool | None = None,
agent: bool | None = None)
Expand source code
@dataclasses.dataclass
class VideoGrants:
    """Room and participant permission flags.

    Most flags default to None; `can_publish`, `can_subscribe` and
    `can_publish_data` default to True, and `room` defaults to "".
    """

    # actions on rooms
    room_create: Optional[bool] = None
    room_list: Optional[bool] = None
    room_record: Optional[bool] = None

    # actions on a particular room
    room_admin: Optional[bool] = None
    room_join: Optional[bool] = None
    room: str = ""

    # permissions within a room
    can_publish: bool = True
    can_subscribe: bool = True
    can_publish_data: bool = True

    # TrackSource types that a participant may publish.
    # When set, it supersedes CanPublish. Only sources explicitly set here can be
    # published
    can_publish_sources: Optional[List[str]] = None

    # by default, a participant is not allowed to update its own metadata
    can_update_own_metadata: Optional[bool] = None

    # actions on ingresses
    ingress_admin: Optional[bool] = None  # applies to all ingress

    # participant is not visible to other participants (useful when making bots)
    hidden: Optional[bool] = None

    # [deprecated] indicates to the room that current participant is a recorder
    recorder: Optional[bool] = None

    # indicates that the holder can register as an Agent framework worker
    agent: Optional[bool] = None

VideoGrants(room_create: Optional[bool] = None, room_list: Optional[bool] = None, room_record: Optional[bool] = None, room_admin: Optional[bool] = None, room_join: Optional[bool] = None, room: str = '', can_publish: bool = True, can_subscribe: bool = True, can_publish_data: bool = True, can_publish_sources: Optional[List[str]] = None, can_update_own_metadata: Optional[bool] = None, ingress_admin: Optional[bool] = None, hidden: Optional[bool] = None, recorder: Optional[bool] = None, agent: Optional[bool] = None)

Class variables

var agent : bool | None
var can_publish : bool
var can_publish_data : bool
var can_publish_sources : List[str] | None
var can_subscribe : bool
var can_update_own_metadata : bool | None
var hidden : bool | None
var ingress_admin : bool | None
var recorder : bool | None
var room : str
var room_admin : bool | None
var room_create : bool | None
var room_join : bool | None
var room_list : bool | None
var room_record : bool | None
class WebhookReceiver (token_verifier: TokenVerifier)
Expand source code
class WebhookReceiver:
    """Authenticates webhook requests and decodes them into `WebhookEvent`s."""

    def __init__(self, token_verifier: TokenVerifier):
        """Store the verifier used to validate each request's auth token."""
        self._verifier = token_verifier

    def receive(self, body: str, auth_token: str) -> WebhookEvent:
        """Verify a webhook request and parse its body.

        Args:
            body: raw request body (JSON text)
            auth_token: token sent alongside the request; its `sha256` claim
                must be the base64-encoded SHA-256 digest of `body`

        Returns:
            The body parsed into a `WebhookEvent` (unknown fields ignored).

        Raises:
            Exception: if the token carries no sha256 claim, or the claim does
                not match the digest of `body`. Errors from token
                verification propagate unchanged.
        """
        import hmac  # function-scope import keeps this block self-contained

        claims = self._verifier.verify(auth_token)
        # TokenVerifier.verify() substitutes "" for an absent sha256 claim, so
        # an `is None` test never fires; treat every empty value as missing.
        if not claims.sha256:
            raise Exception("sha256 was not found in the token")

        body_hash = hashlib.sha256(body.encode()).digest()
        claims_hash = base64.b64decode(claims.sha256)

        # Constant-time comparison so timing does not reveal how many leading
        # bytes of the digest matched.
        if not hmac.compare_digest(body_hash, claims_hash):
            raise Exception("hash mismatch")

        return Parse(body, WebhookEvent(), ignore_unknown_fields=True)

Methods

def receive(self, body: str, auth_token: str) ‑> WebhookEvent
Expand source code
def receive(self, body: str, auth_token: str) -> WebhookEvent:
    """Verify a webhook request and parse its body.

    Args:
        body: raw request body (JSON text)
        auth_token: token sent alongside the request; its `sha256` claim
            must be the base64-encoded SHA-256 digest of `body`

    Returns:
        The body parsed into a `WebhookEvent` (unknown fields ignored).

    Raises:
        Exception: if the token carries no sha256 claim, or the claim does
            not match the digest of `body`. Errors from token verification
            propagate unchanged.
    """
    import hmac  # function-scope import keeps this block self-contained

    claims = self._verifier.verify(auth_token)
    # An absent claim may surface as "" rather than None, so treat every
    # empty value as missing instead of reporting it as a hash mismatch.
    if not claims.sha256:
        raise Exception("sha256 was not found in the token")

    body_hash = hashlib.sha256(body.encode()).digest()
    claims_hash = base64.b64decode(claims.sha256)

    # Constant-time comparison so timing does not reveal how many leading
    # bytes of the digest matched.
    if not hmac.compare_digest(body_hash, claims_hash):
        raise Exception("hash mismatch")

    return Parse(body, WebhookEvent(), ignore_unknown_fields=True)