Lifecycle
After a worker accepts the job request from LiveKit server, it starts a new process to run your entrypoint function with the context of that specific job. Each job runs in a separate process to isolate agents from each other. If a session instance crashes, it doesn't affect other agents running on the same worker.
Entrypoint
The entrypoint function is executed as the main function of the process for each new job run by the worker. You have full control over the job in the entrypoint function. The job runs until all participants leave the room, or the job is explicitly shut down.
async def do_something(track: rtc.RemoteAudioTrack):audio_stream = rtc.AudioStream(track)async for event in audio_stream:# Do something here to process event.framepassawait audio_stream.aclose()async def entrypoint(ctx: JobContext):# an rtc.Room instance from the LiveKit Python SDKroom = ctx.room# set up listeners on the room before connecting@room.on("track_subscribed")def on_track_subscribed(track: rtc.Track, *_):if track.kind == rtc.TrackKind.KIND_AUDIO:asyncio.create_task(do_something(track))# connect to roomawait ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)# when connected, room.local_participant represents the agentawait room.local_participant.publish_data("hello world")# iterate through currently connected remote participantsfor rp in room.remote_participants.values():print(rp.identity)
For more LiveKit Agents examples, see the GitHub repository. To learn more about publishing and receiving tracks, see the following topics:
Media tracks
Use the microphone, speaker, cameras, and screenshare with your agent.
Realtime text and data
Use text and data channels to communicate with your agent.
Passing data to a job
You can customize a job with user or job-specific data using either job metadata, room metadata, or participant attributes.
Job metadata
Job metadata is a freeform string field defined in the dispatch request and consumed in the entrypoint
. Use JSON or similar structured data to pass complex information.
For instance, you can pass the user's ID, name, and phone number:
import jsonasync def entrypoint(ctx: JobContext):metadata = json.loads(ctx.job.metadata)user_id = metadata["user_id"]user_name = metadata["user_name"]user_phone = metadata["user_phone"]# ...
For more information on dispatch, see the following article:
Agent dispatch
Learn how to dispatch an agent with custom metadata.
Room metadata and participant attributes
You can also use properties such as the room's name, metadata, and participant attributes to customize agent behavior.
Here's an example showing how to access various properties:
async def entrypoint(ctx: JobContext):# connect to the roomawait ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)# wait for the first participant to arriveparticipant = await ctx.wait_for_participant()# customize behavior based on the participantprint(f"connected to room {ctx.room.name} with participant {participant.identity}")# inspect the current value of the attributelanguage = participant.attributes.get("user.language")# listen to when the attribute is changed@ctx.room.on("participant_attributes_changed")def on_participant_attributes_changed(changed_attrs: dict[str, str], p: rtc.Participant):if p == participant:language = p.attributes.get("user.language")print(f"participant {p.identity} changed language to {language}")
For more information, see the following articles:
Room metadata
Learn how to set and use room metadata.
Participant attributes & metadata
Learn how to set and use participant attributes and metadata.
Ending the session
Disconnecting the agent
You can disconnect an agent after it completes its task and is no longer needed in the room. This allows the other participants in the LiveKit session to continue. Your shutdown hooks run after the shutdown
function.
async def entrypoint(ctx: JobContext):# do some work...# disconnect from the roomctx.shutdown(reason="Session ended")
Disconnecting everyone
If the session should end for everyone, use the server API deleteRoom to end the session.
The Disconnected
room event will be sent, and the room will be removed from the server.
from livekit import apiasync def entrypoint(ctx: JobContext):# do some work...api_client = api.LiveKitAPI(os.getenv("LIVEKIT_URL"),os.getenv("LIVEKIT_API_KEY"),os.getenv("LIVEKIT_API_SECRET"),)await api_client.room.delete_room(api.DeleteRoomRequest(room=ctx.job.room.name,))
Post-processing and cleanup
After a session ends, you can perform post-processing or cleanup tasks using shutdown hooks. For example, you might want to save user state in a database.
async def entrypoint(ctx: JobContext):async def my_shutdown_hook():# save user state...ctx.add_shutdown_callback(my_shutdown_hook)
Shutdown hooks should complete within a short amount of time. By default, the framework waits 60 seconds before forcefully terminating the process. You can adjust this timeout using the shutdown_process_timeout
parameter in WorkerOptions.