Module livekit.api.access_token

Functions

def camel_to_snake(t: str)
Expand source code
def camel_to_snake(t: str):
    return re.sub(r"(?<!^)(?=[A-Z])", "_", t).lower()
def snake_to_lower_camel(t: str)
Expand source code
def snake_to_lower_camel(t: str):
    return "".join(
        word.capitalize() if i else word for i, word in enumerate(t.split("_"))
    )

Classes

class AccessToken (api_key: str | None = None, api_secret: str | None = None)
Expand source code
class AccessToken:
    ParticipantKind = Literal["standard", "egress", "ingress", "sip", "agent"]

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
    ) -> None:
        api_key = api_key or os.getenv("LIVEKIT_API_KEY")
        api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET")

        if not api_key or not api_secret:
            raise ValueError("api_key and api_secret must be set")

        self.api_key = api_key  # iss
        self.api_secret = api_secret
        self.claims = Claims()

        # default jwt claims
        self.identity = ""  # sub
        self.ttl = DEFAULT_TTL  # exp

    def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken":
        self.ttl = ttl
        return self

    def with_grants(self, grants: VideoGrants) -> "AccessToken":
        self.claims.video = grants
        return self

    def with_sip_grants(self, grants: SIPGrants) -> "AccessToken":
        self.claims.sip = grants
        return self

    def with_identity(self, identity: str) -> "AccessToken":
        self.identity = identity
        return self

    def with_kind(self, kind: ParticipantKind) -> "AccessToken":
        self.claims.kind = kind
        return self

    def with_name(self, name: str) -> "AccessToken":
        self.claims.name = name
        return self

    def with_metadata(self, metadata: str) -> "AccessToken":
        self.claims.metadata = metadata
        return self

    def with_attributes(self, attributes: dict[str, str]) -> "AccessToken":
        self.claims.attributes = attributes
        return self

    def with_sha256(self, sha256: str) -> "AccessToken":
        self.claims.sha256 = sha256
        return self

    def with_room_preset(self, preset: str) -> "AccessToken":
        self.claims.room_preset = preset
        return self

    def with_room_config(self, config: RoomConfiguration) -> "AccessToken":
        self.claims.room_config = config
        return self

    def to_jwt(self) -> str:
        video = self.claims.video
        if video and video.room_join and (not self.identity or not video.room):
            raise ValueError("identity and room must be set when joining a room")

        # we want to exclude None values from the token
        jwt_claims = self.claims.asdict()
        jwt_claims.update(
            {
                "sub": self.identity,
                "iss": self.api_key,
                "nbf": calendar.timegm(
                    datetime.datetime.now(datetime.timezone.utc).utctimetuple()
                ),
                "exp": calendar.timegm(
                    (
                        datetime.datetime.now(datetime.timezone.utc) + self.ttl
                    ).utctimetuple()
                ),
            }
        )
        return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")

Class variables

var ParticipantKind

Methods

def to_jwt(self) ‑> str
Expand source code
def to_jwt(self) -> str:
    video = self.claims.video
    if video and video.room_join and (not self.identity or not video.room):
        raise ValueError("identity and room must be set when joining a room")

    # we want to exclude None values from the token
    jwt_claims = self.claims.asdict()
    jwt_claims.update(
        {
            "sub": self.identity,
            "iss": self.api_key,
            "nbf": calendar.timegm(
                datetime.datetime.now(datetime.timezone.utc).utctimetuple()
            ),
            "exp": calendar.timegm(
                (
                    datetime.datetime.now(datetime.timezone.utc) + self.ttl
                ).utctimetuple()
            ),
        }
    )
    return jwt.encode(jwt_claims, self.api_secret, algorithm="HS256")
def with_attributes(self, attributes: dict[str, str]) ‑> AccessToken
Expand source code
def with_attributes(self, attributes: dict[str, str]) -> "AccessToken":
    self.claims.attributes = attributes
    return self
def with_grants(self,
grants: VideoGrants) ‑> AccessToken
Expand source code
def with_grants(self, grants: VideoGrants) -> "AccessToken":
    self.claims.video = grants
    return self
def with_identity(self, identity: str) ‑> AccessToken
Expand source code
def with_identity(self, identity: str) -> "AccessToken":
    self.identity = identity
    return self
def with_kind(self, kind: Literal['standard', 'egress', 'ingress', 'sip', 'agent']) ‑> AccessToken
Expand source code
def with_kind(self, kind: ParticipantKind) -> "AccessToken":
    self.claims.kind = kind
    return self
def with_metadata(self, metadata: str) ‑> AccessToken
Expand source code
def with_metadata(self, metadata: str) -> "AccessToken":
    self.claims.metadata = metadata
    return self
def with_name(self, name: str) ‑> AccessToken
Expand source code
def with_name(self, name: str) -> "AccessToken":
    self.claims.name = name
    return self
def with_room_config(self, config: room.RoomConfiguration) ‑> AccessToken
Expand source code
def with_room_config(self, config: RoomConfiguration) -> "AccessToken":
    self.claims.room_config = config
    return self
def with_room_preset(self, preset: str) ‑> AccessToken
Expand source code
def with_room_preset(self, preset: str) -> "AccessToken":
    self.claims.room_preset = preset
    return self
def with_sha256(self, sha256: str) ‑> AccessToken
Expand source code
def with_sha256(self, sha256: str) -> "AccessToken":
    self.claims.sha256 = sha256
    return self
def with_sip_grants(self,
grants: SIPGrants) ‑> AccessToken
Expand source code
def with_sip_grants(self, grants: SIPGrants) -> "AccessToken":
    self.claims.sip = grants
    return self
def with_ttl(self, ttl: datetime.timedelta) ‑> AccessToken
Expand source code
def with_ttl(self, ttl: datetime.timedelta) -> "AccessToken":
    self.ttl = ttl
    return self
class Claims (identity: str = '',
name: str = '',
kind: str = '',
metadata: str = '',
video: VideoGrants | None = None,
sip: SIPGrants | None = None,
attributes: dict[str, str] | None = None,
sha256: str | None = None,
room_preset: str | None = None,
room_config: room.RoomConfiguration | None = None)
Expand source code
@dataclasses.dataclass
class Claims:
    identity: str = ""
    name: str = ""
    kind: str = ""
    metadata: str = ""
    video: Optional[VideoGrants] = None
    sip: Optional[SIPGrants] = None
    attributes: Optional[dict[str, str]] = None
    sha256: Optional[str] = None
    room_preset: Optional[str] = None
    room_config: Optional[RoomConfiguration] = None

    def asdict(self) -> dict:
        # in order to produce minimal JWT size, exclude None or empty values
        claims = dataclasses.asdict(
            self,
            dict_factory=lambda items: {
                snake_to_lower_camel(k): v
                for k, v in items
                if v is not None and v != ""
            },
        )
        if self.room_config:
            claims["roomConfig"] = MessageToDict(self.room_config)
        return claims

Claims(identity: str = '', name: str = '', kind: str = '', metadata: str = '', video: Optional[livekit.api.access_token.VideoGrants] = None, sip: Optional[livekit.api.access_token.SIPGrants] = None, attributes: Optional[dict[str, str]] = None, sha256: Optional[str] = None, room_preset: Optional[str] = None, room_config: Optional[room.RoomConfiguration] = None)

Class variables

var attributes : dict[str, str] | None
var identity : str
var kind : str
var metadata : str
var name : str
var room_config : room.RoomConfiguration | None
var room_preset : str | None
var sha256 : str | None
var sipSIPGrants | None
var videoVideoGrants | None

Methods

def asdict(self) ‑> dict
Expand source code
def asdict(self) -> dict:
    # in order to produce minimal JWT size, exclude None or empty values
    claims = dataclasses.asdict(
        self,
        dict_factory=lambda items: {
            snake_to_lower_camel(k): v
            for k, v in items
            if v is not None and v != ""
        },
    )
    if self.room_config:
        claims["roomConfig"] = MessageToDict(self.room_config)
    return claims
class RoomConfiguration (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class SIPGrants (admin: bool = False, call: bool = False)
Expand source code
@dataclasses.dataclass
class SIPGrants:
    # manage sip resources
    admin: bool = False
    # make outbound calls
    call: bool = False

SIPGrants(admin: bool = False, call: bool = False)

Class variables

var admin : bool
var call : bool
class TokenVerifier (api_key: str | None = None,
api_secret: str | None = None,
*,
leeway: datetime.timedelta = datetime.timedelta(seconds=60))
Expand source code
class TokenVerifier:
    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        *,
        leeway: datetime.timedelta = DEFAULT_LEEWAY,
    ) -> None:
        api_key = api_key or os.getenv("LIVEKIT_API_KEY")
        api_secret = api_secret or os.getenv("LIVEKIT_API_SECRET")

        if not api_key or not api_secret:
            raise ValueError("api_key and api_secret must be set")

        self.api_key = api_key
        self.api_secret = api_secret
        self._leeway = leeway

    def verify(self, token: str) -> Claims:
        claims = jwt.decode(
            token,
            self.api_secret,
            issuer=self.api_key,
            algorithms=["HS256"],
            leeway=self._leeway.total_seconds(),
        )

        video_dict = claims.get("video", dict())
        video_dict = {camel_to_snake(k): v for k, v in video_dict.items()}
        video_dict = {
            k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__
        }
        video = VideoGrants(**video_dict)

        sip_dict = claims.get("sip", dict())
        sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()}
        sip_dict = {
            k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__
        }
        sip = SIPGrants(**sip_dict)

        grant_claims = Claims(
            identity=claims.get("sub", ""),
            name=claims.get("name", ""),
            video=video,
            sip=sip,
            attributes=claims.get("attributes", {}),
            metadata=claims.get("metadata", ""),
            sha256=claims.get("sha256", ""),
        )

        if claims.get("roomPreset"):
            grant_claims.room_preset = claims.get("roomPreset")
        if claims.get("roomConfig"):
            grant_claims.room_config = ParseDict(
                claims.get("roomConfig"),
                RoomConfiguration(),
                ignore_unknown_fields=True,
            )

        return grant_claims

