Job lifecycle

Learn more about the entrypoint function and how to end and clean up LiveKit sessions.

Lifecycle

When an agent server accepts a job request from LiveKit Cloud, it starts a new process and runs your agent code inside. Each job runs in a separate process to isolate agents from each other. If a session instance crashes, it doesn't affect other agents running on the same agent server. The job runs until all standard and SIP participants leave the room, or you explicitly shut it down.

Entrypoint

The entrypoint is executed as the main function of the process for each new job run by the agent server, effectively handing control over to your code. You should load any necessary app-specific data and then execute your agent's logic.

Defining the entrypoint function

In Python, the entrypoint function is decorated with @server.rtc_session(). In Node.js, the entrypoint function is defined as a property of the default export of the agent file.

Default entrypoint file path

The default Dockerfile template generated by the LiveKit CLI assumes your agent entrypoint file is at src/agent.py (Python) or references the start script in package.json (Node.js). If you restructure your project, update your Dockerfile and startup command to match. See Builds and Dockerfiles for details.

You can use the entrypoint function and Agents Framework without creating an AgentSession. This lets you take advantage of the framework's job context and lifecycle to build a programmatic participant that's automatically dispatched to rooms. To learn more, see Server lifecycle.

Controlling connection

If you use AgentSession, it connects to LiveKit automatically when started. If you're not using AgentSession, or if you need to control the precise timing or method of connection (for example, to enable end-to-end encryption), use the JobContext connect method.

Examples

This example shows a simple entrypoint function that processes incoming audio tracks and publishes a text message to the room.

Python:

async def do_something(track: rtc.RemoteAudioTrack):
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        # Do something here to process event.frame
        pass
    await audio_stream.aclose()


@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: JobContext):
    # an rtc.Room instance from the LiveKit Python SDK
    room = ctx.room

    # set up listeners on the room before connecting
    @room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, *_):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            asyncio.create_task(do_something(track))

    # connect to room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # when connected, room.local_participant represents the agent
    await room.local_participant.send_text('hello world', topic='hello-world')

    # iterate through currently connected remote participants
    for rp in room.remote_participants.values():
        print(rp.identity)

Node.js:

async function doSomething(track: RemoteTrack) {
  for await (const frame of new AudioStream(track)) {
    // do something with the frame
  }
}

export default defineAgent({
  entry: async (ctx: JobContext) => {
    // an rtc.Room instance from the LiveKit Node.js SDK
    const room = ctx.room;

    // set up listeners on the room before connecting
    room.on(RoomEvent.TrackSubscribed, async (track: RemoteTrack) => {
      if (track.kind === TrackKind.KIND_AUDIO) {
        doSomething(track);
      }
    });

    await ctx.connect(undefined, AutoSubscribe.AUDIO_ONLY);

    // when connected, room.localParticipant represents the agent
    await room.localParticipant?.sendText('hello world', {
      topic: 'hello-world',
    });

    // iterate through currently connected remote participants
    for (const rp of ctx.room.remoteParticipants.values()) {
      console.log(rp.identity);
    }
  },
});

Working examples of LiveKit Agents for Node.js are available in the repository.

Echo Agent

This programmatic participant example demonstrates how to subscribe to audio tracks and play them back to the room.

For more LiveKit Agents examples, see the GitHub repository.

Publishing and receiving tracks

To learn more about publishing and receiving tracks, see the following topics.

Participant entrypoint function

A participant entrypoint is a callback that runs once for every participant in the room. Register it on JobContext to execute per-participant logic, such as looking up user data, subscribing to specific tracks, or running a long-lived task scoped to that participant, without creating an AgentSession.

The callback runs for participants already in the room when you call ctx.connect(), and for every participant who joins afterward. You can register multiple entrypoints; they run concurrently for each participant.

Register before connecting

Call add_participant_entrypoint (Python) or addParticipantEntrypoint (Node.js) before ctx.connect(). If you register an entrypoint after the connection is established, it fires only for participants who join afterward, not for participants already in the room.

Python:

@server.rtc_session()
async def entrypoint(ctx: JobContext):
    async def greet_participant(ctx: JobContext, p: rtc.RemoteParticipant):
        # Access participant identity, attributes, and metadata
        logger.info(f"participant joined: {p.identity}")

        # Filter out participants you don't need to handle
        if p.identity == "some-service-bot":
            return

        # Run participant-scoped work
        await ctx.room.local_participant.send_text(
            f"Hello, {p.identity}!", topic="greeting"
        )

    async def track_participant_audio(ctx: JobContext, p: rtc.RemoteParticipant):
        # Multiple entrypoints run concurrently for each participant
        logger.info(f"tracking audio for {p.identity}")
        await asyncio.sleep(60)

    # Register entrypoints before connecting
    ctx.add_participant_entrypoint(entrypoint_fnc=greet_participant)
    ctx.add_participant_entrypoint(entrypoint_fnc=track_participant_audio)

    await ctx.connect(auto_subscribe=AutoSubscribe.SUBSCRIBE_ALL)

Node.js:

export default defineAgent({
  entry: async (ctx: JobContext) => {
    const greetParticipant = async (ctx: JobContext, p: RemoteParticipant) => {
      // Access participant identity, attributes, and metadata
      console.log(`participant joined: ${p.identity}`);

      // Filter out participants you don't need to handle
      if (p.identity === 'some-service-bot') {
        return;
      }

      // Run participant-scoped work
      await ctx.room.localParticipant?.sendText(
        `Hello, ${p.identity}!`, { topic: 'greeting' }
      );
    };

    const trackParticipantAudio = async (ctx: JobContext, p: RemoteParticipant) => {
      // Multiple entrypoints run concurrently for each participant
      console.log(`tracking audio for ${p.identity}`);
      await new Promise((resolve) => setTimeout(resolve, 60_000));
    };

    // Register entrypoints before connecting
    ctx.addParticipantEntrypoint(greetParticipant);
    ctx.addParticipantEntrypoint(trackParticipantAudio);

    await ctx.connect(undefined, AutoSubscribe.SUBSCRIBE_ALL);
  },
});

In Python, add_participant_entrypoint accepts a kind parameter to restrict which participant types trigger the callback. By default, entrypoints run for standard, SIP, and connector participants. To run only for SIP participants, for example, pass kind=rtc.ParticipantKind.PARTICIPANT_KIND_SIP.
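
The registration-order rule described above can be illustrated with a toy model that needs only the standard library. This is a sketch of the documented behavior, not the framework's implementation: ToyJobContext, participant_joined, and wait_idle are hypothetical names invented for the illustration, and participants are represented by plain identity strings.

```python
import asyncio
from typing import Awaitable, Callable, List

ParticipantFn = Callable[[str], Awaitable[None]]


class ToyJobContext:
    """Toy model of the documented behavior; NOT the framework's JobContext."""

    def __init__(self, already_in_room: List[str]) -> None:
        self._present = list(already_in_room)
        self._entrypoints: List[ParticipantFn] = []
        self._tasks: List[asyncio.Task] = []
        self._connected = False

    def add_participant_entrypoint(self, fn: ParticipantFn) -> None:
        self._entrypoints.append(fn)

    def _start(self, identity: str) -> None:
        # One task per (entrypoint, participant) pair, so entrypoints
        # run concurrently for each participant.
        for fn in self._entrypoints:
            self._tasks.append(asyncio.create_task(fn(identity)))

    async def connect(self) -> None:
        # Entrypoints registered so far fire for participants already present.
        self._connected = True
        for identity in self._present:
            self._start(identity)

    def participant_joined(self, identity: str) -> None:
        # Hypothetical hook standing in for a real "participant joined" event.
        self._present.append(identity)
        if self._connected:
            self._start(identity)

    async def wait_idle(self) -> None:
        await asyncio.gather(*self._tasks)


async def demo() -> List[str]:
    calls: List[str] = []

    async def early(identity: str) -> None:
        calls.append(f"early:{identity}")

    async def late(identity: str) -> None:
        calls.append(f"late:{identity}")

    ctx = ToyJobContext(already_in_room=["alice"])
    ctx.add_participant_entrypoint(early)  # registered before connect
    await ctx.connect()
    ctx.add_participant_entrypoint(late)   # registered after connect
    ctx.participant_joined("bob")
    await ctx.wait_idle()
    return sorted(calls)
```

Running asyncio.run(demo()) shows that the entrypoint registered before connecting sees both "alice" and "bob", while the one registered afterward sees only "bob".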

Adding custom fields to agent logs

Only available in Python

Each job outputs JSON-formatted logs that include the user transcript, turn detection data, job ID, process ID, and more. You can include custom fields in the logs using ctx.log_context_fields for additional diagnostic context.

The following example adds worker ID and room name to the logs:

@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: JobContext):
    ctx.log_context_fields = {
        "worker_id": ctx.worker_id,
        "room_name": ctx.room.name,
    }

To learn more, see the reference documentation for JobContext.log_context_fields.
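
To make the idea of context fields concrete, the following sketch shows one way fixed key-value pairs can be merged into every JSON log line using only Python's standard logging module. It is an illustration of the general technique, not how the framework implements log_context_fields; ContextFieldsFilter, JsonFormatter, and build_logger are hypothetical helpers, and the field values are placeholders.

```python
import json
import logging
import sys


class ContextFieldsFilter(logging.Filter):
    """Attach a fixed set of fields to every record (hypothetical helper)."""

    def __init__(self, fields: dict) -> None:
        super().__init__()
        self.fields = dict(fields)

    def filter(self, record: logging.LogRecord) -> bool:
        record.context_fields = self.fields
        return True  # never drop the record, only annotate it


class JsonFormatter(logging.Formatter):
    """Render each record as a single JSON object."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"level": record.levelname, "message": record.getMessage()}
        payload.update(getattr(record, "context_fields", {}))
        return json.dumps(payload)


def build_logger(name: str, fields: dict, stream=sys.stderr) -> logging.Logger:
    """Create a logger whose output lines carry the given context fields."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    handler.addFilter(ContextFieldsFilter(fields))
    logger.handlers = [handler]
    return logger
```

For example, build_logger("demo-job", {"worker_id": "w-1", "room_name": "room-a"}).info("session started") writes one JSON line that contains the message alongside both custom fields.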

Passing data to a job

You can customize a job with user-specific or job-specific data using job metadata, room metadata, or participant attributes.

Job metadata

Job metadata is a freeform string field defined in the dispatch request and consumed in the entrypoint function. Use JSON or similar structured data to pass complex information.

The following example assumes your agent dispatch request includes the user_id, user_name, and user_phone fields in the metadata. You can access this data in the entrypoint function:

Python:

import json


@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: JobContext):
    metadata = json.loads(ctx.job.metadata)
    user_id = metadata["user_id"]
    user_name = metadata["user_name"]
    user_phone = metadata["user_phone"]
    # ...

Node.js:

export default defineAgent({
  entry: async (ctx: JobContext) => {
    const metadata = JSON.parse(ctx.job.metadata);
    const userId = metadata.user_id;
    const userName = metadata.user_name;
    const userPhone = metadata.user_phone;
    // ...
  },
});
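
Because job metadata is a freeform string, it can be empty or malformed if a dispatch request omits it or sends something other than the expected JSON. The following is a minimal sketch of a defensive parser using only the standard library. The names parse_job_metadata and UserInfo are hypothetical and not part of the framework; the field names follow the example above, and treating user_phone as optional is an assumption made for the illustration.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass
class UserInfo:
    """Hypothetical container for the fields used in the example above."""
    user_id: str
    user_name: str
    user_phone: Optional[str] = None  # assumed optional for this sketch


def parse_job_metadata(raw: Optional[str]) -> Optional[UserInfo]:
    """Parse a job metadata string, returning None when it is unusable."""
    if not raw:
        return None  # empty or missing metadata
    try:
        data = json.loads(raw)
    except json.JSONDecodeError:
        return None  # not valid JSON
    if not isinstance(data, dict):
        return None  # valid JSON, but not an object
    if "user_id" not in data or "user_name" not in data:
        return None  # required fields are missing
    return UserInfo(
        user_id=str(data["user_id"]),
        user_name=str(data["user_name"]),
        user_phone=data.get("user_phone"),
    )
```

In an entrypoint, you could pass ctx.job.metadata to such a helper and fall back to default behavior when it returns None, rather than letting the job fail on a parse error.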

For more information on dispatch, see the following article:

Agent dispatch

Learn how to dispatch an agent with custom metadata.

Room metadata and participant attributes

You can also use properties such as the room's name, metadata, and participant attributes to customize agent behavior.

Telephony use case

For outbound calling agents, use wait_for_participant with the SIP participant's identity to confirm they've joined the room before starting the agent session. For a complete example including call failure handling and voicemail detection, see Handling call outcomes.

Here's an example showing how to access various properties:

Python:

@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: JobContext):
    # connect to the room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # wait for the first participant to arrive
    participant = await ctx.wait_for_participant()

    # customize behavior based on the participant
    print(f"connected to room {ctx.room.name} with participant {participant.identity}")

    # inspect the current value of the attribute
    language = participant.attributes.get("user.language")

    # listen to when the attribute is changed
    @ctx.room.on("participant_attributes_changed")
    def on_participant_attributes_changed(changed_attrs: dict[str, str], p: rtc.Participant):
        if p == participant:
            language = p.attributes.get("user.language")
            print(f"participant {p.identity} changed language to {language}")

Node.js:

export default defineAgent({
  entry: async (ctx: JobContext) => {
    // connect to the room
    await ctx.connect(undefined, AutoSubscribe.AUDIO_ONLY);

    // wait for the first participant to arrive
    const participant = await ctx.waitForParticipant();

    // customize behavior based on the participant
    console.log(`connected to room ${ctx.room.name} with participant ${participant.identity}`);

    // inspect the current value of the attribute
    let language = participant.attributes['user.language'];

    // listen to when the attribute is changed
    ctx.room.on(
      'participantAttributesChanged',
      (changedAttrs: Record<string, string>, p: Participant) => {
        if (p === participant) {
          language = p.attributes['user.language'];
          console.log(`participant ${p.identity} changed language to ${language}`);
        }
      },
    );
  },
});

For more information, see the following topics:

Ending the session

Close the session and disconnect the agent from the room using the shutdown() method. This method waits for queued operations to complete, commits any remaining user transcripts, and closes all I/O connections. If the drain parameter is True, the session gracefully drains pending speech before closing.

Other participants in the LiveKit room can continue. Your shutdown hooks run after the shutdown function.

In Python, use the session.shutdown() method to gracefully close the session and disconnect the agent from the room.

# Graceful shutdown with draining
session.shutdown(drain=True)
# Or immediate close
await session.aclose()

In Node.js, use the ctx.shutdown() method to close the session and disconnect the agent from the room.

export default defineAgent({
  entry: async (ctx: JobContext) => {
    // do some work...

    // Graceful shutdown with draining
    ctx.shutdown(drain=true);

    // Or immediate close
    await ctx.aclose();
  },
});

The difference between shutdown() and aclose() is as follows:

  • agent_session.shutdown(): Takes an optional drain parameter that lets you shut down gracefully and drain pending speech before closing. It's a non-blocking call: the shutdown operations run asynchronously in the background while your code continues executing.
  • agent_session.aclose(): Executes the shutdown operation immediately. It's an awaitable (async) method that pauses the current coroutine until the close operation is finished. Your code doesn't proceed until aclose() completes.
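
The difference between a non-blocking call and an awaitable one can be seen in a small asyncio sketch. ToySession below is a hypothetical stand-in written for this illustration, not the framework's AgentSession, and the 0.05 second delay is an arbitrary placeholder for draining pending speech.

```python
import asyncio
from typing import Optional, Tuple


class ToySession:
    """Toy stand-in for a session; NOT the framework's AgentSession."""

    def __init__(self) -> None:
        self.closed = False
        self._close_task: Optional[asyncio.Task] = None

    async def _close(self, drain: bool) -> None:
        if drain:
            await asyncio.sleep(0.05)  # pretend to finish pending speech
        self.closed = True

    def shutdown(self, drain: bool = True) -> None:
        """Non-blocking: schedule the close in the background and return."""
        if self._close_task is None:
            self._close_task = asyncio.create_task(self._close(drain))

    async def aclose(self) -> None:
        """Awaitable: return only after the close has finished."""
        if self._close_task is None:
            self._close_task = asyncio.create_task(self._close(drain=False))
        await self._close_task


async def demo() -> Tuple[bool, bool]:
    first = ToySession()
    first.shutdown(drain=True)
    # The call returned immediately, so the close has not run yet.
    closed_right_after_shutdown = first.closed
    await first.aclose()  # wait for the background close to finish

    second = ToySession()
    await second.aclose()  # execution resumes only once closing is done
    return closed_right_after_shutdown, second.closed
```

In this toy model, the session is still open on the line after shutdown() returns, whereas the line after await aclose() always sees a closed session.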

After you shut down the session, you can delete the room if it's no longer needed.

Delete the room

You can configure the agent session to automatically delete the room on session end by setting the delete_room_on_close parameter to True. To learn more, see Delete room when session ends.

Alternatively, you can delete the room manually. If the session should end for everyone, use the server API deleteRoom to end the session. This disconnects all participants from the room.

When the room is removed from the server, a disconnected room event is emitted.

Python:

import os

from livekit import api


async def entrypoint(ctx: JobContext):
    # do some work
    ...

    api_client = api.LiveKitAPI(
        os.environ["LIVEKIT_URL"],
        os.environ["LIVEKIT_API_KEY"],
        os.environ["LIVEKIT_API_SECRET"],
    )
    await api_client.room.delete_room(api.DeleteRoomRequest(
        room=ctx.job.room.name,
    ))
    # release the API client's underlying HTTP session
    await api_client.aclose()

Node.js:

export default defineAgent({
  entry: async (ctx: JobContext) => {
    // do some work...

    const roomServiceClient = new RoomServiceClient(
      process.env.LIVEKIT_URL,
      process.env.LIVEKIT_API_KEY,
      process.env.LIVEKIT_API_SECRET,
    );
    await roomServiceClient.deleteRoom(ctx.job.room.name);
  },
});

Post-processing and cleanup

After a session ends, you can perform post-processing or cleanup tasks using shutdown hooks. For example, you might want to save user state in a database.

Python:

async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state
        ...

    ctx.add_shutdown_callback(my_shutdown_hook)

Node.js:

export default defineAgent({
  entry: async (ctx: JobContext) => {
    ctx.addShutdownCallback(() => {
      // save user state...
    });
  },
});

Note

Shutdown hooks should complete within a short amount of time. By default, the framework waits 10 seconds before forcefully terminating the process. You can adjust this timeout using the shutdown_process_timeout parameter in agent server options.
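
One way to keep a hook inside that window is to put an explicit time limit on its slow work, so the hook gives up cleanly instead of being cut off mid-write. The sketch below uses asyncio.wait_for from the standard library. The names save_user_state, bounded_shutdown_hook, and SAVE_BUDGET_SECONDS are hypothetical, the sleep stands in for a database write, and the 0.1 second budget is chosen only to keep the example fast. A real hook takes no arguments, so you would bind the state through a closure.

```python
import asyncio

# Hypothetical budget; in practice pick a value well under the process timeout.
SAVE_BUDGET_SECONDS = 0.1

saved = []  # stands in for a database table


async def save_user_state(state: dict, delay: float) -> None:
    """Hypothetical stand-in for a database write that takes `delay` seconds."""
    await asyncio.sleep(delay)
    saved.append(state)


async def bounded_shutdown_hook(state: dict, delay: float) -> str:
    """Try to save state, but stop waiting once the budget is used up."""
    try:
        await asyncio.wait_for(
            save_user_state(state, delay),
            timeout=SAVE_BUDGET_SECONDS,
        )
        return "saved"
    except asyncio.TimeoutError:
        # The write was cancelled; report it instead of hanging.
        return "timed out"
```

With this pattern, a write that finishes quickly is stored as usual, while one that exceeds the budget is cancelled and reported, leaving time for any remaining cleanup.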