Skip to main content

LLM Output Replacement

Remove chain-of-thought reasoning blocks from a streaming LLM response before they reach TTS.

This recipe shows how to remove chain-of-thought reasoning from a streaming LLM response before it reaches TTS. The agent overrides llm_node to remove <think>...</think> blocks, so only the final answer reaches both speech and chat history. The same buffered state-machine pattern works for other inline markup you want removed from chat context, like RAG citation markers or structured-output scaffolding.

Note

If you only need to change pronunciation or remove Markdown for TTS (without modifying chat context), see tts_text_transforms. The technique below is for cases where the change should also persist in conversation history.

Prerequisites

To complete this guide, you need the following prerequisites:

Define the agent

The system prompt instructs the model to wrap its reasoning in <think>...</think> tags.

class SimpleAgent(Agent):
    """Agent whose prompt asks the model to wrap its reasoning in <think> tags."""

    # Opening and closing delimiters of the reasoning block named in the prompt.
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        """Initialise the base agent with the step-by-step reasoning prompt."""
        prompt = (
            "You are a helpful agent that thinks through problems step by step. "
            "Wrap your reasoning in <think></think> tags, then provide your final answer."
        )
        super().__init__(instructions=prompt)

    async def on_enter(self):
        """Ask the session to generate a reply.

        NOTE(review): presumably invoked by the framework when this agent
        becomes active -- confirm against the Agent base class.
        """
        self.session.generate_reply()

Filter thinking blocks in llm_node

Override llm_node to wrap the LLM stream with a state machine. An in_thinking flag tracks whether the stream is currently inside a <think> block, and a small buffer holds back trailing characters that could be the start of a tag split across chunks.

self.session.llm.chat(...) invokes whichever LLM you configured on AgentSession, so this filter works with any model — including reasoning models that emit <think> blocks natively.

async def llm_node(self, chat_ctx, tools, model_settings=None):
    """Stream the LLM reply with every <think>...</think> block removed.

    Wraps ``self.session.llm.chat(...)`` in a small state machine. Text
    outside a thinking block is forwarded; text inside one is dropped. A tag
    may be split across chunks, so any trailing characters that could be the
    beginning of the next expected tag are held back until later chunks
    settle whether a tag is really there.

    Args:
        chat_ctx: Chat context forwarded unchanged to the LLM.
        tools: Tools forwarded unchanged to the LLM.
        model_settings: Accepted for interface compatibility; not used here.

    Returns:
        An async generator yielding the filtered chunks, followed by one
        plain string if held-back text is still pending at end of stream.
    """

    def held_back(text, tag):
        """Return the length of the longest proper prefix of tag that ends text."""
        return next(
            (i for i in range(len(tag) - 1, 0, -1) if text.endswith(tag[:i])),
            0,
        )

    async def process_stream():
        in_thinking = False
        buffer = ""
        async with self.session.llm.chat(
            chat_ctx=chat_ctx, tools=tools, tool_choice=None
        ) as stream:
            async for chunk in stream:
                content = (
                    getattr(chunk.delta, "content", None)
                    if hasattr(chunk, "delta")
                    else None
                )
                if not content:
                    # No text on this chunk (content missing, None or empty):
                    # pass it through untouched so that whatever else it
                    # carries is not lost.
                    yield chunk
                    continue

                buffer += content
                output = ""
                while buffer:
                    if not in_thinking:
                        idx = buffer.find(self.THINK_OPEN)
                        if idx >= 0:
                            output += buffer[:idx]
                            buffer = buffer[idx + len(self.THINK_OPEN) :]
                            in_thinking = True
                            continue
                        # Hold back any trailing characters that could start "<think>".
                        keep = held_back(buffer, self.THINK_OPEN)
                        output += buffer[: len(buffer) - keep]
                        buffer = buffer[len(buffer) - keep :]
                        break
                    idx = buffer.find(self.THINK_CLOSE)
                    if idx >= 0:
                        buffer = buffer[idx + len(self.THINK_CLOSE) :]
                        in_thinking = False
                        continue
                    # Drop thinking text but hold back any trailing partial "</think>".
                    keep = held_back(buffer, self.THINK_CLOSE)
                    buffer = buffer[len(buffer) - keep :]
                    break

                # Chunks whose text was entirely reasoning (or entirely held
                # back) are suppressed; the rest carry only the cleaned text.
                if output:
                    chunk.delta.content = output
                    yield chunk

        # The stream ended while characters were still held back as a possible
        # start of "<think>" (for example a reply ending in "<"). They never
        # became a tag, so they are ordinary answer text and must be emitted.
        # llm_node may yield plain strings as well as chunks. Residue left
        # while still inside an unterminated thinking block is discarded.
        if buffer and not in_thinking:
            yield buffer

    return process_stream()

Run it

uv run src/agent.py console

How it works

  1. The LLM streams its response one chunk at a time, with reasoning wrapped in <think>...</think>.
  2. The custom llm_node wraps that stream with a state machine. An in_thinking flag tracks whether the stream is currently inside a <think> block, and a small buffer holds back trailing characters that could be the start of a tag split across chunks.
  3. While outside a thinking block, the filter emits text and holds back only a trailing fragment that matches the beginning of <think>. When the full opening tag arrives, the filter switches into thinking mode.
  4. While inside a thinking block, the filter drops text and holds back only a trailing fragment that matches the beginning of </think>. When the closing tag arrives, the filter switches back to passing text through.
  5. TTS receives only the cleaned chunks, and the chat history persisted by the session contains the same cleaned text. The filter removes the reasoning from both surfaces in one pass.

