Module livekit.agents.utils.aio

Sub-modules

livekit.agents.utils.aio.channel
livekit.agents.utils.aio.debug
livekit.agents.utils.aio.duplex_unix
livekit.agents.utils.aio.itertools
livekit.agents.utils.aio.task_set

Functions

async def gracefully_cancel(*futures: _asyncio.Future)
def interval(interval: float) ‑> livekit.agents.utils.aio.interval.Interval
def sleep(delay: float) ‑> livekit.agents.utils.aio.sleep.Sleep

Classes

class Chan (maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
Expand source code
class Chan(Generic[T]):
    def __init__(
        self, maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None
    ) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._maxsize = max(maxsize, 0)
        #        self._finished_ev = asyncio.Event()
        self._close_ev = asyncio.Event()
        self._closed = False
        self._gets: Deque[asyncio.Future[T | None]] = deque()
        self._puts: Deque[asyncio.Future[T | None]] = deque()
        self._queue: Deque[T] = deque()

    def _wakeup_next(self, waiters: deque[asyncio.Future[T | None]]):
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    async def send(self, value: T) -> None:
        while self.full() and not self._close_ev.is_set():
            p = self._loop.create_future()
            self._puts.append(p)
            try:
                await p
            except ChanClosed:
                raise
            except:
                p.cancel()
                with contextlib.suppress(ValueError):
                    self._puts.remove(p)

                if not self.full() and not p.cancelled():
                    self._wakeup_next(self._puts)
                raise

        self.send_nowait(value)

    def send_nowait(self, value: T) -> None:
        if self.full():
            raise ChanFull

        if self._close_ev.is_set():
            raise ChanClosed

        self._queue.append(value)
        self._wakeup_next(self._gets)

    async def recv(self) -> T:
        while self.empty() and not self._close_ev.is_set():
            g = self._loop.create_future()
            self._gets.append(g)

            try:
                await g
            except ChanClosed:
                raise
            except Exception:
                g.cancel()
                with contextlib.suppress(ValueError):
                    self._gets.remove(g)

                if not self.empty() and not g.cancelled():
                    self._wakeup_next(self._gets)

                raise

        return self.recv_nowait()

    def recv_nowait(self) -> T:
        if self.empty():
            if self._close_ev.is_set():
                raise ChanClosed
            else:
                raise ChanEmpty
        item = self._queue.popleft()
        #        if self.empty() and self._close_ev.is_set():
        #            self._finished_ev.set()
        self._wakeup_next(self._puts)
        return item

    def close(self) -> None:
        self._closed = True
        self._close_ev.set()
        for putter in self._puts:
            if not putter.cancelled():
                putter.set_exception(ChanClosed())

        while len(self._gets) > self.qsize():
            getter = self._gets.pop()
            if not getter.cancelled():
                getter.set_exception(ChanClosed())

        while self._gets:
            self._wakeup_next(self._gets)

    #        if self.empty():
    #            self._finished_ev.set()

    @property
    def closed(self) -> bool:
        return self._closed

    #    async def join(self) -> None:
    #        await self._finished_ev.wait()

    def qsize(self) -> int:
        """the number of elements queued (unread) in the channel buffer"""
        return len(self._queue)

    def full(self) -> bool:
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    def empty(self) -> bool:
        return not self._queue

    def __aiter__(self) -> AsyncIterator[T]:
        return self

    async def __anext__(self) -> T:
        try:
            return await self.recv()
        except ChanClosed:
            raise StopAsyncIteration

Ancestors

  • typing.Generic

Instance variables

prop closed : bool
Expand source code
@property
def closed(self) -> bool:
    """Whether ``close()`` has been called on this channel."""
    return self._closed

Methods

def close(self) ‑> None
def empty(self) ‑> bool
def full(self) ‑> bool
def qsize(self) ‑> int

The number of elements queued (unread) in the channel buffer.

async def recv(self) ‑> ~T
def recv_nowait(self) ‑> ~T
async def send(self, value: T) ‑> None
def send_nowait(self, value: T) ‑> None
class ChanClosed (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ChanClosed(Exception):
    """Signals that a channel operation could not proceed because the channel is closed."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ChanReceiver (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
Expand source code
class ChanReceiver(Protocol[T_co]):
    """Structural type for the receiving side of a channel.

    Any object providing these methods satisfies the protocol; it exposes the
    receive operations, ``close`` and async iteration, but no send operations.
    """

    # Asynchronous receive of one value.
    async def recv(self) -> T_co: ...

    # Non-waiting counterpart of ``recv``.
    def recv_nowait(self) -> T_co: ...

    # Close the channel.
    def close(self) -> None: ...

    # Async-iteration support: ``async for value in receiver``.
    def __aiter__(self) -> AsyncIterator[T_co]: ...

    async def __anext__(self) -> T_co: ...

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def close(self) ‑> None
async def recv(self) ‑> +T_co
def recv_nowait(self) ‑> +T_co
class ChanSender (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
Expand source code
class ChanSender(Protocol[T_contra]):
    """Structural type for the sending side of a channel.

    Any object providing these methods satisfies the protocol; it exposes the
    send operations and ``close``, but no receive operations.
    """

    # Asynchronous send of one value.
    async def send(self, value: T_contra) -> None: ...

    # Non-waiting counterpart of ``send``.
    def send_nowait(self, value: T_contra) -> None: ...

    # Close the channel.
    def close(self) -> None: ...

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def close(self) ‑> None
async def send(self, value: T_contra) ‑> None
def send_nowait(self, value: T_contra) ‑> None
class Interval (interval: float)
Expand source code
class Interval:
    """Resettable periodic ticker usable with ``async for``.

    The first :meth:`tick` returns immediately with ``0``. Each later tick
    waits until ``interval`` seconds after the previous tick returned, then
    returns the incremented tick count.
    """

    def __init__(self, interval: float) -> None:
        """Create a ticker.

        Args:
            interval: Seconds between consecutive ticks.
        """
        self._interval = interval
        self._last_sleep = 0.0
        self._i = 0
        # Future and timer of the tick currently waiting, if any. Both are
        # initialised here so reset() is safe before any tick has waited.
        self._fut: asyncio.Future[None] | None = None
        self._handler: asyncio.TimerHandle | None = None

    def reset(self) -> None:
        """Restart the interval.

        If a tick is currently waiting, its timer is rescheduled to fire a full
        interval from now. Otherwise the next :meth:`tick` returns immediately.
        """
        if (
            self._fut is not None
            and self._handler is not None
            and not self._handler.cancelled()
        ):
            self._handler.cancel()
            # Reschedule on the loop that owns the waiting future.
            loop = self._fut.get_loop()
            self._handler = loop.call_later(self._interval, _finish_fut, self._fut)
        else:
            self._last_sleep = 0

    async def tick(self) -> int:
        """Wait for the next tick and return the number of waited ticks so far."""
        loop = asyncio.get_running_loop()

        if self._last_sleep:
            self._fut = loop.create_future()
            # Time remaining until one interval after the previous tick.
            delay = self._last_sleep - loop.time() + self._interval
            self._handler = loop.call_later(delay, _finish_fut, self._fut)
            try:
                await self._fut
            finally:
                # Also runs on cancellation, so no timer outlives the wait.
                self._handler.cancel()
            self._i += 1

        self._last_sleep = loop.time()
        return self._i

    def __aiter__(self) -> "Interval":
        return self

    async def __anext__(self) -> int:
        return await self.tick()

Methods

def reset(self) ‑> None
async def tick(self) ‑> int
class Sleep (delay: float)

Same as asyncio.sleep except it is resettable

Expand source code
class Sleep:
    """Same as asyncio.sleep except it is resettable"""

    def __init__(self, delay: float) -> None:
        self._delay = delay
        self._handler: asyncio.TimerHandle | None = None

    def reset(self, new_delay: float | None = None) -> None:
        if new_delay is None:
            new_delay = self._delay

        self._delay = new_delay

        if self._handler is None:
            return

        if self._handler.cancelled() or self._fut.done():
            raise SleepFinished

        self._handler.cancel()
        loop = asyncio.get_event_loop()
        self._handler = loop.call_later(new_delay, _finish_fut, self._fut)

    def cancel(self) -> None:
        if self._handler is None:
            return

        self._handler.cancel()
        self._fut.cancel()

    async def _sleep(self) -> None:
        if self._delay <= 0:
            self._fut = asyncio.Future[None]()
            self._fut.set_result(None)
            return

        loop = asyncio.get_event_loop()
        self._fut = loop.create_future()
        self._handler = loop.call_later(self._delay, _finish_fut, self._fut)

        try:
            await self._fut
        finally:
            self._handler.cancel()

    def __await__(self):
        return self._sleep().__await__()

Methods

def cancel(self) ‑> None
def reset(self, new_delay: float | None = None) ‑> None
class SleepFinished (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class SleepFinished(Exception):
    """Signals that a sleep can no longer be reset because it has finished or was cancelled."""

Ancestors

  • builtins.Exception
  • builtins.BaseException
class TaskSet (loop: asyncio.AbstractEventLoop | None = None)

Small utility to create tasks in a fire-and-forget fashion.

Expand source code
class TaskSet:
    """
    Small utility to create task in a fire-and-forget fashion.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._set = set[asyncio.Task[Any]]()
        self._closed = False

    def create_task(self, coro: Coroutine[Any, Any, _T]) -> asyncio.Task[_T]:
        if self._closed:
            raise RuntimeError("TaskSet is closed")

        task = self._loop.create_task(coro)
        self._set.add(task)
        task.add_done_callback(self._set.remove)
        return task

    async def aclose(self) -> None:
        self._closed = True
        await asyncio.gather(*self._set, return_exceptions=True)
        self._set.clear()

Methods

async def aclose(self) ‑> None
def create_task(self, coro: Coroutine[Any, Any, _T]) ‑> _asyncio.Task[~_T]