Module livekit.agents.llm.async_toolset
Classes
class AsyncToolset (*,
id: str,
tools: Sequence[Tool] | None = None,
tool_handling: ToolHandlingOptions | None = None,
on_duplicate_call: DuplicateMode | None = None)
Expand source code
class AsyncToolset(Toolset): """Session-scoped toolset whose tools survive agent handoff. Background updates from tools in this toolset are delivered to whichever agent is current at delivery time, so a ``ctx.update()`` started under agent A still completes after a handoff to agent B. Tools placed on ``Agent(tools=...)`` instead use the activity-scoped executor and are cancelled/awaited on handoff. Example:: @function_tool(on_duplicate="confirm", flags=ToolFlag.CANCELLABLE) async def book_flight(ctx, origin: str, destination: str) -> dict: await ctx.update(f"Looking up flights {origin} → {destination}...") flights = await search(origin, destination) await ctx.update(f"Found {len(flights)}, picking the best...") return await book_best(flights) session = AgentSession(tools=[AsyncToolset(id="booking", tools=[book_flight])]) """ def __init__( self, *, id: str, tools: Sequence[Tool] | None = None, tool_handling: ToolHandlingOptions | None = None, # deprecated on_duplicate_call: DuplicateMode | None = None, ) -> None: if on_duplicate_call is not None: raise TypeError( "AsyncToolset(on_duplicate_call=...) has been deprecated; " "set `on_duplicate=...` on each @function_tool instead." ) super().__init__(id=id, tools=tools) self._async_tool_options_override = ( tool_handling.get("async_options") if tool_handling is not None else None ) self._executor = _ToolExecutor(owning_activity=None) def _attach_activity(self, *, activity: AgentActivity | None, session: AgentSession) -> None: """Bind this toolset to a scope. 
``activity=None`` makes it session-scoped (replies survive handoff); otherwise replies stay with ``activity``'s agent.""" self._executor.set_owning_activity(activity) if self._async_tool_options_override is not None: resolved = _resolve_async_tool_options(self._async_tool_options_override) elif activity is not None and is_given(activity._agent._async_tool_options): resolved = _resolve_async_tool_options(activity._agent._async_tool_options) else: resolved = session._async_tool_options self._executor.set_tool_options(resolved) async def aclose(self) -> None: await super().aclose() await self._executor.drain() await self._executor.aclose()Session-scoped toolset whose tools survive agent handoff.
Background updates from tools in this toolset are delivered to whichever agent is current at delivery time, so a
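`ctx.update()` started under agent A still completes after a handoff to agent B. Tools placed on `Agent(tools=...)` instead use the activity-scoped executor and are cancelled/awaited on handoff.

Passing the deprecated `on_duplicate_call` argument raises `TypeError`; set `on_duplicate=...` on each `@function_tool` instead.

Example::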
@function_tool(on_duplicate="confirm", flags=ToolFlag.CANCELLABLE) async def book_flight(ctx, origin: str, destination: str) -> dict: await ctx.update(f"Looking up flights {origin} → {destination}...") flights = await search(origin, destination) await ctx.update(f"Found {len(flights)}, picking the best...") return await book_best(flights) session = AgentSession(tools=[AsyncToolset(id="booking", tools=[book_flight])])Ancestors
- livekit.agents.llm.tool_context.Toolset
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None: await super().aclose() await self._executor.drain() await self._executor.aclose()Close the toolset and release any held resources.
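Agent-scoped toolsets (passed to `Agent(tools=...)`) are closed when the `AgentActivity` ends (on agent transition or session close). Session-scoped toolsets (passed to `AgentSession(tools=...)`) are closed only when the `AgentSession` shuts down. After closing the base toolset, `AsyncToolset.aclose()` also drains its tool executor and then closes it.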
class AsyncRunContext (*,
session: AgentSession[Userdata_T],
speech_handle: SpeechHandle,
function_call: FunctionCall)
Expand source code
class RunContext(Generic[Userdata_T]): # private ctor def __init__( self, *, session: AgentSession[Userdata_T], speech_handle: SpeechHandle, function_call: FunctionCall, ) -> None: self._session = session self._speech_handle = speech_handle self._function_call = function_call self._initial_step_idx = speech_handle.num_steps - 1 self._filler_schedulers: list[_FillerScheduler] = [] # synthesized progress-update pairs, populated whether or not an executor is attached self._updates: list[tuple[FunctionCall, FunctionCallOutput]] = [] # set/cleared by the executor around the tool's lifetime self._executor: _ToolExecutor | None = None self._first_update_fut: asyncio.Future[Any] | None = None @property def session(self) -> AgentSession[Userdata_T]: return self._session @property def speech_handle(self) -> SpeechHandle: return self._speech_handle @property def function_call(self) -> FunctionCall: return self._function_call @property def userdata(self) -> Userdata_T: return self.session.userdata def disallow_interruptions(self) -> None: """Disable interruptions for this FunctionCall. Delegates to the SpeechHandle.allow_interruptions setter, which will raise a RuntimeError if the handle is already interrupted. Raises: RuntimeError: If the SpeechHandle is already interrupted. """ self.speech_handle.allow_interruptions = False async def wait_for_playout(self) -> None: """Waits for the speech playout corresponding to this function call step. 
Unlike `SpeechHandle.wait_for_playout`, which waits for the full assistant turn to complete (including all function tools), this method only waits for the assistant's spoken response prior running this tool to finish playing.""" await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx) @asynccontextmanager async def with_filler( self, source: _FillerSource, *, delay: float = 0, interval: float | None = None, max_steps: int | None = None, ) -> AsyncIterator[None]: """Schedule filler speech while a long-running step blocks the tool. While the context is open, a background scheduler waits for the session to be continuously idle for ``delay`` seconds, then plays ``source``. With ``interval`` set, it then sleeps that many wall-clock seconds before restarting the dwell wait. ``interval=None`` (default) fires at most once. Args: source: Either a string (spoken via ``session.say``), or a callable ``(step: int) -> SpeechHandle | str | None`` invoked at fire time with the iteration count. Returning ``None`` skips this fire and retries on the next interval; the step counter only advances when a handle is produced. Use ``max_steps`` to cap the total number of fires. delay: Continuous-idle dwell required before each fire. ``0`` = fire as soon as the session is next idle. interval: Wall-clock cooldown after each fire. ``None`` = fire at most once. max_steps: Maximum number of fires across the lifetime of the cm. ``None`` = no limit. """ scheduler = _FillerScheduler( session=self._session, speech_handle=self._speech_handle, source=source, delay=delay, interval=interval, max_steps=max_steps, ) self._filler_schedulers.append(scheduler) try: yield finally: await scheduler.aclose() self._filler_schedulers.remove(scheduler) @asynccontextmanager async def foreground(self) -> AsyncIterator[AgentActivity]: """Wait for idle, then hold the floor while interactive work runs. 
Use cases: - wrap an ``await AgentTask()`` so it doesn't race with current speech or another tool's queued reply - wrap a direct ``generate_reply`` / ``say`` for the same reason - group multiple interactive calls so no deferred tool reply lands between them On enter, drains this tool's pending deferred reply first so its speech plays before the floor is held — keeps chat order matching code order. """ await self._drain_pending_reply() async with self._session._wait_for_idle_and_hold() as activity: yield activity async def update( self, message: str | Any, *, template: str | Callable[[UpdatePromptArgs], str] | None = None, ) -> None: """Push a progress update into the conversation. The first update releases control to the LLM with ``message`` as the tool's synthetic return; subsequent updates are coalesced into a deferred reply. Outside the voice path (e.g. ``execute_function_call``) updates are recorded on the result but no reply is fired. Args: message: Progress message; strings are wrapped by ``template``. template: Per-call override — either a ``str.format()`` template or a callable receiving ``UpdatePromptArgs``. Defaults to the executor's resolved ``update`` template (or the module default when standalone). 
""" # update() is a deliberate agent action — reset any active filler dwell so a # pending filler doesn't race the real update to the speech queue for s in self._filler_schedulers: s.reset_dwell() if isinstance(message, str): if template is None: if self._executor is not None: template = self._executor._tool_options["update_template"] else: from .tool_executor import UPDATE_TEMPLATE template = UPDATE_TEMPLATE from .tool_executor import _render message = _render( template, { "function_name": self.function_call.name, "call_id": self.function_call.call_id, "message": message, }, ) # first update keeps the original call_id update_step = len(self._updates) pair = self._make_update_pair( message, call_id_suffix=f"_update_{update_step}" if update_step > 0 else "" ) self._updates.append(pair) if self._executor is None: return # standalone — nothing else to do assert self._first_update_fut is not None if not self._first_update_fut.done(): self._first_update_fut.set_result(message) self._function_call.extra["__livekit_agents_tool_non_blocking"] = True return await self._executor._enqueue_reply(self, [pair[0], pair[1]]) def _attach_executor( self, executor: _ToolExecutor, first_update_fut: asyncio.Future[Any] ) -> None: if self._first_update_fut is not None: raise ValueError("Executor already attached") self._executor = executor self._first_update_fut = first_update_fut def _detach_executor(self) -> None: self._executor = None self._first_update_fut = None async def _drain_pending_reply(self) -> None: """Wait for this tool's pending deferred reply to finish delivery, if any.""" if self._executor is None: return reply_task = self._executor._reply_task if reply_task is None or reply_task.done(): return try: await asyncio.shield(reply_task) except Exception: pass # reply task's own errors aren't our concern def _make_update_pair( self, message: Any, *, call_id_suffix: str = "" ) -> tuple[FunctionCall, FunctionCallOutput]: """Synthesize a (FunctionCall, FunctionCallOutput) pair 
for a progress update. The new FunctionCall carries ``{call_id}{call_id_suffix}``; name/arguments/extra are copied. ``make_tool_output`` is reused so error handling matches dispatch. """ from .generation import make_tool_output fnc_call = FunctionCall( call_id=f"{self.function_call.call_id}{call_id_suffix}", name=self.function_call.name, arguments=self.function_call.arguments, extra=dict(self.function_call.extra), ) tool_output = make_tool_output(fnc_call=fnc_call, output=message, exception=None) # fall back to a stub when the message isn't a valid tool output (e.g. raw object) if tool_output.fnc_call_out is None: fnc_call_out = FunctionCallOutput( name=fnc_call.name, call_id=fnc_call.call_id, output=str(message or ""), is_error=False, ) else: fnc_call_out = tool_output.fnc_call_out return (fnc_call, fnc_call_out)Abstract base class for generic types.
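NOTE: the class listed here as `AsyncRunContext` is implemented by `RunContext`, which defines no class docstring of its own. The surrounding text ("Abstract base class for generic types." and the paragraphs that follow) is the inherited `typing.Generic` docstring and does not describe this class. `RunContext` is the context object for a single function-tool call. It exposes `session`, `speech_handle`, `function_call`, and `userdata`, and provides `update()`, `with_filler()`, `foreground()`, `wait_for_playout()`, and `disallow_interruptions()`.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::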
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return defaultAncestors
- typing.Generic
Instance variables
prop function_call : FunctionCall
Expand source code
@property def function_call(self) -> FunctionCall: return self._function_call prop session : AgentSession[Userdata_T]-
Expand source code
@property def session(self) -> AgentSession[Userdata_T]: return self._session prop speech_handle : SpeechHandle-
Expand source code
@property def speech_handle(self) -> SpeechHandle: return self._speech_handle prop userdata : Userdata_T-
Expand source code
@property def userdata(self) -> Userdata_T: return self.session.userdata
Methods
def disallow_interruptions(self) -> None
Expand source code
def disallow_interruptions(self) -> None: """Disable interruptions for this FunctionCall. Delegates to the SpeechHandle.allow_interruptions setter, which will raise a RuntimeError if the handle is already interrupted. Raises: RuntimeError: If the SpeechHandle is already interrupted. """ self.speech_handle.allow_interruptions = FalseDisable interruptions for this FunctionCall.
Delegates to the SpeechHandle.allow_interruptions setter, which will raise a RuntimeError if the handle is already interrupted.
Raises
RuntimeError: If the SpeechHandle is already interrupted.
async def foreground(self) -> AsyncIterator[AgentActivity]
Expand source code
@asynccontextmanager async def foreground(self) -> AsyncIterator[AgentActivity]: """Wait for idle, then hold the floor while interactive work runs. Use cases: - wrap an ``await AgentTask()`` so it doesn't race with current speech or another tool's queued reply - wrap a direct ``generate_reply`` / ``say`` for the same reason - group multiple interactive calls so no deferred tool reply lands between them On enter, drains this tool's pending deferred reply first so its speech plays before the floor is held — keeps chat order matching code order. """ await self._drain_pending_reply() async with self._session._wait_for_idle_and_hold() as activity: yield activityWait for idle, then hold the floor while interactive work runs.
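Because the method is decorated with `@asynccontextmanager`, use it as `async with ctx.foreground() as activity: ...`.

Use cases: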
- wrap an `await AgentTask()` so it doesn't race with current speech or another tool's queued reply
- wrap a direct `generate_reply` / `say` for the same reason
- group multiple interactive calls so no deferred tool reply lands between them

On enter, drains this tool's pending deferred reply first so its speech plays before the floor is held — keeps chat order matching code order.
async def update(self,
message: str | Any,
*,
template: str | Callable[[UpdatePromptArgs], str] | None = None) -> None
Expand source code
async def update( self, message: str | Any, *, template: str | Callable[[UpdatePromptArgs], str] | None = None, ) -> None: """Push a progress update into the conversation. The first update releases control to the LLM with ``message`` as the tool's synthetic return; subsequent updates are coalesced into a deferred reply. Outside the voice path (e.g. ``execute_function_call``) updates are recorded on the result but no reply is fired. Args: message: Progress message; strings are wrapped by ``template``. template: Per-call override — either a ``str.format()`` template or a callable receiving ``UpdatePromptArgs``. Defaults to the executor's resolved ``update`` template (or the module default when standalone). """ # update() is a deliberate agent action — reset any active filler dwell so a # pending filler doesn't race the real update to the speech queue for s in self._filler_schedulers: s.reset_dwell() if isinstance(message, str): if template is None: if self._executor is not None: template = self._executor._tool_options["update_template"] else: from .tool_executor import UPDATE_TEMPLATE template = UPDATE_TEMPLATE from .tool_executor import _render message = _render( template, { "function_name": self.function_call.name, "call_id": self.function_call.call_id, "message": message, }, ) # first update keeps the original call_id update_step = len(self._updates) pair = self._make_update_pair( message, call_id_suffix=f"_update_{update_step}" if update_step > 0 else "" ) self._updates.append(pair) if self._executor is None: return # standalone — nothing else to do assert self._first_update_fut is not None if not self._first_update_fut.done(): self._first_update_fut.set_result(message) self._function_call.extra["__livekit_agents_tool_non_blocking"] = True return await self._executor._enqueue_reply(self, [pair[0], pair[1]])Push a progress update into the conversation.
The first update releases control to the LLM with `message` as the tool's synthetic return; subsequent updates are coalesced into a deferred reply. Outside the voice path (e.g. `execute_function_call`) updates are recorded on the result but no reply is fired.

Args
message: Progress message. String messages are wrapped by `template`; non-string messages are used as-is.
template: Per-call override — either a `str.format()` template or a callable receiving `UpdatePromptArgs`. Applies only to string messages. Defaults to the executor's resolved `update` template (or the module default when standalone).
async def wait_for_playout(self) -> None
Expand source code
async def wait_for_playout(self) -> None: """Waits for the speech playout corresponding to this function call step. Unlike `SpeechHandle.wait_for_playout`, which waits for the full assistant turn to complete (including all function tools), this method only waits for the assistant's spoken response prior running this tool to finish playing.""" await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)Waits for the speech playout corresponding to this function call step.
Unlike `SpeechHandle.wait_for_playout`, which waits for the full assistant turn to complete (including all function tools), this method only waits for the assistant's spoken response that preceded this tool call to finish playing.

async def with_filler(self,
source: _FillerSource,
*,
delay: float = 0,
interval: float | None = None,
max_steps: int | None = None) -> AsyncIterator[None]
Expand source code
@asynccontextmanager async def with_filler( self, source: _FillerSource, *, delay: float = 0, interval: float | None = None, max_steps: int | None = None, ) -> AsyncIterator[None]: """Schedule filler speech while a long-running step blocks the tool. While the context is open, a background scheduler waits for the session to be continuously idle for ``delay`` seconds, then plays ``source``. With ``interval`` set, it then sleeps that many wall-clock seconds before restarting the dwell wait. ``interval=None`` (default) fires at most once. Args: source: Either a string (spoken via ``session.say``), or a callable ``(step: int) -> SpeechHandle | str | None`` invoked at fire time with the iteration count. Returning ``None`` skips this fire and retries on the next interval; the step counter only advances when a handle is produced. Use ``max_steps`` to cap the total number of fires. delay: Continuous-idle dwell required before each fire. ``0`` = fire as soon as the session is next idle. interval: Wall-clock cooldown after each fire. ``None`` = fire at most once. max_steps: Maximum number of fires across the lifetime of the cm. ``None`` = no limit. """ scheduler = _FillerScheduler( session=self._session, speech_handle=self._speech_handle, source=source, delay=delay, interval=interval, max_steps=max_steps, ) self._filler_schedulers.append(scheduler) try: yield finally: await scheduler.aclose() self._filler_schedulers.remove(scheduler)Schedule filler speech while a long-running step blocks the tool.
Because the method is decorated with `@asynccontextmanager`, use it as `async with ctx.with_filler(...): ...`. While the context is open, a background scheduler waits for the session to be continuously idle for `delay` seconds, then plays `source`. With `interval` set, it then sleeps that many wall-clock seconds before restarting the dwell wait. `interval=None` (default) fires at most once.

Args
source: Either a string (spoken via `session.say`), or a callable `(step: int) -> SpeechHandle | str | None` invoked at fire time with the iteration count. Returning `None` skips this fire and retries on the next interval; the step counter only advances when a handle is produced. Use `max_steps` to cap the total number of fires.
delay: Continuous-idle dwell required before each fire. `0` = fire as soon as the session is next idle.
interval: Wall-clock cooldown after each fire. `None` = fire at most once.
max_steps: Maximum number of fires across the lifetime of the context manager. `None` = no limit.