Jobs

On this page

A job is a request to handle a stateful session from an end user. When a new room is created or a participant starts publishing to a room, a job is dispatched to an available worker. Once the worker accepts the job, the framework spins up a new process to run the agent code with the context of that specific job.

Running jobs in separate processes ensures that agents are isolated from each other, preventing interference between one another. For example, if a particular agent instance crashes, it will not affect other agents running on the same worker.

async def entrypoint(job: JobContext):
print("job id:", job.id)
print("job info:", job.job)
print("room info:", job.room)
print("participant info:", job.publisher)
print("agent info:", job.agent)
job.shutdown()

The entrypoint function is called when the Agent has successfully connected to the room. It passes a JobContext object, which includes information about the environment and a room object representing a connection to that session. At this point, you can interact with the room, remote participants, and manage tracks as you would with the realtime SDK.

Below is an example of what part of the entry function could look like:

async def entrypoint(job: JobContext):
# listen to participant audio
def on_track_subscribed(track: rtc.Track, *_):
if track.kind == rtc.TrackKind.KIND_AUDIO:
async for audio_frame in rtc.AudioStream(track):
do_something(audio_frame)
job.room.on("track_subscribed", on_track_subscribed)
# publish to a track
source = rtc.AudioSource(24000, 1)
track = rtc.LocalAudioTrack.create_audio_track("agent-mic", source)
options = rtc.TrackPublishOptions()
options.source = rtc.TrackSource.SOURCE_MICROPHONE
await job.agent.publish_track(track_options)
await source.capture_frame(some_data)

Working examples of LiveKit Agents for Python are available in the repository. More on publishing and receiving tracks will be expanded in a later section.

JobType

You can choose to start a new instance of the agent for each room or for each publisher in the room. This can be set when you register your worker:

if __name__ == "__main__":
cli.run_app(WorkerOptions(
request_fnc,
# when omitted, the default is JobType.JT_ROOM
worker_type=JobType.JT_ROOM,
)
)

The JobType enum has two options:

  • JT_ROOM: A new instance of the agent is created for each room.
  • JT_PUBLISHER: A new instance of the agent is created for each publisher in the room.

If the agent is performing resource-intensive operations in a room that could potentially include multiple publishers (for example, processing incoming video from a set of security cameras), it may be desirable to set worker_type to JT_PUBLISHER to ensure that each publisher has its own instance of the agent.

For JT_PUBLISHER jobs, the entrypoint function will be called once for each publisher in the room. The JobContext.publisher object will contain a RemoteParticipant representing that publisher.