Module livekit.agents.llm.async_toolset

Classes

class AsyncToolset (*,
id: str,
tools: Sequence[Tool] | None = None,
tool_handling: ToolHandlingOptions | None = None,
on_duplicate_call: DuplicateMode | None = None)
Expand source code
class AsyncToolset(Toolset):
    """Session-scoped toolset whose tools survive agent handoff.

    Background updates from tools in this toolset are delivered to whichever agent
    is current at delivery time, so a ``ctx.update()`` started under agent A still
    completes after a handoff to agent B. Tools placed on ``Agent(tools=...)``
    instead use the activity-scoped executor and are cancelled/awaited on handoff.

    Example::

        @function_tool(on_duplicate="confirm", flags=ToolFlag.CANCELLABLE)
        async def book_flight(ctx, origin: str, destination: str) -> dict:
            await ctx.update(f"Looking up flights {origin} → {destination}...")
            flights = await search(origin, destination)
            await ctx.update(f"Found {len(flights)}, picking the best...")
            return await book_best(flights)

        session = AgentSession(tools=[AsyncToolset(id="booking", tools=[book_flight])])
    """

    def __init__(
        self,
        *,
        id: str,
        tools: Sequence[Tool] | None = None,
        tool_handling: ToolHandlingOptions | None = None,
        # deprecated
        on_duplicate_call: DuplicateMode | None = None,
    ) -> None:
        if on_duplicate_call is not None:
            raise TypeError(
                "AsyncToolset(on_duplicate_call=...) has been deprecated; "
                "set `on_duplicate=...` on each @function_tool instead."
            )

        super().__init__(id=id, tools=tools)
        self._async_tool_options_override = (
            tool_handling.get("async_options") if tool_handling is not None else None
        )
        self._executor = _ToolExecutor(owning_activity=None)

    def _attach_activity(self, *, activity: AgentActivity | None, session: AgentSession) -> None:
        """Bind this toolset to a scope. ``activity=None`` makes it session-scoped
        (replies survive handoff); otherwise replies stay with ``activity``'s agent."""
        self._executor.set_owning_activity(activity)

        if self._async_tool_options_override is not None:
            resolved = _resolve_async_tool_options(self._async_tool_options_override)
        elif activity is not None and is_given(activity._agent._async_tool_options):
            resolved = _resolve_async_tool_options(activity._agent._async_tool_options)
        else:
            resolved = session._async_tool_options
        self._executor.set_tool_options(resolved)

    async def aclose(self) -> None:
        await super().aclose()
        await self._executor.drain()
        await self._executor.aclose()

Session-scoped toolset whose tools survive agent handoff.

Background updates from tools in this toolset are delivered to whichever agent is current at delivery time, so a ctx.update() started under agent A still completes after a handoff to agent B. Tools placed on Agent(tools=...) instead use the activity-scoped executor and are cancelled/awaited on handoff.

Example::

@function_tool(on_duplicate="confirm", flags=ToolFlag.CANCELLABLE)
async def book_flight(ctx, origin: str, destination: str) -> dict:
    await ctx.update(f"Looking up flights {origin} → {destination}...")
    flights = await search(origin, destination)
    await ctx.update(f"Found {len(flights)}, picking the best...")
    return await book_best(flights)

session = AgentSession(tools=[AsyncToolset(id="booking", tools=[book_flight])])

Ancestors

  • livekit.agents.llm.tool_context.Toolset

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    await super().aclose()
    await self._executor.drain()
    await self._executor.aclose()

Close the toolset and release any held resources.

Agent-scoped toolsets (passed to Agent(tools=...)) are closed when the AgentActivity ends (on agent transition or session close). Session-scoped toolsets (passed to AgentSession(tools=...)) are closed only when the AgentSession shuts down.

