Module livekit.agents.llm.utils
Functions
def build_legacy_openai_schema(function_tool: FunctionTool, *, internally_tagged: bool = False) ‑> dict[str, typing.Any]-
Expand source code
def build_legacy_openai_schema(
    function_tool: FunctionTool, *, internally_tagged: bool = False
) -> dict[str, Any]:
    """Build the non-strict OpenAI tool description for ``function_tool``.

    The parameter schema comes from the pydantic model generated for the tool's
    arguments.

    Args:
        function_tool: Tool whose ``info`` (name, description) and signature are described.
        internally_tagged: When true, the ``"type"`` tag sits next to the function
            fields (see https://serde.rs/enum-representations.html for the internally
            tagged representation); otherwise the fields are nested under ``"function"``.

    Returns:
        The tool description as a plain dict.
    """
    arguments_model = function_arguments_to_pydantic_model(function_tool)
    info = function_tool.info
    parameters = arguments_model.model_json_schema()

    function_fields: dict[str, Any] = {
        "name": info.name,
        "description": info.description or "",
        "parameters": parameters,
    }
    if internally_tagged:
        # tag appended last to keep the original key order
        return {**function_fields, "type": "function"}
    return {"type": "function", "function": function_fields}
def build_strict_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]-
Expand source code
def build_strict_openai_schema(
    function_tool: FunctionTool,
) -> dict[str, Any]:
    """Build the strict-mode OpenAI tool description for ``function_tool``.

    The parameter schema is the arguments model passed through
    ``_strict.to_strict_json_schema``, and the function entry carries ``"strict": True``.
    """
    arguments_model = function_arguments_to_pydantic_model(function_tool)
    info = function_tool.info
    strict_parameters = _strict.to_strict_json_schema(arguments_model)

    function_fields: dict[str, Any] = {
        "name": info.name,
        "strict": True,
        "description": info.description or "",
        "parameters": strict_parameters,
    }
    return {"type": "function", "function": function_fields}
def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) ‑> DiffOps-
Expand source code
def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) -> DiffOps:
    """Compute the remove/create/update operations that turn ``old_ctx`` into ``new_ctx``.

    Items are matched by id. Ids on the longest common subsequence of both id lists
    are kept; every other old id is removed and every other new id is created after
    the item that precedes it in ``new_ctx`` (``None`` meaning the root). A kept item
    is reported as an update when both versions are of type ``"message"`` and their
    ``text_content`` differs.
    """
    # TODO(theomonnom): Make ChatMessage hashable and also add update ops
    previous_items = {item.id: item for item in old_ctx.items}
    kept_ids = set(
        _compute_lcs(
            [item.id for item in old_ctx.items],
            [item.id for item in new_ctx.items],
        )
    )

    removals = [item.id for item in old_ctx.items if item.id not in kept_ids]
    creations: list[tuple[str | None, str]] = []
    updates: list[tuple[str | None, str]] = []

    anchor_id: str | None = None  # None means root
    for item in new_ctx.items:
        if item.id in kept_ids:
            # check if the content is different
            previous = previous_items[item.id]
            both_messages = item.type == "message" and previous.type == "message"
            if both_messages and item.text_content != previous.text_content:
                updates.append((anchor_id, item.id))
            # TODO: check other content types
        else:
            creations.append((anchor_id, item.id))
        anchor_id = item.id

    return DiffOps(to_remove=removals, to_create=creations, to_update=updates)
