Module livekit.plugins.langchain
LangChain plugin for LiveKit Agents.
Classes
class LLMAdapter (graph: PregelProtocol, *, config: RunnableConfig | None = None)
Source code
class LLMAdapter(llm.LLM):
    def __init__(
        self,
        graph: PregelProtocol,
        *,
        config: RunnableConfig | None = None,
    ) -> None:
        super().__init__()
        self._graph = graph
        self._config = config

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[FunctionTool | RawFunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        # these are unused, since tool execution takes place in langgraph
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LangGraphStream:
        return LangGraphStream(
            self,
            chat_ctx=chat_ctx,
            tools=tools or [],
            graph=self._graph,
            conn_options=conn_options,
            config=self._config,
        )
Adapts a LangGraph graph (any PregelProtocol, such as a compiled StateGraph) to the LiveKit Agents llm.LLM interface. Calling chat() streams the graph with stream_mode="messages" and converts the resulting message chunks into LiveKit chat chunks. Tool execution happens inside the graph, so the tool-related parameters of chat() are accepted but ignored.
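A minimal usage sketch (not taken from the plugin itself): build a compiled LangGraph graph and wrap it in LLMAdapter. The ChatOpenAI model, the single-node graph, and the thread_id value are illustrative assumptions; any PregelProtocol graph works, and config is only needed if the graph expects one (for example when it was compiled with a checkpointer).

from langchain_core.runnables import RunnableConfig
from langchain_openai import ChatOpenAI  # assumed provider; any LangChain chat model works
from langgraph.graph import START, MessagesState, StateGraph

from livekit.plugins.langchain import LLMAdapter

model = ChatOpenAI(model="gpt-4o-mini")  # illustrative model choice

def chatbot(state: MessagesState) -> dict:
    # Tool binding and execution, if any, live inside the graph, not in LiveKit Agents.
    return {"messages": [model.invoke(state["messages"])]}

builder = StateGraph(MessagesState)
builder.add_node("chatbot", chatbot)
builder.add_edge(START, "chatbot")
graph = builder.compile()

# Optional RunnableConfig, e.g. a thread_id when the graph uses a checkpointer.
adapter = LLMAdapter(
    graph,
    config=RunnableConfig(configurable={"thread_id": "my-room"}),
)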
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool | RawFunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) -> livekit.plugins.langchain.langgraph.LangGraphStream
Source code
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[FunctionTool | RawFunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    # these are unused, since tool execution takes place in langgraph
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LangGraphStream:
    return LangGraphStream(
        self,
        chat_ctx=chat_ctx,
        tools=tools or [],
        graph=self._graph,
        conn_options=conn_options,
        config=self._config,
    )
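A hedged sketch of how chat() is usually driven: an AgentSession calls it internally as the conversation progresses, so you normally never invoke it yourself. The Deepgram STT and OpenAI TTS plugins and the entrypoint wiring below are assumptions for illustration, not requirements of this plugin.

from livekit.agents import Agent, AgentSession, JobContext
from livekit.plugins import deepgram, openai  # assumed STT/TTS providers
from livekit.plugins.langchain import LLMAdapter

async def entrypoint(ctx: JobContext) -> None:
    session = AgentSession(
        llm=LLMAdapter(graph),  # graph: the compiled LangGraph graph from the sketch above
        stt=deepgram.STT(),
        tts=openai.TTS(),
    )
    # The session drives chat(); parallel_tool_calls, tool_choice and extra_kwargs
    # are accepted but ignored because tools run inside the graph.
    await session.start(
        agent=Agent(instructions="You are a helpful voice assistant."),
        room=ctx.room,
    )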
Inherited members
class LangGraphStream (llm: LLMAdapter,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool | RawFunctionTool],
conn_options: APIConnectOptions,
graph: PregelProtocol,
config: RunnableConfig | None = None)
Source code
class LangGraphStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLMAdapter,
        *,
        chat_ctx: ChatContext,
        tools: list[FunctionTool | RawFunctionTool],
        conn_options: APIConnectOptions,
        graph: PregelProtocol,
        config: RunnableConfig | None = None,
    ):
        super().__init__(
            llm,
            chat_ctx=chat_ctx,
            tools=tools,
            conn_options=conn_options,
        )
        self._graph = graph
        self._config = config

    async def _run(self) -> None:
        state = self._chat_ctx_to_state()
        async for message_chunk, _ in self._graph.astream(
            state,
            self._config,
            stream_mode="messages",
        ):
            chat_chunk = _to_chat_chunk(message_chunk)
            if chat_chunk:
                self._event_ch.send_nowait(chat_chunk)

    def _chat_ctx_to_state(self) -> dict[str, Any]:
        """Convert chat context to langgraph input"""
        messages: list[AIMessage | HumanMessage | SystemMessage] = []
        for item in self._chat_ctx.items:
            # only support chat messages, ignoring tool calls
            if isinstance(item, ChatMessage):
                content = item.text_content
                if content:
                    if item.role == "assistant":
                        messages.append(AIMessage(content=content, id=item.id))
                    elif item.role == "user":
                        messages.append(HumanMessage(content=content, id=item.id))
                    elif item.role in ["system", "developer"]:
                        messages.append(SystemMessage(content=content, id=item.id))
        return {
            "messages": messages,
        }
An llm.LLMStream implementation that runs the wrapped LangGraph graph. The chat context is converted into LangGraph input state of the form {"messages": [...]} (assistant, user, and system/developer messages only; tool calls are ignored), the graph is streamed with stream_mode="messages", and each message chunk is forwarded as a LiveKit chat chunk.
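For illustration, here is the state that _chat_ctx_to_state() produces for a short conversation (the message text and ids below are made up); this dict is what gets passed to graph.astream(state, config, stream_mode="messages").

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage

# Equivalent of _chat_ctx_to_state() output: ids mirror the ChatContext item ids,
# and tool calls/outputs in the chat context are simply skipped.
state = {
    "messages": [
        SystemMessage(content="You are a helpful voice assistant.", id="item_0"),
        HumanMessage(content="What's on my calendar today?", id="item_1"),
        AIMessage(content="You have two meetings this afternoon.", id="item_2"),
    ],
}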
Ancestors
- livekit.agents.llm.llm.LLMStream
- abc.ABC