class AsyncRunContext (*,
session: AgentSession[Userdata_T],
speech_handle: SpeechHandle,
function_call: FunctionCall)
Expand source code
class RunContext(Generic[Userdata_T]):
    # private ctor
    def __init__(
        self,
        *,
        session: AgentSession[Userdata_T],
        speech_handle: SpeechHandle,
        function_call: FunctionCall,
    ) -> None:
        self._session = session
        self._speech_handle = speech_handle
        self._function_call = function_call

        self._initial_step_idx = speech_handle.num_steps - 1
        self._filler_schedulers: list[_FillerScheduler] = []

        # synthesized progress-update pairs, populated whether or not an executor is attached
        self._updates: list[tuple[FunctionCall, FunctionCallOutput]] = []

        # set/cleared by the executor around the tool's lifetime
        self._executor: _ToolExecutor | None = None
        self._first_update_fut: asyncio.Future[Any] | None = None

    @property
    def session(self) -> AgentSession[Userdata_T]:
        return self._session

    @property
    def speech_handle(self) -> SpeechHandle:
        return self._speech_handle

    @property
    def function_call(self) -> FunctionCall:
        return self._function_call

    @property
    def userdata(self) -> Userdata_T:
        return self.session.userdata

    def disallow_interruptions(self) -> None:
        """Disable interruptions for this FunctionCall.

        Delegates to the SpeechHandle.allow_interruptions setter,
        which will raise a RuntimeError if the handle is already interrupted.

        Raises:
            RuntimeError: If the SpeechHandle is already interrupted.
        """
        self.speech_handle.allow_interruptions = False

    async def wait_for_playout(self) -> None:
        """Waits for the speech playout corresponding to this function call step.

        Unlike `SpeechHandle.wait_for_playout`, which waits for the full
        assistant turn to complete (including all function tools),
        this method only waits for the assistant's spoken response prior running
        this tool to finish playing."""
        await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)

    @asynccontextmanager
    async def with_filler(
        self,
        source: _FillerSource,
        *,
        delay: float = 0,
        interval: float | None = None,
        max_steps: int | None = None,
    ) -> AsyncIterator[None]:
        """Schedule filler speech while a long-running step blocks the tool.

        While the context is open, a background scheduler waits for the session to be
        continuously idle for ``delay`` seconds, then plays ``source``. With ``interval``
        set, it then sleeps that many wall-clock seconds before restarting the dwell
        wait. ``interval=None`` (default) fires at most once.

        Args:
            source: Either a string (spoken via ``session.say``), or a callable
                ``(step: int) -> SpeechHandle | str | None`` invoked at fire time with
                the iteration count. Returning ``None`` skips this fire and retries on
                the next interval; the step counter only advances when a handle is
                produced. Use ``max_steps`` to cap the total number of fires.
            delay: Continuous-idle dwell required before each fire. ``0`` = fire as
                soon as the session is next idle.
            interval: Wall-clock cooldown after each fire. ``None`` = fire at most once.
            max_steps: Maximum number of fires across the lifetime of the cm.
                ``None`` = no limit.
        """
        scheduler = _FillerScheduler(
            session=self._session,
            speech_handle=self._speech_handle,
            source=source,
            delay=delay,
            interval=interval,
            max_steps=max_steps,
        )
        self._filler_schedulers.append(scheduler)
        try:
            yield
        finally:
            await scheduler.aclose()
            self._filler_schedulers.remove(scheduler)

    @asynccontextmanager
    async def foreground(self) -> AsyncIterator[AgentActivity]:
        """Wait for idle, then hold the floor while interactive work runs.

        Use cases:

        - wrap an ``await AgentTask()`` so it doesn't race with current speech
          or another tool's queued reply
        - wrap a direct ``generate_reply`` / ``say`` for the same reason
        - group multiple interactive calls so no deferred tool reply lands between them

        On enter, drains this tool's pending deferred reply first so its speech
        plays before the floor is held — keeps chat order matching code order.
        """
        await self._drain_pending_reply()
        async with self._session._wait_for_idle_and_hold() as activity:
            yield activity

    async def update(
        self,
        message: str | Any,
        *,
        template: str | Callable[[UpdatePromptArgs], str] | None = None,
    ) -> None:
        """Push a progress update into the conversation.

        The first update releases control to the LLM with ``message`` as the tool's
        synthetic return; subsequent updates are coalesced into a deferred reply.
        Outside the voice path (e.g. ``execute_function_call``) updates are recorded
        on the result but no reply is fired.

        Args:
            message: Progress message; strings are wrapped by ``template``.
            template: Per-call override — either a ``str.format()`` template or a
                callable receiving ``UpdatePromptArgs``. Defaults to the executor's
                resolved ``update`` template (or the module default when standalone).
        """
        # update() is a deliberate agent action — reset any active filler dwell so a
        # pending filler doesn't race the real update to the speech queue
        for s in self._filler_schedulers:
            s.reset_dwell()

        if isinstance(message, str):
            if template is None:
                if self._executor is not None:
                    template = self._executor._tool_options["update_template"]
                else:
                    from .tool_executor import UPDATE_TEMPLATE

                    template = UPDATE_TEMPLATE
            from .tool_executor import _render

            message = _render(
                template,
                {
                    "function_name": self.function_call.name,
                    "call_id": self.function_call.call_id,
                    "message": message,
                },
            )

        # first update keeps the original call_id
        update_step = len(self._updates)
        pair = self._make_update_pair(
            message, call_id_suffix=f"_update_{update_step}" if update_step > 0 else ""
        )
        self._updates.append(pair)

        if self._executor is None:
            return  # standalone — nothing else to do

        assert self._first_update_fut is not None
        if not self._first_update_fut.done():
            self._first_update_fut.set_result(message)
            self._function_call.extra["__livekit_agents_tool_non_blocking"] = True
            return

        await self._executor._enqueue_reply(self, [pair[0], pair[1]])

    def _attach_executor(
        self, executor: _ToolExecutor, first_update_fut: asyncio.Future[Any]
    ) -> None:
        if self._first_update_fut is not None:
            raise ValueError("Executor already attached")
        self._executor = executor
        self._first_update_fut = first_update_fut

    def _detach_executor(self) -> None:
        self._executor = None
        self._first_update_fut = None

    async def _drain_pending_reply(self) -> None:
        """Wait for this tool's pending deferred reply to finish delivery, if any."""
        if self._executor is None:
            return
        reply_task = self._executor._reply_task
        if reply_task is None or reply_task.done():
            return
        try:
            await asyncio.shield(reply_task)
        except Exception:
            pass  # reply task's own errors aren't our concern

    def _make_update_pair(
        self, message: Any, *, call_id_suffix: str = ""
    ) -> tuple[FunctionCall, FunctionCallOutput]:
        """Synthesize a (FunctionCall, FunctionCallOutput) pair for a progress update.

        The new FunctionCall carries ``{call_id}{call_id_suffix}``; name/arguments/extra
        are copied. ``make_tool_output`` is reused so error handling matches dispatch.
        """
        from .generation import make_tool_output

        fnc_call = FunctionCall(
            call_id=f"{self.function_call.call_id}{call_id_suffix}",
            name=self.function_call.name,
            arguments=self.function_call.arguments,
            extra=dict(self.function_call.extra),
        )
        tool_output = make_tool_output(fnc_call=fnc_call, output=message, exception=None)
        # fall back to a stub when the message isn't a valid tool output (e.g. raw object)
        if tool_output.fnc_call_out is None:
            fnc_call_out = FunctionCallOutput(
                name=fnc_call.name,
                call_id=fnc_call.call_id,
                output=str(message or ""),
                is_error=False,
            )
        else:
            fnc_call_out = tool_output.fnc_call_out
        return (fnc_call, fnc_call_out)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • typing.Generic

