Module livekit.plugins.anthropic

Classes

class LLM (*, model: str | ChatModels = 'claude-3-haiku-20240307', api_key: str | None = None, base_url: str | None = None, user: str | None = None, client: anthropic.AsyncClient | None = None, temperature: float | None = None)

Anthropic chat LLM built on the livekit.agents llm.LLM interface; requests are made against Anthropic's Messages API.

Create a new instance of Anthropic LLM.

api_key must be set to your Anthropic API key, either by passing the argument or by setting the ANTHROPIC_API_KEY environment variable.
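
A minimal construction sketch (hypothetical values; it assumes ANTHROPIC_API_KEY is exported in the environment, and passing api_key= explicitly works the same way):

from livekit.plugins import anthropic

anthropic_llm = anthropic.LLM(
    model="claude-3-haiku-20240307",  # the default model
    temperature=0.7,                  # optional sampling temperature
)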

Source code
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-haiku-20240307",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
    ) -> None:
        """
        Create a new instance of Anthropic LLM.

        ``api_key`` must be set to your Anthropic API key, either using the argument or by setting
        the ``ANTHROPIC_API_KEY`` environment variable.
        """
        super().__init__()

        # raise a clear error on our end if no API key is available
        api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if api_key is None:
            raise ValueError("Anthropic API key is required")

        self._opts = LLMOptions(model=model, user=user, temperature=temperature)
        self._client = client or anthropic.AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=5.0,
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
    ) -> "LLMStream":
        if temperature is None:
            temperature = self._opts.temperature

        opts: dict[str, Any] = dict()
        if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
            fncs_desc: list[anthropic.types.ToolParam] = []
            for fnc in fnc_ctx.ai_functions.values():
                fncs_desc.append(_build_function_description(fnc))

            opts["tools"] = fncs_desc

            if fnc_ctx and parallel_tool_calls is not None:
                opts["parallel_tool_calls"] = parallel_tool_calls

        latest_system_message = _latest_system_message(chat_ctx)
        anthropic_ctx = _build_anthropic_context(chat_ctx.messages, id(self))
        collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)

        stream = self._client.messages.create(
            max_tokens=opts.get("max_tokens", 1024),
            system=latest_system_message,
            messages=collapsed_anthropic_ctx,
            model=self._opts.model,
            temperature=temperature or anthropic.NOT_GIVEN,
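            # note: the `n` argument is forwarded as Anthropic's `top_k` sampling parameter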
            top_k=n or anthropic.NOT_GIVEN,
            stream=True,
            **opts,
        )

        return LLMStream(
            self, anthropic_stream=stream, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx
        )

Ancestors

    llm.LLM (from livekit.agents)

Methods

def chat(self, *, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None = None, temperature: float | None = None, n: int | None = 1, parallel_tool_calls: bool | None = None) -> livekit.plugins.anthropic.llm.LLMStream
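
A hedged usage sketch of chat(); it assumes llm.ChatContext.append(role=..., text=...) returns the context and that the returned LLMStream is asynchronously iterable over llm.ChatChunk, as in livekit.agents:

import asyncio

from livekit.agents import llm as agents_llm
from livekit.plugins import anthropic

async def main() -> None:
    anthropic_llm = anthropic.LLM()  # reads ANTHROPIC_API_KEY from the environment
    chat_ctx = agents_llm.ChatContext().append(
        role="user", text="Say hello in one sentence."
    )
    stream = anthropic_llm.chat(chat_ctx=chat_ctx, temperature=0.7)
    async for chunk in stream:
        for choice in chunk.choices:
            if choice.delta.content:
                print(choice.delta.content, end="", flush=True)

asyncio.run(main())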

Inherited members

class LLMStream (llm: LLM, *, anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]], chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None)

Streaming response built on the livekit.agents llm.LLMStream interface. It parses Anthropic's raw message stream events into ChatChunks, accumulating streamed tool-call arguments and token usage along the way.
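
Once the underlying Anthropic stream is exhausted, a trailing chunk carrying token usage is emitted (see _main_task below). A minimal sketch of reading it, assuming usage is unset on ordinary content chunks:

async for chunk in stream:
    if chunk.usage is not None:
        print(
            "prompt tokens:", chunk.usage.prompt_tokens,
            "completion tokens:", chunk.usage.completion_tokens,
            "total:", chunk.usage.total_tokens,
        )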

Source code
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLM,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # current function call whose completion we're waiting for (args are streamed)
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

        self._request_id: str = ""
        self._ignoring_cot = False  # ignore chain of thought
        self._input_tokens = 0
        self._output_tokens = 0

    async def _main_task(self) -> None:
        try:
            if not self._anthropic_stream:
                self._anthropic_stream = await self._awaitable_anthropic_stream

            async with self._anthropic_stream as stream:
                async for event in stream:
                    chat_chunk = self._parse_event(event)
                    if chat_chunk is not None:
                        self._event_ch.send_nowait(chat_chunk)

                self._event_ch.send_nowait(
                    llm.ChatChunk(
                        request_id=self._request_id,
                        usage=llm.CompletionUsage(
                            completion_tokens=self._output_tokens,
                            prompt_tokens=self._input_tokens,
                            total_tokens=self._input_tokens + self._output_tokens,
                        ),
                    )
                )
        except anthropic.APITimeoutError:
            raise APITimeoutError()
        except anthropic.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
            )
        except Exception as e:
            raise APIConnectionError() from e

    def _parse_event(
        self, event: anthropic.types.RawMessageStreamEvent
    ) -> llm.ChatChunk | None:
        if event.type == "message_start":
            self._request_id = event.message.id
            self._input_tokens = event.message.usage.input_tokens
            self._output_tokens = event.message.usage.output_tokens
        elif event.type == "message_delta":
            self._output_tokens += event.usage.output_tokens
        elif event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                self._tool_call_id = event.content_block.id
                self._fnc_name = event.content_block.name
                self._fnc_raw_arguments = ""
        elif event.type == "content_block_delta":
            delta = event.delta
            if delta.type == "text_delta":
                text = delta.text

                if self._fnc_ctx is not None:
                    # anthropic may inject CoT (chain-of-thought) text when using functions
                    if text.startswith("<thinking>"):
                        self._ignoring_cot = True
                    elif self._ignoring_cot and "</thinking>" in text:
                        text = text.split("</thinking>")[-1]
                        self._ignoring_cot = False

                if self._ignoring_cot:
                    return None

                return llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(content=text, role="assistant")
                        )
                    ],
                )
            elif delta.type == "input_json_delta":
                assert self._fnc_raw_arguments is not None
                self._fnc_raw_arguments += delta.partial_json

        elif event.type == "content_block_stop":
            if self._tool_call_id is not None and self._fnc_ctx:
                assert self._fnc_name is not None
                assert self._fnc_raw_arguments is not None

                fnc_info = _create_ai_function_info(
                    self._fnc_ctx,
                    self._tool_call_id,
                    self._fnc_name,
                    self._fnc_raw_arguments,
                )
                self._function_calls_info.append(fnc_info)

                chat_chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(
                                role="assistant", tool_calls=[fnc_info]
                            ),
                        )
                    ],
                )
                self._tool_call_id = self._fnc_raw_arguments = self._fnc_name = None
                return chat_chunk

        return None

Ancestors

    llm.LLMStream (from livekit.agents)

Inherited members