Module livekit.plugins.anthropic

Classes

class LLM (*,
model: str | ChatModels = 'claude-3-5-sonnet-20241022',
api_key: str | None = None,
base_url: str | None = None,
user: str | None = None,
client: anthropic.AsyncClient | None = None,
temperature: float | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']]" = 'auto',
caching: "Literal['ephemeral'] | None" = None)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-5-sonnet-20241022",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] = "auto",
        caching: Literal["ephemeral"] | None = None,
    ) -> None:
        
""" Create a new instance of Anthropic LLM. ``api_key`` must be set to your Anthropic API key, either using the argument or by setting the ``ANTHROPIC_API_KEY`` environmental variable. model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022". api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY environment variable. base_url (str | None): The base URL for the Anthropic API. Defaults to None. user (str | None): The user for the Anthropic API. Defaults to None. client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None. temperature (float | None): The temperature for the Anthropic API. Defaults to None. parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None. tool_choice (Union[ToolChoice, Literal["auto", "required", "none"]] | None): The tool choice for the Anthropic API. Defaults to "auto". caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be enabled for the system prompt, tools, and chat history. """
super().__init__( capabilities=LLMCapabilities( requires_persistent_functions=True, supports_choices_on_int=True, ) ) # throw an error on our end api_key = api_key or os.environ.get("ANTHROPIC_API_KEY") if api_key is None: raise ValueError("Anthropic API key is required") self._opts = LLMOptions( model=model, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, caching=caching, ) self._client = client or anthropic.AsyncClient( api_key=api_key, base_url=base_url, http_client=httpx.AsyncClient( timeout=5.0, follow_redirects=True, limits=httpx.Limits( max_connections=1000, max_keepalive_connections=100, keepalive_expiry=120, ), ), ) def chat( self, *, chat_ctx: llm.ChatContext, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, fnc_ctx: llm.FunctionContext | None = None, temperature: float | None = None, n: int | None = 1, parallel_tool_calls: bool | None = None, tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None, ) -> "LLMStream": if temperature is None: temperature = self._opts.temperature if parallel_tool_calls is None: parallel_tool_calls = self._opts.parallel_tool_calls if tool_choice is None: tool_choice = self._opts.tool_choice opts: dict[str, Any] = dict() if fnc_ctx and len(fnc_ctx.ai_functions) > 0: fncs_desc: list[anthropic.types.ToolParam] = [] for i, fnc in enumerate(fnc_ctx.ai_functions.values()): # caching last tool will cache all the tools if caching is enabled cache_ctrl = ( CACHE_CONTROL_EPHEMERAL if (i == len(fnc_ctx.ai_functions) - 1) and self._opts.caching == "ephemeral" else None ) fncs_desc.append( _build_function_description( fnc, cache_ctrl=cache_ctrl, ) ) opts["tools"] = fncs_desc if tool_choice is not None: anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"} if isinstance(tool_choice, ToolChoice): if tool_choice.type == "function": anthropic_tool_choice = { "type": "tool", "name": tool_choice.name, } elif isinstance(tool_choice, str): if tool_choice == "required": anthropic_tool_choice = {"type": "any"} elif tool_choice == "none": opts["tools"] = [] anthropic_tool_choice = None if anthropic_tool_choice is not None: if parallel_tool_calls is False: anthropic_tool_choice["disable_parallel_tool_use"] = True opts["tool_choice"] = anthropic_tool_choice latest_system_message: anthropic.types.TextBlockParam | None = ( _latest_system_message(chat_ctx, caching=self._opts.caching) ) if latest_system_message: opts["system"] = [latest_system_message] anthropic_ctx = _build_anthropic_context( chat_ctx.messages, id(self), caching=self._opts.caching, ) collaped_anthropic_ctx = _merge_messages(anthropic_ctx) stream = self._client.messages.create( max_tokens=opts.get("max_tokens", 1024), messages=collaped_anthropic_ctx, model=self._opts.model, temperature=temperature or anthropic.NOT_GIVEN, top_k=n or anthropic.NOT_GIVEN, stream=True, **opts, ) return LLMStream( self, anthropic_stream=stream, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx, conn_options=conn_options, )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Anthropic LLM.

api_key must be set to your Anthropic API key, either using the argument or by setting the ANTHROPIC_API_KEY environment variable.

model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022".
api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY environment variable.
base_url (str | None): The base URL for the Anthropic API. Defaults to None.
user (str | None): The user for the Anthropic API. Defaults to None.
client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None.
temperature (float | None): The temperature for the Anthropic API. Defaults to None.
parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None.
tool_choice (Union[ToolChoice, Literal["auto", "required", "none"]] | None): The tool choice for the Anthropic API. Defaults to "auto".
caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be enabled for the system prompt, tools, and chat history.
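
For orientation, a minimal construction sketch (not taken from the source above): it assumes the plugin is installed as livekit-plugins-anthropic and that ANTHROPIC_API_KEY is exported; the parameter values are illustrative.

from livekit.plugins import anthropic

# Assumes ANTHROPIC_API_KEY is set in the environment; otherwise pass api_key=...
claude = anthropic.LLM(
    model="claude-3-5-sonnet-20241022",
    temperature=0.7,
    caching="ephemeral",  # cache the system prompt, tools, and chat history
)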

Ancestors

Methods

def chat(self,
*,
chat_ctx: llm.ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
fnc_ctx: llm.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> livekit.plugins.anthropic.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: llm.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    if temperature is None:
        temperature = self._opts.temperature
    if parallel_tool_calls is None:
        parallel_tool_calls = self._opts.parallel_tool_calls
    if tool_choice is None:
        tool_choice = self._opts.tool_choice

    opts: dict[str, Any] = dict()
    if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
        fncs_desc: list[anthropic.types.ToolParam] = []
        for i, fnc in enumerate(fnc_ctx.ai_functions.values()):
            # caching last tool will cache all the tools if caching is enabled
            cache_ctrl = (
                CACHE_CONTROL_EPHEMERAL
                if (i == len(fnc_ctx.ai_functions) - 1)
                and self._opts.caching == "ephemeral"
                else None
            )
            fncs_desc.append(
                _build_function_description(
                    fnc,
                    cache_ctrl=cache_ctrl,
                )
            )

        opts["tools"] = fncs_desc
        if tool_choice is not None:
            anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"}
            if isinstance(tool_choice, ToolChoice):
                if tool_choice.type == "function":
                    anthropic_tool_choice = {
                        "type": "tool",
                        "name": tool_choice.name,
                    }
            elif isinstance(tool_choice, str):
                if tool_choice == "required":
                    anthropic_tool_choice = {"type": "any"}
                elif tool_choice == "none":
                    opts["tools"] = []
                    anthropic_tool_choice = None
        if anthropic_tool_choice is not None:
            if parallel_tool_calls is False:
                anthropic_tool_choice["disable_parallel_tool_use"] = True
            opts["tool_choice"] = anthropic_tool_choice

    latest_system_message: anthropic.types.TextBlockParam | None = (
        _latest_system_message(chat_ctx, caching=self._opts.caching)
    )
    if latest_system_message:
        opts["system"] = [latest_system_message]

    anthropic_ctx = _build_anthropic_context(
        chat_ctx.messages,
        id(self),
        caching=self._opts.caching,
    )
    collaped_anthropic_ctx = _merge_messages(anthropic_ctx)

    stream = self._client.messages.create(
        max_tokens=opts.get("max_tokens", 1024),
        messages=collaped_anthropic_ctx,
        model=self._opts.model,
        temperature=temperature or anthropic.NOT_GIVEN,
        top_k=n or anthropic.NOT_GIVEN,
        stream=True,
        **opts,
    )

    return LLMStream(
        self,
        anthropic_stream=stream,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        conn_options=conn_options,
    )
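
A usage sketch for chat() (assuming the livekit-agents 0.x ChatContext API, where append() returns the context for chaining; the prompt text is illustrative):

import asyncio

from livekit.agents import llm
from livekit.plugins import anthropic


async def main() -> None:
    claude = anthropic.LLM()

    # Build a short conversation; chat() reads chat_ctx.messages as shown above.
    chat_ctx = (
        llm.ChatContext()
        .append(role="system", text="You are a concise assistant.")
        .append(role="user", text="Say hello in one sentence.")
    )

    stream = claude.chat(chat_ctx=chat_ctx, temperature=0.3)
    async for chunk in stream:
        for choice in chunk.choices:
            if choice.delta.content:
                print(choice.delta.content, end="", flush=True)


asyncio.run(main())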

Inherited members

class LLMStream (llm: LLM,
*,
anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]],
chat_ctx: llm.ChatContext,
fnc_ctx: llm.FunctionContext | None,
conn_options: APIConnectOptions)
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLM,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(
            llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx, conn_options=conn_options
        )
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # current function call that we're waiting for full completion (args are streamed)
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

        self._request_id: str = ""
        self._ignoring_cot = False  # ignore chain of thought
        self._input_tokens = 0
        self._cache_creation_tokens = 0
        self._cache_read_tokens = 0
        self._output_tokens = 0

    async def _run(self) -> None:
        retryable = True
        try:
            if not self._anthropic_stream:
                self._anthropic_stream = await self._awaitable_anthropic_stream

            async with self._anthropic_stream as stream:
                async for event in stream:
                    chat_chunk = self._parse_event(event)
                    if chat_chunk is not None:
                        self._event_ch.send_nowait(chat_chunk)
                        retryable = False

                self._event_ch.send_nowait(
                    llm.ChatChunk(
                        request_id=self._request_id,
                        usage=llm.CompletionUsage(
                            completion_tokens=self._output_tokens,
                            prompt_tokens=self._input_tokens,
                            total_tokens=self._input_tokens
                            + self._output_tokens
                            + self._cache_creation_tokens
                            + self._cache_read_tokens,
                            cache_creation_input_tokens=self._cache_creation_tokens,
                            cache_read_input_tokens=self._cache_read_tokens,
                        ),
                    )
                )
        except anthropic.APITimeoutError:
            raise APITimeoutError(retryable=retryable)
        except anthropic.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
            )
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_event(
        self, event: anthropic.types.RawMessageStreamEvent
    ) -> llm.ChatChunk | None:
        if event.type == "message_start":
            self._request_id = event.message.id
            self._input_tokens = event.message.usage.input_tokens
            self._output_tokens = event.message.usage.output_tokens
            if event.message.usage.cache_creation_input_tokens:
                self._cache_creation_tokens = (
                    event.message.usage.cache_creation_input_tokens
                )
            if event.message.usage.cache_read_input_tokens:
                self._cache_read_tokens = event.message.usage.cache_read_input_tokens
        elif event.type == "message_delta":
            self._output_tokens += event.usage.output_tokens
        elif event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                self._tool_call_id = event.content_block.id
                self._fnc_name = event.content_block.name
                self._fnc_raw_arguments = ""
        elif event.type == "content_block_delta":
            delta = event.delta
            if delta.type == "text_delta":
                text = delta.text

                if self._fnc_ctx is not None:
                    # anthropic may inject CoT (chain of thought) when using functions
                    if text.startswith("<thinking>"):
                        self._ignoring_cot = True
                    elif self._ignoring_cot and "</thinking>" in text:
                        text = text.split("</thinking>")[-1]
                        self._ignoring_cot = False

                if self._ignoring_cot:
                    return None

                return llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(content=text, role="assistant")
                        )
                    ],
                )
            elif delta.type == "input_json_delta":
                assert self._fnc_raw_arguments is not None
                self._fnc_raw_arguments += delta.partial_json

        elif event.type == "content_block_stop":
            if self._tool_call_id is not None and self._fnc_ctx:
                assert self._fnc_name is not None
                assert self._fnc_raw_arguments is not None

                fnc_info = _create_ai_function_info(
                    self._fnc_ctx,
                    self._tool_call_id,
                    self._fnc_name,
                    self._fnc_raw_arguments,
                )
                self._function_calls_info.append(fnc_info)

                chat_chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(
                                role="assistant", tool_calls=[fnc_info]
                            ),
                        )
                    ],
                )
                self._tool_call_id = self._fnc_raw_arguments = self._fnc_name = None
                return chat_chunk

        return None

Helper class that provides a standard way to create an ABC using inheritance.
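
LLMStream is not usually constructed directly; it is returned by LLM.chat(). A small consumption sketch (assuming chunk.usage is unset on content chunks and populated only on the final usage chunk emitted in _run() above):

from livekit.plugins import anthropic


async def report_usage(stream: anthropic.LLMStream) -> None:
    # Field names follow llm.CompletionUsage as populated in _run() above.
    async for chunk in stream:
        if chunk.usage is not None:
            print(
                f"prompt={chunk.usage.prompt_tokens} "
                f"completion={chunk.usage.completion_tokens} "
                f"cache_read={chunk.usage.cache_read_input_tokens} "
                f"cache_creation={chunk.usage.cache_creation_input_tokens}"
            )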

Ancestors

Inherited members