Instance variables

prop function_call : FunctionCall
Expand source code
@property
def function_call(self) -> FunctionCall:
    return self._function_call
prop session : AgentSession[Userdata_T]
Expand source code
@property
def session(self) -> AgentSession[Userdata_T]:
    return self._session
prop speech_handle : SpeechHandle
Expand source code
@property
def speech_handle(self) -> SpeechHandle:
    return self._speech_handle
prop userdata : Userdata_T
Expand source code
@property
def userdata(self) -> Userdata_T:
    return self.session.userdata

Methods

def disallow_interruptions(self) ‑> None
Expand source code
def disallow_interruptions(self) -> None:
    """Disable interruptions for this FunctionCall.

    Delegates to the SpeechHandle.allow_interruptions setter,
    which will raise a RuntimeError if the handle is already interrupted.

    Raises:
        RuntimeError: If the SpeechHandle is already interrupted.
    """
    self.speech_handle.allow_interruptions = False

Disable interruptions for this FunctionCall.

Delegates to the SpeechHandle.allow_interruptions setter, which will raise a RuntimeError if the handle is already interrupted.

Raises

RuntimeError
If the SpeechHandle is already interrupted.
async def foreground(self) ‑> AsyncIterator[AgentActivity]
Expand source code
@asynccontextmanager
async def foreground(self) -> AsyncIterator[AgentActivity]:
    """Wait for idle, then hold the floor while interactive work runs.

    Use cases:

    - wrap an ``await AgentTask()`` so it doesn't race with current speech
      or another tool's queued reply
    - wrap a direct ``generate_reply`` / ``say`` for the same reason
    - group multiple interactive calls so no deferred tool reply lands between them

    On enter, drains this tool's pending deferred reply first so its speech
    plays before the floor is held — keeps chat order matching code order.
    """
    await self._drain_pending_reply()
    async with self._session._wait_for_idle_and_hold() as activity:
        yield activity

