Overview
Tasks are focused, reusable units that accomplish a specific objective and return a typed result. They run inside an agent and take control of the session only until their goal is achieved. A task can define its own tools and starts executing when it's created within the context of an agent.
For multi-step flows, the framework provides TaskGroup. A task group executes an ordered sequence of tasks while allowing users to return to earlier steps for corrections. All tasks in a group share conversation context, and when the group finishes, a summarized result is returned to the agent that started it.
Tasks and task groups are core building blocks for complex voice AI workflows. Common use cases for tasks include:
- Obtaining recording consent at the start of a call.
- Collecting structured information such as an address or payment details.
- Walking through a series of questions one step at a time.
- Any discrete action that should complete and yield control.
- Any multi-step process that can be decomposed into ordered tasks.
See Prebuilt tasks for ready-to-use task components for common use cases.
Defining a task
Define a task by extending the AgentTask class and specifying its result type as a generic type parameter (AgentTask[bool] in Python, AgentTask<boolean> in Node.js). Use the on_enter method (onEnter in Node.js) to begin the task's interaction with the user, and call the complete method with a result when finished. A task has full support for tools, similar to an agent.
    from livekit.agents import AgentTask, function_tool

    class CollectConsent(AgentTask[bool]):
        def __init__(self, chat_ctx=None):
            super().__init__(
                instructions="""
                Ask for recording consent and get a clear yes or no answer.
                Be polite and professional.
                """,
                chat_ctx=chat_ctx,
            )

        async def on_enter(self) -> None:
            await self.session.generate_reply(
                instructions="""
                Briefly introduce yourself, then ask for permission to record the call for quality assurance and training purposes.
                Make it clear that they can decline.
                """
            )

        @function_tool
        async def consent_given(self) -> None:
            """Use this when the user gives consent to record."""
            self.complete(True)

        @function_tool
        async def consent_denied(self) -> None:
            """Use this when the user denies consent to record."""
            self.complete(False)

    import { llm, voice } from '@livekit/agents';

    class CollectConsent extends voice.AgentTask<boolean> {
      constructor(chatCtx?: llm.ChatContext) {
        super({
          instructions: `
            Ask for recording consent and get a clear yes or no answer.
            Be polite and professional.
          `,
          chatCtx,
          tools: {
            consentGiven: llm.tool({
              description: 'Use this when the user gives consent to record.',
              execute: async () => {
                this.complete(true);
              },
            }),
            consentDenied: llm.tool({
              description: 'Use this when the user denies consent to record.',
              execute: async () => {
                this.complete(false);
              },
            }),
          },
        });
      }

      async onEnter(): Promise<void> {
        const handle = this.session.generateReply({
          instructions: `
            Briefly introduce yourself, then ask for permission to record
            the call for quality assurance and training purposes.
            Make it clear that they can decline.
          `,
        });
        await handle.waitForPlayout();
      }
    }
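As a mental model for the complete-then-await pattern, the following framework-free sketch shows an awaitable object that resolves once complete() is called. MiniTask, ConsentTask, and demo are hypothetical names invented for this illustration; this is not how AgentTask is implemented, only one way such behavior could be built on asyncio.

```python
import asyncio
from typing import Generic, TypeVar

T = TypeVar("T")


class MiniTask(Generic[T]):
    """Hypothetical stand-in for a task; not the livekit AgentTask class."""

    def __init__(self) -> None:
        # Must be constructed while an event loop is running.
        self._done = asyncio.get_running_loop().create_future()

    def complete(self, result: T) -> None:
        # Resolve the task once; later calls are ignored.
        if not self._done.done():
            self._done.set_result(result)

    def __await__(self):
        # Awaiting the task yields whatever was passed to complete().
        return self._done.__await__()


class ConsentTask(MiniTask[bool]):
    def consent_given(self) -> None:
        self.complete(True)

    def consent_denied(self) -> None:
        self.complete(False)


async def demo(answer: str) -> bool:
    task = ConsentTask()
    # Simulate a tool call arriving after the task has started.
    callback = task.consent_given if answer == "yes" else task.consent_denied
    asyncio.get_running_loop().call_soon(callback)
    return await task
```

The caller only sees a typed value (here a bool), which mirrors how an agent awaits a task and branches on its result.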
Running a task
A task must be created within the context of an active Agent, and runs automatically when it's created. The task takes control of the session until it returns a result. Await the task to receive its result.
    from livekit import api
    from livekit.agents import Agent, function_tool, get_job_context

    class CustomerServiceAgent(Agent):
        def __init__(self):
            super().__init__(instructions="You are a friendly customer service representative.")

        async def on_enter(self) -> None:
            if await CollectConsent(chat_ctx=self.chat_ctx):
                await self.session.generate_reply(instructions="Offer your assistance to the user.")
            else:
                await self.session.generate_reply(instructions="Inform the user that you are unable to proceed and will end the call.")
                job_ctx = get_job_context()
                await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))

    import { voice } from '@livekit/agents';

    class CustomerServiceAgent extends voice.Agent {
      constructor() {
        super({ instructions: 'You are a friendly customer service representative.' });
      }

      async onEnter(): Promise<void> {
        const consent = await new CollectConsent(this.chatCtx).run();
        if (consent) {
          const handle = this.session.generateReply({
            instructions: 'Offer your assistance to the user.',
          });
          await handle.waitForPlayout();
        } else {
          const handle = this.session.generateReply({
            instructions: 'Inform the user that you are unable to proceed and will end the call.',
          });
          await handle.waitForPlayout();
          this.session.shutdown({ reason: 'user-ended-call' });
        }
      }
    }
get_job_context() is unavailable in test environments and raises a RuntimeError when called. If your agent uses get_job_context(), avoid testing code paths that invoke it, or mock the call using unittest.mock.
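One way to mock the call is sketched below using only the standard library. The agents namespace, end_call, and run_end_call_with_mock are hypothetical stand-ins so the example runs without the SDK; in a real test you would patch get_job_context in whichever module your agent imports it into, and the delete_room call here is simplified to a keyword argument.

```python
import asyncio
import types
from unittest import mock


def _unavailable():
    # Mimics the behavior described above: no job context exists in tests.
    raise RuntimeError("no job context available")


# Hypothetical stand-in for the module that exposes get_job_context().
agents = types.SimpleNamespace(get_job_context=_unavailable)


async def end_call() -> str:
    # Code path under test: looks up the job context and deletes the room.
    job_ctx = agents.get_job_context()
    await job_ctx.api.room.delete_room(room=job_ctx.room.name)
    return job_ctx.room.name


def run_end_call_with_mock():
    # Build a fake context whose async API call can be awaited and inspected.
    fake_ctx = mock.MagicMock()
    fake_ctx.room.name = "test-room"
    fake_ctx.api.room.delete_room = mock.AsyncMock()
    with mock.patch.object(agents, "get_job_context", return_value=fake_ctx):
        deleted = asyncio.run(end_call())
    return deleted, fake_ctx.api.room.delete_room
```

Outside the patch.object block the original function is restored, so other tests still see the RuntimeError.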
Task results
Use any result type you want. For complex results, use a custom dataclass (Python) or interface (Node.js).
    from dataclasses import dataclass

    @dataclass
    class ContactInfoResult:
        name: str
        email_address: str
        phone_number: str

    class GetContactInfoTask(AgentTask[ContactInfoResult]):
        # ....

    interface ContactInfoResult {
      name: string;
      emailAddress: string;
      phoneNumber: string;
    }

    class GetContactInfoTask extends voice.AgentTask<ContactInfoResult> {
      // ....
    }
Unordered collection within tasks
You can use a single task to collect multiple pieces of information in any order. The following example collects strengths, weaknesses, and work style in a hypothetical interview. Candidates can answer the questions in any order:
    from dataclasses import dataclass

    from livekit.agents import AgentTask, function_tool

    @dataclass
    class BehavioralResults:
        strengths: str
        weaknesses: str
        work_style: str

    class BehavioralTask(AgentTask[BehavioralResults]):
        def __init__(self) -> None:
            super().__init__(
                instructions="Collect strengths, weaknesses, and work style in any order."
            )
            # Answers collected so far, keyed by field name
            self._results = {}

        @function_tool()
        async def record_strengths(self, strengths_summary: str):
            """Record candidate's strengths"""
            self._results["strengths"] = strengths_summary
            self._check_completion()

        @function_tool()
        async def record_weaknesses(self, weaknesses_summary: str):
            """Record candidate's weaknesses"""
            self._results["weaknesses"] = weaknesses_summary
            self._check_completion()

        @function_tool()
        async def record_work_style(self, work_style: str):
            """Record candidate's work style"""
            self._results["work_style"] = work_style
            self._check_completion()

        def _check_completion(self):
            # Complete the task only once every required answer is recorded
            required_keys = {"strengths", "weaknesses", "work_style"}
            if self._results.keys() == required_keys:
                results = BehavioralResults(
                    strengths=self._results["strengths"],
                    weaknesses=self._results["weaknesses"],
                    work_style=self._results["work_style"]
                )
                self.complete(results)
            else:
                self.session.generate_reply(
                    instructions="Continue collecting remaining information."
                )
    import { llm, voice } from '@livekit/agents';
    import { z } from 'zod';

    interface BehavioralResults {
      strengths: string;
      weaknesses: string;
      workStyle: string;
    }

    class BehavioralTask extends voice.AgentTask<BehavioralResults> {
      private results: Partial<BehavioralResults> = {};

      constructor() {
        super({
          instructions: 'Collect strengths, weaknesses, and work style in any order.',
          tools: {
            recordStrengths: llm.tool({
              description: "Record candidate's strengths",
              parameters: z.object({
                strengthsSummary: z.string().describe("Summary of candidate's strengths"),
              }),
              execute: async ({ strengthsSummary }) => {
                this.results.strengths = strengthsSummary;
                this.checkCompletion();
              },
            }),
            recordWeaknesses: llm.tool({
              description: "Record candidate's weaknesses",
              parameters: z.object({
                weaknessesSummary: z.string().describe("Summary of candidate's weaknesses"),
              }),
              execute: async ({ weaknessesSummary }) => {
                this.results.weaknesses = weaknessesSummary;
                this.checkCompletion();
              },
            }),
            recordWorkStyle: llm.tool({
              description: "Record candidate's work style",
              parameters: z.object({
                workStyle: z.string().describe("Description of candidate's work style"),
              }),
              execute: async ({ workStyle }) => {
                this.results.workStyle = workStyle;
                this.checkCompletion();
              },
            }),
          },
        });
      }

      private checkCompletion(): void {
        const { strengths, weaknesses, workStyle } = this.results;
        if (strengths && weaknesses && workStyle) {
          this.complete({ strengths, weaknesses, workStyle });
        } else {
          this.session.generateReply({
            instructions: 'Continue collecting remaining information.',
          });
        }
      }
    }
Task group
TaskGroup is currently experimental and the API might change in a future release.
Task groups let you build complex, user-friendly workflows that mirror real conversational behavior, where users might need to revisit or correct earlier steps without losing context. They're designed as ordered, multi-step flows that can be broken into discrete tasks, with built-in regression support for safely moving backward.
TaskGroup supports task chaining, which allows tasks to call or re-enter other tasks dynamically while maintaining the overall flow order. This lets users return to earlier steps as often as needed. All tasks in the group share the same conversation context, and when the group finishes, the summarized context can be passed back to the controlling agent.
Configuration options
TaskGroup supports the following parameters:
- summarize_chat_ctx (boolean, default: true): Whether to summarize the interactions within the TaskGroup into one message and merge it into the main context.
- chat_ctx (llm.ChatContext, default: llm.ChatContext): The shared chat context within the TaskGroup. Pass the current chat context to ensure conversational continuity.
- return_exceptions (boolean, default: false): Controls error handling when a sub-task raises an unhandled exception. When set to true, the exception is added to the results dictionary and the sequence continues. When set to false, the exception propagates immediately and the sequence stops.
- on_task_completed ((event: TaskCompletedEvent) => Promise<void>): An async callback invoked after each sub-task completes successfully. It receives a TaskCompletedEvent with the following fields:
  - agent_task: AgentTask instance that just finished.
  - task_id: String ID of the task.
  - result: Value the task returned.
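The two return_exceptions behaviors described above can be illustrated with a small framework-free sketch. The run_sequence helper and the ok and boom steps are hypothetical and exist only for this illustration; they model the documented semantics and are not TaskGroup's actual implementation.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


async def run_sequence(
    steps: Dict[str, Callable[[], Awaitable[Any]]],
    return_exceptions: bool = False,
) -> Dict[str, Any]:
    """Run step factories in order, collecting results by ID."""
    results: Dict[str, Any] = {}
    for step_id, factory in steps.items():
        try:
            results[step_id] = await factory()
        except Exception as exc:
            if not return_exceptions:
                raise  # propagate immediately; later steps never run
            results[step_id] = exc  # record the failure and keep going
    return results


async def ok() -> str:
    return "done"


async def boom() -> str:
    raise ValueError("bad input")


steps = {"first": ok, "second": boom, "third": ok}
```

With return_exceptions=True the caller has to check each entry for an exception instance before using it; with the default, a single try/except around the whole sequence is enough.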
Basic usage
Initialize and set up a TaskGroup by adding tasks to it. Add tasks in the order they should be executed:
    from livekit.agents.beta.workflows import GetEmailTask, TaskGroup

    # Create and configure TaskGroup with the current agent's chat context
    chat_ctx = self.chat_ctx
    task_group = TaskGroup(chat_ctx=chat_ctx)

    # Add tasks using lambda factories
    task_group.add(
        lambda: GetEmailTask(),
        id="get_email_task",
        description="Collects the user's email"
    )
    task_group.add(
        lambda: GetCommuteTask(),
        id="get_commute_task",
        description="Records the user's commute flexibility"
    )

    # Execute the task group
    results = await task_group  # Returns TaskGroupResult object
    task_results = results.task_results

    # Access results by task ID
    print(task_results)
    # Output: {
    #   "get_email_task": GetEmailResult(email="john.doe@gmail.com"),
    #   "get_commute_task": CommuteResult(can_commute=True, commute_method="subway")
    # }

    import { beta, llm } from '@livekit/agents';

    // Create and configure TaskGroup with the current agent's chat context
    const chatCtx = this.chatCtx;
    const taskGroup = new beta.TaskGroup({ chatCtx });

    // Add tasks using arrow-function factories
    taskGroup.add(() => new GetEmailTask(), {
      id: 'get_email_task',
      description: "Collects the user's email",
    });
    taskGroup.add(() => new GetCommuteTask(), {
      id: 'get_commute_task',
      description: "Records the user's commute flexibility",
    });

    // Execute the task group
    const results = await taskGroup.run(); // Returns TaskGroupResult object
    const taskResults = results.taskResults;

    // Access results by task ID
    console.log(taskResults);
    // Output: {
    //   get_email_task: { email: "john.doe@gmail.com" },
    //   get_commute_task: { canCommute: true, commuteMethod: "subway" }
    // }
The TaskGroup.add() method takes a task factory, an id, and a description. In Python, pass id and description as keyword arguments; in Node.js, pass them in an options object ({ id, description }):
- Task factory: A callable that returns a task instance (Python: typically a lambda; Node.js: an arrow function).
- id: A string identifier for the task used to access results.
- description: A string description that helps the LLM understand when to regress to this task.
The factory lets the group reinitialize a task with the same arguments each time it's revisited. The task id and description are passed to the LLM so it can identify each task and understand its purpose when it needs to regress to a previous one. Users can return to earlier steps as often as needed.
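To make the role of the factory concrete, here is a minimal framework-free sketch. EmailStep and run_twice are hypothetical names used only for illustration; the point is that calling the factory again yields a fresh instance, so state from the first pass doesn't leak into the revisited step.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Tuple


@dataclass
class EmailStep:
    """Hypothetical stand-in for a task that accumulates state while it runs."""

    attempts: List[str] = field(default_factory=list)

    def record(self, value: str) -> None:
        self.attempts.append(value)


def run_twice(factory: Callable[[], EmailStep]) -> Tuple[EmailStep, EmailStep]:
    first = factory()  # initial pass through the step
    first.record("jon@example.com")
    second = factory()  # the step is revisited: build a new instance
    second.record("john@example.com")
    return first, second


first, second = run_twice(lambda: EmailStep())
```

Passing an already constructed instance instead of a factory would force the revisited step to reuse whatever state the first pass left behind.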
All tasks share the same conversation context. The context is summarized and passed back to the controlling agent when the group finishes. This option can be disabled when initializing the task group:
    # Disable context summarization
    task_group = TaskGroup(summarize_chat_ctx=False)

    // Disable context summarization
    const taskGroup = new beta.TaskGroup({ summarizeChatCtx: false });
Task completion callbacks
Add a callback function to a task group to run custom logic after each task completes. The callback receives a TaskCompletedEvent containing the completed task's ID, instance, and result.
Use the on_task_completed parameter to set the callback function. The following example prints a message after each task finishes:
    from livekit.agents.beta.workflows import TaskGroup, TaskCompletedEvent

    async def print_task_result(event: TaskCompletedEvent) -> None:
        print(f"Task '{event.task_id}' completed with result: {event.result}")

    task_group = TaskGroup(
        chat_ctx=self.chat_ctx,
        on_task_completed=print_task_result,
    )
    task_group.add(
        lambda: IntroTask(),
        id="intro_task",
        description="Collects name and introduction",
    )
    task_group.add(
        lambda: CommuteTask(),
        id="commute_task",
        description="Asks about commute flexibility",
    )

    results = await task_group

    import { beta } from '@livekit/agents';

    const taskGroup = new beta.TaskGroup({
      chatCtx: this.chatCtx,
      onTaskCompleted: async ({ taskId, result }) => {
        console.log(`Task '${taskId}' completed with result:`, result);
      },
    });
    taskGroup.add(() => new IntroTask(), {
      id: 'intro_task',
      description: 'Collects name and introduction',
    });
    taskGroup.add(() => new CommuteTask(), {
      id: 'commute_task',
      description: 'Asks about commute flexibility',
    });

    const results = await taskGroup.run();
Complete workflow example
The following is a complete example showing how to build an interview workflow with TaskGroup. It collects basic candidate information and then asks about their commute flexibility:
    from livekit.agents import AgentTask, function_tool, RunContext
    from livekit.agents.beta.workflows import TaskGroup
    from dataclasses import dataclass

    @dataclass
    class IntroResults:
        name: str
        intro: str

    @dataclass
    class CommuteResults:
        can_commute: bool
        commute_method: str

    class IntroTask(AgentTask[IntroResults]):
        def __init__(self) -> None:
            super().__init__(
                instructions="Welcome the candidate and collect their name and introduction."
            )

        async def on_enter(self) -> None:
            await self.session.generate_reply(
                instructions="Welcome the candidate and gather their name."
            )

        @function_tool()
        async def record_intro(self, context: RunContext, name: str, intro_notes: str) -> None:
            """Record the candidate's name and introduction"""
            context.session.userdata.candidate_name = name
            results = IntroResults(name=name, intro=intro_notes)
            self.complete(results)

    class CommuteTask(AgentTask[CommuteResults]):
        def __init__(self) -> None:
            super().__init__(
                instructions="Ask about the candidate's ability to commute to the office."
            )

        @function_tool()
        async def record_commute_flexibility(
            self,
            context: RunContext,
            can_commute: bool,
            commute_method: str
        ) -> None:
            """Record commute flexibility and transportation method"""
            results = CommuteResults(can_commute=can_commute, commute_method=commute_method)
            self.complete(results)

    # Set up the workflow
    task_group = TaskGroup()
    task_group.add(
        lambda: IntroTask(),
        id="intro_task",
        description="Collects name and introduction"
    )
    task_group.add(
        lambda: CommuteTask(),
        id="commute_task",
        description="Asks about commute flexibility"
    )

    # Execute and get results
    results = await task_group
    task_results = results.task_results
    import { beta, llm, voice } from '@livekit/agents';
    import { z } from 'zod';

    interface IntroResults {
      name: string;
      intro: string;
    }

    interface CommuteResults {
      canCommute: boolean;
      commuteMethod: string;
    }

    interface InterviewUserData {
      candidateName?: string;
    }

    class IntroTask extends voice.AgentTask<IntroResults, InterviewUserData> {
      constructor() {
        super({
          instructions: 'Welcome the candidate and collect their name and introduction.',
          tools: {
            recordIntro: llm.tool({
              description: "Record the candidate's name and introduction",
              parameters: z.object({
                name: z.string().describe("The candidate's name"),
                introNotes: z.string().describe('Introduction notes'),
              }),
              execute: async ({ name, introNotes }, { ctx }) => {
                ctx.userData.candidateName = name;
                this.complete({ name, intro: introNotes });
              },
            }),
          },
        });
      }

      async onEnter(): Promise<void> {
        const handle = this.session.generateReply({
          instructions: 'Welcome the candidate and gather their name.',
        });
        await handle.waitForPlayout();
      }
    }

    class CommuteTask extends voice.AgentTask<CommuteResults> {
      constructor() {
        super({
          instructions: "Ask about the candidate's ability to commute to the office.",
          tools: {
            recordCommuteFlexibility: llm.tool({
              description: 'Record commute flexibility and transportation method',
              parameters: z.object({
                canCommute: z.boolean().describe('Whether the candidate can commute'),
                commuteMethod: z.string().describe('Transportation method'),
              }),
              execute: async ({ canCommute, commuteMethod }) => {
                this.complete({ canCommute, commuteMethod });
              },
            }),
          },
        });
      }
    }

    // Set up the workflow
    const taskGroup = new beta.TaskGroup();
    taskGroup.add(() => new IntroTask(), {
      id: 'intro_task',
      description: 'Collects name and introduction',
    });
    taskGroup.add(() => new CommuteTask(), {
      id: 'commute_task',
      description: 'Asks about commute flexibility',
    });

    // Execute and get results
    const results = await taskGroup.run();
    const taskResults = results.taskResults;
Best practices for testing task groups
The following sections provide specific guidelines for testing TaskGroup in both the Python and Node.js SDKs.
Add a short delay before the first session.run() in Python
TaskGroup temporarily sets llm=None during task transitions. In the Python SDK, session.run() doesn't fall back to session.llm during this window, which can raise the following exception if the test calls session.run() too early:
RuntimeError: trying to generate reply without an LLM model.
Add a small delay between session.start() and the first session.run() call so the first sub-task can take over:
    await session.start()
    await asyncio.sleep(0.5)
    await session.run(...)
This delay isn't required in Node.js because null LLM values automatically fall back to session.llm.
Parse function call arguments
Test run results return function call arguments as raw JSON strings. You must parse the JSON before accessing properties.
In Python, access the function call arguments via item.arguments:
    fnc = result.expect.next_event().is_function_call(name="my_tool")
    args = json.loads(fnc.event().item.arguments)
    assert args["can_commute"] is True
In Node.js, access the function call arguments via item.args:
    const fnc = result.expect.nextEvent().isFunctionCall({ name: 'myTool' });
    const args = JSON.parse(fnc.event().item.args);
    expect(args.canCommute).toBe(true);
Initialize userData when tasks depend on it
If tasks write to ctx.userData, initialize it when creating the session. Property assignments silently fail if userData is null or undefined.
AgentSession(llm=llm, userdata=MyUserdata(candidate_name=""))
new voice.AgentSession<MyUserData>({ llm, userData: {} });
Don't assert on startup output
Output generated during agent startup (for example from session.say() or session.generate_reply()) is not included in RunResult.
Structure tests to assert agent responses to user input, not startup messages.
Avoid awaiting playout inside onEnter() when triggered from a tool
If onEnter() runs inside a tool's execute function, awaiting speech playout can cause a circular wait. The tool call remains active until onEnter() returns.
Call generateReply() without awaiting it:
    async def on_enter(self) -> None:
        self.session.generate_reply(instructions="Welcome the user.")  # don't await

    async onEnter(): Promise<void> {
      this.session.generateReply({ instructions: 'Welcome the user.' }); // no await
    }
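The circular wait can be reproduced with plain asyncio. This sketch rests on a labeled assumption: that speech playout can only finish after the tool call returns. The run_tool, playout, on_enter, and tool functions are hypothetical stand-ins, not SDK code, and the timeout exists only so the demonstration terminates.

```python
import asyncio


async def run_tool(await_playout: bool, timeout: float = 0.1) -> str:
    tool_returned = asyncio.Event()

    async def playout() -> None:
        # Assumption for this sketch: speech finishes only after the tool returns.
        await tool_returned.wait()

    async def on_enter() -> asyncio.Task:
        speech = asyncio.create_task(playout())
        if await_playout:
            await speech  # waits on playout, which waits on the tool
        return speech

    async def tool() -> asyncio.Task:
        # The tool call stays active until on_enter() returns.
        return await on_enter()

    try:
        speech = await asyncio.wait_for(tool(), timeout)
    except asyncio.TimeoutError:
        return "deadlock"
    tool_returned.set()  # the tool has returned, so playout may finish
    await speech
    return "completed"
```

Under that assumption, awaiting playout inside on_enter never returns, while starting the speech without awaiting it lets the tool finish first.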
Consider multi-turn LLM behavior
An LLM might not call a task's completion tool on the first turn. It might require multiple exchanges before completing the task.
Prefer containsFunctionCall() over nextEvent() for more resilient tests, and use generous timeouts:
- containsFunctionCall() checks whether the call occurred anywhere in the response.
- nextEvent() only checks the immediate next event.
result.expect.contains_function_call(name="consent_given")
result.expect.containsFunctionCall({ name: 'consentGiven' });
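The difference between the two styles of assertion can be shown on a plain list of events. The Event class and both helper functions below are hypothetical simplifications written for this illustration; they are not the test framework's own types.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Event:
    kind: str  # "message" or "function_call"
    name: str = ""


def next_event_is_call(events: List[Event], name: str) -> bool:
    # Strict: only the very first event is allowed to match.
    return bool(events) and events[0].kind == "function_call" and events[0].name == name


def contains_call(events: List[Event], name: str) -> bool:
    # Lenient: the call may appear anywhere in the run.
    return any(e.kind == "function_call" and e.name == name for e in events)


# The LLM speaks first and only then calls the completion tool.
multi_turn = [Event("message"), Event("function_call", "consent_given")]
```

With this event order, the strict check fails even though the tool was called, which is why the lenient check tends to be more robust against variation in LLM behavior.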
Increase cleanup timeouts in Node.js
Session cleanup can be slow when a TaskGroup is mid-flow. Set an explicit timeout in your cleanup hook to avoid afterEach failures:
    afterEach(async () => {
      await session?.close();
    }, 30000);
Example tests for task group
Examples
The following examples show tasks and task groups in production-style agents:
Survey agent (Python)
Interview screening agent that runs a TaskGroup of five tasks: intro, email capture, commute, experience, and behavioral. Uses session userdata, a disqualify tool, CSV export, and post-interview LLM evaluation.
Basic agent task (Node.js)
Survey agent that runs reusable AgentTasks from onEnter and from tools. Uses a generic info-collection task, then shows handoff to a separate weather agent and back.
Basic task group (Node.js)
Onboarding agent that starts a two-step TaskGroup (name then email) via a tool. Demonstrates onTaskCompleted, context summarization, and regression so users can correct earlier answers (e.g. "change my name to …").
Additional resources
The following topics provide more information on creating complex workflows for your voice AI agents.