Video

Sample video frames, enable live video input, and add virtual avatars for video output.

Overview

LiveKit Agents supports video as both input and output. On the input side, you can sample video frames from an STT-LLM-TTS pipeline or enable live video input with a supported realtime model. On the output side, you can add a virtual avatar for lifelike video output.

Sample video frames

LLMs can process video in the form of still images, but many are not trained for this use case and may struggle to interpret motion and other changes across a video feed. Realtime models, like Gemini Live, are trained on video, and you can enable live video input for automatic support.

If you're using an STT-LLM-TTS pipeline, you can still work with video by sampling the video track at suitable times. For instance, in the following example, the agent always includes the latest video frame on each conversation turn from the user. This provides the model with additional context without overwhelming it with data or expecting it to interpret many sequential frames at a time:

Python:

import asyncio

from livekit import rtc
from livekit.agents import Agent, get_job_context
from livekit.agents.llm import ChatContext, ChatMessage, ImageContent

class Assistant(Agent):
    def __init__(self) -> None:
        self._latest_frame = None
        self._video_stream = None
        self._tasks = []
        super().__init__(instructions="You are a helpful voice AI assistant.")

    async def on_enter(self):
        room = get_job_context().room

        # Find the first video track (if any) from the remote participant
        remote_participant = list(room.remote_participants.values())[0]
        video_tracks = [
            publication.track
            for publication in list(remote_participant.track_publications.values())
            if publication.track and publication.track.kind == rtc.TrackKind.KIND_VIDEO
        ]
        if video_tracks:
            self._create_video_stream(video_tracks[0])

        # Watch for new video tracks not yet published
        @room.on("track_subscribed")
        def on_track_subscribed(track: rtc.Track, publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant):
            if track.kind == rtc.TrackKind.KIND_VIDEO:
                self._create_video_stream(track)

    async def on_user_turn_completed(self, turn_ctx: ChatContext, new_message: ChatMessage) -> None:
        # Add the latest video frame, if any, to the new message
        if self._latest_frame:
            new_message.content.append(ImageContent(image=self._latest_frame))
            self._latest_frame = None

    # Helper method to buffer the latest video frame from the user's track
    def _create_video_stream(self, track: rtc.Track):
        # Close any existing stream (we only want one at a time)
        if self._video_stream is not None:
            self._video_stream.close()

        # Create a new stream to receive frames
        self._video_stream = rtc.VideoStream(track)

        async def read_stream():
            async for event in self._video_stream:
                # Store the latest frame for use later
                self._latest_frame = event.frame

        # Store the async task
        task = asyncio.create_task(read_stream())
        task.add_done_callback(lambda t: self._tasks.remove(t))
        self._tasks.append(task)
Node.js:

import { Task, getJobContext, llm, voice } from '@livekit/agents';
import type { Track, VideoFrame } from '@livekit/rtc-node';
import { RoomEvent, TrackKind, VideoStream } from '@livekit/rtc-node';

class Assistant extends voice.Agent {
  private latestFrame: VideoFrame | null = null;
  private videoStream: VideoStream | null = null;
  private tasks: Set<Task<void>> = new Set();

  constructor() {
    super({
      instructions: 'You are a helpful voice AI assistant.',
    });
  }

  async onEnter(): Promise<void> {
    const room = getJobContext().room;

    // Find the first video track (if any) from the remote participant
    const remoteParticipants = Array.from(room.remoteParticipants.values());
    if (remoteParticipants.length > 0) {
      const remoteParticipant = remoteParticipants[0]!;
      const videoTracks = Array.from(remoteParticipant.trackPublications.values())
        .filter((pub) => pub.track?.kind === TrackKind.KIND_VIDEO)
        .map((pub) => pub.track!);
      if (videoTracks.length > 0) {
        this.createVideoStream(videoTracks[0]!);
      }
    }

    // Watch for new video tracks not yet published
    room.on(RoomEvent.TrackSubscribed, (track: Track) => {
      if (track.kind === TrackKind.KIND_VIDEO) {
        this.createVideoStream(track);
      }
    });
  }

  async onUserTurnCompleted(chatCtx: llm.ChatContext, newMessage: llm.ChatMessage): Promise<void> {
    // Add the latest video frame, if any, to the new message
    if (this.latestFrame !== null) {
      newMessage.content.push(
        llm.createImageContent({
          image: this.latestFrame,
        }),
      );
      this.latestFrame = null;
    }
  }

  // Helper method to buffer the latest video frame from the user's track
  private createVideoStream(track: Track): void {
    // Close any existing stream (we only want one at a time)
    if (this.videoStream !== null) {
      this.videoStream.cancel();
    }

    // Create a new stream to receive frames
    this.videoStream = new VideoStream(track);

    const readStream = async (controller: AbortController): Promise<void> => {
      if (!this.videoStream) return;
      for await (const event of this.videoStream) {
        if (controller.signal.aborted) return;
        // Store the latest frame for use later
        this.latestFrame = event.frame;
      }
    };

    // Store the async task
    const task = Task.from((controller) => readStream(controller));
    task.result.finally(() => this.tasks.delete(task));
    this.tasks.add(task);
  }
}

Video frame encoding

By default, ImageContent encodes video frames as JPEGs at their native size. To adjust the size of the encoded frames, set the inference_width and inference_height parameters. Each frame is resized to fit within the provided dimensions while maintaining the original aspect ratio. For more control, use the encode method of the livekit.agents.utils.images module and pass the result as a data URL:

import base64

from livekit.agents.utils.images import EncodeOptions, ResizeOptions, encode

image_bytes = encode(
    event.frame,
    EncodeOptions(
        format="PNG",
        resize_options=ResizeOptions(
            width=512,
            height=512,
            strategy="scale_aspect_fit",
        ),
    ),
)
image_content = ImageContent(
    image=f"data:image/png;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
)
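To see what the scale_aspect_fit strategy produces for a given frame, the resize math can be sketched as follows (scale_aspect_fit_size is a hypothetical helper for illustration, not part of the SDK):

```python
def scale_aspect_fit_size(src_w: int, src_h: int, max_w: int, max_h: int) -> tuple[int, int]:
    """Largest size that fits within (max_w, max_h) while preserving
    the source aspect ratio."""
    scale = min(max_w / src_w, max_h / src_h)
    return round(src_w * scale), round(src_h * scale)

# A 1280x720 frame constrained to 512x512 scales by 0.4 to 512x288.
print(scale_aspect_fit_size(1280, 720, 512, 512))
```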

Live video input

Available in: Python only

Supported models

Live video input requires a realtime model with video support, such as Gemini Live.

Set the video_input parameter to True in RoomOptions to enable live video input. Your agent automatically receives frames from the user's camera or screen sharing tracks, if available. Only the single most recently published video track is used.

By default, the agent samples one frame per second while the user is speaking, and one frame every three seconds otherwise. Each frame is resized to fit within 1024x1024 and encoded as JPEG. To override the frame rate, set video_sampler on the AgentSession to a custom instance.
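The default policy can be pictured as a rate limiter keyed on whether the user is speaking. The following is an illustrative sketch of that idea only (ThrottledVideoSampler is a hypothetical class, not the SDK's implementation; consult the livekit.agents API for the real video_sampler interface):

```python
class ThrottledVideoSampler:
    """Keep a frame only when enough time has passed since the last kept
    frame: 1 fps while the user speaks, 1 frame per 3 seconds otherwise."""

    def __init__(self, speaking_fps: float = 1.0, silent_fps: float = 1 / 3):
        self.speaking_fps = speaking_fps
        self.silent_fps = silent_fps
        self._last_kept = float("-inf")

    def should_sample(self, now: float, user_speaking: bool) -> bool:
        fps = self.speaking_fps if user_speaking else self.silent_fps
        if now - self._last_kept >= 1.0 / fps:
            self._last_kept = now
            return True
        return False
```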

Video input is passive and has no effect on turn detection. To leverage live video input in a non-conversational context, use manual turn control and trigger LLM responses or tool calls on a timer or other schedule.

The following example shows how to add Gemini Live vision to your voice AI quickstart agent:

from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    room_io,
)
from livekit.plugins import google

class VideoAssistant(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="You are a helpful voice assistant with live video input from your user.",
            llm=google.realtime.RealtimeModel(
                voice="Puck",
                temperature=0.8,
            ),
        )

server = AgentServer()

@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: JobContext):
    session = AgentSession()

    await session.start(
        agent=VideoAssistant(),
        room=ctx.room,
        room_options=room_io.RoomOptions(
            video_input=True,
            # ... noise_cancellation, etc.
        ),
    )

Video output

Virtual avatars add lifelike video output for your voice AI agents. An avatar provider joins the LiveKit room as a secondary participant and publishes synchronized audio and video tracks, giving your agent a visual presence.

The AgentSession sends its audio output to the avatar worker instead of directly to the room. The avatar worker uses this audio to generate synchronized lip movements and gestures, then publishes the resulting audio and video tracks to the room.

Adding an avatar to your agent

To add a virtual avatar:

  1. Install the avatar plugin and set up API keys for your chosen provider.
  2. Create an AgentSession as in the voice AI quickstart.
  3. Create an AvatarSession and configure it as necessary.
  4. Start the avatar session, passing in the AgentSession instance.

The following example uses Hedra Realtime Avatars:

Python:

from livekit import agents
from livekit.agents import AgentServer, AgentSession
from livekit.plugins import hedra

server = AgentServer()

@server.rtc_session(agent_name="my-agent")
async def my_agent(ctx: agents.JobContext):
    session = AgentSession(
        # ... stt, llm, tts, etc.
    )

    avatar = hedra.AvatarSession(
        avatar_id="...",  # ID of the Hedra avatar to use
    )

    # Start the avatar and wait for it to join
    await avatar.start(session, room=ctx.room)

    # Start your agent session with the user
    await session.start(
        # ... room, agent, room_options, etc.
    )
Node.js:

import { type JobContext, defineAgent, voice } from '@livekit/agents';
import * as bey from '@livekit/agents-plugin-bey';

export default defineAgent({
  entry: async (ctx: JobContext) => {
    await ctx.connect();

    const agent = new voice.Agent({
      instructions: 'You are a helpful assistant.',
    });

    const session = new voice.AgentSession({
      // ... llm, stt, tts, etc.
    });

    await session.start({
      agent,
      room: ctx.room,
    });

    const avatar = new bey.AvatarSession({
      avatarId: '...', // ID of the avatar to use
    });

    await avatar.start(session, ctx.room);
  },
});

Frontend integration

In your frontend, distinguish between the agent (your Python or Node.js program) and the avatar worker. You can identify the avatar worker as an agent participant whose lk.publish_on_behalf attribute is set to the agent's identity.

In React apps, use the useVoiceAssistant hook to get the correct audio and video tracks automatically:

const {
  agent, // The agent participant
  audioTrack, // The avatar worker's audio track
  videoTrack, // The avatar worker's video track
} = useVoiceAssistant();

With the lower-level SDK, find participants by kind and attribute:

JavaScript:

const participants = Array.from(room.remoteParticipants.values());
const agent = participants.find(
  (p) => p.kind === ParticipantKind.AGENT && !p.attributes['lk.publish_on_behalf'],
);
const avatarWorker = participants.find(
  (p) => p.kind === ParticipantKind.AGENT && p.attributes['lk.publish_on_behalf'] === agent?.identity,
);

Swift:

let agent = room.remoteParticipants.values.first {
    $0.kind == .agent && $0.attributes["lk.publish_on_behalf"] == nil
}
let avatarWorker = room.remoteParticipants.values.first {
    $0.kind == .agent && $0.attributes["lk.publish_on_behalf"] == agent?.identity?.stringValue
}

Android (Kotlin):

val agent = room.remoteParticipants.values.firstOrNull {
    it.kind == Participant.Kind.AGENT &&
        it.agentAttributes.lkPublishOnBehalf == null
}
val avatarWorker = room.remoteParticipants.values.firstOrNull {
    it.kind == Participant.Kind.AGENT &&
        it.agentAttributes.lkPublishOnBehalf == agent?.identity?.value
}

Flutter (Dart):

final agent = room.remoteParticipants.values.firstWhereOrNull(
  (p) => p.kind == ParticipantKind.AGENT &&
      (p.attributes['lk.publish_on_behalf'] == null ||
          p.attributes['lk.publish_on_behalf']!.isEmpty),
);
final avatarWorker = room.remoteParticipants.values.firstWhereOrNull(
  (p) => p.kind == ParticipantKind.AGENT &&
      p.attributes['lk.publish_on_behalf'] == agent?.identity,
);

For more details on building frontends with avatars, see Virtual avatars in the frontends section. For step-by-step setup guides for each avatar provider, see Virtual avatar models.