async def execute_function_call(tool_call: FunctionToolCall,
tool_ctx: ToolContext,
*,
call_ctx: RunContext[Any] | None = None) ‑> FunctionCallResult-
Expand source code
async def execute_function_call(
    tool_call: FunctionToolCall,
    tool_ctx: ToolContext,
    *,
    call_ctx: RunContext[Any] | None = None,
) -> FunctionCallResult:
    """Execute a function tool call and return the result.

    Args:
        tool_call: The call to run; ``name`` selects the tool and ``arguments`` is
            the raw argument payload (an empty payload is treated as ``"{}"``).
        tool_ctx: Provides ``function_tools``, the name -> tool lookup.
        call_ctx: Optional run context forwarded to ``prepare_function_arguments``.
            Any ``_updates`` it holds afterwards are copied onto the result.

    Returns:
        A ``FunctionCallResult``. An unknown tool name yields an error output with a
        ``ValueError`` in ``raw_exception``. Any ``Exception`` raised while preparing
        the arguments or running the tool is converted into a result by
        ``make_function_call_output`` instead of being re-raised.
    """
    from .chat_context import FunctionCall, FunctionCallOutput

    fnc_call = FunctionCall(
        call_id=tool_call.call_id,
        name=tool_call.name,
        arguments=tool_call.arguments or "{}",
        extra=tool_call.extra or {},
    )

    function_tool = tool_ctx.function_tools.get(tool_call.name)
    if function_tool is None:
        # unknown tool: report the problem as an error output instead of raising
        logger.warning(f"unknown AI function `{tool_call.name}`")
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=tool_call.name,
                call_id=tool_call.call_id,
                output=f"Unknown function: {tool_call.name}",
                is_error=True,
            ),
            raw_output=None,
            raw_exception=ValueError(f"Unknown function: {tool_call.name}"),
        )

    try:
        raw_args = tool_call.arguments or "{}"
        fnc_args, fnc_kwargs = prepare_function_arguments(
            fnc=function_tool,
            json_arguments=raw_args,
            call_ctx=call_ctx,
            fnc_call=fnc_call,
        )
        result = function_tool(*fnc_args, **fnc_kwargs)
        # the tool may be sync or async; only await when a coroutine came back
        if asyncio.iscoroutine(result):
            result = await result
        out = make_function_call_output(fnc_call=fnc_call, output=result, exception=None)
    except Exception as e:
        # ToolError is not logged with a traceback here; anything else is
        if not isinstance(e, ToolError):
            logger.exception(
                f"exception executing AI function `{tool_call.name}`",
                extra={"call_id": tool_call.call_id, "arguments": tool_call.arguments},
            )
        out = make_function_call_output(fnc_call=fnc_call, output=None, exception=e)

    # surface any ctx.update() calls so callers can inspect them
    if call_ctx is not None and call_ctx._updates:
        out.fnc_call_updates = list(call_ctx._updates)
    return out
def function_arguments_to_pydantic_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]-
Expand source code
def function_arguments_to_pydantic_model(func: Callable[..., Any]) -> type[BaseModel]:
    """Create a Pydantic model from a function's signature. (excluding context types)

    The model is named after the function: ``snake_case`` parts are capitalized and
    joined, then suffixed with ``Args`` (``get_weather`` -> ``GetWeatherArgs``).
    Each parameter becomes a field whose default comes from the signature and whose
    description comes from an ``Annotated[..., Field(...)]`` annotation or, failing
    that, from the parsed docstring.

    Args:
        func: The callable to inspect. Every parameter is looked up in the resolved
            type hints, so a parameter without an annotation raises ``KeyError``.

    Returns:
        The model class built by ``create_model``.
    """
    from docstring_parser import parse_from_object

    fnc_names = func.__name__.split("_")
    fnc_name = "".join(x.capitalize() for x in fnc_names)
    model_name = fnc_name + "Args"

    docstring = parse_from_object(func)
    param_docs = {p.arg_name: p.description for p in docstring.params}

    signature = inspect.signature(func)
    # include_extras keeps the Annotated[...] metadata needed below
    type_hints = get_type_hints(func, include_extras=True)

    # field_name -> (type, FieldInfo or default)
    fields: dict[str, Any] = {}

    for param_name, param in signature.parameters.items():
        type_hint = type_hints[param_name]

        # context parameters are not part of the model
        if is_context_type(type_hint, allow_subclasses=True):
            continue

        # `...` (Ellipsis) marks "no default", i.e. a required field
        default_value = param.default if param.default is not param.empty else ...
        field_info: FieldInfo | None = None
        field_attrs: dict[str, Any] = {}

        # Annotated[str, Field(description="...")]
        if get_origin(type_hint) is Annotated:
            annotated_args = get_args(type_hint)
            type_hint = annotated_args[0]
            annotated_field = next(
                (x for x in annotated_args[1:] if isinstance(x, FieldInfo)), None
            )

            if annotated_field and hasattr(annotated_field, "asdict"):
                # `asdict` is available after pydantic 2.12
                field_dict = annotated_field.asdict()
                field_attrs = field_dict["attributes"]
                # Constraints (ge/le/gt/lt/multiple_of/min_length/pattern/...) live
                # in `metadata`, not `attributes`. Re-attach them to the annotation
                # so `Field(...)` constraints on a tool argument are preserved.
                if field_dict["metadata"]:
                    type_hint = Annotated[(type_hint, *field_dict["metadata"])]
            elif annotated_field:
                # no `asdict`: reuse the FieldInfo object itself (it is updated in place below)
                field_attrs["default"] = annotated_field.default
                field_attrs["description"] = annotated_field.description
                field_info = annotated_field

        # a default on the Field takes precedence over the signature default
        if (
            default_value is not ...
            and field_attrs.get("default", PydanticUndefined) is PydanticUndefined
        ):
            field_attrs["default"] = default_value

        # fall back to the docstring when the annotation carries no description
        if field_attrs.get("description") is None:
            field_attrs["description"] = param_docs.get(param_name, None)

        if not field_info:
            field_info = Field(**field_attrs)
        else:
            for k, v in field_attrs.items():
                setattr(field_info, k, v)

        fields[param_name] = (type_hint, field_info)

    return create_model(model_name, **fields)
