Voice pipeline nodes

Learn how to customize the behavior of your agent by overriding nodes in the voice pipeline.

Overview

The Agents framework allows you to fully customize your agent's behavior at multiple nodes in the processing path. A node is a point in the path where one process transitions to another. In the case of STT, LLM, and TTS nodes, in addition to customizing the pre- and post-processing at the transition point from one node to the next, you can also entirely replace the default process with custom code.

These nodes are exposed on the Agent class and occur at the following points in the pipeline:

  • on_enter(): Agent enters session.
  • on_exit(): Agent exits session.
  • on_user_turn_completed(): User's turn is completed.
  • transcription_node(): Processing of the agent's LLM output into transcriptions.
  • stt_node(): Agent's STT processing step (pipeline only).
  • llm_node(): Agent's LLM processing step (pipeline only).
  • tts_node(): Agent's TTS processing step (pipeline only).
  • realtime_audio_output_node(): Agent's audio output step (realtime only).

Pipeline and realtime agent differences

Realtime agents aren't componentized like pipeline agents and don't have nodes for STT, LLM, and TTS. Instead, realtime agents use a single model for the entire agent, and the agent processes user input in realtime. You can still customize the behavior of a realtime agent by overriding the transcription node, updating the agent's instructions, or adding to its chat context.
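
For example, here is a minimal sketch of a helper method on your Agent subclass that adjusts a realtime agent mid-session. The method name apply_support_context is hypothetical; update_instructions and update_chat_ctx are the same Agent methods referenced later in this guide.

# A minimal sketch: assumes this method is defined on your Agent subclass.
# apply_support_context() is a hypothetical helper, not part of the framework.
async def apply_support_context(self) -> None:
    # Replace the agent's instructions for the rest of the session
    await self.update_instructions(
        "You are a support specialist. Keep answers short and confirm details."
    )

    # Add background information to the chat context
    chat_ctx = self.chat_ctx.copy()
    chat_ctx.add_message(
        role="assistant",
        content="The caller is an existing customer with an open ticket.",
    )
    await self.update_chat_ctx(chat_ctx)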

Agent with a voice pipeline

Processing path for a voice pipeline agent:

Diagram showing voice pipeline agent processing path.

Agent with a realtime model

Processing path for a realtime agent:

Diagram showing realtime agent processing path.

Use cases for customization

The following use cases are some examples of how you can customize your agent's behavior:

  • Use a custom STT, LLM, or TTS provider without a plugin.
  • Generate a custom greeting when an agent enters a session.
  • Modify STT output to remove filler words before sending it to the LLM.
  • Modify LLM output before sending it to TTS to customize pronunciation.
  • Update the user interface when an agent or user finishes speaking.

Customizing node behavior

Each node is a step in the agent pipeline where processing takes place. By default, some nodes are stub methods, and other nodes (the STT, LLM, and TTS nodes) execute the code in the provider plugin. For these nodes, you can customize behavior by overriding the node and adding additional processing before, after, or instead of the default behavior.

Stub methods are provided to allow you to add functionality at specific points in the processing path.
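
The override pattern is the same for every node: subclass Agent, define the method, and call the matching Agent.default implementation wherever you want the stock behavior. A minimal sketch, shown here for the TTS node (the comments mark where pre- and post-processing would go):

# A minimal sketch of the override pattern, using the TTS node as an example.
from livekit import rtc
from livekit.agents import Agent
from livekit.agents.voice import ModelSettings
from typing import AsyncIterable

class MyAgent(Agent):
    async def tts_node(
        self,
        text: AsyncIterable[str],
        model_settings: ModelSettings,
    ) -> AsyncIterable[rtc.AudioFrame]:
        # Pre-processing: inspect or rewrite the incoming text stream here

        # Default behavior: synthesize audio with the configured TTS plugin
        async for frame in Agent.default.tts_node(self, text, model_settings):
            # Post-processing: inspect or modify each audio frame here
            yield frame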

On enter and exit nodes

The on_enter and on_exit nodes are called when the agent enters or leaves an agent session. When an agent enters a session, it becomes the agent in control and handles processing for the session until it exits. To learn more, see Workflows.

For example, initiate a conversation when an agent enters the session:

async def on_enter(self):
    # Instruct the agent to greet the user when it's added to a session
    self.session.generate_reply(
        instructions="Greet the user with a warm welcome",
    )

For a more comprehensive example of handing off between agents and saving chat history in the on_enter node, see the restaurant ordering and reservations example.

You can override the on_exit method to say goodbye before the agent exits the session:

async def on_exit(self):
    # Say goodbye
    await self.session.generate_reply(
        instructions="Tell the user a friendly goodbye before you exit.",
    )

On turn completed node

The on_user_turn_completed node represents the end of the user's turn in the conversation, prior to the agent's reply. Override this method to modify the content of the turn, cancel the agent's reply, or perform other actions.

Realtime models

To use this node with a realtime model, you must configure turn detection to occur in your agent instead of the realtime model.
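
For example, here is a sketch of one way to configure this, assuming the OpenAI Realtime and Silero VAD plugins; the parameter values are illustrative, so check your plugin's options for the equivalent setting:

# A sketch, assuming the OpenAI Realtime and Silero VAD plugins are installed.
from livekit.agents import AgentSession
from livekit.plugins import openai, silero

session = AgentSession(
    llm=openai.realtime.RealtimeModel(
        turn_detection=None,  # disable the model's server-side turn detection
    ),
    vad=silero.VAD.load(),
    turn_detection="vad",  # let the agent session decide when the user's turn ends
)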

The node receives the following parameters:

  • turn_ctx: The full ChatContext, up to but not including the user's latest message.
  • new_message: The user's latest message, representing their current turn.

After the node is complete, the new_message is added to the chat context.

One common use of this node is retrieval-augmented generation (RAG). You can retrieve context relevant to the newest message and inject it into the chat context for the LLM.

from livekit.agents import ChatContext, ChatMessage

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    rag_content = await my_rag_lookup(new_message.text_content())
    turn_ctx.add_message(
        role="assistant",
        content=f"Additional information relevant to the user's next message: {rag_content}"
    )

Additional messages added in this way are not persisted beyond the current turn. To permanently add messages to the chat history, use the update_chat_ctx method:

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    rag_content = await my_rag_lookup(new_message.text_content())
    turn_ctx.add_message(role="assistant", content=rag_content)
    await self.update_chat_ctx(turn_ctx)

You can also edit the new_message object to modify the user's message before it's added to the chat context. For example, you can remove offensive content or add additional context. These changes are persisted to the chat history going forward.

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    new_message.content = ["... modified message ..."]

To abort generation entirely—for example, in a push-to-talk interface—you can do the following:

async def on_user_turn_completed(
    self, turn_ctx: ChatContext, new_message: ChatMessage,
) -> None:
    if not new_message.text_content:
        # for example, raise StopResponse to stop the agent from generating a reply
        raise StopResponse()

For a complete example, see the multi-user agent with push to talk example.

STT node

From the STT node, you can customize how audio frames are handled before being sent to the default STT provider, and post-process the STT output before it's passed to the LLM.

To use the default implementation, call Agent.default.stt_node().

For example, you can add noise filtering to the STT node by overriding the stt_node method in your Agent:

# add these imports
from livekit import rtc
from livekit.agents.voice import ModelSettings
from livekit.agents import stt
from typing import AsyncIterable, Optional

async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> Optional[AsyncIterable[stt.SpeechEvent]]:
    async def filtered_audio():
        async for frame in audio:
            # Apply some noise filtering logic here
            yield frame

    async for event in Agent.default.stt_node(self, filtered_audio(), model_settings):
        yield event
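
You can also post-process the recognized text itself. The following sketch strips a few filler words from final transcripts before they reach the LLM; the filler list and regex are illustrative only.

# A sketch: remove filler words from final transcripts before they reach the LLM.
import re

from livekit import rtc
from livekit.agents import stt
from livekit.agents.voice import ModelSettings
from typing import AsyncIterable, Optional

FILLER_WORDS = re.compile(r"\b(um+|uh+|erm+)\b[,.]?\s*", re.IGNORECASE)

async def stt_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> Optional[AsyncIterable[stt.SpeechEvent]]:
    async for event in Agent.default.stt_node(self, audio, model_settings):
        # Clean up final transcripts only; interim results pass through unchanged
        if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT and event.alternatives:
            event.alternatives[0].text = FILLER_WORDS.sub("", event.alternatives[0].text)
        yield event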

LLM node

The LLM node is responsible for generating the agent's response. You can customize the LLM node by overriding the llm_node method in your Agent.

llm_node can be used to integrate with custom LLM providers without having to create a plugin. As long as it returns AsyncIterable[llm.ChatChunk], the LLM node will forward the chunks to the next node in the pipeline.
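
For example, here is a rough sketch of wrapping a provider that has no plugin. my_provider_stream() is a hypothetical streaming client that yields text deltas, and the ChatChunk construction follows recent SDK versions; check the llm module in your installed release for the exact fields.

# A rough sketch; my_provider_stream() is a hypothetical streaming client,
# and the ChatChunk fields below may differ slightly between SDK versions.
import uuid

from livekit.agents import llm, FunctionTool
from livekit.agents.voice import ModelSettings
from typing import AsyncIterable

async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings,
) -> AsyncIterable[llm.ChatChunk]:
    request_id = str(uuid.uuid4())
    async for text in my_provider_stream(chat_ctx):  # hypothetical custom client
        yield llm.ChatChunk(
            id=request_id,
            delta=llm.ChoiceDelta(role="assistant", content=text),
        )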

You can also update the LLM output before sending it to the TTS node as in the following example:

# add these imports
from livekit.agents.voice import ModelSettings
from livekit.agents import llm, FunctionTool
from typing import AsyncIterable

async def llm_node(
    self,
    chat_ctx: llm.ChatContext,
    tools: list[FunctionTool],
    model_settings: ModelSettings
) -> AsyncIterable[llm.ChatChunk]:
    # Process with base LLM implementation
    async for chunk in Agent.default.llm_node(self, chat_ctx, tools, model_settings):
        # Do something with the LLM output before sending it to the next node
        yield chunk

llm_node can also be used to handle structured output. See full example here.

TTS node

The TTS node is responsible for converting the LLM output into audio. You can customize the TTS node by overriding the tts_node method in your Agent. For example, you can adjust the text before it's synthesized to customize pronunciation, as in the following example:

# add these imports
import re

from livekit import rtc
from livekit.agents.voice import ModelSettings
from livekit.agents import tts
from typing import AsyncIterable

async def tts_node(
    self,
    text: AsyncIterable[str],
    model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
    """
    Process text-to-speech with custom pronunciation rules before synthesis.
    Adjusts common technical terms and abbreviations for better pronunciation.
    """
    # Dictionary of pronunciation replacements.
    # Support for custom pronunciations depends on the TTS provider.
    # To learn more, see the Speech documentation:
    # https://docs.livekit.io/agents/build/audio/#pronunciation.
    pronunciations = {
        "API": "A P I",
        "REST": "rest",
        "SQL": "sequel",
        "kubectl": "kube control",
        "AWS": "A W S",
        "UI": "U I",
        "URL": "U R L",
        "npm": "N P M",
        "LiveKit": "Live Kit",
        "async": "a sink",
        "nginx": "engine x",
    }

    async def adjust_pronunciation(input_text: AsyncIterable[str]) -> AsyncIterable[str]:
        async for chunk in input_text:
            modified_chunk = chunk

            # Apply pronunciation rules
            for term, pronunciation in pronunciations.items():
                # Use word boundaries to avoid partial replacements
                modified_chunk = re.sub(
                    rf'\b{term}\b',
                    pronunciation,
                    modified_chunk,
                    flags=re.IGNORECASE
                )

            yield modified_chunk

    # Process with modified text through base TTS implementation
    async for frame in Agent.default.tts_node(
        self,
        adjust_pronunciation(text),
        model_settings
    ):
        yield frame

Transcription node

The transcription node is part of the forwarding path for agent transcriptions. By default, the node simply passes the transcription to the task that forwards it to the designated output. You can customize this behavior by overriding the transcription_node method in your Agent. For example, you can strip any unwanted formatting before it's sent to the client as transcripts.

# add these imports
from livekit.agents.voice import ModelSettings
from typing import AsyncIterable

async def transcription_node(self, text: AsyncIterable[str], model_settings: ModelSettings) -> AsyncIterable[str]:
    def cleanup_text(text_chunk: str) -> str:
        # Strip unwanted characters
        return text_chunk.replace("😘", "")

    async for delta in text:
        yield cleanup_text(delta)

Realtime audio output node

The realtime_audio_output_node is called when a realtime model outputs speech. This allows you to modify the audio output before it's sent to the user. For example, you could process the frames to speed up or slow down the audio, as in the following skeleton:

# add these imports
from livekit import rtc
from livekit.agents.voice import ModelSettings
from livekit.agents import utils
from typing import AsyncIterable

def _process_audio(self, frame: rtc.AudioFrame) -> rtc.AudioFrame:
    # Apply your custom processing here and return the modified frame
    return frame

async def _process_audio_stream(
    self, audio: AsyncIterable[rtc.AudioFrame]
) -> AsyncIterable[rtc.AudioFrame]:
    stream: utils.audio.AudioByteStream | None = None
    async for frame in audio:
        if stream is None:
            stream = utils.audio.AudioByteStream(
                sample_rate=frame.sample_rate,
                num_channels=frame.num_channels,
                samples_per_channel=frame.sample_rate // 10,  # 100ms
            )
        for f in stream.push(frame.data):
            yield self._process_audio(f)

    for f in stream.flush():
        yield self._process_audio(f)

async def realtime_audio_output_node(
    self, audio: AsyncIterable[rtc.AudioFrame], model_settings: ModelSettings
) -> AsyncIterable[rtc.AudioFrame]:
    return self._process_audio_stream(
        Agent.default.realtime_audio_output_node(self, audio, model_settings)
    )

See full example here.

Examples

The following examples demonstrate various node customizations: