Module livekit.api.ingress_service

Classes

class IngressService (session: aiohttp.client.ClientSession, url: str, api_key: str, api_secret: str)
Expand source code
class IngressService(Service):
    """Client exposing the ingress RPCs: create, update, list and delete.

    Every method forwards its request message to ``self._client.request``
    under the ``SVC`` service name, together with an auth header built from
    ``VideoGrants(ingress_admin=True)``.
    """

    def __init__(
        self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
    ):
        """Hand the session, URL and API credentials to the ``Service`` base."""
        super().__init__(session, url, api_key, api_secret)

    async def create_ingress(
        self, create: proto_ingress.CreateIngressRequest
    ) -> proto_ingress.IngressInfo:
        """Issue the ``CreateIngress`` call with ``create`` and return its result."""
        grants = VideoGrants(ingress_admin=True)
        headers = self._auth_header(grants)
        response = await self._client.request(
            SVC, "CreateIngress", create, headers, proto_ingress.IngressInfo
        )
        return response

    async def update_ingress(
        self, update: proto_ingress.UpdateIngressRequest
    ) -> proto_ingress.IngressInfo:
        """Issue the ``UpdateIngress`` call with ``update`` and return its result."""
        grants = VideoGrants(ingress_admin=True)
        headers = self._auth_header(grants)
        response = await self._client.request(
            SVC, "UpdateIngress", update, headers, proto_ingress.IngressInfo
        )
        return response

    async def list_ingress(
        self, list: proto_ingress.ListIngressRequest
    ) -> proto_ingress.ListIngressResponse:
        """Issue the ``ListIngress`` call with ``list`` and return its result."""
        # NOTE: the parameter name shadows the builtin ``list`` inside this
        # method; it is kept as-is so keyword callers keep working.
        grants = VideoGrants(ingress_admin=True)
        headers = self._auth_header(grants)
        response = await self._client.request(
            SVC, "ListIngress", list, headers, proto_ingress.ListIngressResponse
        )
        return response

    async def delete_ingress(
        self, delete: proto_ingress.DeleteIngressRequest
    ) -> proto_ingress.IngressInfo:
        """Issue the ``DeleteIngress`` call with ``delete`` and return its result."""
        grants = VideoGrants(ingress_admin=True)
        headers = self._auth_header(grants)
        response = await self._client.request(
            SVC, "DeleteIngress", delete, headers, proto_ingress.IngressInfo
        )
        return response

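Client for the ingress service, exposing create_ingress, update_ingress, list_ingress and delete_ingress; each call is authorized with an ingress_admin video grant. (Description inherited from abc.ABC: "Helper class that provides a standard way to create an ABC using inheritance.")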

Ancestors

  • livekit.api._service.Service
  • abc.ABC

Methods

async def create_ingress(self, create: ingress.CreateIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def create_ingress(
    self, create: proto_ingress.CreateIngressRequest
) -> proto_ingress.IngressInfo:
    """Issue the ``CreateIngress`` call with ``create`` and return its result.

    The request carries an auth header built from
    ``VideoGrants(ingress_admin=True)``.
    """
    grants = VideoGrants(ingress_admin=True)
    headers = self._auth_header(grants)
    response = await self._client.request(
        SVC, "CreateIngress", create, headers, proto_ingress.IngressInfo
    )
    return response
async def delete_ingress(self, delete: ingress.DeleteIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def delete_ingress(
    self, delete: proto_ingress.DeleteIngressRequest
) -> proto_ingress.IngressInfo:
    """Issue the ``DeleteIngress`` call with ``delete`` and return its result.

    The request carries an auth header built from
    ``VideoGrants(ingress_admin=True)``.
    """
    grants = VideoGrants(ingress_admin=True)
    headers = self._auth_header(grants)
    response = await self._client.request(
        SVC, "DeleteIngress", delete, headers, proto_ingress.IngressInfo
    )
    return response
async def list_ingress(self, list: ingress.ListIngressRequest) ‑> ingress.ListIngressResponse
Expand source code
async def list_ingress(
    self, list: proto_ingress.ListIngressRequest
) -> proto_ingress.ListIngressResponse:
    """Issue the ``ListIngress`` call with ``list`` and return its result.

    The request carries an auth header built from
    ``VideoGrants(ingress_admin=True)``.
    """
    # NOTE: the parameter name shadows the builtin ``list`` inside this
    # function; it is kept as-is so keyword callers keep working.
    grants = VideoGrants(ingress_admin=True)
    headers = self._auth_header(grants)
    response = await self._client.request(
        SVC, "ListIngress", list, headers, proto_ingress.ListIngressResponse
    )
    return response
async def update_ingress(self, update: ingress.UpdateIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def update_ingress(
    self, update: proto_ingress.UpdateIngressRequest
) -> proto_ingress.IngressInfo:
    """Issue the ``UpdateIngress`` call with ``update`` and return its result.

    The request carries an auth header built from
    ``VideoGrants(ingress_admin=True)``.
    """
    grants = VideoGrants(ingress_admin=True)
    headers = self._auth_header(grants)
    response = await self._client.request(
        SVC, "UpdateIngress", update, headers, proto_ingress.IngressInfo
    )
    return response