def is_context_type(ty: type, *, allow_subclasses: bool = False) ‑> bool-
Expand source code
def is_context_type(ty: type, *, allow_subclasses: bool = False) -> bool:
    """Tell whether ``ty`` is ``RunContext`` or a parameterized form of it.

    With ``allow_subclasses`` set, subclasses of ``RunContext`` match as well.
    Values that ``issubclass`` rejects with ``TypeError`` are reported as ``False``.
    """
    from ..voice.events import RunContext

    origin = get_origin(ty)

    if allow_subclasses:
        # for a parameterized generic, test its origin instead of the alias itself
        candidate = ty if origin is None else origin
        try:
            return issubclass(candidate, RunContext)
        except TypeError:
            return False

    return ty is RunContext or origin is RunContext
Expand source code
def is_typed_dict(cls: type | Any) -> bool:
    """Return ``True`` when ``cls`` is a ``TypedDict`` class.

    A TypedDict is a ``dict`` subclass that carries ``__annotations__`` and the
    TypedDict-specific ``__total__`` marker. The marker check is required because
    on Python 3.10+ every user-defined class exposes ``__annotations__``, so without
    it any plain ``dict`` subclass would be reported as a TypedDict. Checking the
    attribute rather than the metaclass also accepts TypedDict implementations that
    do not share ``typing``'s metaclass.

    Args:
        cls: Any object; non-classes (including dict instances) are rejected.

    Returns:
        ``True`` only for TypedDict classes.
    """
    if not isinstance(cls, type) or not issubclass(cls, dict):
        return False
    return hasattr(cls, "__total__") and hasattr(cls, "__annotations__")
Expand source code
def make_function_call_output(
    *,
    fnc_call: FunctionCall,
    output: Any,
    exception: BaseException | None,
) -> FunctionCallResult:
    """Create a FunctionCallResult, handling ToolError, StopResponse, and validation.

    Args:
        fnc_call: The call the result belongs to; supplies ``name`` and ``call_id``.
        output: The value returned by the tool. An exception instance returned as a
            value is treated as if it had been raised.
        exception: The exception raised by the tool, if any.

    Returns:
        * ``ToolError``: an error output carrying the error's ``message``.
        * ``StopResponse``: no output (``fnc_call_out`` is ``None``).
        * any other exception: an error output with a generic message.
        * output rejected by ``_is_valid_function_output``: logged, no output.
        * otherwise: a success output holding ``str(output)``, or ``""`` for ``None``.
    """
    from .chat_context import FunctionCallOutput
    from .tool_context import StopResponse, ToolError

    if isinstance(output, BaseException):
        exception = output
        output = None

    if isinstance(exception, ToolError):
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=fnc_call.name,
                call_id=fnc_call.call_id,
                output=exception.message,
                is_error=True,
            ),
            raw_output=output,
            raw_exception=exception,
        )

    if isinstance(exception, StopResponse):
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=None,
            raw_output=output,
            raw_exception=exception,
        )

    if exception is not None:
        # the real error stays in raw_exception; the output text is deliberately generic
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=FunctionCallOutput(
                name=fnc_call.name,
                call_id=fnc_call.call_id,
                output="An internal error occurred",
                is_error=True,
            ),
            raw_output=output,
            raw_exception=exception,
        )

    if not _is_valid_function_output(output):
        logger.error(
            f"AI function `{fnc_call.name}` returned an invalid output",
            extra={"call_id": fnc_call.call_id, "output": output},
        )
        return FunctionCallResult(
            fnc_call=fnc_call,
            fnc_call_out=None,
            raw_output=output,
            raw_exception=None,
        )

    # Only None maps to an empty string. `str(output or "")` would also blank out
    # legitimate falsy results such as 0, False or 0.0.
    output_text = "" if output is None else str(output)
    return FunctionCallResult(
        fnc_call=fnc_call,
        fnc_call_out=FunctionCallOutput(
            name=fnc_call.name,
            call_id=fnc_call.call_id,
            output=output_text,
            is_error=False,
        ),
        raw_output=output,
        raw_exception=None,
    )
def parse_function_arguments(json_arguments: str) ‑> dict[str, typing.Any]-
Expand source code
def parse_function_arguments(json_arguments: str) -> dict[str, Any]:
    """Parse a raw JSON tool-call arguments string into a dict.

    First tries strict parsing; if the JSON is malformed (common with smaller /
    open-weight models that fumble special tokens or escaping), falls back to
    ``json_repair`` and then strips known chat-template token leaks.

    Args:
        json_arguments: The raw arguments string produced by the model.

    Returns:
        The decoded arguments. A payload that decodes to ``None`` yields ``{}``.

    Raises:
        ValueError: If the arguments can't be recovered or don't decode to a
            dict-shaped value.
    """
    try:
        args_dict: Any = from_json(json_arguments)
    except ValueError as strict_err:
        repaired = json_repair.loads(json_arguments)
        if repaired == "":
            # json_repair returns "" when it can't recover anything meaningful.
            raise ValueError(
                f"could not parse function arguments as JSON: {strict_err}: {json_arguments[:200]}"
            ) from strict_err
        # After a repair, also strip leaked chat-template tokens — many of
        # the failures we see are caused by `<|...|>` markers bleeding into
        # the model's structured output.
        cleaned = _strip_template_tokens(repaired)
        logger.warning(
            "repaired malformed function-call JSON arguments",
            extra={
                "raw_arguments": json_arguments[:500],
                "repaired": cleaned,
                "error": str(strict_err),
            },
        )
        args_dict = cleaned

    # Some providers (e.g. Nova Sonic) double-encode tool arguments as nested
    # JSON strings. Unwrap until we reach a non-string value.
    while isinstance(args_dict, str):
        try:
            args_dict = from_json(args_dict)
        except Exception:
            # `from None` hides the parser's own error; the message shows the offending text
            raise ValueError(
                f"function arguments decoded to a non-JSON string: {args_dict[:200]}"
            ) from None

    if args_dict is None:
        return {}
    if not isinstance(args_dict, dict):
        raise ValueError(
            f"expected dict from function arguments, "
            f"got {type(args_dict).__name__}: {json_arguments[:200]}"
        )
    return args_dict
