This guide explains the core ideas and components that make up Agents. For a step-by-step guide on building an Agent without the deep dive into its inner workings, check out the quickstart guide.
Agent lifecycle
The framework turns your program into an "agent" that can join a LiveKit room and interact with other participants. Here's a high-level overview of the lifecycle:
- Worker registration: When you run `myagent.py start`, it connects to LiveKit server and registers itself as a "worker" via a persistent WebSocket connection. Once registered, the app is on standby, waiting for rooms (sessions with end users) to be created. It automatically exchanges availability and capacity information with LiveKit server, allowing incoming requests to be load-balanced correctly.
- Agent dispatch: When an end user connects to a room, LiveKit server selects an available worker and sends it information about that session. The first worker to accept the request instantiates your program and joins the room. A worker can host multiple instances of your agent simultaneously, running each in its own process for isolation.
- Your program: This is where you take over. Your program can use most LiveKit client features via our Python SDK. Agents can also leverage the plugin ecosystem to process or synthesize voice and video data.
- Room close: When the last participant (excluding your agent) leaves the room, the room will close, disconnecting all connected agents.
Worker options
In stateful computing, a worker is similar to a web server process in traditional web architecture. The worker acts as the program's main loop, responsible for deploying and monitoring instances of the Agent on child processes. It can handle many Agent instances with minimal overhead, effectively utilizing machine resources.
When a worker experiences high load, it stops taking on new sessions and notifies LiveKit's server.
As you deploy updates to your program, the worker will gracefully drain existing sessions before shutting down, ensuring no sessions are interrupted mid-call.
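The load reporting that drives this behavior is pluggable via the `load_fnc` option. As a minimal sketch of what a custom load function could look like, assuming the framework accepts a callable that receives the worker instance and returns a float in [0, 1] (note that `os.getloadavg` is only available on Unix-like systems):

```python
import os


def load_fnc(worker) -> float:
    """Report normalized system load in the range [0, 1].

    Divides the 1-minute load average by the CPU count and clamps
    the result, so it can be compared against load_threshold.
    """
    load_avg, _, _ = os.getloadavg()
    cpus = os.cpu_count() or 1
    return min(load_avg / cpus, 1.0)
```

When the value returned here exceeds `load_threshold`, the worker stops accepting new sessions until load drops again.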
The interface for creating a worker is the `WorkerOptions` class:
```python
opts = WorkerOptions(
    # entrypoint function is called when a job is assigned to this worker
    # this is the only required parameter to WorkerOptions
    entrypoint_fnc,
    # inspect the request and decide if the current worker should handle it
    request_fnc,
    # a function to perform any necessary initialization in a new process
    prewarm_fnc,
    # a function that reports the current system load, whether CPU or RAM, etc.
    load_fnc,
    # the maximum value of load_fnc, above which new processes will not spawn
    load_threshold,
    # whether the agent can subscribe to tracks, publish data, update metadata, etc.
    permissions,
    # the type of worker to create, either WorkerType.ROOM or WorkerType.PUBLISHER
    worker_type=WorkerType.ROOM,
)

# start the worker
cli.run_app(opts)
```
While it is possible to supply API keys and secrets directly through `WorkerOptions`, for security reasons it is recommended to set them as environment variables that the worker then reads. The full list of worker options, details about each, and their default values can be found in the source code.
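As a minimal sketch of the environment-variable approach, using the `LIVEKIT_URL`, `LIVEKIT_API_KEY`, and `LIVEKIT_API_SECRET` names (the helper function itself is illustrative, not part of the framework):

```python
import os


def load_livekit_credentials() -> dict:
    """Read LiveKit connection settings from the environment,
    failing fast if any of them are missing."""
    required = ("LIVEKIT_URL", "LIVEKIT_API_KEY", "LIVEKIT_API_SECRET")
    missing = [name for name in required if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in required}
```

Failing at startup when a variable is absent is usually preferable to a confusing connection error later in the worker's lifecycle.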
Entrypoint
This is the main function called when a new job is assigned to the worker: the entry point for your agent's logic. It runs before the agent has joined the room (the connection is established when you call `ctx.connect()`), so it is where you set up any necessary state or configuration.
```python
async def entrypoint(ctx: JobContext):
    # connect to the room
    await ctx.connect()

    # handle the session
    ...
```
For details about the entrypoint function, refer to the Inside a session section.
Request handler
The `request_fnc` function is executed each time the server has a job for the agent. The framework expects workers to explicitly accept or reject each job request. If you accept the request, your entrypoint function is called; if you reject it, the request is sent to the next available worker.
If left unset, the default behavior is to auto-accept all requests dispatched to the worker.
We are aware of a bug affecting request rejection: it does not currently function as expected. A fix is in progress; until it is resolved, please avoid relying on this feature.
```python
async def request_fnc(req: JobRequest):
    # accept the job request
    await req.accept(
        # the agent's name (Participant.name), defaults to ""
        name="agent",
        # the agent's identity (Participant.identity), defaults to "agent-<jobid>"
        identity="identity",
    )

    # or reject it
    # await req.reject()
```
Prewarm function
For isolation and performance reasons, the framework runs each agent session in its own process. Agents often need access to model files that take time to load. To address this, the prewarm function can be used to warm up a process before any job is assigned to it. You can control the number of processes to keep warm using the `num_idle_processes` parameter.
```python
def prewarm_fnc(proc: JobProcess):
    # load silero weights and store to process userdata
    proc.userdata["vad"] = silero.VAD.load()


async def entrypoint(ctx: JobContext):
    # access the loaded silero instance
    vad: silero.VAD = ctx.proc.userdata["vad"]


opts = WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm_fnc)
```
Permissions
By default, agents are allowed to both publish to and subscribe from other participants in the same room. However, you can customize these permissions by setting the `permissions` parameter in `WorkerOptions`.
```python
opts = WorkerOptions(
    ...
    permissions=WorkerPermissions(
        can_publish=True,
        can_subscribe=True,
        # when set to True, the agent won't be visible to others in the room.
        # when hidden, it also cannot publish tracks to the room, as it won't be visible.
        hidden=False,
    ),
)
```
Worker type
You can choose to start a new instance of the agent for each room or for each publisher in the room. This can be set when you register your worker:
```python
opts = WorkerOptions(
    ...
    # when omitted, the default is WorkerType.ROOM
    worker_type=WorkerType.ROOM,
)
```
The `WorkerType` enum has two options:
- `ROOM`: a new instance of the agent is created for each room.
- `PUBLISHER`: a new instance of the agent is created for each publisher in the room.
If the agent performs resource-intensive operations in a room that could include multiple publishers (for example, processing incoming video from a set of security cameras), it may be desirable to set `worker_type` to `WorkerType.PUBLISHER` so that each publisher gets its own instance of the agent.
For `PUBLISHER` jobs, the `entrypoint` function is called once for each publisher in the room. The `JobContext.publisher` object contains a `RemoteParticipant` representing that publisher.
Starting the worker
Finally, to spin up a worker with the configuration defined in `WorkerOptions`, call the CLI:
```python
if __name__ == "__main__":
    cli.run_app(opts)
```
The Agents worker CLI provides two subcommands: `start` and `dev`. The former outputs raw JSON data to stdout and is recommended for production. `dev` is recommended for development: it outputs human-friendly colored logs and supports hot reloading of your Python code.
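In practice, assuming your agent lives in `myagent.py` and credentials are set as environment variables, the two subcommands are invoked as:

```shell
# development: human-friendly colored logs, hot reloading
python myagent.py dev

# production: raw JSON logs to stdout
python myagent.py start
```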