This example shows how to filter the LLM's output with a second moderation model. The agent buffers sentences from the main LLM stream, checks them with a moderator LLM, and only forwards safe text to the TTS.
Prerequisites
- Add a `.env` in this directory with your LiveKit credentials:

  ```
  LIVEKIT_URL=your_livekit_url
  LIVEKIT_API_KEY=your_api_key
  LIVEKIT_API_SECRET=your_api_secret
  OPENAI_API_KEY=your_openai_key
  ```

- Install dependencies:

  ```
  pip install "livekit-agents[silero]" livekit-plugins-openai python-dotenv
  ```
Load configuration and logging
Load environment variables and configure logging for monitoring moderation decisions.
```python
import logging
import asyncio
from typing import Optional, Any

from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, Agent, AgentSession, inference, AgentServer, cli
from livekit.plugins import openai, silero
from livekit.agents.llm import ChatContext, ChatMessage

load_dotenv()

logger = logging.getLogger("complex-content-filter")
logger.setLevel(logging.INFO)

server = AgentServer()
```
Prewarm VAD for faster connections
Preload the VAD model once per process to reduce connection latency.
```python
def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()

server.setup_fnc = prewarm
```
Create the dual-LLM agent
The agent keeps a separate moderator LLM for content checks. The main LLM for responses is provided through the AgentSession using LiveKit inference, while the moderator uses the OpenAI plugin directly for fine-grained control.
```python
class ContentFilterAgent(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="You are a helpful agent.")
        self.moderator_llm = openai.LLM(model="gpt-4o-mini")

    async def on_enter(self):
        self.session.generate_reply()
```
Evaluate content with a moderator prompt
Send candidate text to the moderator LLM with a strict system prompt that instructs it to answer only APPROPRIATE or INAPPROPRIATE, then parse the streamed response into a boolean. The strawberry rule in the prompt is just a demo trigger so you can easily exercise the filter.
```python
async def evaluate_content(self, text: str) -> bool:
    moderation_ctx = ChatContext([
        ChatMessage(
            type="message",
            role="system",
            content=[
                "You are a content moderator. Respond ONLY with 'APPROPRIATE' or 'INAPPROPRIATE'. "
                "Respond with 'INAPPROPRIATE' if the text mentions strawberries."
            ],
        ),
        ChatMessage(type="message", role="user", content=[f"Evaluate: {text}"]),
    ])

    response = ""
    async with self.moderator_llm.chat(chat_ctx=moderation_ctx) as stream:
        async for chunk in stream:
            content = getattr(chunk.delta, "content", None) if hasattr(chunk, "delta") else str(chunk)
            if content:
                response += content

    return "INAPPROPRIATE" not in response.strip().upper()
```
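Note that evaluate_content waits on the moderator with no timeout, and any exception propagates into llm_node's error handler. If you would rather fail closed at this layer, a wrapper along these lines could be used instead (evaluate_content_safe and its timeout value are illustrative assumptions, not part of the example):

```python
# Hypothetical fail-closed wrapper: block the sentence if the moderator
# errors or takes longer than `timeout` seconds.
async def evaluate_content_safe(self, text: str, timeout: float = 2.0) -> bool:
    try:
        return await asyncio.wait_for(self.evaluate_content(text), timeout=timeout)
    except Exception as e:  # CancelledError is BaseException and still propagates
        logger.warning(f"Moderation failed ({e}); blocking text")
        return False
```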
Extract content from streamed chunks
This helper normalizes string vs delta-based chunks from the main LLM stream.
```python
def _extract_content(self, chunk: Any) -> Optional[str]:
    if not chunk:
        return None
    if isinstance(chunk, str):
        return chunk
    if hasattr(chunk, "delta"):
        return getattr(chunk.delta, "content", None)
    return None
```
Override llm_node to buffer and filter
Buffer streamed text until sentence-ending punctuation appears. When a sentence completes, send it to the moderator; if approved, replay the buffered chunks downstream, otherwise yield a short "Content filtered." notice and end the turn.
```python
async def llm_node(self, chat_ctx, tools, model_settings=None):
    async def process_stream():
        buffer = ""
        chunk_buffer = []
        sentence_end_chars = {".", "!", "?"}

        async with self.session.llm.chat(chat_ctx=chat_ctx, tools=tools, tool_choice=None) as stream:
            try:
                async for chunk in stream:
                    content = self._extract_content(chunk)
                    chunk_buffer.append(chunk)

                    if content:
                        buffer += content

                        if any(char in buffer for char in sentence_end_chars):
                            last_end = max(
                                buffer.rfind(char) for char in sentence_end_chars if char in buffer
                            )
                            if last_end != -1:
                                sentence = buffer[: last_end + 1]
                                buffer = buffer[last_end + 1 :]

                                if not await self.evaluate_content(sentence):
                                    yield "Content filtered."
                                    return

                                for buffered_chunk in chunk_buffer:
                                    yield buffered_chunk
                                chunk_buffer = []

                if buffer and any(buffer.endswith(char) for char in sentence_end_chars):
                    if not await self.evaluate_content(buffer):
                        yield "Content filtered."
                        return

                for buffered_chunk in chunk_buffer:
                    yield buffered_chunk
            except asyncio.CancelledError:
                raise
            except Exception as e:
                logger.error(f"Error in content filtering: {str(e)}")
                yield "[Error in content filtering]"

    return process_stream()
```
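Note the latency trade-off: nothing reaches the TTS until a full sentence has been buffered and one moderator round-trip has completed, so time-to-first-audio grows with sentence length. Replaying chunk_buffer forwards the original chunk objects unchanged, so the downstream pipeline sees the same stream it would have received without the filter, only delayed.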
Set up the session
Configure the AgentSession with LiveKit inference for STT, LLM, and TTS. The main LLM is accessed via self.session.llm in the llm_node override.
```python
@server.rtc_session()
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3-general"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(model="cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    agent = ContentFilterAgent()
    await session.start(agent=agent, room=ctx.room)
    await ctx.connect()
```
Run the server
Start the agent server with the CLI.
```python
if __name__ == "__main__":
    cli.run_app(server)
```
Run it
```
python llm_powered_content_filter.py console
```
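The console subcommand runs a local terminal session. Assuming the standard livekit-agents CLI modes, `python llm_powered_content_filter.py dev` would instead connect the agent to your LiveKit project so it can join real rooms.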
How it works
- The main LLM streams responses via LiveKit inference; chunks are buffered until a sentence completes (the boundary logic is sketched in isolation after this list).
- The moderator LLM (using the OpenAI plugin directly) judges the buffered text; unsafe content is dropped.
- Safe chunks are replayed to the downstream pipeline (and then to TTS).
- The agent owns the moderator LLM separately from the session's main LLM.
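To see the boundary detection on its own, here is a minimal standalone sketch of the same rfind-based split used in llm_node (split_complete is a hypothetical helper, for illustration only):

```python
# Standalone illustration of the sentence-boundary logic in llm_node.
sentence_end_chars = {".", "!", "?"}

def split_complete(buffer: str) -> tuple[str, str]:
    """Split a streaming buffer into (complete sentences, remainder)."""
    ends = [buffer.rfind(c) for c in sentence_end_chars if c in buffer]
    if not ends:
        return "", buffer
    last_end = max(ends)
    return buffer[: last_end + 1], buffer[last_end + 1 :]

print(split_complete("I like apples! Tell me about straw"))
# -> ('I like apples!', ' Tell me about straw')
```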
Full example
```python
import logging
import asyncio
from typing import Optional, Any

from dotenv import load_dotenv
from livekit.agents import JobContext, JobProcess, Agent, AgentSession, inference, AgentServer, cli
from livekit.plugins import openai, silero
from livekit.agents.llm import ChatContext, ChatMessage

load_dotenv()

logger = logging.getLogger("complex-content-filter")
logger.setLevel(logging.INFO)


class ContentFilterAgent(Agent):
    def __init__(self) -> None:
        super().__init__(instructions="You are a helpful agent.")
        self.moderator_llm = openai.LLM(model="gpt-4o-mini")

    async def evaluate_content(self, text: str) -> bool:
        """Evaluate if content is appropriate using a separate LLM."""
        moderation_ctx = ChatContext([
            ChatMessage(
                type="message",
                role="system",
                content=[
                    "You are a content moderator. Respond ONLY with 'APPROPRIATE' or 'INAPPROPRIATE'. "
                    "Respond with 'INAPPROPRIATE' if the text mentions strawberries."
                ],
            ),
            ChatMessage(type="message", role="user", content=[f"Evaluate: {text}"]),
        ])

        response = ""
        async with self.moderator_llm.chat(chat_ctx=moderation_ctx) as stream:
            async for chunk in stream:
                if not chunk:
                    continue
                content = getattr(chunk.delta, "content", None) if hasattr(chunk, "delta") else str(chunk)
                if content:
                    response += content

        response = response.strip().upper()
        logger.info(f"Moderation response for '{text}': {response}")
        return "INAPPROPRIATE" not in response

    async def on_enter(self):
        self.session.generate_reply()

    def _extract_content(self, chunk: Any) -> Optional[str]:
        """Extract content from a chunk, handling different chunk formats."""
        if not chunk:
            return None
        if isinstance(chunk, str):
            return chunk
        if hasattr(chunk, "delta"):
            return getattr(chunk.delta, "content", None)
        return None

    async def llm_node(self, chat_ctx, tools, model_settings=None):
        async def process_stream():
            buffer = ""
            chunk_buffer = []
            sentence_end_chars = {".", "!", "?"}

            async with self.session.llm.chat(chat_ctx=chat_ctx, tools=tools, tool_choice=None) as stream:
                try:
                    async for chunk in stream:
                        content = self._extract_content(chunk)
                        chunk_buffer.append(chunk)

                        if content:
                            buffer += content

                            if any(char in buffer for char in sentence_end_chars):
                                last_end = max(
                                    buffer.rfind(char) for char in sentence_end_chars if char in buffer
                                )
                                if last_end != -1:
                                    sentence = buffer[: last_end + 1]
                                    buffer = buffer[last_end + 1 :]

                                    if not await self.evaluate_content(sentence):
                                        yield "Content filtered."
                                        return

                                    for buffered_chunk in chunk_buffer:
                                        yield buffered_chunk
                                    chunk_buffer = []

                    if buffer and any(buffer.endswith(char) for char in sentence_end_chars):
                        if not await self.evaluate_content(buffer):
                            yield "Content filtered."
                            return

                    for buffered_chunk in chunk_buffer:
                        yield buffered_chunk
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    logger.error(f"Error in content filtering: {str(e)}")
                    yield "[Error in content filtering]"

        return process_stream()


server = AgentServer()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3-general"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(model="cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )

    agent = ContentFilterAgent()
    await session.start(agent=agent, room=ctx.room)
    await ctx.connect()


if __name__ == "__main__":
    cli.run_app(server)
```