First tries strict parsing; if the JSON is malformed (common with smaller / open-weight models that fumble special tokens or escaping), falls back to
json_repair and then strips known chat-template token leaks. Raises
ValueError if the arguments can't be recovered or don't decode to a dict-shaped value.
def prepare_function_arguments(*,
fnc: FunctionTool | RawFunctionTool,
json_arguments: str | dict[str, Any],
call_ctx: RunContext[Any] | None = None,
fnc_call: FunctionCall | None = None) ‑> tuple[tuple[Any, ...], dict[str, Any]]-
Expand source code
def prepare_function_arguments(
    *,
    fnc: FunctionTool | RawFunctionTool,
    json_arguments: str | dict[str, Any],
    call_ctx: RunContext[Any] | None = None,
    fnc_call: FunctionCall | None = None,
) -> tuple[tuple[Any, ...], dict[str, Any]]:  # returns args, kwargs
    """Create the positional and keyword arguments to call a function tool from
    the raw function output from the LLM.

    Argument-validation failures (bad JSON, pydantic ValidationError, missing
    required params) are surfaced as :class:`ToolError` so the LLM gets a concrete
    error message and can self-correct on its next turn.

    When ``fnc_call`` is provided and ``json_arguments`` is a string, the
    canonicalized JSON (post json_repair) is written back to ``fnc_call.arguments``
    BEFORE validation runs.

    Args:
        fnc: The tool being called; ``fnc.info.name`` is used in log and error messages.
        json_arguments: Raw JSON string from the LLM, or an already-decoded dict.
        call_ctx: Optional run context forwarded to ``_prepare_function_arguments``.
        fnc_call: Optional call record whose ``arguments`` receive the canonical JSON.

    Returns:
        ``(args, kwargs)`` as produced by ``_prepare_function_arguments``.

    Raises:
        ToolError: On unparseable JSON, or on ``pydantic.ValidationError`` /
            ``ValueError`` / ``TypeError`` during validation and binding.
        Exception: Any other error from validation is logged and re-raised unchanged.
    """
    # phase 1: parse — raw JSON failures raise ToolError immediately (no
    # canonical to provide since the input itself was unparseable)
    if isinstance(json_arguments, dict):
        args_dict = json_arguments
    else:
        try:
            args_dict = parse_function_arguments(json_arguments)
        except ValueError as e:
            logger.error(
                f"error parsing arguments for `{fnc.info.name}`",
                extra={"function": fnc.info.name, "arguments": json_arguments},
            )
            raise ToolError(f"Error parsing arguments for `{fnc.info.name}`: {e}") from e

    # write canonical BEFORE validation so a downstream validation failure
    # still leaves valid JSON in chat history
    if fnc_call is not None:
        canonical = json.dumps(args_dict, default=str)
        # NOTE(review): when json_arguments is a dict this compares str with dict and is
        # always unequal, so the write-back also happens for dict input — confirm intended
        if canonical != json_arguments:
            fnc_call.arguments = canonical

    # phase 2: validate + bind
    try:
        return _prepare_function_arguments(fnc=fnc, args_dict=args_dict, call_ctx=call_ctx)
    except ToolError:
        raise
    except (pydantic.ValidationError, ValueError, TypeError) as e:
        logger.error(
            f"error parsing arguments for `{fnc.info.name}`",
            extra={"function": fnc.info.name, "arguments": json_arguments},
        )
        raise ToolError(f"Error parsing arguments for `{fnc.info.name}`: {e}") from e
    except Exception:
        # unexpected failure: keep the original exception type, but log the traceback
        logger.exception(
            f"error parsing arguments for `{fnc.info.name}`",
            extra={"function": fnc.info.name, "arguments": json_arguments},
        )
        raise
