Module livekit.plugins.anthropic
Classes
class LLM (*, model: str | ChatModels = 'claude-3-haiku-20240307', api_key: str | None = None, base_url: str | None = None, user: str | None = None, client: anthropic.AsyncClient | None = None, temperature: float | None = None)
-
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Anthropic LLM.
api_key
must be set to your Anthropic API key, either using the argument or by setting the ANTHROPIC_API_KEY
environment variable.
Expand source code
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "claude-3-haiku-20240307",
        api_key: str | None = None,
        base_url: str | None = None,
        user: str | None = None,
        client: anthropic.AsyncClient | None = None,
        temperature: float | None = None,
    ) -> None:
        """
        Create a new instance of Anthropic LLM.

        ``api_key`` must be set to your Anthropic API key, either using the
        argument or by setting the ``ANTHROPIC_API_KEY`` environment variable.

        Args:
            model: Anthropic model name to use for completions.
            api_key: Anthropic API key; falls back to ``ANTHROPIC_API_KEY``.
            base_url: optional override of the Anthropic API base URL.
            user: optional end-user identifier forwarded with requests.
            client: pre-configured ``anthropic.AsyncClient`` to reuse; when
                given, ``api_key``/``base_url`` are ignored for transport.
            temperature: default sampling temperature for ``chat()`` calls.

        Raises:
            ValueError: if no API key is provided and the environment
                variable is unset.
        """
        super().__init__()

        # Resolve the key eagerly so we fail fast here instead of on the
        # first request.
        api_key = api_key or os.environ.get("ANTHROPIC_API_KEY")
        if api_key is None:
            raise ValueError("Anthropic API key is required")

        self._opts = LLMOptions(model=model, user=user, temperature=temperature)
        self._client = client or anthropic.AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=5.0,
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
    ) -> "LLMStream":
        """Start a streaming completion for *chat_ctx*.

        Args:
            chat_ctx: conversation history to send.
            fnc_ctx: optional function/tool context; its functions are
                exposed to the model as Anthropic tools.
            temperature: per-call sampling temperature; defaults to the
                value given at construction time.
            n: kept for interface parity; Anthropic's Messages API returns a
                single completion, so values other than 1 have no effect.
                (Previously this was wrongly forwarded as ``top_k``.)
            parallel_tool_calls: when tools are attached, controls whether
                the model may call several tools in one turn.

        Returns:
            An :class:`LLMStream` yielding ``llm.ChatChunk`` events.
        """
        if temperature is None:
            temperature = self._opts.temperature

        opts: dict[str, Any] = dict()
        if fnc_ctx and len(fnc_ctx.ai_functions) > 0:
            opts["tools"] = [
                _build_function_description(fnc)
                for fnc in fnc_ctx.ai_functions.values()
            ]
            # FIX: the Messages API has no `parallel_tool_calls` keyword;
            # parallel tool use is configured through `tool_choice`, and only
            # makes sense when tools are actually attached.
            if parallel_tool_calls is not None:
                opts["tool_choice"] = {
                    "type": "auto",
                    "disable_parallel_tool_use": not parallel_tool_calls,
                }

        latest_system_message = _latest_system_message(chat_ctx)
        anthropic_ctx = _build_anthropic_context(chat_ctx.messages, id(self))
        collapsed_anthropic_ctx = _merge_messages(anthropic_ctx)
        stream = self._client.messages.create(
            max_tokens=opts.get("max_tokens", 1024),
            system=latest_system_message,
            messages=collapsed_anthropic_ctx,
            model=self._opts.model,
            # FIX: `temperature or NOT_GIVEN` silently discarded a valid 0.0;
            # compare against None explicitly.
            temperature=(
                temperature if temperature is not None else anthropic.NOT_GIVEN
            ),
            stream=True,
            **opts,
        )

        return LLMStream(
            self, anthropic_stream=stream, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx
        )
