
Realtime model metrics

Shows how to capture and summarize RealtimeModelMetrics for agents that use a realtime model.

This example shows how to capture token and latency metrics emitted by a realtime model and print them as a Rich table whenever the agent responds. Because realtime models bypass the STT-LLM-TTS pipeline, they emit RealtimeModelMetrics instead of LLMMetrics.

Prerequisites

  • Add a .env in this directory with your LiveKit and Google Gemini credentials:
    LIVEKIT_URL=your_livekit_url
    LIVEKIT_API_KEY=your_api_key
    LIVEKIT_API_SECRET=your_api_secret
    GOOGLE_API_KEY=your_gemini_api_key
  • Install dependencies:
    uv add python-dotenv rich "livekit-agents[silero]" livekit-plugins-google

Load configuration and logging

Set up dotenv, a logger, and a Rich console for the metrics table.

import logging
import asyncio
from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, Agent, AgentSession, AgentServer, MetricsCollectedEvent, cli
from livekit.agents.metrics import RealtimeModelMetrics
from livekit.plugins import google, silero
from rich.console import Console
from rich.table import Table
from rich import box
from datetime import datetime
# Pull LiveKit and Google Gemini credentials from the .env file in this directory.
load_dotenv()
logger = logging.getLogger("metrics-realtime")
logger.setLevel(logging.INFO)
# Rich console used to render the metrics table on each model response.
console = Console()
server = AgentServer()

Prewarm VAD for faster connections

Preload the VAD model once per process to reduce connection latency.

def prewarm(proc: JobProcess):
    """Preload the Silero VAD model once per worker process.

    Loading the model here instead of per-connection reduces the latency of
    each new session; the loaded instance is shared via ``proc.userdata``.
    """
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm

Create the metrics-enabled agent

In on_enter, attach a metrics_collected listener to the session so every realtime model response triggers your metrics handler.

class RealtimeMetricsAgent(Agent):
    """Agent that listens for RealtimeModelMetrics on its session.

    Realtime models bypass the STT-LLM-TTS pipeline and emit
    ``RealtimeModelMetrics`` (not ``LLMMetrics``) on the session's
    ``metrics_collected`` event.
    """

    def __init__(self) -> None:
        super().__init__(
            instructions="""
You are a helpful agent.
"""
        )

    async def on_enter(self):
        """Attach a metrics listener to the session, then greet the user."""
        # Keep strong references to in-flight handler tasks: the event loop
        # only holds a weak reference to tasks created via create_task, so a
        # fire-and-forget task could otherwise be garbage-collected before
        # it finishes (see asyncio.create_task documentation).
        pending: set[asyncio.Task] = set()

        def sync_wrapper(ev: MetricsCollectedEvent):
            # session.on() requires a synchronous callback, so schedule the
            # async handler instead of awaiting it here.
            if isinstance(ev.metrics, RealtimeModelMetrics):
                task = asyncio.create_task(self.on_metrics_collected(ev.metrics))
                pending.add(task)
                task.add_done_callback(pending.discard)

        self.session.on("metrics_collected", sync_wrapper)
        self.session.generate_reply()

Render metrics with Rich

When metrics arrive, format them into a table with timestamps, TTFT, duration, token counts, and per-modality breakdowns.

async def on_metrics_collected(self, metrics: RealtimeModelMetrics) -> None:
    """Render a single RealtimeModelMetrics payload as a Rich table.

    Shows timestamps, TTFT, duration, token counts, and per-modality
    (audio/text/cached) breakdowns for both input and output.
    """
    report = Table(
        title="[bold blue]Realtime Model Metrics Report[/bold blue]",
        box=box.ROUNDED,
        highlight=True,
        show_header=True,
        header_style="bold cyan",
    )
    report.add_column("Metric", style="bold green")
    report.add_column("Value", style="yellow")

    when = datetime.fromtimestamp(metrics.timestamp).strftime('%Y-%m-%d %H:%M:%S')
    # ttft is -1 when no audio tokens were generated
    if metrics.ttft >= 0:
        ttft_display = f"[white]{metrics.ttft:.4f}[/white]s"
    else:
        ttft_display = "n/a (no audio tokens)"

    rows = [
        ("Type", str(metrics.type)),
        ("Label", str(metrics.label)),
        ("Request ID", str(metrics.request_id)),
        ("Timestamp", when),
        ("Duration", f"[white]{metrics.duration:.4f}[/white]s"),
        ("Time to First Audio Token", ttft_display),
        ("Input Tokens", str(metrics.input_tokens)),
        (" ↳ Audio", str(metrics.input_token_details.audio_tokens)),
        (" ↳ Text", str(metrics.input_token_details.text_tokens)),
        (" ↳ Cached", str(metrics.input_token_details.cached_tokens)),
        ("Output Tokens", str(metrics.output_tokens)),
        (" ↳ Audio", str(metrics.output_token_details.audio_tokens)),
        (" ↳ Text", str(metrics.output_token_details.text_tokens)),
        ("Total Tokens", str(metrics.total_tokens)),
        ("Tokens/Second", f"{metrics.tokens_per_second:.2f}"),
    ]
    for name, value in rows:
        report.add_row(name, value)

    console.print("\n")
    console.print(report)
    console.print("\n")

Set up the session

Configure the AgentSession with a realtime model and prewarmed VAD. The realtime model emits metrics_collected events on the session directly, rather than on a separate LLM component.

@server.rtc_session(agent_name="my-agent")
async def entrypoint(ctx: JobContext):
    """Per-room entrypoint: wire up the realtime model and start the agent."""
    ctx.log_context_fields = {"room": ctx.room.name}

    # The realtime model emits metrics_collected events on the session
    # directly, rather than on a separate LLM component.
    model = google.realtime.RealtimeModel(
        voice="Puck",
        temperature=0.8,
        instructions="You are a helpful assistant",
    )
    session = AgentSession(
        llm=model,
        # Reuse the VAD instance preloaded in prewarm().
        vad=ctx.proc.userdata["vad"],
    )

    await session.start(agent=RealtimeMetricsAgent(), room=ctx.room)
    await ctx.connect()

