Module livekit.plugins.openai.responses

Sub-modules

livekit.plugins.openai.responses.llm

Classes

class LLM (*,
model: str | ResponsesModel = 'gpt-4.1',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None,
use_websocket: bool = True,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
reasoning: NotGivenOr[Reasoning] = NOT_GIVEN,
tool_choice: "NotGivenOr[ToolChoice | Literal['auto', 'required', 'none']]" = NOT_GIVEN,
store: NotGivenOr[bool] = NOT_GIVEN,
metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
timeout: httpx.Timeout | None = None)
Expand source code
class LLM(llm.LLM):
    """LLM implementation for the OpenAI Responses API.

    Depending on ``use_websocket``, the instance holds either a ``_ResponsesWebsocket``
    or an ``openai.AsyncClient`` (caller-supplied or created here).
    """

    def __init__(
        self,
        *,
        model: str | ResponsesModel = "gpt-4.1",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        client: openai.AsyncClient | None = None,
        use_websocket: bool = True,
        user: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        reasoning: NotGivenOr[Reasoning] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice | Literal["auto", "required", "none"]] = NOT_GIVEN,
        store: NotGivenOr[bool] = NOT_GIVEN,
        metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
    ) -> None:
        """
        Create a new instance of OpenAI Responses LLM.

        ``api_key`` must be set to your OpenAI API key, either using the argument or by setting the
        ``OPENAI_API_KEY`` environment variable.

        Args:
            model: Model name used for every request made by this instance.
            api_key: OpenAI API key. In websocket mode it falls back to the
                ``OPENAI_API_KEY`` environment variable when not given.
            base_url: Optional API base URL override.
            client: Pre-built client to use. Supplying one disables ``use_websocket``
                (a warning is logged) and the client is not closed by ``aclose()``.
            use_websocket: Use a ``_ResponsesWebsocket`` instead of an HTTP client.
            user: Forwarded as ``user`` on each request when given.
            temperature: Forwarded as ``temperature`` on each request when given.
            parallel_tool_calls: Default for ``chat()``; can be overridden per call.
            reasoning: Forwarded as ``reasoning`` when given. When omitted and the model
                supports reasoning effort, defaults to effort ``"none"`` for
                gpt-5.1 / gpt-5.2 / gpt-5.4 and ``"minimal"`` otherwise.
            tool_choice: Default for ``chat()``; can be overridden per call.
            store: Forwarded as ``store`` when given. ``False`` also disables
                ``previous_response_id`` chaining in ``chat()``.
            metadata: Forwarded as ``metadata`` on each request when given.
            timeout: In websocket mode only ``timeout.connect`` is used. In HTTP mode it
                is the ``httpx`` timeout (default: connect=15s, read/write/pool=5s).

        Raises:
            ValueError: In websocket mode, when no API key is available from the
                argument or the environment.
        """
        super().__init__()

        if not is_given(reasoning) and _supports_reasoning_effort(model):
            if model in ["gpt-5.1", "gpt-5.2", "gpt-5.4"]:
                reasoning = Reasoning(effort="none")
            else:
                reasoning = Reasoning(effort="minimal")

        if client is not None and use_websocket:
            logger.warning("use_websocket is ignored when a custom client is provided, disabling")
            use_websocket = False

        self._opts = _LLMOptions(
            model=model,
            user=user,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            store=store,
            metadata=metadata,
            reasoning=reasoning,
            use_websocket=use_websocket,
        )
        self._client = client
        # only a client created by this instance is closed in aclose()
        self._owns_client = client is None
        self._ws: _ResponsesWebsocket | None = None

        # bookkeeping read by chat() to decide whether a request can be chained
        # onto the previous response
        self._active_streams: int = 0
        self._parallel_generation: bool = False
        self._prev_resp_id = ""
        self._prev_chat_ctx: ChatContext | None = None

        if use_websocket:
            resolved_api_key = api_key if is_given(api_key) else os.environ.get("OPENAI_API_KEY")
            if not resolved_api_key:
                raise ValueError(
                    "OpenAI API key is required, either as argument or set"
                    " OPENAI_API_KEY environment variable"
                )
            self._ws = _ResponsesWebsocket(
                api_key=resolved_api_key,
                timeout=timeout.connect if timeout is not None else None,
                base_url=base_url if is_given(base_url) else None,
            )

        else:
            self._client = client or openai.AsyncClient(
                api_key=api_key if is_given(api_key) else None,
                base_url=base_url if is_given(base_url) else None,
                max_retries=0,
                http_client=httpx.AsyncClient(
                    timeout=timeout
                    if timeout
                    else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                    follow_redirects=True,
                    limits=httpx.Limits(
                        max_connections=50,
                        max_keepalive_connections=50,
                        keepalive_expiry=120,
                    ),
                ),
            )

    async def aclose(self) -> None:
        """Close the websocket (if any) and the HTTP client when this instance created it.

        A client supplied by the caller is left open.
        """
        try:
            if self._ws:
                await self._ws.aclose()
        finally:
            # close the owned client even if closing the websocket raised,
            # so it is not leaked
            if self._owns_client and self._client:
                await self._client.close()

    @property
    def model(self) -> str:
        """Model name this instance was configured with."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Host (netloc) of the base URL in use, or ``""`` when none is available."""
        if self._opts.use_websocket:
            ws = self._ws
            if ws is not None:
                from urllib.parse import urlparse

                return urlparse(ws._base_url).netloc

        client = self._client
        if client is None:
            return ""
        return client._base_url.netloc.decode("utf-8")

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[Tool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        """Assemble the request options and return an ``LLMStream`` for ``chat_ctx``.

        Args:
            chat_ctx: Full conversation context for this turn.
            tools: Tools made available to the model; ``None`` is passed on as ``[]``.
            conn_options: Forwarded unchanged to ``LLMStream``.
            parallel_tool_calls: Per-call override of the constructor value.
            tool_choice: Per-call override of the constructor value.
            extra_kwargs: Additional request parameters. Options configured on the
                instance overwrite same-named keys supplied here.

        Returns:
            The ``LLMStream`` created for this request.
        """
        opts = self._opts
        extra: dict[str, Any] = dict(extra_kwargs) if is_given(extra_kwargs) else {}

        # instance-level options are forwarded only when explicitly configured
        for key in ("metadata", "user", "temperature", "store", "reasoning"):
            value = getattr(opts, key)
            if is_given(value):
                extra[key] = value

        if not is_given(parallel_tool_calls):
            parallel_tool_calls = opts.parallel_tool_calls
        if is_given(parallel_tool_calls):
            extra["parallel_tool_calls"] = parallel_tool_calls

        chosen_tool = tool_choice if is_given(tool_choice) else opts.tool_choice  # type: ignore
        if is_given(chosen_tool):
            oai_tool_choice: response_create_params.ToolChoice
            if isinstance(chosen_tool, dict):
                # convert the nested {"function": {"name": ...}} form to the flat one
                oai_tool_choice = {
                    "type": "function",
                    "name": chosen_tool["function"]["name"],
                }
                extra["tool_choice"] = oai_tool_choice
            elif chosen_tool in ("auto", "required", "none"):
                oai_tool_choice = chosen_tool  # type: ignore
                extra["tool_choice"] = oai_tool_choice

        input_chat_ctx = chat_ctx
        may_chain = opts.store is not False and self._active_streams == 0
        prev_ctx = self._prev_chat_ctx
        if may_chain and prev_ctx is not None and self._prev_resp_id:
            prev_len = len(prev_ctx.items)
            if ChatContext(items=chat_ctx.items[:prev_len]).is_equivalent(prev_ctx):
                # the previous context is an unchanged prefix: send only the items
                # appended since the last response
                input_chat_ctx = ChatContext(items=chat_ctx.items[prev_len:])
                extra["previous_response_id"] = self._prev_resp_id
            # otherwise the context was modified: resend it in full and omit the
            # previous response id

        return LLMStream(
            self,
            model=opts.model,
            strict_tool_schema=True,
            client=self._client if self._client else None,
            chat_ctx=input_chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
            full_chat_ctx=chat_ctx,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of OpenAI Responses LLM.

api_key must be set to your OpenAI API key, either using the argument or by setting the OPENAI_API_KEY environment variable.

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Subclasses

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Model name stored in this instance's options."""
    options = self._opts
    return options.model

Get the model name/identifier for this LLM instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Host (netloc) of the base URL in use, or ``""`` when none is available.

    The websocket's base URL is used when websocket mode is on and a websocket
    exists; otherwise the client's base URL is used.
    """
    if self._opts.use_websocket:
        ws = self._ws
        if ws is not None:
            from urllib.parse import urlparse

            return urlparse(ws._base_url).netloc

    client = self._client
    if client is None:
        return ""
    # the client's netloc is bytes, so decode it to text
    return client._base_url.netloc.decode("utf-8")

Get the provider name/identifier for this LLM instance.

Returns

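The provider name if available, "unknown" otherwise. Note that the implementation shown above returns the host (netloc) of the configured base URL, or an empty string when neither a websocket nor a client is set.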

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the websocket (if any) and the client when this instance owns it.

    A client that this instance does not own (``_owns_client`` is false) is left open.
    """
    try:
        if self._ws:
            await self._ws.aclose()
    finally:
        # close the owned client even if closing the websocket raised,
        # so it is not leaked
        if self._owns_client and self._client:
            await self._client.close()
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[Tool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[Tool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    """Assemble the request options and return an ``LLMStream`` for ``chat_ctx``.

    Args:
        chat_ctx: Full conversation context for this turn.
        tools: Tools made available to the model; ``None`` is passed on as ``[]``.
        conn_options: Forwarded unchanged to ``LLMStream``.
        parallel_tool_calls: Per-call override of the instance-level value.
        tool_choice: Per-call override of the instance-level value.
        extra_kwargs: Additional request parameters. Options configured on the
            instance overwrite same-named keys supplied here.

    Returns:
        The ``LLMStream`` created for this request.
    """
    opts = self._opts
    extra: dict[str, Any] = dict(extra_kwargs) if is_given(extra_kwargs) else {}

    # instance-level options are forwarded only when explicitly configured
    for key in ("metadata", "user", "temperature", "store", "reasoning"):
        value = getattr(opts, key)
        if is_given(value):
            extra[key] = value

    if not is_given(parallel_tool_calls):
        parallel_tool_calls = opts.parallel_tool_calls
    if is_given(parallel_tool_calls):
        extra["parallel_tool_calls"] = parallel_tool_calls

    chosen_tool = tool_choice if is_given(tool_choice) else opts.tool_choice  # type: ignore
    if is_given(chosen_tool):
        oai_tool_choice: response_create_params.ToolChoice
        if isinstance(chosen_tool, dict):
            # convert the nested {"function": {"name": ...}} form to the flat one
            oai_tool_choice = {
                "type": "function",
                "name": chosen_tool["function"]["name"],
            }
            extra["tool_choice"] = oai_tool_choice
        elif chosen_tool in ("auto", "required", "none"):
            oai_tool_choice = chosen_tool  # type: ignore
            extra["tool_choice"] = oai_tool_choice

    input_chat_ctx = chat_ctx
    may_chain = opts.store is not False and self._active_streams == 0
    prev_ctx = self._prev_chat_ctx
    if may_chain and prev_ctx is not None and self._prev_resp_id:
        prev_len = len(prev_ctx.items)
        if ChatContext(items=chat_ctx.items[:prev_len]).is_equivalent(prev_ctx):
            # the previous context is an unchanged prefix: send only the items
            # appended since the last response
            input_chat_ctx = ChatContext(items=chat_ctx.items[prev_len:])
            extra["previous_response_id"] = self._prev_resp_id
        # otherwise the context was modified: resend it in full and omit the
        # previous response id

    return LLMStream(
        self,
        model=opts.model,
        strict_tool_schema=True,
        client=self._client if self._client else None,
        chat_ctx=input_chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
        full_chat_ctx=chat_ctx,
    )

Inherited members