Module livekit.agents.inference
Sub-modules
livekit.agents.inference.llm
livekit.agents.inference.stt
livekit.agents.inference.tts
Classes
class LLM (model: LLMModels | str,
*,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
timeout: httpx.Timeout | None = None,
max_retries: NotGivenOr[int] = NOT_GIVEN,
verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any] | OpenaiOptions | CerebrasOptions | GroqOptions | BasetenOptions] = NOT_GIVEN)-
Expand source code
class LLM(llm.LLM):
    """LLM client for LiveKit Cloud Inference.

    Chat requests are always sent in OpenAI wire format to the LiveKit
    inference gateway; authentication uses a LiveKit API key/secret that is
    exchanged for a short-lived access token (refreshed on every chat() call).
    """

    # The @overload stubs below only narrow `extra_kwargs` to the options type
    # matching the chosen provider's model family; they share one runtime
    # implementation.
    @overload
    def __init__(
        self,
        model: OpenaiModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[OpenaiOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: CerebrasModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[CerebrasOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: GroqModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[GroqOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: BasetenModels,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[BasetenOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: LLMModels | str,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: LLMModels | str,
        *,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        max_completion_tokens: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
        max_retries: NotGivenOr[int] = NOT_GIVEN,
        verbosity: NotGivenOr[Verbosity] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | OpenaiOptions | CerebrasOptions | GroqOptions | BasetenOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Create a LiveKit Cloud Inference LLM.

        Args:
            model: model identifier to request from the gateway.
            temperature, parallel_tool_calls, tool_choice,
                max_completion_tokens, verbosity: per-instance defaults
                applied to every chat() call when given.
            base_url: gateway URL; falls back to the LIVEKIT_INFERENCE_URL
                env var, then DEFAULT_BASE_URL.
            api_key: falls back to LIVEKIT_INFERENCE_API_KEY, then
                LIVEKIT_API_KEY env vars.
            api_secret: falls back to LIVEKIT_INFERENCE_API_SECRET, then
                LIVEKIT_API_SECRET env vars.
            timeout: httpx timeout; defaults to 15s connect / 5s read,
                write, and pool.
            max_retries: retry count for the underlying OpenAI client
                (defaults to 0 — retries are handled at a higher layer).
            extra_kwargs: provider-specific options merged into every
                request.

        Raises:
            ValueError: if no API key or API secret can be resolved.
        """
        super().__init__()

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )
        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )
        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        self._opts = _LLMOptions(
            model=model,
            temperature=temperature,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            max_completion_tokens=max_completion_tokens,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            verbosity=verbosity,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        # The OpenAI client authenticates with a short-lived token minted from
        # the LiveKit key/secret; chat() re-mints it before each request.
        self._client = openai.AsyncClient(
            api_key=create_access_token(self._opts.api_key, self._opts.api_secret),
            base_url=self._opts.base_url,
            max_retries=max_retries if is_given(max_retries) else 0,
            http_client=httpx.AsyncClient(
                timeout=timeout
                if timeout
                else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=50,
                    max_keepalive_connections=50,
                    keepalive_expiry=120,
                ),
            ),
        )

    @property
    def model(self) -> str:
        """Get the model name for this LLM instance."""
        return self._opts.model

    def chat(
        self,
        *,
        chat_ctx: ChatContext,
        tools: list[FunctionTool | RawFunctionTool] | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        response_format: NotGivenOr[
            completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]
        ] = NOT_GIVEN,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> LLMStream:
        """Start a streaming chat completion for `chat_ctx`.

        Per-call arguments override the instance defaults; everything is
        merged into one `extra` dict and forwarded in OpenAI format.
        """
        extra = {}
        if is_given(extra_kwargs):
            extra.update(extra_kwargs)

        # instance-level defaults captured at construction
        if is_given(self._opts.max_completion_tokens):
            extra["max_completion_tokens"] = self._opts.max_completion_tokens
        if is_given(self._opts.temperature):
            extra["temperature"] = self._opts.temperature
        if is_given(self._opts.verbosity):
            extra["verbosity"] = self._opts.verbosity

        parallel_tool_calls = (
            parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls
        )
        if is_given(parallel_tool_calls):
            extra["parallel_tool_calls"] = parallel_tool_calls

        tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
        if is_given(tool_choice):
            oai_tool_choice: ChatCompletionToolChoiceOptionParam
            if isinstance(tool_choice, dict):
                # named-function form -> OpenAI tool_choice object
                oai_tool_choice = {
                    "type": "function",
                    "function": {"name": tool_choice["function"]["name"]},
                }
                extra["tool_choice"] = oai_tool_choice
            elif tool_choice in ("auto", "required", "none"):
                oai_tool_choice = tool_choice
                extra["tool_choice"] = oai_tool_choice

        if is_given(response_format):
            extra["response_format"] = llm_utils.to_openai_response_format(response_format)  # type: ignore

        # NOTE: constructor-level extra_kwargs override everything set above
        extra.update(self._opts.extra_kwargs)

        # reset the access token to avoid expiration
        self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret)

        return LLMStream(
            self,
            model=self._opts.model,
            provider_fmt="openai",  # always sent in openai format
            strict_tool_schema=True,
            client=self._client,
            chat_ctx=chat_ctx,
            tools=tools or [],
            conn_options=conn_options,
            extra_kwargs=extra,
        )
LLM client for LiveKit Cloud Inference; sends chat requests in OpenAI wire format to the LiveKit inference gateway, authenticating with a LiveKit API key/secret.
Ancestors
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str
-
Expand source code
@property
def model(self) -> str:
    """Get the model name for this LLM instance."""
    # read-only view of the value captured in _LLMOptions at construction
    return self._opts.model
Get the model name for this LLM instance.
Methods
def chat(self,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool | RawFunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> LLMStream-
Expand source code
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[FunctionTool | RawFunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    response_format: NotGivenOr[
        completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]
    ] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    """Start a streaming chat completion for `chat_ctx`.

    Per-call arguments override the instance defaults; everything is merged
    into one `extra` dict and forwarded in OpenAI format.
    """
    extra = {}
    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    # instance-level defaults captured at construction
    if is_given(self._opts.max_completion_tokens):
        extra["max_completion_tokens"] = self._opts.max_completion_tokens
    if is_given(self._opts.temperature):
        extra["temperature"] = self._opts.temperature
    if is_given(self._opts.verbosity):
        extra["verbosity"] = self._opts.verbosity

    parallel_tool_calls = (
        parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls
    )
    if is_given(parallel_tool_calls):
        extra["parallel_tool_calls"] = parallel_tool_calls

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
    if is_given(tool_choice):
        oai_tool_choice: ChatCompletionToolChoiceOptionParam
        if isinstance(tool_choice, dict):
            # named-function form -> OpenAI tool_choice object
            oai_tool_choice = {
                "type": "function",
                "function": {"name": tool_choice["function"]["name"]},
            }
            extra["tool_choice"] = oai_tool_choice
        elif tool_choice in ("auto", "required", "none"):
            oai_tool_choice = tool_choice
            extra["tool_choice"] = oai_tool_choice

    if is_given(response_format):
        extra["response_format"] = llm_utils.to_openai_response_format(response_format)  # type: ignore

    # NOTE: constructor-level extra_kwargs override everything set above
    extra.update(self._opts.extra_kwargs)

    # reset the access token to avoid expiration
    self._client.api_key = create_access_token(self._opts.api_key, self._opts.api_secret)

    return LLMStream(
        self,
        model=self._opts.model,
        provider_fmt="openai",  # always sent in openai format
        strict_tool_schema=True,
        client=self._client,
        chat_ctx=chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
    )
Inherited members
class LLMStream (llm: LLM | llm.LLM,
*,
model: LLMModels | str,
provider_fmt: str,
strict_tool_schema: bool,
client: openai.AsyncClient,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool | RawFunctionTool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any])-
Expand source code
class LLMStream(llm.LLMStream):
    """Streaming chat-completion response.

    Consumes the OpenAI-format SSE stream and republishes each delta as a
    `llm.ChatChunk`, buffering streamed tool-call arguments until a call is
    complete.
    """

    def __init__(
        self,
        llm: LLM | llm.LLM,
        *,
        model: LLMModels | str,
        provider_fmt: str,
        strict_tool_schema: bool,
        client: openai.AsyncClient,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool | RawFunctionTool],
        conn_options: APIConnectOptions,
        extra_kwargs: dict[str, Any],
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options)
        self._model = model
        self._provider_fmt = provider_fmt
        self._strict_tool_schema = strict_tool_schema
        self._client = client
        self._llm = llm
        self._extra_kwargs = extra_kwargs

    async def _run(self) -> None:
        # current function call that we're waiting for full completion (args are streamed)
        # (defined inside the _run method to make sure the state is reset for each run/attempt)
        self._oai_stream: openai.AsyncStream[ChatCompletionChunk] | None = None
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None
        self._tool_index: int | None = None
        # becomes False once any data reached the caller — a retry would
        # otherwise duplicate already-delivered output
        retryable = True

        try:
            chat_ctx, _ = self._chat_ctx.to_provider_format(format=self._provider_fmt)
            fnc_ctx = (
                to_fnc_ctx(self._tools, strict=self._strict_tool_schema)
                if self._tools
                else openai.NOT_GIVEN
            )
            if lk_oai_debug:
                tool_choice = self._extra_kwargs.get("tool_choice", NOT_GIVEN)
                logger.debug(
                    "chat.completions.create",
                    extra={
                        "fnc_ctx": fnc_ctx,
                        "tool_choice": tool_choice,
                        "chat_ctx": chat_ctx,
                    },
                )

            if not self._tools:
                # remove tool_choice from extra_kwargs if no tools are provided
                self._extra_kwargs.pop("tool_choice", None)

            self._oai_stream = stream = await self._client.chat.completions.create(
                messages=cast(list[ChatCompletionMessageParam], chat_ctx),
                tools=fnc_ctx,
                model=self._model,
                stream_options={"include_usage": True},
                stream=True,
                timeout=httpx.Timeout(self._conn_options.timeout),
                **self._extra_kwargs,
            )

            thinking = asyncio.Event()
            async with stream:
                async for chunk in stream:
                    for choice in chunk.choices:
                        chat_chunk = self._parse_choice(chunk.id, choice, thinking)
                        if chat_chunk is not None:
                            retryable = False
                            self._event_ch.send_nowait(chat_chunk)

                    # the final chunk carries usage (include_usage above)
                    if chunk.usage is not None:
                        retryable = False
                        tokens_details = chunk.usage.prompt_tokens_details
                        cached_tokens = tokens_details.cached_tokens if tokens_details else 0
                        chunk = llm.ChatChunk(
                            id=chunk.id,
                            usage=llm.CompletionUsage(
                                completion_tokens=chunk.usage.completion_tokens,
                                prompt_tokens=chunk.usage.prompt_tokens,
                                prompt_cached_tokens=cached_tokens or 0,
                                total_tokens=chunk.usage.total_tokens,
                            ),
                        )
                        self._event_ch.send_nowait(chunk)
        except openai.APITimeoutError:
            raise APITimeoutError(retryable=retryable) from None
        except openai.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
                retryable=retryable,
            ) from None
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_choice(
        self, id: str, choice: Choice, thinking: asyncio.Event
    ) -> llm.ChatChunk | None:
        """Convert one streamed `choice` into a ChatChunk, or None if there is
        nothing to emit yet (tool-call args still accumulating, empty delta)."""
        delta = choice.delta

        # https://github.com/livekit/agents/issues/688
        # the delta can be None when using Azure OpenAI (content filtering)
        if delta is None:
            return None

        if delta.tool_calls:
            for tool in delta.tool_calls:
                if not tool.function:
                    continue

                call_chunk = None
                # a new tool call started before the buffered one was flushed:
                # emit the completed previous call first
                if self._tool_call_id and tool.id and tool.index != self._tool_index:
                    call_chunk = llm.ChatChunk(
                        id=id,
                        delta=llm.ChoiceDelta(
                            role="assistant",
                            content=delta.content,
                            tool_calls=[
                                llm.FunctionToolCall(
                                    arguments=self._fnc_raw_arguments or "",
                                    name=self._fnc_name or "",
                                    call_id=self._tool_call_id or "",
                                )
                            ],
                        ),
                    )
                    self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None

                if tool.function.name:
                    # first fragment of a call: record identity, start the
                    # argument buffer
                    self._tool_index = tool.index
                    self._tool_call_id = tool.id
                    self._fnc_name = tool.function.name
                    self._fnc_raw_arguments = tool.function.arguments or ""
                elif tool.function.arguments:
                    # continuation fragment: append streamed argument text
                    self._fnc_raw_arguments += tool.function.arguments  # type: ignore

                if call_chunk is not None:
                    return call_chunk

        if choice.finish_reason in ("tool_calls", "stop") and self._tool_call_id:
            # flush the pending tool call at the end of the turn
            call_chunk = llm.ChatChunk(
                id=id,
                delta=llm.ChoiceDelta(
                    role="assistant",
                    content=delta.content,
                    tool_calls=[
                        llm.FunctionToolCall(
                            arguments=self._fnc_raw_arguments or "",
                            name=self._fnc_name or "",
                            call_id=self._tool_call_id or "",
                        )
                    ],
                ),
            )
            self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None
            return call_chunk

        delta.content = llm_utils.strip_thinking_tokens(delta.content, thinking)

        if not delta.content:
            return None

        return llm.ChatChunk(
            id=id,
            delta=llm.ChoiceDelta(content=delta.content, role="assistant"),
        )
Streaming chat-completion response that parses OpenAI-format stream chunks into LiveKit chat chunks, accumulating tool-call arguments as they stream.
Ancestors
- livekit.agents.llm.llm.LLMStream
- abc.ABC
Subclasses
- livekit.plugins.openai.llm.LLMStream
class STT (model: NotGivenOr[STTModels | str] = NOT_GIVEN,
*,
language: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | AssemblyaiOptions] = NOT_GIVEN)-
Expand source code
class STT(stt.STT):
    """Streaming speech-to-text client for LiveKit Cloud Inference."""

    # The @overload stubs below only narrow `extra_kwargs` to the options type
    # matching the chosen provider's model family.
    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: AssemblyaiModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[AssemblyaiOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: str,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None: ...

    def __init__(
        self,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | CartesiaOptions | DeepgramOptions | AssemblyaiOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference STT

        Args:
            model (STTModels | str, optional): STT model to use.
            language (str, optional): Language of the STT model.
            encoding (STTEncoding, optional): Encoding of the STT model.
            sample_rate (int, optional): Sample rate of the STT model.
            base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable.
            api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable.
            api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the STT model.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True),
        )

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )
        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )
        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        self._opts = STTOptions(
            model=model,
            language=language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._session = http_session
        # weak set: streams are tracked for update_options() fan-out but are
        # not kept alive by the STT instance
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # lazily fall back to the shared http-context session
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        # batch recognition is intentionally unsupported
        raise NotImplementedError(
            "LiveKit STT does not support batch recognition, use stream() instead"
        )

    def stream(
        self,
        *,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a streaming transcription session."""
        options = self._sanitize_options(language=language)
        stream = SpeechStream(stt=self, opts=options, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    ) -> None:
        """Update STT configuration options."""
        if is_given(model):
            self._opts.model = model
        if is_given(language):
            self._opts.language = language
        # propagate the change to every live stream
        for stream in self._streams:
            stream.update_options(model=model, language=language)

    def _sanitize_options(
        self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN
    ) -> STTOptions:
        """Create a sanitized copy of options with language override if provided."""
        options = replace(self._opts)
        if is_given(language):
            options.language = language
        return options
Streaming speech-to-text client for LiveKit Cloud Inference.
Livekit Cloud Inference STT
Args
model
:STTModels | str
, optional- STT model to use.
language
:str
, optional- Language of the STT model.
encoding
:STTEncoding
, optional- Encoding of the STT model.
sample_rate
:int
, optional- Sample rate of the STT model.
base_url
:str
, optional- LIVEKIT_URL, if not provided, read from environment variable.
api_key
:str
, optional- LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret
:str
, optional- LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session
:aiohttp.ClientSession
, optional- HTTP session to use.
extra_kwargs
:dict
, optional- Extra kwargs to pass to the STT model.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self,
*,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Create a streaming transcription session."""
    # copy options (with optional language override) so the stream is
    # isolated from later mutations of self._opts
    options = self._sanitize_options(language=language)
    stream = SpeechStream(stt=self, opts=options, conn_options=conn_options)
    # tracked weakly so update_options() can reach live streams
    self._streams.add(stream)
    return stream
Create a streaming transcription session.
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
) -> None:
    """Update STT configuration options."""
    if is_given(model):
        self._opts.model = model
    if is_given(language):
        self._opts.language = language
    # propagate the change to every live stream
    for stream in self._streams:
        stream.update_options(model=model, language=language)
Update STT configuration options.
Inherited members
class TTS (model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions] = NOT_GIVEN)-
Expand source code
class TTS(tts.TTS):
    """Streaming text-to-speech client for LiveKit Cloud Inference."""

    # The @overload stubs below only narrow `extra_kwargs` to the options type
    # matching the chosen provider's model family.
    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: ElevenlabsModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[ElevenlabsOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: RimeModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[RimeOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: InworldModels,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[InworldOptions] = NOT_GIVEN,
    ) -> None:
        pass

    @overload
    def __init__(
        self,
        model: NotGivenOr[str] = NOT_GIVEN,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None:
        pass

    def __init__(
        self,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,  # TODO: add a default model
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | CartesiaOptions | ElevenlabsOptions | RimeOptions | InworldOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference TTS

        Args:
            model (TTSModels | str, optional): TTS model to use, in "provider/model[:voice_id]" format
            voice (str, optional): Voice to use, use a default one if not provided
            language (str, optional): Language of the TTS model.
            encoding (TTSEncoding, optional): Encoding of the TTS model.
            sample_rate (int, optional): Sample rate of the TTS model.
            base_url (str, optional): LIVEKIT_URL, if not provided, read from environment variable.
            api_key (str, optional): LIVEKIT_API_KEY, if not provided, read from environment variable.
            api_secret (str, optional): LIVEKIT_API_SECRET, if not provided, read from environment variable.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the TTS model.
        """
        sample_rate = sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True, aligned_transcript=False),
            sample_rate=sample_rate,
            num_channels=1,
        )

        lk_base_url = (
            base_url
            if is_given(base_url)
            else os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)
        )
        lk_api_key = (
            api_key
            if is_given(api_key)
            else os.getenv("LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", ""))
        )
        if not lk_api_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )
        lk_api_secret = (
            api_secret
            if is_given(api_secret)
            else os.getenv("LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", ""))
        )
        if not lk_api_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        # read voice id from the model if provided: "provider/model:voice_id"
        if is_given(model) and (idx := model.rfind(":")) != -1:
            if is_given(voice) and voice != model[idx + 1 :]:
                # an explicit `voice` argument wins over the suffix in `model`
                logger.warning(
                    "`voice` is provided via both argument and model, using the one from the argument",
                    extra={"voice": voice, "model": model},
                )
            else:
                voice = model[idx + 1 :]
            model = model[:idx]

        self._opts = _TTSOptions(
            model=model,
            voice=voice,
            language=language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate,
            base_url=lk_base_url,
            api_key=lk_api_key,
            api_secret=lk_api_secret,
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._session = http_session
        # websocket pool: sessions are recycled after 5 minutes
        self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse](
            connect_cb=self._connect_ws,
            close_cb=self._close_ws,
            max_session_duration=300,
            mark_refreshed_on_get=True,
        )
        self._streams = weakref.WeakSet[SynthesizeStream]()

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a websocket to the gateway and send the session.create message."""
        session = self._ensure_session()
        base_url = self._opts.base_url
        if base_url.startswith(("http://", "https://")):
            # upgrade scheme: http(s):// -> ws(s)://
            base_url = base_url.replace("http", "ws", 1)

        headers = {
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}",
        }
        ws = None
        try:
            ws = await asyncio.wait_for(
                session.ws_connect(f"{base_url}/tts", headers=headers), timeout
            )
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            # NOTE(review): aiohttp.ClientResponseError is not in the except
            # tuple above, so this isinstance check can never be True — the
            # 429 branch looks unreachable; confirm intended exception types.
            if isinstance(e, aiohttp.ClientResponseError) and e.status == 429:
                raise APIStatusError("LiveKit TTS quota exceeded", status_code=e.status) from e
            raise APIConnectionError("failed to connect to LiveKit TTS") from e

        params = {
            "type": "session.create",
            "sample_rate": str(self._opts.sample_rate),
            "encoding": self._opts.encoding,
            "extra": self._opts.extra_kwargs,
        }
        if self._opts.voice:
            params["voice"] = self._opts.voice
        if self._opts.model:
            params["model"] = self._opts.model
        if self._opts.language:
            params["language"] = self._opts.language
        try:
            await ws.send_str(json.dumps(params))
        except Exception as e:
            await ws.close()
            raise APIConnectionError("failed to send session.create message to LiveKit TTS") from e
        return ws

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # lazily fall back to the shared http-context session
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service"""
        self._pool.prewarm()

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            voice (str, optional): Voice.
            model (TTSModels | str, optional): TTS model to use.
            language (str, optional): Language code for the TTS model.
        """
        if is_given(model):
            self._opts.model = model
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        # batch synthesis is intentionally unsupported; use stream() instead
        raise NotImplementedError("ChunkedStream is not implemented")

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        # close open streams first, then tear down the websocket pool
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()
        await self._pool.aclose()
Streaming text-to-speech client for LiveKit Cloud Inference.
Livekit Cloud Inference TTS
Args
model
:TTSModels | str
, optional- TTS model to use, in "provider/model[:voice_id]" format
voice
:str
, optional- Voice to use, use a default one if not provided
language
:str
, optional- Language of the TTS model.
encoding
:TTSEncoding
, optional- Encoding of the TTS model.
sample_rate
:int
, optional- Sample rate of the TTS model.
base_url
:str
, optional- LIVEKIT_URL, if not provided, read from environment variable.
api_key
:str
, optional- LIVEKIT_API_KEY, if not provided, read from environment variable.
api_secret
:str
, optional- LIVEKIT_API_SECRET, if not provided, read from environment variable.
http_session
:aiohttp.ClientSession
, optional- HTTP session to use.
extra_kwargs
:dict
, optional- Extra kwargs to pass to the TTS model.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close all open synthesis streams and tear down the websocket pool."""
    # snapshot the weak set before iterating: aclose() may drop entries
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()
    await self._pool.aclose()
def prewarm(self) ‑> None
-
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service"""
    self._pool.prewarm()
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Open a websocket-backed synthesis stream."""
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    # tracked weakly so aclose() can shut down live streams
    self._streams.add(stream)
    return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    """Batch synthesis is intentionally unsupported; use stream() instead."""
    raise NotImplementedError("ChunkedStream is not implemented")
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    """Update TTS configuration; only the given fields are changed.

    Args:
        voice (str, optional): Voice.
        model (TTSModels | str, optional): TTS model to use.
        language (str, optional): Language code for the TTS model.
    """
    # NOTE(review): unlike STT.update_options, changes are not propagated to
    # already-open streams here — confirm whether that is intentional.
    if is_given(model):
        self._opts.model = model
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language
Args
voice
:str
, optional- Voice.
model
:TTSModels | str
, optional- TTS model to use.
language
:str
, optional- Language code for the TTS model.
Inherited members