Module livekit.plugins.anthropic
Classes
class LLM (*,
model: str | ChatModels = 'claude-3-5-sonnet-20241022',
api_key: str | None = None,
base_url: str | None = None,
user: str | None = None,
client: anthropic.AsyncClient | None = None,
temperature: float | None = None,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']]" = 'auto',
caching: "Literal['ephemeral'] | None" = None)
Expand source code
class LLM(llm.LLM):
    """Anthropic (Claude) chat-completion adapter for the livekit LLM interface."""

    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-5-sonnet-20241022",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] = "auto",
        caching: Literal["ephemeral"] | None = None,
    ) -> None:
        """
        Create a new instance of Anthropic LLM.

        ``api_key`` must be set to your Anthropic API key, either using the argument or
        by setting the ``ANTHROPIC_API_KEY`` environmental variable.

        Args:
            model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022".
            api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY
                environment variable.
            base_url (str | None): The base URL for the Anthropic API. Defaults to None.
            user (str | None): The user for the Anthropic API. Defaults to None.
            client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None.
            temperature (float | None): The temperature for the Anthropic API. Defaults to None.
            parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None.
            tool_choice: The tool choice for the Anthropic API. Defaults to "auto".
            caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be
                enabled for the system prompt, tools, and chat history.

        Raises:
            ValueError: If no API key is provided via argument or environment.
        """
        super().__init__(
            capabilities=LLMCapabilities(
                requires_persistent_functions=True,
                supports_choices_on_int=True,
            )
        )

        # Validate the key here so we raise a clear error immediately instead of
        # failing on the first request.
        api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if api_key is None:
            raise ValueError("Anthropic API key is required")

        self._opts = LLMOptions(
            model=model,
            user=user,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            caching=caching,
        )
        self._client = client or anthropic.AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=5.0,
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
        | None = None,
    ) -> "LLMStream":
        """Start a streaming chat completion against the Anthropic Messages API.

        Per-call arguments left as ``None`` fall back to the values configured
        in the constructor (``self._opts``). Returns an ``LLMStream`` that
        yields ``llm.ChatChunk`` objects as events arrive.
        """
        # Fall back to constructor-level defaults for unset per-call options.
        if temperature is None:
            temperature = self._opts.temperature
        if parallel_tool_calls is None:
            parallel_tool_calls = self._opts.parallel_tool_calls
        if tool_choice is None:
            tool_choice = self._opts.tool_choice

        opts: dict[str, Any] = dict()
        if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
            fncs_desc: list[anthropic.types.ToolParam] = []
            for i, fnc in enumerate(fnc_ctx.ai_functions.values()):
                # Marking only the last tool is enough: Anthropic prefix
                # caching covers everything up to the cache-control marker.
                cache_ctrl = (
                    CACHE_CONTROL_EPHEMERAL
                    if (i == len(fnc_ctx.ai_functions) - 1)
                    and self._opts.caching == "ephemeral"
                    else None
                )
                fncs_desc.append(
                    _build_function_description(
                        fnc,
                        cache_ctrl=cache_ctrl,
                    )
                )

            opts["tools"] = fncs_desc
            if tool_choice is not None:
                # Translate the livekit tool_choice into Anthropic's schema.
                anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"}
                if isinstance(tool_choice, ToolChoice):
                    if tool_choice.type == "function":
                        anthropic_tool_choice = {
                            "type": "tool",
                            "name": tool_choice.name,
                        }
                elif isinstance(tool_choice, str):
                    if tool_choice == "required":
                        anthropic_tool_choice = {"type": "any"}
                    elif tool_choice == "none":
                        # "none" means: strip the tools entirely.
                        opts["tools"] = []
                        anthropic_tool_choice = None
                if anthropic_tool_choice is not None:
                    if parallel_tool_calls is False:
                        anthropic_tool_choice["disable_parallel_tool_use"] = True
                    opts["tool_choice"] = anthropic_tool_choice

        latest_system_message: anthropic.types.TextBlockParam | None = (
            _latest_system_message(chat_ctx, caching=self._opts.caching)
        )
        if latest_system_message:
            opts["system"] = [latest_system_message]

        anthropic_ctx = _build_anthropic_context(
            chat_ctx.messages,
            id(self),
            caching=self._opts.caching,
        )
        collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)
        stream = self._client.messages.create(
            max_tokens=opts.get("max_tokens", 1024),
            messages=collapsed_anthropic_ctx,
            model=self._opts.model,
            # BUGFIX: the previous `temperature or NOT_GIVEN` dropped a
            # legitimate temperature of 0.0 (falsy); check for None instead.
            temperature=temperature if temperature is not None else anthropic.NOT_GIVEN,
            # NOTE(review): `n` (number of choices) is forwarded as `top_k`,
            # which is a sampling parameter, not a choice count — looks
            # suspicious; confirm against upstream intent.
            top_k=n if n is not None else anthropic.NOT_GIVEN,
            stream=True,
            **opts,
        )

        return LLMStream(
            self,
            anthropic_stream=stream,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            conn_options=conn_options,
        )
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Anthropic LLM.
api_key
must be set to your Anthropic API key, either using the argument or by setting the ANTHROPIC_API_KEY
environmental variable. model (str | ChatModels): The model to use. Defaults to "claude-3-5-sonnet-20241022". api_key (str | None): The Anthropic API key. Defaults to the ANTHROPIC_API_KEY environment variable. base_url (str | None): The base URL for the Anthropic API. Defaults to None. user (str | None): The user for the Anthropic API. Defaults to None. client (anthropic.AsyncClient | None): The Anthropic client to use. Defaults to None. temperature (float | None): The temperature for the Anthropic API. Defaults to None. parallel_tool_calls (bool | None): Whether to parallelize tool calls. Defaults to None. tool_choice (Union[ToolChoice, Literal["auto", "required", "none"]] | None): The tool choice for the Anthropic API. Defaults to "auto". caching (Literal["ephemeral"] | None): If set to "ephemeral", caching will be enabled for the system prompt, tools, and chat history.
Ancestors
- LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self,
*,
chat_ctx: llm.ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
fnc_ctx: llm.FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) ‑> livekit.plugins.anthropic.llm.LLMStream
Expand source code
def chat(
    self,
    *,
    chat_ctx: llm.ChatContext,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    fnc_ctx: llm.FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]]
    | None = None,
) -> "LLMStream":
    """Start a streaming chat completion against the Anthropic Messages API.

    Per-call arguments left as ``None`` fall back to the values configured in
    the constructor (``self._opts``). Returns an ``LLMStream`` that yields
    ``llm.ChatChunk`` objects as events arrive.
    """
    # Fall back to constructor-level defaults for unset per-call options.
    if temperature is None:
        temperature = self._opts.temperature
    if parallel_tool_calls is None:
        parallel_tool_calls = self._opts.parallel_tool_calls
    if tool_choice is None:
        tool_choice = self._opts.tool_choice

    opts: dict[str, Any] = dict()
    if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
        fncs_desc: list[anthropic.types.ToolParam] = []
        for i, fnc in enumerate(fnc_ctx.ai_functions.values()):
            # Marking only the last tool is enough: Anthropic prefix caching
            # covers everything up to the cache-control marker.
            cache_ctrl = (
                CACHE_CONTROL_EPHEMERAL
                if (i == len(fnc_ctx.ai_functions) - 1)
                and self._opts.caching == "ephemeral"
                else None
            )
            fncs_desc.append(
                _build_function_description(
                    fnc,
                    cache_ctrl=cache_ctrl,
                )
            )

        opts["tools"] = fncs_desc
        if tool_choice is not None:
            # Translate the livekit tool_choice into Anthropic's schema.
            anthropic_tool_choice: dict[str, Any] | None = {"type": "auto"}
            if isinstance(tool_choice, ToolChoice):
                if tool_choice.type == "function":
                    anthropic_tool_choice = {
                        "type": "tool",
                        "name": tool_choice.name,
                    }
            elif isinstance(tool_choice, str):
                if tool_choice == "required":
                    anthropic_tool_choice = {"type": "any"}
                elif tool_choice == "none":
                    # "none" means: strip the tools entirely.
                    opts["tools"] = []
                    anthropic_tool_choice = None
            if anthropic_tool_choice is not None:
                if parallel_tool_calls is False:
                    anthropic_tool_choice["disable_parallel_tool_use"] = True
                opts["tool_choice"] = anthropic_tool_choice

    latest_system_message: anthropic.types.TextBlockParam | None = (
        _latest_system_message(chat_ctx, caching=self._opts.caching)
    )
    if latest_system_message:
        opts["system"] = [latest_system_message]

    anthropic_ctx = _build_anthropic_context(
        chat_ctx.messages,
        id(self),
        caching=self._opts.caching,
    )
    collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)
    stream = self._client.messages.create(
        max_tokens=opts.get("max_tokens", 1024),
        messages=collapsed_anthropic_ctx,
        model=self._opts.model,
        # BUGFIX: the previous `temperature or NOT_GIVEN` dropped a
        # legitimate temperature of 0.0 (falsy); check for None instead.
        temperature=temperature if temperature is not None else anthropic.NOT_GIVEN,
        # NOTE(review): `n` (number of choices) is forwarded as `top_k`,
        # which is a sampling parameter, not a choice count — looks
        # suspicious; confirm against upstream intent.
        top_k=n if n is not None else anthropic.NOT_GIVEN,
        stream=True,
        **opts,
    )

    return LLMStream(
        self,
        anthropic_stream=stream,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        conn_options=conn_options,
    )
