Module livekit.plugins.anthropic

Classes

class LLM (*, model: str | ChatModels = 'claude-3-haiku-20240307', api_key: str | None = None, base_url: str | None = None, user: str | None = None, client: anthropic.AsyncClient | None = None, temperature: float | None = None)

Anthropic implementation of the livekit.agents llm.LLM interface: chat() sends the conversation to the Anthropic Messages API and returns an LLMStream of incremental responses.

Create a new instance of Anthropic LLM.

api_key must be set to your Anthropic API key, either using the argument or by setting the ANTHROPIC_API_KEY environment variable.
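
For example, a minimal construction sketch, assuming ANTHROPIC_API_KEY is exported in the environment (the model and temperature values below are illustrative):

import os
from livekit.plugins import anthropic

# Passing api_key=... explicitly works the same way; with neither the argument
# nor the environment variable set, the constructor raises ValueError.
assert os.environ.get("ANTHROPIC_API_KEY"), "set ANTHROPIC_API_KEY first"

anthropic_llm = anthropic.LLM(
    model="claude-3-haiku-20240307",  # the default model shown above
    temperature=0.4,                  # optional; forwarded to the chat request
)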

Source code
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-haiku-20240307",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
    ) -> None:
        """
        Create a new instance of Anthropic LLM.

        ``api_key`` must be set to your Anthropic API key, either using the argument or by setting
        the ``ANTHROPIC_API_KEY`` environment variable.
        """
        # fail fast here instead of surfacing a less clear error from the Anthropic client
        api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if api_key is None:
            raise ValueError("Anthropic API key is required")

        self._opts = LLMOptions(model=model, user=user, temperature=temperature)
        self._client = client or anthropic.AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=5.0,
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
    ) -> "LLMStream":
        if temperature is None:
            temperature = self._opts.temperature

        opts: dict[str, Any] = dict()
        if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
            fncs_desc: list[anthropic.types.ToolParam] = []
            for fnc in fnc_ctx.ai_functions.values():
                fncs_desc.append(_build_function_description(fnc))

            opts["tools"] = fncs_desc

            if fnc_ctx and parallel_tool_calls is not None:
                opts["parallel_tool_calls"] = parallel_tool_calls

        latest_system_message = _latest_system_message(chat_ctx)
        anthropic_ctx = _build_anthropic_context(chat_ctx.messages, id(self))
        collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)
        stream = self._client.messages.create(
            max_tokens=opts.get("max_tokens", 1000),
            system=latest_system_message,
            messages=collapsed_anthropic_ctx,
            model=self._opts.model,
            temperature=temperature or anthropic.NOT_GIVEN,
            top_k=n or anthropic.NOT_GIVEN,
            stream=True,
            **opts,
        )

        return LLMStream(anthropic_stream=stream, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)

Ancestors

livekit.agents.llm.LLM
abc.ABC

Methods

def chat(self, *, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None = None, temperature: float | None = None, n: int | None = 1, parallel_tool_calls: bool | None = None) -> livekit.plugins.anthropic.llm.LLMStream
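
A minimal sketch of driving chat(): it builds a conversation, starts a stream, and prints text deltas as they arrive. It assumes ChatContext exposes the keyword-only append(role=..., text=...) helper from the matching livekit-agents release; the prompt text and variable names are illustrative.

import asyncio
from livekit.agents import llm
from livekit.plugins import anthropic

async def main() -> None:
    anthropic_llm = anthropic.LLM()  # reads ANTHROPIC_API_KEY from the environment

    chat_ctx = llm.ChatContext()
    chat_ctx.append(role="system", text="You are a concise voice assistant.")
    chat_ctx.append(role="user", text="In one sentence, what does LiveKit do?")

    stream = anthropic_llm.chat(chat_ctx=chat_ctx, temperature=0.3)
    async for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            print(delta.content, end="", flush=True)
    await stream.aclose()

asyncio.run(main())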
class LLMStream (*, anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]], chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None)

Streaming result of LLM.chat(): an async iterator over llm.ChatChunk objects built from Anthropic's raw message stream events, including completed tool calls when a FunctionContext is provided.

Source code
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
    ) -> None:
        super().__init__(chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # current function call whose arguments are still being streamed; emitted once complete
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

    async def aclose(self) -> None:
        if self._anthropic_stream:
            await self._anthropic_stream.close()

        return await super().aclose()

    async def __anext__(self):
        if not self._anthropic_stream:
            self._anthropic_stream = await self._awaitable_anthropic_stream

        fn_calling_enabled = self._fnc_ctx is not None
        ignore = False

        async for event in self._anthropic_stream:
            if event.type == "message_start":
                pass
            elif event.type == "message_delta":
                pass
            elif event.type == "message_stop":
                pass
            elif event.type == "content_block_start":
                if event.content_block.type == "tool_use":
                    self._tool_call_id = event.content_block.id
                    self._fnc_raw_arguments = ""
                    self._fnc_name = event.content_block.name
            elif event.type == "content_block_delta":
                delta = event.delta
                if delta.type == "text_delta":
                    text = delta.text

                    # Anthropic seems to add a prompt when tool calling is enabled
                    # where responses always start with a "<thinking>" block containing
                    # the LLM's chain of thought. It's very verbose and not useful for voice
                    # applications.
                    if fn_calling_enabled:
                        if text.startswith("<thinking>"):
                            ignore = True

                        if "</thinking>" in text:
                            text = text.split("</thinking>")[-1]
                            ignore = False

                    if ignore:
                        continue

                    return llm.ChatChunk(
                        choices=[
                            llm.Choice(
                                delta=llm.ChoiceDelta(content=text, role="assistant")
                            )
                        ]
                    )
                elif delta.type == "input_json_delta":
                    assert self._fnc_raw_arguments is not None
                    self._fnc_raw_arguments += delta.partial_json

            elif event.type == "content_block_stop":
                if self._tool_call_id is not None and self._fnc_ctx:
                    assert self._fnc_name is not None
                    assert self._fnc_raw_arguments is not None
                    fnc_info = _create_ai_function_info(
                        self._fnc_ctx,
                        self._tool_call_id,
                        self._fnc_name,
                        self._fnc_raw_arguments,
                    )
                    self._function_calls_info.append(fnc_info)
                    chunk = llm.ChatChunk(
                        choices=[
                            llm.Choice(
                                delta=llm.ChoiceDelta(
                                    role="assistant", tool_calls=[fnc_info]
                                ),
                                index=0,
                            )
                        ]
                    )
                    self._tool_call_id = None
                    self._fnc_raw_arguments = None
                    self._fnc_name = None
                    return chunk

        raise StopAsyncIteration

Ancestors

livekit.agents.llm.LLMStream
abc.ABC

Methods

async def aclose(self) -> None
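
For completeness, a sketch of draining an LLMStream while making sure aclose() runs even if the consumer stops early. The stream argument is assumed to come from LLM.chat() as in the earlier example; the chunk fields used here (choices, delta.content, delta.tool_calls) match the ChatChunk objects produced in __anext__ above.

async def drain(stream) -> str:
    # stream: an LLMStream returned by LLM.chat()
    text_parts: list[str] = []
    try:
        async for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.tool_calls:
                # tool-call chunks carry completed function call info, not text
                print(f"received {len(delta.tool_calls)} tool call(s)")
            elif delta.content:
                text_parts.append(delta.content)
    finally:
        # always close the underlying Anthropic stream, even on early exit
        await stream.aclose()
    return "".join(text_parts)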

Inherited members