Module livekit.agents.utils.http_context

Functions

def http_session() ‑> aiohttp.client.ClientSession
Expand source code
def http_session() -> aiohttp.ClientSession:
    """Optional utility function to avoid having to manually manage an aiohttp.ClientSession lifetime.
    On job processes, this http session will be bound to the main event loop.
    """

    val = _ContextVar.get(None)  # type: ignore
    if val is None:
        raise RuntimeError(
            "Attempted to use an http session outside of a job context. This is probably because you are trying to use a plugin without using the agent worker api. You may need to create your own aiohttp.ClientSession, pass it into the plugin constructor as a kwarg, and manage its lifecycle."
        )

    return val()  # type: ignore

Optional utility function to avoid having to manually manage an aiohttp.ClientSession lifetime. On job processes, this http session will be bound to the main event loop.