Argument-validation failures (bad JSON, pydantic ValidationError, missing required params) are surfaced as
ToolError so the LLM gets a concrete error message and can self-correct on its next turn. When
fnc_call is provided and json_arguments is a string, the canonicalized JSON (post json_repair) is written back to fnc_call.arguments BEFORE validation runs.
def serialize_image(image: ImageContent, *, use_cache: bool = True) ‑> SerializedImage-
Expand source code
def serialize_image(image: ImageContent, *, use_cache: bool = True) -> SerializedImage:
    """Convert an ``ImageContent`` into a ``SerializedImage``.

    * ``data:`` URL string: the base64 payload is decoded. A provided ``mime_type``
      that differs from the URL's wins (with a warning); the resulting type must be
      jpeg, png, webp or gif.
    * any other string: passed through as ``external_url``.
    * ``rtc.VideoFrame``: encoded with ``images.encode`` (resized to the inference
      dimensions when both are set) and tagged ``image/jpeg``.

    With ``use_cache`` set, the result is read from and stored in ``image._cache``.

    Raises:
        ValueError: For an unsupported mime type or an unsupported image type.
    """
    cache_key = "serialized_image"  # TODO(long): use hash of encoding options if available
    if use_cache and cache_key in image._cache:
        return cast(SerializedImage, image._cache[cache_key])

    source = image.image
    serialized_image: SerializedImage

    if isinstance(source, str) and source.startswith("data:"):
        header, b64_data = source.split(",", 1)
        encoded_data = base64.b64decode(b64_data)
        header_mime = header.split(";")[0].split(":")[1]

        overridden = bool(image.mime_type) and image.mime_type != header_mime
        if overridden:
            logger.warning(
                f"""Provided mime_type '{image.mime_type}' does not match data URL mime type '{header_mime}'. Using provided mime_type."""
            )
        mime_type = image.mime_type if overridden else header_mime

        if mime_type not in {"image/jpeg", "image/png", "image/webp", "image/gif"}:
            raise ValueError(
                f"Unsupported mime_type {mime_type}. Must be jpeg, png, webp, or gif"
            )
        serialized_image = SerializedImage(
            data_bytes=encoded_data,
            mime_type=mime_type,
            inference_detail=image.inference_detail,
        )
    elif isinstance(source, str):
        serialized_image = SerializedImage(
            mime_type=image.mime_type,
            inference_detail=image.inference_detail,
            external_url=source,
        )
    elif isinstance(source, rtc.VideoFrame):
        encode_options = images.EncodeOptions()
        if image.inference_width and image.inference_height:
            encode_options.resize_options = images.ResizeOptions(
                width=image.inference_width,
                height=image.inference_height,
                strategy="scale_aspect_fit",
            )
        serialized_image = SerializedImage(
            data_bytes=images.encode(source, encode_options),
            mime_type="image/jpeg",
            inference_detail=image.inference_detail,
        )
    else:
        raise ValueError("Unsupported image type")

    if use_cache:
        image._cache[cache_key] = serialized_image
    return serialized_image
Expand source code
def strip_thinking_tokens(content: str | None, thinking: asyncio.Event) -> str | None:
    """Remove ``THINK_TAG_START`` ... ``THINK_TAG_END`` spans from a streamed text chunk.

    ``thinking`` carries the "inside a thinking span" state across chunks: it is set
    when a start tag is seen and cleared when an end tag is seen.

    The chunk is scanned until it is exhausted, so text before a start tag is kept,
    reasoning that follows a start tag in the same chunk is dropped, and a span that
    opens and closes within one chunk leaves ``thinking`` cleared. For chunks that
    contain at most one tag and nothing before a start tag, the result is the same as
    a single find-and-cut.

    Args:
        content: The chunk to filter, or ``None``.
        thinking: Event used as the cross-chunk state flag; mutated by this call.

    Returns:
        ``None`` when ``content`` is ``None`` or the whole chunk lies inside a thinking
        span; otherwise the visible text, which may be ``""``.
    """
    if content is None:
        return None

    visible: list[str] = []
    remaining = content
    # assumes both tags are non-empty, so every pass consumes input — TODO confirm
    while True:
        if thinking.is_set():
            end_idx = remaining.find(THINK_TAG_END)
            if end_idx < 0:
                # still inside the span: the rest of the chunk is reasoning text
                break
            thinking.clear()
            remaining = remaining[end_idx + len(THINK_TAG_END) :]
        else:
            start_idx = remaining.find(THINK_TAG_START)
            if start_idx < 0:
                visible.append(remaining)
                break
            visible.append(remaining[:start_idx])
            thinking.set()
            remaining = remaining[start_idx + len(THINK_TAG_START) :]

    # no visible segment means the whole chunk was inside a thinking span
    return "".join(visible) if visible else None
Expand source code
def to_openai_response_format(response_format: type | dict[str, Any]) -> dict[str, Any]:
    """Build the OpenAI ``json_schema`` response-format payload for ``response_format``.

    The name and schema source come from ``to_response_format_param``; the schema is
    converted with ``_strict.to_strict_json_schema`` and marked ``"strict": True``.
    """
    name, json_schema_type = to_response_format_param(response_format)
    json_schema: dict[str, Any] = {
        "schema": _strict.to_strict_json_schema(json_schema_type),
        "name": name,
        "strict": True,
    }
    return {"type": "json_schema", "json_schema": json_schema}