Ancestors
- LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self, *, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None = None, temperature: float | None = None, n: int | None = 1, parallel_tool_calls: bool | None = None) ‑> livekit.plugins.anthropic.llm.LLMStream
Inherited members
class LLMStream (llm: LLM, *, anthropic_stream: Awaitable[anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]], chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None)
-
Helper class that provides a standard way to create an ABC using inheritance.
Expand source code
class LLMStream(llm.LLMStream):
    """Adapts an Anthropic streaming response to LiveKit ``ChatChunk`` events."""

    def __init__(
        self,
        llm: LLM,
        *,
        anthropic_stream: Awaitable[
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent]
        ],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)
        # The API call is created lazily; we await it on first use so that
        # constructing the stream object itself never blocks.
        self._awaitable_anthropic_stream = anthropic_stream
        self._anthropic_stream: (
            anthropic.AsyncStream[anthropic.types.RawMessageStreamEvent] | None
        ) = None

        # current function call that we're waiting for full completion
        # (args are streamed incrementally via input_json_delta events)
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None

        self._request_id: str = ""
        self._ignoring_cot = False  # ignore chain of thought
        self._input_tokens = 0
        self._output_tokens = 0

    async def _main_task(self) -> None:
        """Drain the Anthropic stream, forwarding parsed chunks and a final
        usage chunk; maps SDK exceptions onto LiveKit API error types."""
        try:
            if not self._anthropic_stream:
                self._anthropic_stream = await self._awaitable_anthropic_stream

            async with self._anthropic_stream as stream:
                async for event in stream:
                    chat_chunk = self._parse_event(event)
                    if chat_chunk is not None:
                        self._event_ch.send_nowait(chat_chunk)

                # Emit a trailing chunk carrying the accumulated token usage.
                self._event_ch.send_nowait(
                    llm.ChatChunk(
                        request_id=self._request_id,
                        usage=llm.CompletionUsage(
                            completion_tokens=self._output_tokens,
                            prompt_tokens=self._input_tokens,
                            total_tokens=self._input_tokens + self._output_tokens,
                        ),
                    )
                )
        except anthropic.APITimeoutError:
            raise APITimeoutError()
        except anthropic.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
            )
        except Exception as e:
            raise APIConnectionError() from e

    def _parse_event(
        self, event: anthropic.types.RawMessageStreamEvent
    ) -> llm.ChatChunk | None:
        """Translate one raw stream event into a ``ChatChunk``.

        Returns ``None`` for bookkeeping events (message start/delta, tool
        argument deltas, suppressed chain-of-thought text).
        """
        if event.type == "message_start":
            self._request_id = event.message.id
            self._input_tokens = event.message.usage.input_tokens
            self._output_tokens = event.message.usage.output_tokens
        elif event.type == "message_delta":
            self._output_tokens += event.usage.output_tokens
        elif event.type == "content_block_start":
            if event.content_block.type == "tool_use":
                self._tool_call_id = event.content_block.id
                self._fnc_name = event.content_block.name
                self._fnc_raw_arguments = ""
        elif event.type == "content_block_delta":
            delta = event.delta

            if delta.type == "text_delta":
                text = delta.text

                if self._fnc_ctx is not None:
                    # anthropic may inject chain-of-thought text when tools
                    # are attached; suppress it from the output stream
                    if text.startswith("<thinking>"):
                        self._ignoring_cot = True

                    # FIX: this must be `if`, not `elif` — when the opening
                    # and closing tags arrive in the same chunk, `elif`
                    # never fired and the stream stayed suppressed forever.
                    if self._ignoring_cot and "</thinking>" in text:
                        text = text.split("</thinking>")[-1]
                        self._ignoring_cot = False

                    if self._ignoring_cot:
                        return None

                return llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(content=text, role="assistant")
                        )
                    ],
                )
            elif delta.type == "input_json_delta":
                assert self._fnc_raw_arguments is not None
                self._fnc_raw_arguments += delta.partial_json

        elif event.type == "content_block_stop":
            if self._tool_call_id is not None and self._fnc_ctx:
                assert self._fnc_name is not None
                assert self._fnc_raw_arguments is not None

                # Tool arguments are now complete: surface the call and
                # reset the accumulation state for the next block.
                fnc_info = _create_ai_function_info(
                    self._fnc_ctx,
                    self._tool_call_id,
                    self._fnc_name,
                    self._fnc_raw_arguments,
                )
                self._function_calls_info.append(fnc_info)

                chat_chunk = llm.ChatChunk(
                    request_id=self._request_id,
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(
                                role="assistant", tool_calls=[fnc_info]
                            ),
                        )
                    ],
                )
                self._tool_call_id = self._fnc_raw_arguments = self._fnc_name = None
                return chat_chunk

        return None
Ancestors
- LLMStream
- abc.ABC
Inherited members