Module livekit.agents.utils.aio.task_set
Classes
class TaskSet (loop: asyncio.AbstractEventLoop | None = None)
-
Small utility to create tasks in a fire-and-forget fashion.
Expand source code
class TaskSet:
    """Small utility to create tasks in a fire-and-forget fashion.

    Every task created through :meth:`create_task` is kept in an internal
    set until it finishes, so a strong reference to it exists for its whole
    lifetime. :meth:`aclose` stops the set from accepting new work and waits
    for the tasks that are still tracked.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop | None = None) -> None:
        """Create an empty task set.

        Args:
            loop: Event loop used to schedule the tasks. When omitted (or
                falsy) the loop returned by ``asyncio.get_event_loop()`` at
                construction time is used.
        """
        self._loop = loop or asyncio.get_event_loop()
        # Tasks that have been scheduled and have not completed yet.
        self._set = set[asyncio.Task[Any]]()
        # Set by aclose(); create_task() refuses new work afterwards.
        self._closed = False

    def create_task(self, coro: Coroutine[Any, Any, _T]) -> asyncio.Task[_T]:
        """Schedule ``coro`` on the set's loop and track the resulting task.

        Args:
            coro: The coroutine object to run.

        Returns:
            The ``asyncio.Task`` wrapping ``coro``.

        Raises:
            RuntimeError: If :meth:`aclose` has already been called. The
                coroutine is closed before raising so it is not left
                un-awaited.
        """
        if self._closed:
            # Without this the rejected coroutine object would be dropped
            # un-awaited, producing a "never awaited" RuntimeWarning.
            coro.close()
            raise RuntimeError("TaskSet is closed")

        task = self._loop.create_task(coro)
        self._set.add(task)
        # discard (not remove): a callback must never raise KeyError inside
        # the event loop if the task has already been dropped from the set.
        task.add_done_callback(self._set.discard)
        return task

    async def aclose(self) -> None:
        """Close the set and wait for every tracked task to finish.

        Tasks are awaited, not cancelled. Exceptions raised by the tasks are
        collected by ``asyncio.gather(..., return_exceptions=True)`` and are
        not re-raised here.
        """
        self._closed = True
        await asyncio.gather(*self._set, return_exceptions=True)
        self._set.clear()
Methods
async def aclose(self) ‑> None
def create_task(self, coro: Coroutine[Any, Any, _T]) ‑> _asyncio.Task[~_T]