Module livekit.agents.llm.utils
Functions
def build_legacy_openai_schema(function_tool: FunctionTool, *, internally_tagged: bool = False) ‑> dict[str, typing.Any]
-
Expand source code
def build_legacy_openai_schema(
    function_tool: FunctionTool, *, internally_tagged: bool = False
) -> dict[str, Any]:
    """Build a non-strict OpenAI tool description.

    See https://serde.rs/enum-representations.html for what "internally
    tagged" means for the output layout.
    """
    tool_info = get_function_info(function_tool)
    args_model = function_arguments_to_pydantic_model(function_tool)
    params_schema = args_model.model_json_schema()

    if internally_tagged:
        # Flat layout: the "type" discriminator sits alongside the payload fields.
        return {
            "name": tool_info.name,
            "description": tool_info.description or "",
            "parameters": params_schema,
            "type": "function",
        }

    # Externally tagged layout: payload nested under the "function" key.
    return {
        "type": "function",
        "function": {
            "name": tool_info.name,
            "description": tool_info.description or "",
            "parameters": params_schema,
        },
    }
Non-strict mode tool description; see https://serde.rs/enum-representations.html for the internally tagged representation.
def build_strict_openai_schema(function_tool: FunctionTool) ‑> dict[str, typing.Any]
-
Expand source code
def build_strict_openai_schema(
    function_tool: FunctionTool,
) -> dict[str, Any]:
    """Build a strict-mode OpenAI tool description."""
    tool_info = get_function_info(function_tool)
    args_model = function_arguments_to_pydantic_model(function_tool)
    strict_schema = _strict.to_strict_json_schema(args_model)

    return {
        "type": "function",
        "function": {
            "name": tool_info.name,
            "strict": True,
            "description": tool_info.description or "",
            "parameters": strict_schema,
        },
    }
strict mode tool description
def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) ‑> DiffOps
-
Expand source code
def compute_chat_ctx_diff(old_ctx: ChatContext, new_ctx: ChatContext) -> DiffOps:
    """Computes the minimal list of create/remove operations to transform old_ctx into new_ctx."""
    # TODO(theomonnom): Make ChatMessage hashable and also add update ops
    common_ids = set(
        _compute_lcs([m.id for m in old_ctx.items], [m.id for m in new_ctx.items])
    )
    old_by_id = {item.id: item for item in old_ctx.items}

    # anything in the old context that is not part of the common subsequence goes away
    removals = [item.id for item in old_ctx.items if item.id not in common_ids]
    creations: list[tuple[str | None, str]] = []
    updates: list[tuple[str | None, str]] = []

    previous: str | None = None  # None anchors the insertion at the root
    for item in new_ctx.items:
        if item.id not in common_ids:
            creations.append((previous, item.id))
        else:
            # the id survived; flag an update when the message text changed
            old_item = old_by_id[item.id]
            if (
                item.type == "message"
                and old_item.type == "message"
                and item.text_content != old_item.text_content
            ):
                updates.append((previous, item.id))
            # TODO: check other content types
        previous = item.id

    return DiffOps(to_remove=removals, to_create=creations, to_update=updates)
