Module livekit.agents.beta.workflows

Sub-modules

livekit.agents.beta.workflows.address
livekit.agents.beta.workflows.credit_card
livekit.agents.beta.workflows.dob
livekit.agents.beta.workflows.dtmf_inputs
livekit.agents.beta.workflows.email_address
livekit.agents.beta.workflows.name
livekit.agents.beta.workflows.phone_number
livekit.agents.beta.workflows.task_group
livekit.agents.beta.workflows.utils
livekit.agents.beta.workflows.warm_transfer

Classes

class GetAddressResult (address: str)
Expand source code
@dataclass
class GetAddressResult:
    address: str

GetAddressResult(address: 'str')

Instance variables

var address : str
class GetAddressTask (*,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '')
Expand source code
class GetAddressTask(AgentTask[GetAddressResult]):
    def __init__(
        self,
        *,
        instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
        # deprecated
        extra_instructions: str = "",
    ) -> None:
        if not is_given(instructions):
            instructions = InstructionParts(persona=PERSONA, extra=extra_instructions)
        elif extra_instructions:
            logger.warning("`extra_instructions` will be ignored when `instructions` is provided")

        if isinstance(instructions, InstructionParts):
            instructions = Instructions(INSTRUCTIONS_TEMPLATE).format(
                persona=instructions.persona if is_given(instructions.persona) else PERSONA,
                extra=instructions.extra,
                _modality_specific=Instructions(audio=AUDIO_SPECIFIC, text=TEXT_SPECIFIC),
                _confirmation=Instructions(
                    # confirmation is enabled by default for audio, disabled by default for text
                    audio=CONFIRMATION_INSTRUCTION if require_confirmation is not False else "",
                    text=CONFIRMATION_INSTRUCTION if require_confirmation is True else "",
                ),
            )

        assert is_given(instructions)  # for type checking
        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._current_address = ""
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        self.session.generate_reply(instructions="Ask the user to provide their address.")

    @function_tool()
    async def update_address(
        self, street_address: str, unit_number: str, locality: str, country: str, ctx: RunContext
    ) -> str | None:
        """Update the address provided by the user.

        Args:
            street_address (str): Dependent on country, may include fields like house number, street name, block, or district
            unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
            locality (str): Dependent on country, may include fields like city, zip code, or province
            country (str): The country the user lives in spelled out fully
        """
        address_fields = (
            [street_address, unit_number, locality, country]
            if unit_number.strip()
            else [street_address, locality, country]
        )
        address = " ".join(address_fields)
        self._current_address = address

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(GetAddressResult(address=self._current_address))
            return None

        confirm_tool = self._build_confirm_tool(address=address)
        current_tools = [t for t in self.tools if t.id != "confirm_address"]
        current_tools.append(confirm_tool)
        await self.update_tools(current_tools)

        return (
            f"The address has been updated to {address}\n"
            f"Repeat the address field by field: {address_fields} if needed\n"
            f"Prompt the user for confirmation, do not call `confirm_address` directly"
        )

    def _build_confirm_tool(self, *, address: str) -> llm.FunctionTool:
        # confirm tool is only injected after update_address is called,
        # preventing the LLM from hallucinating a confirmation without user input
        @function_tool()
        async def confirm_address() -> None:
            """Call after the user confirms the address is correct."""
            if address != self._current_address:
                self.session.generate_reply(
                    instructions="The address has changed since confirmation was requested, ask the user to confirm the updated address."
                )
                return

            if not self.done():
                self.complete(GetAddressResult(address=address))

        return confirm_address

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_address_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide an address.

        Args:
            reason: A short explanation of why the user declined to provide the address
        """
        if not self.done():
            self.complete(ToolError(f"couldn't get the address: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        if is_given(self._require_confirmation):
            return self._require_confirmation
        return ctx.speech_handle.input_details.modality == "audio"

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def decline_address_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_address_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide an address.

    Args:
        reason: A short explanation of why the user declined to provide the address
    """
    if not self.done():
        self.complete(ToolError(f"couldn't get the address: {reason}"))

Handles the case when the user explicitly declines to provide an address.

Args

reason
A short explanation of why the user declined to provide the address
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    self.session.generate_reply(instructions="Ask the user to provide their address.")

Called when the task is entered

async def update_address(self,
street_address: str,
unit_number: str,
locality: str,
country: str,
ctx: RunContext) ‑> str | None
Expand source code
@function_tool()
async def update_address(
    self, street_address: str, unit_number: str, locality: str, country: str, ctx: RunContext
) -> str | None:
    """Update the address provided by the user.

    Args:
        street_address (str): Dependent on country, may include fields like house number, street name, block, or district
        unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
        locality (str): Dependent on country, may include fields like city, zip code, or province
        country (str): The country the user lives in spelled out fully
    """
    address_fields = (
        [street_address, unit_number, locality, country]
        if unit_number.strip()
        else [street_address, locality, country]
    )
    address = " ".join(address_fields)
    self._current_address = address

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(GetAddressResult(address=self._current_address))
        return None

    confirm_tool = self._build_confirm_tool(address=address)
    current_tools = [t for t in self.tools if t.id != "confirm_address"]
    current_tools.append(confirm_tool)
    await self.update_tools(current_tools)

    return (
        f"The address has been updated to {address}\n"
        f"Repeat the address field by field: {address_fields} if needed\n"
        f"Prompt the user for confirmation, do not call `confirm_address` directly"
    )

Update the address provided by the user.

Args

street_address : str
Dependent on country, may include fields like house number, street name, block, or district
unit_number : str
The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
locality : str
Dependent on country, may include fields like city, zip code, or province
country : str
The country the user lives in spelled out fully
class GetCreditCardResult (cardholder_name: str,
issuer: str,
card_number: str,
security_code: str,
expiration_date: str)
Expand source code
@dataclass
class GetCreditCardResult:
    cardholder_name: str
    issuer: str
    card_number: str
    security_code: str
    expiration_date: str

GetCreditCardResult(cardholder_name: 'str', issuer: 'str', card_number: 'str', security_code: 'str', expiration_date: 'str')

Instance variables

