Module livekit.plugins.anthropic

Classes

class LLM (*,
model: str | ChatModels = 'claude-3-5-sonnet-20241022',
api_key: str | None = None,
base_url: str | None = None,
user: str | None = None,
client: anthropic.AsyncClient | None = None,
temperature: float | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']]" = 'auto',
caching: "Literal['ephemeral'] | None" = None)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-5-sonnet-20241022",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] = "auto",
        caching: Literal["ephemeral"] | None = None,
    ) -> None:
        """
        Create a new instance of Anthropic LLM.

        ``api_key`` must be set to your Anthropic API key, either using the argument or by setting
        the ``ANTHROPIC_API_KEY`` environment variable.

        model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022".
        api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY environment variable.
        base_url (str | None): The base URL for the Anthropic API. Defaults to None.
        user (str | None): The user for the Anthropic API. Defaults to None.
        client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None.
        temperature (float | None): The temperature for the Anthropic API. Defaults to None.
        parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None.
        tool_choice (Union[ToolChoice, Literal["auto", "required", "none"]]): The tool choice for the Anthropic API. Defaults to "auto".
        caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be enabled for the system prompt, tools, and chat history.
        """

        super().__init__(
            capabilities=LLMCapabilities(
                requires_persistent_functions=True,
                supports_choices_on_int=True,
            )
        )

        # fail fast on our end if no API key is provided
        api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if api_key is None:
            raise ValueError("Anthropic API key is required")

        self._opts = LLMOptions(
            model=model,
            user=user,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            caching=caching,
        )
        self._client = client or anthropic.AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=5.0,
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream":
        if temperature is None:
            temperature = self._opts.temperature
        if parallel_tool_calls is None:
            parallel_tool_calls = self._opts.parallel_tool_calls
        if tool_choice is None:
            tool_choice = self._opts.tool_choice

        opts: dict[str, Any] = dict()
        if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
            fncs_desc: list[anthropic.types.ToolParam] = []
            for i, fnc in enumerate(fnc_ctx.ai_functions.values()):
                # marking the last tool with cache_control caches all the tools when caching is enabled
                cache_ctrl = (
                    CACHE_CONTROL_EPHEMERAL
                    if (i == len(fnc_ctx.ai_functions) - 1)
                    and self._opts.caching == "ephemeral"
                    else None
                )
                fncs_desc.append(
                    _build_function_description(
                        fnc,
                        cache_ctrl=cache_ctrl,
                    )
                )

            opts["tools"] = fncs_desc
            if tool_choice is not None:
                anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"}
                if isinstance(tool_choice, ToolChoice):
                    if tool_choice.type == "function":
                        anthropic_tool_choice = {
                            "type": "tool",
                            "name": tool_choice.name,
                        }
                elif isinstance(tool_choice, str):
                    if tool_choice == "required":
                        anthropic_tool_choice = {"type": "any"}
                    elif tool_choice == "none":
                        opts["tools"] = []
                        anthropic_tool_choice = None
                if anthropic_tool_choice is not None:
                    if parallel_tool_calls is False:
                        anthropic_tool_choice["disable_parallel_tool_use"] = True
                    opts["tool_choice"] = anthropic_tool_choice

        latest_system_message: anthropic.types.TextBlockParam | None = (
            _latest_system_message(chat_ctx, caching=self._opts.caching)
        )
        if latest_system_message:
            opts["system"] = [latest_system_message]

        anthropic_ctx = _build_anthropic_context(
            chat_ctx.messages,
            id(self),
            caching=self._opts.caching,
        )
        collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)

        stream = self._client.messages.create(
            max_tokens=opts.get("max_tokens", 1024),
            messages=collapsed_anthropic_ctx,
            model=self._opts.model,
            temperature=temperature or anthropic.NOT_GIVEN,
            top_k=n or anthropic.NOT_GIVEN,
            stream=True,
            **opts,
        )

        return LLMStream(
            self,
            anthropic_stream=stream,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            conn_options=conn_options,
        )


Create a new instance of Anthropic LLM.

api_key must be set to your Anthropic API key, either using the argument or by setting the ANTHROPIC_API_KEY environment variable.

model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022".
api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY environment variable.
base_url (str | None): The base URL for the Anthropic API. Defaults to None.
user (str | None): The user for the Anthropic API. Defaults to None.
client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None.
temperature (float | None): The temperature for the Anthropic API. Defaults to None.
parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None.
tool_choice (Union[ToolChoice, Literal["auto", "required", "none"]]): The tool choice for the Anthropic API. Defaults to "auto".
caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be enabled for the system prompt, tools, and chat history.
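
For orientation, a minimal construction sketch based on the signature above. The model, temperature, and caching values are illustrative, and ANTHROPIC_API_KEY is assumed to be exported in the environment:

from livekit.plugins import anthropic

# api_key is omitted, so the ANTHROPIC_API_KEY environment variable is used
claude = anthropic.LLM(
    model="claude-3-5-sonnet-20241022",
    temperature=0.3,       # illustrative sampling temperature
    caching="ephemeral",   # cache the system prompt, tools, and chat history
)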

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
fnc_ctx: llm.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> livekit.plugins.anthropic.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: llm.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    if temperature is None:
        temperature = self._opts.temperature
    if parallel_tool_calls is None:
        parallel_tool_calls = self._opts.parallel_tool_calls
    if tool_choice is None:
        tool_choice = self._opts.tool_choice

    opts: dict[str, Any] = dict()
    if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
        fncs_desc: list[anthropic.types.ToolParam] = []
        for i, fnc in enumerate(fnc_ctx.ai_functions.values()):
            # marking the last tool with cache_control caches all the tools when caching is enabled
            cache_ctrl = (
                CACHE_CONTROL_EPHEMERAL
                if (i == len(fnc_ctx.ai_functions) - 1)
                and self._opts.caching == "ephemeral"
                else None
            )
            fncs_desc.append(
                _build_function_description(
                    fnc,
                    cache_ctrl=cache_ctrl,
                )
            )

        opts["tools"] = fncs_desc
        if tool_choice is not None:
            anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"}
            if isinstance(tool_choice, ToolChoice):
                if tool_choice.type == "function":
                    anthropic_tool_choice = {
                        "type": "tool",
                        "name": tool_choice.name,
                    }
            elif isinstance(tool_choice, str):
                if tool_choice == "required":
                    anthropic_tool_choice = {"type": "any"}
                elif tool_choice == "none":
                    opts["tools"] = []
                    anthropic_tool_choice = None
            if anthropic_tool_choice is not None:
                if parallel_tool_calls is False:
                    anthropic_tool_choice["disable_parallel_tool_use"] = True
                opts["tool_choice"] = anthropic_tool_choice

    latest_system_message: anthropic.types.TextBlockParam | None = (
        _latest_system_message(chat_ctx, caching=self._opts.caching)
    )
    if latest_system_message:
        opts["system"] = [latest_system_message]

    anthropic_ctx = _build_anthropic_context(
        chat_ctx.messages,
        id(self),
        caching=self._opts.caching,
    )
    collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)

    stream = self._client.messages.create(
        max_tokens=opts.get("max_tokens", 1024),
        messages=collapsed_anthropic_ctx,
        model=self._opts.model,
        temperature=temperature or anthropic.NOT_GIVEN,
        top_k=n or anthropic.NOT_GIVEN,
        stream=True,
        **opts,
    )

    return LLMStream(
        self,
        anthropic_stream=stream,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        conn_options=conn_options,
    )
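
A usage sketch for chat(). It assumes livekit.agents.llm.ChatContext exposes the chainable append(role=..., text=...) helper found in recent livekit-agents releases, and that ANTHROPIC_API_KEY is set; the parameter names follow the signature above:

from livekit.agents import llm
from livekit.plugins import anthropic

claude = anthropic.LLM()

# build the conversation; append() returns the context, so calls can be chained
chat_ctx = (
    llm.ChatContext()
    .append(role="system", text="You are a terse voice assistant.")
    .append(role="user", text="What is LiveKit?")
)

# chat() is synchronous and returns an LLMStream; the stream itself is
# consumed asynchronously (see LLMStream below)
stream = claude.chat(chat_ctx=chat_ctx, temperature=0.5)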

class LLMStream (llm: LLM,
*,
anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]],
chat_ctx: llm.ChatContext,
fnc_ctx: llm.FunctionContext | None,
conn_options: APIConnectOptions)
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLM,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(
            llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx, conn_options=conn_options
        )
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # current function call that we're waiting for full completion (args are streamed)
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

        self._request_id: str = ""
        self._ignoring_cot = False  # ignore chain of thought
        self._input_tokens = 0
        self._cache_creation_tokens = 0
        self._cache_read_tokens = 0
        self._output_tokens = 0

    async def _run(self) -> None:
        retryable = True
        try:
            if not self._anthropic_stream:
                self._anthropic_stream = await self._awaitable_anthropic_stream

            async with self._anthropic_stream as stream:
                async for event in stream:
                    chat_chunk = self._parse_event(event)
                    if chat_chunk is not None:
                        self._event_ch.send_nowait(chat_chunk)
                        retryable = False

                self._event_ch.send_nowait(
                    llm.ChatChunk(
                        request_id=self._request_id,
                        usage=llm.CompletionUsage(
                            completion_tokens=self._output_tokens,
                            prompt_tokens=self._input_tokens,
                            total_tokens=self._input_tokens
                            + self._output_tokens
                            + self._cache_creation_tokens
                            + self._cache_read_tokens,
                            cache_creation_input_tokens=self._cache_creation_tokens,
                            cache_read_input_tokens=self._cache_read_tokens,
                        ),
                    )
                )
        except anthropic.APITimeoutError:
            raise APITimeoutError(retryable=retryable)
        except anthropic.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
            )
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_event(
        self, event: anthropic.types.RawMessageStreamEvent
    ) -> llm.ChatChunk | None:
        if event.type == "message_start":
            self._request_id = event.message.id
            self._input_tokens = event.message.usage.input_tokens
            self._output_tokens = event.message.usage.output_tokens
            if event.message.usage.cache_creation_input_tokens:
                self._cache_creation_tokens = (
                    event.message.usage.cache_creation_input_tokens
                )
            if event.message.usage.cache_read_input_tokens:
                self._cache_read_tokens = event.message.usage.cache_read_input_tokens
        elif event.type == "message_delta":
            self._output_tokens += event.usage.output_tokens
        elif event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                self._tool_call_id = event.content_block.id
                self._fnc_name = event.content_block.name
                self._fnc_raw_arguments = ""
        elif event.type == "content_block_delta":
            delta = event.delta
            if delta.type == "text_delta":
                text = delta.text

                if self._fnc_ctx is not None:
                    # anthropic may inject CoT (chain-of-thought) text when using functions
                    if text.startswith("<thinking>"):
                        self._ignoring_cot = True
                    elif self._ignoring_cot and "</thinking>" in text:
                        text = text.split("</thinking>")[-1]
                        self._ignoring_cot = False

                if self._ignoring_cot:
                    return None

                return llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(content=text, role="assistant")
                        )
                    ],
                )
            elif delta.type == "input_json_delta":
                assert self._fnc_raw_arguments is not None
                self._fnc_raw_arguments += delta.partial_json

        elif event.type == "content_block_stop":
            if self._tool_call_id is not None and self._fnc_ctx:
                assert self._fnc_name is not None
                assert self._fnc_raw_arguments is not None

                fnc_info = _create_ai_function_info(
                    self._fnc_ctx,
                    self._tool_call_id,
                    self._fnc_name,
                    self._fnc_raw_arguments,
                )
                self._function_calls_info.append(fnc_info)

                chat_chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(
                                role="assistant", tool_calls=[fnc_info]
                            ),
                        )
                    ],
                )
                self._tool_call_id = self._fnc_raw_arguments = self._fnc_name = None
                return chat_chunk

        return None
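
A consumption sketch for the stream returned by LLM.chat(). It assumes the base llm.LLMStream from livekit-agents is an async iterator of llm.ChatChunk, with field names matching the ChatChunk construction visible in _run() and _parse_event() above (text and tool-call deltas arrive through choices; a final chunk carries only the usage totals):

from livekit.plugins import anthropic

async def read_stream(stream: anthropic.LLMStream) -> None:
    async for chunk in stream:
        # text deltas arrive through chunk.choices
        for choice in chunk.choices:
            if choice.delta.content:
                print(choice.delta.content, end="", flush=True)
        # the last chunk reports token usage, including cache statistics
        if chunk.usage is not None:
            print(
                f"\nprompt={chunk.usage.prompt_tokens}",
                f"completion={chunk.usage.completion_tokens}",
                f"cache_read={chunk.usage.cache_read_input_tokens}",
            )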
