WorkerOptions parameters
The interface for creating a worker is through the WorkerOptions
class. The following only includes some of the available parameters. For the complete list, see the WorkerOptions reference.
opts = WorkerOptions(# entrypoint function is called when a job is assigned to this worker# this is the only required parameter to WorkerOptionsentrypoint_fnc,# inspect the request and decide if the current worker should handle it.request_fnc,# a function to perform any necessary initialization in a new process.prewarm_fnc,# a function that reports the current system load, whether CPU or RAM, etc.load_fnc,# the maximum value of load_fnc, above which new processes will not spawnload_threshold,# whether the agent can subscribe to tracks, publish data, update metadata, etc.permissions,# the type of worker to create, either JT_ROOM or JT_PUBLISHERworker_type=WorkerType.ROOM,)# start the workercli.run_app(opts)
For security purposes, set the LiveKit API key and secret as environment variables rather than as WorkerOptions
parameters.
Entrypoint
This is the main function called when LiveKit server assigns the worker a new job. It's the entrypoint for your agent. logic. The entrypoint function runs before the agent joins the room, and is where you can set up any necessary state or configuration. To learn more about the entrypoint function, see the Job lifecycle topic.
async def entrypoint(ctx: JobContext):# connect to the roomawait ctx.connect()# handle the session...
Request handler
The request_fnc
function is executed each time that the server has a job for the agent. The framework expects workers to explicitly accept or reject each job request. If you accept the request, your entrypoint function is called. If the request is rejected, it's sent to the next available worker.
By default, if left blank, the behavior is to auto-accept all requests dispatched to the worker.
async def request_fnc(req: JobRequest):# accept the job requestawait req.accept(# the agent's name (Participant.name), defaults to ""name="agent",# the agent's identity (Participant.identity), defaults to "agent-<jobid>"identity="identity",# attributes to set on the agent participant upon joinattributes={"myagent": "rocks"},)# or reject it# await req.reject()opts = WorkerOptions(entrypoint_fnc=entrypoint, request_fnc=request_fnc)
Prewarm function
For isolation and performance reasons, the framework runs each agent job in its own process. Agents often need access to model files that take time to load. To address this, you can use a prewarm
function to warm up the process before assigning any jobs to it. You can control the number of processes to keep warm using the num_idle_processes
parameter.
def prewarm_fnc(proc: JobProcess):# load silero weights and store to process userdataproc.userdata["vad"] = silero.VAD.load()async def entrypoint(ctx: JobContext):# access the loaded silero instancevad: silero.VAD = ctx.proc.userdata["vad"]opts = WorkerOptions(entrypoint_fnc=entrypoint, prewarm_fnc=prewarm_fnc)
Permissions
By default, agents can both publish to and subscribe from the other participants in the same room. However, you can customize these permissions by setting the permissions
parameter in WorkerOptions
. To see the full list of parameters, see the WorkerPermissions reference.
opts = WorkerOptions(...permissions=WorkerPermissions(can_publish=True,can_subscribe=True,can_publish_data=True,# when set to true, the agent won't be visible to others in the room.# when hidden, it will also not be able to publish tracks to the room as it won't be visible.hidden=False,),)
Worker type
You can choose to start a new instance of the agent for each room or for each publisher in the room. This can be set when you register your worker:
opts = WorkerOptions(...# when omitted, the default is WorkerType.ROOMworker_type=WorkerType.ROOM,)
The WorkerType
enum has two options:
ROOM
: Create a new instance of the agent for each room.PUBLISHER
: Create a new instance of the agent for each publisher in the room.
If the agent is performing resource-intensive operations in a room that could potentially include multiple publishers (for example, processing incoming video from a set of security cameras), you can set worker_type
to JT_PUBLISHER
to ensure that each publisher has its own instance of the agent.
For PUBLISHER
jobs, call the entrypoint
function once for each publisher in the room. The JobContext.publisher
object contains a RemoteParticipant
representing that publisher.
Starting the worker
To spin up a worker with the configuration defined using WorkerOptions
, call the CLI:
if __name__ == "__main__":cli.run_app(opts)
The Agents worker CLI provides two subcommands: start
and dev
. The former outputs raw JSON data to stdout, and is recommended for production. dev
is recommended to use for development, as it outputs human-friendly colored logs, and supports hot reloading on Python.