VoicePipelineAgent is a high-level abstraction that orchestrates conversation flow using a pipeline of three main models: STT → LLM → TTS. Additional models, such as VAD, are used to further enhance the flow of the conversation.
Usage
```python
from livekit.agents import llm
from livekit.agents.pipeline import VoicePipelineAgent
from livekit.plugins import cartesia, deepgram, openai, silero

initial_ctx = llm.ChatContext().append(
    role="system",
    text="<your prompt>",
)

agent = VoicePipelineAgent(
    vad=silero.VAD.load(),
    # flexibility to use any models
    stt=deepgram.STT(model="nova-2-general"),
    llm=openai.LLM(),
    tts=cartesia.TTS(),
    # initial ChatContext with system prompt
    chat_ctx=initial_ctx,
    # whether the agent can be interrupted
    allow_interruptions=True,
    # sensitivity of when to interrupt
    interrupt_speech_duration=0.5,
    interrupt_min_words=0,
    # minimal silence duration to consider end of turn
    min_endpointing_delay=0.5,
    # callback to run before the LLM is called, can be used to modify the chat context
    before_llm_cb=None,
    # callback to run before TTS is called, can be used to customize pronunciation
    before_tts_cb=None,
)

# start the agent in the room, taking audio input from a single participant
agent.start(room, participant)
```
Model options
Options on the models can be customized when creating the plugin objects. For example, you can adjust the model and temperature of the LLM like this:
```python
llm = openai.LLM(
    model="gpt-4o-mini",
    temperature=0.5,
)
```
Modify context before LLM
The `before_llm_cb` callback allows you to modify the `ChatContext` before it is sent to the LLM. This is useful for adding extra context or adjusting the context based on the conversation. For example, when the context becomes too long, you can truncate it to reduce the number of tokens used in inference.
```python
async def truncate_context(assistant: VoicePipelineAgent, chat_ctx: llm.ChatContext):
    if len(chat_ctx.messages) > 15:
        chat_ctx.messages = chat_ctx.messages[-15:]


agent = VoicePipelineAgent(
    ...,
    before_llm_cb=truncate_context,
)
```
Altering text before TTS
The `before_tts_cb` callback allows you to modify the text before it is sent to the TTS model. This is useful for customizing pronunciation or adding extra context to the text.
```python
from typing import AsyncIterable

from livekit.agents import tokenize
from livekit.agents.pipeline import VoicePipelineAgent


def replace_words(assistant: VoicePipelineAgent, text: str | AsyncIterable[str]):
    return tokenize.utils.replace_words(
        text=text, replacements={"livekit": r"<<l|aɪ|v|k|ɪ|t|>>"}
    )


agent = VoicePipelineAgent(
    ...,
    before_tts_cb=replace_words,
)
```
Turn-detection thresholds
`min_endpointing_delay` defines the minimum silence duration used to detect the end of a turn. Increasing this value allows for longer pauses before the agent assumes the user has finished speaking.
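If your users tend to pause mid-sentence, you can raise the delay when constructing the agent. This is a minimal sketch that reuses the models from the Usage example above; the value of 1.0 seconds is an illustrative assumption, not a recommended default:

```python
agent = VoicePipelineAgent(
    vad=silero.VAD.load(),
    stt=deepgram.STT(),
    llm=openai.LLM(),
    tts=cartesia.TTS(),
    chat_ctx=initial_ctx,
    # wait for a full second of silence before treating the turn as finished
    min_endpointing_delay=1.0,
)
```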
Interruption handling
When the user interrupts, the agent stops speaking and switches to listening mode, storing the position of the speech played so far in its `ChatContext`.

There are three flags that control the interruption behavior (see the configuration sketch after the list):
- `allow_interruptions`: set to `False` to disable user interruptions.
- `interrupt_speech_duration`: the minimum speech duration (detected by VAD) required to consider the interruption intentional.
- `interrupt_min_words`: the minimum number of transcribed words needed for the interruption to be considered intentional.
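As a minimal sketch reusing the models from the Usage example above, the following configuration makes the agent harder to interrupt by requiring a longer burst of speech and at least two transcribed words; the specific values are illustrative assumptions, not recommended defaults:

```python
agent = VoicePipelineAgent(
    vad=silero.VAD.load(),
    stt=deepgram.STT(),
    llm=openai.LLM(),
    tts=cartesia.TTS(),
    chat_ctx=initial_ctx,
    allow_interruptions=True,
    # require ~0.8s of detected speech before treating it as an interruption
    interrupt_speech_duration=0.8,
    # and at least two transcribed words
    interrupt_min_words=2,
)
```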
Emitted events
An agent emits the following events:
| Event | Description |
| --- | --- |
| user_started_speaking | User started speaking. |
| user_stopped_speaking | User stopped speaking. |
| agent_started_speaking | Agent started speaking. |
| agent_stopped_speaking | Agent stopped speaking. |
| user_speech_committed | User's speech was committed to the chat context. |
| agent_speech_committed | Agent's speech was committed to the chat context. |
| agent_speech_interrupted | Agent was interrupted while speaking. |
| function_calls_collected | The complete set of functions to be executed was received. |
| function_calls_finished | All function calls have been executed. |
| metrics_collected | A metric was collected. Metrics can include time to first token for STT, LLM, and TTS, as well as duration and usage metrics. |
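For instance, the `metrics_collected` event can be used to log pipeline metrics as they arrive. This is a minimal sketch assuming a recent livekit-agents version that exposes the `metrics` module and its `log_metrics` helper:

```python
from livekit.agents import metrics


@agent.on("metrics_collected")
def on_metrics_collected(agent_metrics: metrics.AgentMetrics):
    # print a formatted summary of the STT/LLM/TTS metrics that were just collected
    metrics.log_metrics(agent_metrics)
```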
Events example
For example, when a user's speech is committed to the chat context, you can push it onto a queue to be written out as a transcript:
@agent.on("user_speech_committed")def on_user_speech_committed(msg: llm.ChatMessage):# convert string lists to strings, drop imagesif isinstance(msg.content, list):msg.content = "\n".join("[image]" if isinstance(x, llm.ChatImage) else x for x in msg)log_queue.put_nowait(f"[{datetime.now()}] USER:\n{msg.content}\n\n")
The full example is available on GitHub.