Module livekit.agents.inference
Sub-modules
- livekit.agents.inference.interruption
- livekit.agents.inference.llm
- livekit.agents.inference.stt
- livekit.agents.inference.tts
Classes
class AdaptiveInterruptionDetector (*,
threshold: float = 0.5,
min_interruption_duration: float = 0.05,
max_audio_duration: float = 3,
audio_prefix_duration: float = 1.0,
detection_interval: float = 0.1,
inference_timeout: float = 0.7,
base_url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class AdaptiveInterruptionDetector( rtc.EventEmitter[ Literal[ "overlapping_speech", "error", "metrics_collected", ] ], ): def __init__( self, *, threshold: float = THRESHOLD, min_interruption_duration: float = MIN_INTERRUPTION_DURATION, max_audio_duration: float = MAX_AUDIO_DURATION, audio_prefix_duration: float = AUDIO_PREFIX_DURATION, detection_interval: float = DETECTION_INTERVAL, inference_timeout: float = REMOTE_INFERENCE_TIMEOUT, base_url: str | None = None, api_key: str | None = None, api_secret: str | None = None, http_session: aiohttp.ClientSession | None = None, ) -> None: """ Initialize a AdaptiveInterruptionDetector instance. Args: threshold (float, optional): The threshold for the interruption detection, defaults to 0.5. min_interruption_duration (float, optional): The minimum duration, in seconds, of the interruption event, defaults to 50ms. max_audio_duration (float, optional): The maximum audio duration, including the audio prefix, in seconds, for the interruption detection, defaults to 3s. audio_prefix_duration (float, optional): The audio prefix duration, in seconds, for the interruption detection, defaults to 0.5s. detection_interval (float, optional): The interval between detections, in seconds, for the interruption detection, defaults to 0.1s. inference_timeout (float, optional): The timeout for the interruption detection, defaults to 1 second. base_url (str, optional): The base URL for the interruption detection, defaults to the shared LIVEKIT_REMOTE_EOT_URL environment variable. api_key (str, optional): The API key for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_KEY environment variable. api_secret (str, optional): The API secret for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_SECRET environment variable. http_session (aiohttp.ClientSession, optional): The HTTP session to use for the interruption detection. 
""" super().__init__() if max_audio_duration > 3.0: raise ValueError("max_audio_duration must be less than or equal to 3.0 seconds") lk_base_url = ( base_url if base_url else os.getenv("LIVEKIT_REMOTE_EOT_URL", get_default_inference_url()) ) lk_api_key: str = api_key if api_key else "" lk_api_secret: str = api_secret if api_secret else "" # use LiveKit credentials if using the inference service (production or staging) is_inference_url = lk_base_url in (DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL) if is_inference_url: lk_api_key = ( api_key if api_key else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if api_secret else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) use_proxy = True else: use_proxy = False self._opts = InterruptionOptions( sample_rate=SAMPLE_RATE, threshold=threshold, min_frames=math.ceil(min_interruption_duration * _FRAMES_PER_SECOND), max_audio_duration=max_audio_duration, audio_prefix_duration=audio_prefix_duration, detection_interval=detection_interval, inference_timeout=inference_timeout, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, use_proxy=use_proxy, ) self._label = f"{type(self).__module__}.{type(self).__name__}" self._sample_rate = SAMPLE_RATE self._session = http_session self._streams = weakref.WeakSet[InterruptionHttpStream | InterruptionWebSocketStream]() logger.info( "adaptive interruption detector initialized", extra={ "base_url": self._opts.base_url, "detection_interval": self._opts.detection_interval, "audio_prefix_duration": self._opts.audio_prefix_duration, "max_audio_duration": self._opts.max_audio_duration, "min_frames": self._opts.min_frames, "threshold": 
self._opts.threshold, "inference_timeout": self._opts.inference_timeout, "use_proxy": self._opts.use_proxy, }, ) @property def model(self) -> str: return "adaptive interruption" @property def provider(self) -> str: return "livekit" @property def label(self) -> str: return self._label @property def sample_rate(self) -> int: return self._sample_rate def _emit_error(self, api_error: Exception, recoverable: bool) -> None: self.emit( "error", InterruptionDetectionError( label=self._label, error=api_error, recoverable=recoverable, ), ) def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = http_context.http_session() return self._session def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> InterruptionHttpStream | InterruptionWebSocketStream: try: stream: InterruptionHttpStream | InterruptionWebSocketStream if self._opts.use_proxy: stream = InterruptionWebSocketStream(model=self, conn_options=conn_options) else: stream = InterruptionHttpStream(model=self, conn_options=conn_options) except Exception as e: self._emit_error(e, recoverable=False) raise self._streams.add(stream) return stream def update_options( self, *, threshold: NotGivenOr[float] = NOT_GIVEN, min_interruption_duration: NotGivenOr[float] = NOT_GIVEN, ) -> None: if is_given(threshold): self._opts.threshold = threshold if is_given(min_interruption_duration): self._opts.min_frames = math.ceil(min_interruption_duration * _FRAMES_PER_SECOND) for stream in self._streams: stream.update_options( threshold=threshold, min_interruption_duration=min_interruption_duration )Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
    class Mapping[KT, VT]:
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
    def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

Initialize an AdaptiveInterruptionDetector instance.
Args
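threshold : float, optional — The threshold for the interruption detection. Defaults to 0.5.
min_interruption_duration : float, optional — The minimum duration, in seconds, of an interruption event. Defaults to 0.05 s (50 ms).
max_audio_duration : float, optional — The maximum audio duration, in seconds, including the audio prefix, used for the interruption detection. Must not exceed 3.0 s (a ValueError is raised otherwise). Defaults to 3 s.
audio_prefix_duration : float, optional — The audio prefix duration, in seconds, used for the interruption detection. Defaults to 1.0 s.
detection_interval : float, optional — The interval between detections, in seconds. Defaults to 0.1 s.
inference_timeout : float, optional — The timeout, in seconds, for the interruption detection. Defaults to 0.7 s.
base_url : str, optional — The base URL for the interruption detection. Defaults to the LIVEKIT_REMOTE_EOT_URL environment variable, or to the default LiveKit inference URL when that variable is unset.
api_key : str, optional — The API key for the interruption detection. When the base URL is a LiveKit inference URL (production or staging), defaults to the LIVEKIT_INFERENCE_API_KEY environment variable, falling back to LIVEKIT_API_KEY; a ValueError is raised if neither yields a key. For any other base URL, only an explicitly passed key is used.
api_secret : str, optional — The API secret for the interruption detection. When the base URL is a LiveKit inference URL (production or staging), defaults to the LIVEKIT_INFERENCE_API_SECRET environment variable, falling back to LIVEKIT_API_SECRET; a ValueError is raised if neither yields a secret. For any other base URL, only an explicitly passed secret is used.
http_session : aiohttp.ClientSession, optional — The HTTP session to use for the interruption detection. If omitted, the session provided by http_context is used when one is first needed.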
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str-
Expand source code
@property def label(self) -> str: return self._label prop model : str-
Expand source code
@property def model(self) -> str: return "adaptive interruption" prop provider : str-
Expand source code
@property def provider(self) -> str: return "livekit" prop sample_rate : int-
Expand source code
@property def sample_rate(self) -> int: return self._sample_rate
Methods
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> InterruptionHttpStream | InterruptionWebSocketStream-
Expand source code
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> InterruptionHttpStream | InterruptionWebSocketStream: try: stream: InterruptionHttpStream | InterruptionWebSocketStream if self._opts.use_proxy: stream = InterruptionWebSocketStream(model=self, conn_options=conn_options) else: stream = InterruptionHttpStream(model=self, conn_options=conn_options) except Exception as e: self._emit_error(e, recoverable=False) raise self._streams.add(stream) return stream def update_options(self,
*,
threshold: NotGivenOr[float] = NOT_GIVEN,
min_interruption_duration: NotGivenOr[float] = NOT_GIVEN) ‑> None-
Expand source code
def update_options( self, *, threshold: NotGivenOr[float] = NOT_GIVEN, min_interruption_duration: NotGivenOr[float] = NOT_GIVEN, ) -> None: if is_given(threshold): self._opts.threshold = threshold if is_given(min_interruption_duration): self._opts.min_frames = math.ceil(min_interruption_duration * _FRAMES_PER_SECOND) for stream in self._streams: stream.update_options( threshold=threshold, min_interruption_duration=min_interruption_duration )
Inherited members
class InterruptionDetectionError (**data: Any)-
Expand source code
class InterruptionDetectionError(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) type: Literal["interruption_detection_error"] = "interruption_detection_error" timestamp: float = Field(default_factory=time.time) label: str error: Exception = Field(..., exclude=True) recoverable: boolUsage Documentation
A base class for creating Pydantic models.
Attributes
__class_vars__ — The names of the class variables defined on the model.
__private_attributes__ — Metadata about the private attributes of the model.
__signature__ — The synthesized __init__ [Signature][inspect.Signature] of the model.
__pydantic_complete__ — Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__ — The core schema of the model.
__pydantic_custom_init__ — Whether the model has a custom __init__ function.
__pydantic_decorators__ — Metadata containing the decorators defined on the model. This replaces Model.__validators__ and Model.__root_validators__ from Pydantic V1.
__pydantic_generic_metadata__ — A dictionary containing metadata about generic Pydantic models. The origin and args items map to the [__origin__][genericalias.__origin__] and [__args__][genericalias.__args__] attributes of [generic aliases][types-genericalias], and the parameter item maps to the __parameter__ attribute of generic classes.
__pydantic_parent_namespace__ — Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__ — The name of the post-init method for the model, if defined.
__pydantic_root_model__ — Whether the model is a [RootModel][pydantic.root_model.RootModel].
__pydantic_serializer__ — The pydantic-core SchemaSerializer used to dump instances of the model.
__pydantic_validator__ — The pydantic-core SchemaValidator used to validate instances of the model.
__pydantic_fields__ — A dictionary of field names and their corresponding [FieldInfo][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__ — A dictionary of computed field names and their corresponding [ComputedFieldInfo][pydantic.fields.ComputedFieldInfo] objects.
__pydantic_extra__ — A dictionary containing extra values, if [extra][pydantic.config.ConfigDict.extra] is set to 'allow'.
__pydantic_fields_set__ — The names of fields explicitly set during instantiation.
__pydantic_private__ — Values of private attributes set on the model instance.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var error : Exception
var label : str
var model_config
var recoverable : bool
var timestamp : float
var type : Literal['interruption_detection_error']
class LLM (model: LLMModels | str,
*,
provider: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
inference_class: InferenceClass | None = None,
extra_kwargs: ChatCompletionOptions | dict[str, Any] | None = None)-
Expand source code
class LLM(llm.LLM): def __init__( self, model: LLMModels | str, *, provider: str | None = None, base_url: str | None = None, api_key: str | None = None, api_secret: str | None = None, inference_class: InferenceClass | None = None, extra_kwargs: ChatCompletionOptions | dict[str, Any] | None = None, ) -> None: super().__init__() lk_base_url = base_url if base_url else get_default_inference_url() lk_api_key = ( api_key if api_key else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if api_secret else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) self._opts = _LLMOptions( model=model, provider=provider, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, inference_class=inference_class, extra_kwargs=extra_kwargs or {}, ) self._client = openai.AsyncClient( api_key=create_access_token(self._opts.api_key, self._opts.api_secret), base_url=self._opts.base_url, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120 ), ), ) async def aclose(self) -> None: await self._client.close() @classmethod def from_model_string(cls, model: str) -> LLM: """Create a LLM instance from a model string""" return cls(model) def update_options( self, *, model: NotGivenOr[LLMModels | str] = NOT_GIVEN, extra_kwargs: NotGivenOr[ChatCompletionOptions | dict[str, Any]] = NOT_GIVEN, ) -> None: """Update LLM configuration options. Each option is read on the next ``chat()`` call, so a swap takes effect on the agent's next turn without recreating the LLM. 
``extra_kwargs`` *replaces* the persistent kwargs dict rather than merging — pass ``{}`` to clear it. """ if is_given(model): self._opts.model = model if is_given(extra_kwargs): self._opts.extra_kwargs = dict(extra_kwargs) @property def model(self) -> str: """Get the model name for this LLM instance.""" return self._opts.model @property def provider(self) -> str: return "livekit" def chat( self, *, chat_ctx: ChatContext, tools: list[Tool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, response_format: NotGivenOr[ completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT] ] = NOT_GIVEN, inference_class: NotGivenOr[InferenceClass] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> LLMStream: extra = {} if is_given(extra_kwargs): extra.update(extra_kwargs) parallel_tool_calls = ( parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.extra_kwargs.get("parallel_tool_calls", NOT_GIVEN) ) if is_given(parallel_tool_calls): extra["parallel_tool_calls"] = parallel_tool_calls extra_tool_choice = self._opts.extra_kwargs.get("tool_choice", NOT_GIVEN) tool_choice = tool_choice if is_given(tool_choice) else extra_tool_choice if is_given(tool_choice): oai_tool_choice: ChatCompletionToolChoiceOptionParam if isinstance(tool_choice, dict): oai_tool_choice = { "type": "function", "function": {"name": tool_choice["function"]["name"]}, } extra["tool_choice"] = oai_tool_choice elif tool_choice in ("auto", "required", "none"): oai_tool_choice = tool_choice extra["tool_choice"] = oai_tool_choice if is_given(response_format): extra["response_format"] = llm_utils.to_openai_response_format(response_format) # type: ignore extra.update(self._opts.extra_kwargs) effective_inference_class = ( inference_class if is_given(inference_class) else self._opts.inference_class ) self._client.api_key = 
create_access_token(self._opts.api_key, self._opts.api_secret) return LLMStream( self, model=self._opts.model, provider=self._opts.provider, inference_class=effective_inference_class, strict_tool_schema=True, client=self._client, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, extra_kwargs=extra, )Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> LLM-
Create a LLM instance from a model string
Instance variables
prop model : str-
Expand source code
@property def model(self) -> str: """Get the model name for this LLM instance.""" return self._opts.modelGet the model name for this LLM instance.
prop provider : str-
Expand source code
@property def provider(self) -> str: return "livekit"Get the provider name/identifier for this LLM instance.
Returns
The provider name if available, "unknown" otherwise. (This implementation always returns "livekit".)
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: await self._client.close() def chat(self,
*,
chat_ctx: ChatContext,
tools: list[Tool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
inference_class: NotGivenOr[InferenceClass] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> LLMStream-
Expand source code
def chat( self, *, chat_ctx: ChatContext, tools: list[Tool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, response_format: NotGivenOr[ completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT] ] = NOT_GIVEN, inference_class: NotGivenOr[InferenceClass] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> LLMStream: extra = {} if is_given(extra_kwargs): extra.update(extra_kwargs) parallel_tool_calls = ( parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.extra_kwargs.get("parallel_tool_calls", NOT_GIVEN) ) if is_given(parallel_tool_calls): extra["parallel_tool_calls"] = parallel_tool_calls extra_tool_choice = self._opts.extra_kwargs.get("tool_choice", NOT_GIVEN) tool_choice = tool_choice if is_given(tool_choice) else extra_tool_choice if is_given(tool_choice): oai_tool_choice: ChatCompletionToolChoiceOptionParam if isinstance(tool_choice, dict): oai_tool_choice = { "type": "function", "function": {"name": tool_choice["function"]["name"]}, } extra["tool_choice"] = oai_tool_choice elif tool_choice in ("auto", "required", "none"): oai_tool_choice = tool_choice extra["tool_choice"] = oai_tool_choice if is_given(response_format): extra["response_format"] = llm_utils.to_openai_response_format(response_format) # type: ignore extra.update(self._opts.extra_kwargs) effective_inference_class = ( inference_class if is_given(inference_class) else self._opts.inference_class ) self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret) return LLMStream( self, model=self._opts.model, provider=self._opts.provider, inference_class=effective_inference_class, strict_tool_schema=True, client=self._client, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, extra_kwargs=extra, ) def update_options(self,
*,
model: NotGivenOr[LLMModels | str] = NOT_GIVEN,
extra_kwargs: NotGivenOr[ChatCompletionOptions | dict[str, Any]] = NOT_GIVEN) ‑> None-
Expand source code
def update_options( self, *, model: NotGivenOr[LLMModels | str] = NOT_GIVEN, extra_kwargs: NotGivenOr[ChatCompletionOptions | dict[str, Any]] = NOT_GIVEN, ) -> None: """Update LLM configuration options. Each option is read on the next ``chat()`` call, so a swap takes effect on the agent's next turn without recreating the LLM. ``extra_kwargs`` *replaces* the persistent kwargs dict rather than merging — pass ``{}`` to clear it. """ if is_given(model): self._opts.model = model if is_given(extra_kwargs): self._opts.extra_kwargs = dict(extra_kwargs)Update LLM configuration options.
Each option is read on the next chat() call, so a swap takes effect on the agent's next turn without recreating the LLM.
extra_kwargs replaces the persistent kwargs dict rather than merging — pass {} to clear it.
Inherited members
class LLMStream (llm_v: LLM | llm.LLM,
*,
model: LLMModels | str,
provider: str | None = None,
inference_class: InferenceClass | None = None,
strict_tool_schema: bool,
client: openai.AsyncClient,
chat_ctx: llm.ChatContext,
tools: list[Tool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any],
provider_fmt: str = 'openai')-
Expand source code
class LLMStream(llm.LLMStream): def __init__( self, llm_v: LLM | llm.LLM, *, model: LLMModels | str, provider: str | None = None, inference_class: InferenceClass | None = None, strict_tool_schema: bool, client: openai.AsyncClient, chat_ctx: llm.ChatContext, tools: list[Tool], conn_options: APIConnectOptions, extra_kwargs: dict[str, Any], provider_fmt: str = "openai", # used internally for chat_ctx format ) -> None: super().__init__(llm_v, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options) self._model = model self._provider = provider self._inference_class = inference_class self._provider_fmt = provider_fmt self._strict_tool_schema = strict_tool_schema self._client = client self._llm = llm_v self._extra_kwargs = drop_unsupported_params(model, extra_kwargs, tools=tools) self._tool_ctx = llm.ToolContext(tools) async def _run(self) -> None: # current function call that we're waiting for full completion (args are streamed) # (defined inside the _run method to make sure the state is reset for each run/attempt) self._oai_stream: openai.AsyncStream[ChatCompletionChunk] | None = None self._tool_call_id: str | None = None self._fnc_name: str | None = None self._fnc_raw_arguments: str | None = None self._tool_extra: dict[str, Any] | None = None self._tool_index: int | None = None retryable = True try: chat_ctx, _ = self._chat_ctx.to_provider_format(format=self._provider_fmt) tool_schemas = cast( list[ChatCompletionToolParam], self._tool_ctx.parse_function_tools("openai", strict=self._strict_tool_schema), ) if lk_oai_debug: tool_choice = self._extra_kwargs.get("tool_choice", NOT_GIVEN) logger.debug( "chat.completions.create", extra={ "fnc_ctx": tool_schemas, "tool_choice": tool_choice, "chat_ctx": chat_ctx, }, ) if not self._tools: # remove tool_choice from extra_kwargs if no tools are provided self._extra_kwargs.pop("tool_choice", None) extra_headers = self._extra_kwargs.setdefault("extra_headers", {}) extra_headers.update(get_inference_headers()) if self._provider: 
extra_headers[HEADER_INFERENCE_PROVIDER] = self._provider if self._inference_class: extra_headers[HEADER_INFERENCE_PRIORITY] = self._inference_class self._oai_stream = stream = await self._client.chat.completions.create( messages=cast(list[ChatCompletionMessageParam], chat_ctx), tools=tool_schemas or openai.omit, model=self._model, stream_options={"include_usage": True}, stream=True, timeout=httpx.Timeout(self._conn_options.timeout), **self._extra_kwargs, ) thinking = asyncio.Event() async with stream: async for chunk in stream: for choice in chunk.choices: chat_chunk = self._parse_choice(chunk.id, choice, thinking) if chat_chunk is not None: retryable = False self._event_ch.send_nowait(chat_chunk) if chunk.usage is not None: retryable = False tokens_details = chunk.usage.prompt_tokens_details cached_tokens = tokens_details.cached_tokens if tokens_details else 0 usage_chunk = llm.ChatChunk( id=chunk.id, usage=llm.CompletionUsage( completion_tokens=chunk.usage.completion_tokens, prompt_tokens=chunk.usage.prompt_tokens, prompt_cached_tokens=cached_tokens or 0, total_tokens=chunk.usage.total_tokens, service_tier=getattr(chunk, "service_tier", None), ), ) self._event_ch.send_nowait(usage_chunk) except openai.APITimeoutError: raise APITimeoutError(retryable=retryable) from None except openai.APIStatusError as e: raise APIStatusError( e.message, status_code=e.status_code, request_id=e.request_id, body=e.body, retryable=retryable, ) from None except Exception as e: raise APIConnectionError(retryable=retryable) from e def _parse_choice( self, id: str, choice: Choice, thinking: asyncio.Event ) -> llm.ChatChunk | None: delta = choice.delta # https://github.com/livekit/agents/issues/688 # the delta can be None when using Azure OpenAI (content filtering) if delta is None: return None if delta.tool_calls: for tool in delta.tool_calls: if not tool.function: continue call_chunk = None if self._tool_call_id and tool.id and tool.index != self._tool_index: call_chunk = 
llm.ChatChunk( id=id, delta=llm.ChoiceDelta( role="assistant", content=delta.content, tool_calls=[ llm.FunctionToolCall( arguments=self._fnc_raw_arguments or "", name=self._fnc_name or "", call_id=self._tool_call_id or "", extra=self._tool_extra, ) ], ), ) self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None self._tool_extra = None if tool.function.name: self._tool_index = tool.index self._tool_call_id = tool.id self._fnc_name = tool.function.name self._fnc_raw_arguments = tool.function.arguments or "" # Extract extra from tool call (e.g., Google thought signatures) self._tool_extra = getattr(tool, "extra_content", None) elif tool.function.arguments: self._fnc_raw_arguments += tool.function.arguments # type: ignore if call_chunk is not None: return call_chunk if choice.finish_reason in ("tool_calls", "stop") and self._tool_call_id: finish_extra = getattr(delta, "extra_content", None) call_chunk = llm.ChatChunk( id=id, delta=llm.ChoiceDelta( role="assistant", content=delta.content, extra=finish_extra, tool_calls=[ llm.FunctionToolCall( arguments=self._fnc_raw_arguments or "", name=self._fnc_name or "", call_id=self._tool_call_id or "", extra=self._tool_extra, ) ], ), ) self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None self._tool_extra = None return call_chunk delta.content = llm_utils.strip_thinking_tokens(delta.content, thinking) # Extract extra from delta (e.g., Google thought signatures on text parts) delta_extra = getattr(delta, "extra_content", None) if not delta.content and not delta_extra: return None return llm.ChatChunk( id=id, delta=llm.ChoiceDelta( content=delta.content, role="assistant", extra=delta_extra, ), )Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.llm.llm.LLMStream
- abc.ABC
Subclasses
- livekit.plugins.openai.llm.LLMStream
class OverlappingSpeechEvent (**data: Any)-
Expand source code
class OverlappingSpeechEvent(BaseModel): """Represents an overlapping speech event detected during agent speech.""" model_config = ConfigDict(arbitrary_types_allowed=True) type: Literal["overlapping_speech"] = "overlapping_speech" created_at: float = Field(default_factory=time.time) """Timestamp (in seconds) when the event was emitted.""" detected_at: float = Field(default_factory=time.time) """Timestamp (in seconds) when the overlap was detected.""" is_interruption: bool = False """Whether interruption is detected.""" total_duration: float = 0.0 """RTT (Round Trip Time) time taken to perform the inference, in seconds.""" prediction_duration: float = 0.0 """Time taken to perform the inference from the model side, in seconds.""" detection_delay: float = 0.0 """Total time from the onset of the speech to the final prediction, in seconds.""" overlap_started_at: float | None = None """Timestamp (in seconds) when the overlap speech started. Useful for emitting held transcripts.""" speech_input: npt.NDArray[np.int16] | None = None """The audio input that was used for the inference.""" probabilities: npt.NDArray[np.float32] | None = None """The raw probabilities for the interruption detection.""" probability: float = 0.0 """The conservative estimated probability of the interruption event.""" num_requests: int = 0 """Number of requests sent for this event.""" @model_serializer(mode="wrap") def serialize_model(self, handler: SerializerFunctionWrapHandler) -> Any: # remove numpy arrays from the model dump copy = self.model_copy(deep=True) data = copy.speech_input, copy.probabilities copy.speech_input, copy.probabilities = None, None try: serialized = handler(copy) finally: copy.speech_input, copy.probabilities = data return serialized @classmethod def from_cache_entry( cls, *, entry: InterruptionCacheEntry, is_interruption: bool, started_at: float | None = None, ended_at: float | None = None, ) -> OverlappingSpeechEvent: """Initialize the event from a cache entry. 
Args: entry: The cache entry to initialize the event from. is_interruption: Whether the interruption is detected. started_at: The timestamp when the overlap speech started. ended_at: The timestamp when the overlap speech ended. Returns: The initialized event. """ return cls( type="overlapping_speech", detected_at=ended_at or time.time(), is_interruption=is_interruption, overlap_started_at=started_at, speech_input=entry.speech_input, probabilities=entry.probabilities, total_duration=entry.get_total_duration(), detection_delay=entry.get_detection_delay(), prediction_duration=entry.get_prediction_duration(), probability=entry.get_probability(), )Represents an overlapping speech event detected during agent speech.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var created_at : float-
Timestamp (in seconds) when the event was emitted.
var detected_at : float-
Timestamp (in seconds) when the overlap was detected.
var detection_delay : float-
Total time from the onset of the speech to the final prediction, in seconds.
var is_interruption : bool-
Whether interruption is detected.
var model_config
var num_requests : int-
Number of requests sent for this event.
var overlap_started_at : float | None-
Timestamp (in seconds) when the overlapping speech started. Useful for emitting held transcripts.
var prediction_duration : float-
Time taken to perform the inference from the model side, in seconds.
var probabilities : numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.float32]] | None-
The raw probabilities for the interruption detection.
var probability : float-
The conservative estimated probability of the interruption event.
var speech_input : numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.int16]] | None-
The audio input that was used for the inference.
var total_duration : float-
Round-trip time (RTT) taken to perform the inference, in seconds.
var type : Literal['overlapping_speech']
Static methods
def from_cache_entry(*,
entry: InterruptionCacheEntry,
is_interruption: bool,
started_at: float | None = None,
ended_at: float | None = None) ‑> OverlappingSpeechEvent-
Initialize the event from a cache entry.
Args
entry- The cache entry to initialize the event from.
is_interruption- Whether the interruption is detected.
started_at- The timestamp when the overlap speech started.
ended_at- The timestamp when the overlap speech ended.
Returns
The initialized event.
Methods
def serialize_model(self, handler: SerializerFunctionWrapHandler) ‑> Any-
Expand source code
@model_serializer(mode="wrap")
def serialize_model(self, handler: SerializerFunctionWrapHandler) -> Any:
    """Serialize the event without its numpy payloads.

    ``speech_input`` and ``probabilities`` hold numpy arrays, so they are set
    to ``None`` on a shallow copy before delegating to the wrapped pydantic
    serializer. The instance being serialized is never modified.

    Args:
        handler: The pydantic wrap handler that performs the default
            serialization.

    Returns:
        Whatever ``handler`` produces for the stripped copy.
    """
    # Every other field is a scalar (floats, bool, int, Literal), so a shallow
    # copy suffices. Using ``update`` avoids deep-copying the audio buffer only
    # to discard it, and no restore step is needed because the copy is thrown
    # away after serialization.
    stripped = self.model_copy(update={"speech_input": None, "probabilities": None})
    return handler(stripped)
class STT (model: NotGivenOr[STTModels | str] = NOT_GIVEN,
*,
language: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | DeepgramFluxOptions | AssemblyaiOptions | ElevenlabsOptions | XaiOptions | SpeechmaticsOptions] = NOT_GIVEN,
fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN)-
Expand source code
class STT(stt.STT):
    """LiveKit Cloud Inference speech-to-text.

    Streaming only: ``stream()`` opens a transcription session, while batch
    recognition (``_recognize_impl``) raises ``NotImplementedError``.
    """

    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramFluxModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramFluxOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: AssemblyAIModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[AssemblyaiOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: XaiModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[XaiOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: SpeechmaticsModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[SpeechmaticsOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: str,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None: ...

    def __init__(
        self,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any]
            | CartesiaOptions
            | DeepgramOptions
            | DeepgramFluxOptions
            | AssemblyaiOptions
            | ElevenlabsOptions
            | XaiOptions
            | SpeechmaticsOptions
        ] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
        vad: NotGivenOr[vad.VAD | None] = NOT_GIVEN,
    ) -> None:
        """LiveKit Cloud Inference STT.

        Args:
            model (STTModels | str, optional): STT model to use, in
                "provider/model[:language]" format. A ":language" suffix is only
                applied when ``language`` is not given explicitly.
            language (str, optional): Language of the STT model.
            encoding (STTEncoding, optional): Encoding of the STT model; defaults to
                DEFAULT_ENCODING.
            sample_rate (int, optional): Sample rate of the STT model; defaults to
                DEFAULT_SAMPLE_RATE.
            base_url (str, optional): Inference gateway URL; defaults to
                ``get_default_inference_url()``.
            api_key (str, optional): API key; if not provided, read from the
                LIVEKIT_INFERENCE_API_KEY environment variable, falling back to
                LIVEKIT_API_KEY.
            api_secret (str, optional): API secret; if not provided, read from the
                LIVEKIT_INFERENCE_API_SECRET environment variable, falling back to
                LIVEKIT_API_SECRET.
            http_session (aiohttp.ClientSession, optional): HTTP session to use. When
                omitted, ``utils.http_context.http_session()`` is used lazily.
            extra_kwargs (dict, optional): Extra kwargs to pass to the STT model.
                Provider-specific keys such as "diarize" or "speaker_labels" also
                turn on the diarization capability.
            fallback (FallbackModelType | list[FallbackModelType], optional):
                Fallback models, given as a single entry or a list; each entry is a
                model name or a FallbackModel instance.
            conn_options (APIConnectOptions, optional): Connection options for
                request attempts; defaults to DEFAULT_API_CONNECT_OPTIONS.
            vad (VAD, optional): External Voice Activity Detector. When provided,
                each audio frame is forwarded to the VAD and `session.finalize` is
                sent to the inference gateway on end of speech. Only applicable to
                Speechmatics models.

        Raises:
            ValueError: If no API key or API secret can be resolved.
        """
        # Infer diarization capability from provider-specific extra_kwargs
        # keys (see _DIARIZATION_EXTRA_KEYS). xAI uses "diarize" (same as
        # Deepgram); AssemblyAI uses "speaker_labels".
        diarization_enabled = _diarization_enabled(
            dict(extra_kwargs) if is_given(extra_kwargs) else None
        )

        # Parse language from model string if provided: "provider/model:language"
        if is_given(model) and isinstance(model, str):
            parsed_model, parsed_language = _parse_model_string(model)
            model = parsed_model
            if is_given(parsed_language) and not is_given(language):
                language = parsed_language

        vad = _resolve_vad_for_model(model, vad if is_given(vad) else None)

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
                diarization=diarization_enabled,
                aligned_transcript="word",
                offline_recognize=False,
            ),
        )

        lk_base_url = base_url if is_given(base_url) else get_default_inference_url()

        # Inference-specific credentials take precedence over the generic LiveKit ones.
        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        fallback_models: NotGivenOr[list[FallbackModel]] = NOT_GIVEN
        if is_given(fallback):
            fallback_models = _normalize_fallback(fallback)

        self._opts = STTOptions(
            model=model,
            language=LanguageCode(language) if isinstance(language, str) else language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
            fallback=fallback_models,
            conn_options=conn_options if is_given(conn_options) else DEFAULT_API_CONNECT_OPTIONS,
        )
        self._session = http_session
        self._vad = vad
        # Live streams are tracked weakly so update_options can reach them.
        self._streams = weakref.WeakSet[SpeechStream]()

    @classmethod
    def from_model_string(cls, model: str) -> STT:
        """Create a STT instance from a model string

        Args:
            model (str): STT model to use, in "provider/model[:language]" format

        Returns:
            STT: STT instance
        """
        model_name, language = _parse_model_string(model)
        return cls(model=model_name, language=language)

    @property
    def model(self) -> str:
        return self._opts.model if is_given(self._opts.model) else "unknown"

    @property
    def provider(self) -> str:
        return "livekit"

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared HTTP session when none was injected.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError(
            "LiveKit Inference STT does not support batch recognition, use stream() instead"
        )

    def stream(
        self,
        *,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a streaming transcription session."""
        options = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            opts=options,
            conn_options=conn_options,
            vad_instance=self._vad,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        """Update STT configuration options."""
        if is_given(model):
            # Mirror __init__: strip ":language" suffix and apply if not overridden.
            if isinstance(model, str):
                parsed_model, parsed_language = _parse_model_string(model)
                model = parsed_model
                if is_given(parsed_language) and not is_given(language):
                    language = parsed_language
            self._opts.model = model
            self._vad = _resolve_vad_for_model(model, self._vad)
        if is_given(language):
            self._opts.language = LanguageCode(language)
        if is_given(extra):
            self._opts.extra_kwargs.update(extra)
            # Diarization support depends on extra_kwargs, so recompute it.
            self._capabilities = replace(
                self._capabilities,
                diarization=_diarization_enabled(self._opts.extra_kwargs),
            )

        for stream in self._streams:
            stream.update_options(model=model, language=language, extra=extra)

    def _sanitize_options(
        self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN
    ) -> STTOptions:
        """Create a sanitized copy of options with language override if provided."""
        options = replace(self._opts)
        # replace() is shallow; copy extra_kwargs so streams don't share the dict.
        options.extra_kwargs = dict(options.extra_kwargs)
        if is_given(language):
            options.language = LanguageCode(language)
        return options
LiveKit Cloud Inference STT
Args
model:STTModels | str, optional- STT model to use, in "provider/model[:language]" format.
language:str, optional- Language of the STT model.
encoding:STTEncoding, optional- Encoding of the STT model.
sample_rate:int, optional- Sample rate of the STT model.
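base_url:str, optional- Inference gateway URL; if not provided, defaults to get_default_inference_url().
api_key:str, optional- API key; if not provided, read from the LIVEKIT_INFERENCE_API_KEY environment variable, falling back to LIVEKIT_API_KEY.
api_secret:str, optional- API secret; if not provided, read from the LIVEKIT_INFERENCE_API_SECRET environment variable, falling back to LIVEKIT_API_SECRET.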
http_session:aiohttp.ClientSession, optional- HTTP session to use.
extra_kwargs:dict, optional- Extra kwargs to pass to the STT model.
fallback:FallbackModelType | list[FallbackModelType], optional- Fallback models, given as a single entry or a list; each entry is a model name or a FallbackModel instance.
conn_options:APIConnectOptions, optional- Connection options for request attempts.
vad:VAD, optional- External Voice Activity Detector. When provided, each audio
frame is forwarded to the VAD and
`session.finalize` is sent to the inference gateway on end of speech. Only applicable to Speechmatics models.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> STT-
Create a STT instance from a model string
Args
model:str- STT model to use, in "provider/model[:language]" format
Returns
STT- STT instance
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this STT instance."""
    configured_model = self._opts.model
    if not is_given(configured_model):
        return "unknown"
    return configured_model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this STT instance."""
    provider_name = "livekit"
    return provider_name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
def stream(self,
*,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Create a streaming transcription session."""
    new_stream = SpeechStream(
        stt=self,
        # Each stream gets its own copy of the options (with an optional language override).
        opts=self._sanitize_options(language=language),
        conn_options=conn_options,
        vad_instance=self._vad,
    )
    self._streams.add(new_stream)
    return new_stream


def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> None:
    """Update STT configuration options.

    The new values are stored on this instance and forwarded to every tracked stream.
    """
    if is_given(model):
        if isinstance(model, str):
            # Same "provider/model:language" handling as __init__; an explicit
            # ``language`` argument wins over the suffix.
            model, suffix_language = _parse_model_string(model)
            if not is_given(language) and is_given(suffix_language):
                language = suffix_language
        self._opts.model = model
        self._vad = _resolve_vad_for_model(model, self._vad)

    if is_given(language):
        self._opts.language = LanguageCode(language)

    if is_given(extra):
        merged_extra = self._opts.extra_kwargs
        merged_extra.update(extra)
        # Diarization support is derived from extra_kwargs, so refresh it.
        self._capabilities = replace(
            self._capabilities, diarization=_diarization_enabled(merged_extra)
        )

    for live_stream in self._streams:
        live_stream.update_options(model=model, language=language, extra=extra)
Inherited members
class TTS (model: TTSModels | str,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | ElevenlabsOptions | RimeOptions | InworldOptions] = NOT_GIVEN,
fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN)-
Expand source code
class TTS(tts.TTS):
    """LiveKit Cloud Inference text-to-speech over pooled WebSocket connections."""

    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: DeepgramModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: RimeModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[RimeOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: InworldModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[InworldOptions] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: str,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: TTSModels | str,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any]
            | CartesiaOptions
            | DeepgramOptions
            | ElevenlabsOptions
            | RimeOptions
            | InworldOptions
        ] = NOT_GIVEN,
        fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
        conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN,
    ) -> None:
        """LiveKit Cloud Inference TTS.

        Args:
            model (TTSModels | str): TTS model to use, in "provider/model[:voice]"
                format. A ":voice" suffix is only applied when ``voice`` is not given.
            voice (str, optional): Voice to use; a default voice is used if not provided.
            language (str, optional): Language of the TTS model.
            encoding (TTSEncoding, optional): Encoding of the TTS model; defaults to
                DEFAULT_ENCODING.
            sample_rate (int, optional): Sample rate of the TTS model; defaults to
                DEFAULT_SAMPLE_RATE.
            base_url (str, optional): Inference gateway URL; defaults to
                ``get_default_inference_url()``.
            api_key (str, optional): API key; if not provided, read from the
                LIVEKIT_INFERENCE_API_KEY environment variable, falling back to
                LIVEKIT_API_KEY.
            api_secret (str, optional): API secret; if not provided, read from the
                LIVEKIT_INFERENCE_API_SECRET environment variable, falling back to
                LIVEKIT_API_SECRET.
            http_session (aiohttp.ClientSession, optional): HTTP session to use. When
                omitted, ``utils.http_context.http_session()`` is used lazily.
            extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model.
            fallback (FallbackModelType | list[FallbackModelType], optional):
                Fallback models, given as a single entry or a list; each entry is a
                model name or a FallbackModel instance.
            conn_options (APIConnectOptions, optional): Connection options for
                request attempts; defaults to DEFAULT_API_CONNECT_OPTIONS.

        Raises:
            ValueError: If no API key or API secret can be resolved.
        """
        sample_rate = sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE

        # Parse voice from model string if provided: "provider/model:voice"
        if isinstance(model, str):
            parsed_model, parsed_voice = _parse_model_string(model)
            model = parsed_model
            if parsed_voice is not None and not is_given(voice):
                voice = parsed_voice

        resolved_extra_kwargs = dict(extra_kwargs) if is_given(extra_kwargs) else {}

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=True,
                aligned_transcript=_has_aligned_transcript(model, resolved_extra_kwargs),
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        lk_base_url = base_url if is_given(base_url) else get_default_inference_url()

        # Inference-specific credentials take precedence over the generic LiveKit ones.
        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        fallback_models: NotGivenOr[list[FallbackModel]] = NOT_GIVEN
        if is_given(fallback):
            fallback_models = _normalize_fallback(fallback)

        self._opts = _TTSOptions(
            model=model,
            voice=voice,
            language=LanguageCode(language) if isinstance(language, str) else language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            extra_kwargs=resolved_extra_kwargs,
            fallback=fallback_models,
            conn_options=conn_options if is_given(conn_options) else DEFAULT_API_CONNECT_OPTIONS,
        )
        self._session = http_session
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @classmethod
    def from_model_string(cls, model: str) -> TTS:
        """Create a TTS instance from a model string

        Args:
            model (str): TTS model to use, in "provider/model[:voice_id]" format

        Returns:
            TTS: TTS instance
        """
        model, voice = _parse_model_string(model)
        return cls(model=model, voice=voice if voice else NOT_GIVEN)

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "livekit"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a gateway WebSocket and send the ``session.create`` message.

        The session parameters are taken from ``self._opts`` at connect time.
        """
        session = self._ensure_session()
        base_url = self._opts.base_url
        # http:// -> ws://, https:// -> wss://
        if base_url.startswith(("http://", "https://")):
            base_url = base_url.replace("http", "ws", 1)

        headers = {
            **get_inference_headers(),
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}",
        }

        ws = None
        try:
            ws = await asyncio.wait_for(
                session.ws_connect(f"{base_url}/tts?model={self._opts.model}", headers=headers),
                timeout,
            )
        except aiohttp.ClientResponseError as e:
            raise create_api_error_from_http(e.message, status=e.status) from e
        except asyncio.TimeoutError as e:
            raise APITimeoutError("LiveKit Inference TTS connection timed out.") from e
        except aiohttp.ClientConnectorError as e:
            raise APIConnectionError("failed to connect to LiveKit Inference TTS") from e

        params: dict[str, Any] = {
            "type": "session.create",
            "sample_rate": str(self._opts.sample_rate),
            "encoding": self._opts.encoding,
            "extra": self._opts.extra_kwargs,
        }
        if self._opts.voice:
            params["voice"] = self._opts.voice
        if self._opts.model:
            params["model"] = self._opts.model
        if self._opts.language:
            params["language"] = self._opts.language
        if self._opts.fallback:
            models = [
                {
                    "model": m.get("model"),
                    "voice": m.get("voice"),
                    "extra": m.get("extra_kwargs", {}),
                }
                for m in self._opts.fallback
            ]
            params["fallback"] = {"models": models}
        if self._opts.conn_options:
            params["connection"] = {
                "timeout": self._opts.conn_options.timeout,
                "retries": self._opts.conn_options.max_retry,
            }

        try:
            await ws.send_str(json.dumps(params))
        except Exception as e:
            # Don't leak a half-initialized socket if the handshake message fails.
            await ws.close()
            raise APIConnectionError(
                "failed to send session.create message to LiveKit Inference TTS"
            ) from e

        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared HTTP session when none was injected.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service"""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        """Update TTS configuration options.

        Args:
            voice (str, optional): Voice.
            model (TTSModels | str, optional): TTS model to use, in
                "provider/model[:voice]" format. A ":voice" suffix is only applied
                when ``voice`` is not given explicitly.
            language (str, optional): Language code for the TTS model.
            extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model;
                merged into the existing extra kwargs.
        """
        # NOTE(review): _connect_ws reads self._opts when a connection is opened;
        # confirm whether connections already held by self._pool should be
        # invalidated after an options change.
        if is_given(model):
            # Mirror __init__: strip the ":voice" suffix so it never reaches the
            # gateway as part of the model name, and apply it unless overridden.
            if isinstance(model, str):
                parsed_model, parsed_voice = _parse_model_string(model)
                model = parsed_model
                if parsed_voice is not None and not is_given(voice):
                    voice = parsed_voice
            self._opts.model = model
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = LanguageCode(language)
        if is_given(extra_kwargs):
            self._opts.extra_kwargs.update(extra_kwargs)
        # Alignment support depends on both the model and extra_kwargs.
        self._capabilities.aligned_transcript = _has_aligned_transcript(
            self._opts.model, self._opts.extra_kwargs
        )

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        return self._synthesize_with_stream(text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        # Iterate over a snapshot rather than the live WeakSet.
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
LiveKit Cloud Inference TTS
Args
model:TTSModels | str- TTS model to use, in "provider/model[:voice]" format
voice:str, optional- Voice to use; a default voice is used if not provided.
language:str, optional- Language of the TTS model.
encoding:TTSEncoding, optional- Encoding of the TTS model.
sample_rate:int, optional- Sample rate of the TTS model.
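base_url:str, optional- Inference gateway URL; if not provided, defaults to get_default_inference_url().
api_key:str, optional- API key; if not provided, read from the LIVEKIT_INFERENCE_API_KEY environment variable, falling back to LIVEKIT_API_KEY.
api_secret:str, optional- API secret; if not provided, read from the LIVEKIT_INFERENCE_API_SECRET environment variable, falling back to LIVEKIT_API_SECRET.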
http_session:aiohttp.ClientSession, optional- HTTP session to use.
extra_kwargs:dict, optional- Extra kwargs to pass to the TTS model.
fallback:FallbackModelType | list[FallbackModelType], optional- Fallback models, given as a single entry or a list; each entry is a model name or a FallbackModel instance.
conn_options:APIConnectOptions, optional- Connection options for request attempts.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> TTS-
Create a TTS instance from a model string
Args
model:str- TTS model to use, in "provider/model[:voice_id]" format
Returns
TTS- TTS instance
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance."""
    opts = self._opts
    return opts.model
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance."""
    provider_name = "livekit"
    return provider_name
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None:
    """Close all tracked streams, then shut down the connection pool."""
    # Work from a snapshot instead of iterating the live (weak) set.
    pending = tuple(self._streams)
    for open_stream in pending:
        await open_stream.aclose()
    self._streams.clear()
    await self._pool.aclose()


def prewarm(self) -> None:
    """Pre-warm connection to the TTS service"""
    pool = self._pool
    pool.prewarm()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream


def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    return self._synthesize_with_stream(text, conn_options=conn_options)


def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> None:
    """Update TTS configuration options.

    Args:
        voice (str, optional): Voice.
        model (TTSModels | str, optional): TTS model to use, in
            "provider/model[:voice]" format. A ":voice" suffix is only applied
            when ``voice`` is not given explicitly.
        language (str, optional): Language code for the TTS model.
        extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model;
            merged into the existing extra kwargs.
    """
    if is_given(model):
        # Mirror __init__: strip the ":voice" suffix so it never reaches the
        # gateway as part of the model name, and apply it unless overridden.
        if isinstance(model, str):
            parsed_model, parsed_voice = _parse_model_string(model)
            model = parsed_model
            if parsed_voice is not None and not is_given(voice):
                voice = parsed_voice
        self._opts.model = model
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = LanguageCode(language)
    if is_given(extra_kwargs):
        self._opts.extra_kwargs.update(extra_kwargs)
    # Alignment support depends on both the model and extra_kwargs.
    self._capabilities.aligned_transcript = _has_aligned_transcript(
        self._opts.model, self._opts.extra_kwargs
    )
Inherited members