Module livekit.agents.llm.fallback_adapter
Classes
class AvailabilityChangedEvent (llm: LLM, available: bool)
@dataclass
class AvailabilityChangedEvent:
    llm: LLM
    available: bool
Class variables
var available : bool
var llm : LLM
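FallbackAdapter inherits from EventEmitter, so availability changes can be observed by registering a handler for the "llm_availability_changed" event. A minimal sketch, assuming primary_llm and backup_llm are existing LLM instances (the handler name is illustrative):

    from livekit.agents.llm.fallback_adapter import (
        AvailabilityChangedEvent,
        FallbackAdapter,
    )

    adapter = FallbackAdapter([primary_llm, backup_llm])

    def _on_availability_changed(ev: AvailabilityChangedEvent) -> None:
        # ev.llm is the instance whose status flipped; ev.available is its new state
        state = "recovered" if ev.available else "became unavailable"
        print(f"{ev.llm.label} {state}")

    adapter.on("llm_availability_changed", _on_availability_changed)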
class FallbackAdapter (llm: list[LLM],
*,
attempt_timeout: float = 10.0,
max_retry_per_llm: int = 1,
retry_interval: float = 5)
class FallbackAdapter(
    LLM[Literal["llm_availability_changed"]],
):
    def __init__(
        self,
        llm: list[LLM],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_llm: int = 1,
        retry_interval: float = 5,
    ) -> None:
        if len(llm) < 1:
            raise ValueError("at least one LLM instance must be provided.")

        super().__init__()

        self._llm_instances = llm
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_llm = max_retry_per_llm
        self._retry_interval = retry_interval

        self._status = [
            _LLMStatus(available=True, recovering_task=None)
            for _ in self._llm_instances
        ]

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
        fnc_ctx: FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = 1,
        parallel_tool_calls: bool | None = None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
    ) -> "LLMStream":
        return FallbackLLMStream(
            llm=self,
            conn_options=conn_options,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            temperature=temperature,
            n=n,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
        )
An LLM adapter that wraps a list of LLM instances and tries them in order: requests go to the first available instance, and when it fails the adapter falls back to the next one, emitting a "llm_availability_changed" event whenever an instance goes down or recovers.
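A minimal construction sketch; the OpenAI and Anthropic plugin LLMs are illustrative and any LLM implementations can be substituted:

    from livekit.agents.llm.fallback_adapter import FallbackAdapter
    from livekit.plugins import anthropic, openai  # illustrative providers

    fallback = FallbackAdapter(
        [openai.LLM(), anthropic.LLM()],  # instances are tried in list order
        attempt_timeout=10.0,   # per-attempt timeout, in seconds
        max_retry_per_llm=1,    # retries on one instance before falling back
        retry_interval=5,       # seconds between retries
    )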
Ancestors
- LLM
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def chat(self,
*,
chat_ctx: ChatContext,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=5.0, timeout=10.0),
fnc_ctx: FunctionContext | None = None,
temperature: float | None = None,
n: int | None = 1,
parallel_tool_calls: bool | None = None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None) -> LLMStream
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
    fnc_ctx: FunctionContext | None = None,
    temperature: float | None = None,
    n: int | None = 1,
    parallel_tool_calls: bool | None = None,
    tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
) -> "LLMStream":
    return FallbackLLMStream(
        llm=self,
        conn_options=conn_options,
        chat_ctx=chat_ctx,
        fnc_ctx=fnc_ctx,
        temperature=temperature,
        n=n,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )
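A hedged usage sketch of chat(), assuming the ChatContext.append() builder and the chunk.choices[0].delta chunk shape from the same API generation as this module:

    from livekit.agents.llm import ChatContext

    async def respond(fallback: FallbackAdapter) -> None:
        chat_ctx = ChatContext().append(role="user", text="Hello!")
        # chat() returns a FallbackLLMStream, iterable like any LLMStream
        async for chunk in fallback.chat(chat_ctx=chat_ctx):
            if chunk.choices and chunk.choices[0].delta.content:
                print(chunk.choices[0].delta.content, end="")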
Inherited members
class FallbackLLMStream (*,
llm: FallbackAdapter,
conn_options: APIConnectOptions,
chat_ctx: ChatContext,
fnc_ctx: FunctionContext | None,
temperature: float | None,
n: int | None,
parallel_tool_calls: bool | None,
tool_choice: "Union[ToolChoice, Literal['auto', 'required', 'none']] | None" = None)
class FallbackLLMStream(LLMStream):
    def __init__(
        self,
        *,
        llm: FallbackAdapter,
        conn_options: APIConnectOptions,
        chat_ctx: ChatContext,
        fnc_ctx: FunctionContext | None,
        temperature: float | None,
        n: int | None,
        parallel_tool_calls: bool | None,
        tool_choice: Union[ToolChoice, Literal["auto", "required", "none"]] | None = None,
    ) -> None:
        super().__init__(
            llm, chat_ctx=chat_ctx, fnc_ctx=fnc_ctx, conn_options=conn_options
        )
        self._fallback_adapter = llm
        self._temperature = temperature
        self._n = n
        self._parallel_tool_calls = parallel_tool_calls
        self._tool_choice = tool_choice

    async def _try_generate(
        self, *, llm: LLM, recovering: bool = False
    ) -> AsyncIterable[ChatChunk]:
        try:
            async with llm.chat(
                chat_ctx=self._chat_ctx,
                fnc_ctx=self._fnc_ctx,
                temperature=self._temperature,
                n=self._n,
                parallel_tool_calls=self._parallel_tool_calls,
                tool_choice=self._tool_choice,
                conn_options=dataclasses.replace(
                    self._conn_options,
                    max_retry=self._fallback_adapter._max_retry_per_llm,
                    timeout=self._fallback_adapter._attempt_timeout,
                    retry_interval=self._fallback_adapter._retry_interval,
                ),
            ) as stream:
                async for chunk in stream:
                    yield chunk
        except asyncio.TimeoutError:
            if recovering:
                logger.warning(f"{llm.label} recovery timed out")
                raise

            logger.warning(
                f"{llm.label} timed out, switching to next LLM",
            )
            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{llm.label} recovery failed",
                    exc_info=e,
                )
                raise

            logger.warning(
                f"{llm.label} failed, switching to next LLM",
                exc_info=e,
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{llm.label} recovery unexpected error",
                )
                raise

            logger.exception(
                f"{llm.label} unexpected error, switching to next LLM",
            )
            raise

    def _try_recovery(self, llm: LLM) -> None:
        llm_status = self._fallback_adapter._status[
            self._fallback_adapter._llm_instances.index(llm)
        ]
        if llm_status.recovering_task is None or llm_status.recovering_task.done():

            async def _recover_llm_task(llm: LLM) -> None:
                try:
                    async for _ in self._try_generate(llm=llm, recovering=True):
                        pass

                    llm_status.available = True
                    logger.info(f"llm.FallbackAdapter, {llm.label} recovered")
                    self._fallback_adapter.emit(
                        "llm_availability_changed",
                        AvailabilityChangedEvent(llm=llm, available=True),
                    )
                except Exception:
                    return

            llm_status.recovering_task = asyncio.create_task(_recover_llm_task(llm))

    async def _run(self) -> None:
        start_time = time.time()

        all_failed = all(
            not llm_status.available for llm_status in self._fallback_adapter._status
        )
        if all_failed:
            logger.error("all LLMs are unavailable, retrying..")

        for i, llm in enumerate(self._fallback_adapter._llm_instances):
            llm_status = self._fallback_adapter._status[i]
            if llm_status.available or all_failed:
                chunk_sent = False
                try:
                    async for chunk in self._try_generate(llm=llm, recovering=False):
                        chunk_sent = True
                        self._event_ch.send_nowait(chunk)

                    return
                except Exception:  # exceptions already logged inside _try_generate
                    if llm_status.available:
                        llm_status.available = False
                        self._fallback_adapter.emit(
                            "llm_availability_changed",
                            AvailabilityChangedEvent(llm=llm, available=False),
                        )

                    if chunk_sent:
                        raise

            self._try_recovery(llm)

        raise APIConnectionError(
            "all LLMs failed (%s) after %s seconds"
            % (
                [llm.label for llm in self._fallback_adapter._llm_instances],
                time.time() - start_time,
            )
        )
The LLMStream returned by FallbackAdapter.chat(). It streams chat chunks from the first available LLM, marks an instance unavailable on failure (scheduling a background recovery probe), then moves on to the next instance, and raises APIConnectionError once every LLM has failed.
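Because _run() raises APIConnectionError once every instance has failed, callers may want to guard stream consumption. A sketch (the recovery policy is application-specific):

    from livekit.agents import APIConnectionError

    async def consume(stream: FallbackLLMStream) -> None:
        try:
            async for chunk in stream:
                ...  # forward chunks to the application
        except APIConnectionError:
            # raised only after every LLM in the adapter has failed
            ...  # e.g. surface a canned reply to the user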
Ancestors
- LLMStream
- abc.ABC
Inherited members