Module livekit.api
LiveKit Server APIs for Python
pip install livekit-api
Manage rooms, participants, egress, ingress, SIP, and Agent dispatch.
Primary entry point is LiveKitAPI
.
See https://docs.livekit.io/reference/server/server-apis for more information.
Sub-modules
livekit.api.access_token
livekit.api.agent_dispatch_service
livekit.api.egress_service
livekit.api.ingress_service
livekit.api.livekit_api
livekit.api.room_service
livekit.api.sip_service
livekit.api.twirp_client
livekit.api.version
livekit.api.webhook
Classes
class AccessToken (api_key: str | None = None, api_secret: str | None = None)
-
Expand source code
class AccessToken: ParticipantKind = Literal["standard", "egress", "ingress", "sip", "agent"] def __init__( self, api_key: Optional[str] = None, api_secret: Optional[str] = None, ) -> None: api_key = api_key or os.getenv("LIVEKIT_API_KEY") api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET") if not api_key or not api_secret: raise ValueError("api_key and api_secret must be set") self.api_key = api_key # iss self.api_secret = api_secret self.claims = Claims() # default jwt claims self.identity = "" # sub self.ttl = DEFAULT_TTL # exp def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken": self.ttl = ttl return self def with_grants(self, grants: VideoGrants) -> "AccessToken": self.claims.video = grants return self def with_sip_grants(self, grants: SIPGrants) -> "AccessToken": self.claims.sip = grants return self def with_identity(self, identity: str) -> "AccessToken": self.identity = identity return self def with_kind(self, kind: ParticipantKind) -> "AccessToken": self.claims.kind = kind return self def with_name(self, name: str) -> "AccessToken": self.claims.name = name return self def with_metadata(self, metadata: str) -> "AccessToken": self.claims.metadata = metadata return self def with_attributes(self, attributes: dict[str, str]) -> "AccessToken": self.claims.attributes = attributes return self def with_sha256(self, sha256: str) -> "AccessToken": self.claims.sha256 = sha256 return self def with_room_preset(self, preset: str) -> "AccessToken": self.claims.room_preset = preset return self def with_room_config(self, config: RoomConfiguration) -> "AccessToken": self.claims.room_config = config return self def to_jwt(self) -> str: video = self.claims.video if video and video.room_join and (not self.identity or not video.room): raise ValueError("identity and room must be set when joining a room") # we want to exclude None values from the token jwt_claims = self.claims.asdict() jwt_claims.update( { "sub": self.identity, "iss": self.api_key, "nbf": calendar.timegm( datetime.datetime.now(datetime.timezone.utc).utctimetuple() ), "exp": calendar.timegm( ( datetime.datetime.now(datetime.timezone.utc) + self.ttl ).utctimetuple() ), } ) return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")
Class variables
var ParticipantKind
Methods
def to_jwt(self) ‑> str
-
Expand source code
def to_jwt(self) -> str: video = self.claims.video if video and video.room_join and (not self.identity or not video.room): raise ValueError("identity and room must be set when joining a room") # we want to exclude None values from the token jwt_claims = self.claims.asdict() jwt_claims.update( { "sub": self.identity, "iss": self.api_key, "nbf": calendar.timegm( datetime.datetime.now(datetime.timezone.utc).utctimetuple() ), "exp": calendar.timegm( ( datetime.datetime.now(datetime.timezone.utc) + self.ttl ).utctimetuple() ), } ) return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")
def with_attributes(self, attributes: dict[str, str]) ‑> AccessToken
-
Expand source code
def with_attributes(self, attributes: dict[str, str]) -> "AccessToken": self.claims.attributes = attributes return self
def with_grants(self,
grants: VideoGrants) ‑> AccessToken-
Expand source code
def with_grants(self, grants: VideoGrants) -> "AccessToken": self.claims.video = grants return self
def with_identity(self, identity: str) ‑> AccessToken
-
Expand source code
def with_identity(self, identity: str) -> "AccessToken": self.identity = identity return self
def with_kind(self, kind: Literal['standard', 'egress', 'ingress', 'sip', 'agent']) ‑> AccessToken
-
Expand source code
def with_kind(self, kind: ParticipantKind) -> "AccessToken": self.claims.kind = kind return self
def with_metadata(self, metadata: str) ‑> AccessToken
-
Expand source code
def with_metadata(self, metadata: str) -> "AccessToken": self.claims.metadata = metadata return self
def with_name(self, name: str) ‑> AccessToken
-
Expand source code
def with_name(self, name: str) -> "AccessToken": self.claims.name = name return self
def with_room_config(self, config: room.RoomConfiguration) ‑> AccessToken
-
Expand source code
def with_room_config(self, config: RoomConfiguration) -> "AccessToken": self.claims.room_config = config return self
def with_room_preset(self, preset: str) ‑> AccessToken
-
Expand source code
def with_room_preset(self, preset: str) -> "AccessToken": self.claims.room_preset = preset return self
def with_sha256(self, sha256: str) ‑> AccessToken
-
Expand source code
def with_sha256(self, sha256: str) -> "AccessToken": self.claims.sha256 = sha256 return self
def with_sip_grants(self,
grants: SIPGrants) ‑> AccessToken-
Expand source code
def with_sip_grants(self, grants: SIPGrants) -> "AccessToken": self.claims.sip = grants return self
def with_ttl(self, ttl: datetime.timedelta) ‑> AccessToken
-
Expand source code
def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken": self.ttl = ttl return self
class LiveKitAPI (url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
*,
timeout: aiohttp.client.ClientTimeout = ClientTimeout(total=60, connect=None, sock_read=None, sock_connect=None, ceil_threshold=5))-
Expand source code
class LiveKitAPI: """LiveKit Server API Client This class is the main entrypoint, which exposes all services. Usage: ```python from livekit import api lkapi = api.LiveKitAPI() rooms = await lkapi.room.list_rooms(api.proto_room.ListRoomsRequest(names=['test-room'])) ``` """ def __init__( self, url: Optional[str] = None, api_key: Optional[str] = None, api_secret: Optional[str] = None, *, timeout: aiohttp.ClientTimeout = aiohttp.ClientTimeout(total=60), # 60 seconds ): """Create a new LiveKitAPI instance. Args: url: LiveKit server URL (read from `LIVEKIT_URL` environment variable if not provided) api_key: API key (read from `LIVEKIT_API_KEY` environment variable if not provided) api_secret: API secret (read from `LIVEKIT_API_SECRET` environment variable if not provided) timeout: Request timeout (default: 60 seconds) """ url = url or os.getenv("LIVEKIT_URL") api_key = api_key or os.getenv("LIVEKIT_API_KEY") api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET") if not url: raise ValueError("url must be set") if not api_key or not api_secret: raise ValueError("api_key and api_secret must be set") self._session = aiohttp.ClientSession(timeout=timeout) self._room = RoomService(self._session, url, api_key, api_secret) self._ingress = IngressService(self._session, url, api_key, api_secret) self._egress = EgressService(self._session, url, api_key, api_secret) self._sip = SipService(self._session, url, api_key, api_secret) self._agent_dispatch = AgentDispatchService( self._session, url, api_key, api_secret ) @property def agent_dispatch(self) -> AgentDispatchService: """Instance of the AgentDispatchService""" return self._agent_dispatch @property def room(self) -> RoomService: """Instance of the RoomService""" return self._room @property def ingress(self) -> IngressService: """Instance of the IngressService""" return self._ingress @property def egress(self) -> EgressService: """Instance of the EgressService""" return self._egress @property def sip(self) -> SipService: """Instance of the SipService""" return self._sip async def aclose(self): """Close the API client Call this before your application exits or when the API client is no longer needed.""" await self._session.close() async def __aenter__(self): """@private Support for `async with`""" return self async def __aexit__(self, exc_type, exc_val, exc_tb): """@private Support for `async with`""" await self.aclose()
LiveKit Server API Client
This class is the main entrypoint, which exposes all services.
Usage:
from livekit import api lkapi = api.LiveKitAPI() rooms = await lkapi.room.list_rooms(api.proto_room.ListRoomsRequest(names=['test-room']))
Create a new LiveKitAPI instance.
Args
url
- LiveKit server URL (read from
LIVEKIT_URL
environment variable if not provided) api_key
- API key (read from
LIVEKIT_API_KEY
environment variable if not provided) api_secret
- API secret (read from
LIVEKIT_API_SECRET
environment variable if not provided) timeout
- Request timeout (default: 60 seconds)
Instance variables
prop agent_dispatch : AgentDispatchService
-
Expand source code
@property def agent_dispatch(self) -> AgentDispatchService: """Instance of the AgentDispatchService""" return self._agent_dispatch
Instance of the AgentDispatchService
prop egress : EgressService
-
Expand source code
@property def egress(self) -> EgressService: """Instance of the EgressService""" return self._egress
Instance of the EgressService
prop ingress : IngressService
-
Expand source code
@property def ingress(self) -> IngressService: """Instance of the IngressService""" return self._ingress
Instance of the IngressService
prop room : RoomService
-
Expand source code
@property def room(self) -> RoomService: """Instance of the RoomService""" return self._room
Instance of the RoomService
prop sip : SipService
-
Expand source code
@property def sip(self) -> SipService: """Instance of the SipService""" return self._sip
Instance of the SipService
Methods
async def aclose(self)
-
Expand source code
async def aclose(self): """Close the API client Call this before your application exits or when the API client is no longer needed.""" await self._session.close()
Close the API client
Call this before your application exits or when the API client is no longer needed.
class SIPGrants (admin: bool = False, call: bool = False)
-
Expand source code
@dataclasses.dataclass class SIPGrants: # manage sip resources admin: bool = False # make outbound calls call: bool = False
SIPGrants(admin: bool = False, call: bool = False)
Class variables
var admin : bool
var call : bool
class TokenVerifier (api_key: str | None = None,
api_secret: str | None = None,
*,
leeway: datetime.timedelta = datetime.timedelta(seconds=60))-
Expand source code
class TokenVerifier: def __init__( self, api_key: Optional[str] = None, api_secret: Optional[str] = None, *, leeway: datetime.timedelta = DEFAULT_LEEWAY, ) -> None: api_key = api_key or os.getenv("LIVEKIT_API_KEY") api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET") if not api_key or not api_secret: raise ValueError("api_key and api_secret must be set") self.api_key = api_key self.api_secret = api_secret self._leeway = leeway def verify(self, token: str) -> Claims: claims = jwt.decode( token, self.api_secret, issuer=self.api_key, algorithms=["HS256"], leeway=self._leeway.total_seconds(), ) video_dict = claims.get("video", dict()) video_dict = {camel_to_snake(k): v for k, v in video_dict.items()} video_dict = { k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__ } video = VideoGrants(**video_dict) sip_dict = claims.get("sip", dict()) sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()} sip_dict = { k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__ } sip = SIPGrants(**sip_dict) grant_claims = Claims( identity=claims.get("sub", ""), name=claims.get("name", ""), video=video, sip=sip, attributes=claims.get("attributes", {}), metadata=claims.get("metadata", ""), sha256=claims.get("sha256", ""), ) if claims.get("roomPreset"): grant_claims.room_preset = claims.get("roomPreset") if claims.get("roomConfig"): grant_claims.room_config = ParseDict( claims.get("roomConfig"), RoomConfiguration(), ignore_unknown_fields=True, ) return grant_claims
Methods
def verify(self, token: str) ‑> Claims
-
Expand source code
def verify(self, token: str) -> Claims: claims = jwt.decode( token, self.api_secret, issuer=self.api_key, algorithms=["HS256"], leeway=self._leeway.total_seconds(), ) video_dict = claims.get("video", dict()) video_dict = {camel_to_snake(k): v for k, v in video_dict.items()} video_dict = { k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__ } video = VideoGrants(**video_dict) sip_dict = claims.get("sip", dict()) sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()} sip_dict = { k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__ } sip = SIPGrants(**sip_dict) grant_claims = Claims( identity=claims.get("sub", ""), name=claims.get("name", ""), video=video, sip=sip, attributes=claims.get("attributes", {}), metadata=claims.get("metadata", ""), sha256=claims.get("sha256", ""), ) if claims.get("roomPreset"): grant_claims.room_preset = claims.get("roomPreset") if claims.get("roomConfig"): grant_claims.room_config = ParseDict( claims.get("roomConfig"), RoomConfiguration(), ignore_unknown_fields=True, ) return grant_claims
class VideoGrants (room_create: bool | None = None,
room_list: bool | None = None,
room_record: bool | None = None,
room_admin: bool | None = None,
room_join: bool | None = None,
room: str = '',
can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_publish_sources: List[str] | None = None,
can_update_own_metadata: bool | None = None,
ingress_admin: bool | None = None,
hidden: bool | None = None,
recorder: bool | None = None,
agent: bool | None = None)-
Expand source code
@dataclasses.dataclass class VideoGrants: # actions on rooms room_create: Optional[bool] = None room_list: Optional[bool] = None room_record: Optional[bool] = None # actions on a particular room room_admin: Optional[bool] = None room_join: Optional[bool] = None room: str = "" # permissions within a room can_publish: bool = True can_subscribe: bool = True can_publish_data: bool = True # TrackSource types that a participant may publish. # When set, it supersedes CanPublish. Only sources explicitly set here can be # published can_publish_sources: Optional[List[str]] = None # by default, a participant is not allowed to update its own metadata can_update_own_metadata: Optional[bool] = None # actions on ingresses ingress_admin: Optional[bool] = None # applies to all ingress # participant is not visible to other participants (useful when making bots) hidden: Optional[bool] = None # [deprecated] indicates to the room that current participant is a recorder recorder: Optional[bool] = None # indicates that the holder can register as an Agent framework worker agent: Optional[bool] = None
VideoGrants(room_create: Optional[bool] = None, room_list: Optional[bool] = None, room_record: Optional[bool] = None, room_admin: Optional[bool] = None, room_join: Optional[bool] = None, room: str = '', can_publish: bool = True, can_subscribe: bool = True, can_publish_data: bool = True, can_publish_sources: Optional[List[str]] = None, can_update_own_metadata: Optional[bool] = None, ingress_admin: Optional[bool] = None, hidden: Optional[bool] = None, recorder: Optional[bool] = None, agent: Optional[bool] = None)
Class variables
var agent : bool | None
var can_publish : bool
var can_publish_data : bool
var can_publish_sources : List[str] | None
var can_subscribe : bool
var can_update_own_metadata : bool | None
var ingress_admin : bool | None
var recorder : bool | None
var room : str
var room_admin : bool | None
var room_create : bool | None
var room_join : bool | None
var room_list : bool | None
var room_record : bool | None
class WebhookReceiver (token_verifier: TokenVerifier)
-
Expand source code
class WebhookReceiver: def __init__(self, token_verifier: TokenVerifier): self._verifier = token_verifier def receive(self, body: str, auth_token: str) -> WebhookEvent: claims = self._verifier.verify(auth_token) if claims.sha256 is None: raise Exception("sha256 was not found in the token") body_hash = hashlib.sha256(body.encode()).digest() claims_hash = base64.b64decode(claims.sha256) if body_hash != claims_hash: raise Exception("hash mismatch") return Parse(body, WebhookEvent(), ignore_unknown_fields=True)
Methods
def receive(self, body: str, auth_token: str) ‑> WebhookEvent
-
Expand source code
def receive(self, body: str, auth_token: str) -> WebhookEvent: claims = self._verifier.verify(auth_token) if claims.sha256 is None: raise Exception("sha256 was not found in the token") body_hash = hashlib.sha256(body.encode()).digest() claims_hash = base64.b64decode(claims.sha256) if body_hash != claims_hash: raise Exception("hash mismatch") return Parse(body, WebhookEvent(), ignore_unknown_fields=True)