Module livekit.agents.llm.async_toolset
Classes
class AsyncRunContext(*, run_ctx: RunContext[Userdata_T], toolset: AsyncToolset)
Expand source code
class AsyncRunContext(RunContext[Userdata_T]):
    """Run context for async tool functions."""

    def __init__(self, *, run_ctx: RunContext[Userdata_T], toolset: AsyncToolset) -> None:
        # Mirror the wrapped RunContext's state so this context is a drop-in
        # replacement inside the tool function.
        super().__init__(
            session=run_ctx.session,
            speech_handle=run_ctx.speech_handle,
            function_call=run_ctx.function_call,
        )
        self._toolset = toolset
        # Resolved by the first update() call, or by the tool's final return
        # when update() is never called.
        self._pending_fut = asyncio.Future[Any]()
        # Counts updates after the first, used to derive unique call_ids.
        self._step_idx: int = 0

    async def update(self, message: str | Any, *, _template: str = UPDATE_TEMPLATE) -> None:
        """Push an intermediate progress update into the conversation.

        Updates the chat context immediately and enqueues a speech delivery at the
        toolset level. Multiple updates from different tools are coalesced — only
        the latest pending update triggers a reply.

        Args:
            message: The update message for the LLM (e.g. "Found 3 flights,
                selecting the best option...").
        """
        if isinstance(message, str):
            # Wrap plain strings with function/call metadata for the LLM.
            message = _template.format(
                function_name=self.function_call.name,
                call_id=self.function_call.call_id,
                message=message,
            )

        if not self._pending_fut.done():
            # first update will mark the tool execution in AgentActivity done
            self._pending_fut.set_result(message)
            # make the speech handle awaitable in the rest of the function
            self._function_call.extra["__livekit_agents_tool_pending"] = True
            return

        # Subsequent updates become extra function_call/function_call_output
        # pairs with a derived call_id.
        self._step_idx += 1
        tool_output = self._make_tool_output(
            message, call_id=f"{self.function_call.call_id}/update_{self._step_idx}"
        )
        if tool_output.fnc_call_out is None:
            return

        tool_items: list[ChatItem] = [tool_output.fnc_call, tool_output.fnc_call_out]
        await self._toolset._enqueue_reply(self, tool_items)

    def _make_tool_output(
        self, output: Any | BaseException, call_id: str | None
    ) -> ToolExecutionOutput:
        """Build a ToolExecutionOutput for *output* under the given call_id.

        An exception passed as *output* is recorded as the exception rather
        than the value. ``call_id=None`` reuses the original function call id.
        """
        exception: BaseException | None = None
        if isinstance(output, BaseException):
            exception = output
            output = None

        # Clone the original call with the (possibly derived) call_id so the
        # output pairs up in the chat context.
        fnc_call = FunctionCall(
            call_id=call_id if call_id is not None else self.function_call.call_id,
            name=self.function_call.name,
            arguments=self.function_call.arguments,
            extra=self.function_call.extra,
        )
        return make_tool_output(fnc_call=fnc_call, output=output, exception=exception)
