# LLM Output Replacement

> Remove chain-of-thought reasoning blocks from a streaming LLM response before they reach TTS.

The agent overrides `llm_node` to strip `<think>...</think>` blocks from the stream, so only the final answer is spoken and stored in chat history. The same buffered state-machine pattern works for other inline markup that you want kept out of chat context, such as RAG citation markers or structured-output scaffolding.

> ℹ️ **Note**
> 
> If you only need to change pronunciation or remove Markdown for TTS (without modifying chat context), see [`tts_text_transforms`](https://docs.livekit.io/agents/multimodality/text.md#text-transforms). The technique below is for cases where the change should also persist in conversation history.

## Prerequisites

Before you begin, create an agent using the [Voice AI quickstart](https://docs.livekit.io/agents/start/voice-ai.md).

## Define the agent

The system prompt instructs the model to wrap its reasoning in `<think>...</think>` tags.

```python
class SimpleAgent(Agent):
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a helpful agent that thinks through problems step by step. "
                "Wrap your reasoning in <think></think> tags, then provide your final answer."
            ),
        )

    async def on_enter(self):
        self.session.generate_reply()
```

## Filter thinking blocks in llm_node

Override `llm_node` to wrap the LLM stream with a state machine. An `in_thinking` flag tracks whether the stream is currently inside a `<think>` block, and a small buffer holds back trailing characters that could be the start of a tag split across chunks.

`self.session.llm.chat(...)` invokes whichever LLM you configured on `AgentSession`, so this filter works with any model, including reasoning models that emit `<think>` blocks natively.

```python
    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            in_thinking = False
            buffer = ""

            async with self.session.llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=None
            ) as stream:
                async for chunk in stream:
                    content = (
                        getattr(chunk.delta, "content", None)
                        if hasattr(chunk, "delta")
                        else None
                    )
                    if content is None:
                        yield chunk
                        continue

                    buffer += content
                    output = ""

                    while buffer:
                        if not in_thinking:
                            idx = buffer.find(self.THINK_OPEN)
                            if idx >= 0:
                                output += buffer[:idx]
                                buffer = buffer[idx + len(self.THINK_OPEN) :]
                                in_thinking = True
                                continue
                            # Hold back any trailing characters that could start "<think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_OPEN) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_OPEN[:i])
                                ),
                                0,
                            )
                            output += buffer[: len(buffer) - keep]
                            buffer = buffer[len(buffer) - keep :]
                            break
                        else:
                            idx = buffer.find(self.THINK_CLOSE)
                            if idx >= 0:
                                buffer = buffer[idx + len(self.THINK_CLOSE) :]
                                in_thinking = False
                                continue
                            # Drop thinking text but hold back any trailing partial "</think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_CLOSE) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_CLOSE[:i])
                                ),
                                0,
                            )
                            buffer = buffer[len(buffer) - keep :]
                            break

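                    if (
                        output
                        and hasattr(chunk, "delta")
                        and hasattr(chunk.delta, "content")
                    ):
                        chunk.delta.content = output
                        yield chunk

                # End of stream: emit held-back text that never became a full
                # "<think>" tag. llm_node may yield plain strings as well as chunks.
                if buffer and not in_thinking:
                    yield buffer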

        return process_stream()
```
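
The hold-back step is the least obvious part of the filter, so it can help to look at it in isolation. The following is a minimal sketch built around a hypothetical helper, `held_back_len`, which is not part of the recipe or of LiveKit. It mirrors the `next(...)` expression in `llm_node`: it checks the longest proper prefix of the tag first and returns how many trailing characters of the buffer must wait for the next chunk.

```python
def held_back_len(buffer: str, tag: str) -> int:
    """Return how many trailing characters of buffer could begin tag.

    Hypothetical helper for illustration only; it mirrors the next(...)
    expression used in llm_node.
    """
    # Try the longest proper prefix first so "<th" wins over "<".
    for size in range(len(tag) - 1, 0, -1):
        if buffer.endswith(tag[:size]):
            return size
    return 0


print(held_back_len("The answer is <th", "<think>"))  # 3
print(held_back_len("2 < 3", "<think>"))  # 0
print(held_back_len("is 2 <", "<think>"))  # 1
```

The last call shows the cost of this approach: a literal `<` at the end of a chunk is delayed until the next chunk arrives, because the filter cannot yet tell whether it starts a tag.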

## Run it

```bash
uv run src/agent.py console
```

## How it works

1. The LLM streams its response one chunk at a time, with reasoning wrapped in `<think>...</think>`.
2. The custom `llm_node` wraps that stream with a state machine that keeps two pieces of state: the `in_thinking` flag and a small hold-back buffer for tags split across chunks.
3. While outside a thinking block, the filter emits text and only buffers a trailing partial prefix of `<think>`. When the full opening tag arrives, the filter switches into thinking mode.
4. While inside a thinking block, the filter drops text and only buffers a trailing partial prefix of `</think>`. When the closing tag arrives, the filter switches back to passing text through.
5. TTS receives only the cleaned chunks, and the chat history persisted by the session contains the same cleaned text. The filter removes the reasoning from both surfaces in one pass.
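
The steps above can be sketched without any LiveKit dependency, which makes the state machine easier to unit test. The `TagFilter` class below is a hypothetical illustration, not part of the recipe: its name, its `feed` and `flush` methods, and the configurable tags are all assumptions. It operates on plain strings rather than LLM chunks, and `flush` returns any held-back text once the stream ends.

```python
class TagFilter:
    """Strip open_tag...close_tag spans from text that arrives in pieces.

    Hypothetical standalone class for illustration; not part of LiveKit.
    """

    def __init__(self, open_tag: str = "<think>", close_tag: str = "</think>") -> None:
        self.open_tag = open_tag
        self.close_tag = close_tag
        self.inside = False  # True while between open_tag and close_tag
        self.buffer = ""  # unprocessed text, including any partial tag

    @staticmethod
    def _partial(buffer: str, tag: str) -> int:
        # Length of the longest proper prefix of tag that ends buffer.
        for size in range(len(tag) - 1, 0, -1):
            if buffer.endswith(tag[:size]):
                return size
        return 0

    def feed(self, text: str) -> str:
        """Consume one piece of text and return what is safe to emit now."""
        self.buffer += text
        output = ""
        while self.buffer:
            # Look for the tag that would end the current state.
            tag = self.close_tag if self.inside else self.open_tag
            idx = self.buffer.find(tag)
            if idx >= 0:
                if not self.inside:
                    output += self.buffer[:idx]
                self.buffer = self.buffer[idx + len(tag):]
                self.inside = not self.inside
                continue
            # No full tag: emit (or drop) everything except a partial tag.
            keep = self._partial(self.buffer, tag)
            if not self.inside:
                output += self.buffer[: len(self.buffer) - keep]
            self.buffer = self.buffer[len(self.buffer) - keep:]
            break
        return output

    def flush(self) -> str:
        """Return held-back text at end of stream (nothing if inside a block)."""
        tail = "" if self.inside else self.buffer
        self.buffer = ""
        return tail


# Both tags are split across piece boundaries on purpose.
pieces = ["Let me see. <th", "ink>2 + 2 = 4</thi", "nk>The answer", " is 4."]
f = TagFilter()
spoken = "".join(f.feed(p) for p in pieces) + f.flush()
print(repr(spoken))  # 'Let me see. The answer is 4.'
```

Because the tags are constructor arguments, the same sketch can drop other paired markup. For example, `TagFilter("[[", "]]")` would remove a citation marker such as `[[1]]`, assuming that is the marker format your pipeline emits.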

## Full example

```python
import logging
from dotenv import load_dotenv
from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    JobContext,
    JobProcess,
    cli,
    inference,
)
from livekit.plugins import silero

load_dotenv(".env.local")

logger = logging.getLogger("replacing-llm-output")
logger.setLevel(logging.INFO)


class SimpleAgent(Agent):
    THINK_OPEN = "<think>"
    THINK_CLOSE = "</think>"

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are a helpful agent that thinks through problems step by step. "
                "Wrap your reasoning in <think></think> tags, then provide your final answer."
            ),
        )

    async def on_enter(self):
        self.session.generate_reply()

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            in_thinking = False
            buffer = ""

            async with self.session.llm.chat(
                chat_ctx=chat_ctx, tools=tools, tool_choice=None
            ) as stream:
                async for chunk in stream:
                    content = (
                        getattr(chunk.delta, "content", None)
                        if hasattr(chunk, "delta")
                        else None
                    )
                    if content is None:
                        yield chunk
                        continue

                    buffer += content
                    output = ""

                    while buffer:
                        if not in_thinking:
                            idx = buffer.find(self.THINK_OPEN)
                            if idx >= 0:
                                output += buffer[:idx]
                                buffer = buffer[idx + len(self.THINK_OPEN) :]
                                in_thinking = True
                                continue
                            # Hold back any trailing characters that could start "<think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_OPEN) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_OPEN[:i])
                                ),
                                0,
                            )
                            output += buffer[: len(buffer) - keep]
                            buffer = buffer[len(buffer) - keep :]
                            break
                        else:
                            idx = buffer.find(self.THINK_CLOSE)
                            if idx >= 0:
                                buffer = buffer[idx + len(self.THINK_CLOSE) :]
                                in_thinking = False
                                continue
                            # Drop thinking text but hold back any trailing partial "</think>".
                            keep = next(
                                (
                                    i
                                    for i in range(len(self.THINK_CLOSE) - 1, 0, -1)
                                    if buffer.endswith(self.THINK_CLOSE[:i])
                                ),
                                0,
                            )
                            buffer = buffer[len(buffer) - keep :]
                            break

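                    if (
                        output
                        and hasattr(chunk, "delta")
                        and hasattr(chunk.delta, "content")
                    ):
                        chunk.delta.content = output
                        yield chunk

                # End of stream: emit held-back text that never became a full
                # "<think>" tag. llm_node may yield plain strings as well as chunks.
                if buffer and not in_thinking:
                    yield buffer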

        return process_stream()


server = AgentServer()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session(agent_name="my-agent")
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3", language="multi"),
        llm=inference.LLM(model="openai/gpt-5.2-chat-latest"),
        tts=inference.TTS(
            model="cartesia/sonic-3",
            voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
        ),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    await session.start(agent=SimpleAgent(), room=ctx.room)
    await ctx.connect()


if __name__ == "__main__":
    cli.run_app(server)
```

---

This document was rendered at 2026-06-07T11:35:12.340Z.
For the latest version of this document, see [https://docs.livekit.io/reference/recipes/replacing_llm_output.md](https://docs.livekit.io/reference/recipes/replacing_llm_output.md).
