Skip to main content

Tasks and task groups

Use tasks to build complex workflows for your voice AI agents.

Overview

Tasks are focused, reusable units that perform a specific objective and return a typed result. They run inside an agent and take control of the session only until their goal is achieved. A task can define its own tools and starts executing when it's created within the context of an agent.

For multi-step flows, the framework provides TaskGroup. A task group executes an ordered sequence of tasks while allowing users to return to earlier steps for corrections. All tasks in a group share conversation context, and when the group finishes, a summarized result is returned to the agent that started it.

Tasks and task groups are core building blocks for complex voice AI workflows. Common use cases for tasks include:

  • Obtaining recording consent at the start of a call.
  • Collecting structured information such as an address or payment details.
  • Walking through a series of questions one step at a time.
  • Any discrete action that should complete and yield control.
  • Any multi-step process that can be decomposed into ordered tasks.
Prebuilt tasks

See Prebuilt tasks for ready-to-use task components for common use cases.

Defining a task

Define a task by extending the AgentTask class and specifying a result type using generics (Python) or TypeScript generics (Node.js). Use the on_enter method to begin the task's interaction with the user, and call the complete method with a result when finished. The task has full support for tools, similar to an agent.

from livekit.agents import AgentTask, function_tool
class CollectConsent(AgentTask[bool]):
def __init__(self, chat_ctx=None):
super().__init__(
instructions="""
Ask for recording consent and get a clear yes or no answer.
Be polite and professional.
""",
chat_ctx=chat_ctx,
)
async def on_enter(self) -> None:
await self.session.generate_reply(
instructions="""
Briefly introduce yourself, then ask for permission to record the call for quality assurance and training purposes.
Make it clear that they can decline.
"""
)
@function_tool
async def consent_given(self) -> None:
"""Use this when the user gives consent to record."""
self.complete(True)
@function_tool
async def consent_denied(self) -> None:
"""Use this when the user denies consent to record."""
self.complete(False)
import { llm, voice } from '@livekit/agents';
class CollectConsent extends voice.AgentTask<boolean> {
constructor(chatCtx?: llm.ChatContext) {
super({
instructions: `
Ask for recording consent and get a clear yes or no answer.
Be polite and professional.
`,
chatCtx,
tools: {
consentGiven: llm.tool({
description: 'Use this when the user gives consent to record.',
execute: async () => {
this.complete(true);
},
}),
consentDenied: llm.tool({
description: 'Use this when the user denies consent to record.',
execute: async () => {
this.complete(false);
},
}),
},
});
}
async onEnter(): Promise<void> {
const handle = this.session.generateReply({
instructions: `
Briefly introduce yourself, then ask for permission to record
the call for quality assurance and training purposes.
Make it clear that they can decline.
`,
});
await handle.waitForPlayout();
}
}

Running a task

A task must be created within the context of an active Agent, and runs automatically when it's created. The task takes control of the session until it returns a result. Await the task to receive its result.

from livekit import api
from livekit.agents import Agent, function_tool, get_job_context
class CustomerServiceAgent(Agent):
def __init__(self):
super().__init__(instructions="You are a friendly customer service representative.")
async def on_enter(self) -> None:
if await CollectConsent(chat_ctx=self.chat_ctx):
await self.session.generate_reply(instructions="Offer your assistance to the user.")
else:
await self.session.generate_reply(instructions="Inform the user that you are unable to proceed and will end the call.")
job_ctx = get_job_context()
await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
import { voice } from '@livekit/agents';
class CustomerServiceAgent extends voice.Agent {
constructor() {
super({ instructions: 'You are a friendly customer service representative.' });
}
async onEnter(): Promise<void> {
const consent = await new CollectConsent(this.chatCtx).run();
if (consent) {
const handle = this.session.generateReply({
instructions: 'Offer your assistance to the user.',
});
await handle.waitForPlayout();
} else {
const handle = this.session.generateReply({
instructions: 'Inform the user that you are unable to proceed and will end the call.',
});
await handle.waitForPlayout();
this.session.shutdown({ reason: 'user-ended-call' });
}
}
}
Testing limitation

get_job_context() is unavailable in test environments and raises a RuntimeError when called. If your agent uses get_job_context(), avoid testing code paths that invoke it, or mock the call using unittest.mock.

Task results

Use any result type you want. For complex results, use a custom dataclass (Python) or interface (Node.js).

from dataclasses import dataclass
@dataclass
class ContactInfoResult:
name: str
email_address: str
phone_number: str
class GetContactInfoTask(AgentTask[ContactInfoResult]):
# ....
interface ContactInfoResult {
name: string;
emailAddress: string;
phoneNumber: string;
}
class GetContactInfoTask extends voice.AgentTask<ContactInfoResult> {
// ....
}

Unordered collection within tasks

You can use a single task to collect multiple pieces of information in any order. The following example collects strengths, weaknesses, and work style in a hypothetical interview. Candidates can answer the questions in any order:

@dataclass
class BehavioralResults:
strengths: str
weaknesses: str
work_style: str
class BehavioralTask(AgentTask[BehavioralResults]):
def __init__(self) -> None:
super().__init__(
instructions="Collect strengths, weaknesses, and work style in any order."
)
self._results = {}
@function_tool()
async def record_strengths(self, strengths_summary: str):
"""Record candidate's strengths"""
self._results["strengths"] = strengths_summary
self._check_completion()
@function_tool()
async def record_weaknesses(self, weaknesses_summary: str):
"""Record candidate's weaknesses"""
self._results["weaknesses"] = weaknesses_summary
self._check_completion()
@function_tool()
async def record_work_style(self, work_style: str):
"""Record candidate's work style"""
self._results["work_style"] = work_style
self._check_completion()
def _check_completion(self):
required_keys = {"strengths", "weaknesses", "work_style"}
if self._results.keys() == required_keys:
results = BehavioralResults(
strengths=self._results["strengths"],
weaknesses=self._results["weaknesses"],
work_style=self._results["work_style"]
)
self.complete(results)
else:
self.session.generate_reply(
instructions="Continue collecting remaining information."
)
import { llm, voice } from '@livekit/agents';
import { z } from 'zod';
interface BehavioralResults {
strengths: string;
weaknesses: string;
workStyle: string;
}
class BehavioralTask extends voice.AgentTask<BehavioralResults> {
private results: Partial<BehavioralResults> = {};
constructor() {
super({
instructions: 'Collect strengths, weaknesses, and work style in any order.',
tools: {
recordStrengths: llm.tool({
description: "Record candidate's strengths",
parameters: z.object({
strengthsSummary: z.string().describe("Summary of candidate's strengths"),
}),
execute: async ({ strengthsSummary }) => {
this.results.strengths = strengthsSummary;
this.checkCompletion();
},
}),
recordWeaknesses: llm.tool({
description: "Record candidate's weaknesses",
parameters: z.object({
weaknessesSummary: z.string().describe("Summary of candidate's weaknesses"),
}),
execute: async ({ weaknessesSummary }) => {
this.results.weaknesses = weaknessesSummary;
this.checkCompletion();
},
}),
recordWorkStyle: llm.tool({
description: "Record candidate's work style",
parameters: z.object({
workStyle: z.string().describe("Description of candidate's work style"),
}),
execute: async ({ workStyle }) => {
this.results.workStyle = workStyle;
this.checkCompletion();
},
}),
},
});
}
private checkCompletion(): void {
const { strengths, weaknesses, workStyle } = this.results;
if (strengths && weaknesses && workStyle) {
this.complete({ strengths, weaknesses, workStyle });
} else {
this.session.generateReply({
instructions: 'Continue collecting remaining information.',
});
}
}
}

Task group

Experimental feature

TaskGroup is currently experimental and the API might change in a future release.

Task groups let you build complex, user-friendly workflows that mirror real conversational behavior—where users might need to revisit or correct earlier steps without losing context. They're designed as ordered, multi-step flows that can be broken into discrete tasks, with built-in regression support for safely moving backward.

