Module livekit.agents.utils.connection_pool

Classes

class ConnectionPool (*,
max_session_duration: float | None = None,
mark_refreshed_on_get: bool = False,
connect_cb: Callable[[], Awaitable[~T]] | None = None,
close_cb: Callable[[~T], Awaitable[None]] | None = None)
Expand source code
class ConnectionPool(Generic[T]):
    """Helper class to manage persistent connections like websockets.

    Handles connection pooling and reconnection after max duration.
    Can be used as an async context manager to automatically return connections to the pool.
    """

    def __init__(
        self,
        *,
        max_session_duration: Optional[float] = None,
        mark_refreshed_on_get: bool = False,
        connect_cb: Optional[Callable[[], Awaitable[T]]] = None,
        close_cb: Optional[Callable[[T], Awaitable[None]]] = None,
    ) -> None:
        """Initialize the connection wrapper.

        Args:
            max_session_duration: Maximum duration in seconds before forcing reconnection
            mark_refreshed_on_get: If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
            connect_cb: Optional async callback to create new connections
            close_cb: Optional async callback to close connections
        """
        self._max_session_duration = max_session_duration
        self._mark_refreshed_on_get = mark_refreshed_on_get
        self._connect_cb = connect_cb
        self._close_cb = close_cb
        self._connections: dict[T, float] = {}  # conn -> connected_at timestamp
        self._available: Set[T] = set()

        # store connections to be reaped (closed) later.
        self._to_close: Set[T] = set()

        self._prewarm_task: Optional[weakref.ref[asyncio.Task]] = None

    async def _connect(self) -> T:
        """Create a new connection.

        Returns:
            The new connection object

        Raises:
            NotImplementedError: If no connect callback was provided
        """
        if self._connect_cb is None:
            raise NotImplementedError("Must provide connect_cb or implement connect()")
        connection = await self._connect_cb()
        self._connections[connection] = time.time()
        return connection

    async def _drain_to_close(self) -> None:
        """Drain and close all the connections queued for closing."""
        for conn in list(self._to_close):
            await self._maybe_close_connection(conn)
        self._to_close.clear()

    @asynccontextmanager
    async def connection(self) -> AsyncGenerator[T, None]:
        """Get a connection from the pool and automatically return it when done.

        Yields:
            An active connection object
        """
        conn = await self.get()
        try:
            yield conn
        except Exception:
            self.remove(conn)
            raise
        else:
            self.put(conn)

    async def get(self) -> T:
        """Get an available connection or create a new one if needed.

        Returns:
            An active connection object
        """
        await self._drain_to_close()

        now = time.time()

        # try to reuse an available connection that hasn't expired
        while self._available:
            conn = self._available.pop()
            if (
                self._max_session_duration is None
                or now - self._connections[conn] <= self._max_session_duration
            ):
                if self._mark_refreshed_on_get:
                    self._connections[conn] = now
                return conn
            # connection expired; mark it for resetting.
            self.remove(conn)

        return await self._connect()

    def put(self, conn: T) -> None:
        """Mark a connection as available for reuse.

        If connection has been reset, it will not be added to the pool.

        Args:
            conn: The connection to make available
        """
        if conn in self._connections:
            self._available.add(conn)

    async def _maybe_close_connection(self, conn: T) -> None:
        """Close a connection if close_cb is provided.

        Args:
            conn: The connection to close
        """
        if self._close_cb is not None:
            await self._close_cb(conn)

    def remove(self, conn: T) -> None:
        """Remove a specific connection from the pool.

        Marks the connection to be closed during the next drain cycle.

        Args:
            conn: The connection to reset
        """
        self._available.discard(conn)
        if conn in self._connections:
            self._to_close.add(conn)
            self._connections.pop(conn, None)

    def invalidate(self) -> None:
        """Clear all existing connections.

        Marks all current connections to be closed during the next drain cycle.
        """
        for conn in list(self._connections.keys()):
            self._to_close.add(conn)
        self._connections.clear()
        self._available.clear()

    def prewarm(self) -> None:
        """Initiate prewarming of the connection pool without blocking.

        This method starts a background task that creates a new connection if none exist.
        The task automatically cleans itself up when the connection pool is closed.
        """
        if self._prewarm_task is not None or self._connections:
            return

        async def _prewarm_impl():
            if not self._connections:
                conn = await self._connect()
                self._available.add(conn)

        task = asyncio.create_task(_prewarm_impl())
        self._prewarm_task = weakref.ref(task)

    async def aclose(self):
        """Close all connections, draining any pending connection closures."""
        if self._prewarm_task is not None:
            task = self._prewarm_task()
            if task:
                aio.gracefully_cancel(task)

        self.invalidate()
        await self._drain_to_close()