Full example

import logging
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentServer,
AgentSession,
JobContext,
JobProcess,
cli,
inference,
)
from livekit.plugins import silero
# Run python-dotenv's loader before anything else in the module executes.
load_dotenv()
# Module-level logger for this recipe, set to emit INFO and above.
logger = logging.getLogger("replacing-llm-output")
logger.setLevel(logging.INFO)
class SimpleAgent(Agent):
    """Agent that strips <think>...</think> reasoning from the LLM stream."""

    # Opening and closing delimiters of the reasoning block named in the prompt.
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        """Initialise the base agent with the step-by-step reasoning prompt."""
        super().__init__(
            instructions=(
                "You are a helpful agent that thinks through problems step by step. "
                "Wrap your reasoning in <think></think> tags, then provide your final answer."
            ),
        )

    async def on_enter(self):
        """Ask the session to generate a reply.

        NOTE(review): presumably invoked by the framework when this agent
        becomes active -- confirm against the Agent base class.
        """
        self.session.generate_reply()

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        """Stream the LLM reply with every <think>...</think> block removed.

        Wraps ``self.session.llm.chat(...)`` in a small state machine. Text
        outside a thinking block is forwarded; text inside one is dropped. A
        tag may be split across chunks, so any trailing characters that could
        be the beginning of the next expected tag are held back until later
        chunks settle whether a tag is really there.

        Args:
            chat_ctx: Chat context forwarded unchanged to the LLM.
            tools: Tools forwarded unchanged to the LLM.
            model_settings: Accepted for interface compatibility; not used here.

        Returns:
            An async generator yielding the filtered chunks, followed by one
            plain string if held-back text is still pending at end of stream.
        """

        def held_back(text, tag):
            """Return the length of the longest proper prefix of tag that ends text."""
            return next(
                (i for i in range(len(tag) - 1, 0, -1) if text.endswith(tag[:i])),
                0,
            )

        async def process_stream():
            in_thinking = False
            buffer = ""
            async with self.session.llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=None
            ) as stream:
                async for chunk in stream:
                    content = (
                        getattr(chunk.delta, "content", None)
                        if hasattr(chunk, "delta")
                        else None
                    )
                    if not content:
                        # No text on this chunk (content missing, None or
                        # empty): pass it through untouched so that whatever
                        # else it carries is not lost.
                        yield chunk
                        continue

                    buffer += content
                    output = ""
                    while buffer:
                        if not in_thinking:
                            idx = buffer.find(self.THINK_OPEN)
                            if idx >= 0:
                                output += buffer[:idx]
                                buffer = buffer[idx + len(self.THINK_OPEN) :]
                                in_thinking = True
                                continue
                            # Hold back any trailing characters that could start "<think>".
                            keep = held_back(buffer, self.THINK_OPEN)
                            output += buffer[: len(buffer) - keep]
                            buffer = buffer[len(buffer) - keep :]
                            break
                        idx = buffer.find(self.THINK_CLOSE)
                        if idx >= 0:
                            buffer = buffer[idx + len(self.THINK_CLOSE) :]
                            in_thinking = False
                            continue
                        # Drop thinking text but hold back any trailing partial "</think>".
                        keep = held_back(buffer, self.THINK_CLOSE)
                        buffer = buffer[len(buffer) - keep :]
                        break

                    # Chunks whose text was entirely reasoning (or entirely
                    # held back) are suppressed; the rest carry only the
                    # cleaned text.
                    if output:
                        chunk.delta.content = output
                        yield chunk

            # The stream ended while characters were still held back as a
            # possible start of "<think>" (for example a reply ending in "<").
            # They never became a tag, so they are ordinary answer text and
            # must be emitted. llm_node may yield plain strings as well as
            # chunks. Residue left while still inside an unterminated thinking
            # block is discarded.
            if buffer and not in_thinking:
                yield buffer

        return process_stream()
def prewarm(proc: JobProcess):
    """Load the Silero VAD and store it under the "vad" key of proc.userdata."""
    vad = silero.VAD.load()
    proc.userdata["vad"] = vad


# Create the server and register prewarm as its setup function.
server = AgentServer()
server.setup_fnc = prewarm
@server.rtc_session(agent_name="my-agent")
async def entrypoint(ctx: JobContext):
    """Build the voice session for a job, start SimpleAgent in it, then connect.

    Args:
        ctx: Job context supplying the room and the prewarmed VAD stored in
            ``ctx.proc.userdata["vad"]``.
    """
    # Attach the room name to this job's log context.
    ctx.log_context_fields = {"room": ctx.room.name}

    # Construct the model components first, in the same order as the session
    # arguments, then hand them to the session.
    speech_to_text = inference.STT(model="deepgram/nova-3", language="multi")
    language_model = inference.LLM(model="openai/gpt-5.2-chat-latest")
    text_to_speech = inference.TTS(
        model="cartesia/sonic-3",
        voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
    )
    session = AgentSession(
        stt=speech_to_text,
        llm=language_model,
        tts=text_to_speech,
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    await session.start(agent=SimpleAgent(), room=ctx.room)
    await ctx.connect()
# Hand the server to the CLI runner only when this file is executed as a script.
if __name__ == "__main__":
    cli.run_app(server)