Module livekit.agents.utils.aio.channel
Classes
class Chan (maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None)
-
An asyncio channel: a FIFO buffer of items of type T shared between sending and receiving coroutines, which can be closed to signal that no more items will be sent.
maxsize is the buffer capacity. A value of 0 or less makes the channel unbounded, so full() always returns False. loop is the event loop used to create the futures that waiting senders and receivers block on; it defaults to asyncio.get_event_loop().
send() waits while the channel is full and recv() waits while it is empty. After close(), no further items are accepted (send_nowait() raises ChanClosed, or ChanFull while the buffer is still full), items already buffered can still be received, and once the buffer is empty recv() raises ChanClosed.
The channel is also an async iterator: async for yields items until the channel is closed and drained.
class Chan(Generic[T]):
    def __init__(
        self, maxsize: int = 0, loop: asyncio.AbstractEventLoop | None = None
    ) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._maxsize = max(maxsize, 0)
        # self._finished_ev = asyncio.Event()
        self._close_ev = asyncio.Event()
        self._closed = False
        self._gets: Deque[asyncio.Future[T | None]] = deque()
        self._puts: Deque[asyncio.Future[T | None]] = deque()
        self._queue: Deque[T] = deque()

    def _wakeup_next(self, waiters: deque[asyncio.Future[T | None]]):
        while waiters:
            waiter = waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    async def send(self, value: T) -> None:
        while self.full() and not self._close_ev.is_set():
            p = self._loop.create_future()
            self._puts.append(p)
            try:
                await p
            except ChanClosed:
                raise
            except:
                p.cancel()
                with contextlib.suppress(ValueError):
                    self._puts.remove(p)

                if not self.full() and not p.cancelled():
                    self._wakeup_next(self._puts)
                raise

        self.send_nowait(value)

    def send_nowait(self, value: T) -> None:
        if self.full():
            raise ChanFull

        if self._close_ev.is_set():
            raise ChanClosed

        self._queue.append(value)
        self._wakeup_next(self._gets)

    async def recv(self) -> T:
        while self.empty() and not self._close_ev.is_set():
            g = self._loop.create_future()
            self._gets.append(g)

            try:
                await g
            except ChanClosed:
                raise
            except Exception:
                g.cancel()
                with contextlib.suppress(ValueError):
                    self._gets.remove(g)

                if not self.empty() and not g.cancelled():
                    self._wakeup_next(self._gets)

                raise

        return self.recv_nowait()

    def recv_nowait(self) -> T:
        if self.empty():
            if self._close_ev.is_set():
                raise ChanClosed
            else:
                raise ChanEmpty
        item = self._queue.popleft()
        # if self.empty() and self._close_ev.is_set():
        #     self._finished_ev.set()
        self._wakeup_next(self._puts)
        return item

    def close(self) -> None:
        self._closed = True
        self._close_ev.set()
        for putter in self._puts:
            if not putter.cancelled():
                putter.set_exception(ChanClosed())

        while len(self._gets) > self.qsize():
            getter = self._gets.pop()
            if not getter.cancelled():
                getter.set_exception(ChanClosed())

        while self._gets:
            self._wakeup_next(self._gets)

        # if self.empty():
        #     self._finished_ev.set()

    @property
    def closed(self) -> bool:
        return self._closed

    # async def join(self) -> None:
    #     await self._finished_ev.wait()

    def qsize(self) -> int:
        """the number of elements queued (unread) in the channel buffer"""
        return len(self._queue)

    def full(self) -> bool:
        if self._maxsize <= 0:
            return False
        else:
            return self.qsize() >= self._maxsize

    def empty(self) -> bool:
        return not self._queue

    def __aiter__(self) -> AsyncIterator[T]:
        return self

    async def __anext__(self) -> T:
        try:
            return await self.recv()
        except ChanClosed:
            raise StopAsyncIteration
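The close-then-drain behaviour of async iteration can be hard to see from the source alone, so here is a minimal sketch of a producer and a consumer. To stay runnable without the library installed it uses MiniChan, a hypothetical unbounded stand-in that mirrors only a subset of the methods above (send_nowait, recv, close, async iteration); the producer, consumer and main names are illustrative as well.

```python
import asyncio
from collections import deque


class ChanClosed(Exception):
    pass


class MiniChan:
    """Hypothetical, unbounded stand-in for Chan (illustration only)."""

    def __init__(self) -> None:
        self._queue = deque()
        self._waiters = deque()  # futures of receivers blocked in recv()
        self._closed = False

    def _wake_one(self) -> None:
        # Wake the first receiver that is still waiting.
        while self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():
                waiter.set_result(None)
                break

    def send_nowait(self, value) -> None:
        if self._closed:
            raise ChanClosed
        self._queue.append(value)
        self._wake_one()

    async def recv(self):
        # Block while there is nothing to read and the channel is open.
        while not self._queue and not self._closed:
            fut = asyncio.get_running_loop().create_future()
            self._waiters.append(fut)
            await fut
        if not self._queue:
            raise ChanClosed  # closed and fully drained
        return self._queue.popleft()

    def close(self) -> None:
        self._closed = True
        while self._waiters:
            self._wake_one()

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            return await self.recv()
        except ChanClosed:
            raise StopAsyncIteration


async def producer(ch: MiniChan) -> None:
    for i in range(3):
        ch.send_nowait(i)
        await asyncio.sleep(0)  # give the consumer a chance to run
    ch.close()  # ends the consumer's async for once the buffer is drained


async def consumer(ch: MiniChan) -> list:
    return [item async for item in ch]


async def main() -> list:
    ch = MiniChan()
    _, items = await asyncio.gather(producer(ch), consumer(ch))
    return items


received = asyncio.run(main())
print(received)  # [0, 1, 2]
```

With the real class the consumer loop would presumably look the same, since Chan implements __aiter__ and __anext__ on top of recv() in the same way.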
Ancestors
- typing.Generic
Instance variables
prop closed : bool
-
@property
def closed(self) -> bool:
    return self._closed
Methods
def close(self) ‑> None
def empty(self) ‑> bool
def full(self) ‑> bool
def qsize(self) ‑> int
-
the number of elements queued (unread) in the channel buffer
async def recv(self) ‑> ~T
def recv_nowait(self) ‑> ~T
async def send(self, value: T) ‑> None
def send_nowait(self, value: T) ‑> None
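Which exception a non-blocking call raises depends on both the buffer contents and whether the channel has been closed. The sketch below traces this on a channel with maxsize=1. TinyChan is a hypothetical, synchronous stand-in that copies only the checks visible in the send_nowait() and recv_nowait() source above (no waiters, no event loop); the outcome helper is also made up for the demonstration.

```python
from collections import deque


class ChanClosed(Exception):
    pass


class ChanEmpty(Exception):
    pass


class ChanFull(Exception):
    pass


class TinyChan:
    """Hypothetical stand-in: the non-blocking half of Chan only."""

    def __init__(self, maxsize: int = 0) -> None:
        self._maxsize = max(maxsize, 0)
        self._queue = deque()
        self._closed = False

    def full(self) -> bool:
        # maxsize <= 0 means unbounded, so the channel is never full.
        return self._maxsize > 0 and len(self._queue) >= self._maxsize

    def empty(self) -> bool:
        return not self._queue

    def send_nowait(self, value) -> None:
        if self.full():  # checked before the closed flag
            raise ChanFull
        if self._closed:
            raise ChanClosed
        self._queue.append(value)

    def recv_nowait(self):
        if self.empty():
            raise ChanClosed if self._closed else ChanEmpty
        return self._queue.popleft()

    def close(self) -> None:
        self._closed = True


def outcome(fn, *args):
    """Return fn's result, or the name of the channel exception it raised."""
    try:
        return fn(*args)
    except (ChanClosed, ChanEmpty, ChanFull) as exc:
        return type(exc).__name__


ch = TinyChan(maxsize=1)
events = [
    outcome(ch.recv_nowait),       # open and empty
    outcome(ch.send_nowait, "a"),  # accepted
    outcome(ch.send_nowait, "b"),  # buffer already holds maxsize items
    outcome(ch.close),
    outcome(ch.recv_nowait),       # buffered item survives close()
    outcome(ch.recv_nowait),       # closed and drained
    outcome(ch.send_nowait, "c"),  # closed, not full
]
print(events)
```

Because the full() check comes first, a channel that is both closed and full reports ChanFull rather than ChanClosed from send_nowait().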
class ChanClosed (*args, **kwargs)
-
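Raised when an operation cannot complete because the channel is closed: by send_nowait() on a closed channel, by recv_nowait() on a closed channel with no buffered items, and in pending send() calls, and pending recv() calls that cannot be satisfied from the buffer, when close() is called.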
class ChanClosed(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ChanEmpty (*args, **kwargs)
-
Raised by recv_nowait() when the channel is empty but still open.
class ChanEmpty(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ChanFull (*args, **kwargs)
-
Raised by send_nowait() when the channel buffer already holds maxsize items.
class ChanFull(Exception): pass
Ancestors
- builtins.Exception
- builtins.BaseException
class ChanReceiver (*args, **kwargs)
-
Protocol (a structural type, see PEP 544) for the receiving side of a channel, covariant in the item type T_co.
Any object that provides recv(), recv_nowait(), close() and async iteration (__aiter__ and __anext__) satisfies it; Chan does. Annotating a parameter as ChanReceiver[T] tells a static type checker that the function only receives from the channel.
class ChanReceiver(Protocol[T_co]):
    async def recv(self) -> T_co: ...

    def recv_nowait(self) -> T_co: ...

    def close(self) -> None: ...

    def __aiter__(self) -> AsyncIterator[T_co]: ...

    async def __anext__(self) -> T_co: ...
Ancestors
- typing.Protocol
- typing.Generic
Methods
def close(self) ‑> None
async def recv(self) ‑> +T_co
def recv_nowait(self) ‑> +T_co
class ChanSender (*args, **kwargs)
-
Protocol (a structural type, see PEP 544) for the sending side of a channel, contravariant in the item type T_contra.
Any object that provides send(), send_nowait() and close() satisfies it; Chan does. Annotating a parameter as ChanSender[T] tells a static type checker that the function only sends to the channel.
class ChanSender(Protocol[T_contra]):
    async def send(self, value: T_contra) -> None: ...

    def send_nowait(self, value: T_contra) -> None: ...

    def close(self) -> None: ...
Ancestors
- typing.Protocol
- typing.Generic
Methods
def close(self) ‑> None
async def send(self, value: T_contra) ‑> None
def send_nowait(self, value: T_contra) ‑> None
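One way to use the two protocols is to hand the same channel object to a producer typed as ChanSender and a consumer typed as ChanReceiver, so that each side only sees the methods it needs. The sketch below redeclares both protocols from the source listings so that it runs on its own. BufferChan is a hypothetical stand-in that never blocks (unlike Chan, its recv() fails on an empty buffer instead of waiting), and produce, consume and main are illustrative names.

```python
import asyncio
from collections import deque
from typing import AsyncIterator, Generic, Protocol, TypeVar

T = TypeVar("T")
T_co = TypeVar("T_co", covariant=True)
T_contra = TypeVar("T_contra", contravariant=True)


class ChanSender(Protocol[T_contra]):
    async def send(self, value: T_contra) -> None: ...
    def send_nowait(self, value: T_contra) -> None: ...
    def close(self) -> None: ...


class ChanReceiver(Protocol[T_co]):
    async def recv(self) -> T_co: ...
    def recv_nowait(self) -> T_co: ...
    def close(self) -> None: ...
    def __aiter__(self) -> AsyncIterator[T_co]: ...
    async def __anext__(self) -> T_co: ...


class BufferChan(Generic[T]):
    """Hypothetical non-blocking stand-in with both method sets."""

    def __init__(self) -> None:
        self._items = deque()
        self._closed = False

    async def send(self, value: T) -> None:
        self.send_nowait(value)

    def send_nowait(self, value: T) -> None:
        if self._closed:
            raise RuntimeError("channel is closed")
        self._items.append(value)

    async def recv(self) -> T:
        return self.recv_nowait()

    def recv_nowait(self) -> T:
        if not self._items:
            raise LookupError("no buffered items")
        return self._items.popleft()

    def close(self) -> None:
        self._closed = True

    def __aiter__(self) -> AsyncIterator[T]:
        return self

    async def __anext__(self) -> T:
        try:
            return self.recv_nowait()
        except LookupError:
            raise StopAsyncIteration


async def produce(tx: ChanSender[str]) -> None:
    # Only send(), send_nowait() and close() are visible through tx.
    for word in ("hello", "world"):
        await tx.send(word)
    tx.close()


async def consume(rx: ChanReceiver[str]) -> list[str]:
    # Only the receiving methods and async iteration are visible through rx.
    return [word async for word in rx]


async def main() -> list[str]:
    ch: BufferChan[str] = BufferChan()
    await produce(ch)  # same object, narrowed to the sender view
    return await consume(ch)  # same object, narrowed to the receiver view


words = asyncio.run(main())
print(words)  # ['hello', 'world']
```

No inheritance is involved: BufferChan never names either protocol, and a type checker that supports PEP 544 should accept it on the strength of its method signatures alone.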