Helper class to manage persistent connections like websockets.

Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool.

Initialize the connection wrapper.

Args

max_session_duration
Maximum duration in seconds before forcing reconnection
mark_refreshed_on_get
If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
connect_cb
Optional async callback to create new connections
close_cb
Optional async callback to close connections

Ancestors

  • typing.Generic

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    """Close all connections, draining any pending connection closures."""
    if self._prewarm_task is not None:
        task = self._prewarm_task()
        if task:
            aio.gracefully_cancel(task)

    self.invalidate()
    await self._drain_to_close()

Close all connections, draining any pending connection closures.

async def connection(self) ‑> AsyncGenerator[~T, None]
Expand source code
@asynccontextmanager
async def connection(self) -> AsyncGenerator[T, None]:
    """Get a connection from the pool and automatically return it when done.

    Yields:
        An active connection object
    """
    conn = await self.get()
    try:
        yield conn
    except Exception:
        self.remove(conn)
        raise
    else:
        self.put(conn)

Get a connection from the pool and automatically return it when done.

Yields

An active connection object

async def get(self) ‑> ~T
Expand source code
async def get(self) -> T:
    """Get an available connection or create a new one if needed.

    Returns:
        An active connection object
    """
    await self._drain_to_close()

    now = time.time()

    # try to reuse an available connection that hasn't expired
    while self._available:
        conn = self._available.pop()
        if (
            self._max_session_duration is None
            or now - self._connections[conn] <= self._max_session_duration
        ):
            if self._mark_refreshed_on_get:
                self._connections[conn] = now
            return conn
        # connection expired; mark it for resetting.
        self.remove(conn)

    return await self._connect()

Get an available connection or create a new one if needed.

Returns

An active connection object

def invalidate(self) ‑> None
Expand source code
def invalidate(self) -> None:
    """Clear all existing connections.

    Marks all current connections to be closed during the next drain cycle.
    """
    for conn in list(self._connections.keys()):
        self._to_close.add(conn)
    self._connections.clear()
    self._available.clear()

Clear all existing connections.

Marks all current connections to be closed during the next drain cycle.

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Initiate prewarming of the connection pool without blocking.

    This method starts a background task that creates a new connection if none exist.
    The task automatically cleans itself up when the connection pool is closed.
    """
    if self._prewarm_task is not None or self._connections:
        return

    async def _prewarm_impl():
        if not self._connections:
            conn = await self._connect()
            self._available.add(conn)

    task = asyncio.create_task(_prewarm_impl())
    self._prewarm_task = weakref.ref(task)

Initiate prewarming of the connection pool without blocking.

This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed.

def put(self, conn: ~T) ‑> None
Expand source code
def put(self, conn: T) -> None:
    """Mark a connection as available for reuse.

    If connection has been reset, it will not be added to the pool.

    Args:
        conn: The connection to make available
    """
    if conn in self._connections:
        self._available.add(conn)

Mark a connection as available for reuse.

If connection has been reset, it will not be added to the pool.

Args

conn
The connection to make available
def remove(self, conn: ~T) ‑> None
Expand source code
def remove(self, conn: T) -> None:
    """Remove a specific connection from the pool.

    Marks the connection to be closed during the next drain cycle.

    Args:
        conn: The connection to reset
    """
    self._available.discard(conn)
    if conn in self._connections:
        self._to_close.add(conn)
        self._connections.pop(conn, None)

Remove a specific connection from the pool.

Marks the connection to be closed during the next drain cycle.

Args

conn
The connection to reset