TaskGroup supports task chaining, which allows tasks to call or re-enter other tasks dynamically while maintaining the overall flow order. This lets users return to earlier steps as often as needed. All tasks in the group share the same conversation context, and when the group finishes, the summarized context can be passed back to the controlling agent.

Configuration options

TaskGroup supports the following parameters:

summarize_chat_ctxbooleanDefault: true

Whether to summarize the interactions within the TaskGroup into one message and merge into the main context.

chat_ctxllm.ChatContextDefault: llm.ChatContext

The shared chat context within the TaskGroup. Pass the current chat context to ensure conversational continuity.

return_exceptionsbooleanDefault: false

Controls error handling when a sub-task raises an unhandled exception. When set to true, the exception is added to the results dictionary and the sequence continues. When set to false, the exception propagates immediately and the sequence stops.

on_task_completed(event: TaskCompletedEvent) => Promise<void>

An async callback invoked after each sub-task completes successfully. It receives a TaskCompletedEvent with the following fields:

  • agent_task: AgentTask instance that just finished.
  • task_id: String ID of the task.
  • result: Value the task returned.

Basic usage

Initialize and set up a TaskGroup by adding tasks to it. Add tasks in the order they should be executed:

from livekit.agents.beta.workflows import GetEmailTask, TaskGroup
# Create and configure TaskGroup with the current agent's chat context
chat_ctx = self.chat_ctx
task_group = TaskGroup(chat_ctx=chat_ctx)
# Add tasks using lambda factories
task_group.add(
lambda: GetEmailTask(),
id="get_email_task",
description="Collects the user's email"
)
task_group.add(
lambda: GetCommuteTask(),
id="get_commute_task",
description="Records the user's commute flexibility"
)
# Execute the task group
results = await task_group # Returns TaskGroupResult object
task_results = results.task_results
# Access results by task ID
print(task_results)
# Output: {
# "get_email_task": GetEmailResult(email="john.doe@gmail.com"),
# "get_commute_task": CommuteResult(can_commute=True, commute_method="subway")
# }
import { beta, llm } from '@livekit/agents';
// Create and configure TaskGroup with the current agent's chat context
const chatCtx = this.chatCtx;
const taskGroup = new beta.TaskGroup({ chatCtx });
// Add tasks using arrow-function factories
taskGroup.add(() => new GetEmailTask(), {
id: 'get_email_task',
description: "Collects the user's email",
});
taskGroup.add(() => new GetCommuteTask(), {
id: 'get_commute_task',
description: "Records the user's commute flexibility",
});
// Execute the task group
const results = await taskGroup.run(); // Returns TaskGroupResult object
const taskResults = results.taskResults;
// Access results by task ID
console.log(taskResults);
// Output: {
// get_email_task: { email: "john.doe@gmail.com" },
// get_commute_task: { canCommute: true, commuteMethod: "subway" }
// }

The TaskGroup.add() method takes a task factory and an options object (Python: task_factory, id, description as arguments; Node.js: factory function and { id, description }):

  • Task factory: A callable that returns a task instance (Python: typically a lambda; Node.js: an arrow function).
  • id: A string identifier for the task used to access results.
  • description: A string description that helps the LLM understand when to regress to this task.

The factory allows for tasks to be reinitialized with the same arguments when revisited. The task id and description are passed to the LLM as task identifiers when the LLM needs to regress to a previous task. This allows the LLM to understand the task's purpose and context when revisiting it. Task chaining is supported, allowing users to return to earlier steps as often as needed.

All tasks share the same conversation context. The context is summarized and passed back to the controlling agent when the group finishes. This option can be disabled when initializing the task group:

# Disable context summarization
task_group = TaskGroup(summarize_chat_ctx=False)
// Disable context summarization
const taskGroup = new beta.TaskGroup({ summarizeChatCtx: false });

Task completion callbacks

