Module livekit.agents.llm.utils

Functions

def build_legacy_openai_schema(function_tool: FunctionTool, *, internally_tagged: bool = False) ‑> dict[str, typing.Any]
Expand source code
def build_legacy_openai_schema(
    function_tool: FunctionTool, *, internally_tagged: bool = False
) -> dict[str, Any]:
    """non-strict mode tool description
    see https://serde.rs/enum-representations.html for the internally tagged representation"""
    model = function_arguments_to_pydantic_model(function_tool)
    info = function_tool.info
    schema = model.model_json_schema()

    if internally_tagged:
        return {
            "name": info.name,
            "description": info.description or "",
            "parameters": schema,
            "type": "function",
        }
    else:
        return {
            "type": "function",
            "function": {
                "name": info.name,
                "description": info.description or "",
                "parameters": schema,
            },
        }

non-strict mode tool description see https://serde.rs/enum-representations.html for the internally tagged representation

def build_strict_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
Expand source code
def build_strict_openai_schema(
    function_tool: FunctionTool,
) -> dict[str, Any]:
    """strict mode tool description"""
    model = function_arguments_to_pydantic_model(function_tool)
    info = function_tool.info
    schema = _strict.to_strict_json_schema(model)

    return {
        "type": "function",
        "function": {
            "name": info.name,
            "strict": True,
            "description": info.description or "",
            "parameters": schema,
        },
    }

strict mode tool description

def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) ‑> DiffOps
Expand source code
def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) -> DiffOps:
    """Computes the minimal list of create/remove operations to transform old_ctx into new_ctx."""
    # TODO(theomonnom): Make ChatMessage hashable and also add update ops

    old_ids = [m.id for m in old_ctx.items]
    new_ids = [m.id for m in new_ctx.items]

    lcs_ids = set(_compute_lcs(old_ids, new_ids))
    old_ctx_by_id = {item.id: item for item in old_ctx.items}

    to_remove = [msg.id for msg in old_ctx.items if msg.id not in lcs_ids]
    to_create: list[tuple[str | None, str]] = []
    to_update: list[tuple[str | None, str]] = []

    prev_id: str | None = None  # None means root
    for new_msg in new_ctx.items:
        if new_msg.id not in lcs_ids:
            to_create.append((prev_id, new_msg.id))
        else:
            # check if the content is different
            old_msg = old_ctx_by_id[new_msg.id]
            if new_msg.type == "message" and old_msg.type == "message":
                if new_msg.text_content != old_msg.text_content:
                    to_update.append((prev_id, new_msg.id))
                # TODO: check other content types

        prev_id = new_msg.id

    return DiffOps(to_remove=to_remove, to_create=to_create, to_update=to_update)

Computes the minimal list of create/remove operations to transform old_ctx into new_ctx.

async def execute_function_call(tool_call: FunctionToolCall,
tool_ctx: ToolContext,
*,
call_ctx: RunContext[Any] | None = None) ‑> FunctionCallResult
Expand source code
async def execute_function_call(
    tool_call: FunctionToolCall,
    tool_ctx: ToolContext,
    *,
    call_ctx: RunContext[Any] | None = None,
) -> FunctionCallResult:
    """Execute a function tool call and return the result."""
    from .chat_context import FunctionCall, FunctionCallOutput

    fnc_call = FunctionCall(
        call_id=tool_call.call_id,
        name=tool_call.name,
        arguments=tool_call.arguments or "{}",
        extra=tool_call.extra or {},
    )

    function_tool = tool_ctx.function_tools.get(tool_call.name)
    if function_tool is None:
        logger.warning(f"unknown AI function `{tool_call.name}`")
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=tool_call.name,
                call_id=tool_call.call_id,
                output=f"Unknown function: {tool_call.name}",
                is_error=True,
            ),
            raw_output=None,
            raw_exception=ValueError(f"Unknown function: {tool_call.name}"),
        )

    try:
        raw_args = tool_call.arguments or "{}"
        fnc_args, fnc_kwargs = prepare_function_arguments(
            fnc=function_tool,
            json_arguments=raw_args,
            call_ctx=call_ctx,
            fnc_call=fnc_call,
        )
        result = function_tool(*fnc_args, **fnc_kwargs)
        if asyncio.iscoroutine(result):
            result = await result

        out = make_function_call_output(fnc_call=fnc_call, output=result, exception=None)

    except Exception as e:
        if not isinstance(e, ToolError):
            logger.exception(
                f"exception executing AI function `{tool_call.name}`",
                extra={"call_id": tool_call.call_id, "arguments": tool_call.arguments},
            )
        out = make_function_call_output(fnc_call=fnc_call, output=None, exception=e)

    # surface any ctx.update() calls so callers can inspect them
    if call_ctx is not None and call_ctx._updates:
        out.fnc_call_updates = list(call_ctx._updates)
    return out