Computes the minimal list of create/remove operations to transform old_ctx into new_ctx.
def function_arguments_to_pydantic_model(func: Callable[..., Any]) ‑> type[pydantic.main.BaseModel]
-
Expand source code
def function_arguments_to_pydantic_model(func: Callable[..., Any]) -> type[BaseModel]:
    """Create a Pydantic model from a function's signature. (excluding context types)

    Parameter descriptions are pulled from the function's docstring; an
    ``Annotated[..., Field(...)]`` annotation on a parameter takes precedence.
    """
    from docstring_parser import parse_from_object

    # Derive a CamelCase model name from the snake_case function name,
    # e.g. "get_weather" -> "GetWeatherArgs"
    fnc_names = func.__name__.split("_")
    fnc_name = "".join(x.capitalize() for x in fnc_names)
    model_name = fnc_name + "Args"

    docstring = parse_from_object(func)
    param_docs = {p.arg_name: p.description for p in docstring.params}

    signature = inspect.signature(func)
    # include_extras=True keeps Annotated metadata so Field(...) markers survive
    type_hints = get_type_hints(func, include_extras=True)

    # field_name -> (type, FieldInfo or default)
    fields: dict[str, Any] = {}

    for param_name, param in signature.parameters.items():
        type_hint = type_hints[param_name]

        # context-typed parameters are injected at call time, not provided by the LLM
        if is_context_type(type_hint):
            continue

        # ... (Ellipsis) is pydantic's marker for "required, no default"
        default_value = param.default if param.default is not param.empty else ...
        field_info = Field()

        # Annotated[str, Field(description="...")]
        if get_origin(type_hint) is Annotated:
            annotated_args = get_args(type_hint)
            type_hint = annotated_args[0]
            field_info = next(
                (x for x in annotated_args[1:] if isinstance(x, FieldInfo)), field_info
            )

        # NOTE(review): when field_info came from an Annotated annotation, the two
        # assignments below mutate that shared FieldInfo object in place — the
        # effect persists across calls for the same function; TODO confirm intended.
        if default_value is not ... and field_info.default is PydanticUndefined:
            field_info.default = default_value

        if field_info.description is None:
            field_info.description = param_docs.get(param_name, None)

        fields[param_name] = (type_hint, field_info)

    return create_model(model_name, **fields)
Create a Pydantic model from a function's signature. (excluding context types)
def is_context_type(ty: type) ‑> bool
-
Expand source code
def is_context_type(ty: type) -> bool:
    """Return True when ``ty`` is RunContext, either bare or parameterized."""
    # imported lazily to avoid a circular import with the voice package
    from ..voice.events import RunContext

    # matches both RunContext itself and generic forms like RunContext[T]
    return ty is RunContext or get_origin(ty) is RunContext
def is_typed_dict(cls: type | Any) ‑> bool
-
Expand source code
def is_typed_dict(cls: type | Any) -> bool: return isinstance(cls, type) and issubclass(cls, dict) and hasattr(cls, "__annotations__")
def prepare_function_arguments(*,
fnc: FunctionTool | RawFunctionTool,
json_arguments: str,
call_ctx: RunContext[Any] | None = None) ‑> tuple[tuple[Any, ...], dict[str, Any]]
Expand source code
def prepare_function_arguments(
    *,
    fnc: FunctionTool | RawFunctionTool,
    json_arguments: str,  # raw function output from the LLM
    call_ctx: RunContext[Any] | None = None,
) -> tuple[tuple[Any, ...], dict[str, Any]]:  # returns args, kwargs
    """
    Create the positional and keyword arguments to call a function tool from
    the raw function output from the LLM.

    Args:
        fnc: the function tool (typed or raw) that will be invoked.
        json_arguments: JSON-encoded arguments produced by the LLM.
        call_ctx: optional RunContext injected into context-typed parameters.

    Returns:
        ``(args, kwargs)`` ready to be passed to ``fnc``.

    Raises:
        ValueError: if a required non-optional parameter received None, or the
            tool type is unsupported.
        pydantic.ValidationError: if the arguments fail model validation.
    """
    signature = inspect.signature(fnc)
    type_hints = get_type_hints(fnc, include_extras=True)
    args_dict = from_json(json_arguments)

    if is_function_tool(fnc):
        model_type = function_arguments_to_pydantic_model(fnc)

        # Function arguments with default values are treated as optional
        # when converted to strict LLM function descriptions. (e.g., we convert default
        # parameters to type: ["string", "null"]).
        # The following make sure to use the default value when we receive None.
        # (Only if the type can't be Optional)
        for param_name, param in signature.parameters.items():
            type_hint = type_hints[param_name]
            if param_name in args_dict and args_dict[param_name] is None:
                if not _is_optional_type(type_hint):
                    if param.default is not inspect.Parameter.empty:
                        args_dict[param_name] = param.default
                    else:
                        # fixed: the original message was missing the closing
                        # quote after the parameter name
                        raise ValueError(
                            f"Received None for required parameter '{param_name}'; "
                            "this argument cannot be None and no default is available."
                        )

        model = model_type.model_validate(args_dict)  # can raise ValidationError
        raw_fields = _shallow_model_dump(model)
    elif is_raw_function_tool(fnc):
        # e.g async def open_gate(self, raw_arguments: dict[str, object]):
        # raw_arguments is required when using raw function tools
        raw_fields = {
            "raw_arguments": args_dict,
        }
    else:
        raise ValueError(f"Unsupported function tool type: {type(fnc)}")

    # inject RunContext if needed
    context_dict = {}
    for param_name in signature.parameters:
        type_hint = type_hints[param_name]
        if is_context_type(type_hint) and call_ctx is not None:
            context_dict[param_name] = call_ctx

    bound = signature.bind(**{**raw_fields, **context_dict})
    bound.apply_defaults()
    return bound.args, bound.kwargs
