Job lifecycle

Learn more about the entrypoint function and how to end and clean up LiveKit sessions.

Lifecycle

After a worker accepts the job request from LiveKit server, it starts a new process to run your entrypoint function with the context of that specific job. Each job runs in a separate process to isolate agents from each other. If a session instance crashes, it doesn't affect other agents running on the same worker.
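
The isolation described above can be illustrated with the standard library alone. The sketch below is not the worker's actual implementation. It assumes hypothetical `run_job` and `run_all` helpers that launch each job as a separate Python process, and shows that one crashing job leaves the others unaffected.

```python
import subprocess
import sys

# Hypothetical job bodies: one raises, the others finish normally.
JOBS = {
    "job-a": "print('job-a done')",
    "job-b": "raise RuntimeError('job-b crashed')",
    "job-c": "print('job-c done')",
}


def run_job(source: str) -> int:
    """Run one job in its own interpreter process and return its exit code."""
    completed = subprocess.run(
        [sys.executable, "-c", source],
        capture_output=True,
        text=True,
    )
    return completed.returncode


def run_all(jobs: dict[str, str]) -> dict[str, int]:
    """Run every job in a separate process; a crash only affects that job."""
    return {name: run_job(source) for name, source in jobs.items()}


if __name__ == "__main__":
    for name, code in run_all(JOBS).items():
        print(name, "ok" if code == 0 else f"failed with exit code {code}")
```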

Entrypoint

The entrypoint function is executed as the main function of the process for each new job run by the worker. You have full control over the job in the entrypoint function. The job runs until all participants leave the room, or the job is explicitly shut down.

import asyncio

from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext


async def do_something(track: rtc.RemoteAudioTrack):
    audio_stream = rtc.AudioStream(track)
    async for event in audio_stream:
        # Do something here to process event.frame
        pass
    await audio_stream.aclose()


async def entrypoint(ctx: JobContext):
    # an rtc.Room instance from the LiveKit Python SDK
    room = ctx.room

    # set up listeners on the room before connecting
    @room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, *_):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            asyncio.create_task(do_something(track))

    # connect to room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # when connected, room.local_participant represents the agent
    await room.local_participant.publish_data("hello world")

    # iterate through currently connected remote participants
    for rp in room.remote_participants.values():
        print(rp.identity)
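
The `track_subscribed` listener above is a synchronous callback that starts a coroutine with `asyncio.create_task`. The asyncio documentation recommends keeping a reference to such tasks so they are not garbage-collected before they finish. Here is a minimal sketch of that pattern using only the standard library. `process_track`, `on_track_subscribed`, and the string track names are hypothetical stand-ins for real track handling.

```python
import asyncio

# Strong references to in-flight tasks so they are not garbage-collected.
background_tasks: set[asyncio.Task] = set()


async def process_track(name: str) -> str:
    """Hypothetical stand-in for per-track processing."""
    await asyncio.sleep(0)  # yield to the event loop once
    return f"processed {name}"


def on_track_subscribed(name: str) -> None:
    """Synchronous callback that schedules async work and tracks the task."""
    task = asyncio.create_task(process_track(name))
    background_tasks.add(task)
    # Drop the reference once the task finishes.
    task.add_done_callback(background_tasks.discard)


async def main() -> list[str]:
    for name in ("audio-1", "audio-2"):
        on_track_subscribed(name)
    # Wait for everything that the callbacks scheduled.
    results = await asyncio.gather(*list(background_tasks))
    return sorted(results)


if __name__ == "__main__":
    print(asyncio.run(main()))
```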

For more LiveKit Agents examples, see the GitHub repository.

Passing data to a job

You can customize a job with user- or job-specific data using job metadata, room metadata, or participant attributes.

Job metadata

Job metadata is a freeform string field defined in the dispatch request and consumed in the entrypoint. Use JSON or similar structured data to pass complex information.

For instance, you can pass the user's ID, name, and phone number:

import json

from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    metadata = json.loads(ctx.job.metadata)
    user_id = metadata["user_id"]
    user_name = metadata["user_name"]
    user_phone = metadata["user_phone"]
    # ...
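
The example above assumes the dispatch request always carries valid JSON with every key present. Below is a slightly more defensive sketch, under the assumption that the metadata string may be empty or malformed. `parse_job_metadata` is a hypothetical helper, not part of the LiveKit SDK.

```python
import json
from typing import Any


def parse_job_metadata(raw: str) -> dict[str, Any]:
    """Parse a job metadata string, returning {} when it is empty or invalid."""
    if not raw:
        return {}
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return {}
    # Only JSON objects are useful as key/value metadata.
    return parsed if isinstance(parsed, dict) else {}


if __name__ == "__main__":
    metadata = parse_job_metadata('{"user_id": "u-123", "user_name": "Ada"}')
    print(metadata.get("user_id"))
    # Missing keys yield None instead of raising KeyError.
    print(metadata.get("user_phone"))
```

In an entrypoint, this helper would be called as `parse_job_metadata(ctx.job.metadata)`.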

For more information on dispatch, see the following article:

Agent dispatch

Learn how to dispatch an agent with custom metadata.

Room metadata and participant attributes

You can also use properties such as the room's name, metadata, and participant attributes to customize agent behavior.

Here's an example showing how to access various properties:

from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext


async def entrypoint(ctx: JobContext):
    # connect to the room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # wait for the first participant to arrive
    participant = await ctx.wait_for_participant()

    # customize behavior based on the participant
    print(f"connected to room {ctx.room.name} with participant {participant.identity}")

    # inspect the current value of the attribute
    language = participant.attributes.get("user.language")

    # listen to when the attribute is changed
    @ctx.room.on("participant_attributes_changed")
    def on_participant_attributes_changed(changed_attrs: dict[str, str], p: rtc.Participant):
        if p == participant:
            language = p.attributes.get("user.language")
            print(f"participant {p.identity} changed language to {language}")
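
The handler above receives a dictionary containing only the attributes that changed. Below is a minimal sketch of reacting to a single key, written against plain dictionaries so it runs without a room. `LanguageTracker` and the `"en"` default are hypothetical, and `user.language` is the attribute name used in the example above.

```python
from dataclasses import dataclass, field


@dataclass
class LanguageTracker:
    """Tracks one participant's language from attribute updates."""

    language: str = "en"  # hypothetical default
    history: list[str] = field(default_factory=list)

    def on_attributes_changed(self, changed_attrs: dict[str, str]) -> bool:
        """Apply an update; return True only if the language actually changed."""
        new_language = changed_attrs.get("user.language")
        if not new_language or new_language == self.language:
            return False
        # Remember the previous value before switching.
        self.history.append(self.language)
        self.language = new_language
        return True


if __name__ == "__main__":
    tracker = LanguageTracker()
    tracker.on_attributes_changed({"user.theme": "dark"})   # ignored
    tracker.on_attributes_changed({"user.language": "fr"})  # applied
    print(tracker.language, tracker.history)
```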


Ending the session

Disconnecting the agent

You can disconnect an agent after it completes its task and is no longer needed in the room. The other participants in the LiveKit session can continue without it. Any shutdown hooks you registered run after the shutdown function is called.

from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    # do some work
    ...

    # disconnect from the room
    ctx.shutdown(reason="Session ended")

Disconnecting everyone

If the session should end for everyone, use the server API deleteRoom to end the session.

The Disconnected room event will be sent, and the room will be removed from the server.

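import os

from livekit import api
from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    # do some work
    ...

    api_client = api.LiveKitAPI(
        os.getenv("LIVEKIT_URL"),
        os.getenv("LIVEKIT_API_KEY"),
        os.getenv("LIVEKIT_API_SECRET"),
    )
    # delete the room, which disconnects every participant
    await api_client.room.delete_room(api.DeleteRoomRequest(
        room=ctx.job.room.name,
    ))
    # release the client's underlying HTTP session
    await api_client.aclose()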

Post-processing and cleanup

After a session ends, you can perform post-processing or cleanup tasks using shutdown hooks. For example, you might want to save user state in a database.

from livekit.agents import JobContext


async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state
        ...

    ctx.add_shutdown_callback(my_shutdown_hook)

Note

Shutdown hooks should complete within a short amount of time. By default, the framework waits 60 seconds before forcefully terminating the process. You can adjust this timeout using the shutdown_process_timeout parameter in WorkerOptions.
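
Because the process is terminated once the timeout elapses, it can help to bound the work a hook does. Below is a minimal sketch using `asyncio.wait_for` from the standard library. The `save_user_state` coroutine, the `bounded_hook` wrapper, and the 5-second budget are all hypothetical. A hook built this way could be registered with `ctx.add_shutdown_callback` in the same way as `my_shutdown_hook` above.

```python
import asyncio

SAVE_BUDGET_SECONDS = 5.0  # hypothetical budget, well under the default 60 seconds


async def save_user_state(delay: float) -> str:
    """Hypothetical stand-in for a database write that takes `delay` seconds."""
    await asyncio.sleep(delay)
    return "saved"


async def bounded_hook(delay: float, budget: float = SAVE_BUDGET_SECONDS) -> str:
    """Run the save, but give up once the budget is exhausted."""
    try:
        return await asyncio.wait_for(save_user_state(delay), timeout=budget)
    except asyncio.TimeoutError:
        # The save was cancelled; report it instead of blocking shutdown.
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(bounded_hook(0.01)))
    print(asyncio.run(bounded_hook(0.5, budget=0.01)))
```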