Add a callback function to a task group to run custom logic after each task completes. The callback receives a TaskCompletedEvent containing the completed task's ID, instance, and result.

Use the on_task_completed parameter to set the callback function. The following example prints a message after each task finishes:

from livekit.agents.beta.workflows import TaskGroup, TaskCompletedEvent
async def print_task_result(event: TaskCompletedEvent) -> None:
print(f"Task '{event.task_id}' completed with result: {event.result}")
task_group = TaskGroup(
chat_ctx=self.chat_ctx,
on_task_completed=print_task_result,
)
task_group.add(
lambda: IntroTask(),
id="intro_task",
description="Collects name and introduction",
)
task_group.add(
lambda: CommuteTask(),
id="commute_task",
description="Asks about commute flexibility",
)
results = await task_group
import { beta } from '@livekit/agents';
const taskGroup = new beta.TaskGroup({
chatCtx: this.chatCtx,
onTaskCompleted: async ({ taskId, result }) => {
console.log(`Task '${taskId}' completed with result:`, result);
},
});
taskGroup.add(() => new IntroTask(), {
id: 'intro_task',
description: 'Collects name and introduction',
});
taskGroup.add(() => new CommuteTask(), {
id: 'commute_task',
description: 'Asks about commute flexibility',
});
const results = await taskGroup.run();

Complete workflow example

The following is a complete example showing how to build an interview workflow with TaskGroup. It collects basic candidate information and then asks about their commute flexibility:

from livekit.agents import AgentTask, function_tool, RunContext
from livekit.agents.beta.workflows import TaskGroup
from dataclasses import dataclass
@dataclass
class IntroResults:
name: str
intro: str
@dataclass
class CommuteResults:
can_commute: bool
commute_method: str
class IntroTask(AgentTask[IntroResults]):
def __init__(self) -> None:
super().__init__(
instructions="Welcome the candidate and collect their name and introduction."
)
async def on_enter(self) -> None:
await self.session.generate_reply(
instructions="Welcome the candidate and gather their name."
)
@function_tool()
async def record_intro(self, context: RunContext, name: str, intro_notes: str) -> None:
"""Record the candidate's name and introduction"""
context.session.userdata.candidate_name = name
results = IntroResults(name=name, intro=intro_notes)
self.complete(results)
class CommuteTask(AgentTask[CommuteResults]):
def __init__(self) -> None:
super().__init__(
instructions="Ask about the candidate's ability to commute to the office."
)
@function_tool()
async def record_commute_flexibility(
self,
context: RunContext,
can_commute: bool,
commute_method: str
) -> None:
"""Record commute flexibility and transportation method"""
results = CommuteResults(can_commute=can_commute, commute_method=commute_method)
self.complete(results)
# Set up the workflow
task_group = TaskGroup()
task_group.add(
lambda: IntroTask(),
id="intro_task",
description="Collects name and introduction"
)
task_group.add(
lambda: CommuteTask(),
id="commute_task",
description="Asks about commute flexibility"
)
# Execute and get results
results = await task_group
task_results = results.task_results
import { beta, llm, voice } from '@livekit/agents';
import { z } from 'zod';
interface IntroResults {
name: string;
intro: string;
}
interface CommuteResults {
canCommute: boolean;
commuteMethod: string;
}
interface InterviewUserData {
candidateName?: string;
}
class IntroTask extends voice.AgentTask<IntroResults, InterviewUserData> {
constructor() {
super({
instructions: 'Welcome the candidate and collect their name and introduction.',
tools: {
recordIntro: llm.tool({
description: "Record the candidate's name and introduction",
parameters: z.object({
name: z.string().describe("The candidate's name"),
introNotes: z.string().describe('Introduction notes'),
}),
execute: async ({ name, introNotes }, { ctx }) => {
ctx.userData.candidateName = name;
this.complete({ name, intro: introNotes });
},
}),
},
});
}
async onEnter(): Promise<void> {
const handle = this.session.generateReply({
instructions: 'Welcome the candidate and gather their name.',
});
await handle.waitForPlayout();
}
}
class CommuteTask extends voice.AgentTask<CommuteResults> {
constructor() {
super({
instructions: "Ask about the candidate's ability to commute to the office.",
tools: {
recordCommuteFlexibility: llm.tool({
description: 'Record commute flexibility and transportation method',
parameters: z.object({
canCommute: z.boolean().describe('Whether the candidate can commute'),
commuteMethod: z.string().describe('Transportation method'),
}),
execute: async ({ canCommute, commuteMethod }) => {
this.complete({ canCommute, commuteMethod });
},
}),
},
});
}
}
// Set up the workflow
const taskGroup = new beta.TaskGroup();
taskGroup.add(() => new IntroTask(), {
id: 'intro_task',
description: 'Collects name and introduction',
});
taskGroup.add(() => new CommuteTask(), {
id: 'commute_task',
description: 'Asks about commute flexibility',
});
// Execute and get results
const results = await taskGroup.run();
const taskResults = results.taskResults;

