Module livekit.agents.beta.workflows
Sub-modules
- livekit.agents.beta.workflows.address
- livekit.agents.beta.workflows.credit_card
- livekit.agents.beta.workflows.dob
- livekit.agents.beta.workflows.dtmf_inputs
- livekit.agents.beta.workflows.email_address
- livekit.agents.beta.workflows.name
- livekit.agents.beta.workflows.phone_number
- livekit.agents.beta.workflows.task_group
- livekit.agents.beta.workflows.utils
- livekit.agents.beta.workflows.warm_transfer
Classes
class GetAddressResult (address: str)-
Expand source code
@dataclass class GetAddressResult: address: strGetAddressResult(address: 'str')
Instance variables
var address : str
class GetAddressTask (*,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '')-
Expand source code
class GetAddressTask(AgentTask[GetAddressResult]): def __init__( self, *, instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN, chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN, turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN, tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN, stt: NotGivenOr[stt.STT | None] = NOT_GIVEN, vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN, llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN, tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN, allow_interruptions: NotGivenOr[bool] = NOT_GIVEN, require_confirmation: NotGivenOr[bool] = NOT_GIVEN, # deprecated extra_instructions: str = "", ) -> None: if not is_given(instructions): instructions = InstructionParts(persona=PERSONA, extra=extra_instructions) elif extra_instructions: logger.warning("`extra_instructions` will be ignored when `instructions` is provided") if isinstance(instructions, InstructionParts): instructions = Instructions(INSTRUCTIONS_TEMPLATE).format( persona=instructions.persona if is_given(instructions.persona) else PERSONA, extra=instructions.extra, _modality_specific=Instructions(audio=AUDIO_SPECIFIC, text=TEXT_SPECIFIC), _confirmation=Instructions( # confirmation is enabled by default for audio, disabled by default for text audio=CONFIRMATION_INSTRUCTION if require_confirmation is not False else "", text=CONFIRMATION_INSTRUCTION if require_confirmation is True else "", ), ) assert is_given(instructions) # for type checking super().__init__( instructions=instructions, chat_ctx=chat_ctx, turn_detection=turn_detection, tools=tools or [], stt=stt, vad=vad, llm=llm, tts=tts, allow_interruptions=allow_interruptions, ) self._current_address = "" self._require_confirmation = require_confirmation async def on_enter(self) -> None: self.session.generate_reply(instructions="Ask the user to provide their address.") @function_tool() async def update_address( self, street_address: str, unit_number: str, locality: str, country: str, 
ctx: RunContext ) -> str | None: """Update the address provided by the user. Args: street_address (str): Dependent on country, may include fields like house number, street name, block, or district unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return '' locality (str): Dependent on country, may include fields like city, zip code, or province country (str): The country the user lives in spelled out fully """ address_fields = ( [street_address, unit_number, locality, country] if unit_number.strip() else [street_address, locality, country] ) address = " ".join(address_fields) self._current_address = address if not self._confirmation_required(ctx): if not self.done(): self.complete(GetAddressResult(address=self._current_address)) return None confirm_tool = self._build_confirm_tool(address=address) current_tools = [t for t in self.tools if t.id != "confirm_address"] current_tools.append(confirm_tool) await self.update_tools(current_tools) return ( f"The address has been updated to {address}\n" f"Repeat the address field by field: {address_fields} if needed\n" f"Prompt the user for confirmation, do not call `confirm_address` directly" ) def _build_confirm_tool(self, *, address: str) -> llm.FunctionTool: # confirm tool is only injected after update_address is called, # preventing the LLM from hallucinating a confirmation without user input @function_tool() async def confirm_address() -> None: """Call after the user confirms the address is correct.""" if address != self._current_address: self.session.generate_reply( instructions="The address has changed since confirmation was requested, ask the user to confirm the updated address." ) return if not self.done(): self.complete(GetAddressResult(address=address)) return confirm_address @function_tool(flags=ToolFlag.IGNORE_ON_ENTER) async def decline_address_capture(self, reason: str) -> None: """Handles the case when the user explicitly declines to provide an address. 
Args: reason: A short explanation of why the user declined to provide the address """ if not self.done(): self.complete(ToolError(f"couldn't get the address: {reason}")) def _confirmation_required(self, ctx: RunContext) -> bool: if is_given(self._require_confirmation): return self._require_confirmation return ctx.speech_handle.input_details.modality == "audio"Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::

    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def decline_address_capture(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER) async def decline_address_capture(self, reason: str) -> None: """Handles the case when the user explicitly declines to provide an address. Args: reason: A short explanation of why the user declined to provide the address """ if not self.done(): self.complete(ToolError(f"couldn't get the address: {reason}"))Handles the case when the user explicitly declines to provide an address.
Args
reason- A short explanation of why the user declined to provide the address
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None: self.session.generate_reply(instructions="Ask the user to provide their address.")Called when the task is entered
async def update_address(self,
street_address: str,
unit_number: str,
locality: str,
country: str,
ctx: RunContext) ‑> str | None-
Expand source code
@function_tool() async def update_address( self, street_address: str, unit_number: str, locality: str, country: str, ctx: RunContext ) -> str | None: """Update the address provided by the user. Args: street_address (str): Dependent on country, may include fields like house number, street name, block, or district unit_number (str): The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return '' locality (str): Dependent on country, may include fields like city, zip code, or province country (str): The country the user lives in spelled out fully """ address_fields = ( [street_address, unit_number, locality, country] if unit_number.strip() else [street_address, locality, country] ) address = " ".join(address_fields) self._current_address = address if not self._confirmation_required(ctx): if not self.done(): self.complete(GetAddressResult(address=self._current_address)) return None confirm_tool = self._build_confirm_tool(address=address) current_tools = [t for t in self.tools if t.id != "confirm_address"] current_tools.append(confirm_tool) await self.update_tools(current_tools) return ( f"The address has been updated to {address}\n" f"Repeat the address field by field: {address_fields} if needed\n" f"Prompt the user for confirmation, do not call `confirm_address` directly" )Update the address provided by the user.
Args
street_address:str- Dependent on country, may include fields like house number, street name, block, or district
unit_number:str- The unit number, for example Floor 1 or Apartment 12. If there is no unit number, return ''
locality:str- Dependent on country, may include fields like city, zip code, or province
country:str- The country the user lives in spelled out fully
class GetCreditCardResult (cardholder_name: str,
issuer: str,
card_number: str,
security_code: str,
expiration_date: str)-
Expand source code
@dataclass class GetCreditCardResult: cardholder_name: str issuer: str card_number: str security_code: str expiration_date: strGetCreditCardResult(cardholder_name: 'str', issuer: 'str', card_number: 'str', security_code: 'str', expiration_date: 'str')
Instance variables
var card_number : str
var cardholder_name : str
var expiration_date : str
var issuer : str
var security_code : str
class GetCreditCardTask (chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)-
Expand source code
class GetCreditCardTask(AgentTask[GetCreditCardResult]): def __init__( self, chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN, turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN, tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN, stt: NotGivenOr[stt.STT | None] = NOT_GIVEN, vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN, llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN, tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN, allow_interruptions: NotGivenOr[bool] = NOT_GIVEN, require_confirmation: NotGivenOr[bool] = NOT_GIVEN, ) -> None: super().__init__( instructions="*none*", chat_ctx=chat_ctx, turn_detection=turn_detection, tools=tools or [], stt=stt, vad=vad, llm=llm, tts=tts, allow_interruptions=allow_interruptions, ) self._require_confirmation = require_confirmation async def on_enter(self) -> None: while not self.done(): task_group = TaskGroup() task_group.add( lambda: GetNameTask( last_name=True, extra_instructions="This is in the context of credit card information collection, ask specifically for the full name listed on it.", require_confirmation=self._require_confirmation, ), id="cardholder_name_task", description="Collects the cardholder's full name", ) task_group.add( lambda: GetCardNumberTask(require_confirmation=self._require_confirmation), id="card_number_task", description="Collects the user's card number", ) task_group.add( lambda: GetSecurityCodeTask(require_confirmation=self._require_confirmation), id="security_code_task", description="Collects the card's security code", ) task_group.add( lambda: GetExpirationDateTask(require_confirmation=self._require_confirmation), id="expiration_date_task", description="Collects the card's expiration date", ) try: results = await task_group name = f"{results.task_results['cardholder_name_task'].first_name} {results.task_results['cardholder_name_task'].last_name}" result = GetCreditCardResult( cardholder_name=name, issuer=results.task_results["card_number_task"].issuer, 
card_number=results.task_results["card_number_task"].card_number, security_code=results.task_results["security_code_task"].security_code, expiration_date=results.task_results["expiration_date_task"].date, ) self.complete(result) except CardCollectionRestartError: continue except (CardCaptureDeclinedError, ToolError) as e: self.complete(e)Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::

    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None: while not self.done(): task_group = TaskGroup() task_group.add( lambda: GetNameTask( last_name=True, extra_instructions="This is in the context of credit card information collection, ask specifically for the full name listed on it.", require_confirmation=self._require_confirmation, ), id="cardholder_name_task", description="Collects the cardholder's full name", ) task_group.add( lambda: GetCardNumberTask(require_confirmation=self._require_confirmation), id="card_number_task", description="Collects the user's card number", ) task_group.add( lambda: GetSecurityCodeTask(require_confirmation=self._require_confirmation), id="security_code_task", description="Collects the card's security code", ) task_group.add( lambda: GetExpirationDateTask(require_confirmation=self._require_confirmation), id="expiration_date_task", description="Collects the card's expiration date", ) try: results = await task_group name = f"{results.task_results['cardholder_name_task'].first_name} {results.task_results['cardholder_name_task'].last_name}" result = GetCreditCardResult( cardholder_name=name, issuer=results.task_results["card_number_task"].issuer, card_number=results.task_results["card_number_task"].card_number, security_code=results.task_results["security_code_task"].security_code, expiration_date=results.task_results["expiration_date_task"].date, ) self.complete(result) except CardCollectionRestartError: continue except (CardCaptureDeclinedError, ToolError) as e: self.complete(e)Called when the task is entered
class GetDOBResult (date_of_birth: date, time_of_birth: time | None = None)-
Expand source code
@dataclass class GetDOBResult: date_of_birth: date time_of_birth: time | None = NoneGetDOBResult(date_of_birth: 'date', time_of_birth: 'time | None' = None)
Instance variables
var date_of_birth : datetime.date
var time_of_birth : datetime.time | None
class GetDOBTask (extra_instructions: str = '',
include_time: bool = False,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)-
Expand source code
class GetDOBTask(AgentTask[GetDOBResult]): def __init__( self, extra_instructions: str = "", include_time: bool = False, chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN, turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN, tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN, stt: NotGivenOr[stt.STT | None] = NOT_GIVEN, vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN, llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN, tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN, allow_interruptions: NotGivenOr[bool] = NOT_GIVEN, require_confirmation: NotGivenOr[bool] = NOT_GIVEN, ) -> None: time_instructions = ( "" if not include_time else ( "Also ask for and capture the time of birth if the user knows it. " "The time is optional - if the user doesn't know it, proceed without it.\n" ) ) confirmation_instructions = ( "Call `confirm_dob` after the user confirmed the date of birth is correct." ) extra = extra_instructions if extra_instructions else "" super().__init__( instructions=Instructions( _BASE_INSTRUCTIONS.format( modality_specific=_AUDIO_SPECIFIC, time_instructions=time_instructions, confirmation_instructions=( confirmation_instructions if require_confirmation is not False else "" ), extra_instructions=extra, ), text=_BASE_INSTRUCTIONS.format( modality_specific=_TEXT_SPECIFIC, time_instructions=time_instructions, confirmation_instructions=( confirmation_instructions if require_confirmation is True else "" ), extra_instructions=extra, ), ), chat_ctx=chat_ctx, turn_detection=turn_detection, tools=tools or [], stt=stt, vad=vad, llm=llm, tts=tts, allow_interruptions=allow_interruptions, ) self._include_time = include_time self._require_confirmation = require_confirmation self._current_dob: date | None = None self._current_time: time | None = None if include_time: self._tools.append(self._build_update_time_tool()) async def on_enter(self) -> None: prompt = "Ask the user to provide their date of birth." 
if self._include_time: prompt = "Ask the user to provide their date of birth and, if they know it, their time of birth." self.session.generate_reply(instructions=prompt) @function_tool async def update_dob( self, year: int, month: int, day: int, ctx: RunContext, ) -> str | None: """Update the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030). Args: year: The birth year (e.g., 1990) month: The birth month (1-12) day: The birth day (1-31) """ try: dob = date(year, month, day) except ValueError as e: raise ToolError(f"Invalid date: {e}") from None today = date.today() if dob > today: raise ToolError( f"Invalid date of birth: {dob.strftime('%B %d, %Y')} is in the future. " "Date of birth cannot be a future date." ) self._current_dob = dob if not self._confirmation_required(ctx): if not self.done(): self.complete( GetDOBResult( date_of_birth=self._current_dob, time_of_birth=self._current_time, ) ) return None confirm_tool = self._build_confirm_tool(dob=dob) current_tools = [t for t in self.tools if t.id != "confirm_dob"] current_tools.append(confirm_tool) await self.update_tools(current_tools) formatted_date = dob.strftime("%B %d, %Y") response = f"The date of birth has been updated to {formatted_date}" if self._current_time: formatted_time = self._current_time.strftime("%I:%M %p") response += f" at {formatted_time}" response += ( "\nRepeat the date back to the user in a natural spoken format.\n" "Prompt the user for confirmation, do not call `confirm_dob` directly" ) return response def _build_update_time_tool(self) -> llm.FunctionTool: @function_tool() async def update_time(hour: int, minute: int, ctx: RunContext) -> str | None: """Update the time of birth provided by the user. 
Args: hour: The birth hour (0-23) minute: The birth minute (0-59) """ try: birth_time = time(hour, minute) except ValueError as e: raise ToolError(f"Invalid time: {e}") from None self._current_time = birth_time if not self._confirmation_required(ctx) and self._current_dob is not None: if not self.done(): self.complete( GetDOBResult( date_of_birth=self._current_dob, time_of_birth=self._current_time, ) ) return None if self._confirmation_required(ctx): confirm_tool = self._build_confirm_tool(dob=self._current_dob) current_tools = [t for t in self.tools if t.id != "confirm_dob"] current_tools.append(confirm_tool) await self.update_tools(current_tools) formatted_time = birth_time.strftime("%I:%M %p") response = f"The time of birth has been updated to {formatted_time}" if self._current_dob: formatted_date = self._current_dob.strftime("%B %d, %Y") response = f"The date and time of birth has been updated to {formatted_date} at {formatted_time}" if self._confirmation_required(ctx): response += ( "\nRepeat the time back to the user in a natural spoken format.\n" "Prompt the user for confirmation, do not call `confirm_dob` directly" ) else: response += ( "\nThe date of birth has not been provided yet, ask the user to provide it." ) return response return update_time def _build_confirm_tool(self, *, dob: date | None) -> llm.FunctionTool: # confirm tool is only injected after update_dob/update_time is called, # preventing the LLM from hallucinating a confirmation without user input captured_dob = dob captured_time = self._current_time @function_tool() async def confirm_dob() -> None: """Call after the user confirms the date of birth is correct.""" if captured_dob != self._current_dob or captured_time != self._current_time: self.session.generate_reply( instructions="The date of birth has changed since confirmation was requested, ask the user to confirm the updated date." 
) return if self._current_dob is None: self.session.generate_reply( instructions="No date of birth was provided yet, ask the user to provide it." ) return if not self.done(): self.complete( GetDOBResult( date_of_birth=self._current_dob, time_of_birth=self._current_time, ) ) return confirm_dob @function_tool(flags=ToolFlag.IGNORE_ON_ENTER) async def decline_dob_capture(self, reason: str) -> None: """Handles the case when the user explicitly declines to provide a date of birth. Args: reason: A short explanation of why the user declined to provide the date of birth """ if not self.done(): self.complete(ToolError(f"couldn't get the date of birth: {reason}")) def _confirmation_required(self, ctx: RunContext) -> bool: if is_given(self._require_confirmation): return self._require_confirmation return ctx.speech_handle.input_details.modality == "audio"Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::

    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def decline_dob_capture(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER) async def decline_dob_capture(self, reason: str) -> None: """Handles the case when the user explicitly declines to provide a date of birth. Args: reason: A short explanation of why the user declined to provide the date of birth """ if not self.done(): self.complete(ToolError(f"couldn't get the date of birth: {reason}"))Handles the case when the user explicitly declines to provide a date of birth.
Args
reason- A short explanation of why the user declined to provide the date of birth
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None: prompt = "Ask the user to provide their date of birth." if self._include_time: prompt = "Ask the user to provide their date of birth and, if they know it, their time of birth." self.session.generate_reply(instructions=prompt)Called when the task is entered
async def update_dob(self, year: int, month: int, day: int, ctx: RunContext) ‑> str | None-
Expand source code
@function_tool async def update_dob( self, year: int, month: int, day: int, ctx: RunContext, ) -> str | None: """Update the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030). Args: year: The birth year (e.g., 1990) month: The birth month (1-12) day: The birth day (1-31) """ try: dob = date(year, month, day) except ValueError as e: raise ToolError(f"Invalid date: {e}") from None today = date.today() if dob > today: raise ToolError( f"Invalid date of birth: {dob.strftime('%B %d, %Y')} is in the future. " "Date of birth cannot be a future date." ) self._current_dob = dob if not self._confirmation_required(ctx): if not self.done(): self.complete( GetDOBResult( date_of_birth=self._current_dob, time_of_birth=self._current_time, ) ) return None confirm_tool = self._build_confirm_tool(dob=dob) current_tools = [t for t in self.tools if t.id != "confirm_dob"] current_tools.append(confirm_tool) await self.update_tools(current_tools) formatted_date = dob.strftime("%B %d, %Y") response = f"The date of birth has been updated to {formatted_date}" if self._current_time: formatted_time = self._current_time.strftime("%I:%M %p") response += f" at {formatted_time}" response += ( "\nRepeat the date back to the user in a natural spoken format.\n" "Prompt the user for confirmation, do not call `confirm_dob` directly" ) return responseUpdate the date of birth provided by the user. Given a spoken month and year (e.g., 'July 2030'), return its numerical representation (7/2030).
Args
year- The birth year (e.g., 1990)
month- The birth month (1-12)
day- The birth day (1-31)
class GetDtmfResult (user_input: str)-
Expand source code
@dataclass class GetDtmfResult: user_input: str @classmethod def from_dtmf_inputs(cls, dtmf_inputs: list[DtmfEvent]) -> GetDtmfResult: return cls(user_input=format_dtmf(dtmf_inputs))GetDtmfResult(user_input: 'str')
Static methods
def from_dtmf_inputs(dtmf_inputs: list[DtmfEvent]) ‑> GetDtmfResult
Instance variables
var user_input : str
class GetDtmfTask (*,
num_digits: int,
ask_for_confirmation: bool = False,
dtmf_input_timeout: float = 4.0,
dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
extra_instructions: NotGivenOr[str] = NOT_GIVEN)-
Expand source code
class GetDtmfTask(AgentTask[GetDtmfResult]):
    """A task to collect DTMF inputs from the user.

    Completes with a ``GetDtmfResult`` holding the collected inputs. When a DTMF
    reply is generated and the number of received keypad inputs differs from
    ``num_digits``, the task completes with a ``ToolError`` instead.
    """

    def __init__(
        self,
        *,
        num_digits: int,
        ask_for_confirmation: bool = False,
        dtmf_input_timeout: float = 4.0,
        dtmf_stop_event: DtmfEvent = DtmfEvent.POUND,
        chat_ctx: NotGivenOr[ChatContext] = NOT_GIVEN,
        extra_instructions: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            num_digits: The number of digits to collect.
            ask_for_confirmation: Whether to ask for confirmation when agent has collected full digits.
            dtmf_input_timeout: The per-digit timeout.
            dtmf_stop_event: The DTMF event to stop collecting inputs.
            chat_ctx: The chat context to use.
            extra_instructions: Extra instructions to add to the task.

        Raises:
            ValueError: If ``num_digits`` is not greater than 0.
        """
        if num_digits <= 0:
            raise ValueError("num_digits must be greater than 0")

        self._curr_dtmf_inputs: list[DtmfEvent] = []
        self._dtmf_reply_running: bool = False

        @function_tool
        async def confirm_inputs(inputs: list[DtmfEvent]) -> None:
            """Finalize the collected digit inputs after explicit user confirmation.

            Use this ONLY after the confirmation. You should confirm by verbally reading out the digits one by one and, once the user confirms they are correct, call this tool with the inputs. Do not use this tool to capture the initial digits."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        @function_tool
        async def record_inputs(inputs: list[DtmfEvent]) -> None:
            """Record the collected digit inputs without additional confirmation.

            Call this tool as soon as a valid sequence of digits has been provided by the user (via DTMF or spoken)."""
            self.complete(GetDtmfResult.from_dtmf_inputs(inputs))

        instructions = (
            "You are a single step in a broader system, responsible solely for gathering digits input from the user. "
            "You will either receive a sequence of digits through dtmf events tagged by <dtmf_inputs>, or "
            "user will directly say the digits to you. You should be able to handle both cases. "
        )

        if ask_for_confirmation:
            instructions += "Once user has confirmed the digits (by verbally spoken or entered manually), call `confirm_inputs` with the inputs."
        else:
            instructions += "If user provides the digits through voice and it is valid, call `record_inputs` with the inputs."

        if is_given(extra_instructions):
            instructions += f"\n{extra_instructions}"

        # Only one of the two tools is exposed, depending on the confirmation mode.
        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            tools=[confirm_inputs] if ask_for_confirmation else [record_inputs],
        )

        def _on_sip_dtmf_received(ev: rtc.SipDTMF) -> None:
            # Ignore key presses while a DTMF reply is being generated.
            if self._dtmf_reply_running:
                return

            # immediately kick off the DTMF reply generation if matches the stop event
            if ev.digit == dtmf_stop_event.value:
                self._generate_dtmf_reply()
                return

            self._curr_dtmf_inputs.append(DtmfEvent(ev.digit))
            logger.info(f"DTMF inputs: {format_dtmf(self._curr_dtmf_inputs)}")
            # (Re)schedule the debounced reply after every accepted key press.
            self._generate_dtmf_reply.schedule()

        @debounced(delay=dtmf_input_timeout)
        async def _generate_dtmf_reply() -> None:
            self._dtmf_reply_running = True

            try:
                self.session.interrupt()

                dtmf_str = format_dtmf(self._curr_dtmf_inputs)
                logger.debug(f"Generating DTMF reply, current inputs: {dtmf_str}")

                # if input not fully received (i.e. timeout), complete the task with an error
                if len(self._curr_dtmf_inputs) != num_digits:
                    error_msg = (
                        f"Digits input not fully received. "
                        f"Expect {num_digits} digits, got {len(self._curr_dtmf_inputs)}"
                    )
                    self.complete(ToolError(error_msg))
                    return

                # if not asking for confirmation, return the DTMF inputs
                if not ask_for_confirmation:
                    self.complete(GetDtmfResult.from_dtmf_inputs(self._curr_dtmf_inputs))
                    return

                instructions = (
                    "User has entered the following valid digits on the telephone keypad:\n"
                    f"<dtmf_inputs>{dtmf_str}</dtmf_inputs>\n"
                    "Please confirm it with the user by saying the digits one by one with space in between "
                    "(.e.g. 'one two three four five six seven eight nine ten'). "
                    "Once you are sure, call `confirm_inputs` with the inputs."
                )
                await self.session.generate_reply(user_input=instructions)
            finally:
                # Always reset the flag and the buffer, whatever path was taken above.
                self._dtmf_reply_running = False
                self._curr_dtmf_inputs.clear()

        def _on_user_state_changed(ev: UserStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state == "speaking":
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after user is back to non-speaking
                self._generate_dtmf_reply.schedule()

        def _on_agent_state_changed(ev: AgentStateChangedEvent) -> None:
            if self.dtmf_reply_running():
                return

            if ev.new_state in ["speaking", "thinking"]:
                # clear any pending DTMF reply generation
                self._generate_dtmf_reply.cancel()
            elif len(self._curr_dtmf_inputs) != 0:
                # resume any previously cancelled DTMF reply generation after agent is back to non-speaking
                self._generate_dtmf_reply.schedule()

        # Keep references to the closures so on_enter/on_exit can (un)register the same objects.
        self._generate_dtmf_reply: Debounced[None] = _generate_dtmf_reply
        self._on_sip_dtmf_received: Callable[[rtc.SipDTMF], None] = _on_sip_dtmf_received
        self._on_user_state_changed: Callable[[UserStateChangedEvent], None] = (
            _on_user_state_changed
        )
        self._on_agent_state_changed: Callable[[AgentStateChangedEvent], None] = (
            _on_agent_state_changed
        )

    def dtmf_reply_running(self) -> bool:
        """Return the current value of the ``_dtmf_reply_running`` flag."""
        return self._dtmf_reply_running

    async def on_enter(self) -> None:
        """Register the DTMF and state-change handlers, then request an initial reply."""
        ctx = get_job_context()

        ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
        # The user-state handler must listen to "user_state_changed"; it was previously
        # attached to "agent_state_changed" and so received agent events instead.
        self.session.on("user_state_changed", self._on_user_state_changed)
        self.session.on("agent_state_changed", self._on_agent_state_changed)
        self.session.generate_reply(tool_choice="none")

    async def on_exit(self) -> None:
        """Unregister every handler attached in ``on_enter`` and cancel any pending DTMF reply."""
        ctx = get_job_context()

        ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
        # Event names mirror on_enter exactly so each handler is detached from where it was attached.
        self.session.off("user_state_changed", self._on_user_state_changed)
        self.session.off("agent_state_changed", self._on_agent_state_changed)
        self._generate_dtmf_reply.cancel()
Completes with a GetDtmfResult holding the collected DTMF inputs as a string; if the expected number of digits is not received, the task completes with a ToolError instead.
Args
num_digits- The number of digits to collect.
ask_for_confirmation- Whether to ask for confirmation when agent has collected full digits.
repeat_instructions- The number of times to repeat the initial instructions. (Note: listed in the docstring, but not a parameter of the constructor signature shown above.)
dtmf_input_timeout- The per-digit timeout.
dtmf_stop_event- The DTMF event to stop collecting inputs.
chat_ctx- The chat context to use.
extra_instructions- Extra instructions to add to the task.
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
def dtmf_reply_running(self) ‑> bool-
Expand source code
def dtmf_reply_running(self) -> bool:
    """Return the current value of the ``_dtmf_reply_running`` flag."""
    return self._dtmf_reply_running
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered.

    Registers the SIP DTMF handler on the room and the user/agent state handlers
    on the session, then asks the session for a reply with ``tool_choice="none"``.
    """
    ctx = get_job_context()

    ctx.room.on("sip_dtmf_received", self._on_sip_dtmf_received)
    # The user-state handler must listen to "user_state_changed"; it was previously
    # attached to "agent_state_changed" and so received agent events instead.
    self.session.on("user_state_changed", self._on_user_state_changed)
    self.session.on("agent_state_changed", self._on_agent_state_changed)
    self.session.generate_reply(tool_choice="none")
