This recipe shows how to remove chain-of-thought reasoning from a streaming LLM response before it reaches TTS. The agent overrides llm_node to remove <think>...</think> blocks, so only the final answer reaches both speech and chat history. The same buffered state-machine pattern works for other inline markup you want removed from chat context, like RAG citation markers or structured-output scaffolding.
If you only need to change pronunciation or remove Markdown for TTS (without modifying chat context), see tts_text_transforms. The technique below is for cases where the change should also persist in conversation history.
Prerequisites
To complete this guide, you need the following:
- An agent created using the Voice AI quickstart.
Define the agent
The system prompt instructs the model to wrap its reasoning in <think>...</think> tags.
class SimpleAgent(Agent):
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a helpful agent that thinks through problems step by step. "
                "Wrap your reasoning in <think></think> tags, then provide your final answer."
            ),
        )

    async def on_enter(self):
        self.session.generate_reply()
Filter thinking blocks in llm_node
Override llm_node to wrap the LLM stream with a state machine. An in_thinking flag tracks whether the stream is currently inside a <think> block, and a small buffer holds back trailing characters that could be the start of a tag split across chunks.
self.session.llm.chat(...) invokes whichever LLM you configured on AgentSession, so this filter works with any model, including reasoning models that emit <think> blocks natively.
    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            in_thinking = False
            buffer = ""

            async with self.session.llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=None
            ) as stream:
                async for chunk in stream:
                    content = (
                        getattr(chunk.delta, "content", None)
                        if hasattr(chunk, "delta")
                        else None
                    )
                    if content is None:
                        yield chunk
                        continue

                    buffer += content
                    output = ""

                    while buffer:
                        if not in_thinking:
                            idx = buffer.find(self.THINK_OPEN)
                            if idx >= 0:
                                output += buffer[:idx]
                                buffer = buffer[idx + len(self.THINK_OPEN) :]
                                in_thinking = True
                                continue
                            # Hold back any trailing characters that could start "<think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_OPEN) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_OPEN[:i])
                                ),
                                0,
                            )
                            output += buffer[: len(buffer) - keep]
                            buffer = buffer[len(buffer) - keep :]
                            break
                        else:
                            idx = buffer.find(self.THINK_CLOSE)
                            if idx >= 0:
                                buffer = buffer[idx + len(self.THINK_CLOSE) :]
                                in_thinking = False
                                continue
                            # Drop thinking text but hold back any trailing partial "</think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_CLOSE) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_CLOSE[:i])
                                ),
                                0,
                            )
                            buffer = buffer[len(buffer) - keep :]
                            break

                    if (
                        output
                        and hasattr(chunk, "delta")
                        and hasattr(chunk.delta, "content")
                    ):
                        chunk.delta.content = output
                        yield chunk

        return process_stream()
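The hold-back step is the least obvious part of the filter, so it can help to look at it in isolation. The sketch below pulls the `keep = next(...)` expression into a standalone helper. The name `held_back_len` is hypothetical and not part of the recipe or of any LiveKit API; it only illustrates how the trailing partial tag is measured.

```python
def held_back_len(buffer: str, tag: str) -> int:
    """Return how many trailing characters of `buffer` could begin `tag`.

    Only proper prefixes of the tag are checked, because a complete tag is
    already handled by `str.find`. The longest matching prefix wins.
    """
    for i in range(len(tag) - 1, 0, -1):
        if buffer.endswith(tag[:i]):
            return i
    return 0


# A chunk that ends mid-tag: emit the safe part, hold the rest for the next chunk.
text = "Sure. <thi"
keep = held_back_len(text, "<think>")
emit, hold = text[: len(text) - keep], text[len(text) - keep :]
print(repr(emit), repr(hold))  # 'Sure. ' '<thi'
```

Because at most `len(tag) - 1` characters are ever held back, the filter adds only a few characters of delay to the text that reaches TTS.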
Run it
uv run src/agent.py console
How it works
- The LLM streams its response one chunk at a time, with reasoning wrapped in <think>...</think>.
- The custom llm_node wraps that stream with a state machine. An in_thinking flag tracks whether the stream is currently inside a <think> block, and a small buffer holds back trailing characters that could be the start of a tag split across chunks.
- While outside a thinking block, the filter emits text and only buffers a trailing partial prefix of <think>. When the full opening tag arrives, the filter switches into thinking mode.
- While inside a thinking block, the filter drops text and only buffers a trailing partial prefix of </think>. When the closing tag arrives, the filter switches back to passing text through.
- TTS receives only the cleaned chunks, and the chat history persisted by the session contains the same cleaned text. The filter removes the reasoning from both surfaces in one pass.
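The steps above can be exercised without a running agent. The following is a minimal sketch that applies the same state machine to plain strings instead of LLM chunks. The function `strip_blocks` and its parameters are hypothetical names chosen for illustration. The final flush of held-back text is an addition that the recipe's llm_node does not perform.

```python
from typing import Iterable, Iterator


def strip_blocks(
    chunks: Iterable[str], open_tag: str = "<think>", close_tag: str = "</think>"
) -> Iterator[str]:
    """Yield text from `chunks` with every open_tag...close_tag block removed."""
    in_block = False
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        output = ""
        while buffer:
            # Outside a block we look for the opening tag, inside for the closing one.
            tag = close_tag if in_block else open_tag
            idx = buffer.find(tag)
            if idx >= 0:
                if not in_block:
                    output += buffer[:idx]  # keep text before the opening tag
                buffer = buffer[idx + len(tag) :]
                in_block = not in_block
                continue
            # No complete tag: hold back the longest trailing partial tag.
            keep = next(
                (i for i in range(len(tag) - 1, 0, -1) if buffer.endswith(tag[:i])),
                0,
            )
            if not in_block:
                output += buffer[: len(buffer) - keep]
            buffer = buffer[len(buffer) - keep :]
            break
        if output:
            yield output
    # Assumption, not in the recipe: flush held-back text that never became a tag.
    if buffer and not in_block:
        yield buffer


# Both tags are split across chunk boundaries here.
pieces = ["Hi <th", "ink>secret</th", "ink> there"]
print(repr("".join(strip_blocks(pieces))))  # 'Hi  there'

# The same filter with hypothetical citation markers of the form [[...]].
cited = ["Paris is the capital [", "[doc-3]] of France."]
print(repr("".join(strip_blocks(cited, "[[", "]]"))))
```

Passing the delimiters as parameters is one way to reuse the pattern for other inline markup, such as citation markers. An unterminated block is dropped entirely, which matches the behavior of the recipe's filter.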
Full example
import logging

from dotenv import load_dotenv
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    cli,
    inference,
)
from livekit.plugins import silero

load_dotenv()

logger = logging.getLogger("replacing-llm-output")
logger.setLevel(logging.INFO)


class SimpleAgent(Agent):
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a helpful agent that thinks through problems step by step. "
                "Wrap your reasoning in <think></think> tags, then provide your final answer."
            ),
        )

    async def on_enter(self):
        self.session.generate_reply()

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            in_thinking = False
            buffer = ""

            async with self.session.llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=None
            ) as stream:
                async for chunk in stream:
                    content = (
                        getattr(chunk.delta, "content", None)
                        if hasattr(chunk, "delta")
                        else None
                    )
                    if content is None:
                        yield chunk
                        continue

                    buffer += content
                    output = ""

                    while buffer:
                        if not in_thinking:
                            idx = buffer.find(self.THINK_OPEN)
                            if idx >= 0:
                                output += buffer[:idx]
                                buffer = buffer[idx + len(self.THINK_OPEN) :]
                                in_thinking = True
                                continue
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_OPEN) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_OPEN[:i])
                                ),
                                0,
                            )
                            output += buffer[: len(buffer) - keep]
                            buffer = buffer[len(buffer) - keep :]
                            break
                        else:
                            idx = buffer.find(self.THINK_CLOSE)
                            if idx >= 0:
                                buffer = buffer[idx + len(self.THINK_CLOSE) :]
                                in_thinking = False
                                continue
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_CLOSE) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_CLOSE[:i])
                                ),
                                0,
                            )
                            buffer = buffer[len(buffer) - keep :]
                            break

                    if (
                        output
                        and hasattr(chunk, "delta")
                        and hasattr(chunk.delta, "content")
                    ):
                        chunk.delta.content = output
                        yield chunk

        return process_stream()


server = AgentServer()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session(agent_name="my-agent")
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3", language="multi"),
        llm=inference.LLM(model="openai/gpt-5.2-chat-latest"),
        tts=inference.TTS(
            model="cartesia/sonic-3",
            voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
        ),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    await session.start(agent=SimpleAgent(), room=ctx.room)
    await ctx.connect()


if __name__ == "__main__":
    cli.run_app(server)