var card_number : str
var cardholder_name : str
var expiration_date : str
var issuer : str
var security_code : str
class GetCreditCardTask (chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetCreditCardTask(AgentTask[GetCreditCardResult]):
    def __init__(
        self,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        super().__init__(
            instructions="*none*",
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        while not self.done():
            task_group = TaskGroup()
            task_group.add(
                lambda: GetNameTask(
                    last_name=True,
                    extra_instructions="This is in the context of credit card information collection, ask specifically for the full name listed on it.",
                    require_confirmation=self._require_confirmation,
                ),
                id="cardholder_name_task",
                description="Collects the cardholder's full name",
            )
            task_group.add(
                lambda: GetCardNumberTask(require_confirmation=self._require_confirmation),
                id="card_number_task",
                description="Collects the user's card number",
            )
            task_group.add(
                lambda: GetSecurityCodeTask(require_confirmation=self._require_confirmation),
                id="security_code_task",
                description="Collects the card's security code",
            )
            task_group.add(
                lambda: GetExpirationDateTask(require_confirmation=self._require_confirmation),
                id="expiration_date_task",
                description="Collects the card's expiration date",
            )
            try:
                results = await task_group
                name = f"{results.task_results['cardholder_name_task'].first_name} {results.task_results['cardholder_name_task'].last_name}"
                result = GetCreditCardResult(
                    cardholder_name=name,
                    issuer=results.task_results["card_number_task"].issuer,
                    card_number=results.task_results["card_number_task"].card_number,
                    security_code=results.task_results["security_code_task"].security_code,
                    expiration_date=results.task_results["expiration_date_task"].date,
                )
                self.complete(result)
            except CardCollectionRestartError:
                continue
            except (CardCaptureDeclinedError, ToolError) as e:
                self.complete(e)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    while not self.done():
        task_group = TaskGroup()
        task_group.add(
            lambda: GetNameTask(
                last_name=True,
                extra_instructions="This is in the context of credit card information collection, ask specifically for the full name listed on it.",
                require_confirmation=self._require_confirmation,
            ),
            id="cardholder_name_task",
            description="Collects the cardholder's full name",
        )
        task_group.add(
            lambda: GetCardNumberTask(require_confirmation=self._require_confirmation),
            id="card_number_task",
            description="Collects the user's card number",
        )
        task_group.add(
            lambda: GetSecurityCodeTask(require_confirmation=self._require_confirmation),
            id="security_code_task",
            description="Collects the card's security code",
        )
        task_group.add(
            lambda: GetExpirationDateTask(require_confirmation=self._require_confirmation),
            id="expiration_date_task",
            description="Collects the card's expiration date",
        )
        try:
            results = await task_group
            name = f"{results.task_results['cardholder_name_task'].first_name} {results.task_results['cardholder_name_task'].last_name}"
            result = GetCreditCardResult(
                cardholder_name=name,
                issuer=results.task_results["card_number_task"].issuer,
                card_number=results.task_results["card_number_task"].card_number,
                security_code=results.task_results["security_code_task"].security_code,
                expiration_date=results.task_results["expiration_date_task"].date,
            )
            self.complete(result)
        except CardCollectionRestartError:
            continue
        except (CardCaptureDeclinedError, ToolError) as e:
            self.complete(e)

Called when the task is entered

class GetDOBResult (date_of_birth: date, time_of_birth: time | None = None)
Expand source code
@dataclass
class GetDOBResult:
    date_of_birth: date
    time_of_birth: time | None = None

GetDOBResult(date_of_birth: 'date', time_of_birth: 'time | None' = None)

Instance variables

var date_of_birth : datetime.date
var time_of_birth : datetime.time | None
class GetDOBTask (extra_instructions: str = '',
include_time: bool = False,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetDOBTask(AgentTask[GetDOBResult]):
    def __init__(
        self,
        extra_instructions: str = "",
        include_time: bool = False,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        time_instructions = (
            ""
            if not include_time
            else (
                "Also ask for and capture the time of birth if the user knows it. "
                "The time is optional - if the user doesn't know it, proceed without it.\n"
            )
        )
        confirmation_instructions = (
            "Call `confirm_dob` after the user confirmed the date of birth is correct."
        )
        extra = extra_instructions if extra_instructions else ""

        super().__init__(
            instructions=Instructions(
                _BASE_INSTRUCTIONS.format(
                    modality_specific=_AUDIO_SPECIFIC,
                    time_instructions=time_instructions,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is not False else ""
                    ),
                    extra_instructions=extra,
                ),
                text=_BASE_INSTRUCTIONS.format(
                    modality_specific=_TEXT_SPECIFIC,
                    time_instructions=time_instructions,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is True else ""
                    ),
                    extra_instructions=extra,
                ),
            ),
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._include_time = include_time
        self._require_confirmation = require_confirmation
        self._current_dob: date | None = None
        self._current_time: time | None = None

        if include_time:
            self._tools.append(self._build_update_time_tool())

    async def on_enter(self) -> None:
        prompt = "Ask the user to provide their date of birth."
        if self._include_time:
            prompt = "Ask the user to provide their date of birth and, if they know it, their time of birth."
        self.session.generate_reply(instructions=prompt)

    @function_tool
    async def update_dob(
        self,
        year: int,
        month: int,
        day: int,
        ctx: RunContext,
    ) -> str | None:
        """Update the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030).

        Args:
            year: The birth year (e.g., 1990)
            month: The birth month (1-12)
            day: The birth day (1-31)
        """
        try:
            dob = date(year, month, day)
        except ValueError as e:
            raise ToolError(f"Invalid date: {e}") from None

        today = date.today()
        if dob > today:
            raise ToolError(
                f"Invalid date of birth: {dob.strftime('%B %d, %Y')} is in the future. "
                "Date of birth cannot be a future date."
            )

        self._current_dob = dob

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(
                    GetDOBResult(
                        date_of_birth=self._current_dob,
                        time_of_birth=self._current_time,
                    )
                )
            return None

        confirm_tool = self._build_confirm_tool(dob=dob)
        current_tools = [t for t in self.tools if t.id != "confirm_dob"]
        current_tools.append(confirm_tool)
        await self.update_tools(current_tools)

        formatted_date = dob.strftime("%B %d, %Y")
        response = f"The date of birth has been updated to {formatted_date}"

        if self._current_time:
            formatted_time = self._current_time.strftime("%I:%M %p")
            response += f" at {formatted_time}"

        response += (
            "\nRepeat the date back to the user in a natural spoken format.\n"
            "Prompt the user for confirmation, do not call `confirm_dob` directly"
        )

        return response

    def _build_update_time_tool(self) -> llm.FunctionTool:
        @function_tool()
        async def update_time(hour: int, minute: int, ctx: RunContext) -> str | None:
            """Update the time of birth provided by the user.

            Args:
                hour: The birth hour (0-23)
                minute: The birth minute (0-59)
            """
            try:
                birth_time = time(hour, minute)
            except ValueError as e:
                raise ToolError(f"Invalid time: {e}") from None

            self._current_time = birth_time

            if not self._confirmation_required(ctx) and self._current_dob is not None:
                if not self.done():
                    self.complete(
                        GetDOBResult(
                            date_of_birth=self._current_dob,
                            time_of_birth=self._current_time,
                        )
                    )
                return None

            if self._confirmation_required(ctx):
                confirm_tool = self._build_confirm_tool(dob=self._current_dob)
                current_tools = [t for t in self.tools if t.id != "confirm_dob"]
                current_tools.append(confirm_tool)
                await self.update_tools(current_tools)

            formatted_time = birth_time.strftime("%I:%M %p")
            response = f"The time of birth has been updated to {formatted_time}"

            if self._current_dob:
                formatted_date = self._current_dob.strftime("%B %d, %Y")
                response = f"The date and time of birth has been updated to {formatted_date} at {formatted_time}"

            if self._confirmation_required(ctx):
                response += (
                    "\nRepeat the time back to the user in a natural spoken format.\n"
                    "Prompt the user for confirmation, do not call `confirm_dob` directly"
                )
            else:
                response += (
                    "\nThe date of birth has not been provided yet, ask the user to provide it."
                )

            return response

        return update_time

    def _build_confirm_tool(self, *, dob: date | None) -> llm.FunctionTool:
        # confirm tool is only injected after update_dob/update_time is called,
        # preventing the LLM from hallucinating a confirmation without user input
        captured_dob = dob
        captured_time = self._current_time

        @function_tool()
        async def confirm_dob() -> None:
            """Call after the user confirms the date of birth is correct."""
            if captured_dob != self._current_dob or captured_time != self._current_time:
                self.session.generate_reply(
                    instructions="The date of birth has changed since confirmation was requested, ask the user to confirm the updated date."
                )
                return

            if self._current_dob is None:
                self.session.generate_reply(
                    instructions="No date of birth was provided yet, ask the user to provide it."
                )
                return

            if not self.done():
                self.complete(
                    GetDOBResult(
                        date_of_birth=self._current_dob,
                        time_of_birth=self._current_time,
                    )
                )

        return confirm_dob

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_dob_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide a date of birth.

        Args:
            reason: A short explanation of why the user declined to provide the date of birth
        """
        if not self.done():
            self.complete(ToolError(f"couldn't get the date of birth: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        if is_given(self._require_confirmation):
            return self._require_confirmation
        return ctx.speech_handle.input_details.modality == "audio"

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def decline_dob_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_dob_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide a date of birth.

    Args:
        reason: A short explanation of why the user declined to provide the date of birth
    """
    if not self.done():
        self.complete(ToolError(f"couldn't get the date of birth: {reason}"))

Handles the case when the user explicitly declines to provide a date of birth.

Args

reason
A short explanation of why the user declined to provide the date of birth
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    prompt = "Ask the user to provide their date of birth."
    if self._include_time:
        prompt = "Ask the user to provide their date of birth and, if they know it, their time of birth."
    self.session.generate_reply(instructions=prompt)

Called when the task is entered

async def update_dob(self, year: int, month: int, day: int, ctx: RunContext) ‑> str | None
Expand source code
@function_tool
async def update_dob(
    self,
    year: int,
    month: int,
    day: int,
    ctx: RunContext,
) -> str | None:
    """Update the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030).

    Args:
        year: The birth year (e.g., 1990)
        month: The birth month (1-12)
        day: The birth day (1-31)
    """
    try:
        dob = date(year, month, day)
    except ValueError as e:
        raise ToolError(f"Invalid date: {e}") from None

    today = date.today()
    if dob > today:
        raise ToolError(
            f"Invalid date of birth: {dob.strftime('%B %d, %Y')} is in the future. "
            "Date of birth cannot be a future date."
        )

    self._current_dob = dob

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(
                GetDOBResult(
                    date_of_birth=self._current_dob,
                    time_of_birth=self._current_time,
                )
            )
        return None

    confirm_tool = self._build_confirm_tool(dob=dob)
    current_tools = [t for t in self.tools if t.id != "confirm_dob"]
    current_tools.append(confirm_tool)
    await self.update_tools(current_tools)

    formatted_date = dob.strftime("%B %d, %Y")
    response = f"The date of birth has been updated to {formatted_date}"

    if self._current_time:
        formatted_time = self._current_time.strftime("%I:%M %p")
        response += f" at {formatted_time}"

    response += (
        "\nRepeat the date back to the user in a natural spoken format.\n"
        "Prompt the user for confirmation, do not call `confirm_dob` directly"
    )

    return response

Update the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030).

Args

year
The birth year (e.g., 1990)
month
The birth month (1-12)
day
The birth day (1-31)
class GetDtmfResult (user_input: str)
Expand source code
@dataclass
class GetDtmfResult:
    user_input: str

    @classmethod
    def from_dtmf_inputs(cls, dtmf_inputs: list[DtmfEvent]) -> GetDtmfResult:
        return cls(user_input=format_dtmf(dtmf_inputs))

GetDtmfResult(user_input: 'str')

Static methods

def from_dtmf_inputs(dtmf_inputs: list[DtmfEvent]) ‑> GetDtmfResult

Instance variables

var user_input : str
class GetDtmfTask (*,
num_digits: int,
ask_for_confirmation: bool = False,
dtmf_input_timeout: float = 4.0,
dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
extra_instructions: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class GetDtmfTask(AgentTask[GetDtmfResult]):
    """A task to collect DTMF inputs from the user.

    Return a string of DTMF inputs if collected successfully, otherwise None.
    """

    def __init__(
        self,
        *,
        num_digits: int,
        ask_for_confirmation: bool = False,
        dtmf_input_timeout: float = 4.0,
        dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
        chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
        extra_instructions: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            num_digits: The number of digits to collect.
            ask_for_confirmation: Whether to ask for confirmation when agent has collected full digits.
            repeat_instructions: The number of times to repeat the initial instructions.
            dtmf_input_timeout: The per-digit timeout.
            dtmf_stop_event: The DTMF event to stop collecting inputs.
            chat_ctx: The chat context to use.
            extra_instructions: Extra instructions to add to the task.
        """
        if num_digits <= 0:
            raise ValueError("num_digits must be greater than 0")

        self._curr_dtmf_inputs: list[DtmfEvent] = []
        self._dtmf_reply_running: bool = False

        @function_tool
        async def confirm_inputs(inputs: list[DtmfEvent]) -> None:
            """Finalize the collected digit inputs after explicit user confirmation.

            Use this ONLY after the confirmation. You should confirm by verbally reading out the digits one by one and, once the
            user confirms they are correct, call this tool with the inputs.

            Do not use this tool to capture the initial digits."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        @function_tool
        async def record_inputs(inputs: list[DtmfEvent]) -> None:
            """Record the collected digit inputs without additional confirmation.

            Call this tool as soon as a valid sequence of digits has been provided by the user (via DTMF or spoken)."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        instructions = (
            "You are a single step in a broader system, responsible solely for gathering digits input from the user. "
            "You will either receive a sequence of digits through dtmf events tagged by <dtmf_inputs>, or "
            "user will directly say the digits to you. You should be able to handle both cases. "
        )

        if ask_for_confirmation:
            instructions += "Once user has confirmed the digits (by verbally spoken or entered manually), call `confirm_inputs` with the inputs."
        else:
            instructions += "If user provides the digits through voice and it is valid, call `record_inputs` with the inputs."

        if is_given(extra_instructions):
            instructions += f"\n{extra_instructions}"

        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            tools=[confirm_inputs] if ask_for_confirmation else [record_inputs],
        )

        def _on_sip_dtmf_received(ev: rtc.SipDTMF) -> None:
            if self._dtmf_reply_running:
                return

            # immediately kick off the DTMF reply generation if matches the stop event
            if ev.digit == dtmf_stop_event.value:
                self._generate_dtmf_reply()
                return

            self._curr_dtmf_inputs.append(DtmfEvent(ev.digit))
            logger.info(f"DTMF inputs: {format_dtmf(self._curr_dtmf_inputs)}")
            self._generate_dtmf_reply.schedule()

        @debounced(delay=dtmf_input_timeout)
        async def _generate_dtmf_reply() -> None:
            self._dtmf_reply_running = True

            try:
                self.session.interrupt()

                dmtf_str = format_dtmf(self._curr_dtmf_inputs)
                logger.debug(f"Generating DTMF reply, current inputs: {dmtf_str}")

                # if input not fully received (i.e. timeout), return None
                if len(self._curr_dtmf_inputs) != num_digits:
                    error_msg = (
                        f"Digits input not fully received. "
                        f"Expect {num_digits} digits, got {len(self._curr_dtmf_inputs)}"
                    )
                    self.complete(ToolError(error_msg))
                    return

                # if not asking for confirmation, return the DTMF inputs
                if not ask_for_confirmation:
                    self.complete(GetDtmfResult.from_dtmf_inputs(self._curr_dtmf_inputs))
                    return

                instructions = (
                    "User has entered the following valid digits on the telephone keypad:\n"
                    f"<dtmf_inputs>{dmtf_str}</dtmf_inputs>\n"
                    "Please confirm it with the user by saying the digits one by one with space in between "
                    "(.e.g. 'one two three four five six seven eight nine ten'). "
                    "Once you are sure, call `confirm_inputs` with the inputs."
                    ""
                )

                await self.session.generate_reply(user_input=instructions)
            finally:
                self._dtmf_reply_running = False
                self._curr_dtmf_inputs.clear()

        def _on_user_state_changed(ev: UserStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state == "speaking":
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after user is back to non-speaking
                self._generate_dtmf_reply.schedule()

        def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state in ["speaking", "thinking"]:
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after agent is back to non-speaking
                self._generate_dtmf_reply.schedule()

        self._generate_dtmf_reply: Debounced[None] = _generate_dtmf_reply
        self._on_sip_dtmf_received: Callable[[rtc.SipDTMF], None] = _on_sip_dtmf_received
        self._on_user_state_changed: Callable[[UserStateChangedEvent], None] = (
            _on_user_state_changed
        )
        self._on_agent_state_changed: Callable[[AgentStateChangedEvent], None] = (
            _on_agent_state_changed
        )

    def dtmf_reply_running(self) -> bool:
        return self._dtmf_reply_running

    async def on_enter(self) -> None:
        ctx = get_job_context()

        ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
        self.session.on("agent_state_changed", self._on_user_state_changed)
        self.session.on("agent_state_changed", self._on_agent_state_changed)
        self.session.generate_reply(tool_choice="none")

    async def on_exit(self) -> None:
        ctx = get_job_context()

        ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
        self.session.off("agent_state_changed", self._on_user_state_changed)
        self.session.off("agent_state_changed", self._on_agent_state_changed)
        self._generate_dtmf_reply.cancel()

A task to collect DTMF inputs from the user.

Return a string of DTMF inputs if collected successfully, otherwise None.

Args

num_digits
The number of digits to collect.
ask_for_confirmation
Whether to ask for confirmation when agent has collected full digits.
repeat_instructions
The number of times to repeat the initial instructions.
dtmf_input_timeout
The per-digit timeout.
dtmf_stop_event
The DTMF event to stop collecting inputs.
chat_ctx
The chat context to use.
extra_instructions
Extra instructions to add to the task.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

def dtmf_reply_running(self) ‑> bool
Expand source code
def dtmf_reply_running(self) -> bool:
    return self._dtmf_reply_running
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    ctx = get_job_context()

    ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
    self.session.on("agent_state_changed", self._on_user_state_changed)
    self.session.on("agent_state_changed", self._on_agent_state_changed)
    self.session.generate_reply(tool_choice="none")

Called when the task is entered

async def on_exit(self) ‑> None
Expand source code
async def on_exit(self) -> None:
    ctx = get_job_context()

    ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
    self.session.off("agent_state_changed", self._on_user_state_changed)
    self.session.off("agent_state_changed", self._on_agent_state_changed)
    self._generate_dtmf_reply.cancel()

Called when the task is exited

class GetEmailResult (email_address: str)
Expand source code
@dataclass
class GetEmailResult:
    email_address: str

GetEmailResult(email_address: 'str')

Instance variables

var email_address : str
class GetEmailTask (*,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '')
Expand source code
class GetEmailTask(AgentTask[GetEmailResult]):
    def __init__(
        self,
        *,
        instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
        # deprecated
        extra_instructions: str = "",
    ) -> None:
        if not is_given(instructions):
            instructions = InstructionParts(persona=PERSONA, extra=extra_instructions)
        elif extra_instructions:
            logger.warning("`extra_instructions` will be ignored when `instructions` is provided")

        if isinstance(instructions, InstructionParts):
            instructions = Instructions(INSTRUCTIONS_TEMPLATE).format(
                persona=instructions.persona if is_given(instructions.persona) else PERSONA,
                extra=instructions.extra,
                _modality_specific=Instructions(audio=AUDIO_SPECIFIC, text=TEXT_SPECIFIC),
                _confirmation=Instructions(
                    # confirmation is enabled by default for audio, disabled by default for text
                    audio=CONFIRMATION_INSTRUCTION if require_confirmation is not False else "",
                    text=CONFIRMATION_INSTRUCTION if require_confirmation is True else "",
                ),
            )

        assert is_given(instructions)  # for type checking
        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._current_email = ""
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        self.session.generate_reply(instructions="Ask the user to provide an email address.")

    @function_tool
    async def update_email_address(self, email: str, ctx: RunContext) -> str | None:
        """Update the email address provided by the user.

        Args:
            email: The email address provided by the user
        """
        email = email.strip()

        if not re.match(EMAIL_REGEX, email):
            raise ToolError(f"Invalid email address provided: {email}")

        self._current_email = email
        separated_email = " ".join(email)

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(GetEmailResult(email_address=self._current_email))
            return None  # no need to continue the conversation

        confirm_tool = self._build_confirm_tool(email=email)
        current_tools = [t for t in self.tools if t.id != "confirm_email_address"]
        current_tools.append(confirm_tool)
        await self.update_tools(current_tools)

        return (
            f"The email has been updated to {email}\n"
            f"Repeat the email character by character: {separated_email} if needed\n"
            f"Prompt the user for confirmation, do not call `confirm_email_address` directly"
        )

    def _build_confirm_tool(self, *, email: str) -> llm.FunctionTool:
        @function_tool()
        async def confirm_email_address() -> None:
            """Call after the user confirms the email address is correct."""
            if email != self._current_email:
                self.session.generate_reply(
                    instructions="The email has changed since confirmation was requested, ask the user to confirm the updated email."
                )
                return

            if not self.done():
                self.complete(GetEmailResult(email_address=email))

        return confirm_email_address

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_email_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide an email address.

        Args:
            reason: A short explanation of why the user declined to provide the email address
        """
        if not self.done():
            self.complete(ToolError(f"couldn't get the email address: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        if is_given(self._require_confirmation):
            return self._require_confirmation
        return ctx.speech_handle.input_details.modality == "audio"

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def decline_email_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_email_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide an email address.

    Args:
        reason: A short explanation of why the user declined to provide the email address
    """
    if not self.done():
        self.complete(ToolError(f"couldn't get the email address: {reason}"))

Handles the case when the user explicitly declines to provide an email address.

Args

reason
A short explanation of why the user declined to provide the email address
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    self.session.generate_reply(instructions="Ask the user to provide an email address.")

Called when the task is entered

async def update_email_address(self, email: str, ctx: RunContext) ‑> str | None
Expand source code
@function_tool
async def update_email_address(self, email: str, ctx: RunContext) -> str | None:
    """Update the email address provided by the user.

    Args:
        email: The email address provided by the user
    """
    email = email.strip()

    if not re.match(EMAIL_REGEX, email):
        raise ToolError(f"Invalid email address provided: {email}")

    self._current_email = email
    separated_email = " ".join(email)

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(GetEmailResult(email_address=self._current_email))
        return None  # no need to continue the conversation

    confirm_tool = self._build_confirm_tool(email=email)
    current_tools = [t for t in self.tools if t.id != "confirm_email_address"]
    current_tools.append(confirm_tool)
    await self.update_tools(current_tools)

    return (
        f"The email has been updated to {email}\n"
        f"Repeat the email character by character: {separated_email} if needed\n"
        f"Prompt the user for confirmation, do not call `confirm_email_address` directly"
    )

Update the email address provided by the user.

Args

email
The email address provided by the user
class GetNameResult (first_name: str | None = None,
middle_name: str | None = None,
last_name: str | None = None)
Expand source code
@dataclass
class GetNameResult:
    first_name: str | None = None
    middle_name: str | None = None
    last_name: str | None = None

GetNameResult(first_name: 'str | None' = None, middle_name: 'str | None' = None, last_name: 'str | None' = None)

Instance variables

var first_name : str | None
var last_name : str | None
var middle_name : str | None
class GetNameTask (first_name: bool = True,
last_name: bool = False,
middle_name: bool = False,
name_format: NotGivenOr[str] = NOT_GIVEN,
verify_spelling: bool = False,
extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetNameTask(AgentTask[GetNameResult]):
    def __init__(
        self,
        first_name: bool = True,
        last_name: bool = False,
        middle_name: bool = False,
        name_format: NotGivenOr[str] = NOT_GIVEN,
        verify_spelling: bool = False,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        if not (first_name or middle_name or last_name):
            raise ValueError("At least one of first_name, middle_name, or last_name must be True")
        self._collect_first_name = first_name
        self._collect_last_name = last_name
        self._collect_middle_name = middle_name
        self._verify_spelling = verify_spelling
        self._require_confirmation = require_confirmation

        if is_given(name_format):
            self._name_format = name_format
        else:
            parts = []
            if first_name:
                parts.append("{first_name}")
            if middle_name:
                parts.append("{middle_name}")
            if last_name:
                parts.append("{last_name}")
            self._name_format = " ".join(parts)

        spelling_instructions = (
            ""
            if not verify_spelling
            else (
                "After receiving the name, always verify the spelling by asking the user to confirm "
                "or spell out the name letter by letter. "
                "When confirming, spell out each name part letter by letter to the user. "
            )
        )
        confirmation_instructions = (
            "Call `confirm_name` after the user confirmed the name is correct."
        )
        extra = extra_instructions if extra_instructions else ""

        super().__init__(
            instructions=Instructions(
                _BASE_INSTRUCTIONS.format(
                    name_format=self._name_format,
                    modality_specific=_AUDIO_SPECIFIC,
                    spelling_instructions=spelling_instructions,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is not False else ""
                    ),
                    extra_instructions=extra,
                ),
                text=_BASE_INSTRUCTIONS.format(
                    name_format=self._name_format,
                    modality_specific=_TEXT_SPECIFIC,
                    spelling_instructions=spelling_instructions,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is True else ""
                    ),
                    extra_instructions=extra,
                ),
            ),
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._first_name: str = ""
        self._middle_name: str = ""
        self._last_name: str = ""

    async def on_enter(self) -> None:
        self.session.generate_reply(
            instructions=f"Ask the user for their name, follow this order '{self._name_format}' but do not mention the format."
        )

    @function_tool()
    async def update_name(
        self,
        ctx: RunContext,
        first_name: str | None = None,
        middle_name: str | None = None,
        last_name: str | None = None,
    ) -> str | None:
        """Update the name provided by the user.

        Args:
            first_name: The user's first name.
            middle_name: The user's middle name, if collected.
            last_name: The user's last name, if collected.
        """
        errors: list[str] = []
        if self._collect_first_name and not (first_name and first_name.strip()):
            errors.append("first name is required but was not provided")
        if self._collect_middle_name and not (middle_name and middle_name.strip()):
            errors.append("middle name is required but was not provided")
        if self._collect_last_name and not (last_name and last_name.strip()):
            errors.append("last name is required but was not provided")

        if errors:
            raise ToolError(f"Incomplete name: {'; '.join(errors)}")

        self._first_name = first_name.strip() if first_name else ""
        self._middle_name = middle_name.strip() if middle_name else ""
        self._last_name = last_name.strip() if last_name else ""

        full_name = self._name_format.format(
            first_name=self._first_name,
            middle_name=self._middle_name,
            last_name=self._last_name,
        ).strip()

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(
                    GetNameResult(
                        first_name=self._first_name if self._collect_first_name else None,
                        middle_name=self._middle_name if self._collect_middle_name else None,
                        last_name=self._last_name if self._collect_last_name else None,
                    )
                )
            return None

        confirm_tool = self._build_confirm_tool(
            first_name=self._first_name,
            middle_name=self._middle_name,
            last_name=self._last_name,
        )
        current_tools = [t for t in self.tools if t.id != "confirm_name"]
        current_tools.append(confirm_tool)
        await self.update_tools(current_tools)

        if self._verify_spelling:
            return (
                f"The name has been updated to {full_name}\n"
                f"Spell out the name letter by letter for verification: {full_name}\n"
                f"Prompt the user for confirmation, do not call `confirm_name` directly"
            )

        return (
            f"The name has been updated to {full_name}\n"
            f"Repeat the name back to the user and prompt for confirmation, "
            f"do not call `confirm_name` directly"
        )

    def _build_confirm_tool(
        self, *, first_name: str, middle_name: str, last_name: str
    ) -> llm.FunctionTool:
        @function_tool()
        async def confirm_name() -> None:
            """Call after the user confirms the name is correct."""
            if (
                first_name != self._first_name
                or middle_name != self._middle_name
                or last_name != self._last_name
            ):
                self.session.generate_reply(
                    instructions="The name has changed since confirmation was requested, ask the user to confirm the updated name."
                )
                return

            if not self.done():
                self.complete(
                    GetNameResult(
                        first_name=self._first_name if self._collect_first_name else None,
                        middle_name=self._middle_name if self._collect_middle_name else None,
                        last_name=self._last_name if self._collect_last_name else None,
                    )
                )

        return confirm_name

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_name_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide their name.

        Args:
            reason: A short explanation of why the user declined to provide their name
        """
        if not self.done():
            self.complete(ToolError(f"couldn't get the name: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        if is_given(self._require_confirmation):
            return self._require_confirmation
        return ctx.speech_handle.input_details.modality == "audio"

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def decline_name_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_name_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide their name.

    Args:
        reason: A short explanation of why the user declined to provide their name
    """
    if not self.done():
        self.complete(ToolError(f"couldn't get the name: {reason}"))

Handles the case when the user explicitly declines to provide their name.

Args

reason
A short explanation of why the user declined to provide their name
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    self.session.generate_reply(
        instructions=f"Ask the user for their name, follow this order '{self._name_format}' but do not mention the format."
    )

Called when the task is entered

async def update_name(self,
ctx: RunContext,
first_name: str | None = None,
middle_name: str | None = None,
last_name: str | None = None) ‑> str | None
Expand source code
@function_tool()
async def update_name(
    self,
    ctx: RunContext,
    first_name: str | None = None,
    middle_name: str | None = None,
    last_name: str | None = None,
) -> str | None:
    """Update the name provided by the user.

    Args:
        first_name: The user's first name.
        middle_name: The user's middle name, if collected.
        last_name: The user's last name, if collected.
    """
    errors: list[str] = []
    if self._collect_first_name and not (first_name and first_name.strip()):
        errors.append("first name is required but was not provided")
    if self._collect_middle_name and not (middle_name and middle_name.strip()):
        errors.append("middle name is required but was not provided")
    if self._collect_last_name and not (last_name and last_name.strip()):
        errors.append("last name is required but was not provided")

    if errors:
        raise ToolError(f"Incomplete name: {'; '.join(errors)}")

    self._first_name = first_name.strip() if first_name else ""
    self._middle_name = middle_name.strip() if middle_name else ""
    self._last_name = last_name.strip() if last_name else ""

    full_name = self._name_format.format(
        first_name=self._first_name,
        middle_name=self._middle_name,
        last_name=self._last_name,
    ).strip()

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(
                GetNameResult(
                    first_name=self._first_name if self._collect_first_name else None,
                    middle_name=self._middle_name if self._collect_middle_name else None,
                    last_name=self._last_name if self._collect_last_name else None,
                )
            )
        return None

    confirm_tool = self._build_confirm_tool(
        first_name=self._first_name,
        middle_name=self._middle_name,
        last_name=self._last_name,
    )
    current_tools = [t for t in self.tools if t.id != "confirm_name"]
    current_tools.append(confirm_tool)
    await self.update_tools(current_tools)

    if self._verify_spelling:
        return (
            f"The name has been updated to {full_name}\n"
            f"Spell out the name letter by letter for verification: {full_name}\n"
            f"Prompt the user for confirmation, do not call `confirm_name` directly"
        )

    return (
        f"The name has been updated to {full_name}\n"
        f"Repeat the name back to the user and prompt for confirmation, "
        f"do not call `confirm_name` directly"
    )

Update the name provided by the user.

Args

first_name
The user's first name.
middle_name
The user's middle name, if collected.
last_name
The user's last name, if collected.
class GetPhoneNumberResult (phone_number: str)
Expand source code
@dataclass
class GetPhoneNumberResult:
    phone_number: str

GetPhoneNumberResult(phone_number: 'str')

Instance variables

var phone_number : str
class GetPhoneNumberTask (extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetPhoneNumberTask(AgentTask[GetPhoneNumberResult]):
    def __init__(
        self,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        confirmation_instructions = (
            "Call `confirm_phone_number` after the user confirmed the phone number is correct."
        )
        extra = extra_instructions if extra_instructions else ""

        super().__init__(
            instructions=Instructions(
                _BASE_INSTRUCTIONS.format(
                    modality_specific=_AUDIO_SPECIFIC,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is not False else ""
                    ),
                    extra_instructions=extra,
                ),
                text=_BASE_INSTRUCTIONS.format(
                    modality_specific=_TEXT_SPECIFIC,
                    confirmation_instructions=(
                        confirmation_instructions if require_confirmation is True else ""
                    ),
                    extra_instructions=extra,
                ),
            ),
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._current_phone_number = ""
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        self.session.generate_reply(instructions="Ask the user to provide their phone number.")

    @function_tool()
    async def update_phone_number(self, phone_number: str, ctx: RunContext) -> str | None:
        """Update the phone number provided by the user.

        Args:
            phone_number: The phone number provided by the user, digits only with optional leading +
        """
        cleaned = re.sub(r"[\s\-().]+", "", phone_number.strip())

        if not re.match(PHONE_REGEX, cleaned):
            raise ToolError(f"Invalid phone number provided: {phone_number}")

        self._current_phone_number = cleaned

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(GetPhoneNumberResult(phone_number=self._current_phone_number))
            return None  # no need to continue the conversation

        confirm_tool = self._build_confirm_tool(phone_number=cleaned)
        current_tools = [t for t in self.tools if t.id != "confirm_phone_number"]
        current_tools.append(confirm_tool)
        await self.update_tools(current_tools)

        return (
            f"The phone number has been updated to {cleaned}\n"
            f"Read the number back to the user in groups.\n"
            f"Prompt the user for confirmation, do not call `confirm_phone_number` directly"
        )

    def _build_confirm_tool(self, *, phone_number: str) -> llm.FunctionTool:
        @function_tool()
        async def confirm_phone_number() -> None:
            """Call after the user confirms the phone number is correct."""
            if phone_number != self._current_phone_number:
                self.session.generate_reply(
                    instructions="The phone number has changed since confirmation was requested, ask the user to confirm the updated number."
                )
                return

            if not self.done():
                self.complete(GetPhoneNumberResult(phone_number=phone_number))

        return confirm_phone_number

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_phone_number_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide a phone number.

        Args:
            reason: A short explanation of why the user declined to provide the phone number
        """
        if not self.done():
            self.complete(ToolError(f"couldn't get the phone number: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        if is_given(self._require_confirmation):
            return self._require_confirmation
        return ctx.speech_handle.input_details.modality == "audio"

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def decline_phone_number_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_phone_number_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide a phone number.

    Args:
        reason: A short explanation of why the user declined to provide the phone number
    """
    if not self.done():
        self.complete(ToolError(f"couldn't get the phone number: {reason}"))

Handles the case when the user explicitly declines to provide a phone number.

Args

reason
A short explanation of why the user declined to provide the phone number
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    self.session.generate_reply(instructions="Ask the user to provide their phone number.")

Called when the task is entered

async def update_phone_number(self, phone_number: str, ctx: RunContext) ‑> str | None
Expand source code
@function_tool()
async def update_phone_number(self, phone_number: str, ctx: RunContext) -> str | None:
    """Update the phone number provided by the user.

    Args:
        phone_number: The phone number provided by the user, digits only with optional leading +
    """
    cleaned = re.sub(r"[\s\-().]+", "", phone_number.strip())

    if not re.match(PHONE_REGEX, cleaned):
        raise ToolError(f"Invalid phone number provided: {phone_number}")

    self._current_phone_number = cleaned

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(GetPhoneNumberResult(phone_number=self._current_phone_number))
        return None  # no need to continue the conversation

    confirm_tool = self._build_confirm_tool(phone_number=cleaned)
    current_tools = [t for t in self.tools if t.id != "confirm_phone_number"]
    current_tools.append(confirm_tool)
    await self.update_tools(current_tools)

    return (
        f"The phone number has been updated to {cleaned}\n"
        f"Read the number back to the user in groups.\n"
        f"Prompt the user for confirmation, do not call `confirm_phone_number` directly"
    )

Update the phone number provided by the user.

Args

phone_number
The phone number provided by the user, digits only with optional leading +
class InstructionParts (persona: NotGivenOr[Instructions | str] = NOT_GIVEN,
extra: Instructions | str = '')
Expand source code
@dataclass
class InstructionParts:
    """Customizable instruction sections for built-in workflow tasks.

    Each field overrides that section when set; leave as ``NOT_GIVEN`` to
    preserve the workflow's built-in default. Set to ``""`` to remove a
    section entirely.

    Args:
        persona: Agent persona/identity — who the agent is and how it behaves.
        extra: Extra instructions appended to the prompt. The simplest hook for
            adding domain context without touching defaults.
    """

    persona: NotGivenOr[Instructions | str] = NOT_GIVEN
    extra: Instructions | str = ""

Customizable instruction sections for built-in workflow tasks.

Each field overrides that section when set; leave as NOT_GIVEN to preserve the workflow's built-in default. Set to "" to remove a section entirely.

Args

persona
Agent persona/identity — who the agent is and how it behaves.
extra
Extra instructions appended to the prompt. The simplest hook for adding domain context without touching defaults.

Instance variables

var extra : livekit.agents.llm.chat_context.Instructions | str
var persona : str | livekit.agents.llm.chat_context.Instructions | livekit.agents.types.NotGiven
class TaskCompletedEvent (agent_task: livekit.agents.voice.agent.AgentTask, task_id: str, result: Any)
Expand source code
@dataclass
class TaskCompletedEvent:
    agent_task: AgentTask
    task_id: str
    result: Any

TaskCompletedEvent(agent_task: livekit.agents.voice.agent.AgentTask, task_id: str, result: Any)

Instance variables

var agent_task : livekit.agents.voice.agent.AgentTask
var result : Any
var task_id : str
class TaskGroup (*,
summarize_chat_ctx: bool = True,
return_exceptions: bool = False,
chat_ctx: livekit.agents.llm.chat_context.ChatContext | livekit.agents.types.NotGiven = NOT_GIVEN,
on_task_completed: collections.abc.Callable[[TaskCompletedEvent], collections.abc.Coroutine[None, None, None]] | None = None)
Expand source code
class TaskGroup(AgentTask[TaskGroupResult]):
    def __init__(
        self,
        *,
        summarize_chat_ctx: bool = True,
        return_exceptions: bool = False,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        on_task_completed: Callable[[TaskCompletedEvent], Coroutine[None, None, None]]
        | None = None,
    ):
        """TaskGroup orchestrates a sequence of multiple AgentTasks. It also allows for users to regress to previous tasks if requested.

        Args:
            summarize_chat_ctx (bool): Whether or not to summarize the interactions within the TaskGroup into one message and merge the context. Defaults to True.
            return_exceptions (bool): Whether or not to directly propagate an error. When set to True, the exception is added to the results dictionary and the sequence continues. Defaults to False.
            on_task_completed (Callable[]): A callable that executes upon each task completion. The callback takes in a single argument of a TaskCompletedEvent.
        """
        super().__init__(
            instructions="*empty*", chat_ctx=chat_ctx, llm=NOT_GIVEN
        )  # the LLM is set as NOT_GIVEN to allow session reusage if supported

        self._summarize_chat_ctx = summarize_chat_ctx
        self._return_exceptions = return_exceptions
        self._visited_tasks = set[str]()
        self._registered_factories: OrderedDict[str, _FactoryInfo] = OrderedDict()
        self._task_completed_callback = on_task_completed

    def add(
        self,
        task_factory: Callable[[], AgentTask],
        *,
        id: str,
        description: str,
    ) -> Self:
        """Adds an AgentTask to the TaskGroup.

        Args:
            task_factory (Callable): A callable that returns a task instance
            id (str): An identifier for the task used to access results
            description (str): A description that helps the LLM understand when to regress to this task
        """
        self._registered_factories[id] = _FactoryInfo(
            task_factory=task_factory, id=id, description=description
        )
        return self

    async def on_enter(self) -> None:
        task_stack = list(self._registered_factories.keys())
        task_results: dict[str, Any] = {}

        while len(task_stack) > 0:
            task_id = task_stack.pop(0)
            factory_info = self._registered_factories[task_id]

            self._current_task = factory_info.task_factory()

            shared_chat_ctx = self.chat_ctx.copy()
            await self._current_task.update_chat_ctx(shared_chat_ctx)

            if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
                current_tools = self._current_task.tools
                current_tools.append(out_of_scope_tool)
                await self._current_task.update_tools(current_tools)

            try:
                self._visited_tasks.add(task_id)
                res = await self._current_task

                # AgentTask handoff merges omit function calls. Re-merge the completed
                # task context so task-group summarization can incorporate tool results.
                self._chat_ctx.merge(
                    self._current_task.chat_ctx.copy(),
                    exclude_instructions=True,
                )

                task_results[task_id] = res

                if self._task_completed_callback is not None:
                    await self._task_completed_callback(
                        TaskCompletedEvent(
                            agent_task=self._current_task, task_id=task_id, result=res
                        )
                    )
            except _OutOfScopeError as e:
                task_stack.insert(0, task_id)
                for task_id in reversed(e.target_task_ids):
                    task_stack.insert(0, task_id)
                continue
            except Exception as e:
                if self._return_exceptions:
                    task_results[task_id] = e
                    continue
                else:
                    self.complete(e)
                    return

        if self._summarize_chat_ctx:
            try:
                assert isinstance(self.session.llm, llm.LLM), (
                    "llm must be a LLM instance to summarize the chat_ctx"
                )

                # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
                # enabling summarization will result on only one ChatMessage added.
                # keep every item to allow summarization to be more action-aware.
                summarized_chat_ctx = await self.chat_ctx.copy(
                    exclude_instructions=False,
                    exclude_handoff=False,
                    exclude_config_update=False,
                    exclude_empty_message=False,
                    exclude_function_call=False,
                )._summarize(llm_v=self.session.llm, keep_last_turns=0)

                await self.update_chat_ctx(summarized_chat_ctx)
            except Exception as e:
                self.complete(e)
                return

        self.complete(TaskGroupResult(task_results=task_results))

    def _build_out_of_scope_tool(self, *, active_task_id: str) -> FunctionTool | None:
        if not self._visited_tasks:
            return None

        # Only allow to regress to already visited tasks
        task_ids = self._visited_tasks.copy()
        task_ids.discard(active_task_id)
        task_repr = {
            f.id: f.description for f in self._registered_factories.values() if f.id in task_ids
        }

        description = (
            "Call to regress to other tasks according to what the user requested to modify, return the corresponding task ids. "
            'For example, if the user wants to change their email and there is a task with id "email_task" with a description of "Collect the user\'s email", return the id ("get_email_task").'
            "If the user requests to regress to multiple tasks, such as changing their phone number and email, return both task ids in the order they were requested."
            f"The following are the IDs and their corresponding task description. {json.dumps(task_repr)}"
        )

        @function_tool(description=description, flags=ToolFlag.IGNORE_ON_ENTER)
        async def out_of_scope(
            task_ids: Annotated[
                list[str],
                Field(
                    description="The IDs of the tasks requested",
                    json_schema_extra={"items": {"type": "string", "enum": list(task_ids)}},
                ),
            ],
        ) -> None:
            for task_id in task_ids:
                if task_id not in self._registered_factories or task_id not in self._visited_tasks:
                    raise ToolError(f"unable to regress, invalid task id {task_id}")

            if not self._current_task.done():
                self._current_task.complete(_OutOfScopeError(target_task_ids=task_ids))

        return out_of_scope

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

TaskGroup orchestrates a sequence of multiple AgentTasks. It also allows for users to regress to previous tasks if requested.

Args

summarize_chat_ctx : bool
Whether or not to summarize the interactions within the TaskGroup into one message and merge the context. Defaults to True.
return_exceptions : bool
Whether or not to directly propagate an error. When set to True, the exception is added to the results dictionary and the sequence continues. Defaults to False.
on_task_completed : Callable[]
A callable that executes upon each task completion. The callback takes in a single argument of a TaskCompletedEvent.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

def add(self,
task_factory: Callable[[], livekit.agents.voice.agent.AgentTask],
*,
id: str,
description: str) ‑> Self
Expand source code
def add(
    self,
    task_factory: Callable[[], AgentTask],
    *,
    id: str,
    description: str,
) -> Self:
    """Adds an AgentTask to the TaskGroup.

    Args:
        task_factory (Callable): A callable that returns a task instance
        id (str): An identifier for the task used to access results
        description (str): A description that helps the LLM understand when to regress to this task
    """
    self._registered_factories[id] = _FactoryInfo(
        task_factory=task_factory, id=id, description=description
    )
    return self

Adds an AgentTask to the TaskGroup.

Args

task_factory : Callable
A callable that returns a task instance
id : str
An identifier for the task used to access results
description : str
A description that helps the LLM understand when to regress to this task
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    task_stack = list(self._registered_factories.keys())
    task_results: dict[str, Any] = {}

    while len(task_stack) > 0:
        task_id = task_stack.pop(0)
        factory_info = self._registered_factories[task_id]

        self._current_task = factory_info.task_factory()

        shared_chat_ctx = self.chat_ctx.copy()
        await self._current_task.update_chat_ctx(shared_chat_ctx)

        if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
            current_tools = self._current_task.tools
            current_tools.append(out_of_scope_tool)
            await self._current_task.update_tools(current_tools)

        try:
            self._visited_tasks.add(task_id)
            res = await self._current_task

            # AgentTask handoff merges omit function calls. Re-merge the completed
            # task context so task-group summarization can incorporate tool results.
            self._chat_ctx.merge(
                self._current_task.chat_ctx.copy(),
                exclude_instructions=True,
            )

            task_results[task_id] = res

            if self._task_completed_callback is not None:
                await self._task_completed_callback(
                    TaskCompletedEvent(
                        agent_task=self._current_task, task_id=task_id, result=res
                    )
                )
        except _OutOfScopeError as e:
            task_stack.insert(0, task_id)
            for task_id in reversed(e.target_task_ids):
                task_stack.insert(0, task_id)
            continue
        except Exception as e:
            if self._return_exceptions:
                task_results[task_id] = e
                continue
            else:
                self.complete(e)
                return

    if self._summarize_chat_ctx:
        try:
            assert isinstance(self.session.llm, llm.LLM), (
                "llm must be a LLM instance to summarize the chat_ctx"
            )

            # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
            # enabling summarization will result on only one ChatMessage added.
            # keep every item to allow summarization to be more action-aware.
            summarized_chat_ctx = await self.chat_ctx.copy(
                exclude_instructions=False,
                exclude_handoff=False,
                exclude_config_update=False,
                exclude_empty_message=False,
                exclude_function_call=False,
            )._summarize(llm_v=self.session.llm, keep_last_turns=0)

            await self.update_chat_ctx(summarized_chat_ctx)
        except Exception as e:
            self.complete(e)
            return

    self.complete(TaskGroupResult(task_results=task_results))

Called when the task is entered

class TaskGroupResult (task_results: dict[str, typing.Any])
Expand source code
@dataclass
class TaskGroupResult:
    task_results: dict[str, Any]

TaskGroupResult(task_results: dict[str, typing.Any])

Instance variables

var task_results : dict[str, typing.Any]
class WarmTransferResult (human_agent_identity: str)
Expand source code
@dataclass
class WarmTransferResult:
    human_agent_identity: str

WarmTransferResult(human_agent_identity: 'str')

Instance variables

var human_agent_identity : str
class WarmTransferTask (sip_call_to: NotGivenOr[str] = NOT_GIVEN,
*,
sip_trunk_id: NotGivenOr[str | None] = NOT_GIVEN,
sip_connection: NotGivenOr[api.SIPOutboundConfig] = NOT_GIVEN,
sip_number: NotGivenOr[str] = NOT_GIVEN,
sip_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '',
target_phone_number: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class WarmTransferTask(AgentTask[WarmTransferResult]):
    def __init__(
        self,
        sip_call_to: NotGivenOr[str] = NOT_GIVEN,
        *,
        sip_trunk_id: NotGivenOr[str | None] = NOT_GIVEN,
        sip_connection: NotGivenOr[api.SIPOutboundConfig] = NOT_GIVEN,
        sip_number: NotGivenOr[str] = NOT_GIVEN,
        sip_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
        instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        # deprecated
        extra_instructions: str = "",
        target_phone_number: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Initialize a WarmTransferTask to dial a human agent via SIP.

        Args:
            sip_call_to: The phone number or SIP URI to dial for the human agent
                (e.g. ``"+15105550123"`` or ``"sip:user@example.com"``).
            sip_trunk_id: ID of a pre-configured LiveKit SIP outbound trunk used to
                originate the call. Falls back to the ``LIVEKIT_SIP_OUTBOUND_TRUNK``
                environment variable when not provided.
            sip_connection: Low-level SIP connection config (``api.SIPOutboundConfig``)
                for originating calls from a **custom SIP domain** instead of through a
                saved trunk. Use this when you need to specify a custom hostname,
                transport, or authentication credentials directly, bypassing the
                trunk-based configuration.
            hold_audio: Audio played to the caller while they are on hold during the
                    transfer.
            extra_instructions: Extra instructions to append to the base instructions
                that are used to summarize the conversation history.
        """

        if not is_given(instructions):
            instructions = InstructionParts(persona=PERSONA, extra=extra_instructions)
        elif extra_instructions:
            logger.warning("`extra_instructions` will be ignored when `instructions` is provided")

        if isinstance(instructions, InstructionParts):
            conversation_history = self._format_conversation_history(chat_ctx)
            instructions = Instructions(INSTRUCTIONS_TEMPLATE).format(
                persona=instructions.persona if is_given(instructions.persona) else PERSONA,
                extra=instructions.extra,
                _conversation_history=conversation_history,
            )

        assert is_given(instructions)  # for type checking
        super().__init__(
            instructions=instructions,
            chat_ctx=NOT_GIVEN,  # don't pass the chat_ctx
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._caller_room: rtc.Room | None = None
        self._human_agent_sess: AgentSession | None = None
        self._human_agent_failed_fut: asyncio.Future[None] = asyncio.Future()
        self._human_agent_identity = "human-agent-sip"

        if target_phone_number:
            logger.warning("`target_phone_number` is deprecated, use `sip_call_to` instead")
            if not sip_call_to:
                sip_call_to = target_phone_number

        if not sip_call_to:
            raise ValueError("`sip_call_to` must be set")

        self._sip_call_to = sip_call_to
        self._sip_connection = sip_connection if is_given(sip_connection) else None
        self._sip_trunk_id = (
            sip_trunk_id
            if is_given(sip_trunk_id)
            else os.getenv("LIVEKIT_SIP_OUTBOUND_TRUNK", None)
        )
        if self._sip_trunk_id is None and self._sip_connection is None:
            raise ValueError(
                "`LIVEKIT_SIP_OUTBOUND_TRUNK` environment variable, `sip_trunk_id`,"
                " or `sip_connection` must be set"
            )

        self._sip_number = (
            sip_number if is_given(sip_number) else os.getenv("LIVEKIT_SIP_NUMBER", "")
        )
        self._sip_headers = sip_headers if is_given(sip_headers) else {}

        # background audio and io
        self._background_audio = BackgroundAudioPlayer()
        self._hold_audio_handle: PlayHandle | None = None
        self._hold_audio = (
            hold_audio
            if is_given(hold_audio)
            else AudioConfig(BuiltinAudioClip.HOLD_MUSIC, volume=0.8)
        )

        self._original_io_state: dict[str, bool] = {}

    @staticmethod
    def _format_conversation_history(chat_ctx: NotGivenOr[llm.ChatContext]) -> str:
        if not is_given(chat_ctx) or not chat_ctx:
            return ""
        prev_convo = ""
        for msg in chat_ctx.messages():
            if msg.role not in ("user", "assistant"):
                continue
            if not msg.text_content:
                continue
            role = "Caller" if msg.role == "user" else "Assistant"
            prev_convo += f"{role}: {msg.text_content}\n"
        return prev_convo

    async def on_enter(self) -> None:
        job_ctx = get_job_context()
        self._caller_room = job_ctx.room

        # start the background audio
        if self._hold_audio is not None:
            await self._background_audio.start(room=self._caller_room)
            self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

        self._set_io_enabled(False)

        try:
            dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
            done, _ = await asyncio.wait(
                (dial_human_agent_task, self._human_agent_failed_fut),
                return_when=asyncio.FIRST_COMPLETED,
            )
            if dial_human_agent_task not in done:
                raise RuntimeError()

            self._human_agent_sess = dial_human_agent_task.result()
            # let the human speak first

        except Exception:
            logger.exception("could not dial human agent")
            self._set_result(ToolError("could not dial human agent"))
            return

        finally:
            await utils.aio.cancel_and_wait(dial_human_agent_task)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def connect_to_caller(self) -> None:
        """Called when the human agent wants to connect to the caller."""
        logger.debug("connecting to caller")
        assert self._caller_room is not None

        await self._merge_calls()
        self._set_result(WarmTransferResult(human_agent_identity=self._human_agent_identity))

        # when the caller or human agent leaves the room, we'll delete the room
        self._caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_transfer(self, reason: str) -> None:
        """Handles the case when the human agent explicitly declines to connect to the caller.

        Args:
            reason: A short explanation of why the human agent declined to connect to the caller
        """
        self._set_result(ToolError(f"human agent declined to connect: {reason}"))

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def voicemail_detected(self) -> None:
        """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
        self._set_result(ToolError("voicemail detected"))

    def _on_human_agent_room_close(self, reason: rtc.DisconnectReason.ValueType) -> None:
        logger.debug(
            "human agent's room closed",
            extra={"reason": rtc.DisconnectReason.Name(reason)},
        )
        with contextlib.suppress(asyncio.InvalidStateError):
            self._human_agent_failed_fut.set_result(None)

        self._set_result(ToolError(f"room closed: {rtc.DisconnectReason.Name(reason)}"))

    def _on_caller_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        if participant.kind not in DEFAULT_PARTICIPANT_KINDS:
            return

        logger.info(f"participant disconnected from caller room: {participant.identity}, closing")

        assert self._caller_room is not None
        self._caller_room.off("participant_disconnected", self._on_caller_participant_disconnected)
        job_ctx = get_job_context()
        job_ctx.delete_room(room_name=self._caller_room.name)

    def _set_result(self, result: WarmTransferResult | Exception) -> None:
        if self.done():
            return

        if self._human_agent_sess:
            self._human_agent_sess.shutdown()
            self._human_agent_sess = None

        if self._hold_audio_handle:
            self._hold_audio_handle.stop()
            self._hold_audio_handle = None

        self._set_io_enabled(True)
        self.complete(result)

    async def _dial_human_agent(self) -> AgentSession:
        assert self._caller_room is not None

        job_ctx = get_job_context()
        ws_url = job_ctx._info.url

        # create a new room for the human agent
        human_agent_room_name = self._caller_room.name + "-human-agent"
        room = rtc.Room()
        token = (
            api.AccessToken()
            .with_identity(self._caller_room.local_participant.identity)
            .with_grants(
                api.VideoGrants(
                    room_join=True,
                    room=human_agent_room_name,
                    can_update_own_metadata=True,
                    can_publish=True,
                    can_subscribe=True,
                )
            )
            .with_kind("agent")
        ).to_jwt()

        logger.debug(
            "connecting to human agent room",
            extra={"ws_url": ws_url, "human_agent_room_name": human_agent_room_name},
        )
        await room.connect(ws_url, token)

        # if human agent hung up for whatever reason, we'd resume the caller conversation
        room.on("disconnected", self._on_human_agent_room_close)

        human_agent_sess: AgentSession = AgentSession(
            vad=self.session.vad or NOT_GIVEN,
            llm=self.session.llm or NOT_GIVEN,
            stt=self.session.stt or NOT_GIVEN,
            tts=self.session.tts or NOT_GIVEN,
            turn_detection=self.session.turn_detection or NOT_GIVEN,
        )
        # create a copy of this AgentTask
        human_agent_agent = Agent(
            instructions=self.instructions,
            turn_detection=self.turn_detection,
            stt=self.stt,
            vad=self.vad,
            llm=self.llm,
            tts=self.tts,
            tools=self.tools,
            chat_ctx=self.chat_ctx,
            allow_interruptions=self.allow_interruptions,
        )
        await human_agent_sess.start(
            agent=human_agent_agent,
            room=room,
            room_options=room_io.RoomOptions(
                close_on_disconnect=True,
                delete_room_on_close=True,
                participant_identity=self._human_agent_identity,
            ),
            record=False,  # TODO: support recording on multiple sessions?
        )

        # dial the human agent
        sip_request = api.CreateSIPParticipantRequest(
            sip_trunk_id=self._sip_trunk_id,
            sip_call_to=self._sip_call_to,
            room_name=human_agent_room_name,
            participant_identity=self._human_agent_identity,
            wait_until_answered=True,
            sip_number=self._sip_number or None,
            headers=self._sip_headers,
        )
        if self._sip_connection is not None:
            sip_request.trunk.CopyFrom(self._sip_connection)
        await job_ctx.api.sip.create_sip_participant(sip_request)

        return human_agent_sess

    async def _merge_calls(self) -> None:
        assert self._caller_room is not None
        assert self._human_agent_sess is not None

        job_ctx = get_job_context()
        human_agent_room = self._human_agent_sess.room_io.room
        # we no longer care about the human agent session. it's supposed to be over
        human_agent_room.off("disconnected", self._on_human_agent_room_close)

        logger.debug(f"moving {self._human_agent_identity} to caller room {self._caller_room.name}")
        await job_ctx.api.room.move_participant(
            api.MoveParticipantRequest(
                room=human_agent_room.name,
                identity=self._human_agent_identity,
                destination_room=self._caller_room.name,
            )
        )

    def _set_io_enabled(self, enabled: bool) -> None:
        input = self.session.input
        output = self.session.output

        if not self._original_io_state:
            self._original_io_state = {
                "audio_input": input.audio_enabled,
                "video_input": input.video_enabled,
                "audio_output": output.audio_enabled,
                "transcription_output": output.transcription_enabled,
                "video_output": output.video_enabled,
            }

        if input.audio:
            input.set_audio_enabled(enabled and self._original_io_state["audio_input"])
        if input.video:
            input.set_video_enabled(enabled and self._original_io_state["video_input"])
        if output.audio:
            output.set_audio_enabled(enabled and self._original_io_state["audio_output"])
        if output.transcription:
            output.set_transcription_enabled(
                enabled and self._original_io_state["transcription_output"]
            )
        if output.video:
            output.set_video_enabled(enabled and self._original_io_state["video_output"])

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a WarmTransferTask to dial a human agent via SIP.

Args

sip_call_to
The phone number or SIP URI to dial for the human agent (e.g. "+15105550123" or "sip:user@example.com").
sip_trunk_id
ID of a pre-configured LiveKit SIP outbound trunk used to originate the call. Falls back to the LIVEKIT_SIP_OUTBOUND_TRUNK environment variable when not provided.
sip_connection
Low-level SIP connection config (api.SIPOutboundConfig) for originating calls from a custom SIP domain instead of through a saved trunk. Use this when you need to specify a custom hostname, transport, or authentication credentials directly, bypassing the trunk-based configuration.
hold_audio
Audio played to the caller while they are on hold during the transfer.
extra_instructions
Extra instructions to append to the base instructions that are used to summarize the conversation history.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def connect_to_caller(self) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def connect_to_caller(self) -> None:
    """Called when the human agent wants to connect to the caller."""
    logger.debug("connecting to caller")
    assert self._caller_room is not None

    await self._merge_calls()
    self._set_result(WarmTransferResult(human_agent_identity=self._human_agent_identity))

    # when the caller or human agent leaves the room, we'll delete the room
    self._caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)

Called when the human agent wants to connect to the caller.

async def decline_transfer(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_transfer(self, reason: str) -> None:
    """Handles the case when the human agent explicitly declines to connect to the caller.

    Args:
        reason: A short explanation of why the human agent declined to connect to the caller
    """
    self._set_result(ToolError(f"human agent declined to connect: {reason}"))

Handles the case when the human agent explicitly declines to connect to the caller.

Args

reason
A short explanation of why the human agent declined to connect to the caller
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    job_ctx = get_job_context()
    self._caller_room = job_ctx.room

    # start the background audio
    if self._hold_audio is not None:
        await self._background_audio.start(room=self._caller_room)
        self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

    self._set_io_enabled(False)

    try:
        dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
        done, _ = await asyncio.wait(
            (dial_human_agent_task, self._human_agent_failed_fut),
            return_when=asyncio.FIRST_COMPLETED,
        )
        if dial_human_agent_task not in done:
            raise RuntimeError()

        self._human_agent_sess = dial_human_agent_task.result()
        # let the human speak first

    except Exception:
        logger.exception("could not dial human agent")
        self._set_result(ToolError("could not dial human agent"))
        return

    finally:
        await utils.aio.cancel_and_wait(dial_human_agent_task)

Called when the task is entered

async def voicemail_detected(self) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def voicemail_detected(self) -> None:
    """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
    self._set_result(ToolError("voicemail detected"))

Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting