Module livekit.agents.llm.llm
Classes
class ChatChunk (choices: list[Choice] = <factory>)
-
ChatChunk(choices: 'list[Choice]' = <factory>)
Expand source code
@dataclass
class ChatChunk:
    """One item produced when iterating an ``LLMStream``; holds that chunk's choices."""

    # Choices carried by this chunk. `default_factory=list` gives every
    # instance its own empty list instead of a shared mutable default.
    choices: list[Choice] = field(default_factory=list)
Class variables
var choices : list[Choice]
class Choice (delta: ChoiceDelta, index: int = 0)
-
Choice(delta: 'ChoiceDelta', index: 'int' = 0)
Expand source code
@dataclass
class Choice:
    """A single entry of ``ChatChunk.choices``, wrapping one ``ChoiceDelta``."""

    # Payload (role, content, tool calls) of this choice.
    delta: ChoiceDelta
    # Position of this choice; defaults to 0.
    # NOTE(review): presumably distinguishes the `n` completions requested
    # through `LLM.chat` -- confirm against the implementations.
    index: int = 0
Class variables
var delta : ChoiceDelta
var index : int
class ChoiceDelta (role: ChatRole, content: str | None = None, tool_calls: list[function_context.FunctionCallInfo] | None = None)
-
ChoiceDelta(role: 'ChatRole', content: 'str | None' = None, tool_calls: 'list[function_context.FunctionCallInfo] | None' = None)
Expand source code
@dataclass
class ChoiceDelta:
    """Payload of a ``Choice``: a message role plus optional text and tool calls."""

    # Role of the message author (a `ChatRole` value). Required.
    role: ChatRole
    # Text carried by this delta, or None when there is none.
    content: str | None = None
    # Function calls carried by this delta, or None when there are none.
    tool_calls: list[function_context.FunctionCallInfo] | None = None
Class variables
var content : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_calls : list[FunctionCallInfo] | None
class LLM
-
Abstract base class for LLM implementations. Subclasses implement chat(), which takes a chat context (plus optional keyword-only settings) and returns an LLMStream.
Expand source code
class LLM(abc.ABC):
    """Abstract interface for a chat-capable LLM backend.

    Concrete implementations must override :meth:`chat`.
    """

    @abc.abstractmethod
    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        fnc_ctx: function_context.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = None,
        parallel_tool_calls: bool | None = None,
    ) -> "LLMStream":
        """Start a chat request and return the stream of its response.

        All arguments are keyword-only.

        Args:
            chat_ctx: Chat context for the request.
            fnc_ctx: Optional function context for the request.
            temperature: NOTE(review): presumably the sampling temperature,
                with None deferring to the implementation's default -- confirm.
            n: NOTE(review): presumably the number of choices to generate
                -- confirm.
            parallel_tool_calls: NOTE(review): presumably whether the model
                may issue several tool calls at once -- confirm.

        Returns:
            An ``LLMStream`` for the request.
        """
        ...
Ancestors
- abc.ABC
Subclasses
- livekit.plugins.anthropic.llm.LLM
- AssistantLLM
- livekit.plugins.openai.llm.LLM
Methods
def chat(self, *, chat_ctx: ChatContext, fnc_ctx: function_context.FunctionContext | None = None, temperature: float | None = None, n: int | None = None, parallel_tool_calls: bool | None = None) ‑> LLMStream
class LLMStream (*, chat_ctx: ChatContext, fnc_ctx: function_context.FunctionContext | None)
-
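Abstract base class for a streamed LLM response. It is an async iterator of ChatChunk objects (subclasses implement __anext__), exposes the function calls collected from the stream, and can execute them and cancel the resulting tasks via aclose().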
Expand source code
class LLMStream(abc.ABC):
    """Base class for a streamed LLM response, consumed with ``async for``.

    Iterating yields :class:`ChatChunk` objects; subclasses supply them by
    implementing :meth:`__anext__`. The stream also holds a list of function
    calls and tracks the tasks started by :meth:`execute_functions` so that
    :meth:`aclose` can cancel them.
    """

    def __init__(
        self,
        *,
        chat_ctx: ChatContext,
        fnc_ctx: function_context.FunctionContext | None,
    ) -> None:
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        # Nothing in this base class appends to the list; presumably
        # subclasses fill it while streaming -- verify in the implementations.
        self._function_calls_info: list[function_context.FunctionCallInfo] = []
        # Tasks started by execute_functions() that have not finished yet.
        self._tasks: set[asyncio.Task[Any]] = set()

    @property
    def chat_ctx(self) -> ChatContext:
        """The initial chat context of this stream."""
        return self._chat_ctx

    @property
    def fnc_ctx(self) -> function_context.FunctionContext | None:
        """The function context of this stream."""
        return self._fnc_ctx

    @property
    def function_calls(self) -> list[function_context.FunctionCallInfo]:
        """List of called functions from this stream."""
        return self._function_calls_info

    def execute_functions(self) -> list[function_context.CalledFunction]:
        """Execute every function call recorded on this stream.

        Each recorded call's ``execute()`` is invoked in order. The ``task``
        of the returned object is tracked until it completes, so a later
        :meth:`aclose` can cancel whatever is still running.

        Returns:
            The objects returned by ``execute()``, in call order.
        """
        pending = self._tasks
        started: list[function_context.CalledFunction] = []
        for call_info in self._function_calls_info:
            running = call_info.execute()
            pending.add(running.task)
            # Stop tracking the task as soon as it finishes.
            running.task.add_done_callback(pending.remove)
            started.append(running)
        return started

    async def aclose(self) -> None:
        """Cancel the still-tracked tasks through ``utils.aio.gracefully_cancel``."""
        await utils.aio.gracefully_cancel(*self._tasks)

    def __aiter__(self) -> AsyncIterator[ChatChunk]:
        # The stream is its own async iterator.
        return self

    @abc.abstractmethod
    async def __anext__(self) -> ChatChunk:
        """Return the next chunk of the response (implemented by subclasses)."""
        ...
Ancestors
- abc.ABC
Subclasses
- livekit.plugins.anthropic.llm.LLMStream
- AssistantLLMStream
- livekit.plugins.openai.llm.LLMStream
Instance variables
prop chat_ctx : ChatContext
-
The initial chat context of this stream.
Expand source code
@property
def chat_ctx(self) -> ChatContext:
    """The initial chat context of this stream.

    Read-only accessor for the value stored on the instance as ``_chat_ctx``.
    """
    return self._chat_ctx
prop fnc_ctx : function_context.FunctionContext | None
-
The function context of this stream.
Expand source code
@property
def fnc_ctx(self) -> function_context.FunctionContext | None:
    """The function context of this stream.

    Read-only accessor for the value stored on the instance as ``_fnc_ctx``;
    per the return annotation it may be ``None``.
    """
    return self._fnc_ctx
prop function_calls : list[function_context.FunctionCallInfo]
-
List of called functions from this stream.
Expand source code
@property
def function_calls(self) -> list[function_context.FunctionCallInfo]:
    """List of called functions from this stream.

    Returns the internal ``_function_calls_info`` list itself, not a copy, so
    changes made by the caller are visible to the stream.
    """
    return self._function_calls_info
Methods
async def aclose(self) ‑> None
def execute_functions(self) ‑> list[CalledFunction]
-
Execute all of this stream's functions concurrently.