Run the server

Start the agent server with the CLI.

if __name__ == "__main__":
    # Start the agent server via the LiveKit CLI (e.g. `console` or `dev` mode).
    cli.run_app(server)

Run it

uv run metrics_realtime.py console

How it works

  1. The agent uses the Gemini Realtime API instead of an STT-LLM-TTS pipeline.
  2. The session emits metrics_collected with a RealtimeModelMetrics payload after each response.
  3. A wrapper in on_enter schedules on_metrics_collected so you can await inside it.
  4. Rich renders the metrics in a readable table showing latency and per-modality token stats.

Full example

import logging
import asyncio
from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, Agent, AgentSession, AgentServer, MetricsCollectedEvent, cli
from livekit.agents.metrics import RealtimeModelMetrics
from livekit.plugins import google, silero
from rich.console import Console
from rich.table import Table
from rich import box
from datetime import datetime
# Pull LiveKit and Google Gemini credentials from the .env file in this directory.
load_dotenv()
logger = logging.getLogger("metrics-realtime")
logger.setLevel(logging.INFO)
# Rich console used to render the metrics table on each model response.
console = Console()
class RealtimeMetricsAgent(Agent):
    """Agent that captures RealtimeModelMetrics and renders them with Rich.

    Realtime models bypass the STT-LLM-TTS pipeline and emit
    ``RealtimeModelMetrics`` (not ``LLMMetrics``) on the session's
    ``metrics_collected`` event.
    """

    def __init__(self) -> None:
        super().__init__(
            instructions="""
You are a helpful agent.
"""
        )

    async def on_enter(self):
        """Attach a metrics listener to the session, then greet the user."""
        # Keep strong references to in-flight handler tasks: the event loop
        # only holds a weak reference to tasks created via create_task, so a
        # fire-and-forget task could otherwise be garbage-collected before
        # it finishes (see asyncio.create_task documentation).
        pending: set[asyncio.Task] = set()

        def sync_wrapper(ev: MetricsCollectedEvent):
            # session.on() requires a synchronous callback, so schedule the
            # async handler instead of awaiting it here.
            if isinstance(ev.metrics, RealtimeModelMetrics):
                task = asyncio.create_task(self.on_metrics_collected(ev.metrics))
                pending.add(task)
                task.add_done_callback(pending.discard)

        self.session.on("metrics_collected", sync_wrapper)
        self.session.generate_reply()

    async def on_metrics_collected(self, metrics: RealtimeModelMetrics) -> None:
        """Render a single RealtimeModelMetrics payload as a Rich table."""
        table = Table(
            title="[bold blue]Realtime Model Metrics Report[/bold blue]",
            box=box.ROUNDED,
            highlight=True,
            show_header=True,
            header_style="bold cyan",
        )
        table.add_column("Metric", style="bold green")
        table.add_column("Value", style="yellow")
        timestamp = datetime.fromtimestamp(metrics.timestamp).strftime('%Y-%m-%d %H:%M:%S')
        table.add_row("Type", str(metrics.type))
        table.add_row("Label", str(metrics.label))
        table.add_row("Request ID", str(metrics.request_id))
        table.add_row("Timestamp", timestamp)
        table.add_row("Duration", f"[white]{metrics.duration:.4f}[/white]s")
        # ttft is -1 when no audio tokens were generated
        ttft_display = f"[white]{metrics.ttft:.4f}[/white]s" if metrics.ttft >= 0 else "n/a (no audio tokens)"
        table.add_row("Time to First Audio Token", ttft_display)
        table.add_row("Input Tokens", str(metrics.input_tokens))
        table.add_row(" ↳ Audio", str(metrics.input_token_details.audio_tokens))
        table.add_row(" ↳ Text", str(metrics.input_token_details.text_tokens))
        table.add_row(" ↳ Cached", str(metrics.input_token_details.cached_tokens))
        table.add_row("Output Tokens", str(metrics.output_tokens))
        table.add_row(" ↳ Audio", str(metrics.output_token_details.audio_tokens))
        table.add_row(" ↳ Text", str(metrics.output_token_details.text_tokens))
        table.add_row("Total Tokens", str(metrics.total_tokens))
        table.add_row("Tokens/Second", f"{metrics.tokens_per_second:.2f}")
        console.print("\n")
        console.print(table)
        console.print("\n")
server = AgentServer()


def prewarm(proc: JobProcess):
    """Preload the Silero VAD model once per worker process.

    Loading here instead of per-connection reduces each session's startup
    latency; the loaded instance is shared via ``proc.userdata``.
    """
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm
@server.rtc_session(agent_name="my-agent")
async def entrypoint(ctx: JobContext):
    """Per-room entrypoint: wire up the realtime model and start the agent."""
    ctx.log_context_fields = {"room": ctx.room.name}

    # The realtime model emits metrics_collected events on the session
    # directly, rather than on a separate LLM component.
    model = google.realtime.RealtimeModel(
        voice="Puck",
        temperature=0.8,
        instructions="You are a helpful assistant.",
    )
    session = AgentSession(
        llm=model,
        # Reuse the VAD instance preloaded in prewarm().
        vad=ctx.proc.userdata["vad"],
    )

    await session.start(agent=RealtimeMetricsAgent(), room=ctx.room)
    await ctx.connect()
if __name__ == "__main__":
    # Start the agent server via the LiveKit CLI (e.g. `console` or `dev` mode).
    cli.run_app(server)