Module livekit.agents.llm

Sub-modules

livekit.agents.llm.chat_context
livekit.agents.llm.fallback_adapter
livekit.agents.llm.function_context
livekit.agents.llm.llm

Functions

def _create_ai_function_info(fnc_ctx: FunctionContext,
tool_call_id: str,
fnc_name: str,
raw_arguments: str) ‑> FunctionCallInfo
def _create_ai_function_info(
    fnc_ctx: FunctionContext,
    tool_call_id: str,
    fnc_name: str,
    raw_arguments: str,  # JSON string
) -> FunctionCallInfo:
    if fnc_name not in fnc_ctx.ai_functions:
        raise ValueError(f"AI function {fnc_name} not found")

    parsed_arguments: dict[str, Any] = {}
    try:
        if raw_arguments:  # ignore empty string
            parsed_arguments = json.loads(raw_arguments)
    except json.JSONDecodeError:
        raise ValueError(
            f"AI function {fnc_name} received invalid JSON arguments - {raw_arguments}"
        )

    fnc_info = fnc_ctx.ai_functions[fnc_name]

    # Ensure all necessary arguments are present and of the correct type.
    sanitized_arguments: dict[str, Any] = {}
    for arg_info in fnc_info.arguments.values():
        if arg_info.name not in parsed_arguments:
            if arg_info.default is inspect.Parameter.empty:
                raise ValueError(
                    f"AI function {fnc_name} missing required argument {arg_info.name}"
                )
            continue

        arg_value = parsed_arguments[arg_info.name]
        is_optional, inner_th = _is_optional_type(arg_info.type)

        if typing.get_origin(inner_th) is not None:
            if not isinstance(arg_value, list):
                raise ValueError(
                    f"AI function {fnc_name} argument {arg_info.name} should be a list"
                )

            inner_type = typing.get_args(inner_th)[0]
            sanitized_value = [
                _sanitize_primitive(
                    value=v,
                    expected_type=inner_type,
                    choices=arg_info.choices,
                )
                for v in arg_value
            ]
        else:
            sanitized_value = _sanitize_primitive(
                value=arg_value,
                expected_type=inner_th,
                choices=arg_info.choices,
            )

        sanitized_arguments[arg_info.name] = sanitized_value

    return FunctionCallInfo(
        tool_call_id=tool_call_id,
        raw_arguments=raw_arguments,
        function_info=fnc_info,
        arguments=sanitized_arguments,
    )
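
A hedged sketch of how an LLM plugin might use this helper when a provider returns a tool call; the context, tool call id, function name, and JSON payload below are hypothetical.

# fnc_ctx is assumed to be a populated FunctionContext with a registered
# "get_weather" ai_callable; the other values stand in for provider output.
call_info = _create_ai_function_info(
    fnc_ctx,
    "call_abc123",               # provider-assigned tool call id
    "get_weather",               # must match a name in fnc_ctx.ai_functions
    '{"location": "Tokyo"}',     # raw JSON arguments produced by the model
)
called_fnc = call_info.execute()  # schedules the function as an asyncio task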
def ai_callable(*,
name: str | None = None,
description: str | _UseDocMarker | None = None,
auto_retry: bool = False) ‑> Callable
def ai_callable(
    *,
    name: str | None = None,
    description: str | _UseDocMarker | None = None,
    auto_retry: bool = False,
) -> Callable:
    def deco(f):
        _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
        return f

    return deco
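
A minimal sketch of the decorator in isolation: it only attaches metadata to the decorated function; registration happens when a FunctionContext subclass discovers the decorated method (see FunctionContext below). The name and description are hypothetical.

@ai_callable(name="end_call", description="End the current phone call")
async def end_call() -> None:
    ...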

Classes

class AvailabilityChangedEvent (llm: LLM,
available: bool)
@dataclass
class AvailabilityChangedEvent:
    llm: LLM
    available: bool

AvailabilityChangedEvent(llm: 'LLM', available: 'bool')

Class variables

var available : bool
var llm : LLM
class CalledFunction (call_info: FunctionCallInfo,
task: asyncio.Task[Any],
result: Any | None = None,
exception: BaseException | None = None)
@dataclass
class CalledFunction:
    call_info: FunctionCallInfo
    task: asyncio.Task[Any]
    result: Any | None = None
    exception: BaseException | None = None

CalledFunction(call_info: 'FunctionCallInfo', task: 'asyncio.Task[Any]', result: 'Any | None' = None, exception: 'BaseException | None' = None)

Class variables

var call_info : FunctionCallInfo
var exception : BaseException | None
var result : typing.Any | None
var task : asyncio.Task[typing.Any]
class ChatAudio (frame: rtc.AudioFrame | list[rtc.AudioFrame])
@dataclass
class ChatAudio:
    frame: rtc.AudioFrame | list[rtc.AudioFrame]

ChatAudio(frame: 'rtc.AudioFrame | list[rtc.AudioFrame]')

Class variables

var frame : AudioFrame | list[AudioFrame]
class ChatChunk (request_id: str,
choices: list[Choice] = <factory>,
usage: CompletionUsage | None = None)
@dataclass
class ChatChunk:
    request_id: str
    choices: list[Choice] = field(default_factory=list)
    usage: CompletionUsage | None = None

ChatChunk(request_id: 'str', choices: 'list[Choice]' = <factory>, usage: 'CompletionUsage | None' = None)

Class variables

var choices : list[Choice]
var request_id : str
var usage : CompletionUsage | None
class ChatContext (messages: list[ChatMessage] = <factory>)
@dataclass
class ChatContext:
    messages: list[ChatMessage] = field(default_factory=list)
    _metadata: dict[str, Any] = field(default_factory=dict, repr=False, init=False)

    def append(
        self, *, text: str = "", images: list[ChatImage] = [], role: ChatRole = "system"
    ) -> ChatContext:
        self.messages.append(ChatMessage.create(text=text, images=images, role=role))
        return self

    def copy(self) -> ChatContext:
        copied_chat_ctx = ChatContext(messages=[m.copy() for m in self.messages])
        copied_chat_ctx._metadata = self._metadata
        return copied_chat_ctx

ChatContext(messages: 'list[ChatMessage]' = <factory>)

Class variables

var messages : list[ChatMessage]

Methods

def append(self,
*,
text: str = '',
images: list[ChatImage] = [],
role: ChatRole = 'system') ‑> ChatContext
def append(
    self, *, text: str = "", images: list[ChatImage] = [], role: ChatRole = "system"
) -> ChatContext:
    self.messages.append(ChatMessage.create(text=text, images=images, role=role))
    return self
def copy(self) ‑> ChatContext
def copy(self) -> ChatContext:
    copied_chat_ctx = ChatContext(messages=[m.copy() for m in self.messages])
    copied_chat_ctx._metadata = self._metadata
    return copied_chat_ctx
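
A short usage sketch, assuming the resulting context is later passed to LLM.chat(); the prompt text is illustrative.

from livekit.agents.llm import ChatContext

chat_ctx = (
    ChatContext()
    .append(role="system", text="You are a helpful voice assistant.")
    .append(role="user", text="What's the capital of France?")
)
# append() returns self so calls can be chained; copy() duplicates every
# message while sharing the internal metadata dict.
snapshot = chat_ctx.copy()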
class ChatImage (image: str | rtc.VideoFrame,
inference_width: int | None = None,
inference_height: int | None = None,
inference_detail: "Literal['auto', 'high', 'low']" = 'auto')
@dataclass
class ChatImage:
    """
    ChatImage is used to input images into the ChatContext on supported LLM providers / plugins.

    You may need to consult your LLM provider's documentation on supported URL types.

    ```python
    # Pass a VideoFrame directly, which will be automatically converted to a JPEG data URL internally
    async for event in rtc.VideoStream(video_track):
        chat_image = ChatImage(image=event.frame)
        # this instance is now available for your ChatContext

    # Encode your VideoFrame yourself for more control, and pass the result as a data URL (see EncodeOptions for more details)
    from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions

    image_bytes = encode(
        event.frame,
        EncodeOptions(
            format="PNG",
            resize_options=ResizeOptions(
                width=512, height=512, strategy="scale_aspect_fit"
            ),
        ),
    )
    chat_image = ChatImage(
        image=f"data:image/png;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
    )

    # With an external URL
    chat_image = ChatImage(image="https://example.com/image.jpg")
    ```
    """

    image: str | rtc.VideoFrame
    """
    Either a string URL or a VideoFrame object
    """
    inference_width: int | None = None
    """
    Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)
    """
    inference_height: int | None = None
    """
    Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)
    """
    inference_detail: Literal["auto", "high", "low"] = "auto"
    """
    Detail parameter for LLM provider, if supported.
    
    Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding)
    """
    _cache: dict[Any, Any] = field(default_factory=dict, repr=False, init=False)
    """
    _cache is used internally by LLM implementations to store a processed version of the image
    for later use.
    """

ChatImage is used to input images into the ChatContext on supported LLM providers / plugins.

You may need to consult your LLM provider's documentation on supported URL types.

# Pass a VideoFrame directly, which will be automatically converted to a JPEG data URL internally
async for event in rtc.VideoStream(video_track):
    chat_image = ChatImage(image=event.frame)
    # this instance is now available for your ChatContext

# Encode your VideoFrame yourself for more control, and pass the result as a data URL (see EncodeOptions for more details)
from livekit.agents.utils.images import encode, EncodeOptions, ResizeOptions

image_bytes = encode(
    event.frame,
    EncodeOptions(
        format="PNG",
        resize_options=ResizeOptions(
            width=512, height=512, strategy="scale_aspect_fit"
        ),
    ),
)
chat_image = ChatImage(
    image=f"data:image/png;base64,{base64.b64encode(image_bytes).decode('utf-8')}"
)

# With an external URL
chat_image = ChatImage(image="https://example.com/image.jpg")

Class variables

var image : str | VideoFrame

Either a string URL or a VideoFrame object

var inference_detail : Literal['auto', 'high', 'low']

Detail parameter for LLM provider, if supported.

Currently only supported by OpenAI (see https://platform.openai.com/docs/guides/vision?lang=node#low-or-high-fidelity-image-understanding)

var inference_height : int | None

Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)

var inference_width : int | None

Resizing parameter for rtc.VideoFrame inputs (ignored for URL images)

class ChatMessage (role: ChatRole,
id: str = <factory>,
name: str | None = None,
content: ChatContent | list[ChatContent] | None = None,
tool_calls: list[FunctionCallInfo] | None = None,
tool_call_id: str | None = None,
tool_exception: Exception | None = None)
@dataclass
class ChatMessage:
    role: ChatRole
    id: str = field(
        default_factory=lambda: utils.shortuuid("item_")
    )  # used by the OAI realtime API
    name: str | None = None
    content: ChatContent | list[ChatContent] | None = None
    tool_calls: list[function_context.FunctionCallInfo] | None = None
    tool_call_id: str | None = None
    tool_exception: Exception | None = None
    _metadata: dict[str, Any] = field(default_factory=dict, repr=False, init=False)

    @staticmethod
    def create_tool_from_called_function(
        called_function: function_context.CalledFunction,
    ) -> "ChatMessage":
        if not called_function.task.done():
            raise ValueError("cannot create a tool result from a running ai function")

        tool_exception: Exception | None = None
        try:
            content = called_function.task.result()
        except BaseException as e:
            if isinstance(e, Exception):
                tool_exception = e
            content = f"Error: {e}"

        return ChatMessage(
            role="tool",
            name=called_function.call_info.function_info.name,
            content=content,
            tool_call_id=called_function.call_info.tool_call_id,
            tool_exception=tool_exception,
        )

    @staticmethod
    def create_tool_calls(
        called_functions: list[function_context.FunctionCallInfo],
        *,
        text: str = "",
    ) -> "ChatMessage":
        return ChatMessage(role="assistant", tool_calls=called_functions, content=text)

    @staticmethod
    def create(
        *,
        text: str = "",
        images: list[ChatImage] = [],
        role: ChatRole = "system",
        id: str | None = None,
    ) -> "ChatMessage":
        id = id or utils.shortuuid("item_")
        if len(images) == 0:
            return ChatMessage(role=role, content=text, id=id)
        else:
            content: list[ChatContent] = []
            if text:
                content.append(text)

            if len(images) > 0:
                content.extend(images)

            return ChatMessage(role=role, content=content, id=id)

    def copy(self):
        content = self.content
        if isinstance(content, list):
            content = content.copy()

        tool_calls = self.tool_calls
        if tool_calls is not None:
            tool_calls = tool_calls.copy()

        copied_msg = ChatMessage(
            role=self.role,
            id=self.id,
            name=self.name,
            content=content,
            tool_calls=tool_calls,
            tool_call_id=self.tool_call_id,
        )
        copied_msg._metadata = self._metadata
        return copied_msg

ChatMessage(role: 'ChatRole', id: 'str' = <factory>, name: 'str | None' = None, content: 'ChatContent | list[ChatContent] | None' = None, tool_calls: 'list[function_context.FunctionCallInfo] | None' = None, tool_call_id: 'str | None' = None, tool_exception: 'Exception | None' = None)

Class variables

var content : str | ChatImage | ChatAudio | list[str | ChatImage | ChatAudio] | None
var id : str
var name : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_call_id : str | None
var tool_calls : list[FunctionCallInfo] | None
var tool_exception : Exception | None

Static methods

def create(*,
text: str = '',
images: list[ChatImage] = [],
role: ChatRole = 'system',
id: str | None = None) ‑> ChatMessage
@staticmethod
def create(
    *,
    text: str = "",
    images: list[ChatImage] = [],
    role: ChatRole = "system",
    id: str | None = None,
) -> "ChatMessage":
    id = id or utils.shortuuid("item_")
    if len(images) == 0:
        return ChatMessage(role=role, content=text, id=id)
    else:
        content: list[ChatContent] = []
        if text:
            content.append(text)

        if len(images) > 0:
            content.extend(images)

        return ChatMessage(role=role, content=content, id=id)
def create_tool_calls(called_functions: list[FunctionCallInfo],
*,
text: str = '') ‑> ChatMessage
@staticmethod
def create_tool_calls(
    called_functions: list[function_context.FunctionCallInfo],
    *,
    text: str = "",
) -> "ChatMessage":
    return ChatMessage(role="assistant", tool_calls=called_functions, content=text)
def create_tool_from_called_function(called_function: CalledFunction) ‑> ChatMessage
@staticmethod
def create_tool_from_called_function(
    called_function: function_context.CalledFunction,
) -> "ChatMessage":
    if not called_function.task.done():
        raise ValueError("cannot create a tool result from a running ai function")

    tool_exception: Exception | None = None
    try:
        content = called_function.task.result()
    except BaseException as e:
        if isinstance(e, Exception):
            tool_exception = e
        content = f"Error: {e}"

    return ChatMessage(
        role="tool",
        name=called_function.call_info.function_info.name,
        content=content,
        tool_call_id=called_function.call_info.tool_call_id,
        tool_exception=tool_exception,
    )

Methods

def copy(self)
def copy(self):
    content = self.content
    if isinstance(content, list):
        content = content.copy()

    tool_calls = self.tool_calls
    if tool_calls is not None:
        tool_calls = tool_calls.copy()

    copied_msg = ChatMessage(
        role=self.role,
        id=self.id,
        name=self.name,
        content=content,
        tool_calls=tool_calls,
        tool_call_id=self.tool_call_id,
    )
    copied_msg._metadata = self._metadata
    return copied_msg
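
An illustrative sketch of creating messages directly; the image URL is a placeholder.

from livekit.agents.llm import ChatImage, ChatMessage

msg = ChatMessage.create(
    role="user",
    text="What is in this image?",
    images=[ChatImage(image="https://example.com/photo.jpg")],
)
# With images present, msg.content is a list holding the text followed by the
# ChatImage instances; with text only, content is the plain string.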
class Choice (delta: ChoiceDelta,
index: int = 0)
@dataclass
class Choice:
    delta: ChoiceDelta
    index: int = 0

Choice(delta: 'ChoiceDelta', index: 'int' = 0)

Class variables

var delta : ChoiceDelta
var index : int
class ChoiceDelta (role: ChatRole,
content: str | None = None,
tool_calls: list[FunctionCallInfo] | None = None)
@dataclass
class ChoiceDelta:
    role: ChatRole
    content: str | None = None
    tool_calls: list[function_context.FunctionCallInfo] | None = None

ChoiceDelta(role: 'ChatRole', content: 'str | None' = None, tool_calls: 'list[function_context.FunctionCallInfo] | None' = None)

Class variables

var content : str | None
var role : Literal['system', 'user', 'assistant', 'tool']
var tool_calls : list[FunctionCallInfo] | None
class CompletionUsage (completion_tokens: int, prompt_tokens: int, total_tokens: int)
@dataclass
class CompletionUsage:
    completion_tokens: int
    prompt_tokens: int
    total_tokens: int

CompletionUsage(completion_tokens: 'int', prompt_tokens: 'int', total_tokens: 'int')

Class variables

var completion_tokens : int
var prompt_tokens : int
var total_tokens : int
class FallbackAdapter (llm: list[LLM],
*,
attempt_timeout: float = 10.0,
max_retry_per_llm: int = 1,
retry_interval: float = 5)
class FallbackAdapter(
    LLM[Literal["llm_availability_changed"]],
):
    def __init__(
        self,
        llm: list[LLM],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_llm: int = 1,
        retry_interval: float = 5,
    ) -> None:
        if len(llm) < 1:
            raise ValueError("at least one LLM instance must be provided.")

        super().__init__()

        self._llm_instances = llm
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_llm = max_retry_per_llm
        self._retry_interval = retry_interval

        self._status = [
            _LLMStatus(available=True, recovering_task=None)
            for _ in self._llm_instances
        ]

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
        fnc_ctx: FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream":
        return FallbackLLMStream(
            llm=self,
            conn_options=conn_options,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            temperature=temperature,
            n=n,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
        )

FallbackAdapter wraps an ordered list of LLM instances and falls back to the next instance when a completion attempt fails or times out, emitting llm_availability_changed (AvailabilityChangedEvent) as instances become unavailable or recover.
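
A minimal usage sketch, assuming the OpenAI plugin is installed; the model names are illustrative, and any LLM implementations may be supplied in priority order.

from livekit.agents.llm import FallbackAdapter
from livekit.plugins import openai

fallback_llm = FallbackAdapter(
    [openai.LLM(model="gpt-4o-mini"), openai.LLM(model="gpt-4o")],
    attempt_timeout=10.0,
    max_retry_per_llm=1,
)

def _on_availability_changed(ev):  # AvailabilityChangedEvent
    print(f"{ev.llm.label} available={ev.available}")

fallback_llm.on("llm_availability_changed", _on_availability_changed)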

Ancestors

  • LLM
  • abc.ABC
  • rtc.EventEmitter
  • typing.Generic

Methods

def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=5.0, timeout=10.0),
fnc_ctx: FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> LLMStream
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    fnc_ctx: FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    return FallbackLLMStream(
        llm=self,
        conn_options=conn_options,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        temperature=temperature,
        n=n,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )

Inherited members

class FunctionArgInfo (name: str, description: str, type: type, default: Any, choices: tuple | None)
@dataclass(frozen=True)
class FunctionArgInfo:
    name: str
    description: str
    type: type
    default: Any
    choices: tuple | None

FunctionArgInfo(name: 'str', description: 'str', type: 'type', default: 'Any', choices: 'tuple | None')

Class variables

var choices : tuple | None
var default : Any
var description : str
var name : str
var type : type
class FunctionCallInfo (tool_call_id: str,
function_info: FunctionInfo,
raw_arguments: str,
arguments: dict[str, Any])
@dataclass(frozen=True)
class FunctionCallInfo:
    tool_call_id: str
    function_info: FunctionInfo
    raw_arguments: str
    arguments: dict[str, Any]

    def execute(self) -> CalledFunction:
        function_info = self.function_info
        func = functools.partial(function_info.callable, **self.arguments)
        if asyncio.iscoroutinefunction(function_info.callable):
            task = asyncio.create_task(func())
        else:
            task = asyncio.create_task(asyncio.to_thread(func))

        called_fnc = CalledFunction(call_info=self, task=task)

        def _on_done(fut):
            try:
                called_fnc.result = fut.result()
            except BaseException as e:
                called_fnc.exception = e

        task.add_done_callback(_on_done)
        return called_fnc

FunctionCallInfo(tool_call_id: 'str', function_info: 'FunctionInfo', raw_arguments: 'str', arguments: 'dict[str, Any]')

Class variables

var arguments : dict[str, typing.Any]
var function_info : FunctionInfo
var raw_arguments : str
var tool_call_id : str

Methods

def execute(self) ‑> CalledFunction
def execute(self) -> CalledFunction:
    function_info = self.function_info
    func = functools.partial(function_info.callable, **self.arguments)
    if asyncio.iscoroutinefunction(function_info.callable):
        task = asyncio.create_task(func())
    else:
        task = asyncio.create_task(asyncio.to_thread(func))

    called_fnc = CalledFunction(call_info=self, task=task)

    def _on_done(fut):
        try:
            called_fnc.result = fut.result()
        except BaseException as e:
            called_fnc.exception = e

    task.add_done_callback(_on_done)
    return called_fnc
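
A hedged sketch of running a parsed tool call and feeding the result back to the model as a "tool" message; call_info and chat_ctx are assumed to come from an earlier LLMStream.

# execute() must be called while an asyncio event loop is running.
called = call_info.execute()
try:
    await called.task
except Exception:
    pass  # the exception is also stored on called.exception

chat_ctx.messages.append(
    ChatMessage.create_tool_from_called_function(called)
)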
class FunctionContext
class FunctionContext:
    def __init__(self) -> None:
        self._fncs = dict[str, FunctionInfo]()

        for _, member in inspect.getmembers(self, predicate=inspect.ismethod):
            if hasattr(member, METADATA_ATTR):
                self._register_ai_function(member)

    def ai_callable(
        self,
        *,
        name: str | None = None,
        description: str | _UseDocMarker | None = None,
        auto_retry: bool = True,
    ) -> Callable:
        def deco(f):
            _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
            self._register_ai_function(f)

        return deco

    def _register_ai_function(self, fnc: Callable) -> None:
        if not hasattr(fnc, METADATA_ATTR):
            logger.warning(f"function {fnc.__name__} does not have ai metadata")
            return

        metadata: _AIFncMetadata = getattr(fnc, METADATA_ATTR)
        fnc_name = metadata.name
        if fnc_name in self._fncs:
            raise ValueError(f"duplicate ai_callable name: {fnc_name}")

        sig = inspect.signature(fnc)

        # get_type_hints with include_extra=True is needed when using Annotated
        # using typing.get_args with param.Annotated is returning an empty tuple for some reason
        type_hints = typing.get_type_hints(
            fnc, include_extras=True
        )  # Annotated[T, ...] -> T
        args = dict[str, FunctionArgInfo]()

        for name, param in sig.parameters.items():
            if param.kind not in (
                inspect.Parameter.POSITIONAL_OR_KEYWORD,
                inspect.Parameter.KEYWORD_ONLY,
            ):
                raise ValueError(f"{fnc_name}: unsupported parameter kind {param.kind}")

            inner_th, type_info = _extract_types(type_hints[name])

            if not is_type_supported(inner_th):
                raise ValueError(
                    f"{fnc_name}: unsupported type {inner_th} for parameter {name}"
                )

            desc = type_info.description if type_info else ""
            choices = type_info.choices if type_info else ()

            if (
                isinstance(inner_th, type)
                and issubclass(inner_th, enum.Enum)
                and not choices
            ):
                # the enum must be a str or int (and at least one value)
                # this is verified by is_type_supported
                choices = tuple([item.value for item in inner_th])
                inner_th = type(choices[0])

            args[name] = FunctionArgInfo(
                name=name,
                description=desc,
                type=inner_th,
                default=param.default,
                choices=choices,
            )

        self._fncs[metadata.name] = FunctionInfo(
            name=metadata.name,
            description=metadata.description,
            auto_retry=metadata.auto_retry,
            callable=fnc,
            arguments=args,
        )

    @property
    def ai_functions(self) -> dict[str, FunctionInfo]:
        return self._fncs
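
A minimal sketch of the typical subclass pattern; the function, its parameter, and the description strings are hypothetical.

from typing import Annotated

from livekit.agents import llm

class AssistantFnc(llm.FunctionContext):
    @llm.ai_callable(description="Get the current weather for a location")
    async def get_weather(
        self,
        location: Annotated[str, llm.TypeInfo(description="The city to look up")],
    ) -> str:
        # a real implementation would call a weather service here
        return f"The weather in {location} is sunny."

fnc_ctx = AssistantFnc()
print(list(fnc_ctx.ai_functions))  # ["get_weather"]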

Instance variables

prop ai_functions : dict[str, FunctionInfo]
@property
def ai_functions(self) -> dict[str, FunctionInfo]:
    return self._fncs

Methods

def ai_callable(self,
*,
name: str | None = None,
description: str | _UseDocMarker | None = None,
auto_retry: bool = True) ‑> Callable
def ai_callable(
    self,
    *,
    name: str | None = None,
    description: str | _UseDocMarker | None = None,
    auto_retry: bool = True,
) -> Callable:
    def deco(f):
        _set_metadata(f, name=name, desc=description, auto_retry=auto_retry)
        self._register_ai_function(f)

    return deco
class FunctionInfo (name: str,
description: str,
auto_retry: bool,
callable: Callable,
arguments: dict[str, FunctionArgInfo])
@dataclass(frozen=True)
class FunctionInfo:
    name: str
    description: str
    auto_retry: bool
    callable: Callable
    arguments: dict[str, FunctionArgInfo]

FunctionInfo(name: 'str', description: 'str', auto_retry: 'bool', callable: 'Callable', arguments: 'dict[str, FunctionArgInfo]')

Class variables

var arguments : dict[str, FunctionArgInfo]
var auto_retry : bool
var callable : Callable
var description : str
var name : str
class LLM
class LLM(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    def __init__(self) -> None:
        super().__init__()
        self._capabilities = LLMCapabilities()
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @abstractmethod
    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: function_context.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream": ...

    @property
    def capabilities(self) -> LLMCapabilities:
        return self._capabilities

    async def aclose(self) -> None: ...

    async def __aenter__(self) -> LLM:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Abstract base class for LLM implementations. It emits metrics_collected events and exposes chat(), which opens an LLMStream for a given ChatContext.
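
A sketch of consuming a chat stream, assuming my_llm is any concrete LLM implementation (for example one provided by a plugin) and chat_ctx is a populated ChatContext.

async def generate(my_llm, chat_ctx):
    stream = my_llm.chat(chat_ctx=chat_ctx)
    async with stream:
        async for chunk in stream:
            for choice in chunk.choices:
                if choice.delta.content:
                    print(choice.delta.content, end="", flush=True)
    # tool calls requested by the model are collected on the stream
    if stream.function_calls:
        stream.execute_functions()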

Ancestors

  • abc.ABC
  • rtc.EventEmitter
  • typing.Generic

Subclasses

  • FallbackAdapter

Instance variables

prop capabilities : LLMCapabilities
@property
def capabilities(self) -> LLMCapabilities:
    return self._capabilities
prop label : str
@property
def label(self) -> str:
    return self._label

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None: ...
def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0),
fnc_ctx: FunctionContext | None = None,
temperature: float | None = None,
n: int | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> LLMStream
@abstractmethod
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: function_context.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = None,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream": ...

Inherited members

class LLMCapabilities (supports_choices_on_int: bool = True)
@dataclass
class LLMCapabilities:
    supports_choices_on_int: bool = True

LLMCapabilities(supports_choices_on_int: 'bool' = True)

Class variables

var supports_choices_on_int : bool
class LLMStream (llm: LLM,
*,
chat_ctx: ChatContext,
fnc_ctx: FunctionContext | None,
conn_options: APIConnectOptions)
class LLMStream(ABC):
    def __init__(
        self,
        llm: LLM,
        *,
        chat_ctx: ChatContext,
        fnc_ctx: function_context.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        self._llm = llm
        self._chat_ctx = chat_ctx
        self._fnc_ctx = fnc_ctx
        self._conn_options = conn_options

        self._event_ch = aio.Chan[ChatChunk]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="LLM._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

        self._function_calls_info: list[function_context.FunctionCallInfo] = []
        self._function_tasks = set[asyncio.Task[Any]]()

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to generate LLM completion after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to generate LLM completion, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "llm": self._llm._label,
                            "attempt": i + 1,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    @utils.log_exceptions(logger=logger)
    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[ChatChunk]
    ) -> None:
        start_time = time.perf_counter()
        ttft = -1.0
        request_id = ""
        usage: CompletionUsage | None = None

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttft == -1.0:
                ttft = time.perf_counter() - start_time

            if ev.usage is not None:
                usage = ev.usage

        duration = time.perf_counter() - start_time
        metrics = LLMMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttft=ttft,
            duration=duration,
            cancelled=self._task.cancelled(),
            label=self._llm._label,
            completion_tokens=usage.completion_tokens if usage else 0,
            prompt_tokens=usage.prompt_tokens if usage else 0,
            total_tokens=usage.total_tokens if usage else 0,
            tokens_per_second=usage.completion_tokens / duration if usage else 0.0,
            error=None,
        )
        self._llm.emit("metrics_collected", metrics)

    @property
    def function_calls(self) -> list[function_context.FunctionCallInfo]:
        """List of called functions from this stream."""
        return self._function_calls_info

    @property
    def chat_ctx(self) -> ChatContext:
        """The initial chat context of this stream."""
        return self._chat_ctx

    @property
    def fnc_ctx(self) -> function_context.FunctionContext | None:
        """The function context of this stream."""
        return self._fnc_ctx

    def execute_functions(self) -> list[function_context.CalledFunction]:
        """Execute all functions concurrently of this stream."""
        called_functions: list[function_context.CalledFunction] = []
        for fnc_info in self._function_calls_info:
            called_fnc = fnc_info.execute()
            self._function_tasks.add(called_fnc.task)
            called_fnc.task.add_done_callback(self._function_tasks.remove)
            called_functions.append(called_fnc)

        return called_functions

    async def aclose(self) -> None:
        await aio.gracefully_cancel(self._task)
        await utils.aio.gracefully_cancel(*self._function_tasks)
        await self._metrics_task

    async def __anext__(self) -> ChatChunk:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[ChatChunk]:
        return self

    async def __aenter__(self) -> LLMStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Abstract base class for a streamed LLM completion; iterate it asynchronously to receive ChatChunk events, then close it (or use it as an async context manager).

Ancestors

  • abc.ABC

Subclasses

Instance variables

prop chat_ctx : ChatContext
@property
def chat_ctx(self) -> ChatContext:
    """The initial chat context of this stream."""
    return self._chat_ctx

The initial chat context of this stream.

prop fnc_ctx : FunctionContext | None
@property
def fnc_ctx(self) -> function_context.FunctionContext | None:
    """The function context of this stream."""
    return self._fnc_ctx

The function context of this stream.

prop function_calls : list[FunctionCallInfo]
@property
def function_calls(self) -> list[function_context.FunctionCallInfo]:
    """List of called functions from this stream."""
    return self._function_calls_info

List of called functions from this stream.

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    await aio.gracefully_cancel(self._task)
    await utils.aio.gracefully_cancel(*self._function_tasks)
    await self._metrics_task
def execute_functions(self) ‑> list[CalledFunction]
def execute_functions(self) -> list[function_context.CalledFunction]:
    """Execute all functions concurrently of this stream."""
    called_functions: list[function_context.CalledFunction] = []
    for fnc_info in self._function_calls_info:
        called_fnc = fnc_info.execute()
        self._function_tasks.add(called_fnc.task)
        called_fnc.task.add_done_callback(self._function_tasks.remove)
        called_functions.append(called_fnc)

    return called_functions

Execute all functions concurrently of this stream.

class ToolChoice (type: "Literal['function']", name: str)
@dataclass
class ToolChoice:
    type: Literal["function"]
    name: str

ToolChoice(type: "Literal['function']", name: 'str')

Class variables

var name : str
var type : Literal['function']
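
An illustrative use with chat(); my_llm, chat_ctx, fnc_ctx, and the function name are assumptions.

stream = my_llm.chat(
    chat_ctx=chat_ctx,
    fnc_ctx=fnc_ctx,
    tool_choice=ToolChoice(type="function", name="get_weather"),
)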
class TypeInfo (description: str, choices: tuple | list[Any] = ())
@dataclass(frozen=True, init=False)
class TypeInfo:
    description: str
    choices: tuple

    def __init__(self, description: str, choices: tuple | list[Any] = tuple()) -> None:
        object.__setattr__(self, "description", description)

        if isinstance(choices, list):
            choices = tuple(choices)

        object.__setattr__(self, "choices", choices)

TypeInfo(description: 'str', choices: 'tuple | list[Any]' = ()) -> 'None'

Class variables

var choices : tuple
var description : str
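
An illustrative use of TypeInfo with typing.Annotated to describe an ai_callable parameter and restrict it to a fixed set of values; the function and its values are hypothetical.

from typing import Annotated

from livekit.agents import llm

class ThermostatFnc(llm.FunctionContext):
    @llm.ai_callable(description="Set the thermostat mode")
    async def set_mode(
        self,
        mode: Annotated[
            str,
            llm.TypeInfo(description="Target mode", choices=["heat", "cool", "off"]),
        ],
    ) -> str:
        return f"Thermostat set to {mode}."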