Module livekit.agents.beta.workflows.dtmf_inputs

Classes

class GetDtmfResult (user_input: str)
Expand source code
@dataclass
class GetDtmfResult:
    user_input: str

    @classmethod
    def from_dtmf_inputs(cls, dtmf_inputs: list[DtmfEvent]) -> GetDtmfResult:
        return cls(user_input=format_dtmf(dtmf_inputs))

GetDtmfResult(user_input: 'str')

Static methods

def from_dtmf_inputs(dtmf_inputs: list[DtmfEvent]) ‑> GetDtmfResult

Instance variables

var user_input : str
class GetDtmfTask (*,
num_digits: int,
ask_for_confirmation: bool = False,
dtmf_input_timeout: float = 4.0,
dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
extra_instructions: NotGivenOr[str] = NOT_GIVEN)
Expand source code
class GetDtmfTask(AgentTask[GetDtmfResult]):
    """A task to collect DTMF inputs from the user.

    Return a string of DTMF inputs if collected successfully, otherwise None.
    """

    def __init__(
        self,
        *,
        num_digits: int,
        ask_for_confirmation: bool = False,
        dtmf_input_timeout: float = 4.0,
        dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
        chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
        extra_instructions: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            num_digits: The number of digits to collect.
            ask_for_confirmation: Whether to ask for confirmation when agent has collected full digits.
            repeat_instructions: The number of times to repeat the initial instructions.
            dtmf_input_timeout: The per-digit timeout.
            dtmf_stop_event: The DTMF event to stop collecting inputs.
            chat_ctx: The chat context to use.
            extra_instructions: Extra instructions to add to the task.
        """
        if num_digits <= 0:
            raise ValueError("num_digits must be greater than 0")

        self._curr_dtmf_inputs: list[DtmfEvent] = []
        self._dtmf_reply_running: bool = False

        @function_tool
        async def confirm_inputs(inputs: list[DtmfEvent]) -> None:
            """Finalize the collected digit inputs after explicit user confirmation.

            Use this ONLY after the confirmation. You should confirm by verbally reading out the digits one by one and, once the
            user confirms they are correct, call this tool with the inputs.

            Do not use this tool to capture the initial digits."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        @function_tool
        async def record_inputs(inputs: list[DtmfEvent]) -> None:
            """Record the collected digit inputs without additional confirmation.

            Call this tool as soon as a valid sequence of digits has been provided by the user (via DTMF or spoken)."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        instructions = (
            "You are a single step in a broader system, responsible solely for gathering digits input from the user. "
            "You will either receive a sequence of digits through dtmf events tagged by <dtmf_inputs>, or "
            "user will directly say the digits to you. You should be able to handle both cases. "
        )

        if ask_for_confirmation:
            instructions += "Once user has confirmed the digits (by verbally spoken or entered manually), call `confirm_inputs` with the inputs."
        else:
            instructions += "If user provides the digits through voice and it is valid, call `record_inputs` with the inputs."

        if is_given(extra_instructions):
            instructions += f"\n{extra_instructions}"

        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            tools=[confirm_inputs] if ask_for_confirmation else [record_inputs],
        )

        def _on_sip_dtmf_received(ev: rtc.SipDTMF) -> None:
            if self._dtmf_reply_running:
                return

            # immediately kick off the DTMF reply generation if matches the stop event
            if ev.digit == dtmf_stop_event.value:
                self._generate_dtmf_reply()
                return

            self._curr_dtmf_inputs.append(DtmfEvent(ev.digit))
            logger.info(f"DTMF inputs: {format_dtmf(self._curr_dtmf_inputs)}")
            self._generate_dtmf_reply.schedule()

        @debounced(delay=dtmf_input_timeout)
        async def _generate_dtmf_reply() -> None:
            self._dtmf_reply_running = True

            try:
                self.session.interrupt()

                dmtf_str = format_dtmf(self._curr_dtmf_inputs)
                logger.debug(f"Generating DTMF reply, current inputs: {dmtf_str}")

                # if input not fully received (i.e. timeout), return None
                if len(self._curr_dtmf_inputs) != num_digits:
                    error_msg = (
                        f"Digits input not fully received. "
                        f"Expect {num_digits} digits, got {len(self._curr_dtmf_inputs)}"
                    )
                    self.complete(ToolError(error_msg))
                    return

                # if not asking for confirmation, return the DTMF inputs
                if not ask_for_confirmation:
                    self.complete(GetDtmfResult.from_dtmf_inputs(self._curr_dtmf_inputs))
                    return

                instructions = (
                    "User has entered the following valid digits on the telephone keypad:\n"
                    f"<dtmf_inputs>{dmtf_str}</dtmf_inputs>\n"
                    "Please confirm it with the user by saying the digits one by one with space in between "
                    "(.e.g. 'one two three four five six seven eight nine ten'). "
                    "Once you are sure, call `confirm_inputs` with the inputs."
                    ""
                )

                await self.session.generate_reply(user_input=instructions)
            finally:
                self._dtmf_reply_running = False
                self._curr_dtmf_inputs.clear()

        def _on_user_state_changed(ev: UserStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state == "speaking":
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after user is back to non-speaking
                self._generate_dtmf_reply.schedule()

        def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state in ["speaking", "thinking"]:
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after agent is back to non-speaking
                self._generate_dtmf_reply.schedule()

        self._generate_dtmf_reply: Debounced[None] = _generate_dtmf_reply
        self._on_sip_dtmf_received: Callable[[rtc.SipDTMF], None] = _on_sip_dtmf_received
        self._on_user_state_changed: Callable[[UserStateChangedEvent], None] = (
            _on_user_state_changed
        )
        self._on_agent_state_changed: Callable[[AgentStateChangedEvent], None] = (
            _on_agent_state_changed
        )

    def dtmf_reply_running(self) -> bool:
        return self._dtmf_reply_running

    async def on_enter(self) -> None:
        ctx = get_job_context()

        ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
        self.session.on("agent_state_changed", self._on_user_state_changed)
        self.session.on("agent_state_changed", self._on_agent_state_changed)
        self.session.generate_reply(tool_choice="none")

    async def on_exit(self) -> None:
        ctx = get_job_context()

        ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
        self.session.off("agent_state_changed", self._on_user_state_changed)
        self.session.off("agent_state_changed", self._on_agent_state_changed)
        self._generate_dtmf_reply.cancel()

A task to collect DTMF inputs from the user.

Return a string of DTMF inputs if collected successfully, otherwise None.

Args

num_digits
The number of digits to collect.
ask_for_confirmation
Whether to ask for confirmation when agent has collected full digits.
repeat_instructions
The number of times to repeat the initial instructions.
dtmf_input_timeout
The per-digit timeout.
dtmf_stop_event
The DTMF event to stop collecting inputs.
chat_ctx
The chat context to use.
extra_instructions
Extra instructions to add to the task.

Ancestors

  • livekit.agents.voice.agent.AgentTask
  • livekit.agents.voice.agent.Agent
  • typing.Generic

Methods

def dtmf_reply_running(self) ‑> bool
Expand source code
def dtmf_reply_running(self) -> bool:
    return self._dtmf_reply_running
async def on_enter(self) ‑> None
Expand source code
async def on_enter(self) -> None:
    ctx = get_job_context()

    ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
    self.session.on("agent_state_changed", self._on_user_state_changed)
    self.session.on("agent_state_changed", self._on_agent_state_changed)
    self.session.generate_reply(tool_choice="none")

Called when the task is entered

async def on_exit(self) ‑> None
Expand source code
async def on_exit(self) -> None:
    ctx = get_job_context()

    ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
    self.session.off("agent_state_changed", self._on_user_state_changed)
    self.session.off("agent_state_changed", self._on_agent_state_changed)
    self._generate_dtmf_reply.cancel()

Called when the task is exited