Agents Quickstart

Let's build a 'painter' agent capable of generating images from voice (or typed) input. Generated images are transmitted back to the user over a single WebRTC video track.

Pre-requisites

You'll need the following for this quickstart:

  • Python 3.9 or later
  • A LiveKit server URL, API key, and API secret (self-hosted or from a LiveKit Cloud project)
  • An OpenAI API key

Building an Agent

1. Create a virtualenv

Agents requires Python 3.9+. Some plugins may require 3.10+.

mkdir agent-quickstart
cd agent-quickstart
python3 -m venv venv
source venv/bin/activate

2. Install LiveKit Agents

We'll be using Whisper (speech-to-text) and DALL·E 3 (image generation), both from OpenAI, plus Silero for voice activity detection.

pip install livekit livekit-agents livekit-plugins-silero livekit-plugins-openai
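If you'd like to sanity-check the install before writing any code, importing the plugins is a quick way to catch a missing dependency (this one-liner is just a suggestion, not part of the setup):

python -c "from livekit import agents, rtc; from livekit.plugins import openai, silero"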

3. Agent code

Create a file named agent.py with the following:

import asyncio
from datetime import datetime
import json
import logging
from typing import Optional

from livekit import agents, rtc
from livekit.plugins import openai, silero


class PainterAgent:
    @classmethod
    async def create(cls, ctx: agents.JobContext):
        agent = PainterAgent(ctx)
        await agent.start()

    def __init__(self, ctx: agents.JobContext):
        # plugins
        self.whisper_stt = openai.STT()
        self.vad = silero.VAD()
        self.dalle = openai.Dalle3()

        self.ctx = ctx
        self.chat = rtc.ChatManager(ctx.room)
        self.prompt: Optional[str] = None
        self.current_image: Optional[rtc.VideoFrame] = None

        # setup callbacks
        def subscribe_cb(
            track: rtc.Track,
            publication: rtc.TrackPublication,
            participant: rtc.RemoteParticipant,
        ):
            self.ctx.create_task(self.audio_track_worker(track))

        def process_chat(msg: rtc.ChatMessage):
            self.prompt = msg.message

        self.ctx.room.on("track_subscribed", subscribe_cb)
        self.chat.on("message_received", process_chat)

    async def start(self):
        # give a bit of time for the user to fully connect so they don't miss
        # the welcome message
        await asyncio.sleep(1)

        # create_task is used to run coroutines in the background
        self.ctx.create_task(
            self.chat.send_message(
                "Welcome to the painter agent! Speak or type what you'd like me to paint."
            )
        )

        self.ctx.create_task(self.image_generation_worker())
        self.ctx.create_task(self.image_publish_worker())
        self.update_agent_state("listening")

    def update_agent_state(self, state: str):
        metadata = json.dumps(
            {
                "agent_state": state,
            }
        )
        self.ctx.create_task(self.ctx.room.local_participant.update_metadata(metadata))

    async def audio_track_worker(self, track: rtc.Track):
        audio_stream = rtc.AudioStream(track)
        vad_stream = self.vad.stream(min_silence_duration=2.0)
        stt = agents.stt.StreamAdapter(self.whisper_stt, vad_stream)
        stt_stream = stt.stream()
        self.ctx.create_task(self.stt_worker(stt_stream))

        async for audio_frame_event in audio_stream:
            stt_stream.push_frame(audio_frame_event.frame)
        await stt_stream.flush()

    async def stt_worker(self, stt_stream: agents.stt.SpeechStream):
        async for event in stt_stream:
            # we only want to act when result is final
            if not event.is_final:
                continue
            speech = event.alternatives[0]
            self.prompt = speech.text
        await stt_stream.aclose()

    async def image_generation_worker(self):
        # task will be canceled when Agent is disconnected
        while True:
            prompt, self.prompt = self.prompt, None
            if prompt:
                self.update_agent_state("generating")
                self.ctx.create_task(
                    self.chat.send_message(
                        f'Generating "{prompt}". It\'ll be just a minute.'
                    )
                )
                started_at = datetime.now()
                try:
                    argb_frame = await self.dalle.generate(prompt, size="1792x1024")
                    self.current_image = argb_frame
                    elapsed = (datetime.now() - started_at).seconds
                    self.ctx.create_task(
                        self.chat.send_message(f"Done! Took {elapsed} seconds.")
                    )
                except Exception as e:
                    logging.error("failed to generate image: %s", e, exc_info=e)
                    self.ctx.create_task(
                        self.chat.send_message("Sorry, I ran into an error.")
                    )
                self.update_agent_state("listening")
            await asyncio.sleep(0.05)

    async def image_publish_worker(self):
        video_source = rtc.VideoSource(1792, 1024)
        track = rtc.LocalVideoTrack.create_video_track("image", video_source)
        await self.ctx.room.local_participant.publish_track(track)

        image: Optional[rtc.VideoFrame] = None
        while True:
            if self.current_image:
                image, self.current_image = self.current_image, None
            if not image:
                await asyncio.sleep(0.1)
                continue
            video_source.capture_frame(image)
            # publish at 1fps
            await asyncio.sleep(1)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    async def job_request_cb(job_request: agents.JobRequest):
        await job_request.accept(
            PainterAgent.create,
            identity="painter",
            name="Painter",
            # subscribe to all audio tracks automatically
            auto_subscribe=agents.AutoSubscribe.AUDIO_ONLY,
            # disconnect when the last participant leaves
            auto_disconnect=agents.AutoDisconnect.DEFAULT,
        )

    worker = agents.Worker(request_handler=job_request_cb)
    agents.run_app(worker)

Running the Agent

Ensure the following variables are set in your environment:

export LIVEKIT_URL=<your LiveKit server URL>
export LIVEKIT_API_KEY=<your API Key>
export LIVEKIT_API_SECRET=<your API Secret>
export OPENAI_API_KEY=<your OpenAI API key>

Then run the agent:

python agent.py start

Your agent is now running in worker mode. Any time a room is created, your agent will join it and can interact with other participants in that room. You may start as many workers as you like. Each room will be assigned to a single worker.

Testing the Agent

Your agent can now interact with your end-users in the same room via browser or native apps.

To make prototyping and testing easier, we've created an example frontend you can use with any agent running on the backend. Head over to the Agents Playground and enter your LiveKit URL and an access token to start testing.

For LiveKit Cloud users, you can use the following:

URL: <your LiveKit server URL>
Token: <generate a token>
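If you prefer to mint a token yourself, here's a minimal sketch using the livekit api module (depending on your SDK version this may require pip install livekit-api; the room name and identity below are arbitrary placeholders):

import os
from livekit import api

token = (
    api.AccessToken(os.environ["LIVEKIT_API_KEY"], os.environ["LIVEKIT_API_SECRET"])
    .with_identity("test-user")
    .with_grants(api.VideoGrants(room_join=True, room="playground-room"))
    .to_jwt()
)
print(token)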

Design Notes

There are several things worth noting about the agent code above:

Worker and agent

Your program or script becomes a worker by using the agents.Worker class:

worker = agents.Worker(request_handler=job_request_cb)
agents.run_app(worker)

When you run the program with the start command, it establishes a persistent WebSocket connection to LiveKit server. This setup inverts the typical request/response model, allowing LiveKit server to dispatch job requests directly to the worker.

The job_request_cb function is triggered when a new job is available for your worker. You have the option to accept or reject the job after reviewing its details. When you accept a job, an instance of your agent is created, which then joins the room specified in the JobRequest.

async def job_request_cb(job_request: agents.JobRequest):
    await job_request.accept(PainterAgent.create, ...)
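
You can also turn a job down. As a sketch (the reject method and room property are assumed from your framework version, and the room-naming convention below is purely illustrative):

async def job_request_cb(job_request: agents.JobRequest):
    # only handle rooms following an (arbitrary) naming convention
    if not job_request.room.name.startswith("paint-"):
        await job_request.reject()
        return
    await job_request.accept(PainterAgent.create)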

How does my agent join the room?

Normally client applications connect to LiveKit using a URL and token, but there are no explicit references to either in the example above.

The Agents framework simplifies the connection and authentication process by automatically generating a token for the agent (which also serves to identify the agent with LiveKit server).

To customize the grants for the agent's token, you can specify them when accepting a job. For example, to instantiate an agent that isn't visible to other participants in the room (a "secret agent", if you will):

from livekit import api

await job_request.accept(
    PainterAgent.create,
    grants=api.VideoGrants(
        can_publish=False,
        hidden=True,
    ),
)

JobContext

At the point where your agent's application code is invoked, it has already joined the room. Your agent is provided with a JobContext object containing information about the session, including the following properties:

  • room: the room object from LiveKit's Python SDK
  • agent_identity: identity of the agent participant
  • create_task: a helper function for creating new asynchronous tasks
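
For instance, inside the agent these properties might be used like so (a small sketch; the log line is illustrative):

async def start(self):
    # room and agent_identity come straight from the JobContext
    logging.info(
        "agent %s joined room %s",
        self.ctx.agent_identity,
        self.ctx.room.name,
    )
    # create_task ties background work to the agent's lifetime
    self.ctx.create_task(self.image_generation_worker())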

Async parallelism

Agents uses asyncio to optimize performance. Whenever possible, it's recommended to run tasks in parallel to minimize latency.

You can use the JobContext.create_task function to initiate new tasks. The key advantage of JobContext.create_task over asyncio.create_task is its ability to automatically cancel running tasks if your agent disconnects from a room.
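
Here's the difference in practice, as a sketch (slow_io stands in for any long-running coroutine):

async def start(self):
    async def slow_io():
        await asyncio.sleep(30)  # stand-in for long-running work

    # plain asyncio: you must keep a reference and cancel this yourself
    # if the agent disconnects
    unmanaged = asyncio.create_task(slow_io())

    # JobContext: the framework tracks this task and cancels it
    # automatically when the agent leaves the room
    self.ctx.create_task(slow_io())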

Voice activity detection

Voice activity detection (VAD) is crucial when designing a multimodal voice agent, and the right silence threshold varies by application.

For instance, in our painter example we set min_silence_duration to 2 seconds. This means the agent waits for 2 seconds of silence before assuming the user has completed their prompt.

Note that some speech-to-text providers incorporate automatic VAD. In those cases, it's more efficient to directly stream audio frames and leverage intermediate transcription results while the end-user is speaking.
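
With such a provider, the VAD adapter drops out entirely and interim transcripts become available while the user is still speaking. A sketch, where streaming_stt stands in for any STT plugin with built-in endpointing (hypothetical here):

async def audio_track_worker(self, track: rtc.Track):
    # no VAD adapter: frames are streamed straight to the provider
    stt_stream = streaming_stt.stream()
    self.ctx.create_task(self.stt_worker(stt_stream))
    async for event in rtc.AudioStream(track):
        stt_stream.push_frame(event.frame)

async def stt_worker(self, stt_stream):
    async for event in stt_stream:
        if event.is_final:
            self.prompt = event.alternatives[0].text
        else:
            # interim results can drive live captions in the UI
            logging.info("interim: %s", event.alternatives[0].text)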

Communicating with the frontend application

An agent often needs to exchange various types of data with the end-user beyond audio and video. For this purpose we can use LiveKit's data features.

In our painter example, the ChatManager is used to exchange text messages between the agent and the end-user. Additionally, the agent uses update_agent_state to update the frontend UI. This function updates the agent's participant metadata, which is then broadcast to all other participants in the room.
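
On the receiving side, any participant can watch for those metadata updates. A sketch in Python using the rtc SDK (the event name and callback signature may vary slightly across SDK versions):

import json

def on_metadata_changed(
    participant: rtc.Participant, old_metadata: str, metadata: str
):
    if participant.identity == "painter":
        state = json.loads(metadata).get("agent_state")
        # e.g. show a spinner in the UI while state == "generating"
        logging.info("agent state: %s", state)

room.on("participant_metadata_changed", on_metadata_changed)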