Once the worker accepts the job, the framework starts a new process to run your entrypoint function with the context of that specific job.
Each session runs in a separate process to ensure that agents are isolated from each other. This isolation means that if an agent instance crashes, it will not affect other agents running on the same worker.
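The effect of this isolation can be illustrated with the standard library alone. The following is a minimal sketch, not the framework's own process management: the two session snippets and the helper names are hypothetical, and one session is made to crash on purpose.

```python
import subprocess
import sys

# Hypothetical session bodies: the first crashes, the second finishes normally.
SESSIONS = {
    "crashing": "raise RuntimeError('agent crashed')",
    "healthy": "print('agent finished')",
}


def run_sessions() -> dict[str, int]:
    # Start every session in its own interpreter process.
    procs = {
        name: subprocess.Popen(
            [sys.executable, "-c", source],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        for name, source in SESSIONS.items()
    }
    # Wait for each process and collect its exit code.
    return {name: proc.wait() for name, proc in procs.items()}


if __name__ == "__main__":
    print(run_sessions())
```

The crashing session exits with a non-zero code while the healthy one still exits with 0, because an unhandled exception only takes down the process it occurred in.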
Entrypoint
The entrypoint function is called as soon as the job is assigned to the worker. From there, you have full control over the session. The session will continue to run until all human participants have left the room, or when the job is explicitly shut down.
    import asyncio

    from livekit import rtc
    from livekit.agents import AutoSubscribe, JobContext


    async def entrypoint(ctx: JobContext):
        # an rtc.Room instance, with full access to LiveKit's realtime SDK features
        room = ctx.room

        async def handle_audio(track: rtc.Track):
            async for audio_frame in rtc.AudioStream(track):
                do_something(audio_frame)  # placeholder for your own processing

        # set up listeners on the room before connecting
        @room.on("track_subscribed")
        def on_track_subscribed(track: rtc.Track, *_):
            # "async for" is not allowed in a plain def, so read the stream in a task
            if track.kind == rtc.TrackKind.KIND_AUDIO:
                asyncio.create_task(handle_audio(track))

        # connect to room
        await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

        # when connected, room.local_participant represents the agent
        await room.local_participant.publish_data("hello world")

        # iterate through currently connected remote participants
        for rp in room.remote_participants.values():
            print(rp.identity)
Working examples of LiveKit Agents for Python are available in the repository. Publishing and receiving tracks is covered in more detail in a later section.
Customizing for participant
The agent can be customized to behave differently based on the connected participant, enabling a personalized experience.
LiveKit provides several ways to identify participants:
ctx.room.name: the name of the room that the participant is connected to
participant.identity: the identity of the participant
participant.attributes: custom attributes set on the participant
Here's an example:
    from livekit import rtc
    from livekit.agents import AutoSubscribe, JobContext


    async def entrypoint(ctx: JobContext):
        # connect to the room
        await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)

        # wait for the first participant to arrive
        participant = await ctx.wait_for_participant()

        # customize behavior based on the participant
        print(f"connected to room {ctx.room.name} with participant {participant.identity}")

        # inspect the current value of the attribute
        language = participant.attributes.get("user.language")

        # listen to when the attribute is changed
        @ctx.room.on("participant_attributes_changed")
        def on_participant_attributes_changed(changed_attrs: dict[str, str], p: rtc.Participant):
            if p == participant:
                language = p.attributes.get("user.language")
                print(f"participant {p.identity} changed language to {language}")
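One way to act on such an attribute is to map its value to per-session settings. The sketch below is illustrative only: the "user.language" key comes from the example above, while the GREETINGS table, DEFAULT_LANGUAGE, and pick_greeting are hypothetical names, not part of the framework.

```python
# Hypothetical greetings keyed by the value of the "user.language" attribute.
GREETINGS = {
    "en": "Hello!",
    "es": "Hola!",
    "fr": "Bonjour!",
}
DEFAULT_LANGUAGE = "en"


def pick_greeting(attributes: dict[str, str]) -> str:
    # Fall back to the default when the attribute is missing or unsupported.
    language = attributes.get("user.language", DEFAULT_LANGUAGE)
    return GREETINGS.get(language, GREETINGS[DEFAULT_LANGUAGE])
```

Inside the entrypoint, a helper like this could be called as pick_greeting(participant.attributes) once the participant has arrived, and again from the attribute-change handler.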
Ending the session
Disconnecting the agent
When the agent should no longer be in the room, you can disconnect it. The other participants in the session can continue without it. After disconnection, your shutdown hooks are also called.
    from livekit.agents import JobContext


    async def entrypoint(ctx: JobContext):
        # do some work
        ...

        # disconnect from the room (shutdown is a regular method, so it is not awaited)
        ctx.shutdown(reason="Session ended")
Disconnecting everyone
If the session should end for everyone, use the server API deleteRoom to end the session.
Realtime clients will receive a Disconnected event, and the room will be removed from the server.
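    import os

    from livekit import api
    from livekit.agents import JobContext


    async def entrypoint(ctx: JobContext):
        # do some work
        ...

        client = api.LiveKitAPI(
            os.getenv("LIVEKIT_URL"),
            os.getenv("LIVEKIT_API_KEY"),
            os.getenv("LIVEKIT_API_SECRET"),
        )
        # request messages take keyword arguments, so the room name is passed as room=
        await client.room.delete_room(api.DeleteRoomRequest(room=ctx.job.room.name))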
Post-processing and cleanup
After the session has ended, you may want to perform post-processing or cleanup tasks, such as saving user state to a database. The Agents framework supports shutdown hooks that are called when the session ends.
    from livekit.agents import JobContext


    async def entrypoint(ctx: JobContext):
        async def my_shutdown_hook():
            # save user state
            ...

        ctx.add_shutdown_callback(my_shutdown_hook)
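As a concrete illustration of the "save user state" step, the following sketch builds a hook that writes a dictionary to a JSON file. It is one possible approach under stated assumptions: make_save_state_hook, the shape of the state dictionary, and the use of a local file instead of a database are all hypothetical.

```python
import asyncio
import json
from pathlib import Path
from typing import Awaitable, Callable


def make_save_state_hook(state: dict, path: Path) -> Callable[[], Awaitable[None]]:
    # Build an async callback that writes the session state to a JSON file.
    async def save_state() -> None:
        # Do the file write in a worker thread so the event loop is not blocked.
        await asyncio.to_thread(path.write_text, json.dumps(state))

    return save_state
```

The returned callback takes no arguments, so it could be registered in the entrypoint with ctx.add_shutdown_callback(make_save_state_hook(user_state, Path("state.json"))), where user_state is whatever your agent accumulated during the session.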
Shutdown hooks are expected to complete within a short amount of time. By default, the framework waits 60 seconds before forcefully terminating the agent process. You can adjust this timeout using the shutdown_process_timeout parameter in WorkerOptions.
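To get a feel for how a deadline on shutdown hooks behaves, the sketch below runs a list of hooks under a time limit using plain asyncio. It is not the framework's implementation: the framework terminates the agent process, whereas this example only cancels the pending hooks, and run_hooks_with_timeout and the choice to run hooks concurrently are assumptions made for illustration.

```python
import asyncio
from typing import Awaitable, Callable

Hook = Callable[[], Awaitable[None]]


async def run_hooks_with_timeout(hooks: list[Hook], timeout: float) -> bool:
    # Run all hooks concurrently; return False if they miss the deadline.
    async def run_all() -> None:
        await asyncio.gather(*(hook() for hook in hooks))

    try:
        await asyncio.wait_for(run_all(), timeout=timeout)
        return True
    except asyncio.TimeoutError:
        # Hooks still running at the deadline are cancelled by wait_for.
        return False
```

A hook that finishes quickly returns True here, while one that sleeps past the deadline returns False, which is a useful reminder to keep cleanup work short or to hand long-running jobs to an external system.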