Module livekit.plugins.openai.responses.llm

Classes

class LLM (*,
model: str | ResponsesModel = 'gpt-4.1',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None,
use_websocket: bool = True,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
reasoning: NotGivenOr[Reasoning] = NOT_GIVEN,
tool_choice: "NotGivenOr[ToolChoice | Literal['auto', 'required', 'none']]" = NOT_GIVEN,
store: NotGivenOr[bool] = NOT_GIVEN,
metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
timeout: httpx.Timeout | None = None)
Expand source code
class LLM(llm.LLM):
    """LLM adapter for the OpenAI Responses API.

    Streams generations either over a dedicated websocket (the default) or
    over HTTP via an ``openai.AsyncClient``.
    """

    def __init__(
        self,
        *,
        model: str | ResponsesModel = "gpt-4.1",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        client: openai.AsyncClient | None = None,
        use_websocket: bool = True,
        user: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        reasoning: NotGivenOr[Reasoning] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice | Literal["auto", "required", "none"]] = NOT_GIVEN,
        store: NotGivenOr[bool] = NOT_GIVEN,
        metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
    ) -> None:
        """
        Create a new instance of OpenAI Responses LLM.

        ``api_key`` must be set to your OpenAI API key, either using the argument or by setting the
        ``OPENAI_API_KEY`` environment variable.
        """
        super().__init__()

        # Pick a low-cost reasoning default for models that expose a
        # reasoning-effort knob when the caller did not choose one.
        # NOTE(review): assumes the listed models accept effort="none" while
        # the rest only go down to "minimal" — confirm against model docs.
        if not is_given(reasoning) and _supports_reasoning_effort(model):
            if model in ["gpt-5.1", "gpt-5.2", "gpt-5.4"]:
                reasoning = Reasoning(effort="none")
            else:
                reasoning = Reasoning(effort="minimal")

        # A caller-supplied client implies HTTP transport; the websocket path
        # builds its own connection and cannot reuse the custom client.
        if client is not None and use_websocket:
            logger.warning("use_websocket is ignored when a custom client is provided, disabling")
            use_websocket = False

        self._opts = _LLMOptions(
            model=model,
            user=user,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            store=store,
            metadata=metadata,
            reasoning=reasoning,
            use_websocket=use_websocket,
        )
        self._client = client
        # Only close the HTTP client in aclose() when we created it ourselves.
        self._owns_client = client is None
        self._ws: _ResponsesWebsocket | None = None

        # State used by chat() to chain requests via previous_response_id:
        # number of in-flight streams, whether streams overlapped (which
        # invalidates chaining), and the last completed response id/context.
        self._active_streams: int = 0
        self._parallel_generation: bool = False
        self._prev_resp_id = ""
        self._prev_chat_ctx: ChatContext | None = None

        if use_websocket:
            # The websocket transport authenticates directly, so an API key
            # is mandatory here (explicit argument or OPENAI_API_KEY env var).
            resolved_api_key = api_key if is_given(api_key) else os.environ.get("OPENAI_API_KEY")
            if not resolved_api_key:
                raise ValueError(
                    "OpenAI API key is required, either as argument or set"
                    " OPENAI_API_KEY environment variable"
                )
            self._ws = _ResponsesWebsocket(
                api_key=resolved_api_key,
                # only the connect phase of the httpx.Timeout applies here
                timeout=timeout.connect if timeout is not None else None,
                base_url=base_url if is_given(base_url) else None,
            )

        else:
            # HTTP transport: reuse the provided client or build one with
            # conservative timeouts and a keep-alive connection pool.
            self._client = client or openai.AsyncClient(
                api_key=api_key if is_given(api_key) else None,
                base_url=base_url if is_given(base_url) else None,
                max_retries=0,  # retries are driven by conn_options, not the SDK
                http_client=httpx.AsyncClient(
                    timeout=timeout
                    if timeout
                    else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                    follow_redirects=True,
                    limits=httpx.Limits(
                        max_connections=50,
                        max_keepalive_connections=50,
                        keepalive_expiry=120,
                    ),
                ),
            )

    async def aclose(self) -> None:
        # Close the websocket first, then the HTTP client if we own it.
        if self._ws:
            await self._ws.aclose()
        if self._owns_client and self._client:
            await self._client.close()

    @property
    def model(self) -> str:
        # Model identifier configured for this instance.
        return self._opts.model

    @property
    def provider(self) -> str:
        # Report the network host we talk to; empty string when unknown.
        if self._opts.use_websocket and self._ws is not None:
            from urllib.parse import urlparse

            return urlparse(self._ws._base_url).netloc
        if self._client is not None:
            # openai.AsyncClient stores its base URL with a bytes netloc
            return self._client._base_url.netloc.decode("utf-8")
        return ""

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[Tool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        """Start a streamed generation for ``chat_ctx``.

        Per-call ``parallel_tool_calls``/``tool_choice`` override the
        instance defaults; other configured options are forwarded as
        request kwargs.
        """
        extra = {}

        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        # Forward the instance-level options that were explicitly configured.
        if is_given(self._opts.metadata):
            extra["metadata"] = self._opts.metadata

        if is_given(self._opts.user):
            extra["user"] = self._opts.user

        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature

        if is_given(self._opts.store):
            extra["store"] = self._opts.store

        if is_given(self._opts.reasoning):
            extra["reasoning"] = self._opts.reasoning

        # Call-site arguments take precedence over the constructor defaults.
        parallel_tool_calls = (
            parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls
        )
        if is_given(parallel_tool_calls):
            extra["parallel_tool_calls"] = parallel_tool_calls

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
        if is_given(tool_choice):
            # Translate the framework ToolChoice dict into the Responses API
            # shape; plain string modes pass through unchanged.
            oai_tool_choice: response_create_params.ToolChoice
            if isinstance(tool_choice, dict):
                oai_tool_choice = {
                    "type": "function",
                    "name": tool_choice["function"]["name"],
                }
                extra["tool_choice"] = oai_tool_choice
            elif tool_choice in ("auto", "required", "none"):
                oai_tool_choice = tool_choice  # type: ignore
                extra["tool_choice"] = oai_tool_choice

        input_chat_ctx = chat_ctx
        # When the server stores responses and the new context is a pure
        # extension of the previous one, send only the appended items and
        # chain via previous_response_id. Skipped while other streams are
        # in flight, since interleaving would corrupt the chain.
        if (
            self._opts.store is not False
            and self._active_streams == 0
            and self._prev_chat_ctx is not None
            and self._prev_resp_id
        ):
            n = len(self._prev_chat_ctx.items)
            if ChatContext(items=chat_ctx.items[:n]).is_equivalent(self._prev_chat_ctx):
                # send only the new items appended since the last response
                input_chat_ctx = ChatContext(items=chat_ctx.items[n:])
                extra["previous_response_id"] = self._prev_resp_id
            # if the context was modified otherwise, resend the whole context and omit previous response id
        return LLMStream(
            self,
            model=self._opts.model,
            strict_tool_schema=True,
            client=self._client if self._client else None,
            chat_ctx=input_chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
            full_chat_ctx=chat_ctx,
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of OpenAI Responses LLM.

api_key must be set to your OpenAI API key, either using the argument or by setting the OPENAI_API_KEY environment variable.

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Subclasses

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Model identifier configured for this LLM instance."""
    configured_model = self._opts.model
    return configured_model

Get the model name/identifier for this LLM instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Network host of the endpoint this instance talks to, or "" if unknown."""
    ws = self._ws
    if ws is not None and self._opts.use_websocket:
        from urllib.parse import urlparse

        return urlparse(ws._base_url).netloc
    client = self._client
    if client is None:
        return ""
    # the client's base URL stores its netloc as bytes
    return client._base_url.netloc.decode("utf-8")

Get the provider name/identifier for this LLM instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Release the websocket (if any) and the HTTP client this instance owns."""
    ws = self._ws
    if ws:
        await ws.aclose()
    client = self._client
    if client and self._owns_client:
        await client.close()
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[Tool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[Tool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    """Start a streamed generation for ``chat_ctx``.

    Per-call ``parallel_tool_calls``/``tool_choice`` override the instance
    defaults; other configured options are forwarded as request kwargs.
    """
    extra = {}

    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    # Forward the instance-level options that were explicitly configured.
    if is_given(self._opts.metadata):
        extra["metadata"] = self._opts.metadata

    if is_given(self._opts.user):
        extra["user"] = self._opts.user

    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature

    if is_given(self._opts.store):
        extra["store"] = self._opts.store

    if is_given(self._opts.reasoning):
        extra["reasoning"] = self._opts.reasoning

    # Call-site arguments take precedence over the constructor defaults.
    parallel_tool_calls = (
        parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls
    )
    if is_given(parallel_tool_calls):
        extra["parallel_tool_calls"] = parallel_tool_calls

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
    if is_given(tool_choice):
        # Translate the framework ToolChoice dict into the Responses API
        # shape; plain string modes pass through unchanged.
        oai_tool_choice: response_create_params.ToolChoice
        if isinstance(tool_choice, dict):
            oai_tool_choice = {
                "type": "function",
                "name": tool_choice["function"]["name"],
            }
            extra["tool_choice"] = oai_tool_choice
        elif tool_choice in ("auto", "required", "none"):
            oai_tool_choice = tool_choice  # type: ignore
            extra["tool_choice"] = oai_tool_choice

    input_chat_ctx = chat_ctx
    # When the server stores responses and the new context is a pure
    # extension of the previous one, send only the appended items and chain
    # via previous_response_id. Skipped while other streams are in flight,
    # since interleaving would corrupt the chain.
    if (
        self._opts.store is not False
        and self._active_streams == 0
        and self._prev_chat_ctx is not None
        and self._prev_resp_id
    ):
        n = len(self._prev_chat_ctx.items)
        if ChatContext(items=chat_ctx.items[:n]).is_equivalent(self._prev_chat_ctx):
            # send only the new items appended since the last response
            input_chat_ctx = ChatContext(items=chat_ctx.items[n:])
            extra["previous_response_id"] = self._prev_resp_id
        # if the context was modified otherwise, resend the whole context and omit previous response id
    return LLMStream(
        self,
        model=self._opts.model,
        strict_tool_schema=True,
        client=self._client if self._client else None,
        chat_ctx=input_chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
        full_chat_ctx=chat_ctx,
    )

Inherited members

class LLMStream (llm: LLM,
*,
model: str | ResponsesModel,
strict_tool_schema: bool,
client: openai.AsyncClient | None,
chat_ctx: llm.ChatContext,
tools: list[Tool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any],
full_chat_ctx: llm.ChatContext)
Expand source code
class LLMStream(llm.LLMStream):
    """One streamed generation against the OpenAI Responses API.

    Runs over the parent LLM's websocket when enabled, otherwise over its
    HTTP client, and converts provider stream events into ``llm.ChatChunk``s.
    """

    def __init__(
        self,
        llm: LLM,
        *,
        model: str | ResponsesModel,
        strict_tool_schema: bool,
        client: openai.AsyncClient | None,
        chat_ctx: llm.ChatContext,
        tools: list[Tool],
        conn_options: APIConnectOptions,
        extra_kwargs: dict[str, Any],
        full_chat_ctx: llm.ChatContext,
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options)
        self._model = model
        self._strict_tool_schema = strict_tool_schema
        self._response_id: str = ""
        self._response_completed: bool = False
        self._client = client
        self._llm: LLM = llm
        # strip request kwargs the target model does not accept
        self._extra_kwargs = drop_unsupported_params(model, extra_kwargs)
        # private copy of the complete context, recorded on completion so the
        # parent LLM can chain the next request via previous_response_id
        self._full_chat_ctx = full_chat_ctx.copy()

    async def _run(self) -> None:
        # Track concurrent streams on the parent LLM: overlapping generations
        # make previous_response_id chaining ambiguous, so when the last
        # overlapped stream finishes, the chaining state is reset.
        if self._llm._active_streams > 0:
            self._llm._parallel_generation = True
        self._llm._active_streams += 1
        try:
            await self._run_impl()
        finally:
            self._llm._active_streams -= 1
            if self._llm._active_streams == 0 and self._llm._parallel_generation:
                self._llm._prev_resp_id = ""
                self._llm._prev_chat_ctx = None
                self._llm._parallel_generation = False

    async def _run_impl(self) -> None:
        # Execute one request/stream cycle over whichever transport is active.
        self._response_completed = False
        chat_ctx, _ = self._chat_ctx.to_provider_format(format="openai.responses")

        self._tool_ctx = llm.ToolContext(self.tools)
        tool_schemas = cast(
            list[ToolParam],
            self._tool_ctx.parse_function_tools(
                "openai.responses", strict=self._strict_tool_schema
            ),
        )

        if self._llm._opts.use_websocket is not False:
            # the request stops being retryable once any event was received
            retryable = True
            try:
                if self._llm._ws is None:
                    raise RuntimeError("use_websocket is True but _ws is None")

                payload = {
                    "type": "response.create",
                    "model": self._model,
                    "input": chat_ctx,
                    "tools": tool_schemas,
                    **self._extra_kwargs,
                }
                async for raw_event in self._llm._ws.generate_response(payload):
                    parsed_ev = self._parse_ws_event(raw_event)
                    self._process_event(parsed_ev)
                    retryable = False

                # stream ended without a response.completed event: treat it
                # as a dropped connection and allow a retry
                if not self._response_completed:
                    raise APIConnectionError(retryable=True)
            except (APIConnectionError, APIStatusError, APITimeoutError):
                raise
            except Exception as e:
                raise APIConnectionError(retryable=retryable) from e

        else:
            self._oai_stream: openai.AsyncStream[ResponseStreamEvent] | None = None
            retryable = True
            try:
                self._oai_stream = stream = cast(
                    openai.AsyncStream[ResponseStreamEvent],
                    await self._client.responses.create(  # type: ignore
                        model=self._model,
                        tools=tool_schemas,
                        input=cast(str | ResponseInputParam | openai.Omit, chat_ctx),
                        stream=True,
                        timeout=httpx.Timeout(self._conn_options.timeout),
                        **self._extra_kwargs,
                    ),
                )

                async with stream:
                    async for event in stream:
                        self._process_event(event)
                        retryable = False

            except openai.APITimeoutError:
                raise APITimeoutError(retryable=retryable)  # noqa: B904
            except openai.APIStatusError as e:
                # surface provider HTTP errors with their original status/body
                raise APIStatusError(  # noqa: B904
                    e.message,
                    status_code=e.status_code,
                    request_id=e.request_id,
                    body=e.body,
                    retryable=retryable,
                )
            except (APIConnectionError, APIStatusError, APITimeoutError):
                raise
            except Exception as e:
                raise APIConnectionError(retryable=retryable) from e

    def _parse_ws_event(self, event: dict) -> ResponseStreamEvent | None:
        # Validate raw websocket JSON into typed Responses API events;
        # unrecognized event types are ignored by returning None.
        event_type = event.get("type", "")
        if event_type == "error":
            return ResponseErrorEvent.model_validate({**event.get("error", {}), **event})
        elif event_type == "response.created":
            return ResponseCreatedEvent.model_validate(event)
        elif event_type == "response.output_item.done":
            return ResponseOutputItemDoneEvent.model_validate(event)
        elif event_type == "response.output_text.delta":
            return ResponseTextDeltaEvent.model_validate(event)
        elif event_type == "response.completed":
            return ResponseCompletedEvent.model_validate(event)
        elif event_type == "response.failed":
            return ResponseFailedEvent.model_validate(event)
        return None

    def _process_event(self, event: ResponseStreamEvent | None) -> None:
        # Dispatch one typed event to its handler and forward any resulting
        # chunk to the stream's output channel. Error handlers raise.
        if event is None:
            return
        chunk = None
        if isinstance(event, ResponseErrorEvent):
            self._handle_error(event)
        if isinstance(event, ResponseCreatedEvent):
            self._handle_response_created(event)
        if isinstance(event, ResponseOutputItemDoneEvent):
            chunk = self._handle_output_items_done(event)
        if isinstance(event, ResponseTextDeltaEvent):
            chunk = self._handle_response_output_text_delta(event)
        if isinstance(event, ResponseCompletedEvent):
            chunk = self._handle_response_completed(event)
        if isinstance(event, ResponseFailedEvent):
            self._handle_response_failed(event)
        if chunk is not None:
            self._event_ch.send_nowait(chunk)

    def _handle_error(self, event: ResponseErrorEvent) -> None:
        # Map a provider error event to APIStatusError; status -1 when the
        # error code is absent or not numeric.
        error_code = -1
        try:
            error_code = int(event.code) if event.code else -1
        except ValueError:
            pass
        raise APIStatusError(event.message, status_code=error_code, retryable=False)

    def _handle_response_failed(self, event: ResponseFailedEvent) -> None:
        # A response.failed event carries an optional error payload.
        err = event.response.error
        raise APIStatusError(
            err.message if err else "response.failed",
            status_code=-1,
            retryable=False,
        )

    def _handle_response_created(self, event: ResponseCreatedEvent) -> None:
        # remember the response id so every emitted chunk can reference it
        self._response_id = event.response.id

    def _handle_response_completed(self, event: ResponseCompletedEvent) -> llm.ChatChunk | None:
        # Mark completion, publish this turn's id/context to the parent LLM
        # for previous_response_id chaining, and emit a usage chunk if the
        # provider reported token counts.
        self._response_completed = True
        self._llm._prev_chat_ctx = self._full_chat_ctx
        self._llm._prev_resp_id = self._response_id

        chunk = None
        if usage := event.response.usage:
            chunk = llm.ChatChunk(
                id=self._response_id,
                usage=llm.CompletionUsage(
                    completion_tokens=usage.output_tokens,
                    prompt_tokens=usage.input_tokens,
                    prompt_cached_tokens=usage.input_tokens_details.cached_tokens
                    if usage.input_tokens_details
                    else 0,
                    total_tokens=usage.total_tokens,
                ),
            )
        return chunk

    def _handle_output_items_done(self, event: ResponseOutputItemDoneEvent) -> llm.ChatChunk | None:
        # Only completed function-call items produce a chunk (a tool call).
        chunk = None
        if event.item.type == "function_call":
            chunk = llm.ChatChunk(
                id=self._response_id,
                delta=llm.ChoiceDelta(
                    role="assistant",
                    content=None,
                    tool_calls=[
                        llm.FunctionToolCall(
                            arguments=event.item.arguments,
                            name=event.item.name,
                            call_id=event.item.call_id,
                        )
                    ],
                ),
            )
        return chunk

    def _handle_response_output_text_delta(
        self, event: ResponseTextDeltaEvent
    ) -> llm.ChatChunk | None:
        # stream incremental assistant text as it arrives
        return llm.ChatChunk(
            id=self._response_id,
            delta=llm.ChoiceDelta(content=event.delta, role="assistant"),
        )

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • livekit.agents.llm.llm.LLMStream
  • abc.ABC