async def on_exit(self) ‑> None-
Expand source code
async def on_exit(self) -> None:
    """Called when the task is exited.

    Unregisters the SIP DTMF handler and the user/agent state handlers, then
    cancels any pending debounced DTMF reply.
    """
    ctx = get_job_context()

    ctx.room.off("sip_dtmf_received", self._on_sip_dtmf_received)
    # The user-state handler belongs to the "user_state_changed" event; detaching it
    # from "agent_state_changed" (as before) targets the wrong event.
    self.session.off("user_state_changed", self._on_user_state_changed)
    self.session.off("agent_state_changed", self._on_agent_state_changed)
    self._generate_dtmf_reply.cancel()
class GetEmailResult (email_address: str)-
Expand source code
@dataclass
class GetEmailResult:
    """Result value carrying a single collected email address."""

    # The email address, as a plain string.
    email_address: str
Instance variables
var email_address : str
class GetEmailTask (*,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '')-
Expand source code
class GetEmailTask(AgentTask[GetEmailResult]):
    """Agent task that collects an email address and completes with a ``GetEmailResult``."""

    def __init__(
        self,
        *,
        instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
        # deprecated
        extra_instructions: str = "",
    ) -> None:
        """Build the task instructions and forward the remaining options to the base class.

        When ``instructions`` is not given, an ``InstructionParts`` is created from the
        default persona and ``extra_instructions``. When ``instructions`` is given,
        ``extra_instructions`` is ignored and a warning is logged.
        """
        if not is_given(instructions):
            instructions = InstructionParts(persona=PERSONA, extra=extra_instructions)
        elif extra_instructions:
            logger.warning("`extra_instructions` will be ignored when `instructions` is provided")

        if isinstance(instructions, InstructionParts):
            parts = instructions
            persona = parts.persona if is_given(parts.persona) else PERSONA
            # confirmation is enabled by default for audio, disabled by default for text
            confirmation = Instructions(
                audio="" if require_confirmation is False else CONFIRMATION_INSTRUCTION,
                text=CONFIRMATION_INSTRUCTION if require_confirmation is True else "",
            )
            instructions = Instructions(INSTRUCTIONS_TEMPLATE).format(
                persona=persona,
                extra=parts.extra,
                _modality_specific=Instructions(audio=AUDIO_SPECIFIC, text=TEXT_SPECIFIC),
                _confirmation=confirmation,
            )

        assert is_given(instructions)  # for type checking
        super().__init__(
            instructions=instructions,
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._current_email = ""
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        """Ask the session to generate a reply requesting the email address."""
        prompt = "Ask the user to provide an email address."
        self.session.generate_reply(instructions=prompt)

    @function_tool
    async def update_email_address(self, email: str, ctx: RunContext) -> str | None:
        """Update the email address provided by the user.

        Args:
            email: The email address provided by the user
        """
        # NOTE: the docstring above is attached to a function tool; it is kept verbatim.
        email = email.strip()
        if re.match(EMAIL_REGEX, email) is None:
            raise ToolError(f"Invalid email address provided: {email}")

        self._current_email = email

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(GetEmailResult(email_address=self._current_email))
            return None  # no need to continue the conversation

        # Replace any previously registered confirm tool with one bound to this email.
        confirm_tool = self._build_confirm_tool(email=email)
        refreshed_tools = [tool for tool in self.tools if tool.id != "confirm_email_address"]
        refreshed_tools.append(confirm_tool)
        await self.update_tools(refreshed_tools)

        # One space between every character, e.g. "a @ b".
        spelled_out = " ".join(email)
        return (
            f"The email has been updated to {email}\n"
            f"Repeat the email character by character: {spelled_out} if needed\n"
            f"Prompt the user for confirmation, do not call `confirm_email_address` directly"
        )

    def _build_confirm_tool(self, *, email: str) -> llm.FunctionTool:
        """Create a ``confirm_email_address`` tool bound to ``email``."""

        @function_tool()
        async def confirm_email_address() -> None:
            """Call after the user confirms the email address is correct."""
            if email == self._current_email:
                if not self.done():
                    self.complete(GetEmailResult(email_address=email))
                return

            # The stored email differs from the one this tool was built for.
            self.session.generate_reply(
                instructions="The email has changed since confirmation was requested, ask the user to confirm the updated email."
            )

        return confirm_email_address

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_email_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide an email address.

        Args:
            reason: A short explanation of why the user declined to provide the email address
        """
        if self.done():
            return
        self.complete(ToolError(f"couldn't get the email address: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        """Return the explicit ``require_confirmation`` setting, else whether the input modality is audio."""
        explicit = self._require_confirmation
        if is_given(explicit):
            return explicit
        return ctx.speech_handle.input_details.modality == "audio"
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def decline_email_capture(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_email_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide an email address.

    Args:
        reason: A short explanation of why the user declined to provide the email address
    """
    # Nothing to do when the task has already finished.
    if self.done():
        return
    self.complete(ToolError(f"couldn't get the email address: {reason}"))
Args
reason- A short explanation of why the user declined to provide the email address
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered; asks the session to request the email address."""
    prompt = "Ask the user to provide an email address."
    self.session.generate_reply(instructions=prompt)
async def update_email_address(self, email: str, ctx: RunContext) ‑> str | None-
Expand source code
@function_tool
async def update_email_address(self, email: str, ctx: RunContext) -> str | None:
    """Update the email address provided by the user.

    Args:
        email: The email address provided by the user
    """
    # NOTE: the docstring above is attached to a function tool; it is kept verbatim.
    email = email.strip()
    if re.match(EMAIL_REGEX, email) is None:
        raise ToolError(f"Invalid email address provided: {email}")

    self._current_email = email

    if not self._confirmation_required(ctx):
        if not self.done():
            self.complete(GetEmailResult(email_address=self._current_email))
        return None  # no need to continue the conversation

    # Replace any previously registered confirm tool with one bound to this email.
    confirm_tool = self._build_confirm_tool(email=email)
    refreshed_tools = [tool for tool in self.tools if tool.id != "confirm_email_address"]
    refreshed_tools.append(confirm_tool)
    await self.update_tools(refreshed_tools)

    # One space between every character, e.g. "a @ b".
    spelled_out = " ".join(email)
    return (
        f"The email has been updated to {email}\n"
        f"Repeat the email character by character: {spelled_out} if needed\n"
        f"Prompt the user for confirmation, do not call `confirm_email_address` directly"
    )
Args
email- The email address provided by the user
class GetNameResult (first_name: str | None = None,
middle_name: str | None = None,
last_name: str | None = None)-
Expand source code
@dataclass
class GetNameResult:
    """Result value carrying the name parts; every part defaults to ``None``."""

    # Each part is a string or None.
    first_name: str | None = None
    middle_name: str | None = None
    last_name: str | None = None
Instance variables
var first_name : str | None
var last_name : str | None
var middle_name : str | None
class GetNameTask (first_name: bool = True,
last_name: bool = False,
middle_name: bool = False,
name_format: NotGivenOr[str] = NOT_GIVEN,
verify_spelling: bool = False,
extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)-
Expand source code
class GetNameTask(AgentTask[GetNameResult]):
    """Agent task that collects the requested name parts and completes with a ``GetNameResult``."""

    def __init__(
        self,
        first_name: bool = True,
        last_name: bool = False,
        middle_name: bool = False,
        name_format: NotGivenOr[str] = NOT_GIVEN,
        verify_spelling: bool = False,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Configure which name parts to collect and build the audio/text instructions.

        Raises:
            ValueError: If none of ``first_name``, ``middle_name`` and ``last_name`` is truthy.
        """
        if not first_name and not middle_name and not last_name:
            raise ValueError("At least one of first_name, middle_name, or last_name must be True")

        self._collect_first_name = first_name
        self._collect_last_name = last_name
        self._collect_middle_name = middle_name
        self._verify_spelling = verify_spelling
        self._require_confirmation = require_confirmation

        if is_given(name_format):
            self._name_format = name_format
        else:
            # Default format: the enabled placeholders in first/middle/last order.
            candidates = (
                (first_name, "{first_name}"),
                (middle_name, "{middle_name}"),
                (last_name, "{last_name}"),
            )
            self._name_format = " ".join(
                placeholder for enabled, placeholder in candidates if enabled
            )

        if verify_spelling:
            spelling_instructions = (
                "After receiving the name, always verify the spelling by asking the user to confirm "
                "or spell out the name letter by letter. "
                "When confirming, spell out each name part letter by letter to the user. "
            )
        else:
            spelling_instructions = ""

        confirmation_instructions = (
            "Call `confirm_name` after the user confirmed the name is correct."
        )
        # Confirmation text is included for audio unless explicitly disabled,
        # and for text only when explicitly enabled.
        audio_confirmation = "" if require_confirmation is False else confirmation_instructions
        text_confirmation = confirmation_instructions if require_confirmation is True else ""

        extra = extra_instructions or ""

        audio_prompt = _BASE_INSTRUCTIONS.format(
            name_format=self._name_format,
            modality_specific=_AUDIO_SPECIFIC,
            spelling_instructions=spelling_instructions,
            confirmation_instructions=audio_confirmation,
            extra_instructions=extra,
        )
        text_prompt = _BASE_INSTRUCTIONS.format(
            name_format=self._name_format,
            modality_specific=_TEXT_SPECIFIC,
            spelling_instructions=spelling_instructions,
            confirmation_instructions=text_confirmation,
            extra_instructions=extra,
        )

        super().__init__(
            instructions=Instructions(audio_prompt, text=text_prompt),
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._first_name: str = ""
        self._middle_name: str = ""
        self._last_name: str = ""

    async def on_enter(self) -> None:
        """Ask the session to generate a reply requesting the name in the configured order."""
        prompt = f"Ask the user for their name, follow this order '{self._name_format}' but do not mention the format."
        self.session.generate_reply(instructions=prompt)

    @function_tool()
    async def update_name(
        self,
        ctx: RunContext,
        first_name: str | None = None,
        middle_name: str | None = None,
        last_name: str | None = None,
    ) -> str | None:
        """Update the name provided by the user.

        Args:
            first_name: The user's first name.
            middle_name: The user's middle name, if collected.
            last_name: The user's last name, if collected.
        """
        # NOTE: the docstring above is attached to a function tool; it is kept verbatim.
        # A collected part must be present and non-blank.
        checks = (
            (self._collect_first_name, first_name, "first name"),
            (self._collect_middle_name, middle_name, "middle name"),
            (self._collect_last_name, last_name, "last name"),
        )
        errors: list[str] = [
            f"{label} is required but was not provided"
            for collected, value, label in checks
            if collected and not (value and value.strip())
        ]
        if errors:
            raise ToolError(f"Incomplete name: {'; '.join(errors)}")

        self._first_name = first_name.strip() if first_name else ""
        self._middle_name = middle_name.strip() if middle_name else ""
        self._last_name = last_name.strip() if last_name else ""

        if not self._confirmation_required(ctx):
            if not self.done():
                # Parts that are not collected are reported as None.
                self.complete(
                    GetNameResult(
                        first_name=self._first_name if self._collect_first_name else None,
                        middle_name=self._middle_name if self._collect_middle_name else None,
                        last_name=self._last_name if self._collect_last_name else None,
                    )
                )
            return None

        # Replace any previously registered `confirm_name` tool with one bound to these values.
        confirm_tool = self._build_confirm_tool(
            first_name=self._first_name,
            middle_name=self._middle_name,
            last_name=self._last_name,
        )
        refreshed_tools = [tool for tool in self.tools if tool.id != "confirm_name"]
        refreshed_tools.append(confirm_tool)
        await self.update_tools(refreshed_tools)

        full_name = self._name_format.format(
            first_name=self._first_name,
            middle_name=self._middle_name,
            last_name=self._last_name,
        ).strip()

        if not self._verify_spelling:
            return (
                f"The name has been updated to {full_name}\n"
                f"Repeat the name back to the user and prompt for confirmation, "
                f"do not call `confirm_name` directly"
            )

        return (
            f"The name has been updated to {full_name}\n"
            f"Spell out the name letter by letter for verification: {full_name}\n"
            f"Prompt the user for confirmation, do not call `confirm_name` directly"
        )

    def _build_confirm_tool(
        self, *, first_name: str, middle_name: str, last_name: str
    ) -> llm.FunctionTool:
        """Create a ``confirm_name`` tool bound to the given name parts."""

        @function_tool()
        async def confirm_name() -> None:
            """Call after the user confirms the name is correct."""
            bound = (first_name, middle_name, last_name)
            stored = (self._first_name, self._middle_name, self._last_name)
            if bound == stored:
                if not self.done():
                    self.complete(
                        GetNameResult(
                            first_name=self._first_name if self._collect_first_name else None,
                            middle_name=self._middle_name if self._collect_middle_name else None,
                            last_name=self._last_name if self._collect_last_name else None,
                        )
                    )
                return

            # The stored name differs from the one this tool was built for.
            self.session.generate_reply(
                instructions="The name has changed since confirmation was requested, ask the user to confirm the updated name."
            )

        return confirm_name

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_name_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide their name.

        Args:
            reason: A short explanation of why the user declined to provide their name
        """
        if self.done():
            return
        self.complete(ToolError(f"couldn't get the name: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        """Return the explicit ``require_confirmation`` setting, else whether the input modality is audio."""
        explicit = self._require_confirmation
        if is_given(explicit):
            return explicit
        return ctx.speech_handle.input_details.modality == "audio"
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def decline_name_capture(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_name_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide their name.

    Args:
        reason: A short explanation of why the user declined to provide their name
    """
    # Nothing to do when the task has already finished.
    if self.done():
        return
    self.complete(ToolError(f"couldn't get the name: {reason}"))
Args
reason- A short explanation of why the user declined to provide their name
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered; asks the session to request the name in the configured order."""
    prompt = f"Ask the user for their name, follow this order '{self._name_format}' but do not mention the format."
    self.session.generate_reply(instructions=prompt)
async def update_name(self,
ctx: RunContext,
first_name: str | None = None,
middle_name: str | None = None,
last_name: str | None = None) ‑> str | None-
Expand source code
@function_tool()
async def update_name(
    self,
    ctx: RunContext,
    first_name: str | None = None,
    middle_name: str | None = None,
    last_name: str | None = None,
) -> str | None:
    """Update the name provided by the user.

    Args:
        first_name: The user's first name.
        middle_name: The user's middle name, if collected.
        last_name: The user's last name, if collected.
    """
    # NOTE: the docstring above is attached to a function tool; it is kept verbatim.
    # A collected part must be present and non-blank.
    checks = (
        (self._collect_first_name, first_name, "first name"),
        (self._collect_middle_name, middle_name, "middle name"),
        (self._collect_last_name, last_name, "last name"),
    )
    errors: list[str] = [
        f"{label} is required but was not provided"
        for collected, value, label in checks
        if collected and not (value and value.strip())
    ]
    if errors:
        raise ToolError(f"Incomplete name: {'; '.join(errors)}")

    self._first_name = first_name.strip() if first_name else ""
    self._middle_name = middle_name.strip() if middle_name else ""
    self._last_name = last_name.strip() if last_name else ""

    if not self._confirmation_required(ctx):
        if not self.done():
            # Parts that are not collected are reported as None.
            self.complete(
                GetNameResult(
                    first_name=self._first_name if self._collect_first_name else None,
                    middle_name=self._middle_name if self._collect_middle_name else None,
                    last_name=self._last_name if self._collect_last_name else None,
                )
            )
        return None

    # Replace any previously registered `confirm_name` tool with one bound to these values.
    confirm_tool = self._build_confirm_tool(
        first_name=self._first_name,
        middle_name=self._middle_name,
        last_name=self._last_name,
    )
    refreshed_tools = [tool for tool in self.tools if tool.id != "confirm_name"]
    refreshed_tools.append(confirm_tool)
    await self.update_tools(refreshed_tools)

    full_name = self._name_format.format(
        first_name=self._first_name,
        middle_name=self._middle_name,
        last_name=self._last_name,
    ).strip()

    if not self._verify_spelling:
        return (
            f"The name has been updated to {full_name}\n"
            f"Repeat the name back to the user and prompt for confirmation, "
            f"do not call `confirm_name` directly"
        )

    return (
        f"The name has been updated to {full_name}\n"
        f"Spell out the name letter by letter for verification: {full_name}\n"
        f"Prompt the user for confirmation, do not call `confirm_name` directly"
    )
Args
first_name- The user's first name.
middle_name- The user's middle name, if collected.
last_name- The user's last name, if collected.
class GetPhoneNumberResult (phone_number: str)-
Expand source code
@dataclass
class GetPhoneNumberResult:
    """Result value carrying a single collected phone number."""

    # The phone number, as a plain string.
    phone_number: str
Instance variables
var phone_number : str
class GetPhoneNumberTask (extra_instructions: str = '',
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
require_confirmation: NotGivenOr[bool] = NOT_GIVEN)-
Expand source code
class GetPhoneNumberTask(AgentTask[GetPhoneNumberResult]):
    """Agent task that collects a phone number and completes with a ``GetPhoneNumberResult``."""

    def __init__(
        self,
        extra_instructions: str = "",
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        require_confirmation: NotGivenOr[bool] = NOT_GIVEN,
    ) -> None:
        """Build the audio/text instructions and forward the remaining options to the base class."""
        confirmation_instructions = (
            "Call `confirm_phone_number` after the user confirmed the phone number is correct."
        )
        # Confirmation text is included for audio unless explicitly disabled,
        # and for text only when explicitly enabled.
        audio_confirmation = "" if require_confirmation is False else confirmation_instructions
        text_confirmation = confirmation_instructions if require_confirmation is True else ""

        extra = extra_instructions or ""

        audio_prompt = _BASE_INSTRUCTIONS.format(
            modality_specific=_AUDIO_SPECIFIC,
            confirmation_instructions=audio_confirmation,
            extra_instructions=extra,
        )
        text_prompt = _BASE_INSTRUCTIONS.format(
            modality_specific=_TEXT_SPECIFIC,
            confirmation_instructions=text_confirmation,
            extra_instructions=extra,
        )

        super().__init__(
            instructions=Instructions(audio_prompt, text=text_prompt),
            chat_ctx=chat_ctx,
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._current_phone_number = ""
        self._require_confirmation = require_confirmation

    async def on_enter(self) -> None:
        """Ask the session to generate a reply requesting the phone number."""
        prompt = "Ask the user to provide their phone number."
        self.session.generate_reply(instructions=prompt)

    @function_tool()
    async def update_phone_number(self, phone_number: str, ctx: RunContext) -> str | None:
        """Update the phone number provided by the user.

        Args:
            phone_number: The phone number provided by the user, digits only with optional leading +
        """
        # NOTE: the docstring above is attached to a function tool; it is kept verbatim.
        # Drop whitespace, dashes, parentheses and dots before validating.
        normalized = re.sub(r"[\s\-().]+", "", phone_number.strip())
        if re.match(PHONE_REGEX, normalized) is None:
            raise ToolError(f"Invalid phone number provided: {phone_number}")

        self._current_phone_number = normalized

        if not self._confirmation_required(ctx):
            if not self.done():
                self.complete(GetPhoneNumberResult(phone_number=self._current_phone_number))
            return None  # no need to continue the conversation

        # Replace any previously registered confirm tool with one bound to this number.
        confirm_tool = self._build_confirm_tool(phone_number=normalized)
        refreshed_tools = [tool for tool in self.tools if tool.id != "confirm_phone_number"]
        refreshed_tools.append(confirm_tool)
        await self.update_tools(refreshed_tools)

        return (
            f"The phone number has been updated to {normalized}\n"
            f"Read the number back to the user in groups.\n"
            f"Prompt the user for confirmation, do not call `confirm_phone_number` directly"
        )

    def _build_confirm_tool(self, *, phone_number: str) -> llm.FunctionTool:
        """Create a ``confirm_phone_number`` tool bound to ``phone_number``."""

        @function_tool()
        async def confirm_phone_number() -> None:
            """Call after the user confirms the phone number is correct."""
            if phone_number == self._current_phone_number:
                if not self.done():
                    self.complete(GetPhoneNumberResult(phone_number=phone_number))
                return

            # The stored number differs from the one this tool was built for.
            self.session.generate_reply(
                instructions="The phone number has changed since confirmation was requested, ask the user to confirm the updated number."
            )

        return confirm_phone_number

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_phone_number_capture(self, reason: str) -> None:
        """Handles the case when the user explicitly declines to provide a phone number.

        Args:
            reason: A short explanation of why the user declined to provide the phone number
        """
        if self.done():
            return
        self.complete(ToolError(f"couldn't get the phone number: {reason}"))

    def _confirmation_required(self, ctx: RunContext) -> bool:
        """Return the explicit ``require_confirmation`` setting, else whether the input modality is audio."""
        explicit = self._require_confirmation
        if is_given(explicit):
            return explicit
        return ctx.speech_handle.input_details.modality == "audio"
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def decline_phone_number_capture(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_phone_number_capture(self, reason: str) -> None:
    """Handles the case when the user explicitly declines to provide a phone number.

    Args:
        reason: A short explanation of why the user declined to provide the phone number
    """
    # Nothing to do when the task has already finished.
    if self.done():
        return
    self.complete(ToolError(f"couldn't get the phone number: {reason}"))
Args
reason- A short explanation of why the user declined to provide the phone number
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered; asks the session to request the phone number."""
    prompt = "Ask the user to provide their phone number."
    self.session.generate_reply(instructions=prompt)
async def update_phone_number(self, phone_number: str, ctx: RunContext) ‑> str | None-
Expand source code
@function_tool()
async def update_phone_number(self, phone_number: str, ctx: RunContext) -> str | None:
    """Update the phone number provided by the user.

    Args:
        phone_number: The phone number provided by the user, digits only with optional leading +
    """
    # Strip surrounding whitespace, then remove spaces, dashes, parentheses and dots.
    normalized = re.sub(r"[\s\-().]+", "", phone_number.strip())
    if re.match(PHONE_REGEX, normalized) is None:
        raise ToolError(f"Invalid phone number provided: {phone_number}")

    self._current_phone_number = normalized

    if self._confirmation_required(ctx):
        # Bind a confirm tool to this exact number and replace any confirm tool
        # left over from a previous update.
        confirm_tool = self._build_confirm_tool(phone_number=normalized)
        remaining_tools = [tool for tool in self.tools if tool.id != "confirm_phone_number"]
        remaining_tools.append(confirm_tool)
        await self.update_tools(remaining_tools)

        reply_lines = (
            f"The phone number has been updated to {normalized}",
            "Read the number back to the user in groups.",
            "Prompt the user for confirmation, do not call `confirm_phone_number` directly",
        )
        return "\n".join(reply_lines)

    # No confirmation needed: finish the task right away (unless it already finished).
    if not self.done():
        self.complete(GetPhoneNumberResult(phone_number=self._current_phone_number))
    return None  # no need to continue the conversation
Args
phone_number- The phone number provided by the user, digits only with optional leading +
class InstructionParts (persona: NotGivenOr[Instructions | str] = NOT_GIVEN,
extra: Instructions | str = '')-
Expand source code
@dataclass
class InstructionParts:
    """Customizable instruction sections for built-in workflow tasks.

    Each field overrides that section when set; leave as ``NOT_GIVEN`` to preserve
    the workflow's built-in default. Set to ``""`` to remove a section entirely.

    Args:
        persona: Agent persona/identity — who the agent is and how it behaves.
        extra: Extra instructions appended to the prompt. The simplest hook for
            adding domain context without touching defaults.
    """

    # Defaults to NOT_GIVEN, i.e. "keep the workflow's built-in persona".
    persona: NotGivenOr[Instructions | str] = NOT_GIVEN
    # Defaults to an empty string (not NOT_GIVEN): nothing extra is added unless set.
    extra: Instructions | str = ""
Each field overrides that section when set; leave as ``NOT_GIVEN`` to preserve the workflow's built-in default. Set to ``""`` to remove a section entirely.
Args
persona- Agent persona/identity — who the agent is and how it behaves.
extra- Extra instructions appended to the prompt. The simplest hook for adding domain context without touching defaults.
Instance variables
var extra : livekit.agents.llm.chat_context.Instructions | str
var persona : str | livekit.agents.llm.chat_context.Instructions | livekit.agents.types.NotGiven
class TaskCompletedEvent (agent_task: livekit.agents.voice.agent.AgentTask, task_id: str, result: Any)-
Expand source code
@dataclass
class TaskCompletedEvent:
    """Event passed to the ``on_task_completed`` callback of a ``TaskGroup``."""

    # The task instance that was awaited.
    agent_task: AgentTask
    # The id the task was registered under in the group.
    task_id: str
    # The value produced by awaiting the task.
    result: Any
Instance variables
var agent_task : livekit.agents.voice.agent.AgentTask
var result : Any
var task_id : str
class TaskGroup (*,
summarize_chat_ctx: bool = True,
return_exceptions: bool = False,
chat_ctx: livekit.agents.llm.chat_context.ChatContext | livekit.agents.types.NotGiven = NOT_GIVEN,
on_task_completed: collections.abc.Callable[[TaskCompletedEvent], collections.abc.Coroutine[None, None, None]] | None = None)-
Expand source code
class TaskGroup(AgentTask[TaskGroupResult]):
    """Runs registered AgentTasks one after another and collects their results.

    Tasks are created from factories in registration order. From the second task
    on, each task gets an extra ``out_of_scope`` tool that lets the LLM regress to
    tasks that were already visited.
    """

    def __init__(
        self,
        *,
        summarize_chat_ctx: bool = True,
        return_exceptions: bool = False,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        on_task_completed: Callable[[TaskCompletedEvent], Coroutine[None, None, None]]
        | None = None,
    ):
        """TaskGroup orchestrates a sequence of multiple AgentTasks. It also allows for users to regress to previous tasks if requested.

        Args:
            summarize_chat_ctx (bool): Whether or not to summarize the interactions within the TaskGroup into one message and merge the context. Defaults to True.
            return_exceptions (bool): Whether or not to directly propagate an error. When set to True, the exception is added to the results dictionary and the sequence continues. Defaults to False.
            chat_ctx (ChatContext): Chat context forwarded to the underlying AgentTask.
            on_task_completed (Callable[]): A callable that executes upon each task completion. The callback takes in a single argument of a TaskCompletedEvent.
        """
        super().__init__(
            instructions="*empty*", chat_ctx=chat_ctx, llm=NOT_GIVEN
        )  # the LLM is set as NOT_GIVEN to allow session reusage if supported

        self._summarize_chat_ctx = summarize_chat_ctx
        self._return_exceptions = return_exceptions
        self._visited_tasks = set[str]()
        self._registered_factories: OrderedDict[str, _FactoryInfo] = OrderedDict()
        self._task_completed_callback = on_task_completed

    def add(
        self,
        task_factory: Callable[[], AgentTask],
        *,
        id: str,
        description: str,
    ) -> Self:
        """Adds an AgentTask to the TaskGroup.

        Args:
            task_factory (Callable): A callable that returns a task instance
            id (str): An identifier for the task used to access results
            description (str): A description that helps the LLM understand when to regress to this task
        """
        self._registered_factories[id] = _FactoryInfo(
            task_factory=task_factory, id=id, description=description
        )
        return self

    async def on_enter(self) -> None:
        """Run every registered task in order, then complete with the collected results."""
        task_stack = list(self._registered_factories.keys())
        task_results: dict[str, Any] = {}

        while len(task_stack) > 0:
            task_id = task_stack.pop(0)
            factory_info = self._registered_factories[task_id]

            self._current_task = factory_info.task_factory()

            shared_chat_ctx = self.chat_ctx.copy()
            await self._current_task.update_chat_ctx(shared_chat_ctx)

            if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
                current_tools = self._current_task.tools
                current_tools.append(out_of_scope_tool)
                await self._current_task.update_tools(current_tools)

            try:
                self._visited_tasks.add(task_id)
                res = await self._current_task

                # AgentTask handoff merges omit function calls. Re-merge the completed
                # task context so task-group summarization can incorporate tool results.
                self._chat_ctx.merge(
                    self._current_task.chat_ctx.copy(),
                    exclude_instructions=True,
                )

                task_results[task_id] = res

                if self._task_completed_callback is not None:
                    await self._task_completed_callback(
                        TaskCompletedEvent(
                            agent_task=self._current_task, task_id=task_id, result=res
                        )
                    )
            except _OutOfScopeError as e:
                # Re-queue the interrupted task, then put the requested tasks in
                # front of it (in the order they were requested).
                task_stack.insert(0, task_id)
                for target_id in reversed(e.target_task_ids):
                    task_stack.insert(0, target_id)
                continue
            except Exception as e:
                if self._return_exceptions:
                    task_results[task_id] = e
                    continue
                else:
                    self.complete(e)
                    return

        if self._summarize_chat_ctx:
            try:
                assert isinstance(self.session.llm, llm.LLM), (
                    "llm must be a LLM instance to summarize the chat_ctx"
                )

                # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
                # enabling summarization will result on only one ChatMessage added.
                # keep every item to allow summarization to be more action-aware.
                summarized_chat_ctx = await self.chat_ctx.copy(
                    exclude_instructions=False,
                    exclude_handoff=False,
                    exclude_config_update=False,
                    exclude_empty_message=False,
                    exclude_function_call=False,
                )._summarize(llm_v=self.session.llm, keep_last_turns=0)
                await self.update_chat_ctx(summarized_chat_ctx)
            except Exception as e:
                self.complete(e)
                return

        self.complete(TaskGroupResult(task_results=task_results))

    def _build_out_of_scope_tool(self, *, active_task_id: str) -> FunctionTool | None:
        """Build the tool that lets the LLM regress to already-visited tasks.

        Returns None while no task has been visited yet.
        """
        if not self._visited_tasks:
            return None

        # Only allow to regress to already visited tasks
        regressable_ids = self._visited_tasks.copy()
        regressable_ids.discard(active_task_id)
        # sorted so the enum in the tool schema is deterministic (sets are unordered)
        allowed_ids = sorted(regressable_ids)
        task_repr = {
            f.id: f.description
            for f in self._registered_factories.values()
            if f.id in regressable_ids
        }

        # Sentences are separated by a space and the example uses a single task id,
        # so the prompt reads unambiguously once the literals are concatenated.
        description = (
            "Call to regress to other tasks according to what the user requested to modify, return the corresponding task ids. "
            'For example, if the user wants to change their email and there is a task with id "get_email_task" with a description of "Collect the user\'s email", return the id ("get_email_task"). '
            "If the user requests to regress to multiple tasks, such as changing their phone number and email, return both task ids in the order they were requested. "
            f"The following are the IDs and their corresponding task description. {json.dumps(task_repr)}"
        )

        @function_tool(description=description, flags=ToolFlag.IGNORE_ON_ENTER)
        async def out_of_scope(
            task_ids: Annotated[
                list[str],
                Field(
                    description="The IDs of the tasks requested",
                    json_schema_extra={"items": {"type": "string", "enum": allowed_ids}},
                ),
            ],
        ) -> None:
            for task_id in task_ids:
                if task_id not in self._registered_factories or task_id not in self._visited_tasks:
                    raise ToolError(f"unable to regress, invalid task id {task_id}")

            if not self._current_task.done():
                self._current_task.complete(_OutOfScopeError(target_task_ids=task_ids))

        return out_of_scope
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

TaskGroup orchestrates a sequence of multiple AgentTasks. It also allows users to regress to previous tasks if requested.
Args
summarize_chat_ctx:bool- Whether or not to summarize the interactions within the TaskGroup into one message and merge the context. Defaults to True.
return_exceptions:bool- Whether or not to directly propagate an error. When set to True, the exception is added to the results dictionary and the sequence continues. Defaults to False.
on_task_completed:Callable[]- A callable that executes upon each task completion. The callback takes in a single argument of a TaskCompletedEvent.
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
def add(self,
task_factory: Callable[[], livekit.agents.voice.agent.AgentTask],
*,
id: str,
description: str) ‑> Self-
Expand source code
def add(
    self,
    task_factory: Callable[[], AgentTask],
    *,
    id: str,
    description: str,
) -> Self:
    """Register a task factory in the TaskGroup and return the group itself.

    Args:
        task_factory (Callable): A callable that returns a task instance
        id (str): An identifier for the task used to access results
        description (str): A description that helps the LLM understand when to regress to this task
    """
    factory_info = _FactoryInfo(task_factory=task_factory, id=id, description=description)
    # Keyed by id: registering the same id again replaces the earlier entry.
    self._registered_factories[id] = factory_info
    return self
Args
task_factory:Callable- A callable that returns a task instance
id:str- An identifier for the task used to access results
description:str- A description that helps the LLM understand when to regress to this task
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered.

    Runs the registered tasks in order, collecting each result under its task id,
    optionally summarizes the accumulated chat context, and completes the group
    with a ``TaskGroupResult``.
    """
    # pending task ids, processed front to back
    task_stack = list(self._registered_factories.keys())
    task_results: dict[str, Any] = {}

    while len(task_stack) > 0:
        task_id = task_stack.pop(0)
        factory_info = self._registered_factories[task_id]

        # a fresh task instance is created from the factory on every (re)visit
        self._current_task = factory_info.task_factory()

        # give the task a copy of the group's chat context
        shared_chat_ctx = self.chat_ctx.copy()
        await self._current_task.update_chat_ctx(shared_chat_ctx)

        # the regress tool is None until at least one task has been visited
        if out_of_scope_tool := self._build_out_of_scope_tool(active_task_id=task_id):
            current_tools = self._current_task.tools
            current_tools.append(out_of_scope_tool)
            await self._current_task.update_tools(current_tools)

        try:
            self._visited_tasks.add(task_id)
            res = await self._current_task

            # AgentTask handoff merges omit function calls. Re-merge the completed
            # task context so task-group summarization can incorporate tool results.
            self._chat_ctx.merge(
                self._current_task.chat_ctx.copy(),
                exclude_instructions=True,
            )

            task_results[task_id] = res

            if self._task_completed_callback is not None:
                await self._task_completed_callback(
                    TaskCompletedEvent(
                        agent_task=self._current_task, task_id=task_id, result=res
                    )
                )
        except _OutOfScopeError as e:
            # re-queue the interrupted task, then place the requested tasks in
            # front of it, preserving the order they were requested in
            task_stack.insert(0, task_id)
            for task_id in reversed(e.target_task_ids):
                task_stack.insert(0, task_id)
            continue
        except Exception as e:
            if self._return_exceptions:
                # record the failure as this task's result and move on
                task_results[task_id] = e
                continue
            else:
                # complete the whole group with the exception
                self.complete(e)
                return

    if self._summarize_chat_ctx:
        try:
            assert isinstance(self.session.llm, llm.LLM), (
                "llm must be a LLM instance to summarize the chat_ctx"
            )

            # when a task is done, the chat_ctx is going to be merged with the "caller" chat_ctx
            # enabling summarization will result on only one ChatMessage added.
            # keep every item to allow summarization to be more action-aware.
            summarized_chat_ctx = await self.chat_ctx.copy(
                exclude_instructions=False,
                exclude_handoff=False,
                exclude_config_update=False,
                exclude_empty_message=False,
                exclude_function_call=False,
            )._summarize(llm_v=self.session.llm, keep_last_turns=0)
            await self.update_chat_ctx(summarized_chat_ctx)
        except Exception as e:
            # a failed summarization (including the assertion above) completes the group with the error
            self.complete(e)
            return

    self.complete(TaskGroupResult(task_results=task_results))
class TaskGroupResult (task_results: dict[str, typing.Any])-
Expand source code
@dataclass
class TaskGroupResult:
    """Result a ``TaskGroup`` completes with."""

    # Maps each task id to the value its task produced; when the group was created
    # with return_exceptions=True a failed task's entry holds the exception instead.
    task_results: dict[str, Any]
Instance variables
var task_results : dict[str, typing.Any]
class WarmTransferResult (human_agent_identity: str)-
Expand source code
@dataclass
class WarmTransferResult:
    """Result a ``WarmTransferTask`` completes with after connecting the human agent to the caller."""

    # Participant identity used for the dialed human agent.
    human_agent_identity: str
Instance variables
var human_agent_identity : str
class WarmTransferTask (sip_call_to: NotGivenOr[str] = NOT_GIVEN,
*,
sip_trunk_id: NotGivenOr[str | None] = NOT_GIVEN,
sip_connection: NotGivenOr[api.SIPOutboundConfig] = NOT_GIVEN,
sip_number: NotGivenOr[str] = NOT_GIVEN,
sip_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
extra_instructions: str = '',
target_phone_number: NotGivenOr[str] = NOT_GIVEN)-
Expand source code
class WarmTransferTask(AgentTask[WarmTransferResult]):
    """Dials a human agent over SIP in a separate room and, on request, merges them into the caller's room.

    While the human agent is being reached the caller hears hold audio and the
    caller session's input/output is disabled; both are restored when the task
    completes.
    """

    def __init__(
        self,
        sip_call_to: NotGivenOr[str] = NOT_GIVEN,
        *,
        sip_trunk_id: NotGivenOr[str | None] = NOT_GIVEN,
        sip_connection: NotGivenOr[api.SIPOutboundConfig] = NOT_GIVEN,
        sip_number: NotGivenOr[str] = NOT_GIVEN,
        sip_headers: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        hold_audio: NotGivenOr[AudioSource | AudioConfig | list[AudioConfig] | None] = NOT_GIVEN,
        instructions: NotGivenOr[InstructionParts | Instructions | str] = NOT_GIVEN,
        chat_ctx: NotGivenOr[llm.ChatContext] = NOT_GIVEN,
        turn_detection: NotGivenOr[TurnDetectionMode | None] = NOT_GIVEN,
        tools: NotGivenOr[list[llm.Tool | llm.Toolset]] = NOT_GIVEN,
        stt: NotGivenOr[stt.STT | None] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
        llm: NotGivenOr[llm.LLM | llm.RealtimeModel | None] = NOT_GIVEN,
        tts: NotGivenOr[tts.TTS | None] = NOT_GIVEN,
        allow_interruptions: NotGivenOr[bool] = NOT_GIVEN,
        # deprecated
        extra_instructions: str = "",
        target_phone_number: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """Initialize a WarmTransferTask to dial a human agent via SIP.

        Args:
            sip_call_to: The phone number or SIP URI to dial for the human agent
                (e.g. ``"+15105550123"`` or ``"sip:user@example.com"``).
            sip_trunk_id: ID of a pre-configured LiveKit SIP outbound trunk used to
                originate the call. Falls back to the ``LIVEKIT_SIP_OUTBOUND_TRUNK``
                environment variable when not provided.
            sip_connection: Low-level SIP connection config (``api.SIPOutboundConfig``)
                for originating calls from a **custom SIP domain** instead of through a
                saved trunk. Use this when you need to specify a custom hostname,
                transport, or authentication credentials directly, bypassing the
                trunk-based configuration.
            sip_number: Value sent as ``sip_number`` in the SIP participant request.
                Falls back to the ``LIVEKIT_SIP_NUMBER`` environment variable when not
                provided; omitted from the request when empty.
            sip_headers: Headers sent with the SIP participant request. Defaults to
                no extra headers.
            hold_audio: Audio played to the caller while they are on hold during the
                transfer. Defaults to the built-in hold music; pass ``None`` to play
                nothing.
            instructions: Instructions for the agent talking to the human agent. An
                ``InstructionParts`` is rendered through the built-in template
                together with the conversation history from ``chat_ctx``.
            chat_ctx: Conversation so far; only used to build the conversation
                history in the instructions, it is not passed to the underlying task.
            extra_instructions: Deprecated. Extra instructions to append to the base
                instructions that are used to summarize the conversation history.
                Ignored (with a warning) when ``instructions`` is provided.
            target_phone_number: Deprecated alias for ``sip_call_to``.

        Raises:
            ValueError: If no destination is given, or if neither a trunk id (argument
                or environment variable) nor ``sip_connection`` is available.
        """
        if not is_given(instructions):
            instructions = InstructionParts(persona=PERSONA, extra=extra_instructions)
        elif extra_instructions:
            logger.warning("`extra_instructions` will be ignored when `instructions` is provided")

        if isinstance(instructions, InstructionParts):
            conversation_history = self._format_conversation_history(chat_ctx)
            instructions = Instructions(INSTRUCTIONS_TEMPLATE).format(
                persona=instructions.persona if is_given(instructions.persona) else PERSONA,
                extra=instructions.extra,
                _conversation_history=conversation_history,
            )

        assert is_given(instructions)  # for type checking
        super().__init__(
            instructions=instructions,
            chat_ctx=NOT_GIVEN,  # don't pass the chat_ctx
            turn_detection=turn_detection,
            tools=tools or [],
            stt=stt,
            vad=vad,
            llm=llm,
            tts=tts,
            allow_interruptions=allow_interruptions,
        )

        self._caller_room: rtc.Room | None = None
        self._human_agent_sess: AgentSession | None = None
        self._human_agent_failed_fut: asyncio.Future[None] = asyncio.Future()
        self._human_agent_identity = "human-agent-sip"

        if target_phone_number:
            logger.warning("`target_phone_number` is deprecated, use `sip_call_to` instead")
            if not sip_call_to:
                sip_call_to = target_phone_number

        if not sip_call_to:
            raise ValueError("`sip_call_to` must be set")

        self._sip_call_to = sip_call_to
        self._sip_connection = sip_connection if is_given(sip_connection) else None
        self._sip_trunk_id = (
            sip_trunk_id
            if is_given(sip_trunk_id)
            else os.getenv("LIVEKIT_SIP_OUTBOUND_TRUNK", None)
        )
        if self._sip_trunk_id is None and self._sip_connection is None:
            raise ValueError(
                "`LIVEKIT_SIP_OUTBOUND_TRUNK` environment variable, `sip_trunk_id`,"
                " or `sip_connection` must be set"
            )
        self._sip_number = (
            sip_number if is_given(sip_number) else os.getenv("LIVEKIT_SIP_NUMBER", "")
        )
        self._sip_headers = sip_headers if is_given(sip_headers) else {}

        # background audio and io
        self._background_audio = BackgroundAudioPlayer()
        self._hold_audio_handle: PlayHandle | None = None
        self._hold_audio = (
            hold_audio
            if is_given(hold_audio)
            else AudioConfig(BuiltinAudioClip.HOLD_MUSIC, volume=0.8)
        )

        self._original_io_state: dict[str, bool] = {}

    @staticmethod
    def _format_conversation_history(chat_ctx: NotGivenOr[llm.ChatContext]) -> str:
        """Render the user/assistant text messages of ``chat_ctx`` as ``Role: text`` lines."""
        if not is_given(chat_ctx) or not chat_ctx:
            return ""

        lines: list[str] = []
        for msg in chat_ctx.messages():
            # only spoken turns are relevant; skip other roles and empty messages
            if msg.role not in ("user", "assistant"):
                continue
            if not msg.text_content:
                continue
            role = "Caller" if msg.role == "user" else "Assistant"
            lines.append(f"{role}: {msg.text_content}\n")
        return "".join(lines)

    async def on_enter(self) -> None:
        """Put the caller on hold and dial the human agent."""
        job_ctx = get_job_context()
        self._caller_room = job_ctx.room

        # start the background audio
        if self._hold_audio is not None:
            await self._background_audio.start(room=self._caller_room)
            self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

        self._set_io_enabled(False)

        # created before the `try` so the name is always bound in `finally`
        dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
        try:
            done, _ = await asyncio.wait(
                (dial_human_agent_task, self._human_agent_failed_fut),
                return_when=asyncio.FIRST_COMPLETED,
            )
            if dial_human_agent_task not in done:
                # the failure future resolved first, i.e. the human agent room closed
                raise RuntimeError("human agent room closed before the call was connected")

            self._human_agent_sess = dial_human_agent_task.result()
            # let the human speak first

        except Exception:
            logger.exception("could not dial human agent")
            self._set_result(ToolError("could not dial human agent"))
            return
        finally:
            await utils.aio.cancel_and_wait(dial_human_agent_task)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def connect_to_caller(self) -> None:
        """Called when the human agent wants to connect to the caller."""
        logger.debug("connecting to caller")
        assert self._caller_room is not None

        await self._merge_calls()
        self._set_result(WarmTransferResult(human_agent_identity=self._human_agent_identity))

        # when the caller or human agent leaves the room, we'll delete the room
        self._caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def decline_transfer(self, reason: str) -> None:
        """Handles the case when the human agent explicitly declines to connect to the caller.

        Args:
            reason: A short explanation of why the human agent declined to connect to the caller
        """
        self._set_result(ToolError(f"human agent declined to connect: {reason}"))

    @function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
    async def voicemail_detected(self) -> None:
        """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
        self._set_result(ToolError("voicemail detected"))

    def _on_human_agent_room_close(self, reason: rtc.DisconnectReason.ValueType) -> None:
        """Fail the transfer when the human agent's room disconnects."""
        logger.debug(
            "human agent's room closed",
            extra={"reason": rtc.DisconnectReason.Name(reason)},
        )
        # the future may already be resolved; that is not an error
        with contextlib.suppress(asyncio.InvalidStateError):
            self._human_agent_failed_fut.set_result(None)

        self._set_result(ToolError(f"room closed: {rtc.DisconnectReason.Name(reason)}"))

    def _on_caller_participant_disconnected(self, participant: rtc.RemoteParticipant) -> None:
        """Delete the caller room once a relevant participant leaves it."""
        if participant.kind not in DEFAULT_PARTICIPANT_KINDS:
            return

        logger.info(f"participant disconnected from caller room: {participant.identity}, closing")

        assert self._caller_room is not None
        self._caller_room.off("participant_disconnected", self._on_caller_participant_disconnected)
        job_ctx = get_job_context()
        job_ctx.delete_room(room_name=self._caller_room.name)

    def _set_result(self, result: WarmTransferResult | Exception) -> None:
        """Complete the task once: shut down the human agent session, stop hold audio, restore io."""
        if self.done():
            return

        if self._human_agent_sess:
            self._human_agent_sess.shutdown()
            self._human_agent_sess = None

        if self._hold_audio_handle:
            self._hold_audio_handle.stop()
            self._hold_audio_handle = None

        self._set_io_enabled(True)
        self.complete(result)

    async def _dial_human_agent(self) -> AgentSession:
        """Join a dedicated room, start an agent session in it and dial the human agent via SIP."""
        assert self._caller_room is not None

        job_ctx = get_job_context()
        ws_url = job_ctx._info.url

        # create a new room for the human agent
        human_agent_room_name = self._caller_room.name + "-human-agent"
        room = rtc.Room()
        token = (
            api.AccessToken()
            .with_identity(self._caller_room.local_participant.identity)
            .with_grants(
                api.VideoGrants(
                    room_join=True,
                    room=human_agent_room_name,
                    can_update_own_metadata=True,
                    can_publish=True,
                    can_subscribe=True,
                )
            )
            .with_kind("agent")
        ).to_jwt()

        logger.debug(
            "connecting to human agent room",
            extra={"ws_url": ws_url, "human_agent_room_name": human_agent_room_name},
        )
        await room.connect(ws_url, token)

        # if human agent hung up for whatever reason, we'd resume the caller conversation
        room.on("disconnected", self._on_human_agent_room_close)

        human_agent_sess: AgentSession = AgentSession(
            vad=self.session.vad or NOT_GIVEN,
            llm=self.session.llm or NOT_GIVEN,
            stt=self.session.stt or NOT_GIVEN,
            tts=self.session.tts or NOT_GIVEN,
            turn_detection=self.session.turn_detection or NOT_GIVEN,
        )

        # create a copy of this AgentTask
        human_agent_agent = Agent(
            instructions=self.instructions,
            turn_detection=self.turn_detection,
            stt=self.stt,
            vad=self.vad,
            llm=self.llm,
            tts=self.tts,
            tools=self.tools,
            chat_ctx=self.chat_ctx,
            allow_interruptions=self.allow_interruptions,
        )
        await human_agent_sess.start(
            agent=human_agent_agent,
            room=room,
            room_options=room_io.RoomOptions(
                close_on_disconnect=True,
                delete_room_on_close=True,
                participant_identity=self._human_agent_identity,
            ),
            record=False,  # TODO: support recording on multiple sessions?
        )

        # dial the human agent
        sip_request = api.CreateSIPParticipantRequest(
            sip_trunk_id=self._sip_trunk_id,
            sip_call_to=self._sip_call_to,
            room_name=human_agent_room_name,
            participant_identity=self._human_agent_identity,
            wait_until_answered=True,
            sip_number=self._sip_number or None,
            headers=self._sip_headers,
        )
        if self._sip_connection is not None:
            sip_request.trunk.CopyFrom(self._sip_connection)
        await job_ctx.api.sip.create_sip_participant(sip_request)

        return human_agent_sess

    async def _merge_calls(self) -> None:
        """Move the human agent participant from its own room into the caller's room."""
        assert self._caller_room is not None
        assert self._human_agent_sess is not None

        job_ctx = get_job_context()
        human_agent_room = self._human_agent_sess.room_io.room
        # we no longer care about the human agent session. it's supposed to be over
        human_agent_room.off("disconnected", self._on_human_agent_room_close)

        logger.debug(f"moving {self._human_agent_identity} to caller room {self._caller_room.name}")
        await job_ctx.api.room.move_participant(
            api.MoveParticipantRequest(
                room=human_agent_room.name,
                identity=self._human_agent_identity,
                destination_room=self._caller_room.name,
            )
        )

    def _set_io_enabled(self, enabled: bool) -> None:
        """Disable the caller session's io, or restore it to the state captured on first call."""
        input = self.session.input
        output = self.session.output

        # remember the initial state once, so re-enabling never turns on
        # something that was off before the transfer started
        if not self._original_io_state:
            self._original_io_state = {
                "audio_input": input.audio_enabled,
                "video_input": input.video_enabled,
                "audio_output": output.audio_enabled,
                "transcription_output": output.transcription_enabled,
                "video_output": output.video_enabled,
            }

        if input.audio:
            input.set_audio_enabled(enabled and self._original_io_state["audio_input"])
        if input.video:
            input.set_video_enabled(enabled and self._original_io_state["video_input"])
        if output.audio:
            output.set_audio_enabled(enabled and self._original_io_state["audio_output"])
        if output.transcription:
            output.set_transcription_enabled(
                enabled and self._original_io_state["transcription_output"]
            )
        if output.video:
            output.set_video_enabled(enabled and self._original_io_state["video_output"])
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Initialize a WarmTransferTask to dial a human agent via SIP.
Args
sip_call_to- The phone number or SIP URI to dial for the human agent (e.g. "+15105550123" or "sip:user@example.com").
sip_trunk_id- ID of a pre-configured LiveKit SIP outbound trunk used to originate the call. Falls back to the LIVEKIT_SIP_OUTBOUND_TRUNK environment variable when not provided.
sip_connection- Low-level SIP connection config (api.SIPOutboundConfig) for originating calls from a custom SIP domain instead of through a saved trunk. Use this when you need to specify a custom hostname, transport, or authentication credentials directly, bypassing the trunk-based configuration.
hold_audio- Audio played to the caller while they are on hold during the transfer.
extra_instructions- Extra instructions to append to the base instructions that are used to summarize the conversation history.
Ancestors
- livekit.agents.voice.agent.AgentTask
- livekit.agents.voice.agent.Agent
- typing.Generic
Methods
async def connect_to_caller(self) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def connect_to_caller(self) -> None:
    """Called when the human agent wants to connect to the caller."""
    logger.debug("connecting to caller")
    assert self._caller_room is not None

    # merge the two calls first, then complete the task with the human agent's identity
    await self._merge_calls()
    self._set_result(WarmTransferResult(human_agent_identity=self._human_agent_identity))

    # when the caller or human agent leaves the room, we'll delete the room
    self._caller_room.on("participant_disconnected", self._on_caller_participant_disconnected)
async def decline_transfer(self, reason: str) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def decline_transfer(self, reason: str) -> None:
    """Handles the case when the human agent explicitly declines to connect to the caller.

    Args:
        reason: A short explanation of why the human agent declined to connect to the caller
    """
    # Finish the transfer with an error that carries the agent's stated reason.
    declined = ToolError(f"human agent declined to connect: {reason}")
    self._set_result(declined)
Args
reason- A short explanation of why the human agent declined to connect to the caller
async def on_enter(self) ‑> None-
Expand source code
async def on_enter(self) -> None:
    """Called when the task is entered.

    Starts hold audio for the caller (when configured), disables the caller
    session's io, and dials the human agent. If dialing fails, or the human
    agent failure future resolves first, the task result is set to a ToolError.
    """
    job_ctx = get_job_context()
    self._caller_room = job_ctx.room

    # start the background audio
    if self._hold_audio is not None:
        await self._background_audio.start(room=self._caller_room)
        self._hold_audio_handle = self._background_audio.play(self._hold_audio, loop=True)

    self._set_io_enabled(False)

    # created before the `try` so the name is always bound in `finally`
    dial_human_agent_task = asyncio.create_task(self._dial_human_agent())
    try:
        done, _ = await asyncio.wait(
            (dial_human_agent_task, self._human_agent_failed_fut),
            return_when=asyncio.FIRST_COMPLETED,
        )
        if dial_human_agent_task not in done:
            # the failure future resolved before dialing finished; give the
            # logged traceback a message instead of a bare RuntimeError
            raise RuntimeError("human agent failure was signalled before dialing completed")

        self._human_agent_sess = dial_human_agent_task.result()
        # let the human speak first

    except Exception:
        logger.exception("could not dial human agent")
        self._set_result(ToolError("could not dial human agent"))
        return
    finally:
        await utils.aio.cancel_and_wait(dial_human_agent_task)
async def voicemail_detected(self) ‑> None-
Expand source code
@function_tool(flags=ToolFlag.IGNORE_ON_ENTER)
async def voicemail_detected(self) -> None:
    """Called when the call reaches voicemail. Use this tool AFTER you hear the voicemail greeting"""
    # Finish the transfer with an error: no human agent was reached.
    voicemail_error = ToolError("voicemail detected")
    self._set_result(voicemail_error)