Execute a function tool call and return the result.

def function_arguments_to_pydantic_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]
Expand source code
def function_arguments_to_pydantic_model(func: Callable[..., Any]) -> type[BaseModel]:
    """Create a Pydantic model from a function's signature. (excluding context types)"""

    from docstring_parser import parse_from_object

    fnc_names = func.__name__.split("_")
    fnc_name = "".join(x.capitalize() for x in fnc_names)
    model_name = fnc_name + "Args"

    docstring = parse_from_object(func)
    param_docs = {p.arg_name: p.description for p in docstring.params}

    signature = inspect.signature(func)
    type_hints = get_type_hints(func, include_extras=True)

    # field_name -> (type, FieldInfo or default)
    fields: dict[str, Any] = {}

    for param_name, param in signature.parameters.items():
        type_hint = type_hints[param_name]

        if is_context_type(type_hint, allow_subclasses=True):
            continue

        default_value = param.default if param.default is not param.empty else ...
        field_info: FieldInfo | None = None
        field_attrs: dict[str, Any] = {}

        # Annotated[str, Field(description="...")]
        if get_origin(type_hint) is Annotated:
            annotated_args = get_args(type_hint)
            type_hint = annotated_args[0]
            annotated_field = next(
                (x for x in annotated_args[1:] if isinstance(x, FieldInfo)), None
            )
            if annotated_field and hasattr(annotated_field, "asdict"):
                # `asdict` is available after pydantic 2.12
                field_dict = annotated_field.asdict()
                field_attrs = field_dict["attributes"]
                # Constraints (ge/le/gt/lt/multiple_of/min_length/pattern/...) live
                # in `metadata`, not `attributes`. Re-attach them to the annotation
                # so `Field(...)` constraints on a tool argument are preserved.
                if field_dict["metadata"]:
                    type_hint = Annotated[(type_hint, *field_dict["metadata"])]
            elif annotated_field:
                field_attrs["default"] = annotated_field.default
                field_attrs["description"] = annotated_field.description
                field_info = annotated_field

        if (
            default_value is not ...
            and field_attrs.get("default", PydanticUndefined) is PydanticUndefined
        ):
            field_attrs["default"] = default_value

        if field_attrs.get("description") is None:
            field_attrs["description"] = param_docs.get(param_name, None)

        if not field_info:
            field_info = Field(**field_attrs)
        else:
            for k, v in field_attrs.items():
                setattr(field_info, k, v)

        fields[param_name] = (type_hint, field_info)

    return create_model(model_name, **fields)

Create a Pydantic model from a function's signature. (excluding context types)

def is_context_type(ty: type, *, allow_subclasses: bool = False) ‑> bool
Expand source code
def is_context_type(ty: type, *, allow_subclasses: bool = False) -> bool:
    from ..voice.events import RunContext

    origin = get_origin(ty)

    if not allow_subclasses:
        return ty is RunContext or origin is RunContext

    if origin is not None:
        try:
            return issubclass(origin, RunContext)
        except TypeError:
            return False

    try:
        return issubclass(ty, RunContext)
    except TypeError:
        return False
def is_typed_dict(cls: type | Any) ‑> bool
Expand source code
def is_typed_dict(cls: type | Any) -> bool:
    return isinstance(cls, type) and issubclass(cls, dict) and hasattr(cls, "__annotations__")
