This example demonstrates advanced SIP (Session Initiation Protocol) call management. The agent can add new SIP participants to a call, track call status changes, list participants, and cleanly end calls. It monitors SIP-specific participant attributes like call status, trunk information, and phone numbers.
Prerequisites
- Add a `.env` file in this directory with your LiveKit credentials and SIP trunk ID:
  LIVEKIT_URL=your_livekit_url
  LIVEKIT_API_KEY=your_api_key
  LIVEKIT_API_SECRET=your_api_secret
  SIP_TRUNK_ID=your_sip_trunk_id
- Install dependencies:
  pip install "livekit-agents[silero,deepgram,openai,elevenlabs]" python-dotenv
Set up logging and create the AgentServer
Load environment variables and configure logging. Create an AgentServer to manage the agent lifecycle.
import asyncio
import logging
import os
import uuid

from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    RunContext,
    cli,
    function_tool,
    inference,
)
from livekit.plugins import silero

# Read the LiveKit credentials and SIP_TRUNK_ID from the local .env file.
load_dotenv()

# Dedicated logger for this example, emitting INFO and above.
logger = logging.getLogger("sip-lifecycle-agent")
logger.setLevel(logging.INFO)

# Server object that the prewarm hook and the RTC session entrypoint attach to.
server = AgentServer()
Prewarm VAD for faster connections
Preload the VAD model once per process. This runs before any sessions start and stores the VAD instance in proc.userdata so it can be reused.
def prewarm(proc: JobProcess):
    """Load the Silero VAD model and cache it in ``proc.userdata["vad"]``."""
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model


# Register the hook on the server so the VAD is loaded during process setup.
server.setup_fnc = prewarm
Define the SIP Lifecycle Agent with function tools
The agent stores a reference to the job context to access the LiveKit API. Function tools allow the agent to add SIP participants, end calls, and list participants based on user voice commands.
class SIPLifecycleAgent(Agent):
    """Voice agent that manages a SIP call through function tools.

    The agent keeps the job context so its tools can reach the LiveKit server
    API (``job_context.api``) and the name of the current room. Every tool
    returns a ``(None, message)`` tuple whose message describes the outcome.
    """

    def __init__(self, job_context=None) -> None:
        """Store the job context and set the agent instructions.

        Args:
            job_context: Job context of the running job. When it is missing,
                every tool reports a failure instead of calling the API.
        """
        self.job_context = job_context
        super().__init__(
            instructions=(
                "You are a helpful assistant demonstrating SIP call lifecycle management. "
                "You can add SIP participants and end the call when requested."
            ),
        )

    @function_tool
    async def add_sip_participant(self, context: RunContext, phone_number: str):
        """Add a SIP participant to the current call.

        Args:
            context: The call context
            phone_number: The phone number to call
        """
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't add participants at this time.")
            return None, "Failed to add SIP participant: No job context available"

        # Without a trunk ID the dial-out request cannot be built correctly, so
        # fail early with a clear message instead of sending None to the API.
        sip_trunk_id = os.environ.get("SIP_TRUNK_ID")
        if not sip_trunk_id:
            logger.error("SIP_TRUNK_ID environment variable is not set")
            await self.session.say("I'm sorry, I can't add participants at this time.")
            return None, "Failed to add SIP participant: SIP_TRUNK_ID is not configured"

        room_name = self.job_context.room.name
        # Short random suffix keeps identities distinct across dial-outs.
        identity = f"sip_{uuid.uuid4().hex[:8]}"
        logger.info(f"Adding SIP participant with phone number {phone_number} to room {room_name}")

        try:
            response = await self.job_context.api.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    sip_trunk_id=sip_trunk_id,
                    sip_call_to=phone_number,
                    room_name=room_name,
                    participant_identity=identity,
                    participant_name=f"SIP Participant {phone_number}",
                    krisp_enabled=True,
                )
            )
        except Exception as e:
            # Tool boundary: report the failure to the caller and the LLM
            # rather than letting the exception escape the tool.
            logger.exception(f"Error adding SIP participant: {e}")
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, f"Failed to add SIP participant: {e}"

        logger.info(f"Successfully added SIP participant: {response}")
        return None, f"Added SIP participant {phone_number} to the call."

    @function_tool
    async def end_call(self, context: RunContext):
        """End the current call by deleting the room."""
        if not self.job_context:
            logger.error("No job context available")
            return None, "Failed to end call: No job context available"

        room_name = self.job_context.room.name
        logger.info(f"Ending call by deleting room {room_name}")

        try:
            # Say goodbye first; the room is deleted only after this await returns.
            await context.session.generate_reply(
                instructions="Thank you for your time. I'll be ending this call now. Goodbye!"
            )
            await self.job_context.api.room.delete_room(
                api.DeleteRoomRequest(room=room_name)
            )
        except Exception as e:
            logger.exception(f"Error ending call: {e}")
            return None, f"Failed to end call: {e}"

        logger.info(f"Successfully deleted room {room_name}")
        return None, "Call ended successfully."

    @function_tool
    async def log_participants(self, context: RunContext):
        """Log all participants in the current room."""
        if not self.job_context:
            logger.error("No job context available")
            return None, "Failed to list participants: No job context available"

        room_name = self.job_context.room.name

        try:
            response = await self.job_context.api.room.list_participants(
                api.ListParticipantsRequest(room=room_name)
            )
            participants = response.participants
            # Actually log the participants, as the tool name promises.
            participant_info = [
                {
                    "identity": p.identity,
                    "name": p.name,
                    "state": p.state,
                    "is_publisher": p.is_publisher,
                }
                for p in participants
            ]
            logger.info(f"Participants in room {room_name}: {participant_info}")
            await self.session.say(f"There are {len(participants)} participants in this call.")
        except Exception as e:
            logger.exception(f"Error listing participants: {e}")
            return None, f"Failed to list participants: {e}"

        return None, f"Listed {len(participants)} participants in the room."

    async def on_enter(self):
        """Ask the session to generate a reply when this agent becomes active."""
        self.session.generate_reply()
Define the RTC session entrypoint with event handlers
Create the AgentSession with models configured. After starting the session, set up event handlers to monitor participant connections and attribute changes. These handlers log SIP-specific information like call status and phone numbers.
@server.rtc_session()
async def entrypoint(ctx: JobContext):
    """Start the agent session for a room and monitor SIP participants.

    Builds the ``AgentSession``, starts it with a ``SIPLifecycleAgent``,
    connects to the room, and then registers room event handlers that log
    SIP attributes when participants join or when their attributes change.

    Args:
        ctx: Job context providing the room, the process userdata (with the
            prewarmed VAD) and the connection to LiveKit.
    """
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3", language="en"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(model="elevenlabs/eleven_multilingual_v2"),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )
    agent = SIPLifecycleAgent(job_context=ctx)

    await session.start(agent=agent, room=ctx.room)
    await ctx.connect()

    # The event loop keeps only weak references to tasks, so hold strong
    # references here until each task finishes; otherwise a handler task could
    # be garbage-collected before it completes.
    background_tasks = set()

    def spawn(coro) -> None:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async def async_on_participant_connected(participant: rtc.RemoteParticipant):
        """Log the new participant (with SIP details if any) and greet them."""
        logger.info(f"New participant connected: {participant.identity}")

        if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            if participant.attributes:
                call_status = participant.attributes.get('sip.callStatus', 'Unknown')
                phone_number = participant.attributes.get('sip.phoneNumber', 'Unknown')
                trunk_id = participant.attributes.get('sip.trunkID', 'Unknown')
                logger.info(f"SIP Call Status: {call_status}, Phone: {phone_number}")
                logger.info(f"SIP Trunk ID: {trunk_id}")

        await agent.session.say(f"Welcome, {participant.name or participant.identity}!")

    async def async_on_participant_attributes_changed(
        changed_attributes: dict, participant: rtc.Participant
    ):
        """Log attribute changes, highlighting SIP call status updates."""
        logger.info(f"Participant {participant.identity} attributes changed: {changed_attributes}")

        if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            if 'sip.callStatus' in changed_attributes:
                call_status = changed_attributes['sip.callStatus']
                logger.info(f"SIP Call Status updated: {call_status}")

    # Room event callbacks are synchronous, so each one hands the real work
    # to an async task.
    def on_participant_connected_handler(participant: rtc.RemoteParticipant):
        spawn(async_on_participant_connected(participant))

    def on_participant_attributes_changed_handler(
        changed_attributes: dict, participant: rtc.Participant
    ):
        spawn(async_on_participant_attributes_changed(changed_attributes, participant))

    ctx.room.on("participant_connected", on_participant_connected_handler)
    ctx.room.on("participant_attributes_changed", on_participant_attributes_changed_handler)
Run the server
The cli.run_app() function starts the agent server, manages the worker lifecycle, and processes incoming jobs.
if __name__ == "__main__":
    # Hand the configured server to the LiveKit CLI runner.
    cli.run_app(server)