Ancestors
- livekit.agents.voice.events.RunContext
- typing.Generic
Methods
async def update(self, message: str | Any) -> None
Expand source code
async def update(self, message: str | Any, *, _template: str = UPDATE_TEMPLATE) -> None: """Push an intermediate progress update into the conversation. Updates the chat context immediately and enqueues a speech delivery at the toolset level. Multiple updates from different tools are coalesced — only the latest pending update triggers a reply. Args: message: The update message for the LLM (e.g. "Found 3 flights, selecting the best option..."). """ if isinstance(message, str): message = _template.format( function_name=self.function_call.name, call_id=self.function_call.call_id, message=message, ) if not self._pending_fut.done(): # first update will mark the tool execution in AgentActivity done self._pending_fut.set_result(message) # make the speech handle awaitable in the rest of the function self._function_call.extra["__livekit_agents_tool_pending"] = True return self._step_idx += 1 tool_output = self._make_tool_output( message, call_id=f"{self.function_call.call_id}/update_{self._step_idx}" ) if tool_output.fnc_call_out is None: return tool_items: list[ChatItem] = [tool_output.fnc_call, tool_output.fnc_call_out] await self._toolset._enqueue_reply(self, tool_items)Push an intermediate progress update into the conversation.
Updates the chat context immediately and enqueues a speech delivery at the toolset level. Multiple updates from different tools are coalesced — only the latest pending update triggers a reply.
Args
message: The update message for the LLM (e.g. "Found 3 flights, selecting the best option…").
class AsyncToolset(*, id: str, tools: list[Tool] | None = None, on_duplicate_call: DuplicateMode = 'confirm')
Expand source code
class AsyncToolset(Toolset):
    """A toolset for running long-running functions in the background.

    Tools with an :class:`AsyncRunContext` parameter are wrapped to run in the
    background. Each ``ctx.update()`` and the final ``return`` inject a tool
    output into the conversation; the agent then generates a natural-language
    reply to the user based on that output.

    Example::

        @function_tool
        async def book_flight(ctx: AsyncRunContext, origin: str, destination: str) -> dict:
            await ctx.update(f"Looking up flights from {origin} to {destination}...")
            # → agent says: "Sure, let me look up flights from NYC to Tokyo for you!"

            flights = await search_flights(origin, destination)
            await ctx.update(f"Found {len(flights)} flights, picking the best one...")
            # → agent says: "I found 3 flights — picking the best option for you now."

            booking = await book_best_flight(flights)
            return {"confirmation": booking.id}
            # → agent says: "All set! Your booking confirmation number is FL-847293."

        async_tools = AsyncToolset(id="booking", tools=[book_flight])
    """

    # Policy applied when a tool is invoked while a call with the same
    # function name is still running.
    DuplicateMode = Literal["allow", "replace", "reject", "confirm"]

    def __init__(
        self,
        *,
        id: str,
        tools: list[Tool] | None = None,
        on_duplicate_call: DuplicateMode = "confirm",
    ) -> None:
        super().__init__(id=id, tools=tools)
        self._on_duplicate_call = on_duplicate_call
        # Wrap only callable tools; other tool kinds pass through unchanged.
        self._tools = [
            self._wrap_tool(t) if isinstance(t, FunctionTool | RawFunctionTool) else t
            for t in self._tools
        ]
        # call_id -> running background execution
        self._running_tasks: dict[str, _RunningTask] = {}

        # speech delivery — shared across all tools in this toolset
        self._pending_updates: list[_PendingUpdate] = []
        self._reply_task: asyncio.Task[None] | None = None

    @function_tool
    async def get_running_tasks(self) -> list[dict]:
        """Get the list of running async tool calls."""
        return [task.ctx.function_call.model_dump() for task in self._running_tasks.values()]

    @function_tool
    async def cancel_task(self, call_id: str) -> str:
        """Cancel a running async tool call by call_id."""
        success = await self.cancel(call_id)
        if success:
            return f"Task {call_id} cancelled successfully."
        else:
            return f"Task {call_id} not found or already completed."

    async def cancel(self, call_id: str) -> bool:
        """Cancel the task registered under *call_id*; True if one was found."""
        task = self._running_tasks.get(call_id)
        if task is not None:
            await utils.aio.cancel_and_wait(task.exe_task)
            return True
        return False

    async def aclose(self) -> None:
        """Cancel all tasks."""
        await super().aclose()
        tasks = [task.exe_task for task in self._running_tasks.values()]
        if self._reply_task is not None:
            tasks.append(self._reply_task)
        await utils.aio.cancel_and_wait(*tasks)
        self._running_tasks.clear()

    def _wrap_tool(self, tool: FunctionTool | RawFunctionTool) -> FunctionTool | RawFunctionTool:
        """Wrap *tool* so it runs in the background when it takes an AsyncRunContext."""
        if not _has_async_context_param(tool):
            return tool

        raw_schema = _build_raw_schema(tool)

        # inject confirm_duplicate parameter for confirm mode
        confirm_duplicate_param = "_lk_agents_confirm_duplicate"
        if self._on_duplicate_call == "confirm":
            props = raw_schema["parameters"].setdefault("properties", {})
            props[confirm_duplicate_param] = {
                "type": ["boolean"],
                "description": (
                    "Set this to True to confirm you want to run a duplicate. "
                    "Only do this when user confirms the duplication is needed."
                ),
                "default": False,
            }

        @function_tool(raw_schema=raw_schema, flags=tool.info.flags)
        async def wrapper(ctx: RunContext, raw_arguments: dict[str, Any]) -> Any:
            call_id = ctx.function_call.call_id
            fnc_name = ctx.function_call.name

            # duplicate detection
            confirm_duplicate = raw_arguments.pop(confirm_duplicate_param, None)
            duplicate_result = await self._check_duplicate(fnc_name, confirm_duplicate)
            if duplicate_result is not None:
                logger.debug(
                    "duplicate tool call rejected", extra={"call_id": call_id, "function": fnc_name}
                )
                return duplicate_result

            if call_id in self._running_tasks:
                raise ValueError(f"Task already running for call_id: {call_id}")

            async_ctx = AsyncRunContext(run_ctx=ctx, toolset=self)

            async def _execute_tool() -> Any:
                # Runs the real tool in the background; the wrapper returns as
                # soon as async_ctx._pending_fut resolves (first update() or
                # final return).
                try:
                    fnc_args, fnc_kwargs = prepare_function_arguments(
                        fnc=tool, json_arguments=raw_arguments, call_ctx=async_ctx
                    )
                    output = await tool(*fnc_args, **fnc_kwargs)
                except asyncio.CancelledError:
                    logger.debug(
                        "async tool cancelled", extra={"call_id": call_id, "function": fnc_name}
                    )
                    if not async_ctx._pending_fut.done():
                        async_ctx._pending_fut.set_result(None)
                    return
                except Exception as e:
                    output = e
                    logger.exception(
                        "error in async tool", extra={"call_id": call_id, "function": fnc_name}
                    )

                if not async_ctx._pending_fut.done():
                    # pending() was never called — return output directly
                    if isinstance(output, BaseException):
                        async_ctx._pending_fut.set_exception(output)
                    else:
                        async_ctx._pending_fut.set_result(output)
                    return

                if output is None:
                    return

                # update() was used; deliver the final result as a reply.
                tool_output = async_ctx._make_tool_output(output, call_id=f"{call_id}/finished")
                if tool_output.fnc_call_out is None:
                    return

                await self._enqueue_reply(
                    async_ctx, [tool_output.fnc_call, tool_output.fnc_call_out]
                )

            exe_task = asyncio.create_task(_execute_tool(), name=f"async_tool_{fnc_name}")
            _pass_through_activity_task_info(exe_task)
            self._running_tasks[call_id] = _RunningTask(ctx=async_ctx, exe_task=exe_task)
            # Drop the registry entry once the task finishes, however it ends.
            exe_task.add_done_callback(lambda _: self._running_tasks.pop(call_id, None))

            return await async_ctx._pending_fut

        return wrapper

    async def _enqueue_reply(self, ctx: AsyncRunContext, items: list[ChatItem]) -> None:
        """Enqueue a reply for delivery. Coalesces with pending replies."""
        agent = ctx.session.current_agent
        chat_ctx = agent.chat_ctx.copy()
        chat_ctx.insert(items)
        await agent.update_chat_ctx(chat_ctx)

        self._pending_updates.append(_PendingUpdate(ctx=ctx, items=items))
        # Start a delivery task only if none is in flight; later updates are
        # picked up by the snapshot in _deliver_reply.
        if self._reply_task is None or self._reply_task.done():
            self._reply_task = asyncio.create_task(
                self._deliver_reply(ctx.session), name="async_toolset_deliver_reply"
            )

    async def _deliver_reply(self, session: AgentSession) -> None:
        """Wait for the agent to be idle, then generate a reply."""
        await session.wait_for_inactive()

        # no await after this line
        # snapshot and clear pending updates
        updates = self._pending_updates[:]
        self._pending_updates.clear()
        pending_items: list[ChatItem] = []
        for update in updates:
            pending_items.extend(update.items)

        if not pending_items:
            return

        # skip if the agent already spoke after our updates
        # (e.g. user asked something and got a reply)
        agent_chat_items = session.current_agent.chat_ctx.items
        if agent_chat_items and agent_chat_items[-1].created_at > pending_items[-1].created_at:
            logger.debug("skipping async toolset reply — agent already spoke after updates")
            # TODO: use a LLM to verify if another reply is needed?
            return

        pending_call_ids = [
            item.call_id for item in pending_items if item.type == "function_call_output"
        ]
        session.generate_reply(
            instructions=REPLY_INSTRUCTIONS.format(pending_call_ids=pending_call_ids),
            tool_choice="none",
        )

    async def _check_duplicate(self, fnc_name: str, confirm_duplicate: bool) -> str | None:
        """Check for duplicate running tasks. Returns a message if blocked, None otherwise."""
        if self._on_duplicate_call == "allow":
            return None

        running_fnc_calls = [
            t.ctx.function_call
            for t in self._running_tasks.values()
            if t.ctx.function_call.name == fnc_name
        ]
        if len(running_fnc_calls) == 0:
            return None

        if self._on_duplicate_call == "replace":
            # Cancel every running duplicate, then let the new call proceed.
            await asyncio.gather(*[self.cancel(fnc_call.call_id) for fnc_call in running_fnc_calls])
            return None

        if self._on_duplicate_call == "reject":
            return DUPLICATE_REJECT.format(
                function_name=fnc_name,
                running_fnc_calls="\n".join(
                    [fnc_call.model_dump_json() for fnc_call in running_fnc_calls]
                ),
            )
        elif self._on_duplicate_call == "confirm" and not confirm_duplicate:
            return DUPLICATE_CONFIRM.format(
                function_name=fnc_name,
                running_fnc_calls="\n".join(
                    [fnc_call.model_dump_json() for fnc_call in running_fnc_calls]
                ),
            )
        return None