def make_function_call_output(*, fnc_call: FunctionCall, output: Any, exception: BaseException | None) ‑> FunctionCallResult
Expand source code
def make_function_call_output(
    *,
    fnc_call: FunctionCall,
    output: Any,
    exception: BaseException | None,
) -> FunctionCallResult:
    """Create a FunctionCallResult, handling ToolError, StopResponse, and validation."""
    from .chat_context import FunctionCallOutput
    from .tool_context import StopResponse, ToolError

    if isinstance(output, BaseException):
        exception = output
        output = None

    if isinstance(exception, ToolError):
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=fnc_call.name,
                call_id=fnc_call.call_id,
                output=exception.message,
                is_error=True,
            ),
            raw_output=output,
            raw_exception=exception,
        )

    if isinstance(exception, StopResponse):
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=None,
            raw_output=output,
            raw_exception=exception,
        )

    if exception is not None:
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=fnc_call.name,
                call_id=fnc_call.call_id,
                output="An internal error occurred",
                is_error=True,
            ),
            raw_output=output,
            raw_exception=exception,
        )

    if not _is_valid_function_output(output):
        logger.error(
            f"AI function `{fnc_call.name}` returned an invalid output",
            extra={"call_id": fnc_call.call_id, "output": output},
        )
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=None,
            raw_output=output,
            raw_exception=None,
        )

    return FunctionCallResult(
        fnc_call=fnc_call,
        fnc_call_out=FunctionCallOutput(
            name=fnc_call.name,
            call_id=fnc_call.call_id,
            output=str(output or ""),
            is_error=False,
        ),
        raw_output=output,
        raw_exception=None,
    )

Create a FunctionCallResult, handling ToolError, StopResponse, and validation.

def parse_function_arguments(json_arguments: str) ‑> dict[str, typing.Any]
Expand source code
def parse_function_arguments(json_arguments: str) -> dict[str, Any]:
    """Parse a raw JSON tool-call arguments string into a dict.

    First tries strict parsing; if the JSON is malformed (common with smaller /
    open-weight models that fumble special tokens or escaping), falls back to
    ``json_repair`` and then strips known chat-template token leaks.

    Raises ``ValueError`` if the arguments can't be recovered or don't decode
    to a dict-shaped value.
    """
    try:
        args_dict: Any = from_json(json_arguments)
    except ValueError as strict_err:
        repaired = json_repair.loads(json_arguments)
        if repaired == "":
            # json_repair returns "" when it can't recover anything meaningful.
            raise ValueError(
                f"could not parse function arguments as JSON: {strict_err}: {json_arguments[:200]}"
            ) from strict_err
        # After a repair, also strip leaked chat-template tokens — many of
        # the failures we see are caused by `<|...|>` markers bleeding into
        # the model's structured output.
        cleaned = _strip_template_tokens(repaired)
        logger.warning(
            "repaired malformed function-call JSON arguments",
            extra={
                "raw_arguments": json_arguments[:500],
                "repaired": cleaned,
                "error": str(strict_err),
            },
        )
        args_dict = cleaned

    # Some providers (e.g. Nova Sonic) double-encode tool arguments as nested
    # JSON strings. Unwrap until we reach a non-string value.
    while isinstance(args_dict, str):
        try:
            args_dict = from_json(args_dict)
        except Exception:
            raise ValueError(
                f"function arguments decoded to a non-JSON string: {args_dict[:200]}"
            ) from None

    if args_dict is None:
        return {}
    if not isinstance(args_dict, dict):
        raise ValueError(
            f"expected dict from function arguments, "
            f"got {type(args_dict).__name__}: {json_arguments[:200]}"
        )
    return args_dict

Parse a raw JSON tool-call arguments string into a dict.

First tries strict parsing; if the JSON is malformed (common with smaller / open-weight models that fumble special tokens or escaping), falls back to json_repair and then strips known chat-template token leaks.

Raises ValueError if the arguments can't be recovered or don't decode to a dict-shaped value.

