Module livekit.agents.utils.aio.channel

Classes

class Chan (maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default
Expand source code
class Chan(Generic[T]):
    def __init__(
        self, maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None
    ) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._maxsize = max(maxsize, 0)
        #        self._finished_ev = asyncio.Event()
        self._close_ev = asyncio.Event()
        self._closed = False
        self._gets: Deque[asyncio.Future[T | None]] = deque()
        self._puts: Deque[asyncio.Future[T | None]] = deque()
        self._queue: Deque[T] = deque()

    def _wakeup_next(self, waiters: deque[asyncio.Future[T | None]]):
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    async def send(self, value: T) -> None:
        while self.full() and not self._close_ev.is_set():
            p = self._loop.create_future()
            self._puts.append(p)
            try:
                await p
            except ChanClosed:
                raise
            except:
                p.cancel()
                with contextlib.suppress(ValueError):
                    self._puts.remove(p)

                if not self.full() and not p.cancelled():
                    self._wakeup_next(self._puts)
                raise

        self.send_nowait(value)

    def send_nowait(self, value: T) -> None:
        if self.full():
            raise ChanFull

        if self._close_ev.is_set():
            raise ChanClosed

        self._queue.append(value)
        self._wakeup_next(self._gets)

    async def recv(self) -> T:
        while self.empty() and not self._close_ev.is_set():
            g = self._loop.create_future()
            self._gets.append(g)

            try:
                await g
            except ChanClosed:
                raise
            except Exception:
                g.cancel()
                with contextlib.suppress(ValueError):
                    self._gets.remove(g)

                if not self.empty() and not g.cancelled():
                    self._wakeup_next(self._gets)

                raise

        return self.recv_nowait()

    def recv_nowait(self) -> T:
        if self.empty():
            if self._close_ev.is_set():
                raise ChanClosed
            else:
                raise ChanEmpty
        item = self._queue.popleft()
        #        if self.empty() and self._close_ev.is_set():
        #            self._finished_ev.set()
        self._wakeup_next(self._puts)
        return item

    def close(self) -> None:
        self._closed = True
        self._close_ev.set()
        for putter in self._puts:
            if not putter.cancelled():
                putter.set_exception(ChanClosed())

        while len(self._gets) > self.qsize():
            getter = self._gets.pop()
            if not getter.cancelled():
                getter.set_exception(ChanClosed())

        while self._gets:
            self._wakeup_next(self._gets)

    #        if self.empty():
    #            self._finished_ev.set()

    @property
    def closed(self) -> bool:
        return self._closed

    #    async def join(self) -> None:
    #        await self._finished_ev.wait()

    def qsize(self) -> int:
        """the number of elements queued (unread) in the channel buffer"""
        return len(self._queue)

    def full(self) -> bool:
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    def empty(self) -> bool:
        return not self._queue

    def __aiter__(self) -> AsyncIterator[T]:
        return self

    async def __anext__(self) -> T:
        try:
            return await self.recv()
        except ChanClosed:
            raise StopAsyncIteration

Ancestors

  • typing.Generic

Instance variables

prop closed : bool
Expand source code
@property
def closed(self) -> bool:
    return self._closed

Methods

def close(self) ‑> None
def empty(self) ‑> bool
def full(self) ‑> bool
def qsize(self) ‑> int

the number of elements queued (unread) in the channel buffer

async def recv(self) ‑> ~T
def recv_nowait(self) ‑> ~T
async def send(self, value: T) ‑> None
def send_nowait(self, value: T) ‑> None
class ChanClosed (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ChanClosed(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ChanEmpty (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ChanEmpty(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ChanFull (*args, **kwargs)

Common base class for all non-exit exceptions.

Expand source code
class ChanFull(Exception):
    pass

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ChanReceiver (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
Expand source code
class ChanReceiver(Protocol[T_co]):
    async def recv(self) -> T_co: ...

    def recv_nowait(self) -> T_co: ...

    def close(self) -> None: ...

    def __aiter__(self) -> AsyncIterator[T_co]: ...

    async def __anext__(self) -> T_co: ...

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def close(self) ‑> None
async def recv(self) ‑> +T_co
def recv_nowait(self) ‑> +T_co
class ChanSender (*args, **kwargs)

Base class for protocol classes.

Protocol classes are defined as::

class Proto(Protocol):
    def meth(self) -> int:
        ...

Such classes are primarily used with static type checkers that recognize structural subtyping (static duck-typing).

For example::

class C:
    def meth(self) -> int:
        return 0

def func(x: Proto) -> int:
    return x.meth()

func(C())  # Passes static type check

See PEP 544 for details. Protocol classes decorated with @typing.runtime_checkable act as simple-minded runtime protocols that check only the presence of given attributes, ignoring their type signatures. Protocol classes can be generic, they are defined as::

class GenProto[T](Protocol):
    def meth(self) -> T:
        ...
Expand source code
class ChanSender(Protocol[T_contra]):
    async def send(self, value: T_contra) -> None: ...

    def send_nowait(self, value: T_contra) -> None: ...

    def close(self) -> None: ...

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

def close(self) ‑> None
async def send(self, value: T_contra) ‑> None
def send_nowait(self, value: T_contra) ‑> None