Module livekit.agents.inference.llm
Functions
def to_fnc_ctx(fnc_ctx: list[llm.FunctionTool | llm.RawFunctionTool], *, strict: bool = True) -> list[openai.types.chat.chat_completion_function_tool_param.ChatCompletionFunctionToolParam]
Expand source code
def to_fnc_ctx(
    fnc_ctx: list[llm.FunctionTool | llm.RawFunctionTool], *, strict: bool = True
) -> list[ChatCompletionToolParam]:
    tools: list[ChatCompletionToolParam] = []
    for fnc in fnc_ctx:
        if is_raw_function_tool(fnc):
            info = get_raw_function_info(fnc)
            tools.append(
                {
                    "type": "function",
                    "function": info.raw_schema,  # type: ignore
                }
            )
        elif is_function_tool(fnc):
            schema = (
                llm.utils.build_strict_openai_schema(fnc)
                if strict
                else llm.utils.build_legacy_openai_schema(fnc)
            )
            tools.append(schema)  # type: ignore

    return tools
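A minimal usage sketch (not part of the module source): it assumes a hypothetical lookup_weather tool defined with the function_tool decorator from livekit.agents.llm, then converts it into OpenAI tool parameters.

from livekit.agents.llm import function_tool

from livekit.agents.inference.llm import to_fnc_ctx


# hypothetical tool, used only for illustration
@function_tool
async def lookup_weather(location: str) -> str:
    """Look up the current weather for a location."""
    return f"It is sunny in {location}."


# strict=True builds strict OpenAI JSON schemas; strict=False uses the legacy format
tools = to_fnc_ctx([lookup_weather], strict=True)
print(tools)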
Classes
class BasetenOptions (*args, **kwargs)
Expand source code
class BasetenOptions(TypedDict, total=False):
    top_p: float
Ancestors
- builtins.dict
Class variables
var top_p : float
class CerebrasOptions (*args, **kwargs)
Expand source code
class CerebrasOptions(TypedDict, total=False):
    top_p: float
Ancestors
- builtins.dict
Class variables
var top_p : float
class GroqOptions (*args, **kwargs)
Expand source code
class GroqOptions(TypedDict, total=False):
    top_p: float
Ancestors
- builtins.dict
Class variables
var top_p : float
class LLM (model: LLMModels | str,
*,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
timeout: httpx.Timeout | None = None,
max_retries: NotGivenOr[int] = NOT_GIVEN,
verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any] | OpenaiOptions | CerebrasOptions | GroqOptions | BasetenOptions] = NOT_GIVEN)
Expand source code
class LLM(llm.LLM):
    @overload
    def __init__(
        self,
        model: OpenaiModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[OpenaiOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: CerebrasModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[CerebrasOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: GroqModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[GroqOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: BasetenModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[BasetenOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: LLMModels | str,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: LLMModels | str,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | OpenaiOptions | CerebrasOptions | GroqOptions | BasetenOptions
        ] = NOT_GIVEN,
    ) -> None:
        super().__init__()

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )

        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        self._opts = _LLMOptions(
            model=model,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            max_completion_tokens=max_completion_tokens,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            verbosity=verbosity,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._client = openai.AsyncClient(
            api_key=create_access_token(self._opts.api_key, self._opts.api_secret),
            base_url=self._opts.base_url,
            max_retries=max_retries if is_given(max_retries) else 0,
            http_client=httpx.AsyncClient(
                timeout=timeout
                if timeout
                else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    @property
    def model(self) -> str:
        """Get the model name for this LLM instance."""
        return self._opts.model

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[FunctionTool | RawFunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        response_format: NotGivenOr[
            completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]
        ] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        extra = {}
        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        if is_given(self._opts.max_completion_tokens):
            extra["max_completion_tokens"] = self._opts.max_completion_tokens

        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature

        if is_given(self._opts.verbosity):
            extra["verbosity"] = self._opts.verbosity

        parallel_tool_calls = (
            parallel_tool_calls
            if is_given(parallel_tool_calls)
            else self._opts.parallel_tool_calls
        )
        if is_given(parallel_tool_calls):
            extra["parallel_tool_calls"] = parallel_tool_calls

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
        if is_given(tool_choice):
            oai_tool_choice: ChatCompletionToolChoiceOptionParam
            if isinstance(tool_choice, dict):
                oai_tool_choice = {
                    "type": "function",
                    "function": {"name": tool_choice["function"]["name"]},
                }
                extra["tool_choice"] = oai_tool_choice
            elif tool_choice in ("auto", "required", "none"):
                oai_tool_choice = tool_choice
                extra["tool_choice"] = oai_tool_choice

        if is_given(response_format):
            extra["response_format"] = llm_utils.to_openai_response_format(response_format)  # type: ignore

        extra.update(self._opts.extra_kwargs)

        # reset the access token to avoid expiration
        self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret)

        return LLMStream(
            self,
            model=self._opts.model,
            provider_fmt="openai",  # always sent in openai format
            strict_tool_schema=True,
            client=self._client,
            chat_ctx=chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
        )
An LLM that talks to the LiveKit Inference gateway. Requests are always sent in the OpenAI Chat Completions format and are authenticated with a LiveKit access token derived from the API key and secret.
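A minimal construction sketch, assuming LIVEKIT_API_KEY and LIVEKIT_API_SECRET are set in the environment; the model id shown is only an example.

from livekit.agents.inference.llm import LLM

# credentials fall back to LIVEKIT_API_KEY / LIVEKIT_API_SECRET
# (or LIVEKIT_INFERENCE_API_KEY / LIVEKIT_INFERENCE_API_SECRET) when not passed explicitly
model = LLM(
    "openai/gpt-4o-mini",  # example model id
    temperature=0.4,
    max_completion_tokens=512,
    extra_kwargs={"top_p": 0.9},  # provider options are forwarded to chat.completions.create
)
print(model.model)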
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str
Expand source code
@property
def model(self) -> str:
    """Get the model name for this LLM instance."""
    return self._opts.model
Get the model name for this LLM instance.
Methods
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool | RawFunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) -> LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[FunctionTool | RawFunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    response_format: NotGivenOr[
        completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]
    ] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    extra = {}
    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    if is_given(self._opts.max_completion_tokens):
        extra["max_completion_tokens"] = self._opts.max_completion_tokens

    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature

    if is_given(self._opts.verbosity):
        extra["verbosity"] = self._opts.verbosity

    parallel_tool_calls = (
        parallel_tool_calls
        if is_given(parallel_tool_calls)
        else self._opts.parallel_tool_calls
    )
    if is_given(parallel_tool_calls):
        extra["parallel_tool_calls"] = parallel_tool_calls

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
    if is_given(tool_choice):
        oai_tool_choice: ChatCompletionToolChoiceOptionParam
        if isinstance(tool_choice, dict):
            oai_tool_choice = {
                "type": "function",
                "function": {"name": tool_choice["function"]["name"]},
            }
            extra["tool_choice"] = oai_tool_choice
        elif tool_choice in ("auto", "required", "none"):
            oai_tool_choice = tool_choice
            extra["tool_choice"] = oai_tool_choice

    if is_given(response_format):
        extra["response_format"] = llm_utils.to_openai_response_format(response_format)  # type: ignore

    extra.update(self._opts.extra_kwargs)

    # reset the access token to avoid expiration
    self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret)

    return LLMStream(
        self,
        model=self._opts.model,
        provider_fmt="openai",  # always sent in openai format
        strict_tool_schema=True,
        client=self._client,
        chat_ctx=chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
    )
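A hedged sketch of streaming a reply; it assumes credentials are available via environment variables and builds the context with ChatContext from livekit.agents.llm. The prompt and model id are illustrative only.

import asyncio

from livekit.agents import llm as agents_llm

from livekit.agents.inference.llm import LLM


async def main() -> None:
    model = LLM("openai/gpt-4o-mini")  # example model id

    chat_ctx = agents_llm.ChatContext.empty()
    chat_ctx.add_message(role="user", content="Say hello in one short sentence.")

    stream = model.chat(chat_ctx=chat_ctx)
    try:
        # ChatChunk deltas arrive incrementally; usage is attached to a final chunk
        async for chunk in stream:
            if chunk.delta and chunk.delta.content:
                print(chunk.delta.content, end="", flush=True)
    finally:
        await stream.aclose()


asyncio.run(main())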
Inherited members
class LLMStream (llm: LLM | llm.LLM,
*,
model: LLMModels | str,
provider_fmt: str,
strict_tool_schema: bool,
client: openai.AsyncClient,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool | RawFunctionTool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any])
Expand source code
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLM | llm.LLM,
        *,
        model: LLMModels | str,
        provider_fmt: str,
        strict_tool_schema: bool,
        client: openai.AsyncClient,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool | RawFunctionTool],
        conn_options: APIConnectOptions,
        extra_kwargs: dict[str, Any],
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options)
        self._model = model
        self._provider_fmt = provider_fmt
        self._strict_tool_schema = strict_tool_schema
        self._client = client
        self._llm = llm
        self._extra_kwargs = extra_kwargs

    async def _run(self) -> None:
        # current function call that we're waiting for full completion (args are streamed)
        # (defined inside the _run method to make sure the state is reset for each run/attempt)
        self._oai_stream: openai.AsyncStream[ChatCompletionChunk] | None = None
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None
        self._tool_index: int | None = None
        retryable = True

        try:
            chat_ctx, _ = self._chat_ctx.to_provider_format(format=self._provider_fmt)
            fnc_ctx = (
                to_fnc_ctx(self._tools, strict=self._strict_tool_schema)
                if self._tools
                else openai.NOT_GIVEN
            )
            if lk_oai_debug:
                tool_choice = self._extra_kwargs.get("tool_choice", NOT_GIVEN)
                logger.debug(
                    "chat.completions.create",
                    extra={
                        "fnc_ctx": fnc_ctx,
                        "tool_choice": tool_choice,
                        "chat_ctx": chat_ctx,
                    },
                )

            if not self._tools:
                # remove tool_choice from extra_kwargs if no tools are provided
                self._extra_kwargs.pop("tool_choice", None)

            self._oai_stream = stream = await self._client.chat.completions.create(
                messages=cast(list[ChatCompletionMessageParam], chat_ctx),
                tools=fnc_ctx,
                model=self._model,
                stream_options={"include_usage": True},
                stream=True,
                timeout=httpx.Timeout(self._conn_options.timeout),
                **self._extra_kwargs,
            )

            thinking = asyncio.Event()
            async with stream:
                async for chunk in stream:
                    for choice in chunk.choices:
                        chat_chunk = self._parse_choice(chunk.id, choice, thinking)
                        if chat_chunk is not None:
                            retryable = False
                            self._event_ch.send_nowait(chat_chunk)

                    if chunk.usage is not None:
                        retryable = False
                        tokens_details = chunk.usage.prompt_tokens_details
                        cached_tokens = tokens_details.cached_tokens if tokens_details else 0
                        chunk = llm.ChatChunk(
                            id=chunk.id,
                            usage=llm.CompletionUsage(
                                completion_tokens=chunk.usage.completion_tokens,
                                prompt_tokens=chunk.usage.prompt_tokens,
                                prompt_cached_tokens=cached_tokens or 0,
                                total_tokens=chunk.usage.total_tokens,
                            ),
                        )
                        self._event_ch.send_nowait(chunk)

        except openai.APITimeoutError:
            raise APITimeoutError(retryable=retryable) from None
        except openai.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
                retryable=retryable,
            ) from None
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_choice(
        self, id: str, choice: Choice, thinking: asyncio.Event
    ) -> llm.ChatChunk | None:
        delta = choice.delta

        # https://github.com/livekit/agents/issues/688
        # the delta can be None when using Azure OpenAI (content filtering)
        if delta is None:
            return None

        if delta.tool_calls:
            for tool in delta.tool_calls:
                if not tool.function:
                    continue

                call_chunk = None
                if self._tool_call_id and tool.id and tool.index != self._tool_index:
                    call_chunk = llm.ChatChunk(
                        id=id,
                        delta=llm.ChoiceDelta(
                            role="assistant",
                            content=delta.content,
                            tool_calls=[
                                llm.FunctionToolCall(
                                    arguments=self._fnc_raw_arguments or "",
                                    name=self._fnc_name or "",
                                    call_id=self._tool_call_id or "",
                                )
                            ],
                        ),
                    )
                    self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None

                if tool.function.name:
                    self._tool_index = tool.index
                    self._tool_call_id = tool.id
                    self._fnc_name = tool.function.name
                    self._fnc_raw_arguments = tool.function.arguments or ""
                elif tool.function.arguments:
                    self._fnc_raw_arguments += tool.function.arguments  # type: ignore

                if call_chunk is not None:
                    return call_chunk

        if choice.finish_reason in ("tool_calls", "stop") and self._tool_call_id:
            call_chunk = llm.ChatChunk(
                id=id,
                delta=llm.ChoiceDelta(
                    role="assistant",
                    content=delta.content,
                    tool_calls=[
                        llm.FunctionToolCall(
                            arguments=self._fnc_raw_arguments or "",
                            name=self._fnc_name or "",
                            call_id=self._tool_call_id or "",
                        )
                    ],
                ),
            )
            self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None
            return call_chunk

        delta.content = llm_utils.strip_thinking_tokens(delta.content, thinking)

        if not delta.content:
            return None

        return llm.ChatChunk(
            id=id,
            delta=llm.ChoiceDelta(content=delta.content, role="assistant"),
        )
Streaming chat session returned by LLM.chat(). It parses OpenAI-format completion chunks into ChatChunk events, accumulating streamed tool-call arguments and reporting token usage from the final chunk.
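LLMStream instances are normally obtained from LLM.chat() rather than constructed directly. Below is a small sketch of draining one, assuming the chunk fields shown in the source above (incremental delta content plus a trailing usage chunk).

from livekit.agents.inference.llm import LLMStream


async def collect_text(stream: LLMStream) -> str:
    """Drain a stream returned by LLM.chat(), returning the concatenated text."""
    text = ""
    try:
        async for chunk in stream:
            if chunk.delta and chunk.delta.content:
                text += chunk.delta.content
            if chunk.usage is not None:  # token accounting arrives on a final chunk
                print("total tokens:", chunk.usage.total_tokens)
    finally:
        await stream.aclose()
    return text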
Ancestors
- livekit.agents.llm.llm.LLMStream
- abc.ABC
Subclasses
- livekit.plugins.openai.llm.LLMStream
class OpenaiOptions (*args, **kwargs)
Expand source code
class OpenaiOptions(TypedDict, total=False):
    top_p: float
Ancestors
- builtins.dict
Class variables
var top_p : float
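A short sketch of passing typed provider options through extra_kwargs; the model id is only an example, and a plain dict works as well (the same pattern applies to BasetenOptions, CerebrasOptions, and GroqOptions).

from livekit.agents.inference.llm import LLM, OpenaiOptions

opts: OpenaiOptions = {"top_p": 0.85}

# extra_kwargs is merged into every chat.completions.create request made by this instance
model = LLM("openai/gpt-4o-mini", extra_kwargs=opts)  # example model id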