Inherited members
class LLMStream (llm: LLM,
*,
anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]],
chat_ctx: llm.ChatContext,
fnc_ctx: llm.FunctionContext | None,
conn_options: APIConnectOptions)
Expand source code
class LLMStream(llm.LLMStream):
    """Adapts Anthropic's raw message-stream events into livekit ChatChunks."""

    def __init__(
        self,
        llm: LLM,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
        conn_options: APIConnectOptions,
    ) -> None:
        super().__init__(
            llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx, conn_options=conn_options
        )
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # State for the in-flight tool call whose arguments are still being
        # streamed; all three are reset once the call is fully assembled.
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

        self._request_id: str = ""
        self._ignoring_cot = False  # currently skipping chain-of-thought text
        # Token accounting accumulated across events, emitted as a final
        # usage-only chunk when the stream ends.
        self._input_tokens = 0
        self._cache_creation_tokens = 0
        self._cache_read_tokens = 0
        self._output_tokens = 0

    async def _run(self) -> None:
        """Drain the Anthropic stream, forwarding parsed chunks plus a final usage chunk."""
        retryable = True
        try:
            if not self._anthropic_stream:
                self._anthropic_stream = await self._awaitable_anthropic_stream

            async with self._anthropic_stream as stream:
                async for event in stream:
                    parsed = self._parse_event(event)
                    if parsed is not None:
                        self._event_ch.send_nowait(parsed)
                        # Once any content reached the caller, a retry would
                        # duplicate output — stop treating errors as retryable.
                        retryable = False

                # Final chunk carries only aggregated usage numbers.
                usage_chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    usage=llm.CompletionUsage(
                        completion_tokens=self._output_tokens,
                        prompt_tokens=self._input_tokens,
                        total_tokens=self._input_tokens
                        + self._output_tokens
                        + self._cache_creation_tokens
                        + self._cache_read_tokens,
                        cache_creation_input_tokens=self._cache_creation_tokens,
                        cache_read_input_tokens=self._cache_read_tokens,
                    ),
                )
                self._event_ch.send_nowait(usage_chunk)
        except anthropic.APITimeoutError:
            raise APITimeoutError(retryable=retryable)
        except anthropic.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
            )
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_event(
        self, event: anthropic.types.RawMessageStreamEvent
    ) -> llm.ChatChunk | None:
        """Translate one raw stream event into a ChatChunk, or None if nothing to emit."""
        if event.type == "message_start":
            # First event: capture the request id and initial usage counters.
            self._request_id = event.message.id
            self._input_tokens = event.message.usage.input_tokens
            self._output_tokens = event.message.usage.output_tokens
            if event.message.usage.cache_creation_input_tokens:
                self._cache_creation_tokens = (
                    event.message.usage.cache_creation_input_tokens
                )
            if event.message.usage.cache_read_input_tokens:
                self._cache_read_tokens = event.message.usage.cache_read_input_tokens
        elif event.type == "message_delta":
            self._output_tokens += event.usage.output_tokens
        elif event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                # Begin accumulating a streamed tool call.
                self._tool_call_id = event.content_block.id
                self._fnc_name = event.content_block.name
                self._fnc_raw_arguments = ""
        elif event.type == "content_block_delta":
            delta = event.delta
            if delta.type == "text_delta":
                text = delta.text

                if self._fnc_ctx is not None:
                    # Anthropic may inject chain-of-thought (<thinking>) text
                    # when tools are in play; suppress it from the output.
                    if text.startswith("<thinking>"):
                        self._ignoring_cot = True
                    elif self._ignoring_cot and "</thinking>" in text:
                        # Keep whatever follows the closing tag in this delta.
                        text = text.split("</thinking>")[-1]
                        self._ignoring_cot = False

                if self._ignoring_cot:
                    return None

                return llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(content=text, role="assistant")
                        )
                    ],
                )
            elif delta.type == "input_json_delta":
                assert self._fnc_raw_arguments is not None
                self._fnc_raw_arguments += delta.partial_json
        elif event.type == "content_block_stop":
            if self._tool_call_id is not None and self._fnc_ctx:
                assert self._fnc_name is not None
                assert self._fnc_raw_arguments is not None

                # Arguments are complete — materialize the function call info.
                fnc_info = _create_ai_function_info(
                    self._fnc_ctx,
                    self._tool_call_id,
                    self._fnc_name,
                    self._fnc_raw_arguments,
                )
                self._function_calls_info.append(fnc_info)

                chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(
                                role="assistant", tool_calls=[fnc_info]
                            ),
                        )
                    ],
                )
                self._tool_call_id = self._fnc_raw_arguments = self._fnc_name = None
                return chunk

        return None
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- LLMStream
- abc.ABC
Inherited members