Module livekit.agents.utils.http_server

Classes

class HttpServer (host: str, port: int, loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class HttpServer:
    """Serve a ``web.Application`` on a host/port using an asyncio server.

    The application is created in the constructor and exposed through
    :attr:`app`. Listening begins with :meth:`start` and ends with
    :meth:`aclose`. Both methods run under one internal lock, so concurrent
    calls are serialized.
    """

    def __init__(self, host: str, port: int, loop: asyncio.AbstractEventLoop | None = None) -> None:
        """Store the bind address and create the web application.

        Args:
            host: Host passed to ``loop.create_server`` when starting.
            port: Port passed to ``loop.create_server``. If it is ``0``,
                :meth:`start` replaces it with the port of the first
                listening socket.
            loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._host = host
        self._port = port
        self._app = web.Application(loop=self._loop)
        self._lock = asyncio.Lock()

    @property
    def app(self) -> web.Application:
        """The ``web.Application`` created in the constructor."""
        return self._app

    @property
    def port(self) -> int:
        """The configured port, or the bound port once :meth:`start` resolved port ``0``."""
        return self._port

    async def start(self) -> None:
        """Create the listening server and begin serving."""
        async with self._lock:
            handler = self._app.make_handler()
            self._server = await self._loop.create_server(handler, self._host, self._port)

            if self._port == 0:
                # Port 0 was requested: read the real port back from the
                # first listening socket so `port` reports it.
                self._port = self._server.sockets[0].getsockname()[1]

            await self._server.start_serving()

    async def aclose(self) -> None:
        """Close the listening server and wait until it is closed.

        Does nothing when :meth:`start` never assigned a server, so cleanup
        code may call it unconditionally.
        """
        async with self._lock:
            # `_server` is only assigned by start(); without this guard,
            # closing a never-started instance raised AttributeError.
            server = getattr(self, "_server", None)
            if server is None:
                return

            server.close()
            await server.wait_closed()

Subclasses

  • livekit.agents.telemetry.http_server.HttpServer

Instance variables

prop app : web.Application
Expand source code
@property
def app(self) -> web.Application:
    """The ``web.Application`` instance held by this server."""
    return self._app
prop port : int
Expand source code
@property
def port(self) -> int:
    """The port currently stored on this server."""
    return self._port

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the listening server and wait until it is closed.

    Does nothing when ``start()`` never assigned a server, so cleanup code
    may call it unconditionally.
    """
    async with self._lock:
        # `_server` is only assigned by start(); without this guard,
        # closing a never-started instance raised AttributeError.
        server = getattr(self, "_server", None)
        if server is None:
            return

        server.close()
        await server.wait_closed()
async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """Create the listening server and begin serving.

    Builds a handler from the application, asks the event loop for a server
    on the stored host and port, and records it on the instance. If the
    stored port is ``0``, it is replaced by the port reported by the first
    listening socket.
    """
    async with self._lock:
        protocol_factory = self._app.make_handler()
        server = await self._loop.create_server(protocol_factory, self._host, self._port)
        self._server = server

        if self._port == 0:
            # Read the real port back from the first listening socket.
            bound_address = server.sockets[0].getsockname()
            self._port = bound_address[1]

        await server.start_serving()