Best practices for testing task groups

The following sections provide specific guidelines for testing TaskGroup in both Python and Node.js SDKs.

Add a short delay before the first session.run() in Python

TaskGroup temporarily sets llm=None during task transitions. In the Python SDK, session.run() doesn't fall back to session.llm during this window, which can raise the following exception if the test calls session.run() too early:

RuntimeError: trying to generate reply without an LLM model.

Add a small delay between session.start() and the first session.run() call so the first sub-task can take over:

await session.start()
await asyncio.sleep(0.5)
await session.run(...)

This delay isn't required in Node.js because null LLM values automatically fall back to session.llm.

Parse function call arguments

Test run results return function call arguments as raw JSON strings. You must parse the JSON before accessing properties.

In Python, access the function call arguments via item.arguments:

fnc = result.expect.next_event().is_function_call(name="my_tool")
args = json.loads(fnc.event().item.arguments)
assert args["can_commute"] is True

In Node.js, access the function call arguments via item.args:

const fnc = result.expect.nextEvent().isFunctionCall({ name: 'myTool' });
const args = JSON.parse(fnc.event().item.args);
expect(args.canCommute).toBe(true);

Initialize userData when tasks depend on it

If tasks write to ctx.userData, initialize it when creating the session. Property assignments silently fail if userData is null or undefined.

AgentSession(llm=llm, userdata=MyUserdata(candidate_name=""))
new voice.AgentSession<MyUserData>({ llm, userData: {} });

Don't assert on startup output

Output generated during agent startup (for example from session.say() or session.generate_reply()) is not included in RunResult.

Structure tests to assert agent responses to user input, not startup messages.

Avoid awaiting playout inside onEnter() when triggered from a tool

If onEnter() runs inside a tool's execute function, awaiting speech playout can cause a circular wait. The tool call remains active until onEnter() returns.

Call generateReply() without awaiting it:

async def on_enter(self) -> None:
self.session.generate_reply(instructions="Welcome the user.") # don't await
async onEnter(): Promise<void> {
this.session.generateReply({ instructions: 'Welcome the user.' }); // no await
}

Consider multi-turn LLM behavior

An LLM might not call a task's completion tool on the first turn. It might require multiple exchanges before completing the task.

Prefer containsFunctionCall() over nextEvent() for more resilient tests, and use generous timeouts:

  • containsFunctionCall() checks whether the call occurred anywhere in the response.
  • nextEvent() only checks the immediate next event.
result.expect.contains_function_call(name="consent_given")
result.expect.containsFunctionCall({ name: 'consentGiven' });

Increase cleanup timeouts in Node.js

Session cleanup can be slow when a TaskGroup is mid-flow. Set an explicit timeout in your cleanup hook to avoid afterEach failures:

afterEach(async () => {
await session?.close();
}, 30000);

Example tests for task group

Examples

The following examples show tasks and task groups in production-style agents:

Additional resources

The following topics provide more information on creating complex workflows for your voice AI agents.