Run it
Run the agent using the console command for local testing:
python sip_lifecycle.py console
For testing with real SIP calls, use dev mode:
python sip_lifecycle.py dev
How it works
- The agent initializes with function tools for SIP operations.
- When a new participant joins, the event handler logs their SIP attributes (call status, phone number, trunk ID).
- Users can ask the agent to add participants by phone number—the agent uses the LiveKit SIP API to dial out.
- The agent tracks call status changes (dialing, ringing, active, hangup) via attribute change events.
- Users can end the call, which triggers a goodbye message and room deletion.
Full example
import asyncio
import logging
import os
import uuid

from dotenv import load_dotenv
from livekit import api, rtc
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    RunContext,
    cli,
    function_tool,
    inference,
)
from livekit.plugins import silero

load_dotenv()

logger = logging.getLogger("sip-lifecycle-agent")
logger.setLevel(logging.INFO)

# Log line for each known `sip.callStatus` value when a SIP participant joins.
_STATUS_ON_CONNECT = {
    'active': "Call is active and connected",
    'automation': "Call is connected and dialing DTMF numbers",
    'dialing': "Call is dialing and waiting to be picked up",
    'hangup': "Call has been ended by a participant",
    'ringing': "Inbound call is ringing for the caller",
}

# Log line for each known `sip.callStatus` value when the status changes.
_STATUS_ON_CHANGE = {
    'active': "Call is now active and connected",
    'automation': "Call is now connected and dialing DTMF numbers",
    'dialing': "Call is now dialing and waiting to be picked up",
    'hangup': "Call has been ended by a participant",
    'ringing': "Inbound call is now ringing for the caller",
}


class SIPLifecycleAgent(Agent):
    """Voice agent that manages a SIP call through function tools.

    The agent keeps the job context so its tools can reach the LiveKit server
    API (``job_context.api``) and the name of the current room. Every tool
    returns a ``(None, message)`` tuple whose message describes the outcome.
    """

    def __init__(self, job_context=None) -> None:
        """Store the job context and set the agent instructions.

        Args:
            job_context: Job context of the running job. When it is missing,
                every tool reports a failure instead of calling the API.
        """
        self.job_context = job_context
        super().__init__(
            instructions=(
                "You are a helpful assistant demonstrating SIP call lifecycle management. "
                "You can add SIP participants and end the call when requested."
            ),
        )

    @function_tool
    async def add_sip_participant(self, context: RunContext, phone_number: str):
        """Add a SIP participant to the current call.

        Args:
            context: The call context
            phone_number: The phone number to call
        """
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't add participants at this time.")
            return None, "Failed to add SIP participant: No job context available"

        # Without a trunk ID the dial-out request cannot be built correctly, so
        # fail early with a clear message instead of sending None to the API.
        sip_trunk_id = os.environ.get("SIP_TRUNK_ID")
        if not sip_trunk_id:
            logger.error("SIP_TRUNK_ID environment variable is not set")
            await self.session.say("I'm sorry, I can't add participants at this time.")
            return None, "Failed to add SIP participant: SIP_TRUNK_ID is not configured"

        room_name = self.job_context.room.name
        # Short random suffix keeps identities distinct across dial-outs.
        identity = f"sip_{uuid.uuid4().hex[:8]}"
        logger.info(f"Adding SIP participant with phone number {phone_number} to room {room_name}")

        try:
            response = await self.job_context.api.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    sip_trunk_id=sip_trunk_id,
                    sip_call_to=phone_number,
                    room_name=room_name,
                    participant_identity=identity,
                    participant_name=f"SIP Participant {phone_number}",
                    krisp_enabled=True,
                )
            )
        except Exception as e:
            # Tool boundary: report the failure to the caller and the LLM
            # rather than letting the exception escape the tool.
            logger.exception(f"Error adding SIP participant: {e}")
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, f"Failed to add SIP participant: {e}"

        logger.info(f"Successfully added SIP participant: {response}")
        return None, f"Added SIP participant {phone_number} to the call."

    @function_tool
    async def end_call(self, context: RunContext):
        """End the current call by deleting the room."""
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't end the call at this time.")
            return None, "Failed to end call: No job context available"

        room_name = self.job_context.room.name
        logger.info(f"Ending call by deleting room {room_name}")

        try:
            # Say goodbye first; the room is deleted only after this await returns.
            await context.session.generate_reply(
                instructions="Thank you for your time. I'll be ending this call now. Goodbye!"
            )
            await self.job_context.api.room.delete_room(
                api.DeleteRoomRequest(room=room_name)
            )
        except Exception as e:
            logger.exception(f"Error ending call: {e}")
            return None, f"Failed to end call: {e}"

        logger.info(f"Successfully deleted room {room_name}")
        return None, "Call ended successfully."

    @function_tool
    async def log_participants(self, context: RunContext):
        """Log all participants in the current room."""
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't list participants at this time.")
            return None, "Failed to list participants: No job context available"

        room_name = self.job_context.room.name
        logger.info(f"Logging participants in room {room_name}")

        try:
            response = await self.job_context.api.room.list_participants(
                api.ListParticipantsRequest(room=room_name)
            )
            participants = response.participants
            participant_info = [
                {
                    "identity": p.identity,
                    "name": p.name,
                    "state": p.state,
                    "is_publisher": p.is_publisher,
                }
                for p in participants
            ]
            logger.info(f"Participants in room {room_name}: {participant_info}")
            await self.session.say(f"There are {len(participants)} participants in this call.")
        except Exception as e:
            logger.exception(f"Error listing participants: {e}")
            return None, f"Failed to list participants: {e}"

        return None, f"Listed {len(participants)} participants in the room."

    async def on_enter(self):
        """Ask the session to generate a reply when this agent becomes active."""
        self.session.generate_reply()


