Module livekit.agents.inference
Sub-modules
- livekit.agents.inference.interruption
- livekit.agents.inference.llm
- livekit.agents.inference.stt
- livekit.agents.inference.tts
Classes
class AdaptiveInterruptionDetector (*,
threshold: float = 0.5,
min_interruption_duration: float = 0.05,
max_audio_duration: float = 3,
audio_prefix_duration: float = 1.0,
detection_interval: float = 0.1,
inference_timeout: float = 0.7,
base_url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
http_session: aiohttp.ClientSession | None = None)-
class AdaptiveInterruptionDetector( rtc.EventEmitter[ Literal[ "overlapping_speech", "error", "metrics_collected", ] ], ): def __init__( self, *, threshold: float = THRESHOLD, min_interruption_duration: float = MIN_INTERRUPTION_DURATION, max_audio_duration: float = MAX_AUDIO_DURATION, audio_prefix_duration: float = AUDIO_PREFIX_DURATION, detection_interval: float = DETECTION_INTERVAL, inference_timeout: float = REMOTE_INFERENCE_TIMEOUT, base_url: str | None = None, api_key: str | None = None, api_secret: str | None = None, http_session: aiohttp.ClientSession | None = None, ) -> None: """ Initialize an AdaptiveInterruptionDetector instance. Args: threshold (float, optional): The threshold for the interruption detection, defaults to 0.5. min_interruption_duration (float, optional): The minimum duration, in seconds, of the interruption event, defaults to 50ms. max_audio_duration (float, optional): The maximum audio duration, including the audio prefix, in seconds, for the interruption detection, defaults to 3s. audio_prefix_duration (float, optional): The audio prefix duration, in seconds, for the interruption detection, defaults to 1.0s. detection_interval (float, optional): The interval between detections, in seconds, for the interruption detection, defaults to 0.1s. inference_timeout (float, optional): The timeout for the interruption detection, defaults to 0.7 seconds. base_url (str, optional): The base URL for the interruption detection, defaults to the shared LIVEKIT_REMOTE_EOT_URL environment variable. api_key (str, optional): The API key for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_KEY environment variable. api_secret (str, optional): The API secret for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_SECRET environment variable. http_session (aiohttp.ClientSession, optional): The HTTP session to use for the interruption detection. 
""" super().__init__() if max_audio_duration > 3.0: raise ValueError("max_audio_duration must be less than or equal to 3.0 seconds") lk_base_url = ( base_url if base_url else os.getenv("LIVEKIT_REMOTE_EOT_URL", get_default_inference_url()) ) lk_api_key: str = api_key if api_key else "" lk_api_secret: str = api_secret if api_secret else "" # use LiveKit credentials if using the inference service (production or staging) is_inference_url = lk_base_url in (DEFAULT_INFERENCE_URL, STAGING_INFERENCE_URL) if is_inference_url: lk_api_key = ( api_key if api_key else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if api_secret else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) use_proxy = True else: use_proxy = False self._opts = InterruptionOptions( sample_rate=SAMPLE_RATE, threshold=threshold, min_frames=math.ceil(min_interruption_duration * _FRAMES_PER_SECOND), max_audio_duration=max_audio_duration, audio_prefix_duration=audio_prefix_duration, detection_interval=detection_interval, inference_timeout=inference_timeout, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, use_proxy=use_proxy, ) self._label = f"{type(self).__module__}.{type(self).__name__}" self._sample_rate = SAMPLE_RATE self._session = http_session self._streams = weakref.WeakSet[InterruptionHttpStream | InterruptionWebSocketStream]() logger.info( "adaptive interruption detector initialized", extra={ "base_url": self._opts.base_url, "detection_interval": self._opts.detection_interval, "audio_prefix_duration": self._opts.audio_prefix_duration, "max_audio_duration": self._opts.max_audio_duration, "min_frames": self._opts.min_frames, "threshold": self._opts.threshold, "inference_timeout": self._opts.inference_timeout, "use_proxy": self._opts.use_proxy, }, ) @property def model(self) -> str: return "adaptive interruption" @property def provider(self) -> str: return "livekit" @property def label(self) -> str: return self._label @property def sample_rate(self) -> int: return self._sample_rate def _emit_error(self, api_error: Exception, recoverable: bool) -> None: self.emit( "error", InterruptionDetectionError( label=self._label, error=api_error, recoverable=recoverable, ), ) def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = http_context.http_session() return self._session def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> InterruptionHttpStream | InterruptionWebSocketStream: try: stream: InterruptionHttpStream | InterruptionWebSocketStream if self._opts.use_proxy: stream = InterruptionWebSocketStream(model=self, conn_options=conn_options) else: stream = InterruptionHttpStream(model=self, conn_options=conn_options) except Exception as e: self._emit_error(e, recoverable=False) raise self._streams.add(stream) return stream def update_options( self, *, threshold: NotGivenOr[float] = NOT_GIVEN, min_interruption_duration: NotGivenOr[float] = NOT_GIVEN, ) -> None: if is_given(threshold): self._opts.threshold = threshold if is_given(min_interruption_duration): self._opts.min_frames = math.ceil(min_interruption_duration * _FRAMES_PER_SECOND) for stream in self._streams: stream.update_options( 
threshold=threshold, min_interruption_duration=min_interruption_duration )
Initialize an AdaptiveInterruptionDetector instance.
Args
threshold:float, optional- The threshold for the interruption detection, defaults to 0.5.
min_interruption_duration:float, optional- The minimum duration, in seconds, of the interruption event, defaults to 50ms.
max_audio_duration:float, optional- The maximum audio duration, including the audio prefix, in seconds, for the interruption detection, defaults to 3s.
audio_prefix_duration:float, optional- The audio prefix duration, in seconds, for the interruption detection, defaults to 1.0s.
detection_interval:float, optional- The interval between detections, in seconds, for the interruption detection, defaults to 0.1s.
inference_timeout:float, optional- The timeout for the interruption detection, defaults to 0.7 seconds.
base_url:str, optional- The base URL for the interruption detection, defaults to the shared LIVEKIT_REMOTE_EOT_URL environment variable.
api_key:str, optional- The API key for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_KEY environment variable.
api_secret:str, optional- The API secret for the interruption detection, defaults to the LIVEKIT_INFERENCE_API_SECRET environment variable.
http_session:aiohttp.ClientSession, optional- The HTTP session to use for the interruption detection.
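A minimal usage sketch, assuming LIVEKIT_API_KEY and LIVEKIT_API_SECRET are set in the environment; the decorator form of EventEmitter.on and the handler names are assumptions for illustration:

from livekit.agents.inference import AdaptiveInterruptionDetector

# all values shown match the documented defaults
detector = AdaptiveInterruptionDetector(
    threshold=0.5,                   # interruption probability cutoff
    min_interruption_duration=0.05,  # ignore overlaps shorter than 50 ms
    max_audio_duration=3.0,          # values above 3.0 raise ValueError
)

@detector.on("overlapping_speech")
def on_overlap(ev):  # ev is an OverlappingSpeechEvent, documented below
    if ev.is_interruption:
        print(f"user interrupted (p={ev.probability:.2f})")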
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop label : str-
@property def label(self) -> str: return self._label
prop model : str-
@property def model(self) -> str: return "adaptive interruption"
prop provider : str-
@property def provider(self) -> str: return "livekit"
prop sample_rate : int-
@property def sample_rate(self) -> int: return self._sample_rate
Methods
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> InterruptionHttpStream | InterruptionWebSocketStream-
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> InterruptionHttpStream | InterruptionWebSocketStream: try: stream: InterruptionHttpStream | InterruptionWebSocketStream if self._opts.use_proxy: stream = InterruptionWebSocketStream(model=self, conn_options=conn_options) else: stream = InterruptionHttpStream(model=self, conn_options=conn_options) except Exception as e: self._emit_error(e, recoverable=False) raise self._streams.add(stream) return stream
def update_options(self,
*,
threshold: NotGivenOr[float] = NOT_GIVEN,
min_interruption_duration: NotGivenOr[float] = NOT_GIVEN) ‑> None-
def update_options( self, *, threshold: NotGivenOr[float] = NOT_GIVEN, min_interruption_duration: NotGivenOr[float] = NOT_GIVEN, ) -> None: if is_given(threshold): self._opts.threshold = threshold if is_given(min_interruption_duration): self._opts.min_frames = math.ceil(min_interruption_duration * _FRAMES_PER_SECOND) for stream in self._streams: stream.update_options( threshold=threshold, min_interruption_duration=min_interruption_duration )
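Because update_options is also forwarded to every open stream, a runtime adjustment is a single call; an illustrative sketch:

# make the agent harder to interrupt at runtime
detector.update_options(threshold=0.7, min_interruption_duration=0.1)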
Inherited members
class InterruptionDetectionError (**data: Any)-
class InterruptionDetectionError(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) type: Literal["interruption_detection_error"] = "interruption_detection_error" timestamp: float = Field(default_factory=time.time) label: str error: Exception = Field(..., exclude=True) recoverable: bool
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var error : Exception
var label : str
var model_config
var recoverable : bool
var timestamp : float
var type : Literal['interruption_detection_error']
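A short sketch of consuming this model through the detector's "error" event; the handler name and print statement are illustrative, and detector is the instance from the earlier sketch:

@detector.on("error")
def on_error(err: InterruptionDetectionError) -> None:
    if not err.recoverable:
        print(f"unrecoverable error from {err.label}: {err.error}")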
class LLM (model: LLMModels | str,
*,
provider: str | None = None,
base_url: str | None = None,
api_key: str | None = None,
api_secret: str | None = None,
extra_kwargs: ChatCompletionOptions | dict[str, Any] | None = None)-
class LLM(llm.LLM): def __init__( self, model: LLMModels | str, *, provider: str | None = None, base_url: str | None = None, api_key: str | None = None, api_secret: str | None = None, extra_kwargs: ChatCompletionOptions | dict[str, Any] | None = None, ) -> None: super().__init__() lk_base_url = base_url if base_url else get_default_inference_url() lk_api_key = ( api_key if api_key else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if api_secret else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) self._opts = _LLMOptions( model=model, provider=provider, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, extra_kwargs=extra_kwargs or {}, ) self._client = openai.AsyncClient( api_key=create_access_token(self._opts.api_key, self._opts.api_secret), base_url=self._opts.base_url, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120 ), ), ) async def aclose(self) -> None: await self._client.close() @classmethod def from_model_string(cls, model: str) -> LLM: """Create an LLM instance from a model string""" return cls(model) @property def model(self) -> str: """Get the model name for this LLM instance.""" return self._opts.model @property def provider(self) -> str: return "livekit" def chat( self, *, chat_ctx: ChatContext, tools: list[Tool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, response_format: NotGivenOr[ completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT] ] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> LLMStream: extra = {} if is_given(extra_kwargs): extra.update(extra_kwargs) parallel_tool_calls = ( parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.extra_kwargs.get("parallel_tool_calls", NOT_GIVEN) ) if is_given(parallel_tool_calls): extra["parallel_tool_calls"] = parallel_tool_calls extra_tool_choice = self._opts.extra_kwargs.get("tool_choice", NOT_GIVEN) tool_choice = tool_choice if is_given(tool_choice) else extra_tool_choice if is_given(tool_choice): oai_tool_choice: ChatCompletionToolChoiceOptionParam if isinstance(tool_choice, dict): oai_tool_choice = { "type": "function", "function": {"name": tool_choice["function"]["name"]}, } extra["tool_choice"] = oai_tool_choice elif tool_choice in ("auto", "required", "none"): oai_tool_choice = tool_choice extra["tool_choice"] = oai_tool_choice if is_given(response_format): extra["response_format"] = llm_utils.to_openai_response_format(response_format) # type: ignore extra.update(self._opts.extra_kwargs) self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret) return LLMStream( self, model=self._opts.model, provider=self._opts.provider, strict_tool_schema=True, client=self._client, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, extra_kwargs=extra, )
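A minimal construction sketch, assuming LiveKit credentials in the environment; "openai/gpt-4o-mini" is an illustrative model string, not a statement of supported models:

from livekit.agents.inference import LLM

llm = LLM("openai/gpt-4o-mini")
# equivalent classmethod form:
llm = LLM.from_model_string("openai/gpt-4o-mini")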
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> LLM-
Create an LLM instance from a model string
Instance variables
prop model : str-
@property def model(self) -> str: """Get the model name for this LLM instance.""" return self._opts.model
Get the model name for this LLM instance.
prop provider : str-
@property def provider(self) -> str: return "livekit"
Get the provider name/identifier for this LLM instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
async def aclose(self) -> None: await self._client.close()
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[Tool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> LLMStream-
def chat( self, *, chat_ctx: ChatContext, tools: list[Tool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, response_format: NotGivenOr[ completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT] ] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> LLMStream: extra = {} if is_given(extra_kwargs): extra.update(extra_kwargs) parallel_tool_calls = ( parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.extra_kwargs.get("parallel_tool_calls", NOT_GIVEN) ) if is_given(parallel_tool_calls): extra["parallel_tool_calls"] = parallel_tool_calls extra_tool_choice = self._opts.extra_kwargs.get("tool_choice", NOT_GIVEN) tool_choice = tool_choice if is_given(tool_choice) else extra_tool_choice if is_given(tool_choice): oai_tool_choice: ChatCompletionToolChoiceOptionParam if isinstance(tool_choice, dict): oai_tool_choice = { "type": "function", "function": {"name": tool_choice["function"]["name"]}, } extra["tool_choice"] = oai_tool_choice elif tool_choice in ("auto", "required", "none"): oai_tool_choice = tool_choice extra["tool_choice"] = oai_tool_choice if is_given(response_format): extra["response_format"] = llm_utils.to_openai_response_format(response_format) # type: ignore extra.update(self._opts.extra_kwargs) self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret) return LLMStream( self, model=self._opts.model, provider=self._opts.provider, strict_tool_schema=True, client=self._client, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, extra_kwargs=extra, )
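A hedged sketch of streaming a completion; it assumes the ChatContext helpers (empty(), add_message()) from livekit.agents.llm and that the returned LLMStream is consumed as an async context manager yielding ChatChunk objects, per the base class contract:

from livekit.agents.llm import ChatContext

async def complete(llm: LLM, question: str) -> str:
    chat_ctx = ChatContext.empty()
    chat_ctx.add_message(role="user", content=question)
    parts: list[str] = []
    async with llm.chat(chat_ctx=chat_ctx) as stream:
        async for chunk in stream:
            # chunk.delta can be None (e.g., the final usage-only chunk)
            if chunk.delta and chunk.delta.content:
                parts.append(chunk.delta.content)
    return "".join(parts)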
Inherited members
class LLMStream (llm_v: LLM | llm.LLM,
*,
model: LLMModels | str,
provider: str | None = None,
strict_tool_schema: bool,
client: openai.AsyncClient,
chat_ctx: llm.ChatContext,
tools: list[Tool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any],
provider_fmt: str = 'openai')-
class LLMStream(llm.LLMStream): def __init__( self, llm_v: LLM | llm.LLM, *, model: LLMModels | str, provider: str | None = None, strict_tool_schema: bool, client: openai.AsyncClient, chat_ctx: llm.ChatContext, tools: list[Tool], conn_options: APIConnectOptions, extra_kwargs: dict[str, Any], provider_fmt: str = "openai", # used internally for chat_ctx format ) -> None: super().__init__(llm_v, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options) self._model = model self._provider = provider self._provider_fmt = provider_fmt self._strict_tool_schema = strict_tool_schema self._client = client self._llm = llm_v self._extra_kwargs = drop_unsupported_params(model, extra_kwargs, tools=tools) self._tool_ctx = llm.ToolContext(tools) async def _run(self) -> None: # current function call that we're waiting for full completion (args are streamed) # (defined inside the _run method to make sure the state is reset for each run/attempt) self._oai_stream: openai.AsyncStream[ChatCompletionChunk] | None = None self._tool_call_id: str | None = None self._fnc_name: str | None = None self._fnc_raw_arguments: str | None = None self._tool_extra: dict[str, Any] | None = None self._tool_index: int | None = None retryable = True try: chat_ctx, _ = self._chat_ctx.to_provider_format(format=self._provider_fmt) tool_schemas = cast( list[ChatCompletionToolParam], self._tool_ctx.parse_function_tools("openai", strict=self._strict_tool_schema), ) if lk_oai_debug: tool_choice = self._extra_kwargs.get("tool_choice", NOT_GIVEN) logger.debug( "chat.completions.create", extra={ "fnc_ctx": tool_schemas, "tool_choice": tool_choice, "chat_ctx": chat_ctx, }, ) if not self._tools: # remove tool_choice from extra_kwargs if no tools are provided self._extra_kwargs.pop("tool_choice", None) extra_headers = self._extra_kwargs.setdefault("extra_headers", {}) extra_headers.update(get_inference_headers()) if self._provider: extra_headers["X-LiveKit-Inference-Provider"] = self._provider self._oai_stream = stream = await self._client.chat.completions.create( messages=cast(list[ChatCompletionMessageParam], chat_ctx), tools=tool_schemas or openai.omit, model=self._model, stream_options={"include_usage": True}, stream=True, timeout=httpx.Timeout(self._conn_options.timeout), **self._extra_kwargs, ) thinking = asyncio.Event() async with stream: async for chunk in stream: for choice in chunk.choices: chat_chunk = self._parse_choice(chunk.id, choice, thinking) if chat_chunk is not None: retryable = False self._event_ch.send_nowait(chat_chunk) if chunk.usage is not None: retryable = False tokens_details = chunk.usage.prompt_tokens_details cached_tokens = tokens_details.cached_tokens if tokens_details else 0 usage_chunk = llm.ChatChunk( id=chunk.id, usage=llm.CompletionUsage( completion_tokens=chunk.usage.completion_tokens, prompt_tokens=chunk.usage.prompt_tokens, prompt_cached_tokens=cached_tokens or 0, total_tokens=chunk.usage.total_tokens, service_tier=getattr(chunk, "service_tier", None), ), ) self._event_ch.send_nowait(usage_chunk) except openai.APITimeoutError: raise APITimeoutError(retryable=retryable) from None except openai.APIStatusError as e: raise APIStatusError( e.message, status_code=e.status_code, request_id=e.request_id, body=e.body, retryable=retryable, ) from None except Exception as e: raise APIConnectionError(retryable=retryable) from e def _parse_choice( self, id: str, choice: Choice, thinking: asyncio.Event ) -> llm.ChatChunk | None: delta = choice.delta # https://github.com/livekit/agents/issues/688 # the delta can be None 
when using Azure OpenAI (content filtering) if delta is None: return None if delta.tool_calls: for tool in delta.tool_calls: if not tool.function: continue call_chunk = None if self._tool_call_id and tool.id and tool.index != self._tool_index: call_chunk = llm.ChatChunk( id=id, delta=llm.ChoiceDelta( role="assistant", content=delta.content, tool_calls=[ llm.FunctionToolCall( arguments=self._fnc_raw_arguments or "", name=self._fnc_name or "", call_id=self._tool_call_id or "", extra=self._tool_extra, ) ], ), ) self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None self._tool_extra = None if tool.function.name: self._tool_index = tool.index self._tool_call_id = tool.id self._fnc_name = tool.function.name self._fnc_raw_arguments = tool.function.arguments or "" # Extract extra from tool call (e.g., Google thought signatures) self._tool_extra = getattr(tool, "extra_content", None) elif tool.function.arguments: self._fnc_raw_arguments += tool.function.arguments # type: ignore if call_chunk is not None: return call_chunk if choice.finish_reason in ("tool_calls", "stop") and self._tool_call_id: finish_extra = getattr(delta, "extra_content", None) call_chunk = llm.ChatChunk( id=id, delta=llm.ChoiceDelta( role="assistant", content=delta.content, extra=finish_extra, tool_calls=[ llm.FunctionToolCall( arguments=self._fnc_raw_arguments or "", name=self._fnc_name or "", call_id=self._tool_call_id or "", extra=self._tool_extra, ) ], ), ) self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None self._tool_extra = None return call_chunk delta.content = llm_utils.strip_thinking_tokens(delta.content, thinking) # Extract extra from delta (e.g., Google thought signatures on text parts) delta_extra = getattr(delta, "extra_content", None) if not delta.content and not delta_extra: return None return llm.ChatChunk( id=id, delta=llm.ChoiceDelta( content=delta.content, role="assistant", extra=delta_extra, ), )
Ancestors
- livekit.agents.llm.llm.LLMStream
- abc.ABC
Subclasses
- livekit.plugins.openai.llm.LLMStream
class OverlappingSpeechEvent (**data: Any)-
class OverlappingSpeechEvent(BaseModel): """Represents an overlapping speech event detected during agent speech.""" model_config = ConfigDict(arbitrary_types_allowed=True) type: Literal["overlapping_speech"] = "overlapping_speech" created_at: float = Field(default_factory=time.time) """Timestamp (in seconds) when the event was emitted.""" detected_at: float = Field(default_factory=time.time) """Timestamp (in seconds) when the overlap was detected.""" is_interruption: bool = False """Whether interruption is detected.""" total_duration: float = 0.0 """RTT (Round Trip Time) time taken to perform the inference, in seconds.""" prediction_duration: float = 0.0 """Time taken to perform the inference from the model side, in seconds.""" detection_delay: float = 0.0 """Total time from the onset of the speech to the final prediction, in seconds.""" overlap_started_at: float | None = None """Timestamp (in seconds) when the overlap speech started. Useful for emitting held transcripts.""" speech_input: npt.NDArray[np.int16] | None = None """The audio input that was used for the inference.""" probabilities: npt.NDArray[np.float32] | None = None """The raw probabilities for the interruption detection.""" probability: float = 0.0 """The conservative estimated probability of the interruption event.""" num_requests: int = 0 """Number of requests sent for this event.""" @model_serializer(mode="wrap") def serialize_model(self, handler: SerializerFunctionWrapHandler) -> Any: # remove numpy arrays from the model dump copy = self.model_copy(deep=True) data = copy.speech_input, copy.probabilities copy.speech_input, copy.probabilities = None, None try: serialized = handler(copy) finally: copy.speech_input, copy.probabilities = data return serialized @classmethod def from_cache_entry( cls, *, entry: InterruptionCacheEntry, is_interruption: bool, started_at: float | None = None, ended_at: float | None = None, ) -> OverlappingSpeechEvent: """Initialize the event from a cache entry. Args: entry: The cache entry to initialize the event from. is_interruption: Whether the interruption is detected. started_at: The timestamp when the overlap speech started. ended_at: The timestamp when the overlap speech ended. Returns: The initialized event. """ return cls( type="overlapping_speech", detected_at=ended_at or time.time(), is_interruption=is_interruption, overlap_started_at=started_at, speech_input=entry.speech_input, probabilities=entry.probabilities, total_duration=entry.get_total_duration(), detection_delay=entry.get_detection_delay(), prediction_duration=entry.get_prediction_duration(), probability=entry.get_probability(), )
Represents an overlapping speech event detected during agent speech.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model. self is explicitly positional-only to allow self as a field name.
Ancestors
- pydantic.main.BaseModel
Class variables
var created_at : float-
Timestamp (in seconds) when the event was emitted.
var detected_at : float-
Timestamp (in seconds) when the overlap was detected.
var detection_delay : float-
Total time from the onset of the speech to the final prediction, in seconds.
var is_interruption : bool-
Whether interruption is detected.
var model_config
var num_requests : int-
Number of requests sent for this event.
var overlap_started_at : float | None-
Timestamp (in seconds) when the overlap speech started. Useful for emitting held transcripts.
var prediction_duration : float-
Time taken to perform the inference from the model side, in seconds.
var probabilities : numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.float32]] | None-
The raw probabilities for the interruption detection.
var probability : float-
The conservative estimated probability of the interruption event.
var speech_input : numpy.ndarray[tuple[typing.Any, ...], numpy.dtype[numpy.int16]] | None-
The audio input that was used for the inference.
var total_duration : float-
RTT (Round Trip Time) time taken to perform the inference, in seconds.
var type : Literal['overlapping_speech']
Static methods
def from_cache_entry(*,
entry: InterruptionCacheEntry,
is_interruption: bool,
started_at: float | None = None,
ended_at: float | None = None) ‑> OverlappingSpeechEvent-
Initialize the event from a cache entry.
Args
entry- The cache entry to initialize the event from.
is_interruption- Whether the interruption is detected.
started_at- The timestamp when the overlap speech started.
ended_at- The timestamp when the overlap speech ended.
Returns
The initialized event.
Methods
def serialize_model(self, handler: SerializerFunctionWrapHandler) ‑> Any-
@model_serializer(mode="wrap") def serialize_model(self, handler: SerializerFunctionWrapHandler) -> Any: # remove numpy arrays from the model dump copy = self.model_copy(deep=True) data = copy.speech_input, copy.probabilities copy.speech_input, copy.probabilities = None, None try: serialized = handler(copy) finally: copy.speech_input, copy.probabilities = data return serialized
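A small sketch of the serializer's effect: model_dump() passes through serialize_model, so the numpy payloads are nulled in the dump while the event object keeps them (field values are illustrative):

import numpy as np

ev = OverlappingSpeechEvent(
    is_interruption=True,
    probability=0.82,
    speech_input=np.zeros(160, dtype=np.int16),
)
data = ev.model_dump()
assert data["speech_input"] is None  # stripped from the dump
assert ev.speech_input is not None   # still present on the event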
class STT (model: NotGivenOr[STTModels | str] = NOT_GIVEN,
*,
language: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | DeepgramFluxOptions | AssemblyaiOptions | ElevenlabsOptions | XaiOptions] = NOT_GIVEN,
fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN)-
class STT(stt.STT): @overload def __init__( self, model: CartesiaModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... @overload def __init__( self, model: DeepgramModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... @overload def __init__( self, model: DeepgramFluxModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[DeepgramFluxOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... @overload def __init__( self, model: AssemblyAIModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[AssemblyaiOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... @overload def __init__( self, model: ElevenlabsModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... @overload def __init__( self, model: XaiModels, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[XaiOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... 
@overload def __init__( self, model: str, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: ... def __init__( self, model: NotGivenOr[STTModels | str] = NOT_GIVEN, *, language: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[STTEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[ dict[str, Any] | CartesiaOptions | DeepgramOptions | DeepgramFluxOptions | AssemblyaiOptions | ElevenlabsOptions | XaiOptions ] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: """Livekit Cloud Inference STT Args: model (STTModels | str, optional): STT model to use, in "provider/model[:language]" format. language (str, optional): Language of the STT model. encoding (STTEncoding, optional): Encoding of the STT model. sample_rate (int, optional): Sample rate of the STT model. base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable. api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable. api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable. http_session (aiohttp.ClientSession, optional): HTTP session to use. extra_kwargs (dict, optional): Extra kwargs to pass to the STT model. fallback (FallbackModelType, optional): Fallback models - either a list of model names, a list of FallbackModel instances. conn_options (APIConnectOptions, optional): Connection options for request attempts. """ # Infer diarization capability from provider-specific extra_kwargs # keys (see _DIARIZATION_EXTRA_KEYS). xAI uses "diarize" (same as # Deepgram); AssemblyAI uses "speaker_labels". 
diarization_enabled = _diarization_enabled( dict(extra_kwargs) if is_given(extra_kwargs) else None ) super().__init__( capabilities=stt.STTCapabilities( streaming=True, interim_results=True, diarization=diarization_enabled, aligned_transcript="word", offline_recognize=False, ), ) # Parse language from model string if provided: "provider/model:language" if is_given(model) and isinstance(model, str): parsed_model, parsed_language = _parse_model_string(model) model = parsed_model if is_given(parsed_language) and not is_given(language): language = parsed_language lk_base_url = base_url if is_given(base_url) else get_default_inference_url() lk_api_key = ( api_key if is_given(api_key) else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if is_given(api_secret) else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) fallback_models: NotGivenOr[list[FallbackModel]] = NOT_GIVEN if is_given(fallback): fallback_models = _normalize_fallback(fallback) self._opts = STTOptions( model=model, language=LanguageCode(language) if isinstance(language, str) else language, encoding=encoding if is_given(encoding) else DEFAULT_ENCODING, sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {}, fallback=fallback_models, conn_options=conn_options if is_given(conn_options) else DEFAULT_API_CONNECT_OPTIONS, ) self._session = http_session self._streams = weakref.WeakSet[SpeechStream]() @classmethod def from_model_string(cls, model: str) -> STT: """Create a STT instance from a model string Args: model (str): STT model to use, in "provider/model[:language]" format Returns: STT: STT instance """ model_name, language = _parse_model_string(model) return cls(model=model_name, language=language) @property def model(self) -> str: return self._opts.model if is_given(self._opts.model) else "unknown" @property def provider(self) -> str: return "livekit" def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session async def _recognize_impl( self, buffer: utils.AudioBuffer, *, language: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions, ) -> stt.SpeechEvent: raise NotImplementedError( "LiveKit Inference STT does not support batch recognition, use stream() instead" ) def stream( self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SpeechStream: """Create a streaming transcription session.""" options = self._sanitize_options(language=language) stream = SpeechStream(stt=self, opts=options, conn_options=conn_options) self._streams.add(stream) return stream def update_options( self, *, model: NotGivenOr[STTModels | str] = NOT_GIVEN, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN, extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> None: """Update STT configuration options.""" if is_given(model): self._opts.model = model if is_given(language): self._opts.language = LanguageCode(language) if is_given(extra): self._opts.extra_kwargs.update(extra) self._capabilities = 
replace( self._capabilities, diarization=_diarization_enabled(self._opts.extra_kwargs), ) for stream in self._streams: stream.update_options(model=model, language=language, extra=extra) def _sanitize_options( self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN ) -> STTOptions: """Create a sanitized copy of options with language override if provided.""" options = replace(self._opts) options.extra_kwargs = dict(options.extra_kwargs) if is_given(language): options.language = LanguageCode(language) return options
LiveKit Cloud Inference STT
Args
model:STTModels | str, optional- STT model to use, in "provider/model[:language]" format.
language:str, optional- Language of the STT model.
encoding:STTEncoding, optional- Encoding of the STT model.
sample_rate:int, optional- Sample rate of the STT model.
base_url:str, optional- LIVEKIT_URL, if not provided, read from environment variable.
api_key:str, optional- LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret:str, optional- LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session:aiohttp.ClientSession, optional- HTTP session to use.
extra_kwargs:dict, optional- Extra kwargs to pass to the STT model.
fallback:FallbackModelType, optional- Fallback models, either a list of model names or a list of FallbackModel instances.
conn_options:APIConnectOptions, optional- Connection options for request attempts.
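A minimal sketch, assuming environment credentials; "deepgram/nova-3:en" and the fallback entry are illustrative "provider/model[:language]" strings, not an exhaustive list:

from livekit.agents.inference import STT

stt = STT("deepgram/nova-3:en", fallback=["assemblyai/universal-streaming"])
stream = stt.stream()  # batch recognition is unsupported; use streaming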
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> STT-
Create an STT instance from a model string
Args
model:str- STT model to use, in "provider/model[:language]" format
Returns
STT- STT instance
Instance variables
prop model : str-
@property def model(self) -> str: return self._opts.model if is_given(self._opts.model) else "unknown"
Get the model name/identifier for this STT instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
@property def provider(self) -> str: return "livekit"
Get the provider name/identifier for this STT instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
def stream(self,
*,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream-
def stream( self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SpeechStream: """Create a streaming transcription session.""" options = self._sanitize_options(language=language) stream = SpeechStream(stt=self, opts=options, conn_options=conn_options) self._streams.add(stream) return stream
Create a streaming transcription session.
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> None-
def update_options( self, *, model: NotGivenOr[STTModels | str] = NOT_GIVEN, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN, extra: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> None: """Update STT configuration options.""" if is_given(model): self._opts.model = model if is_given(language): self._opts.language = LanguageCode(language) if is_given(extra): self._opts.extra_kwargs.update(extra) self._capabilities = replace( self._capabilities, diarization=_diarization_enabled(self._opts.extra_kwargs), ) for stream in self._streams: stream.update_options(model=model, language=language, extra=extra)
Update STT configuration options.
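For example, switching language and enabling provider-specific diarization at runtime (values illustrative; per the keys noted in the source, Deepgram and xAI use "diarize" while AssemblyAI uses "speaker_labels"):

stt.update_options(language="es", extra={"diarize": True})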
Inherited members
class TTS (model: TTSModels | str,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | ElevenlabsOptions | RimeOptions | InworldOptions] = NOT_GIVEN,
fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN,
conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN)-
class TTS(tts.TTS): @overload def __init__( self, model: CartesiaModels, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: pass @overload def __init__( self, model: DeepgramModels, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: pass @overload def __init__( self, model: ElevenlabsModels, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: pass @overload def __init__( self, model: RimeModels, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[RimeOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: pass @overload def __init__( self, model: InworldModels, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[InworldOptions] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: pass @overload def __init__( self, model: str, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = 
NOT_GIVEN, ) -> None: pass def __init__( self, model: TTSModels | str, *, voice: NotGivenOr[str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN, sample_rate: NotGivenOr[int] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, api_key: NotGivenOr[str] = NOT_GIVEN, api_secret: NotGivenOr[str] = NOT_GIVEN, http_session: aiohttp.ClientSession | None = None, extra_kwargs: NotGivenOr[ dict[str, Any] | CartesiaOptions | DeepgramOptions | ElevenlabsOptions | RimeOptions | InworldOptions ] = NOT_GIVEN, fallback: NotGivenOr[list[FallbackModelType] | FallbackModelType] = NOT_GIVEN, conn_options: NotGivenOr[APIConnectOptions] = NOT_GIVEN, ) -> None: """Livekit Cloud Inference TTS Args: model (TTSModels | str): TTS model to use, in "provider/model[:voice]" format voice (str, optional): Voice to use, use a default one if not provided language (str, optional): Language of the TTS model. encoding (TTSEncoding, optional): Encoding of the TTS model. sample_rate (int, optional): Sample rate of the TTS model. base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable. api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable. api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable. http_session (aiohttp.ClientSession, optional): HTTP session to use. extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model. fallback (FallbackModelType, optional): Fallback models - either a list of model names, a list of FallbackModel instances. conn_options (APIConnectOptions, optional): Connection options for request attempts. """ sample_rate = sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE super().__init__( capabilities=tts.TTSCapabilities(streaming=True, aligned_transcript=False), sample_rate=sample_rate, num_channels=1, ) # Parse voice from model string if provided: "provider/model:voice" if isinstance(model, str): parsed_model, parsed_voice = _parse_model_string(model) model = parsed_model if parsed_voice is not None and not is_given(voice): voice = parsed_voice lk_base_url = base_url if is_given(base_url) else get_default_inference_url() lk_api_key = ( api_key if is_given(api_key) else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")) ) if not lk_api_key: raise ValueError( "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable" ) lk_api_secret = ( api_secret if is_given(api_secret) else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")) ) if not lk_api_secret: raise ValueError( "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable" ) fallback_models: NotGivenOr[list[FallbackModel]] = NOT_GIVEN if is_given(fallback): fallback_models = _normalize_fallback(fallback) self._opts = _TTSOptions( model=model, voice=voice, language=LanguageCode(language) if isinstance(language, str) else language, encoding=encoding if is_given(encoding) else DEFAULT_ENCODING, sample_rate=sample_rate, base_url=lk_base_url, api_key=lk_api_key, api_secret=lk_api_secret, extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {}, fallback=fallback_models, conn_options=conn_options if is_given(conn_options) else DEFAULT_API_CONNECT_OPTIONS, ) self._session = http_session self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse]( connect_cb=self._connect_ws, close_cb=self._close_ws, max_session_duration=300, mark_refreshed_on_get=True, ) 
self._streams = weakref.WeakSet[SynthesizeStream]() @classmethod def from_model_string(cls, model: str) -> TTS: """Create a TTS instance from a model string Args: model (str): TTS model to use, in "provider/model[:voice_id]" format Returns: TTS: TTS instance """ model, voice = _parse_model_string(model) return cls(model=model, voice=voice if voice else NOT_GIVEN) @property def model(self) -> str: return self._opts.model @property def provider(self) -> str: return "livekit" async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse: session = self._ensure_session() base_url = self._opts.base_url if base_url.startswith(("http://", "https://")): base_url = base_url.replace("http", "ws", 1) headers = { **get_inference_headers(), "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}", } ws = None try: ws = await asyncio.wait_for( session.ws_connect(f"{base_url}/tts?model={self._opts.model}", headers=headers), timeout, ) except aiohttp.ClientResponseError as e: raise create_api_error_from_http(e.message, status=e.status) from e except asyncio.TimeoutError as e: raise APITimeoutError("LiveKit Inference TTS connection timed out.") from e except aiohttp.ClientConnectorError as e: raise APIConnectionError("failed to connect to LiveKit Inference TTS") from e params: dict[str, Any] = { "type": "session.create", "sample_rate": str(self._opts.sample_rate), "encoding": self._opts.encoding, "extra": self._opts.extra_kwargs, } if self._opts.voice: params["voice"] = self._opts.voice if self._opts.model: params["model"] = self._opts.model if self._opts.language: params["language"] = self._opts.language if self._opts.fallback: models = [ { "model": m.get("model"), "voice": m.get("voice"), "extra": m.get("extra_kwargs", {}), } for m in self._opts.fallback ] params["fallback"] = {"models": models} if self._opts.conn_options: params["connection"] = { "timeout": self._opts.conn_options.timeout, "retries": self._opts.conn_options.max_retry, } try: await ws.send_str(json.dumps(params)) except Exception as e: await ws.close() raise APIConnectionError( "failed to send session.create message to LiveKit Inference TTS" ) from e return ws async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None: await ws.close() def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session def prewarm(self) -> None: self._pool.prewarm() def update_options( self, *, voice: NotGivenOr[str] = NOT_GIVEN, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> None: """ Args: voice (str, optional): Voice. model (TTSModels | str, optional): TTS model to use. language (str, optional): Language code for the TTS model. extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model. 
""" if is_given(model): self._opts.model = model if is_given(voice): self._opts.voice = voice if is_given(language): self._opts.language = LanguageCode(language) if is_given(extra_kwargs): self._opts.extra_kwargs.update(extra_kwargs) def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> tts.ChunkedStream: return self._synthesize_with_stream(text, conn_options=conn_options) def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear() await self._pool.aclose()Helper class that provides a standard way to create an ABC using inheritance.
LiveKit Cloud Inference TTS
Args
model:TTSModels | str- TTS model to use, in "provider/model[:voice]" format
voice:str, optional- Voice to use, use a default one if not provided
language:str, optional- Language of the TTS model.
encoding:TTSEncoding, optional- Encoding of the TTS model.
sample_rate:int, optional- Sample rate of the TTS model.
base_url:str, optional- LIVEKIT_URL, if not provided, read from environment variable.
api_key:str, optional- LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret:str, optional- LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session:aiohttp.ClientSession, optional- HTTP session to use.
extra_kwargs:dict, optional- Extra kwargs to pass to the TTS model.
fallback:FallbackModelType, optional- Fallback models, either a list of model names or a list of FallbackModel instances.
conn_options:APIConnectOptions, optional- Connection options for request attempts.
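A minimal sketch, assuming environment credentials; the model string and voice id below are placeholders:

from livekit.agents.inference import TTS

tts = TTS("cartesia/sonic-2", voice="example-voice-id")
tts.prewarm()  # optionally pre-open the pooled WebSocket connection
stream = tts.synthesize("Hello from LiveKit Inference")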
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Static methods
def from_model_string(model: str) ‑> TTS-
Create a TTS instance from a model string
Args
model:str- TTS model to use, in "provider/model[:voice_id]" format
Returns
TTS- TTS instance
Instance variables
prop model : str-
@property def model(self) -> str: return self._opts.model
Get the model name/identifier for this TTS instance.
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
@property def provider(self) -> str: return "livekit"
Get the provider name/identifier for this TTS instance.
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear() await self._pool.aclose()
def prewarm(self) ‑> None-
def prewarm(self) -> None: self._pool.prewarm()
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream-
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> tts.ChunkedStream: return self._synthesize_with_stream(text, conn_options=conn_options)
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> None-
def update_options( self, *, voice: NotGivenOr[str] = NOT_GIVEN, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> None: """ Args: voice (str, optional): Voice. model (TTSModels | str, optional): TTS model to use. language (str, optional): Language code for the TTS model. extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model. """ if is_given(model): self._opts.model = model if is_given(voice): self._opts.voice = voice if is_given(language): self._opts.language = LanguageCode(language) if is_given(extra_kwargs): self._opts.extra_kwargs.update(extra_kwargs)
Args
voice:str, optional- Voice.
model:TTSModels | str, optional- TTS model to use.
language:str, optional- Language code for the TTS model.
extra_kwargs:dict, optional- Extra kwargs to pass to the TTS model.
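For instance, swapping voice and language on a live instance (placeholder values):

tts.update_options(voice="another-voice-id", language="en")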
Inherited members