Module livekit.api.ingress_service

Global variables

var SVC

@private

Classes

class CreateIngressRequest (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class DeleteIngressRequest (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class IngressInfo (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class IngressService (session: aiohttp.client.ClientSession, url: str, api_key: str, api_secret: str)
Expand source code
class IngressService(Service):
    """Client for LiveKit Ingress Service API

    Recommended way to use this service is via `livekit.api.LiveKitAPI`:

    ```python
    from livekit import api
    lkapi = api.LiveKitAPI()
    ingress = lkapi.ingress
    ```

    Also see https://docs.livekit.io/home/ingress/overview/
    """

    def __init__(
        self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str
    ):
        super().__init__(session, url, api_key, api_secret)

    async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo:
        return await self._client.request(
            SVC,
            "CreateIngress",
            create,
            self._auth_header(VideoGrants(ingress_admin=True)),
            IngressInfo,
        )

    async def update_ingress(self, update: UpdateIngressRequest) -> IngressInfo:
        return await self._client.request(
            SVC,
            "UpdateIngress",
            update,
            self._auth_header(VideoGrants(ingress_admin=True)),
            IngressInfo,
        )

    async def list_ingress(self, list: ListIngressRequest) -> ListIngressResponse:
        return await self._client.request(
            SVC,
            "ListIngress",
            list,
            self._auth_header(VideoGrants(ingress_admin=True)),
            ListIngressResponse,
        )

    async def delete_ingress(self, delete: DeleteIngressRequest) -> IngressInfo:
        return await self._client.request(
            SVC,
            "DeleteIngress",
            delete,
            self._auth_header(VideoGrants(ingress_admin=True)),
            IngressInfo,
        )

Client for LiveKit Ingress Service API

Recommended way to use this service is via LiveKitAPI:

from livekit import api
lkapi = api.LiveKitAPI()
ingress = lkapi.ingress

Also see https://docs.livekit.io/home/ingress/overview/

Ancestors

  • livekit.api._service.Service
  • abc.ABC

Methods

async def create_ingress(self, create: ingress.CreateIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo:
    return await self._client.request(
        SVC,
        "CreateIngress",
        create,
        self._auth_header(VideoGrants(ingress_admin=True)),
        IngressInfo,
    )
async def delete_ingress(self, delete: ingress.DeleteIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def delete_ingress(self, delete: DeleteIngressRequest) -> IngressInfo:
    return await self._client.request(
        SVC,
        "DeleteIngress",
        delete,
        self._auth_header(VideoGrants(ingress_admin=True)),
        IngressInfo,
    )
async def list_ingress(self, list: ingress.ListIngressRequest) ‑> ingress.ListIngressResponse
Expand source code
async def list_ingress(self, list: ListIngressRequest) -> ListIngressResponse:
    return await self._client.request(
        SVC,
        "ListIngress",
        list,
        self._auth_header(VideoGrants(ingress_admin=True)),
        ListIngressResponse,
    )
async def update_ingress(self, update: ingress.UpdateIngressRequest) ‑> ingress.IngressInfo
Expand source code
async def update_ingress(self, update: UpdateIngressRequest) -> IngressInfo:
    return await self._client.request(
        SVC,
        "UpdateIngress",
        update,
        self._auth_header(VideoGrants(ingress_admin=True)),
        IngressInfo,
    )
class ListIngressRequest (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class ListIngressResponse (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR
class UpdateIngressRequest (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var DESCRIPTOR