Module livekit.agents.llm.llm
Classes
class ChatChunk (request_id: str,
choices: list[Choice] = <factory>,
usage: CompletionUsage | None = None)

@dataclass
class ChatChunk:
    request_id: str
    choices: list[Choice] = field(default_factory=list)
    usage: CompletionUsage | None = None

Class variables
var choices : list[Choice]
var request_id : str
var usage : CompletionUsage | None
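A ChatChunk is the unit yielded when iterating an LLMStream. A minimal consumption sketch, assuming `stream` was returned by a concrete plugin's chat() call:

async def read_stream(stream: LLMStream) -> str:
    # Collect streamed text; each iteration yields one ChatChunk.
    text = ""
    async for chunk in stream:
        for choice in chunk.choices:
            text += choice.delta.content or ""
        if chunk.usage is not None:  # usage is typically attached to the final chunk
            print("total tokens:", chunk.usage.total_tokens)
    return text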
class Choice (delta: ChoiceDelta,
index: int = 0)

@dataclass
class Choice:
    delta: ChoiceDelta
    index: int = 0
Class variables
var delta : ChoiceDelta
var index : int
class ChoiceDelta (role: ChatRole,
content: str | None = None,
tool_calls: list[function_context.FunctionCallInfo] | None = None)

@dataclass
class ChoiceDelta:
    role: ChatRole
    content: str | None = None
    tool_calls: list[function_context.FunctionCallInfo] | None = None
Class variables
var content : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_calls : list[FunctionCallInfo] | None
class CompletionUsage (completion_tokens: int, prompt_tokens: int, total_tokens: int)
@dataclass
class CompletionUsage:
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
Class variables
var completion_tokens : int
var prompt_tokens : int
var total_tokens : int
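Together, these dataclasses describe a single streamed completion event. A sketch of how a plugin's _run() implementation might assemble one (all field values below are illustrative):

chunk = ChatChunk(
    request_id="req_123",  # illustrative id, not produced by any real backend
    choices=[
        Choice(
            delta=ChoiceDelta(role="assistant", content="Hello"),
            index=0,
        )
    ],
    usage=CompletionUsage(completion_tokens=1, prompt_tokens=12, total_tokens=13),
)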
class LLM
class LLM(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    def __init__(self) -> None:
        super().__init__()
        self._capabilities = LLMCapabilities()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @abstractmethod
    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: function_context.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
    ) -> "LLMStream": ...

    @property
    def capabilities(self) -> LLMCapabilities:
        return self._capabilities

    async def aclose(self) -> None: ...

    async def __aenter__(self) -> LLM:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Abstract base class for LLM implementations; emits a "metrics_collected" event and defines the chat() entry point that concrete plugins implement.
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- FallbackAdapter
- livekit.plugins.anthropic.llm.LLM
- AssistantLLM
- livekit.plugins.openai.llm.LLM
Instance variables
prop capabilities : LLMCapabilities
@property
def capabilities(self) -> LLMCapabilities:
    return self._capabilities
prop label : str
@property
def label(self) -> str:
    return self._label
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None: ...
def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0),
fnc_ctx: function_context.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) -> LLMStream

@abstractmethod
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: function_context.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = None,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
) -> "LLMStream": ...
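A usage sketch of chat(): livekit.plugins.openai.llm.LLM (listed above as a subclass) stands in for any concrete implementation, and its constructor arguments are omitted because they are plugin-specific.

from livekit.agents.llm import ChatContext
from livekit.plugins import openai

async def generate(chat_ctx: ChatContext) -> str:
    llm = openai.LLM()  # any LLM subclass works here; arguments depend on the plugin
    stream = llm.chat(chat_ctx=chat_ctx, temperature=0.7)
    text = ""
    async with stream:  # ensures aclose() runs when iteration finishes or fails
        async for chunk in stream:
            for choice in chunk.choices:
                text += choice.delta.content or ""
    return text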
Inherited members
class LLMCapabilities (supports_choices_on_int: bool = True)
@dataclass
class LLMCapabilities:
    supports_choices_on_int: bool = True
Class variables
var supports_choices_on_int : bool
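supports_choices_on_int indicates whether the backend can return multiple choices when n is set. A small sketch of gating the n parameter on it (llm and ctx are assumed to exist as in the chat() example above):

n = 3 if llm.capabilities.supports_choices_on_int else None  # only request multiple choices when supported
stream = llm.chat(chat_ctx=ctx, n=n)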
class LLMStream (llm: LLM,
*,
chat_ctx: ChatContext,
fnc_ctx: function_context.FunctionContext | None,
conn_options: APIConnectOptions)

class LLMStream(ABC):
    def __init__(
        self,
        llm: LLM,
        *,
        chat_ctx: ChatContext,
        fnc_ctx: function_context.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        self._llm = llm
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        self._conn_options = conn_options

        self._event_ch = aio.Chan[ChatChunk]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="LLM._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._function_calls_info: list[function_context.FunctionCallInfo] = []
        self._function_tasks = set[asyncio.Task[Any]]()

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to generate LLM completion after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to generate LLM completion, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "llm": self._llm._label,
                            "attempt": i + 1,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    @utils.log_exceptions(logger=logger)
    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[ChatChunk]
    ) -> None:
        start_time = time.perf_counter()
        ttft = -1.0
        request_id = ""
        usage: CompletionUsage | None = None

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttft == -1.0:
                ttft = time.perf_counter() - start_time
            if ev.usage is not None:
                usage = ev.usage

        duration = time.perf_counter() - start_time
        metrics = LLMMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttft=ttft,
            duration=duration,
            cancelled=self._task.cancelled(),
            label=self._llm._label,
            completion_tokens=usage.completion_tokens if usage else 0,
            prompt_tokens=usage.prompt_tokens if usage else 0,
            total_tokens=usage.total_tokens if usage else 0,
            tokens_per_second=usage.completion_tokens / duration if usage else 0.0,
            error=None,
        )
        self._llm.emit("metrics_collected", metrics)

    @property
    def function_calls(self) -> list[function_context.FunctionCallInfo]:
        """List of called functions from this stream."""
        return self._function_calls_info

    @property
    def chat_ctx(self) -> ChatContext:
        """The initial chat context of this stream."""
        return self._chat_ctx

    @property
    def fnc_ctx(self) -> function_context.FunctionContext | None:
        """The function context of this stream."""
        return self._fnc_ctx

    def execute_functions(self) -> list[function_context.CalledFunction]:
        """Execute all functions concurrently of this stream."""
        called_functions: list[function_context.CalledFunction] = []
        for fnc_info in self._function_calls_info:
            called_fnc = fnc_info.execute()
            self._function_tasks.add(called_fnc.task)
            called_fnc.task.add_done_callback(self._function_tasks.remove)
            called_functions.append(called_fnc)

        return called_functions

    async def aclose(self) -> None:
        await aio.gracefully_cancel(self._task)
        await utils.aio.gracefully_cancel(*self._function_tasks)
        await self._metrics_task

    async def __anext__(self) -> ChatChunk:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None
            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[ChatChunk]:
        return self

    async def __aenter__(self) -> LLMStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Abstract base class representing an in-progress LLM completion; concrete plugins implement _run(), and callers consume the stream as an async iterator of ChatChunk.
Ancestors
- abc.ABC
Subclasses
- FallbackLLMStream
- livekit.plugins.anthropic.llm.LLMStream
- AssistantLLMStream
- livekit.plugins.openai.llm.LLMStream
Instance variables
prop chat_ctx : ChatContext
@property
def chat_ctx(self) -> ChatContext:
    """The initial chat context of this stream."""
    return self._chat_ctx
The initial chat context of this stream.
prop fnc_ctx : function_context.FunctionContext | None
@property
def fnc_ctx(self) -> function_context.FunctionContext | None:
    """The function context of this stream."""
    return self._fnc_ctx
The function context of this stream.
prop function_calls : list[function_context.FunctionCallInfo]
@property
def function_calls(self) -> list[function_context.FunctionCallInfo]:
    """List of called functions from this stream."""
    return self._function_calls_info
List of called functions from this stream.
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    await aio.gracefully_cancel(self._task)
    await utils.aio.gracefully_cancel(*self._function_tasks)
    await self._metrics_task
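Since LLMStream implements __aenter__ and __aexit__, aclose() can also be driven by an async context manager; a sketch (llm and ctx assumed as in the earlier examples):

async with llm.chat(chat_ctx=ctx) as stream:  # __aexit__ awaits aclose()
    async for chunk in stream:
        ...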
def execute_functions(self) ‑> list[CalledFunction]
def execute_functions(self) -> list[function_context.CalledFunction]:
    """Execute all functions concurrently of this stream."""
    called_functions: list[function_context.CalledFunction] = []
    for fnc_info in self._function_calls_info:
        called_fnc = fnc_info.execute()
        self._function_tasks.add(called_fnc.task)
        called_fnc.task.add_done_callback(self._function_tasks.remove)
        called_functions.append(called_fnc)

    return called_functions
Execute all function calls collected from this stream concurrently.
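When the stream was created with a FunctionContext, tool calls emitted by the model accumulate in function_calls; a sketch of draining the stream and then running them (llm, ctx, and fnc_ctx are assumed to exist as in the earlier examples):

async def run_with_tools(llm: LLM, ctx: ChatContext, fnc_ctx) -> None:
    stream = llm.chat(chat_ctx=ctx, fnc_ctx=fnc_ctx)
    async for _ in stream:
        pass  # consume deltas; tool calls are collected on the stream
    if stream.function_calls:
        called = stream.execute_functions()  # starts every call concurrently
        for fnc in called:
            await fnc.task  # each CalledFunction exposes its asyncio task
    await stream.aclose()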
class ToolChoice (type: "Literal['function']", name: str)
@dataclass
class ToolChoice:
    type: Literal["function"]
    name: str
Class variables
var name : str
var type : Literal['function']
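Passing a ToolChoice to chat() asks the backend to call one specific function rather than using the "auto"/"required"/"none" presets; a sketch where "get_weather" is a hypothetical function registered in fnc_ctx:

stream = llm.chat(
    chat_ctx=ctx,
    fnc_ctx=fnc_ctx,
    tool_choice=ToolChoice(type="function", name="get_weather"),  # "get_weather" is hypothetical
)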