Expand source code
def to_response_format_param(
    response_format: type | dict[str, Any],
) -> tuple[str, type[BaseModel] | TypeAdapter[Any]]:
    """Resolve a response format into a ``(name, schema source)`` pair.

    Args:
        response_format: A pydantic ``BaseModel`` subclass, a pydantic dataclass, or a
            ``TypedDict`` class (converted to a model whose fields are all required).
            Dict formats are not supported yet.

    Returns:
        The class name, plus the model class itself or a ``TypeAdapter`` wrapping a
        pydantic dataclass.

    Raises:
        TypeError: For any dict input and for any unsupported type.
    """
    if isinstance(response_format, dict):
        # TODO(theomonnom): better type validation, copy TypedDict from OpenAI
        # TODO(long): fix return value
        # Every dict is rejected for now, whatever its "type" entry is.
        raise TypeError("Unsupported response_format type")

    # add support for TypedDict
    if is_typed_dict(response_format):
        response_format = create_model(
            response_format.__name__,
            **{k: (v, ...) for k, v in response_format.__annotations__.items()},  # type: ignore
        )

    if inspect.isclass(response_format) and issubclass(response_format, BaseModel):
        return response_format.__name__, response_format

    if inspect.isclass(response_format) and hasattr(response_format, "__pydantic_config__"):
        # @pydantic.dataclass
        return response_format.__name__, TypeAdapter(response_format)

    raise TypeError(f"Unsupported response_format type - {response_format}")
Classes
class DiffOps (to_remove: list[str],
to_create: list[tuple[str | None, str]],
to_update: list[tuple[str | None, str]])-
Expand source code
# Operations that transform one chat context into another, as returned by
# compute_chat_ctx_diff.
@dataclass
class DiffOps:
    # ids of the old items that are not kept
    to_remove: list[str]
    to_create: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), if previous_item_id is None, add to the root
    to_update: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), the items with the same id but different content
Instance variables
var to_create : list[tuple[str | None, str]]
var to_remove : list[str]
var to_update : list[tuple[str | None, str]]
class FunctionCallResult (fnc_call: FunctionCall,
fnc_call_out: FunctionCallOutput | None,
raw_output: Any,
raw_exception: BaseException | None,
fnc_call_updates: list[tuple[FunctionCall, FunctionCallOutput]] = <factory>)-
Expand source code
# Outcome of one function tool call, as built by make_function_call_output.
@dataclass
class FunctionCallResult:
    # the call this result belongs to
    fnc_call: FunctionCall
    # None when the tool raised StopResponse or returned an invalid output
    fnc_call_out: FunctionCallOutput | None
    # the value the tool returned (None when an exception instance was returned as the value)
    raw_output: Any
    # the exception behind an error result, if any
    raw_exception: BaseException | None
    fnc_call_updates: list[tuple[FunctionCall, FunctionCallOutput]] = field(default_factory=list)
    """Synthesized pairs from any ``ctx.update()`` calls during this standalone
    execution. Empty unless the tool actually called ``ctx.update()``."""
) Instance variables
var fnc_call : FunctionCall
var fnc_call_out : FunctionCallOutput | None
var fnc_call_updates : list[tuple[FunctionCall, FunctionCallOutput]]-
Synthesized pairs from any ctx.update() calls during this standalone execution. Empty unless the tool actually called ctx.update().
var raw_exception : BaseException | None
var raw_output : Any
class SerializedImage (inference_detail: str,
mime_type: str | None,
data_bytes: bytes | None = None,
external_url: str | None = None)-
Expand source code
# Result of serialize_image: either inline bytes or an external URL.
@dataclass
class SerializedImage:
    # copied from ImageContent.inference_detail
    inference_detail: str
    # "image/jpeg" for encoded video frames; may be None for external URLs
    mime_type: str | None
    # decoded data-URL payload or encoded video frame
    data_bytes: bytes | None = None
    # set when the image was given as a non-data URL string
    external_url: str | None = None
Instance variables
var data_bytes : bytes | None
var external_url : str | None
var inference_detail : str
var mime_type : str | None