Module livekit.agents.beta.workflows

Sub-modules

livekit.agents.beta.workflows.address
livekit.agents.beta.workflows.dtmf_inputs
livekit.agents.beta.workflows.email_address
livekit.agents.beta.workflows.task_group
livekit.agents.beta.workflows.utils
livekit.agents.beta.workflows.warm_transfer

Classes

class GetAddressResult (address: str)
Expand source code
@dataclass
class GetAddressResult:
    """Result that ``GetAddressTask`` completes with once the address is confirmed."""

    # the captured address fields joined into one space-separated string
    address: str

GetAddressResult(address: 'str')

Instance variables

var address : str
class GetAddressTask (extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetAddressTask(AgentTask[GetAddressResult]):
    """Task that captures a postal address from the user and completes once it is confirmed.

    The LLM records its current hypothesis through ``update_address``; the task completes
    with a ``GetAddressResult`` from ``confirm_address``, or with a ``ToolError`` from
    ``decline_address_capture``.
    """

    def __init__(
        self,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Assemble the address-capture prompt and hand every option to ``AgentTask``.

        Args:
            extra_instructions: Text appended directly after the built-in prompt.
            chat_ctx: Forwarded unchanged to the parent constructor.
            turn_detection: Forwarded unchanged to the parent constructor.
            tools: Forwarded to the parent constructor; a falsy value becomes ``[]``.
            stt: Forwarded unchanged to the parent constructor.
            vad: Forwarded unchanged to the parent constructor.
            llm: Forwarded unchanged to the parent constructor.
            tts: Forwarded unchanged to the parent constructor.
            allow_interruptions: Forwarded unchanged to the parent constructor.
        """
        base_instructions = (
            "You are only a single step in a broader system, responsible solely for capturing an address.\n"
            "You will be handling addresses from any country. Expect that users will say address in different formats with fields filled like:\n"
            "- 'street_address': '450 SOUTH MAIN ST', 'unit_number': 'FLOOR 2', 'locality': 'SALT LAKE CITY UT 84101', 'country': 'UNITED STATES',\n"
            "- 'street_address': '123 MAPLE STREET', 'unit_number': 'APARTMENT 10', 'locality': 'OTTAWA ON K1A 0B1', 'country': 'CANADA',\n"
            "- 'street_address': 'GUOMAO JIE 3 HAO, CHAOYANG QU', 'unit_number': 'GUOMAO DA SHA 18 LOU 101 SHI', 'locality': 'BEIJING SHI 100000', 'country': 'CHINA',\n"
            "- 'street_address': '5 RUE DE L'ANCIENNE COMÉDIE', 'unit_number': 'APP C4', 'locality': '75006 PARIS', 'country': 'FRANCE',\n"
            "- 'street_address': 'PLOT 10, NEHRU ROAD', 'unit_number': 'OFFICE 403, 4TH FLOOR', 'locality': 'VILE PARLE (E), MUMBAI MAHARASHTRA 400099', 'country': 'INDIA',\n"
            "Normalize common spoken patterns silently:\n"
            "- Convert words like 'dash' and 'apostrophe' into symbols: `-`, `'`.\n"
            "- Convert spelled out numbers like 'six' and 'seven' into numerals: `6`, `7`.\n"
            "- Recognize patterns where users speak their address field followed by spelling: e.g., 'guomao g u o m a o'.\n"
            "- Filter out filler words or hesitations.\n"
            "- Recognize when there may be accents on certain letters if explicitly said or common in the location specified. Be sure to verify the correct accents if existent.\n"
            "Don't mention corrections. Treat inputs as possibly imperfect but fix them silently.\n"
            "Call `update_address` at the first opportunity whenever you form a new hypothesis about the address. "
            "(before asking any questions or providing any answers.) \n"
            "Don't invent new addresses, stick strictly to what the user said. \n"
            "Call `confirm_address` after the user confirmed the address is correct. \n"
            "When reading a numerical ordinal suffix (st, nd, rd, th), the number must be verbally expanded into its full, correctly pronounced word form.\n"
            "Do not read the number and the suffix letters separately.\n"
            "Confirm postal codes by reading them out digit-by-digit as a sequence of single numbers. Do not read them as cardinal numbers.\n"
            "For example, read 90210 as 'nine zero two one zero.'\n"
            "Avoid using bullet points and parenthese in any responses.\n"
            "Spell out the address letter-by-letter when applicable, such as street names and provinces, especially when the user spells it out initially. \n"
            "If the address is unclear or invalid, or it takes too much back-and-forth, prompt for it in parts in this order: street address, unit number if applicable, locality, and country. \n"
            "Ignore unrelated input and avoid going off-topic. Do not generate markdown, greetings, or unnecessary commentary. \n"
            "Always explicitly invoke a tool when applicable. Do not simulate tool usage, no real action is taken unless the tool is explicitly called."
        )
        extra_tools = tools or []

        super().__init__(
            instructions=base_instructions + extra_instructions,
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=extra_tools,
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        # latest address hypothesis; stays empty until update_address is called
        self._current_address = ""

        # speech handle of the latest update_address call; confirm_address rejects a
        # confirmation that arrives on this same handle
        self._address_update_speech_handle: SpeechHandle | None = None

    async def on_enter(self) -> None:
        """Open the task by having the agent request the user's address."""
        opening_prompt = "Ask the user to provide their address."
        self.session.generate_reply(instructions=opening_prompt)

    @function_tool()
    async def update_address(
        self, street_address: str, unit_number: str, locality: str, country: str, ctx: RunContext
    ) -> str:
        """Update the address provided by the user.

        Args:
            street_address (str): Dependent on country, may include fields like house number, street name, block, or district
            unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
            locality (str): Dependent on country, may include fields like city, zip code, or province
            country (str): The country the user lives in spelled out fully
        """
        self._address_update_speech_handle = ctx.speech_handle

        # the unit number is only included when it is not blank
        parts = [street_address, locality, country]
        if unit_number.strip():
            parts.insert(1, unit_number)

        joined = " ".join(parts)
        self._current_address = joined

        reply_lines = [
            f"The address has been updated to {joined}",
            f"Repeat the address field by field: {parts} if needed",
            "Prompt the user for confirmation, do not call `confirm_address` directly",
        ]
        return "\n".join(reply_lines)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def confirm_address(self, ctx: RunContext) -> None:
        """Call this tool when the user confirms that the address is correct."""
        await ctx.wait_for_playout()

        # reject a confirmation issued on the same speech handle as the last update
        called_in_update_turn = ctx.speech_handle == self._address_update_speech_handle
        if called_in_update_turn:
            raise ToolError("error: the user must confirm the address explicitly")

        if not self._current_address:
            message = "error: no address was provided, `update_address` must be called before"
            raise ToolError(message)

        if self.done():
            return

        self.complete(GetAddressResult(address=self._current_address))

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_address_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide an address.

        Args:
            reason: A short explanation of why the user declined to provide the address
        """
        if self.done():
            return

        # the task completes with an error value rather than a GetAddressResult
        self.complete(ToolError(f"couldn't get the address: {reason}"))

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def confirm_address(self, ctx: RunContext) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def confirm_address(self, ctx: RunContext) -> None:
    """Call this tool when the user confirms that the address is correct."""
    await ctx.wait_for_playout()

    # reject a confirmation issued on the same speech handle as the last update
    called_in_update_turn = ctx.speech_handle == self._address_update_speech_handle
    if called_in_update_turn:
        raise ToolError("error: the user must confirm the address explicitly")

    if not self._current_address:
        message = "error: no address was provided, `update_address` must be called before"
        raise ToolError(message)

    if self.done():
        return

    self.complete(GetAddressResult(address=self._current_address))

Call this tool when the user confirms that the address is correct.

async def decline_address_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_address_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide an address.

    Args:
        reason: A short explanation of why the user declined to provide the address
    """
    if self.done():
        return

    # the task completes with an error value rather than a GetAddressResult
    self.complete(ToolError(f"couldn't get the address: {reason}"))

Handles the case when the user explicitly declines to provide an address.

Args

reason
A short explanation of why the user declined to provide the address
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Open the task by having the agent request the user's address."""
    opening_prompt = "Ask the user to provide their address."
    self.session.generate_reply(instructions=opening_prompt)

Called when the task is entered

async def update_address(self,
street_address: str,
unit_number: str,
locality: str,
country: str,
ctx: RunContext) ‑> str
Expand source code
@function_tool()
async def update_address(
    self, street_address: str, unit_number: str, locality: str, country: str, ctx: RunContext
) -> str:
    """Update the address provided by the user.

    Args:
        street_address (str): Dependent on country, may include fields like house number, street name, block, or district
        unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
        locality (str): Dependent on country, may include fields like city, zip code, or province
        country (str): The country the user lives in spelled out fully
    """
    # remember the speech handle that produced this update
    self._address_update_speech_handle = ctx.speech_handle

    # the unit number is only included when it is not blank
    parts = [street_address, locality, country]
    if unit_number.strip():
        parts.insert(1, unit_number)

    joined = " ".join(parts)
    self._current_address = joined

    reply_lines = [
        f"The address has been updated to {joined}",
        f"Repeat the address field by field: {parts} if needed",
        "Prompt the user for confirmation, do not call `confirm_address` directly",
    ]
    return "\n".join(reply_lines)

Update the address provided by the user.

Args

street_address : str
Dependent on country, may include fields like house number, street name, block, or district
unit_number : str
The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
locality : str
Dependent on country, may include fields like city, zip code, or province
country : str
The country the user lives in spelled out fully
class GetDtmfResult (user_input: str)
Expand source code
@dataclass
class GetDtmfResult:
    """Result of a DTMF collection task."""

    # text form of the collected inputs
    user_input: str

    @classmethod
    def from_dtmf_inputs(cls, dtmf_inputs: list[DtmfEvent]) -> GetDtmfResult:
        """Build a result whose ``user_input`` is ``format_dtmf(dtmf_inputs)``."""
        formatted = format_dtmf(dtmf_inputs)
        return cls(user_input=formatted)

GetDtmfResult(user_input: 'str')

Static methods

def from_dtmf_inputs(dtmf_inputs: list[DtmfEvent]) ‑> GetDtmfResult

Instance variables

var user_input : str
class GetDtmfTask (*,
num_digits: int,
ask_for_confirmation: bool = False,
dtmf_input_timeout: float = 4.0,
dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
extra_instructions: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class GetDtmfTask(AgentTask[GetDtmfResult]):
    """A task to collect DTMF inputs from the user.

    Completes with a ``GetDtmfResult`` when the digits are collected successfully, and
    with a ``ToolError`` when keypad input ends without the expected number of digits.
    """

    def __init__(
        self,
        *,
        num_digits: int,
        ask_for_confirmation: bool = False,
        dtmf_input_timeout: float = 4.0,
        dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
        chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
        extra_instructions: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            num_digits: The number of digits to collect.
            ask_for_confirmation: Whether to ask for confirmation when agent has collected full digits.
            dtmf_input_timeout: The per-digit timeout.
            dtmf_stop_event: The DTMF event to stop collecting inputs.
            chat_ctx: The chat context to use.
            extra_instructions: Extra instructions to add to the task.

        Raises:
            ValueError: If ``num_digits`` is not greater than 0.
        """
        if num_digits <= 0:
            raise ValueError("num_digits must be greater than 0")

        self._curr_dtmf_inputs: list[DtmfEvent] = []
        self._dtmf_reply_running: bool = False

        @function_tool
        async def confirm_inputs(inputs: list[DtmfEvent]) -> None:
            """Finalize the collected digit inputs after explicit user confirmation.

            Use this ONLY after the confirmation. You should confirm by verbally reading out the digits one by one and, once the
            user confirms they are correct, call this tool with the inputs.

            Do not use this tool to capture the initial digits."""
            # same done() guard the sibling address/email tasks use before completing
            if not self.done():
                self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        @function_tool
        async def record_inputs(inputs: list[DtmfEvent]) -> None:
            """Record the collected digit inputs without additional confirmation.

            Call this tool as soon as a valid sequence of digits has been provided by the user (via DTMF or spoken)."""
            if not self.done():
                self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        instructions = (
            "You are a single step in a broader system, responsible solely for gathering digits input from the user. "
            "You will either receive a sequence of digits through dtmf events tagged by <dtmf_inputs>, or "
            "user will directly say the digits to you. You should be able to handle both cases. "
        )

        if ask_for_confirmation:
            instructions += "Once user has confirmed the digits (by verbally spoken or entered manually), call `confirm_inputs` with the inputs."
        else:
            instructions += "If user provides the digits through voice and it is valid, call `record_inputs` with the inputs."

        if is_given(extra_instructions):
            instructions += f"\n{extra_instructions}"

        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            # only the tool matching the chosen mode is exposed to the LLM
            tools=[confirm_inputs] if ask_for_confirmation else [record_inputs],
        )

        def _on_sip_dtmf_received(ev: rtc.SipDTMF) -> None:
            # ignore key presses while a reply for the previous inputs is being generated
            if self._dtmf_reply_running:
                return

            # immediately kick off the DTMF reply generation if matches the stop event
            if ev.digit == dtmf_stop_event.value:
                self._generate_dtmf_reply()
                return

            self._curr_dtmf_inputs.append(DtmfEvent(ev.digit))
            logger.info(f"DTMF inputs: {format_dtmf(self._curr_dtmf_inputs)}")
            # (re)start the debounce timer on every accepted digit
            self._generate_dtmf_reply.schedule()

        @debounced(delay=dtmf_input_timeout)
        async def _generate_dtmf_reply() -> None:
            self._dtmf_reply_running = True

            try:
                self.session.interrupt()

                dtmf_str = format_dtmf(self._curr_dtmf_inputs)
                logger.debug(f"Generating DTMF reply, current inputs: {dtmf_str}")

                # if input not fully received (i.e. timeout), complete with an error
                if len(self._curr_dtmf_inputs) != num_digits:
                    error_msg = (
                        f"Digits input not fully received. "
                        f"Expect {num_digits} digits, got {len(self._curr_dtmf_inputs)}"
                    )
                    if not self.done():
                        self.complete(ToolError(error_msg))
                    return

                # if not asking for confirmation, return the DTMF inputs
                if not ask_for_confirmation:
                    if not self.done():
                        self.complete(GetDtmfResult.from_dtmf_inputs(self._curr_dtmf_inputs))
                    return

                instructions = (
                    "User has entered the following valid digits on the telephone keypad:\n"
                    f"<dtmf_inputs>{dtmf_str}</dtmf_inputs>\n"
                    "Please confirm it with the user by saying the digits one by one with space in between "
                    "(.e.g. 'one two three four five six seven eight nine ten'). "
                    "Once you are sure, call `confirm_inputs` with the inputs."
                )

                await self.session.generate_reply(user_input=instructions)
            finally:
                # always reset, whether the reply completed, returned early, or raised
                self._dtmf_reply_running = False
                self._curr_dtmf_inputs.clear()

        def _on_user_state_changed(ev: UserStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state == "speaking":
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after user is back to non-speaking
                self._generate_dtmf_reply.schedule()

        def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state in ["speaking", "thinking"]:
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after agent is back to non-speaking
                self._generate_dtmf_reply.schedule()

        # keep references to the closures so on_enter/on_exit can register and
        # unregister the exact same callables
        self._generate_dtmf_reply: Debounced[None] = _generate_dtmf_reply
        self._on_sip_dtmf_received: Callable[[rtc.SipDTMF], None] = _on_sip_dtmf_received
        self._on_user_state_changed: Callable[[UserStateChangedEvent], None] = (
            _on_user_state_changed
        )
        self._on_agent_state_changed: Callable[[AgentStateChangedEvent], None] = (
            _on_agent_state_changed
        )

    def dtmf_reply_running(self) -> bool:
        """Return whether a DTMF reply is currently being generated."""
        return self._dtmf_reply_running

    async def on_enter(self) -> None:
        """Subscribe to DTMF and state-change events, then trigger the opening reply."""
        ctx = get_job_context()

        ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
        # the user-state handler listens on "user_state_changed"; it was previously
        # attached to "agent_state_changed" and therefore never saw user state events
        self.session.on("user_state_changed", self._on_user_state_changed)
        self.session.on("agent_state_changed", self._on_agent_state_changed)
        self.session.generate_reply(tool_choice="none")

    async def on_exit(self) -> None:
        """Unsubscribe every handler added in ``on_enter`` and cancel any pending reply."""
        ctx = get_job_context()

        ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
        # must mirror the event names used in on_enter
        self.session.off("user_state_changed", self._on_user_state_changed)
        self.session.off("agent_state_changed", self._on_agent_state_changed)
        self._generate_dtmf_reply.cancel()

A task to collect DTMF inputs from the user.

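Completes with a GetDtmfResult holding the formatted DTMF inputs if they are collected successfully; otherwise completes with a ToolError (for example, when fewer digits than expected were received).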

Args

num_digits
The number of digits to collect.
ask_for_confirmation
Whether to ask for confirmation when agent has collected full digits.
repeat_instructions
The number of times to repeat the initial instructions.
dtmf_input_timeout
The per-digit timeout.
dtmf_stop_event
The DTMF event to stop collecting inputs.
chat_ctx
The chat context to use.
extra_instructions
Extra instructions to add to the task.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

def dtmf_reply_running(self) ‑> bool
Expand source code
def dtmf_reply_running(self) -> bool:
    """Return whether a DTMF reply is currently being generated."""
    running = self._dtmf_reply_running
    return running
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Subscribe to DTMF and state-change events, then trigger the opening reply."""
    ctx = get_job_context()

    ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
    # the user-state handler listens on "user_state_changed"; it was previously
    # attached to "agent_state_changed" and therefore never saw user state events
    self.session.on("user_state_changed", self._on_user_state_changed)
    self.session.on("agent_state_changed", self._on_agent_state_changed)
    self.session.generate_reply(tool_choice="none")

Called when the task is entered

async def on_exit(self) ‑> None
Expand source code
async def on_exit(self) -> None:
    """Unsubscribe the DTMF and state-change handlers and cancel any pending reply."""
    ctx = get_job_context()

    ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
    # the user-state handler belongs to the "user_state_changed" event, not
    # "agent_state_changed"
    self.session.off("user_state_changed", self._on_user_state_changed)
    self.session.off("agent_state_changed", self._on_agent_state_changed)
    self._generate_dtmf_reply.cancel()

Called when the task is exited

class GetEmailResult (email_address: str)
Expand source code
@dataclass
class GetEmailResult:
    """Result that ``GetEmailTask`` completes with once the email address is confirmed."""

    # the stripped email address that passed the EMAIL_REGEX check in update_email_address
    email_address: str

GetEmailResult(email_address: 'str')

Instance variables

var email_address : str
class GetEmailTask (extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class GetEmailTask(AgentTask[GetEmailResult]):
    """Task that captures an email address from the user and completes once it is confirmed.

    The LLM records its current hypothesis through ``update_email_address``; the task
    completes with a ``GetEmailResult`` from ``confirm_email_address``, or with a
    ``ToolError`` from ``decline_email_capture``.
    """

    def __init__(
        self,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Assemble the email-capture prompt and hand every option to ``AgentTask``.

        Args:
            extra_instructions: Text appended directly after the built-in prompt.
            chat_ctx: Forwarded unchanged to the parent constructor.
            turn_detection: Forwarded unchanged to the parent constructor.
            tools: Forwarded to the parent constructor; a falsy value becomes ``[]``.
            stt: Forwarded unchanged to the parent constructor.
            vad: Forwarded unchanged to the parent constructor.
            llm: Forwarded unchanged to the parent constructor.
            tts: Forwarded unchanged to the parent constructor.
            allow_interruptions: Forwarded unchanged to the parent constructor.
        """
        base_instructions = (
            "You are only a single step in a broader system, responsible solely for capturing an email address.\n"
            "Handle input as noisy voice transcription. Expect that users will say emails aloud with formats like:\n"
            "- 'john dot doe at gmail dot com'\n"
            "- 'susan underscore smith at yahoo dot co dot uk'\n"
            "- 'dave dash b at protonmail dot com'\n"
            "- 'jane at example' (partial—prompt for the domain)\n"
            "- 'theo t h e o at livekit dot io' (name followed by spelling)\n"
            "Normalize common spoken patterns silently:\n"
            "- Convert words like 'dot', 'underscore', 'dash', 'plus' into symbols: `.`, `_`, `-`, `+`.\n"
            "- Convert 'at' to `@`.\n"
            "- Recognize patterns where users speak their name or a word, followed by spelling: e.g., 'john j o h n'.\n"
            "- Filter out filler words or hesitations.\n"
            "- Assume some spelling if contextually obvious (e.g. 'mike b two two' → mikeb22).\n"
            "Don't mention corrections. Treat inputs as possibly imperfect but fix them silently.\n"
            "Call `update_email_address` at the first opportunity whenever you form a new hypothesis about the email. "
            "(before asking any questions or providing any answers.) \n"
            "Don't invent new email addresses, stick strictly to what the user said. \n"
            "Call `confirm_email_address` after the user confirmed the email address is correct. \n"
            "If the email is unclear or invalid, or it takes too much back-and-forth, prompt for it in parts: first the part before the '@', then the domain—only if needed. \n"
            "Ignore unrelated input and avoid going off-topic. Do not generate markdown, greetings, or unnecessary commentary. \n"
            "Always explicitly invoke a tool when applicable. Do not simulate tool usage, no real action is taken unless the tool is explicitly called."
        )
        extra_tools = tools or []

        super().__init__(
            instructions=base_instructions + extra_instructions,
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=extra_tools,
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        # latest validated email; stays empty until update_email_address succeeds
        self._current_email = ""

        # speech handle of the latest update_email_address call; a confirm_email_address
        # call on this same handle is rejected, which covers the LLM confirming on its
        # own without asking the user
        self._email_update_speech_handle: SpeechHandle | None = None

    async def on_enter(self) -> None:
        """Open the task by having the agent request an email address."""
        opening_prompt = "Ask the user to provide an email address."
        self.session.generate_reply(instructions=opening_prompt)

    @function_tool
    async def update_email_address(self, email: str, ctx: RunContext) -> str:
        """Update the email address provided by the user.

        Args:
            email: The email address provided by the user
        """
        # the handle is recorded before validation, so it is updated even for invalid input
        self._email_update_speech_handle = ctx.speech_handle

        cleaned = email.strip()
        if re.match(EMAIL_REGEX, cleaned) is None:
            raise ToolError(f"Invalid email address provided: {cleaned}")

        self._current_email = cleaned
        # one space between every character, so the address can be read out letter by letter
        spelled_out = " ".join(cleaned)

        reply_lines = [
            f"The email has been updated to {cleaned}",
            f"Repeat the email character by character: {spelled_out} if needed",
            "Prompt the user for confirmation, do not call `confirm_email_address` directly",
        ]
        return "\n".join(reply_lines)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def confirm_email_address(self, ctx: RunContext) -> None:
        """Validates/confirms the email address provided by the user."""
        await ctx.wait_for_playout()

        # reject a confirmation issued on the same speech handle as the last update
        called_in_update_turn = ctx.speech_handle == self._email_update_speech_handle
        if called_in_update_turn:
            raise ToolError("error: the user must confirm the email address explicitly")

        if not self._current_email.strip():
            message = (
                "error: no email address were provided, `update_email_address` must be called before"
            )
            raise ToolError(message)

        if self.done():
            return

        self.complete(GetEmailResult(email_address=self._current_email))

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_email_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide an email address.

        Args:
            reason: A short explanation of why the user declined to provide the email address
        """
        if self.done():
            return

        # the task completes with an error value rather than a GetEmailResult
        self.complete(ToolError(f"couldn't get the email address: {reason}"))

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def confirm_email_address(self, ctx: RunContext) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def confirm_email_address(self, ctx: RunContext) -> None:
    """Validates/confirms the email address provided by the user."""
    await ctx.wait_for_playout()

    # reject a confirmation issued on the same speech handle as the last update
    called_in_update_turn = ctx.speech_handle == self._email_update_speech_handle
    if called_in_update_turn:
        raise ToolError("error: the user must confirm the email address explicitly")

    if not self._current_email.strip():
        message = (
            "error: no email address were provided, `update_email_address` must be called before"
        )
        raise ToolError(message)

    if self.done():
        return

    self.complete(GetEmailResult(email_address=self._current_email))

Validates/confirms the email address provided by the user.

async def decline_email_capture(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_email_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide an email address.

    Args:
        reason: A short explanation of why the user declined to provide the email address
    """
    if self.done():
        return

    # the task completes with an error value rather than a GetEmailResult
    self.complete(ToolError(f"couldn't get the email address: {reason}"))

Handles the case when the user explicitly declines to provide an email address.

Args

reason
A short explanation of why the user declined to provide the email address
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Open the task by having the agent request an email address."""
    opening_prompt = "Ask the user to provide an email address."
    self.session.generate_reply(instructions=opening_prompt)

Called when the task is entered

async def update_email_address(self, email: str, ctx: RunContext) ‑> str
Expand source code
@function_tool
async def update_email_address(self, email: str, ctx: RunContext) -> str:
    """Update the email address provided by the user.

    Args:
        email: The email address provided by the user
    """
    # the handle is recorded before validation, so it is updated even for invalid input
    self._email_update_speech_handle = ctx.speech_handle

    cleaned = email.strip()
    if re.match(EMAIL_REGEX, cleaned) is None:
        raise ToolError(f"Invalid email address provided: {cleaned}")

    self._current_email = cleaned
    # one space between every character, so the address can be read out letter by letter
    spelled_out = " ".join(cleaned)

    reply_lines = [
        f"The email has been updated to {cleaned}",
        f"Repeat the email character by character: {spelled_out} if needed",
        "Prompt the user for confirmation, do not call `confirm_email_address` directly",
    ]
    return "\n".join(reply_lines)

Update the email address provided by the user.

Args

email
The email address provided by the user
class TaskGroup (*,
summarize_chat_ctx: bool = True,
chat_ctx: livekit.agents.llm.chat_context.ChatContext | livekit.agents.types.NotGiven = NOT_GIVEN)
Expand source code
class TaskGroup(AgentTask[TaskGroupResult]):
    """Runs a sequence of registered sub-tasks and collects their results.

    Sub-tasks are registered with :meth:`add` and executed in registration order
    when the group is entered. Once at least one sub-task has been visited, each
    sub-task that is started also receives an ``out_of_scope`` tool which lets it
    hand control back to previously visited sub-tasks.
    """

    def __init__(
        self,
        *,
        summarize_chat_ctx: bool = True,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
    ):
        """Creates a TaskGroup instance.

        Args:
            summarize_chat_ctx: When true, the group's chat context is summarized
                with the session LLM after all sub-tasks have finished.
            chat_ctx: Optional chat context forwarded to the base ``AgentTask``.
        """
        super().__init__(instructions="*empty*", chat_ctx=chat_ctx, llm=None)

        self._summarize_chat_ctx = summarize_chat_ctx
        # ids of the sub-tasks that have been started at least once
        self._visited_tasks = set[str]()
        # registration order is the execution order
        self._registered_factories: OrderedDict[str, _FactoryInfo] = OrderedDict()

    def add(self, task_factory: Callable[[], AgentTask], *, id: str, description: str) -> Self:
        """Register a sub-task factory under ``id`` and return ``self`` for chaining.

        Args:
            task_factory: Zero-argument callable building a fresh task each time
                the sub-task is (re)started.
            id: Key for the sub-task; registering the same id again replaces the
                previous entry.
            description: Text listed next to the id in the ``out_of_scope`` tool
                description.
        """
        self._registered_factories[id] = _FactoryInfo(
            task_factory=task_factory, id=id, description=description
        )
        return self

    async def on_enter(self) -> None:
        """Run every registered sub-task, then complete the group.

        The group completes with a ``TaskGroupResult`` on success, with the
        sub-task's exception if a sub-task fails, or with a ``RuntimeError`` if
        the chat-context summarization fails. It is completed exactly once.
        """
        task_stack = list(self._registered_factories.keys())
        task_results: dict[str, Any] = {}

        while task_stack:
            task_id = task_stack.pop(0)
            factory_info = self._registered_factories[task_id]

            self._current_task = factory_info.task_factory()

            shared_chat_ctx = self.chat_ctx.copy()
            await self._current_task.update_chat_ctx(shared_chat_ctx)

            if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
                current_tools = self._current_task.tools
                current_tools.append(out_of_scope_tool)
                await self._current_task.update_tools(current_tools)

            try:
                self._visited_tasks.add(task_id)
                res = await self._current_task
                task_results[task_id] = res
            except _OutOfScopeError as e:
                # re-queue the interrupted task, then place the requested tasks in
                # front of it (in the order they were requested)
                task_stack.insert(0, task_id)
                for target_task_id in reversed(e.target_task_ids):
                    task_stack.insert(0, target_task_id)
                continue
            except Exception as e:
                # the group now carries the failure as its result: stop here instead
                # of falling through to the summarization and a second `complete`
                self.complete(e)
                return

        try:
            if self._summarize_chat_ctx:
                assert isinstance(self.session.llm, llm.LLM)

                # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
                # enabling summarization will result on only one ChatMessage added.
                summarized_chat_ctx = await self.chat_ctx.copy(
                    exclude_instructions=True
                )._summarize(llm_v=self.session.llm, keep_last_turns=0)
                await self.update_chat_ctx(summarized_chat_ctx)
        except Exception as e:
            self.complete(RuntimeError(f"failed to summarize the chat_ctx: {e}"))
            # already completed with the error, do not complete a second time
            return

        self.complete(TaskGroupResult(task_results=task_results))

    def _build_out_of_scope_tool(self, *, active_task_id: str) -> Optional[FunctionTool]:
        """Build the tool that lets the active sub-task regress to visited sub-tasks.

        Returns ``None`` while no sub-task has been visited yet.
        """
        if not self._visited_tasks:
            return None

        # Only allow to regress to already visited tasks
        task_ids = self._visited_tasks.copy()
        task_ids.discard(active_task_id)
        task_repr = {
            f.id: f.description for f in self._registered_factories.values() if f.id in task_ids
        }

        # every sentence ends with a space so the concatenated prompt reads correctly,
        # and the example returns the same id it introduces
        description = (
            "Call to regress to other tasks according to what the user requested to modify, return the corresponding task ids. "
            'For example, if the user wants to change their email and there is a task with id "email_task" with a description of "Collect the user\'s email", return the id ("email_task"). '
            "If the user requests to regress to multiple tasks, such as changing their phone number and email, return both task ids in the order they were requested. "
            f"The following are the IDs and their corresponding task description. {json.dumps(task_repr)}"
        )

        @function_tool(description=description, flags=ToolFlag.IGNORE_ON_ENTER)
        async def out_of_scope(
            task_ids: Annotated[
                list[str],
                Field(
                    description="The IDs of the tasks requested",
                    json_schema_extra={"items": {"enum": list(task_ids)}},
                ),
            ],
        ) -> None:
            # validate every requested id before interrupting the running task
            for task_id in task_ids:
                if task_id not in self._registered_factories or task_id not in self._visited_tasks:
                    raise ToolError(f"unable to regress, invalid task id {task_id}")

            # completing the running task with _OutOfScopeError makes `on_enter` re-queue tasks
            if not self._current_task.done():
                self._current_task.complete(_OutOfScopeError(target_task_ids=task_ids))

        return out_of_scope

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Creates a TaskGroup instance.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

def add(self,
task_factory: Callable[[], livekit.agents.voice.agent.AgentTask],
*,
id: str,
description: str) ‑> Self
Expand source code
def add(self, task_factory: Callable[[], AgentTask], *, id: str, description: str) -> Self:
    """Register a sub-task factory under ``id`` and return ``self`` so calls can be chained.

    Registering an id that already exists replaces the previous entry.
    """
    info = _FactoryInfo(task_factory=task_factory, id=id, description=description)
    self._registered_factories[id] = info
    return self
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Run every registered sub-task, then complete the group.

    The group completes with a ``TaskGroupResult`` on success, with the
    sub-task's exception if a sub-task fails, or with a ``RuntimeError`` if the
    chat-context summarization fails. It is completed exactly once.
    """
    task_stack = list(self._registered_factories.keys())
    task_results: dict[str, Any] = {}

    while task_stack:
        task_id = task_stack.pop(0)
        factory_info = self._registered_factories[task_id]

        self._current_task = factory_info.task_factory()

        shared_chat_ctx = self.chat_ctx.copy()
        await self._current_task.update_chat_ctx(shared_chat_ctx)

        if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
            current_tools = self._current_task.tools
            current_tools.append(out_of_scope_tool)
            await self._current_task.update_tools(current_tools)

        try:
            self._visited_tasks.add(task_id)
            res = await self._current_task
            task_results[task_id] = res
        except _OutOfScopeError as e:
            # re-queue the interrupted task, then place the requested tasks in
            # front of it (in the order they were requested)
            task_stack.insert(0, task_id)
            for target_task_id in reversed(e.target_task_ids):
                task_stack.insert(0, target_task_id)
            continue
        except Exception as e:
            # the group now carries the failure as its result: stop here instead
            # of falling through to the summarization and a second `complete`
            self.complete(e)
            return

    try:
        if self._summarize_chat_ctx:
            assert isinstance(self.session.llm, llm.LLM)

            # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
            # enabling summarization will result on only one ChatMessage added.
            summarized_chat_ctx = await self.chat_ctx.copy(
                exclude_instructions=True
            )._summarize(llm_v=self.session.llm, keep_last_turns=0)
            await self.update_chat_ctx(summarized_chat_ctx)
    except Exception as e:
        self.complete(RuntimeError(f"failed to summarize the chat_ctx: {e}"))
        # already completed with the error, do not complete a second time
        return

    self.complete(TaskGroupResult(task_results=task_results))

Called when the task is entered

class TaskGroupResult (task_results: dict[str, typing.Any])
Expand source code
@dataclass
class TaskGroupResult:
    """Result a ``TaskGroup`` completes with once all of its sub-tasks have finished."""

    # maps each sub-task id to the value that sub-task completed with
    task_results: dict[str, Any]

TaskGroupResult(task_results: dict[str, typing.Any])

Instance variables

var task_results : dict[str, typing.Any]
class WarmTransferResult (human_agent_identity: str)
Expand source code
@dataclass
class WarmTransferResult:
    """Result a ``WarmTransferTask`` completes with after the calls have been merged."""

    # participant identity of the human agent that was moved into the caller's room
    human_agent_identity: str

WarmTransferResult(human_agent_identity: 'str')

Instance variables

var human_agent_identity : str
class WarmTransferTask (target_phone_number: str,
*,
hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
sip_trunk_id: NotGivenOr[str] = NOT_GIVEN,
extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN)
Expand source code
class WarmTransferTask(AgentTask[WarmTransferResult]):
    """Warm-transfers the caller to a human agent reached over SIP.

    On entry the task disables the caller session's I/O, optionally loops hold
    audio in the caller's room, and dials ``target_phone_number`` into a separate
    "<caller room>-human-agent" room where a second ``AgentSession`` talks to the
    human agent. The tools exposed to that session either move the human agent
    into the caller's room (``connect_to_caller``) or finish the task with a
    ``ToolError`` (``decline_transfer``, ``voicemail_detected``).
    """

    def __init__(
        self,
        target_phone_number: str,
        *,
        hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
        sip_trunk_id: NotGivenOr[str] = NOT_GIVEN,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.FunctionTool | llm.RawFunctionTool]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Create the task.

        Args:
            target_phone_number: Number dialed (``sip_call_to``) to reach the human agent.
            hold_audio: Audio looped in the caller's room during the transfer.
                ``None`` disables it; when not given, the built-in hold music at
                volume 0.8 is used.
            sip_trunk_id: Outbound SIP trunk. Falls back to the
                ``LIVEKIT_SIP_OUTBOUND_TRUNK`` environment variable.
            extra_instructions: Text appended to the generated instructions.
            chat_ctx: Only used to render the conversation history into the
                instructions; it is not forwarded to the base class.
            turn_detection, tools, stt, vad, llm, tts, allow_interruptions:
                Forwarded to the base ``AgentTask``.

        Raises:
            ValueError: If no SIP trunk id is available from the argument or the
                environment.
        """
        super().__init__(
            instructions=self.get_instructions(
                chat_ctx=chat_ctx, extra_instructions=extra_instructions
            ),
            chat_ctx=NOT_GIVEN,  # don't pass the chat_ctx
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._caller_room: rtc.Room | None = None
        self._human_agent_sess: AgentSession | None = None
        # resolved when the human agent's room disconnects
        self._human_agent_failed_fut: asyncio.Future[None] = asyncio.Future()
        self._human_agent_identity = "human-agent-sip"

        self._target_phone_number = target_phone_number
        self._sip_trunk_id = (
            sip_trunk_id if is_given(sip_trunk_id) else os.getenv("LIVEKIT_SIP_OUTBOUND_TRUNK", "")
        )
        if not self._sip_trunk_id:
            raise ValueError(
                "`LIVEKIT_SIP_OUTBOUND_TRUNK` environment variable or `sip_trunk_id` argument must be set"
            )

        # background audio and io
        self._background_audio = BackgroundAudioPlayer()
        self._hold_audio_handle: PlayHandle | None = None
        self._hold_audio = (
            cast(Optional[Union[AudioSource, AudioConfig, list[AudioConfig]]], hold_audio)
            if is_given(hold_audio)
            else AudioConfig(BuiltinAudioClip.HOLD_MUSIC, volume=0.8)
        )

        # snapshot of the session I/O flags, filled on the first `_set_io_enabled` call
        self._original_io_state: dict[str, bool] = {}

    def get_instructions(
        self, *, chat_ctx: NotGivenOr[llm.ChatContext], extra_instructions: str = ""
    ) -> str:
        """Build the instructions given to the agent that talks to the human agent.

        The message items of ``chat_ctx`` are rendered as "Caller: ..." /
        "Assistant: ..." lines and substituted into ``BASE_INSTRUCTIONS``;
        ``extra_instructions`` is appended verbatim.

        Note: this is called from ``__init__`` before the base class is initialized.
        """
        # users can override this method if they want to customize the entire instructions
        history_lines: list[str] = []
        if chat_ctx:
            context_copy = chat_ctx.copy(
                exclude_empty_message=True, exclude_instructions=True, exclude_function_call=True
            )
            for msg in context_copy.items:
                if msg.type != "message":
                    continue
                role = "Caller" if msg.role == "user" else "Assistant"
                history_lines.append(f"{role}: {msg.text_content}\n")
        prev_convo = "".join(history_lines)
        return BASE_INSTRUCTIONS.format(conversation_history=prev_convo) + extra_instructions

    async def on_enter(self) -> None:
        """Put the caller on hold and dial the human agent.

        If dialing fails, or the human agent's room disconnects before the dial
        completes, the task finishes with ``ToolError("could not dial human agent")``
        unless it was already completed.
        """
        job_ctx = get_job_context()
        self._caller_room = job_ctx.room

        # start the background audio
        if self._hold_audio is not None:
            await self._background_audio.start(room=self._caller_room)
            self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

        self._set_io_enabled(False)

        # bound before the `try` so the `finally` clause can never read an unbound name
        dial_human_agent_task: asyncio.Task[AgentSession] | None = None
        try:
            dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
            done, _ = await asyncio.wait(
                (dial_human_agent_task, self._human_agent_failed_fut),
                return_when=asyncio.FIRST_COMPLETED,
            )
            if dial_human_agent_task not in done:
                # the failure future won the race: say so in the logged traceback
                raise RuntimeError("human agent room disconnected before the dial completed")

            self._human_agent_sess = dial_human_agent_task.result()
            # let the human speak first

        except Exception:
            logger.exception("could not dial human agent")
            self._set_result(ToolError("could not dial human agent"))
            return

        finally:
            if dial_human_agent_task is not None:
                await utils.aio.cancel_and_wait(dial_human_agent_task)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def connect_to_caller(self) -> None:
        """Called when the human agent wants to connect to the caller."""
        logger.debug("connecting to caller")
        assert self._caller_room is not None

        await self._merge_calls()
        self._set_result(WarmTransferResult(human_agent_identity=self._human_agent_identity))

        # when the caller or human agent leaves the room, we'll delete the room
        self._caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_transfer(self, reason: str) -> None:
        """Handles the case when the human agent explicitly declines to connect to the caller.

        Args:
            reason: A short explanation of why the human agent declined to connect to the caller
        """
        self._set_result(ToolError(f"human agent declined to connect: {reason}"))

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def voicemail_detected(self) -> None:
        """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
        self._set_result(ToolError("voicemail detected"))

    def _on_human_agent_room_close(self, reason: rtc.DisconnectReason.ValueType) -> None:
        """Handle the human agent's room disconnecting: unblock `on_enter` and fail the task."""
        logger.debug(
            "human agent's room closed",
            extra={"reason": rtc.DisconnectReason.Name(reason)},
        )
        # the future may already be resolved or cancelled
        with contextlib.suppress(asyncio.InvalidStateError):
            self._human_agent_failed_fut.set_result(None)

        self._set_result(ToolError(f"room closed: {rtc.DisconnectReason.Name(reason)}"))

    def _on_caller_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        """Delete the caller room once a SIP or standard participant leaves it."""
        if participant.kind not in (
            rtc.ParticipantKind.PARTICIPANT_KIND_SIP,
            rtc.ParticipantKind.PARTICIPANT_KIND_STANDARD,
        ):
            return

        logger.info(f"participant disconnected from caller room: {participant.identity}, closing")

        assert self._caller_room is not None
        # one-shot handler: unregister before deleting the room
        self._caller_room.off("participant_disconnected", self._on_caller_participant_disconnected)
        job_ctx = get_job_context()
        job_ctx.delete_room(room_name=self._caller_room.name)

    def _set_result(self, result: WarmTransferResult | Exception) -> None:
        """Complete the task once, releasing transfer resources first.

        Does nothing if the task is already done. Otherwise shuts down the human
        agent session, stops the hold audio, restores the caller session's I/O,
        and completes the task with ``result``.
        """
        if self.done():
            return

        if self._human_agent_sess:
            self._human_agent_sess.shutdown()
            self._human_agent_sess = None

        if self._hold_audio_handle:
            self._hold_audio_handle.stop()
            self._hold_audio_handle = None

        self._set_io_enabled(True)
        self.complete(result)

    async def _dial_human_agent(self) -> AgentSession:
        """Join the human-agent room, start a session in it, and dial the human agent.

        Returns the started ``AgentSession`` once the SIP participant request
        (``wait_until_answered=True``) has returned.
        """
        assert self._caller_room is not None

        job_ctx = get_job_context()
        ws_url = job_ctx._info.url

        # create a new room for the human agent
        human_agent_room_name = self._caller_room.name + "-human-agent"
        room = rtc.Room()
        # join with the same identity this agent uses in the caller's room
        token = (
            api.AccessToken()
            .with_identity(self._caller_room.local_participant.identity)
            .with_grants(
                api.VideoGrants(
                    room_join=True,
                    room=human_agent_room_name,
                    can_update_own_metadata=True,
                    can_publish=True,
                    can_subscribe=True,
                )
            )
            .with_kind("agent")
        ).to_jwt()

        logger.debug(
            "connecting to human agent room",
            extra={"ws_url": ws_url, "human_agent_room_name": human_agent_room_name},
        )
        await room.connect(ws_url, token)

        # if human agent hung up for whatever reason, we'd resume the caller conversation
        room.on("disconnected", self._on_human_agent_room_close)

        # reuse the caller session's models for the human agent session
        human_agent_sess: AgentSession = AgentSession(
            vad=self.session.vad or NOT_GIVEN,
            llm=self.session.llm or NOT_GIVEN,
            stt=self.session.stt or NOT_GIVEN,
            tts=self.session.tts or NOT_GIVEN,
            turn_detection=self.session.turn_detection or NOT_GIVEN,
        )
        # create a copy of this AgentTask
        human_agent_agent = Agent(
            instructions=self.instructions,
            turn_detection=self.turn_detection,
            stt=self.stt,
            vad=self.vad,
            llm=self.llm,
            tts=self.tts,
            tools=self.tools,
            chat_ctx=self.chat_ctx,
            allow_interruptions=self.allow_interruptions,
        )
        await human_agent_sess.start(
            agent=human_agent_agent,
            room=room,
            room_options=room_io.RoomOptions(
                close_on_disconnect=True,
                delete_room_on_close=True,
                participant_identity=self._human_agent_identity,
            ),
            record=False,  # TODO: support recording on multiple sessions?
        )

        # dial the human agent
        await job_ctx.api.sip.create_sip_participant(
            api.CreateSIPParticipantRequest(
                sip_trunk_id=self._sip_trunk_id,
                sip_call_to=self._target_phone_number,
                room_name=human_agent_room_name,
                participant_identity=self._human_agent_identity,
                wait_until_answered=True,
            )
        )

        return human_agent_sess

    async def _merge_calls(self) -> None:
        """Move the human agent participant from its own room into the caller's room."""
        assert self._caller_room is not None
        assert self._human_agent_sess is not None

        job_ctx = get_job_context()
        human_agent_room = self._human_agent_sess.room_io.room
        # we no longer care about the human agent session. it's supposed to be over
        human_agent_room.off("disconnected", self._on_human_agent_room_close)

        logger.debug(f"moving {self._human_agent_identity} to caller room {self._caller_room.name}")
        await job_ctx.api.room.move_participant(
            api.MoveParticipantRequest(
                room=human_agent_room.name,
                identity=self._human_agent_identity,
                destination_room=self._caller_room.name,
            )
        )

    def _set_io_enabled(self, enabled: bool) -> None:
        """Disable the caller session's I/O, or restore it to its original state.

        The first call snapshots the current enabled flags. With ``enabled=True``
        each stream is re-enabled only if it was enabled in that snapshot.
        """
        input = self.session.input
        output = self.session.output

        if not self._original_io_state:
            self._original_io_state = {
                "audio_input": input.audio_enabled,
                "video_input": input.video_enabled,
                "audio_output": output.audio_enabled,
                "transcription_output": output.transcription_enabled,
                "video_output": output.video_enabled,
            }

        # only touch the streams that are actually attached
        if input.audio:
            input.set_audio_enabled(enabled and self._original_io_state["audio_input"])
        if input.video:
            input.set_video_enabled(enabled and self._original_io_state["video_input"])
        if output.audio:
            output.set_audio_enabled(enabled and self._original_io_state["audio_output"])
        if output.transcription:
            output.set_transcription_enabled(
                enabled and self._original_io_state["transcription_output"]
            )
        if output.video:
            output.set_video_enabled(enabled and self._original_io_state["video_output"])

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

async def connect_to_caller(self) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def connect_to_caller(self) -> None:
    """Called when the human agent wants to connect to the caller."""
    # NOTE(review): the docstring above is kept verbatim -- presumably `function_tool`
    # exposes it to the LLM as the tool description; confirm before rewording it.
    logger.debug("connecting to caller")
    caller_room = self._caller_room
    assert caller_room is not None

    # bring the human agent into the caller's room, then finish the task successfully
    await self._merge_calls()
    outcome = WarmTransferResult(human_agent_identity=self._human_agent_identity)
    self._set_result(outcome)

    # when the caller or human agent leaves the room, we'll delete the room
    caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)

Called when the human agent wants to connect to the caller.

async def decline_transfer(self, reason: str) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_transfer(self, reason: str) -> None:
    """Handles the case when the human agent explicitly declines to connect to the caller.

    Args:
        reason: A short explanation of why the human agent declined to connect to the caller
    """
    # NOTE(review): the docstring above is kept verbatim -- presumably `function_tool`
    # exposes it to the LLM as the tool description; confirm before rewording it.
    refusal = ToolError(f"human agent declined to connect: {reason}")
    self._set_result(refusal)

Handles the case when the human agent explicitly declines to connect to the caller.

Args

reason
A short explanation of why the human agent declined to connect to the caller
def get_instructions(self, *, chat_ctx: NotGivenOr[llm.ChatContext], extra_instructions: str = '') ‑> str
Expand source code
def get_instructions(
    self, *, chat_ctx: NotGivenOr[llm.ChatContext], extra_instructions: str = ""
) -> str:
    """Build the instructions for the agent that talks to the human agent.

    Message items of ``chat_ctx`` are rendered as "Caller: ..." / "Assistant: ..."
    lines and substituted into ``BASE_INSTRUCTIONS``; ``extra_instructions`` is
    appended verbatim.
    """
    # users can override this method if they want to customize the entire instructions
    history_lines: list[str] = []
    if chat_ctx:
        trimmed_ctx = chat_ctx.copy(
            exclude_empty_message=True, exclude_instructions=True, exclude_function_call=True
        )
        for item in trimmed_ctx.items:
            # skip anything that is not a plain message
            if item.type == "message":
                speaker = "Caller" if item.role == "user" else "Assistant"
                history_lines.append(f"{speaker}: {item.text_content}\n")

    conversation = "".join(history_lines)
    return BASE_INSTRUCTIONS.format(conversation_history=conversation) + extra_instructions
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    """Put the caller on hold and dial the human agent.

    If dialing fails, or the human agent's room disconnects before the dial
    completes, the task finishes with ``ToolError("could not dial human agent")``
    unless it was already completed.
    """
    job_ctx = get_job_context()
    self._caller_room = job_ctx.room

    # start the background audio
    if self._hold_audio is not None:
        await self._background_audio.start(room=self._caller_room)
        self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

    self._set_io_enabled(False)

    # bound before the `try` so the `finally` clause can never read an unbound name
    dial_human_agent_task: asyncio.Task[AgentSession] | None = None
    try:
        dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
        done, _ = await asyncio.wait(
            (dial_human_agent_task, self._human_agent_failed_fut),
            return_when=asyncio.FIRST_COMPLETED,
        )
        if dial_human_agent_task not in done:
            # the failure future won the race: say so in the logged traceback
            raise RuntimeError("human agent room disconnected before the dial completed")

        self._human_agent_sess = dial_human_agent_task.result()
        # let the human speak first

    except Exception:
        logger.exception("could not dial human agent")
        self._set_result(ToolError("could not dial human agent"))
        return

    finally:
        if dial_human_agent_task is not None:
            await utils.aio.cancel_and_wait(dial_human_agent_task)

Called when the task is entered

async def voicemail_detected(self) ‑> None
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def voicemail_detected(self) -> None:
    """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
    # NOTE(review): the docstring above is kept verbatim -- presumably `function_tool`
    # exposes it to the LLM as the tool description; confirm before rewording it.
    voicemail_error = ToolError("voicemail detected")
    self._set_result(voicemail_error)

Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting