Module livekit.api.twirp_client

Classes

class TwirpClient (session: aiohttp.client.ClientSession, host: str, pkg: str, prefix: str = 'twirp')
Expand source code
class TwirpClient:
    def __init__(
        self,
        session: aiohttp.ClientSession,
        host: str,
        pkg: str,
        prefix: str = DEFAULT_PREFIX,
    ) -> None:
        parse_res = urlparse(host)
        scheme = parse_res.scheme
        if scheme.startswith("ws"):
            scheme = scheme.replace("ws", "http")

        host = f"{scheme}://{parse_res.netloc}/{parse_res.path}"
        self.host = host.rstrip("/")
        self.pkg = pkg
        self.prefix = prefix
        self._session = session

    async def request(
        self,
        service: str,
        method: str,
        data: Message,
        headers: Dict[str, str],
        response_class: Type[T],
    ) -> T:
        url = f"{self.host}/{self.prefix}/{self.pkg}.{service}/{method}"
        headers["Content-Type"] = "application/protobuf"

        serialized_data = data.SerializeToString()
        async with self._session.post(
            url, headers=headers, data=serialized_data
        ) as resp:
            if resp.status == 200:
                return response_class.FromString(await resp.read())
            else:
                # when we have an error, Twirp always encode it in json
                error_data = await resp.json()
                raise TwirpError(error_data["code"], error_data["msg"])

Methods

async def request(self,
service: str,
method: str,
data: google.protobuf.message.Message,
headers: Dict[str, str],
response_class: Type[~T]) ‑> ~T
Expand source code
async def request(
    self,
    service: str,
    method: str,
    data: Message,
    headers: Dict[str, str],
    response_class: Type[T],
) -> T:
    url = f"{self.host}/{self.prefix}/{self.pkg}.{service}/{method}"
    headers["Content-Type"] = "application/protobuf"

    serialized_data = data.SerializeToString()
    async with self._session.post(
        url, headers=headers, data=serialized_data
    ) as resp:
        if resp.status == 200:
            return response_class.FromString(await resp.read())
        else:
            # when we have an error, Twirp always encode it in json
            error_data = await resp.json()
            raise TwirpError(error_data["code"], error_data["msg"])
class TwirpError (code: str, msg: str)
Expand source code
class TwirpError(Exception):
    def __init__(self, code: str, msg: str) -> None:
        self._code = code
        self._msg = msg

    @property
    def code(self) -> str:
        return self._code

    @property
    def message(self) -> str:
        return self._msg

Common base class for all non-exit exceptions.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Instance variables

prop code : str
Expand source code
@property
def code(self) -> str:
    return self._code
prop message : str
Expand source code
@property
def message(self) -> str:
    return self._msg
class TwirpErrorCode
Expand source code
class TwirpErrorCode:
    CANCELED = "canceled"
    UNKNOWN = "unknown"
    INVALID_ARGUMENT = "invalid_argument"
    MALFORMED = "malformed"
    DEADLINE_EXCEEDED = "deadline_exceeded"
    NOT_FOUND = "not_found"
    BAD_ROUTE = "bad_route"
    ALREADY_EXISTS = "already_exists"
    PERMISSION_DENIED = "permission_denied"
    UNAUTHENTICATED = "unauthenticated"
    RESOURCE_EXHAUSTED = "resource_exhausted"
    FAILED_PRECONDITION = "failed_precondition"
    ABORTED = "aborted"
    OUT_OF_RANGE = "out_of_range"
    UNIMPLEMENTED = "unimplemented"
    INTERNAL = "internal"
    UNAVAILABLE = "unavailable"
    DATA_LOSS = "dataloss"

Class variables

var ABORTED
var ALREADY_EXISTS
var BAD_ROUTE
var CANCELED
var DATA_LOSS
var DEADLINE_EXCEEDED
var FAILED_PRECONDITION
var INTERNAL
var INVALID_ARGUMENT
var MALFORMED
var NOT_FOUND
var OUT_OF_RANGE
var PERMISSION_DENIED
var RESOURCE_EXHAUSTED
var UNAUTHENTICATED
var UNAVAILABLE
var UNIMPLEMENTED
var UNKNOWN