Create the positional and keyword arguments to call a function tool from the raw function output from the LLM.
def serialize_image(image: ImageContent) ‑> SerializedImage
-
Expand source code
def serialize_image(image: ImageContent) -> SerializedImage:
    """Serialize an ImageContent into a SerializedImage (inline bytes or external URL).

    Raises:
        ValueError: for unsupported mime types in data URLs, or unsupported
            image payload types.
    """
    # str payloads are either data URLs ("data:<mime>;base64,<payload>") or external URLs
    if isinstance(image.image, str):
        if image.image.startswith("data:"):
            header, b64_data = image.image.split(",", 1)
            encoded_data = base64.b64decode(b64_data)
            # header looks like "data:image/png;base64" -> extract "image/png"
            header_mime = header.split(";")[0].split(":")[1]

            # an explicitly provided mime_type wins over the one in the data URL
            if image.mime_type and image.mime_type != header_mime:
                logger.warning(
                    f"""Provided mime_type '{image.mime_type}' does not match data URL mime type '{header_mime}'. Using provided mime_type."""
                )
                mime_type = image.mime_type
            else:
                mime_type = header_mime

            supported_types = {"image/jpeg", "image/png", "image/webp", "image/gif"}
            if mime_type not in supported_types:
                raise ValueError(
                    f"Unsupported mime_type {mime_type}. Must be jpeg, png, webp, or gif"
                )

            return SerializedImage(
                data_bytes=encoded_data,
                mime_type=mime_type,
                inference_detail=image.inference_detail,
            )
        else:
            # external URL: passed through as-is (no mime-type validation on this path)
            return SerializedImage(
                mime_type=image.mime_type,
                inference_detail=image.inference_detail,
                external_url=image.image,
            )
    elif isinstance(image.image, rtc.VideoFrame):
        # raw video frame: encode to JPEG, optionally resizing first
        opts = images.EncodeOptions()
        if image.inference_width and image.inference_height:
            opts.resize_options = images.ResizeOptions(
                width=image.inference_width,
                height=image.inference_height,
                strategy="scale_aspect_fit",
            )
        encoded_data = images.encode(image.image, opts)

        return SerializedImage(
            data_bytes=encoded_data,
            mime_type="image/jpeg",
            inference_detail=image.inference_detail,
        )
    raise ValueError("Unsupported image type")
def strip_thinking_tokens(content: str | None, thinking: asyncio.Event) ‑> str | None
-
Expand source code
def strip_thinking_tokens(content: str | None, thinking: asyncio.Event) -> str | None:
    """Remove thinking-tagged regions from a streamed text chunk.

    ``thinking`` carries the "currently inside a thinking block" state across
    successive chunks: set after THINK_TAG_START is seen, cleared at
    THINK_TAG_END.
    """
    if content is None:
        return None

    if thinking.is_set():
        # inside a thinking block: suppress everything up to and including the end tag
        end = content.find(THINK_TAG_END)
        if end < 0:
            return None  # still thinking — drop the whole chunk
        thinking.clear()
        return content[end + len(THINK_TAG_END) :]

    # outside a thinking block: once a start tag appears, drop it and what precedes it
    start = content.find(THINK_TAG_START)
    if start >= 0:
        thinking.set()
        return content[start + len(THINK_TAG_START) :]
    return content
