VoicePipelineAgent

Building voice assistants with a pipeline of STT, LLM, and TTS models

Diagram showing pipeline voice agent flow

VoicePipelineAgent is a high-level abstraction that orchestrates conversation flow using a pipeline of three main models: STT → LLM → TTS. Additional models, like VAD, are used to enhance the conversation flow.

Usage

from livekit.agents import llm
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import cartesia, deepgram, openai, silero

initial_ctx = llm.ChatContext().append(
    role="system",
    text="<your prompt>",
)
agent = VoicePipelineAgent(
    vad=silero.VAD.load(),
    # flexibility to use any models
    stt=deepgram.STT(model="nova-2-general"),
    llm=openai.LLM(),
    tts=cartesia.TTS(),
    # initial ChatContext with system prompt
    chat_ctx=initial_ctx,
    # whether the agent can be interrupted
    allow_interruptions=True,
    # sensitivity of when to interrupt
    interrupt_speech_duration=0.5,
    interrupt_min_words=0,
    # minimal silence duration to consider end of turn
    min_endpointing_delay=0.5,
    # callback to run before LLM is called, can be used to modify chat context
    before_llm_cb=None,
    # callback to run before TTS is called, can be used to customize pronunciation
    before_tts_cb=None,
)

# start the agent in a room, taking audio input from a single participant
agent.start(room, participant)
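
In a worker, the room and participant typically come from the job context. Here is a minimal entrypoint sketch following the pattern used in the LiveKit Agents examples (the greeting text is illustrative):

from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli, llm
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import cartesia, deepgram, openai, silero

async def entrypoint(ctx: JobContext):
    # subscribe to audio only; the pipeline does not consume video
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
    # wait for the first participant before starting the agent
    participant = await ctx.wait_for_participant()

    agent = VoicePipelineAgent(
        vad=silero.VAD.load(),
        stt=deepgram.STT(model="nova-2-general"),
        llm=openai.LLM(),
        tts=cartesia.TTS(),
        chat_ctx=llm.ChatContext().append(role="system", text="<your prompt>"),
    )
    agent.start(ctx.room, participant)
    # optionally greet the user once the agent is running
    await agent.say("Hi there, how can I help?", allow_interruptions=True)

if __name__ == "__main__":
    cli.run_app(WorkerOptions(entrypoint_fnc=entrypoint))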

Model options

Options on the models can be customized when creating the plugin objects. For example, you can adjust the model and temperature of the LLM like this:

llm = openai.LLM(
    model="gpt-4o-mini",
    temperature=0.5,
)
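
The same applies to the other models in the pipeline. For example, a sketch of passing options to the STT plugin (the language value is illustrative; check each plugin for the options it actually supports):

stt = deepgram.STT(
    model="nova-2-general",
    language="en-US",
)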

Modify context before LLM

The before_llm_cb callback allows you to modify the ChatContext before it is sent to the LLM. This is useful for adding extra context or adjusting the context based on the conversation. For example, when the context becomes too long, you can truncate it to reduce the number of tokens used for inference.

async def truncate_context(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
    if len(chat_ctx.messages) > 15:
        chat_ctx.messages = chat_ctx.messages[-15:]

agent = VoicePipelineAgent(
    ...
    before_llm_cb=truncate_context,
)
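
The callback can also add to the context. A hypothetical sketch that injects retrieved documents before each LLM call (my_retriever stands in for your own retrieval code):

async def inject_context(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
    user_msg = chat_ctx.messages[-1]
    if isinstance(user_msg.content, str):
        # my_retriever is hypothetical; substitute your own lookup
        docs = my_retriever.search(user_msg.content)
        chat_ctx.append(role="system", text=f"Relevant context:\n{docs}")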

Altering text before TTS

The before_tts_cb callback allows you to modify the text before it is sent to the TTS model. This is useful for customizing pronunciation or adding extra context to the text.

from typing import AsyncIterable

from livekit.agents import tokenize
from livekit.agents.pipeline import VoicePipelineAgent

def replace_words(assistant: VoicePipelineAgent, text: str | AsyncIterable[str]):
    # spell out "livekit" phonetically for the TTS model
    return tokenize.utils.replace_words(
        text=text, replacements={"livekit": r"<<l|aɪ|v|k|ɪ|t|>>"}
    )

agent = VoicePipelineAgent(
    ...
    before_tts_cb=replace_words,
)

Turn-detection thresholds

min_endpointing_delay defines the minimum silence duration to detect the end of a turn. Increasing this value allows for longer pauses before the agent assumes the user has finished speaking.
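
For example, to tolerate longer pauses before the agent responds:

agent = VoicePipelineAgent(
    ...
    # wait for a full second of silence before treating the turn as finished
    min_endpointing_delay=1.0,
)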

Interruption handling

When the user interrupts, the agent stops speaking and switches to listening mode, storing the position of the speech played so far in its ChatContext.

Three flags control the interruption behavior (see the example after the list):

  • allow_interruptions: set to False to disable user interruptions.
  • interrupt_speech_duration: the minimum speech duration (detected by VAD) required to consider the interruption intentional.
  • interrupt_min_words: the minimum number of transcribed words needed for the interruption to be considered intentional.
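
For example, to require a more deliberate interruption, the thresholds can be raised (the values here are illustrative):

agent = VoicePipelineAgent(
    ...
    allow_interruptions=True,
    # require at least a second of detected speech...
    interrupt_speech_duration=1.0,
    # ...and at least two transcribed words before interrupting
    interrupt_min_words=2,
)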

Emitted events

An agent emits the following events:

  • user_started_speaking: user started speaking.
  • user_stopped_speaking: user stopped speaking.
  • agent_started_speaking: agent started speaking.
  • agent_stopped_speaking: agent stopped speaking.
  • user_speech_committed: user's speech was committed to the chat context.
  • agent_speech_committed: agent's speech was committed to the chat context.
  • agent_speech_interrupted: agent was interrupted while speaking.
  • function_calls_collected: the complete set of functions to be executed was received.
  • function_calls_finished: all function calls have been executed.
  • metrics_collected: a metric was collected. Metrics can include time to first token for STT, LLM, and TTS, duration, and usage metrics.

Events example

For example, when a user's speech is committed to the chat context, save the transcribed message to a queue:

import asyncio
from datetime import datetime

log_queue = asyncio.Queue()

@agent.on("user_speech_committed")
def on_user_speech_committed(msg: llm.ChatMessage):
    # flatten list content into a single string, replacing images with a placeholder
    if isinstance(msg.content, list):
        msg.content = "\n".join(
            "[image]" if isinstance(x, llm.ChatImage) else x for x in msg.content
        )
    log_queue.put_nowait(f"[{datetime.now()}] USER:\n{msg.content}\n\n")
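
Similarly, the metrics_collected event can be used to log pipeline metrics, assuming the metrics helpers shipped with livekit-agents:

from livekit.agents import metrics

@agent.on("metrics_collected")
def on_metrics_collected(agent_metrics: metrics.AgentMetrics):
    # log STT/LLM/TTS metrics as they are produced
    metrics.log_metrics(agent_metrics)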

The full example is available on GitHub.