Wait for idle, then hold the floor while interactive work runs.

Use cases:

  • wrap an await AgentTask() so it doesn't race with current speech or another tool's queued reply
  • wrap a direct generate_reply / say for the same reason
  • group multiple interactive calls so no deferred tool reply lands between them

On enter, drains this tool's pending deferred reply first so its speech plays before the floor is held — keeps chat order matching code order.

async def update(self,
message: str | Any,
*,
template: str | Callable[[UpdatePromptArgs], str] | None = None) ‑> None
Expand source code
async def update(
    self,
    message: str | Any,
    *,
    template: str | Callable[[UpdatePromptArgs], str] | None = None,
) -> None:
    """Push a progress update into the conversation.

    The first update releases control to the LLM with ``message`` as the tool's
    synthetic return; subsequent updates are coalesced into a deferred reply.
    Outside the voice path (e.g. ``execute_function_call``) updates are recorded
    on the result but no reply is fired.

    Args:
        message: Progress message; strings are wrapped by ``template``.
        template: Per-call override — either a ``str.format()`` template or a
            callable receiving ``UpdatePromptArgs``. Defaults to the executor's
            resolved ``update`` template (or the module default when standalone).
    """
    # update() is a deliberate agent action — reset any active filler dwell so a
    # pending filler doesn't race the real update to the speech queue
    for s in self._filler_schedulers:
        s.reset_dwell()

    if isinstance(message, str):
        if template is None:
            if self._executor is not None:
                template = self._executor._tool_options["update_template"]
            else:
                from .tool_executor import UPDATE_TEMPLATE

                template = UPDATE_TEMPLATE
        from .tool_executor import _render

        message = _render(
            template,
            {
                "function_name": self.function_call.name,
                "call_id": self.function_call.call_id,
                "message": message,
            },
        )

    # first update keeps the original call_id
    update_step = len(self._updates)
    pair = self._make_update_pair(
        message, call_id_suffix=f"_update_{update_step}" if update_step > 0 else ""
    )
    self._updates.append(pair)

    if self._executor is None:
        return  # standalone — nothing else to do

    assert self._first_update_fut is not None
    if not self._first_update_fut.done():
        self._first_update_fut.set_result(message)
        self._function_call.extra["__livekit_agents_tool_non_blocking"] = True
        return

    await self._executor._enqueue_reply(self, [pair[0], pair[1]])

