Inside a session

Once the worker accepts the job, the framework starts a new process to run your entrypoint function with the context of that specific job.

Each session runs in a separate process to ensure that agents are isolated from each other. This isolation means that if an agent instance crashes, it will not affect other agents running on the same worker.
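For reference, the entrypoint is registered when the worker starts. The following is a minimal worker sketch, assuming the standard cli and WorkerOptions helpers from livekit.agents; production workers typically set additional options:

from livekit.agents import JobContext, WorkerOptions, cli

async def entrypoint(ctx: JobContext):
    # each accepted job runs this function in its own process
    ...

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))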

Entrypoint

The entrypoint function is called as soon as the job is assigned to the worker. From there, you have full control over the session. The session continues to run until all human participants have left the room, or until the job is explicitly shut down.

import asyncio

from livekit import rtc
from livekit.agents import AutoSubscribe, JobContext

async def entrypoint(ctx: JobContext):
    # an rtc.Room instance, with full access to LiveKit's realtime SDK features
    room = ctx.room

    async def process_audio(track: rtc.Track):
        async for audio_frame in rtc.AudioStream(track):
            do_something(audio_frame)

    # set up listeners on the room before connecting
    @room.on("track_subscribed")
    def on_track_subscribed(track: rtc.Track, *_):
        if track.kind == rtc.TrackKind.KIND_AUDIO:
            # event handlers are synchronous, so consume the stream in a task
            asyncio.create_task(process_audio(track))

    # connect to the room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # once connected, room.local_participant represents the agent
    await room.local_participant.publish_data("hello world")

    # iterate through the currently connected remote participants
    for rp in room.remote_participants.values():
        print(rp.identity)

Working examples of LiveKit Agents for Python are available in the repository. Publishing and receiving tracks is covered in more detail in a later section.

Customizing for the participant

The agent can be customized to behave differently based on the connected participant, enabling a personalized experience.

LiveKit provides several ways to identify participants:

  • ctx.room.name: the name of the room the participant is connected to
  • participant.identity: the identity of the participant
  • participant.attributes: custom attributes set on the participant

Here's an example:

async def entrypoint(ctx: JobContext):
    # connect to the room
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

    # wait for the first participant to arrive
    participant = await ctx.wait_for_participant()

    # customize behavior based on the participant
    print(f"connected to room {ctx.room.name} with participant {participant.identity}")

    # inspect the current value of the attribute
    language = participant.attributes.get("user.language")

    # listen for changes to the attribute
    @ctx.room.on("participant_attributes_changed")
    def on_participant_attributes_changed(changed_attrs: dict[str, str], p: rtc.Participant):
        if p == participant:
            language = p.attributes.get("user.language")
            print(f"participant {p.identity} changed language to {language}")

Ending the session

Disconnecting the agent

When the agent should no longer be in the room, you can disconnect it. This allows the other participants in the session to continue. After disconnection, your shutdown hooks are called.

async def entrypoint(ctx: JobContext):
    # do some work
    ...

    # disconnect from the room
    await ctx.shutdown(reason="Session ended")

Disconnecting everyone

If the session should end for everyone, use the deleteRoom server API to end it.

Realtime clients will receive a Disconnected event, and the room will be removed from the server.

import os

from livekit import api

async def entrypoint(ctx: JobContext):
    # do some work
    ...

    # delete the room, ending the session for everyone
    client = api.LiveKitAPI(
        os.getenv("LIVEKIT_URL"),
        os.getenv("LIVEKIT_API_KEY"),
        os.getenv("LIVEKIT_API_SECRET"),
    )
    await client.room.delete_room(api.DeleteRoomRequest(room=ctx.job.room.name))

Post-processing and cleanup

After the session has ended, you may want to perform post-processing or cleanup tasks, such as saving user state in a database. The Agents framework supports shutdown hooks, which are called when the session ends.

async def entrypoint(ctx: JobContext):
    async def my_shutdown_hook():
        # save user state
        ...

    ctx.add_shutdown_callback(my_shutdown_hook)
Info: Shutdown hooks are expected to complete within a short amount of time. By default, the framework waits 60 seconds before forcefully terminating the agent process. You can adjust this timeout using the shutdown_process_timeout parameter in WorkerOptions.
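For example, to give hooks more time before the process is forcefully terminated, raise the timeout when configuring the worker. A minimal sketch; the 120-second value is an arbitrary illustration:

from livekit.agents import WorkerOptions, cli

cli.run_app(
    WorkerOptions(
        entrypoint_fnc=entrypoint,
        # allow shutdown hooks up to 120 seconds before the process is killed
        shutdown_process_timeout=120.0,
    )
)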