Tools with an `AsyncRunContext` parameter are wrapped to run in the background. Each `ctx.update()` and the final `return` inject a tool output into the conversation; the agent then generates a natural-language reply to the user based on that output.

Example::
    @function_tool
    async def book_flight(ctx: AsyncRunContext, origin: str, destination: str) -> dict:
        await ctx.update(f"Looking up flights from {origin} to {destination}...")
        # → agent says: "Sure, let me look up flights from NYC to Tokyo for you!"

        flights = await search_flights(origin, destination)
        await ctx.update(f"Found {len(flights)} flights, picking the best one...")
        # → agent says: "I found 3 flights — picking the best option for you now."

        booking = await book_best_flight(flights)
        return {"confirmation": booking.id}
        # → agent says: "All set! Your booking confirmation number is FL-847293."

    async_tools = AsyncToolset(id="booking", tools=[book_flight])

Ancestors
- livekit.agents.llm.tool_context.Toolset
Class variables
var DuplicateMode
Methods
async def aclose(self) -> None
Expand source code
async def aclose(self) -> None: """Cancel all tasks.""" await super().aclose() tasks = [task.exe_task for task in self._running_tasks.values()] if self._reply_task is not None: tasks.append(self._reply_task) await utils.aio.cancel_and_wait(*tasks) self._running_tasks.clear()Cancel all tasks.
async def cancel(self, call_id: str) -> bool
Expand source code
async def cancel(self, call_id: str) -> bool: task = self._running_tasks.get(call_id) if task is not None: await utils.aio.cancel_and_wait(task.exe_task) return True return False async def cancel_task(self, call_id: str) ‑> str-
Expand source code
@function_tool async def cancel_task(self, call_id: str) -> str: """Cancel a running async tool call by call_id.""" success = await self.cancel(call_id) if success: return f"Task {call_id} cancelled successfully." else: return f"Task {call_id} not found or already completed."Cancel a running async tool call by call_id.
async def get_running_tasks(self) -> list[dict]
Expand source code
@function_tool async def get_running_tasks(self) -> list[dict]: """Get the list of running async tool calls.""" return [task.ctx.function_call.model_dump() for task in self._running_tasks.values()]Get the list of running async tool calls.