server = AgentServer()


def prewarm(proc: JobProcess):
    """Load the Silero VAD model and cache it in ``proc.userdata["vad"]``."""
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def entrypoint(ctx: JobContext):
    """Start the agent session for a room and monitor SIP participants.

    Builds the ``AgentSession``, starts it with a ``SIPLifecycleAgent``,
    connects to the room, and then registers room event handlers that log
    SIP attributes when participants join or when their attributes change.

    Args:
        ctx: Job context providing the room, the process userdata (with the
            prewarmed VAD) and the connection to LiveKit.
    """
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3", language="en"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(model="elevenlabs/eleven_multilingual_v2"),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )
    agent = SIPLifecycleAgent(job_context=ctx)

    await session.start(agent=agent, room=ctx.room)
    await ctx.connect()

    # The event loop keeps only weak references to tasks, so hold strong
    # references here until each task finishes; otherwise a handler task could
    # be garbage-collected before it completes.
    background_tasks = set()

    def spawn(coro) -> None:
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(background_tasks.discard)

    async def async_on_participant_connected(participant: rtc.RemoteParticipant):
        """Log the new participant (with SIP details if any) and greet them."""
        logger.info(f"New participant connected: {participant.identity}")

        # Check if this is a SIP participant and log call status
        if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            logger.info(f"SIP participant connected: {participant.identity}")

            # Log SIP attributes
            if participant.attributes:
                attrs = participant.attributes
                call_id = attrs.get('sip.callID', 'Unknown')
                call_status = attrs.get('sip.callStatus', 'Unknown')
                phone_number = attrs.get('sip.phoneNumber', 'Unknown')
                trunk_id = attrs.get('sip.trunkID', 'Unknown')
                trunk_phone = attrs.get('sip.trunkPhoneNumber', 'Unknown')

                logger.info(f"SIP Call ID: {call_id}")
                logger.info(f"SIP Call Status: {call_status}")
                logger.info(f"SIP Phone Number: {phone_number}")
                logger.info(f"SIP Trunk ID: {trunk_id}")
                logger.info(f"SIP Trunk Phone Number: {trunk_phone}")

                # Log specific call status information; unknown statuses
                # produce no extra line.
                status_message = _STATUS_ON_CONNECT.get(call_status)
                if status_message is not None:
                    logger.info(status_message)

        await agent.session.say(
            f"Welcome, {participant.name or participant.identity}! "
            "I can help you add a participant to this call or end the call."
        )

    async def async_on_participant_attributes_changed(
        changed_attributes: dict, participant: rtc.Participant
    ):
        """Log attribute changes, highlighting SIP call status updates."""
        logger.info(f"Participant {participant.identity} attributes changed: {changed_attributes}")

        # Only SIP participants carry a call status worth reporting.
        if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            if 'sip.callStatus' in changed_attributes:
                call_status = changed_attributes['sip.callStatus']
                logger.info(f"SIP Call Status updated: {call_status}")

                status_message = _STATUS_ON_CHANGE.get(call_status)
                if status_message is not None:
                    logger.info(status_message)

    # Room event callbacks are synchronous, so each one hands the real work
    # to an async task.
    def on_participant_connected_handler(participant: rtc.RemoteParticipant):
        spawn(async_on_participant_connected(participant))

    def on_participant_attributes_changed_handler(
        changed_attributes: dict, participant: rtc.Participant
    ):
        spawn(async_on_participant_attributes_changed(changed_attributes, participant))

    ctx.room.on("participant_connected", on_participant_connected_handler)
    ctx.room.on("participant_attributes_changed", on_participant_attributes_changed_handler)


if __name__ == "__main__":
    cli.run_app(server)