def prepare_function_arguments(*,
fnc: FunctionTool | RawFunctionTool,
json_arguments: str | dict[str, Any],
call_ctx: RunContext[Any] | None = None,
fnc_call: FunctionCall | None = None) ‑> tuple[tuple[Any, ...], dict[str, Any]]
Expand source code
def prepare_function_arguments(
    *,
    fnc: FunctionTool | RawFunctionTool,
    json_arguments: str | dict[str, Any],
    call_ctx: RunContext[Any] | None = None,
    fnc_call: FunctionCall | None = None,
) -> tuple[tuple[Any, ...], dict[str, Any]]:  # returns args, kwargs
    """Create the positional and keyword arguments to call a function tool from
    the raw function output from the LLM.

    Argument-validation failures (bad JSON, pydantic ValidationError, missing
    required params) are surfaced as :class:`ToolError` so the LLM gets a
    concrete error message and can self-correct on its next turn.

    When ``fnc_call`` is provided and ``json_arguments`` is a string, the
    canonicalized JSON (post json_repair) is written back to
    ``fnc_call.arguments`` BEFORE validation runs.
    """
    # phase 1: parse — raw JSON failures raise ToolError immediately (no
    # canonical to provide since the input itself was unparseable)
    if isinstance(json_arguments, dict):
        args_dict = json_arguments
    else:
        try:
            args_dict = parse_function_arguments(json_arguments)
        except ValueError as e:
            logger.error(
                f"error parsing arguments for `{fnc.info.name}`",
                extra={"function": fnc.info.name, "arguments": json_arguments},
            )
            raise ToolError(f"Error parsing arguments for `{fnc.info.name}`: {e}") from e

        # write canonical BEFORE validation so a downstream validation failure
        # still leaves valid JSON in chat history
        if fnc_call is not None:
            canonical = json.dumps(args_dict, default=str)
            if canonical != json_arguments:
                fnc_call.arguments = canonical

    # phase 2: validate + bind
    try:
        return _prepare_function_arguments(fnc=fnc, args_dict=args_dict, call_ctx=call_ctx)
    except ToolError:
        raise
    except (pydantic.ValidationError, ValueError, TypeError) as e:
        logger.error(
            f"error parsing arguments for `{fnc.info.name}`",
            extra={"function": fnc.info.name, "arguments": json_arguments},
        )
        raise ToolError(f"Error parsing arguments for `{fnc.info.name}`: {e}") from e
    except Exception:
        logger.exception(
            f"error parsing arguments for `{fnc.info.name}`",
            extra={"function": fnc.info.name, "arguments": json_arguments},
        )
        raise

Create the positional and keyword arguments to call a function tool from the raw function output from the LLM.

Argument-validation failures (bad JSON, pydantic ValidationError, missing required params) are surfaced as :class:ToolError so the LLM gets a concrete error message and can self-correct on its next turn.

When fnc_call is provided and json_arguments is a string, the canonicalized JSON (post json_repair) is written back to fnc_call.arguments BEFORE validation runs.

def serialize_image(image: ImageContent, *, use_cache: bool = True) ‑> SerializedImage
Expand source code
def serialize_image(image: ImageContent, *, use_cache: bool = True) -> SerializedImage:
    cache_key = "serialized_image"  # TODO(long): use hash of encoding options if available
    if use_cache and cache_key in image._cache:
        return cast(SerializedImage, image._cache[cache_key])

    serialized_image: SerializedImage
    if isinstance(image.image, str):
        if image.image.startswith("data:"):
            header, b64_data = image.image.split(",", 1)
            encoded_data = base64.b64decode(b64_data)
            header_mime = header.split(";")[0].split(":")[1]
            if image.mime_type and image.mime_type != header_mime:
                logger.warning(
                    f"""Provided mime_type '{image.mime_type}' does not match data URL mime type
                    '{header_mime}'. Using provided mime_type."""
                )
                mime_type = image.mime_type
            else:
                mime_type = header_mime
            supported_types = {"image/jpeg", "image/png", "image/webp", "image/gif"}
            if mime_type not in supported_types:
                raise ValueError(
                    f"Unsupported mime_type {mime_type}. Must be jpeg, png, webp, or gif"
                )

            serialized_image = SerializedImage(
                data_bytes=encoded_data,
                mime_type=mime_type,
                inference_detail=image.inference_detail,
            )
        else:
            serialized_image = SerializedImage(
                mime_type=image.mime_type,
                inference_detail=image.inference_detail,
                external_url=image.image,
            )

    elif isinstance(image.image, rtc.VideoFrame):
        opts = images.EncodeOptions()
        if image.inference_width and image.inference_height:
            opts.resize_options = images.ResizeOptions(
                width=image.inference_width,
                height=image.inference_height,
                strategy="scale_aspect_fit",
            )
        encoded_data = images.encode(image.image, opts)

        serialized_image = SerializedImage(
            data_bytes=encoded_data,
            mime_type="image/jpeg",
            inference_detail=image.inference_detail,
        )
    else:
        raise ValueError("Unsupported image type")

    if use_cache:
        image._cache[cache_key] = serialized_image
    return serialized_image
def strip_thinking_tokens(content: str | None, thinking: asyncio.Event) ‑> str | None
Expand source code
def strip_thinking_tokens(content: str | None, thinking: asyncio.Event) -> str | None:
    if content is None:
        return None

    if thinking.is_set():
        idx = content.find(THINK_TAG_END)
        if idx >= 0:
            thinking.clear()
            content = content[idx + len(THINK_TAG_END) :]
        else:
            content = None
    else:
        idx = content.find(THINK_TAG_START)
        if idx >= 0:
            thinking.set()
            content = content[idx + len(THINK_TAG_START) :]

    return content