def to_openai_response_format(response_format: type | dict[str, Any]) ‑> dict[str, typing.Any]
-
Expand source code
def to_openai_response_format(response_format: type | dict[str, Any]) -> dict[str, Any]:
    """Convert a response-format spec into OpenAI's strict ``json_schema`` payload."""
    name, json_schema_type = to_response_format_param(response_format)
    strict_schema = _strict.to_strict_json_schema(json_schema_type)

    return {
        "type": "json_schema",
        "json_schema": {
            "schema": strict_schema,
            "name": name,
            "strict": True,
        },
    }
def to_response_format_param(response_format: type | dict[str, Any]) ‑> tuple[str, type[pydantic.main.BaseModel] | pydantic.type_adapter.TypeAdapter[typing.Any]]
-
Expand source code
def to_response_format_param(
    response_format: type | dict[str, Any],
) -> tuple[str, type[BaseModel] | TypeAdapter[Any]]:
    """Resolve a response-format spec into ``(name, schema source)``.

    Accepts a BaseModel subclass, a pydantic dataclass, or a TypedDict.
    dict specs are validated but currently always rejected (see TODO below).
    """
    if isinstance(response_format, dict):
        # TODO(theomonnom): better type validation, copy TypedDict from OpenAI
        if response_format.get("type", "") not in ("text", "json_schema", "json_object"):
            raise TypeError("Unsupported response_format type")

        # TODO(long): fix return value
        raise TypeError("Unsupported response_format type")
        # NOTE(review): unreachable — every dict spec raises above; also the
        # dict value would not match the declared return type.
        return response_format

    # add support for TypedDict
    if is_typed_dict(response_format):
        # convert the TypedDict into an equivalent BaseModel (all fields required)
        response_format = create_model(
            response_format.__name__,
            **{k: (v, ...) for k, v in response_format.__annotations__.items()},  # type: ignore
        )

    json_schema_type: type[BaseModel] | TypeAdapter[Any] | None = None
    if inspect.isclass(response_format) and issubclass(response_format, BaseModel):
        name = response_format.__name__
        json_schema_type = response_format
    elif inspect.isclass(response_format) and hasattr(
        response_format, "__pydantic_config__"
    ):  # @pydantic.dataclass
        name = response_format.__name__
        json_schema_type = TypeAdapter(response_format)
    else:
        raise TypeError(f"Unsupported response_format type - {response_format}")

    return name, json_schema_type
Classes
class DiffOps (to_remove: list[str],
to_create: list[tuple[str | None, str]],
to_update: list[tuple[str | None, str]])-
Expand source code
@dataclass
class DiffOps:
    """Operations required to transform one ChatContext into another."""

    # ids of items present in the old context but absent from the new one
    to_remove: list[str]
    to_create: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), if previous_item_id is None, add to the root
    to_update: list[
        tuple[str | None, str]
    ]  # (previous_item_id, id), the items with the same id but different content
DiffOps(to_remove: 'list[str]', to_create: 'list[tuple[str | None, str]]', to_update: 'list[tuple[str | None, str]]')
Instance variables
var to_create : list[tuple[str | None, str]]
var to_remove : list[str]
var to_update : list[tuple[str | None, str]]
class SerializedImage (inference_detail: str,
mime_type: str | None,
data_bytes: bytes | None = None,
external_url: str | None = None)-
Expand source code
@dataclass
class SerializedImage:
    """Normalized image payload: either inline encoded bytes or an external URL."""

    # detail setting carried alongside the image (forwarded to inference)
    inference_detail: str
    # may be None for external URLs with no declared mime type
    mime_type: str | None
    # set when the image is inline (decoded data URL or encoded video frame)
    data_bytes: bytes | None = None
    # set when the image is referenced by URL instead of inline bytes
    external_url: str | None = None
SerializedImage(inference_detail: 'str', mime_type: 'str | None', data_bytes: 'bytes | None' = None, external_url: 'str | None' = None)
Instance variables
var data_bytes : bytes | None
var external_url : str | None
var inference_detail : str
var mime_type : str | None