Push a progress update into the conversation.

The first update releases control to the LLM with message as the tool's synthetic return; subsequent updates are coalesced into a deferred reply. Outside the voice path (e.g. execute_function_call) updates are recorded on the result but no reply is fired.

Args

message
Progress message; strings are wrapped by template.
template
Per-call override — either a str.format() template or a callable receiving UpdatePromptArgs. Defaults to the executor's resolved update template (or the module default when standalone).
async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """Waits for the speech playout corresponding to this function call step.

    Unlike `SpeechHandle.wait_for_playout`, which waits for the full
    assistant turn to complete (including all function tools),
    this method only waits for the assistant's spoken response prior running
    this tool to finish playing."""
    await self.speech_handle._wait_for_generation(step_idx=self._initial_step_idx)

Waits for the speech playout corresponding to this function call step.

Unlike SpeechHandle.wait_for_playout, which waits for the full assistant turn to complete (including all function tools), this method only waits for the assistant's spoken response prior running this tool to finish playing.

async def with_filler(self,
source: _FillerSource,
*,
delay: float = 0,
interval: float | None = None,
max_steps: int | None = None) ‑> AsyncIterator[None]
Expand source code
@asynccontextmanager
async def with_filler(
    self,
    source: _FillerSource,
    *,
    delay: float = 0,
    interval: float | None = None,
    max_steps: int | None = None,
) -> AsyncIterator[None]:
    """Schedule filler speech while a long-running step blocks the tool.

    While the context is open, a background scheduler waits for the session to be
    continuously idle for ``delay`` seconds, then plays ``source``. With ``interval``
    set, it then sleeps that many wall-clock seconds before restarting the dwell
    wait. ``interval=None`` (default) fires at most once.

    Args:
        source: Either a string (spoken via ``session.say``), or a callable
            ``(step: int) -> SpeechHandle | str | None`` invoked at fire time with
            the iteration count. Returning ``None`` skips this fire and retries on
            the next interval; the step counter only advances when a handle is
            produced. Use ``max_steps`` to cap the total number of fires.
        delay: Continuous-idle dwell required before each fire. ``0`` = fire as
            soon as the session is next idle.
        interval: Wall-clock cooldown after each fire. ``None`` = fire at most once.
        max_steps: Maximum number of fires across the lifetime of the cm.
            ``None`` = no limit.
    """
    scheduler = _FillerScheduler(
        session=self._session,
        speech_handle=self._speech_handle,
        source=source,
        delay=delay,
        interval=interval,
        max_steps=max_steps,
    )
    self._filler_schedulers.append(scheduler)
    try:
        yield
    finally:
        await scheduler.aclose()
        self._filler_schedulers.remove(scheduler)

Schedule filler speech while a long-running step blocks the tool.

While the context is open, a background scheduler waits for the session to be continuously idle for delay seconds, then plays source. With interval set, it then sleeps that many wall-clock seconds before restarting the dwell wait. interval=None (default) fires at most once.

Args

source
Either a string (spoken via session.say), or a callable (step: int) -> SpeechHandle | str | None invoked at fire time with the iteration count. Returning None skips this fire and retries on the next interval; the step counter only advances when a handle is produced. Use max_steps to cap the total number of fires.
delay
Continuous-idle dwell required before each fire. 0 = fire as soon as the session is next idle.
interval
Wall-clock cooldown after each fire. None = fire at most once.
max_steps
Maximum number of fires across the lifetime of the cm. None = no limit.