def to_openai_response_format(response_format: type | dict[str, Any]) ‑> dict[str, typing.Any]
Expand source code
def to_openai_response_format(response_format: type | dict[str, Any]) -> dict[str, Any]:
    name, json_schema_type = to_response_format_param(response_format)

    schema = _strict.to_strict_json_schema(json_schema_type)
    return {
        "type": "json_schema",
        "json_schema": {
            "schema": schema,
            "name": name,
            "strict": True,
        },
    }
def to_response_format_param(response_format: type | dict[str, Any]) ‑> tuple[str, type[pydantic.main.BaseModel] | pydantic.type_adapter.TypeAdapter[typing.Any]]
Expand source code
def to_response_format_param(
    response_format: type | dict[str, Any],
) -> tuple[str, type[BaseModel] | TypeAdapter[Any]]:
    if isinstance(response_format, dict):
        # TODO(theomonnom): better type validation, copy TypedDict from OpenAI
        if response_format.get("type", "") not in ("text", "json_schema", "json_object"):
            raise TypeError("Unsupported response_format type")

        # TODO(long): fix return value
        raise TypeError("Unsupported response_format type")
        return response_format

    # add support for TypedDict
    if is_typed_dict(response_format):
        response_format = create_model(
            response_format.__name__,
            **{k: (v, ...) for k, v in response_format.__annotations__.items()},  # type: ignore
        )
    json_schema_type: type[BaseModel] | TypeAdapter[Any] | None = None
    if inspect.isclass(response_format) and issubclass(response_format, BaseModel):
        name = response_format.__name__
        json_schema_type = response_format
    elif inspect.isclass(response_format) and hasattr(
        response_format, "__pydantic_config__"
    ):  # @pydantic.dataclass
        name = response_format.__name__
        json_schema_type = TypeAdapter(response_format)
    else:
        raise TypeError(f"Unsupported response_format type - {response_format}")

    return name, json_schema_type

Classes

class DiffOps (to_remove: list[str],
to_create: list[tuple[str | None, str]],
to_update: list[tuple[str | None, str]])
Expand source code
@dataclass
class DiffOps:
    to_remove: list[str]
    to_create: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), if previous_item_id is None, add to the root
    to_update: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), the items with the same id but different content

DiffOps(to_remove: 'list[str]', to_create: 'list[tuple[str | None, str]]', to_update: 'list[tuple[str | None, str]]')

Instance variables

var to_create : list[tuple[str | None, str]]
var to_remove : list[str]
var to_update : list[tuple[str | None, str]]
class FunctionCallResult (fnc_call: FunctionCall,
fnc_call_out: FunctionCallOutput | None,
raw_output: Any,
raw_exception: BaseException | None,
fnc_call_updates: list[tuple[FunctionCall, FunctionCallOutput]] = <factory>)
Expand source code
@dataclass
class FunctionCallResult:
    fnc_call: FunctionCall
    fnc_call_out: FunctionCallOutput | None
    raw_output: Any
    raw_exception: BaseException | None
    fnc_call_updates: list[tuple[FunctionCall, FunctionCallOutput]] = field(default_factory=list)
    """Synthesized pairs from any ``ctx.update()`` calls during this standalone
    execution. Empty unless the tool actually called ``ctx.update()``."""

FunctionCallResult(fnc_call: 'FunctionCall', fnc_call_out: 'FunctionCallOutput | None', raw_output: 'Any', raw_exception: 'BaseException | None', fnc_call_updates: 'list[tuple[FunctionCall, FunctionCallOutput]]' = )

Instance variables

var fnc_call : FunctionCall
var fnc_call_out : FunctionCallOutput | None
var fnc_call_updates : list[tuple[FunctionCall, FunctionCallOutput]]

Synthesized pairs from any ctx.update() calls during this standalone execution. Empty unless the tool actually called ctx.update().

var raw_exception : BaseException | None
var raw_output : Any
class SerializedImage (inference_detail: str,
mime_type: str | None,
data_bytes: bytes | None = None,
external_url: str | None = None)
Expand source code
@dataclass
class SerializedImage:
    inference_detail: str
    mime_type: str | None
    data_bytes: bytes | None = None
    external_url: str | None = None

SerializedImage(inference_detail: 'str', mime_type: 'str | None', data_bytes: 'bytes | None' = None, external_url: 'str | None' = None)

Instance variables

var data_bytes : bytes | None
var external_url : str | None
var inference_detail : str
var mime_type : str | None