Module livekit.api.ingress_service
Global variables
var SVC
-
@private
Classes
class CreateIngressRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class DeleteIngressRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class IngressInfo (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class IngressService (session: aiohttp.client.ClientSession, url: str, api_key: str, api_secret: str)
-
Expand source code
class IngressService(Service): """Client for LiveKit Ingress Service API Recommended way to use this service is via `livekit.api.LiveKitAPI`: ```python from livekit import api lkapi = api.LiveKitAPI() ingress = lkapi.ingress ``` Also see https://docs.livekit.io/home/ingress/overview/ """ def __init__( self, session: aiohttp.ClientSession, url: str, api_key: str, api_secret: str ): super().__init__(session, url, api_key, api_secret) async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo: return await self._client.request( SVC, "CreateIngress", create, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, ) async def update_ingress(self, update: UpdateIngressRequest) -> IngressInfo: return await self._client.request( SVC, "UpdateIngress", update, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, ) async def list_ingress(self, list: ListIngressRequest) -> ListIngressResponse: return await self._client.request( SVC, "ListIngress", list, self._auth_header(VideoGrants(ingress_admin=True)), ListIngressResponse, ) async def delete_ingress(self, delete: DeleteIngressRequest) -> IngressInfo: return await self._client.request( SVC, "DeleteIngress", delete, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, )
Client for LiveKit Ingress Service API
Recommended way to use this service is via
LiveKitAPI
:from livekit import api lkapi = api.LiveKitAPI() ingress = lkapi.ingress
Ancestors
- livekit.api._service.Service
- abc.ABC
Methods
async def create_ingress(self, create: ingress.CreateIngressRequest) ‑> ingress.IngressInfo
-
Expand source code
async def create_ingress(self, create: CreateIngressRequest) -> IngressInfo: return await self._client.request( SVC, "CreateIngress", create, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, )
async def delete_ingress(self, delete: ingress.DeleteIngressRequest) ‑> ingress.IngressInfo
-
Expand source code
async def delete_ingress(self, delete: DeleteIngressRequest) -> IngressInfo: return await self._client.request( SVC, "DeleteIngress", delete, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, )
async def list_ingress(self, list: ingress.ListIngressRequest) ‑> ingress.ListIngressResponse
-
Expand source code
async def list_ingress(self, list: ListIngressRequest) -> ListIngressResponse: return await self._client.request( SVC, "ListIngress", list, self._auth_header(VideoGrants(ingress_admin=True)), ListIngressResponse, )
async def update_ingress(self, update: ingress.UpdateIngressRequest) ‑> ingress.IngressInfo
-
Expand source code
async def update_ingress(self, update: UpdateIngressRequest) -> IngressInfo: return await self._client.request( SVC, "UpdateIngress", update, self._auth_header(VideoGrants(ingress_admin=True)), IngressInfo, )
class ListIngressRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class ListIngressResponse (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR
class UpdateIngressRequest (*args, **kwargs)
-
A ProtocolMessage
Ancestors
- google._upb._message.Message
- google.protobuf.message.Message
Class variables
var DESCRIPTOR