Module livekit.agents.llm
Sub-modules
livekit.agents.llm.chat_context
livekit.agents.llm.fallback_adapter
livekit.agents.llm.function_context
livekit.agents.llm.llm
Functions
def _create_ai_function_info(fnc_ctx: FunctionContext,
tool_call_id: str,
fnc_name: str,
raw_arguments: str) ‑> FunctionCallInfo
```python
def _create_ai_function_info(
    fnc_ctx: FunctionContext,
    tool_call_id: str,
    fnc_name: str,
    raw_arguments: str,  # JSON string
) -> FunctionCallInfo:
    if fnc_name not in fnc_ctx.ai_functions:
        raise ValueError(f"AI function {fnc_name} not found")

    parsed_arguments: dict[str, Any] = {}
    try:
        if raw_arguments:  # ignore empty string
            parsed_arguments = json.loads(raw_arguments)
    except json.JSONDecodeError:
        raise ValueError(
            f"AI function {fnc_name} received invalid JSON arguments - {raw_arguments}"
        )

    fnc_info = fnc_ctx.ai_functions[fnc_name]

    # Ensure all necessary arguments are present and of the correct type.
    sanitized_arguments: dict[str, Any] = {}
    for arg_info in fnc_info.arguments.values():
        if arg_info.name not in parsed_arguments:
            if arg_info.default is inspect.Parameter.empty:
                raise ValueError(
                    f"AI function {fnc_name} missing required argument {arg_info.name}"
                )
            continue

        arg_value = parsed_arguments[arg_info.name]
        is_optional, inner_th = _is_optional_type(arg_info.type)

        if typing.get_origin(inner_th) is not None:
            if not isinstance(arg_value, list):
                raise ValueError(
                    f"AI function {fnc_name} argument {arg_info.name} should be a list"
                )

            inner_type = typing.get_args(inner_th)[0]
            sanitized_value = [
                _sanitize_primitive(
                    value=v,
                    expected_type=inner_type,
                    choices=arg_info.choices,
                )
                for v in arg_value
            ]
        else:
            sanitized_value = _sanitize_primitive(
                value=arg_value,
                expected_type=inner_th,
                choices=arg_info.choices,
            )

        sanitized_arguments[arg_info.name] = sanitized_value

    return FunctionCallInfo(
        tool_call_id=tool_call_id,
        raw_arguments=raw_arguments,
        function_info=fnc_info,
        arguments=sanitized_arguments,
    )
```
def ai_callable(*,
name: str | None = None,
description: str | _UseDocMarker | None = None,
auto_retry: bool = False) ‑> Callable
```python
def ai_callable(
    *,
    name: str | None = None,
    description: str | _UseDocMarker | None = None,
    auto_retry: bool = False,
) -> Callable:
    def deco(f):
        _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
        return f

    return deco
```
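A minimal sketch of how this decorator is typically used: it marks methods on a FunctionContext subclass so they are registered as AI-callable functions when the context is instantiated. The weather function below is illustrative only, and the import path assumes the module is exposed as livekit.agents.llm.
```python
from livekit.agents import llm


class AssistantFnc(llm.FunctionContext):
    # Marked methods are picked up by FunctionContext.__init__ and exposed
    # to the model as callable tools.
    @llm.ai_callable(description="Get the current weather for a given location")
    async def get_weather(self, location: str) -> str:
        # A real implementation would call a weather API; this is a stub.
        return f"The weather in {location} is sunny."


fnc_ctx = AssistantFnc()
print(list(fnc_ctx.ai_functions))  # ['get_weather']
```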
Classes
class AvailabilityChangedEvent (llm: LLM,
available: bool)
```python
@dataclass
class AvailabilityChangedEvent:
    llm: LLM
    available: bool
```
AvailabilityChangedEvent(llm: 'LLM', available: 'bool')
Class variables
var available : bool
var llm : LLM
class CalledFunction (call_info: FunctionCallInfo,
task: asyncio.Task[Any],
result: Any | None = None,
exception: BaseException | None = None)
```python
@dataclass
class CalledFunction:
    call_info: FunctionCallInfo
    task: asyncio.Task[Any]
    result: Any | None = None
    exception: BaseException | None = None
```
CalledFunction(call_info: 'FunctionCallInfo', task: 'asyncio.Task[Any]', result: 'Any | None' = None, exception: 'BaseException | None' = None)
Class variables
var call_info : FunctionCallInfo
var exception : BaseException | None
var result : typing.Any | None
var task : asyncio.Task[typing.Any]
class ChatAudio (frame: rtc.AudioFrame | list[rtc.AudioFrame])
```python
@dataclass
class ChatAudio:
    frame: rtc.AudioFrame | list[rtc.AudioFrame]
```
ChatAudio(frame: 'rtc.AudioFrame | list[rtc.AudioFrame]')
Class variables
var frame : AudioFrame | list[AudioFrame]
class ChatChunk (request_id: str,
choices: list[Choice] = <factory>,
usage: CompletionUsage | None = None)
```python
@dataclass
class ChatChunk:
    request_id: str
    choices: list[Choice] = field(default_factory=list)
    usage: CompletionUsage | None = None
```
ChatChunk(request_id: 'str', choices: 'list[Choice]' = <factory>, usage: 'CompletionUsage | None' = None)
Class variables
var choices : list[Choice]
var request_id : str
var usage : CompletionUsage | None
class ChatContext (messages: list[ChatMessage] = <factory>)
```python
@dataclass
class ChatContext:
    messages: list[ChatMessage] = field(default_factory=list)
    _metadata: dict[str, Any] = field(default_factory=dict, repr=False, init=False)

    def append(
        self, *, text: str = "", images: list[ChatImage] = [], role: ChatRole = "system"
    ) -> ChatContext:
        self.messages.append(ChatMessage.create(text=text, images=images, role=role))
        return self

    def copy(self) -> ChatContext:
        copied_chat_ctx = ChatContext(messages=[m.copy() for m in self.messages])
        copied_chat_ctx._metadata = self._metadata
        return copied_chat_ctx
```
ChatContext(messages: 'list[ChatMessage]' = <factory>)
Class variables
var messages : list[ChatMessage]
Methods
def append(self,
*,
text: str = '',
images: list[ChatImage] = [],
role: ChatRole = 'system') ‑> ChatContext
```python
def append(
    self, *, text: str = "", images: list[ChatImage] = [], role: ChatRole = "system"
) -> ChatContext:
    self.messages.append(ChatMessage.create(text=text, images=images, role=role))
    return self
```
def copy(self) ‑> ChatContext
```python
def copy(self) -> ChatContext:
    copied_chat_ctx = ChatContext(messages=[m.copy() for m in self.messages])
    copied_chat_ctx._metadata = self._metadata
    return copied_chat_ctx
```
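As a quick illustration (a sketch, not from the library's own docs): append() returns the context itself, so calls can be chained, and copy() produces an independent snapshot of the message list.
```python
from livekit.agents import llm

chat_ctx = (
    llm.ChatContext()
    .append(role="system", text="You are a helpful voice assistant.")
    .append(role="user", text="What's the weather like in Tokyo?")
)

# copy() duplicates the message list, so the snapshot is unaffected by later appends.
snapshot = chat_ctx.copy()
chat_ctx.append(role="assistant", text="Let me check that for you.")
assert len(snapshot.messages) == 2 and len(chat_ctx.messages) == 3
```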
class ChatImage (image: str | rtc.VideoFrame,
inference_width: int | None = None,
inference_height: int | None = None,
inference_detail: "Literal['auto', 'high', 'low']" = 'auto')
````python
@dataclass
class ChatImage:
    """
    ChatImage is used to input images into the ChatContext on supported LLM providers / plugins.

    You may need to consult your LLM provider's documentation on supported URL types.

    ```python
    # Pass a VideoFrame directly, which will be automatically converted to a JPEG data URL internally
    async for event in rtc.VideoStream(video_track):
        chat_image = ChatImage(image=event.frame)
        # this instance is now available for your ChatContext

    # Encode your VideoFrame yourself for more control, and pass the result as a data URL
    # (see EncodeOptions for more details)
    from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions

    image_bytes = encode(
        event.frame,
        EncodeOptions(
            format="PNG",
            resize_options=ResizeOptions(
                width=512, height=512, strategy="scale_aspect_fit"
            ),
        ),
    )
    chat_image = ChatImage(
        image=f"data:image/png;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
    )

    # With an external URL
    chat_image = ChatImage(image="https://example.com/image.jpg")
    ```
    """

    image: str | rtc.VideoFrame
    """Either a string URL or a VideoFrame object"""
    inference_width: int | None = None
    """Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)"""
    inference_height: int | None = None
    """Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)"""
    inference_detail: Literal["auto", "high", "low"] = "auto"
    """
    Detail parameter for LLM provider, if supported.

    Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding)
    """
    _cache: dict[Any, Any] = field(default_factory=dict, repr=False, init=False)
    """
    _cache is used internally by LLM implementations to store a processed version of the image
    for later use.
    """
````
ChatImage is used to input images into the ChatContext on supported LLM providers / plugins.
You may need to consult your LLM provider's documentation on supported URL types.
```python
# Pass a VideoFrame directly, which will be automatically converted to a JPEG data URL internally
async for event in rtc.VideoStream(video_track):
    chat_image = ChatImage(image=event.frame)
    # this instance is now available for your ChatContext

# Encode your VideoFrame yourself for more control, and pass the result as a data URL
# (see EncodeOptions for more details)
from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions

image_bytes = encode(
    event.frame,
    EncodeOptions(
        format="PNG",
        resize_options=ResizeOptions(width=512, height=512, strategy="scale_aspect_fit"),
    ),
)
chat_image = ChatImage(
    image=f"data:image/png;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
)

# With an external URL
chat_image = ChatImage(image="https://example.com/image.jpg")
```
Class variables
var image : str | VideoFrame
Either a string URL or a VideoFrame object
var inference_detail : Literal['auto', 'high', 'low']
Detail parameter for LLM provider, if supported. Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding)
var inference_height : int | None
Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)
var inference_width : int | None
Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)
class ChatMessage (role: ChatRole,
id: str = <factory>,
name: str | None = None,
content: ChatContent | list[ChatContent] | None = None,
tool_calls: list[FunctionCallInfo] | None = None,
tool_call_id: str | None = None,
tool_exception: Exception | None = None)
```python
@dataclass
class ChatMessage:
    role: ChatRole
    id: str = field(
        default_factory=lambda: utils.shortuuid("item_")
    )  # used by the OAI realtime API
    name: str | None = None
    content: ChatContent | list[ChatContent] | None = None
    tool_calls: list[function_context.FunctionCallInfo] | None = None
    tool_call_id: str | None = None
    tool_exception: Exception | None = None
    _metadata: dict[str, Any] = field(default_factory=dict, repr=False, init=False)

    @staticmethod
    def create_tool_from_called_function(
        called_function: function_context.CalledFunction,
    ) -> "ChatMessage":
        if not called_function.task.done():
            raise ValueError("cannot create a tool result from a running ai function")

        tool_exception: Exception | None = None
        try:
            content = called_function.task.result()
        except BaseException as e:
            if isinstance(e, Exception):
                tool_exception = e
            content = f"Error: {e}"

        return ChatMessage(
            role="tool",
            name=called_function.call_info.function_info.name,
            content=content,
            tool_call_id=called_function.call_info.tool_call_id,
            tool_exception=tool_exception,
        )

    @staticmethod
    def create_tool_calls(
        called_functions: list[function_context.FunctionCallInfo],
        *,
        text: str = "",
    ) -> "ChatMessage":
        return ChatMessage(role="assistant", tool_calls=called_functions, content=text)

    @staticmethod
    def create(
        *,
        text: str = "",
        images: list[ChatImage] = [],
        role: ChatRole = "system",
        id: str | None = None,
    ) -> "ChatMessage":
        id = id or utils.shortuuid("item_")
        if len(images) == 0:
            return ChatMessage(role=role, content=text, id=id)
        else:
            content: list[ChatContent] = []
            if text:
                content.append(text)

            if len(images) > 0:
                content.extend(images)

            return ChatMessage(role=role, content=content, id=id)

    def copy(self):
        content = self.content
        if isinstance(content, list):
            content = content.copy()

        tool_calls = self.tool_calls
        if tool_calls is not None:
            tool_calls = tool_calls.copy()

        copied_msg = ChatMessage(
            role=self.role,
            id=self.id,
            name=self.name,
            content=content,
            tool_calls=tool_calls,
            tool_call_id=self.tool_call_id,
        )
        copied_msg._metadata = self._metadata
        return copied_msg
```
ChatMessage(role: 'ChatRole', id: 'str' = <factory>, name: 'str | None' = None, content: 'ChatContent | list[ChatContent] | None' = None, tool_calls: 'list[function_context.FunctionCallInfo] | None' = None, tool_call_id: 'str | None' = None, tool_exception: 'Exception | None' = None)
Class variables
var content : str | ChatImage | ChatAudio | list[str | ChatImage | ChatAudio] | None
var id : str
var name : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_call_id : str | None
var tool_calls : list[FunctionCallInfo] | None
var tool_exception : Exception | None
Static methods
def create(*,
text: str = '',
images: list[ChatImage] = [],
role: ChatRole = 'system',
id: str | None = None) ‑> ChatMessage
```python
@staticmethod
def create(
    *,
    text: str = "",
    images: list[ChatImage] = [],
    role: ChatRole = "system",
    id: str | None = None,
) -> "ChatMessage":
    id = id or utils.shortuuid("item_")
    if len(images) == 0:
        return ChatMessage(role=role, content=text, id=id)
    else:
        content: list[ChatContent] = []
        if text:
            content.append(text)

        if len(images) > 0:
            content.extend(images)

        return ChatMessage(role=role, content=content, id=id)
```
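A brief sketch of building a user message that mixes text and an image (the URL is a placeholder, not a real endpoint):
```python
from livekit.agents import llm

msg = llm.ChatMessage.create(
    role="user",
    text="What is shown in this picture?",
    images=[llm.ChatImage(image="https://example.com/photo.jpg")],
)
# With images present, content becomes a list mixing the text and ChatImage entries.
chat_ctx = llm.ChatContext(messages=[msg])
```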
def create_tool_calls(called_functions: list[FunctionCallInfo],
*,
text: str = '') ‑> ChatMessage
```python
@staticmethod
def create_tool_calls(
    called_functions: list[function_context.FunctionCallInfo],
    *,
    text: str = "",
) -> "ChatMessage":
    return ChatMessage(role="assistant", tool_calls=called_functions, content=text)
```
def create_tool_from_called_function(called_function: CalledFunction) ‑> ChatMessage
```python
@staticmethod
def create_tool_from_called_function(
    called_function: function_context.CalledFunction,
) -> "ChatMessage":
    if not called_function.task.done():
        raise ValueError("cannot create a tool result from a running ai function")

    tool_exception: Exception | None = None
    try:
        content = called_function.task.result()
    except BaseException as e:
        if isinstance(e, Exception):
            tool_exception = e
        content = f"Error: {e}"

    return ChatMessage(
        role="tool",
        name=called_function.call_info.function_info.name,
        content=content,
        tool_call_id=called_function.call_info.tool_call_id,
        tool_exception=tool_exception,
    )
```
Methods
def copy(self)
```python
def copy(self):
    content = self.content
    if isinstance(content, list):
        content = content.copy()

    tool_calls = self.tool_calls
    if tool_calls is not None:
        tool_calls = tool_calls.copy()

    copied_msg = ChatMessage(
        role=self.role,
        id=self.id,
        name=self.name,
        content=content,
        tool_calls=tool_calls,
        tool_call_id=self.tool_call_id,
    )
    copied_msg._metadata = self._metadata
    return copied_msg
```
class Choice (delta: ChoiceDelta,
index: int = 0)
```python
@dataclass
class Choice:
    delta: ChoiceDelta
    index: int = 0
```
Choice(delta: 'ChoiceDelta', index: 'int' = 0)
Class variables
var delta : ChoiceDelta
var index : int
class ChoiceDelta (role: ChatRole,
content: str | None = None,
tool_calls: list[FunctionCallInfo] | None = None)
```python
@dataclass
class ChoiceDelta:
    role: ChatRole
    content: str | None = None
    tool_calls: list[function_context.FunctionCallInfo] | None = None
```
ChoiceDelta(role: 'ChatRole', content: 'str | None' = None, tool_calls: 'list[function_context.FunctionCallInfo] | None' = None)
Class variables
var content : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_calls : list[FunctionCallInfo] | None
class CompletionUsage (completion_tokens: int, prompt_tokens: int, total_tokens: int)
```python
@dataclass
class CompletionUsage:
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int
```
CompletionUsage(completion_tokens: 'int', prompt_tokens: 'int', total_tokens: 'int')
Class variables
var completion_tokens : int
var prompt_tokens : int
var total_tokens : int
class FallbackAdapter (llm: list[LLM],
*,
attempt_timeout: float = 10.0,
max_retry_per_llm: int = 1,
retry_interval: float = 5)
```python
class FallbackAdapter(
    LLM[Literal["llm_availability_changed"]],
):
    def __init__(
        self,
        llm: list[LLM],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_llm: int = 1,
        retry_interval: float = 5,
    ) -> None:
        if len(llm) < 1:
            raise ValueError("at least one LLM instance must be provided.")

        super().__init__()

        self._llm_instances = llm
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_llm = max_retry_per_llm
        self._retry_interval = retry_interval

        self._status = [
            _LLMStatus(available=True, recovering_task=None)
            for _ in self._llm_instances
        ]

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
        fnc_ctx: FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
    ) -> "LLMStream":
        return FallbackLLMStream(
            llm=self,
            conn_options=conn_options,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            temperature=temperature,
            n=n,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
        )
```
An LLM adapter that wraps an ordered list of LLM instances and falls back to the next one when a request fails, emitting llm_availability_changed events (see AvailabilityChangedEvent) as instances become unavailable or recover.
Ancestors
- LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=5.0, timeout=10.0),
fnc_ctx: FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> LLMStream
```python
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    fnc_ctx: FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
) -> "LLMStream":
    return FallbackLLMStream(
        llm=self,
        conn_options=conn_options,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        temperature=temperature,
        n=n,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )
```
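A hedged sketch of wrapping two providers. The plugin constructors shown (openai.LLM(), anthropic.LLM()) are assumed from the LLM subclasses listed further below, with credentials taken from the environment; the event-decorator form of on() is also an assumption about the underlying EventEmitter.
```python
from livekit.agents import llm
from livekit.plugins import anthropic, openai

fallback_llm = llm.FallbackAdapter(
    llm=[openai.LLM(), anthropic.LLM()],  # tried in order until one succeeds
    attempt_timeout=10.0,
    max_retry_per_llm=1,
    retry_interval=5,
)


@fallback_llm.on("llm_availability_changed")
def _on_availability_changed(ev: llm.AvailabilityChangedEvent) -> None:
    print(f"{ev.llm.label} available: {ev.available}")
```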
Inherited members
class FunctionArgInfo (name: str, description: str, type: type, default: Any, choices: tuple | None)
```python
@dataclass(frozen=True)
class FunctionArgInfo:
    name: str
    description: str
    type: type
    default: Any
    choices: tuple | None
```
FunctionArgInfo(name: 'str', description: 'str', type: 'type', default: 'Any', choices: 'tuple | None')
Class variables
var choices : tuple | None
var default : Any
var description : str
var name : str
var type : type
class FunctionCallInfo (tool_call_id: str,
function_info: FunctionInfo,
raw_arguments: str,
arguments: dict[str, Any])
```python
@dataclass(frozen=True)
class FunctionCallInfo:
    tool_call_id: str
    function_info: FunctionInfo
    raw_arguments: str
    arguments: dict[str, Any]

    def execute(self) -> CalledFunction:
        function_info = self.function_info
        func = functools.partial(function_info.callable, **self.arguments)
        if asyncio.iscoroutinefunction(function_info.callable):
            task = asyncio.create_task(func())
        else:
            task = asyncio.create_task(asyncio.to_thread(func))

        called_fnc = CalledFunction(call_info=self, task=task)

        def _on_done(fut):
            try:
                called_fnc.result = fut.result()
            except BaseException as e:
                called_fnc.exception = e

        task.add_done_callback(_on_done)
        return called_fnc
```
FunctionCallInfo(tool_call_id: 'str', function_info: 'FunctionInfo', raw_arguments: 'str', arguments: 'dict[str, Any]')
Class variables
var arguments : dict[str, typing.Any]
var function_info : FunctionInfo
var raw_arguments : str
var tool_call_id : str
Methods
def execute(self) ‑> CalledFunction
```python
def execute(self) -> CalledFunction:
    function_info = self.function_info
    func = functools.partial(function_info.callable, **self.arguments)
    if asyncio.iscoroutinefunction(function_info.callable):
        task = asyncio.create_task(func())
    else:
        task = asyncio.create_task(asyncio.to_thread(func))

    called_fnc = CalledFunction(call_info=self, task=task)

    def _on_done(fut):
        try:
            called_fnc.result = fut.result()
        except BaseException as e:
            called_fnc.exception = e

    task.add_done_callback(_on_done)
    return called_fnc
```
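A small sketch of running a single parsed tool call; in practice call_info would come from LLMStream.function_calls rather than being constructed by hand.
```python
from livekit.agents import llm


async def run_tool_call(call_info: llm.FunctionCallInfo) -> None:
    called = call_info.execute()  # schedules the function as an asyncio task
    try:
        await called.task
    except Exception:
        pass  # the failure is also recorded on called.exception
    if called.exception is not None:
        print(f"{call_info.function_info.name} failed: {called.exception}")
    else:
        print(f"{call_info.function_info.name} returned: {called.result}")
```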
class FunctionContext
```python
class FunctionContext:
    def __init__(self) -> None:
        self._fncs = dict[str, FunctionInfo]()

        for _, member in inspect.getmembers(self, predicate=inspect.ismethod):
            if hasattr(member, METADATA_ATTR):
                self._register_ai_function(member)

    def ai_callable(
        self,
        *,
        name: str | None = None,
        description: str | _UseDocMarker | None = None,
        auto_retry: bool = True,
    ) -> Callable:
        def deco(f):
            _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
            self._register_ai_function(f)

        return deco

    def _register_ai_function(self, fnc: Callable) -> None:
        if not hasattr(fnc, METADATA_ATTR):
            logger.warning(f"function {fnc.__name__} does not have ai metadata")
            return

        metadata: _AIFncMetadata = getattr(fnc, METADATA_ATTR)
        fnc_name = metadata.name
        if fnc_name in self._fncs:
            raise ValueError(f"duplicate ai_callable name: {fnc_name}")

        sig = inspect.signature(fnc)

        # get_type_hints with include_extra=True is needed when using Annotated
        # using typing.get_args with param.Annotated is returning an empty tuple for some reason
        type_hints = typing.get_type_hints(
            fnc, include_extras=True
        )  # Annotated[T, ...] -> T

        args = dict[str, FunctionArgInfo]()
        for name, param in sig.parameters.items():
            if param.kind not in (
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                inspect.Parameter.KEYWORD_ONLY,
            ):
                raise ValueError(f"{fnc_name}: unsupported parameter kind {param.kind}")

            inner_th, type_info = _extract_types(type_hints[name])

            if not is_type_supported(inner_th):
                raise ValueError(
                    f"{fnc_name}: unsupported type {inner_th} for parameter {name}"
                )

            desc = type_info.description if type_info else ""
            choices = type_info.choices if type_info else ()

            if (
                isinstance(inner_th, type)
                and issubclass(inner_th, enum.Enum)
                and not choices
            ):
                # the enum must be a str or int (and at least one value)
                # this is verified by is_type_supported
                choices = tuple([item.value for item in inner_th])
                inner_th = type(choices[0])

            args[name] = FunctionArgInfo(
                name=name,
                description=desc,
                type=inner_th,
                default=param.default,
                choices=choices,
            )

        self._fncs[metadata.name] = FunctionInfo(
            name=metadata.name,
            description=metadata.description,
            auto_retry=metadata.auto_retry,
            callable=fnc,
            arguments=args,
        )

    @property
    def ai_functions(self) -> dict[str, FunctionInfo]:
        return self._fncs
```
Instance variables
prop ai_functions : dict[str, FunctionInfo]
```python
@property
def ai_functions(self) -> dict[str, FunctionInfo]:
    return self._fncs
```
Methods
def ai_callable(self,
*,
name: str | None = None,
description: str | _UseDocMarker | None = None,
auto_retry: bool = True) ‑> Callable
```python
def ai_callable(
    self,
    *,
    name: str | None = None,
    description: str | _UseDocMarker | None = None,
    auto_retry: bool = True,
) -> Callable:
    def deco(f):
        _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
        self._register_ai_function(f)

    return deco
```
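The instance-level decorator registers free functions at runtime. A minimal sketch; note that, per the source above, the inner decorator registers the function as a side effect and does not return it.
```python
from livekit.agents import llm

fnc_ctx = llm.FunctionContext()


@fnc_ctx.ai_callable(name="get_time", description="Get the current UTC time")
def get_time() -> str:
    from datetime import datetime, timezone

    return datetime.now(timezone.utc).isoformat()


# The function is now available to the LLM under the registered name,
# even though the decorator itself returns None.
assert "get_time" in fnc_ctx.ai_functions
```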
class FunctionInfo (name: str,
description: str,
auto_retry: bool,
callable: Callable,
arguments: dict[str, FunctionArgInfo])
```python
@dataclass(frozen=True)
class FunctionInfo:
    name: str
    description: str
    auto_retry: bool
    callable: Callable
    arguments: dict[str, FunctionArgInfo]
```
FunctionInfo(name: 'str', description: 'str', auto_retry: 'bool', callable: 'Callable', arguments: 'dict[str, FunctionArgInfo]')
Class variables
var arguments : dict[str, FunctionArgInfo]
var auto_retry : bool
var callable : Callable
var description : str
var name : str
class LLM
```python
class LLM(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    def __init__(self) -> None:
        super().__init__()
        self._capabilities = LLMCapabilities()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @abstractmethod
    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: function_context.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
    ) -> "LLMStream": ...

    @property
    def capabilities(self) -> LLMCapabilities:
        return self._capabilities

    async def aclose(self) -> None: ...

    async def __aenter__(self) -> LLM:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
```
Abstract base class for LLM implementations. Concrete subclasses implement chat() to return an LLMStream and emit metrics_collected events as completions are generated.
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- FallbackAdapter
- livekit.plugins.anthropic.llm.LLM
- AssistantLLM
- livekit.plugins.openai.llm.LLM
Instance variables
prop capabilities : LLMCapabilities
```python
@property
def capabilities(self) -> LLMCapabilities:
    return self._capabilities
```
prop label : str
```python
@property
def label(self) -> str:
    return self._label
```
Methods
async def aclose(self) ‑> None
```python
async def aclose(self) -> None: ...
```
def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0),
fnc_ctx: FunctionContext | None = None,
temperature: float | None = None,
n: int | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> LLMStream
```python
@abstractmethod
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: function_context.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = None,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
) -> "LLMStream": ...
```
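A minimal sketch of requesting a completion and collecting the streamed text; model can be any concrete LLM subclass listed above.
```python
from livekit.agents import llm


async def complete(model: llm.LLM, prompt: str) -> str:
    chat_ctx = llm.ChatContext().append(role="user", text=prompt)
    reply = ""
    # The stream is an async iterator of ChatChunk; the async-with block
    # ensures aclose() runs when iteration finishes.
    async with model.chat(chat_ctx=chat_ctx) as stream:
        async for chunk in stream:
            for choice in chunk.choices:
                reply += choice.delta.content or ""
    return reply
```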
Inherited members
class LLMCapabilities (supports_choices_on_int: bool = True)
```python
@dataclass
class LLMCapabilities:
    supports_choices_on_int: bool = True
```
LLMCapabilities(supports_choices_on_int: 'bool' = True)
Class variables
var supports_choices_on_int : bool
class LLMStream (llm: LLM,
*,
chat_ctx: ChatContext,
fnc_ctx: FunctionContext | None,
conn_options: APIConnectOptions)
```python
class LLMStream(ABC):
    def __init__(
        self,
        llm: LLM,
        *,
        chat_ctx: ChatContext,
        fnc_ctx: function_context.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        self._llm = llm
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        self._conn_options = conn_options

        self._event_ch = aio.Chan[ChatChunk]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="LLM._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._function_calls_info: list[function_context.FunctionCallInfo] = []
        self._function_tasks = set[asyncio.Task[Any]]()

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to generate LLM completion after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to generate LLM completion, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "llm": self._llm._label,
                            "attempt": i + 1,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    @utils.log_exceptions(logger=logger)
    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[ChatChunk]
    ) -> None:
        start_time = time.perf_counter()
        ttft = -1.0
        request_id = ""
        usage: CompletionUsage | None = None

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttft == -1.0:
                ttft = time.perf_counter() - start_time

            if ev.usage is not None:
                usage = ev.usage

        duration = time.perf_counter() - start_time
        metrics = LLMMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttft=ttft,
            duration=duration,
            cancelled=self._task.cancelled(),
            label=self._llm._label,
            completion_tokens=usage.completion_tokens if usage else 0,
            prompt_tokens=usage.prompt_tokens if usage else 0,
            total_tokens=usage.total_tokens if usage else 0,
            tokens_per_second=usage.completion_tokens / duration if usage else 0.0,
            error=None,
        )
        self._llm.emit("metrics_collected", metrics)

    @property
    def function_calls(self) -> list[function_context.FunctionCallInfo]:
        """List of called functions from this stream."""
        return self._function_calls_info

    @property
    def chat_ctx(self) -> ChatContext:
        """The initial chat context of this stream."""
        return self._chat_ctx

    @property
    def fnc_ctx(self) -> function_context.FunctionContext | None:
        """The function context of this stream."""
        return self._fnc_ctx

    def execute_functions(self) -> list[function_context.CalledFunction]:
        """Execute all functions concurrently of this stream."""
        called_functions: list[function_context.CalledFunction] = []
        for fnc_info in self._function_calls_info:
            called_fnc = fnc_info.execute()
            self._function_tasks.add(called_fnc.task)
            called_fnc.task.add_done_callback(self._function_tasks.remove)
            called_functions.append(called_fnc)

        return called_functions

    async def aclose(self) -> None:
        await aio.gracefully_cancel(self._task)
        await utils.aio.gracefully_cancel(*self._function_tasks)
        await self._metrics_task

    async def __anext__(self) -> ChatChunk:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[ChatChunk]:
        return self

    async def __aenter__(self) -> LLMStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
```
Abstract base class for a streamed LLM completion. Instances are async iterators of ChatChunk and collect any tool calls requested by the model in function_calls.
Ancestors
- abc.ABC
Subclasses
- FallbackLLMStream
- livekit.plugins.anthropic.llm.LLMStream
- AssistantLLMStream
- livekit.plugins.openai.llm.LLMStream
Instance variables
prop chat_ctx : ChatContext
```python
@property
def chat_ctx(self) -> ChatContext:
    """The initial chat context of this stream."""
    return self._chat_ctx
```
The initial chat context of this stream.
prop fnc_ctx : FunctionContext | None
```python
@property
def fnc_ctx(self) -> function_context.FunctionContext | None:
    """The function context of this stream."""
    return self._fnc_ctx
```
The function context of this stream.
prop function_calls : list[FunctionCallInfo]
```python
@property
def function_calls(self) -> list[function_context.FunctionCallInfo]:
    """List of called functions from this stream."""
    return self._function_calls_info
```
List of called functions from this stream.
Methods
async def aclose(self) ‑> None
```python
async def aclose(self) -> None:
    await aio.gracefully_cancel(self._task)
    await utils.aio.gracefully_cancel(*self._function_tasks)
    await self._metrics_task
```
def execute_functions(self) ‑> list[CalledFunction]
```python
def execute_functions(self) -> list[function_context.CalledFunction]:
    """Execute all functions concurrently of this stream."""
    called_functions: list[function_context.CalledFunction] = []
    for fnc_info in self._function_calls_info:
        called_fnc = fnc_info.execute()
        self._function_tasks.add(called_fnc.task)
        called_fnc.task.add_done_callback(self._function_tasks.remove)
        called_functions.append(called_fnc)

    return called_functions
```
Execute all functions concurrently of this stream.
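A hedged sketch of a full tool-calling turn: drain the stream, execute any requested functions, and append the tool results so a follow-up chat() call can use them.
```python
import asyncio

from livekit.agents import llm


async def run_tool_turn(
    model: llm.LLM,
    chat_ctx: llm.ChatContext,
    fnc_ctx: llm.FunctionContext,
) -> None:
    stream = model.chat(chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)
    async with stream:
        async for _ in stream:
            pass  # drain the stream so function_calls is fully populated

    if stream.function_calls:
        chat_ctx.messages.append(llm.ChatMessage.create_tool_calls(stream.function_calls))
        called = stream.execute_functions()
        await asyncio.gather(*(c.task for c in called), return_exceptions=True)
        for c in called:
            chat_ctx.messages.append(llm.ChatMessage.create_tool_from_called_function(c))
        # A follow-up model.chat(chat_ctx=chat_ctx) would now see the tool results.
```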
class ToolChoice (type: "Literal['function']", name: str)
```python
@dataclass
class ToolChoice:
    type: Literal["function"]
    name: str
```
ToolChoice(type: "Literal['function']", name: 'str')
Class variables
var name : str
var type : Literal['function']
class TypeInfo (description: str, choices: tuple | list[Any] = ())
```python
@dataclass(frozen=True, init=False)
class TypeInfo:
    description: str
    choices: tuple

    def __init__(self, description: str, choices: tuple | list[Any] = tuple()) -> None:
        object.__setattr__(self, "description", description)

        if isinstance(choices, list):
            choices = tuple(choices)

        object.__setattr__(self, "choices", choices)
```
TypeInfo(description: 'str', choices: 'tuple | list[Any]' = ()) -> 'None'
Class variables
var choices : tuple
var description : str
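A short sketch of attaching TypeInfo metadata to parameters via typing.Annotated, which FunctionContext reads when registering an ai_callable function; the thermostat function is illustrative only.
```python
from typing import Annotated

from livekit.agents import llm


class HomeFnc(llm.FunctionContext):
    @llm.ai_callable(description="Set the target temperature")
    def set_temperature(
        self,
        value: Annotated[int, llm.TypeInfo(description="Target temperature")],
        unit: Annotated[
            str,
            llm.TypeInfo(description="Temperature unit", choices=["celsius", "fahrenheit"]),
        ],
    ) -> str:
        # The choices list above is converted to a tuple and exposed to the model
        # as the allowed values for this argument.
        return f"Thermostat set to {value} degrees {unit}."
```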