Methods

def verify(self, token: str) ‑> Claims
Expand source code
def verify(self, token: str) -> Claims:
    claims = jwt.decode(
        token,
        self.api_secret,
        issuer=self.api_key,
        algorithms=["HS256"],
        leeway=self._leeway.total_seconds(),
    )

    video_dict = claims.get("video", dict())
    video_dict = {camel_to_snake(k): v for k, v in video_dict.items()}
    video_dict = {
        k: v for k, v in video_dict.items() if k in VideoGrants.__dataclass_fields__
    }
    video = VideoGrants(**video_dict)

    sip_dict = claims.get("sip", dict())
    sip_dict = {camel_to_snake(k): v for k, v in sip_dict.items()}
    sip_dict = {
        k: v for k, v in sip_dict.items() if k in SIPGrants.__dataclass_fields__
    }
    sip = SIPGrants(**sip_dict)

    grant_claims = Claims(
        identity=claims.get("sub", ""),
        name=claims.get("name", ""),
        video=video,
        sip=sip,
        attributes=claims.get("attributes", {}),
        metadata=claims.get("metadata", ""),
        sha256=claims.get("sha256", ""),
    )

    if claims.get("roomPreset"):
        grant_claims.room_preset = claims.get("roomPreset")
    if claims.get("roomConfig"):
        grant_claims.room_config = ParseDict(
            claims.get("roomConfig"),
            RoomConfiguration(),
            ignore_unknown_fields=True,
        )

    return grant_claims
class VideoGrants (room_create: bool | None = None,
room_list: bool | None = None,
room_record: bool | None = None,
room_admin: bool | None = None,
room_join: bool | None = None,
room: str = '',
can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_publish_sources: List[str] | None = None,
can_update_own_metadata: bool | None = None,
ingress_admin: bool | None = None,
hidden: bool | None = None,
recorder: bool | None = None,
agent: bool | None = None)
Expand source code
@dataclasses.dataclass
class VideoGrants:
    # actions on rooms
    room_create: Optional[bool] = None
    room_list: Optional[bool] = None
    room_record: Optional[bool] = None

    # actions on a particular room
    room_admin: Optional[bool] = None
    room_join: Optional[bool] = None
    room: str = ""

    # permissions within a room
    can_publish: bool = True
    can_subscribe: bool = True
    can_publish_data: bool = True

    # TrackSource types that a participant may publish.
    # When set, it supersedes CanPublish. Only sources explicitly set here can be
    # published
    can_publish_sources: Optional[List[str]] = None

    # by default, a participant is not allowed to update its own metadata
    can_update_own_metadata: Optional[bool] = None

    # actions on ingresses
    ingress_admin: Optional[bool] = None  # applies to all ingress

    # participant is not visible to other participants (useful when making bots)
    hidden: Optional[bool] = None

    # [deprecated] indicates to the room that current participant is a recorder
    recorder: Optional[bool] = None

    # indicates that the holder can register as an Agent framework worker
    agent: Optional[bool] = None

VideoGrants(room_create: Optional[bool] = None, room_list: Optional[bool] = None, room_record: Optional[bool] = None, room_admin: Optional[bool] = None, room_join: Optional[bool] = None, room: str = '', can_publish: bool = True, can_subscribe: bool = True, can_publish_data: bool = True, can_publish_sources: Optional[List[str]] = None, can_update_own_metadata: Optional[bool] = None, ingress_admin: Optional[bool] = None, hidden: Optional[bool] = None, recorder: Optional[bool] = None, agent: Optional[bool] = None)

Class variables

var agent : bool | None
var can_publish : bool
var can_publish_data : bool
var can_publish_sources : List[str] | None
var can_subscribe : bool
var can_update_own_metadata : bool | None
var hidden : bool | None
var ingress_admin : bool | None
var recorder : bool | None
var room : str
var room_admin : bool | None
var room_create : bool | None
var room_join : bool | None
var room_list : bool | None
var room_record : bool | None