Module livekit.plugins.openai

Sub-modules

livekit.plugins.openai.realtime

Functions

async def create_embeddings(*,
input: list[str],
model: models.EmbeddingModels = 'text-embedding-3-small',
dimensions: int | None = None,
api_key: str | None = None,
http_session: aiohttp.ClientSession | None = None) ‑> list[livekit.plugins.openai.embeddings.EmbeddingData]
async def create_embeddings(
    *,
    input: list[str],
    model: models.EmbeddingModels = "text-embedding-3-small",
    dimensions: int | None = None,
    api_key: str | None = None,
    http_session: aiohttp.ClientSession | None = None,
) -> list[EmbeddingData]:
    http_session = http_session or utils.http_context.http_session()

    api_key = api_key or os.environ.get("OPENAI_API_KEY")
    if not api_key:
        raise ValueError("OPENAI_API_KEY must be set")

    async with http_session.post(
        "https://api.openai.com/v1/embeddings",
        headers={"Authorization": f"Bearer {api_key}"},
        json={
            "model": model,
            "input": input,
            "encoding_format": "base64",
            "dimensions": dimensions,
        },
    ) as resp:
        payload = await resp.json()
        data = payload["data"]
        list_data = []
        for d in data:
            # Each embedding is returned base64-encoded (requested via
            # "encoding_format" above); unpack the raw bytes as float32 values.
            raw = base64.b64decode(d["embedding"])
            num_floats = len(raw) // 4
            floats = list(struct.unpack("f" * num_floats, raw))
            list_data.append(EmbeddingData(index=d["index"], embedding=floats))

        return list_data
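
A minimal usage sketch (not part of the library): it assumes OPENAI_API_KEY is exported and passes an explicit aiohttp session so the call also works outside a LiveKit job context, where utils.http_context.http_session() would normally provide one.

import asyncio

import aiohttp

from livekit.plugins import openai


async def main() -> None:
    async with aiohttp.ClientSession() as session:
        # dimensions is optional; omit it to use the model's native size.
        data = await openai.create_embeddings(
            input=["hello world"],
            model="text-embedding-3-small",
            dimensions=512,
            http_session=session,
        )
        print(data[0].index, len(data[0].embedding))  # -> 0 512


asyncio.run(main())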

Classes

class EmbeddingData (index: int, embedding: list[float])
@dataclass
class EmbeddingData:
    index: int
    embedding: list[float]


Instance variables

var embedding : list[float]
var index : int
class LLM (*,
model: str | ChatModels = 'gpt-4o',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
store: NotGivenOr[bool] = NOT_GIVEN,
metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
timeout: httpx.Timeout | None = None)
class LLM(llm.LLM):
    def __init__(
        self,
        *,
        model: str | ChatModels = "gpt-4o",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        client: openai.AsyncClient | None = None,
        user: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        store: NotGivenOr[bool] = NOT_GIVEN,
        metadata: NotGivenOr[dict[str, str]] = NOT_GIVEN,
        timeout: httpx.Timeout | None = None,
    ) -> None:
        
""" Create a new instance of OpenAI LLM. ``api_key`` must be set to your OpenAI API key, either using the argument or by setting the ``OPENAI_API_KEY`` environmental variable. """
super().__init__() self._opts = _LLMOptions( model=model, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, store=store, metadata=metadata, ) self._client = client or openai.AsyncClient( api_key=api_key if is_given(api_key) else None, base_url=base_url if is_given(base_url) else None, max_retries=0, http_client=httpx.AsyncClient( timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) @staticmethod def with_azure( *, model: str | ChatModels = "gpt-4o", azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, timeout: httpx.Timeout | None = None, ) -> LLM:
""" This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` """
# noqa: E501 azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, azure_ad_token_provider=azure_ad_token_provider, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) # type: ignore return LLM( model=model, client=azure_client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_cerebras( *, model: str | CerebrasChatModels = "llama3.1-8b", api_key: str | None = None, base_url: str = "https://api.cerebras.ai/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, ) -> LLM:
""" Create a new instance of Cerebras LLM. ``api_key`` must be set to your Cerebras API key, either using the argument or by setting the ``CEREBRAS_API_KEY`` environmental variable. @integrations:cerebras:llm """
api_key = api_key or os.environ.get("CEREBRAS_API_KEY") if api_key is None: raise ValueError( "Cerebras API key is required, either as argument or set CEREBAAS_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_fireworks( *, model: str = "accounts/fireworks/models/llama-v3p3-70b-instruct", api_key: str | None = None, base_url: str = "https://api.fireworks.ai/inference/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of Fireworks LLM. ``api_key`` must be set to your Fireworks API key, either using the argument or by setting the ``FIREWORKS_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("FIREWORKS_API_KEY") if api_key is None: raise ValueError( "Fireworks API key is required, either as argument or set FIREWORKS_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_x_ai( *, model: str | XAIChatModels = "grok-2-public", api_key: str | None = None, base_url: str = "https://api.x.ai/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ):
""" Create a new instance of XAI LLM. ``api_key`` must be set to your XAI API key, either using the argument or by setting the ``XAI_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("XAI_API_KEY") if api_key is None: raise ValueError( "XAI API key is required, either as argument or set XAI_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_deepseek( *, model: str | DeepSeekChatModels = "deepseek-chat", api_key: str | None = None, base_url: str = "https://api.deepseek.com/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of DeepSeek LLM. ``api_key`` must be set to your DeepSeek API key, either using the argument or by setting the ``DEEPSEEK_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("DEEPSEEK_API_KEY") if api_key is None: raise ValueError( "DeepSeek API key is required, either as argument or set DEEPSEEK_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_octo( *, model: str | OctoChatModels = "llama-2-13b-chat", api_key: str | None = None, base_url: str = "https://text.octoai.run/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of OctoAI LLM. ``api_key`` must be set to your OctoAI API key, either using the argument or by setting the ``OCTOAI_TOKEN`` environmental variable. """
api_key = api_key or os.environ.get("OCTOAI_TOKEN") if api_key is None: raise ValueError( "OctoAI API key is required, either as argument or set OCTOAI_TOKEN environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_ollama( *, model: str = "llama3.1", base_url: str = "http://localhost:11434/v1", client: openai.AsyncClient | None = None, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM: """ Create a new instance of Ollama LLM. """ return LLM( model=model, api_key="ollama", base_url=base_url, client=client, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_perplexity( *, model: str | PerplexityChatModels = "llama-3.1-sonar-small-128k-chat", api_key: str | None = None, base_url: str = "https://api.perplexity.ai", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of PerplexityAI LLM. ``api_key`` must be set to your TogetherAI API key, either using the argument or by setting the ``PERPLEXITY_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("PERPLEXITY_API_KEY") if api_key is None: raise ValueError( "Perplexity AI API key is required, either as argument or set PERPLEXITY_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_together( *, model: str | TogetherChatModels = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo", api_key: str | None = None, base_url: str = "https://api.together.xyz/v1", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of TogetherAI LLM. ``api_key`` must be set to your TogetherAI API key, either using the argument or by setting the ``TOGETHER_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("TOGETHER_API_KEY") if api_key is None: raise ValueError( "Together AI API key is required, either as argument or set TOGETHER_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) @staticmethod def with_telnyx( *, model: str | TelnyxChatModels = "meta-llama/Meta-Llama-3.1-70B-Instruct", api_key: str | None = None, base_url: str = "https://api.telnyx.com/v2/ai", client: openai.AsyncClient | None = None, user: NotGivenOr[str] = NOT_GIVEN, temperature: NotGivenOr[float] = NOT_GIVEN, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: ToolChoice = "auto", ) -> LLM:
""" Create a new instance of Telnyx LLM. ``api_key`` must be set to your Telnyx API key, either using the argument or by setting the ``TELNYX_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("TELNYX_API_KEY") if api_key is None: raise ValueError( "Telnyx AI API key is required, either as argument or set TELNYX_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, ) def chat( self, *, chat_ctx: ChatContext, tools: list[FunctionTool] | None = None, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN, tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN, response_format: NotGivenOr[ completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT] ] = NOT_GIVEN, extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN, ) -> LLMStream: extra = {} if is_given(extra_kwargs): extra.update(extra_kwargs) if is_given(self._opts.metadata): extra["metadata"] = self._opts.metadata if is_given(self._opts.user): extra["user"] = self._opts.user parallel_tool_calls = ( parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls ) if is_given(parallel_tool_calls): extra["parallel_tool_calls"] = parallel_tool_calls tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice # type: ignore if is_given(tool_choice): oai_tool_choice: ChatCompletionToolChoiceOptionParam if isinstance(tool_choice, dict): oai_tool_choice = { "type": "function", "function": {"name": tool_choice["function"]["name"]}, } extra["tool_choice"] = oai_tool_choice elif tool_choice in ("auto", "required", "none"): oai_tool_choice = tool_choice extra["tool_choice"] = oai_tool_choice if is_given(response_format): extra["response_format"] = llm_utils.to_response_format_param(response_format) return LLMStream( self, model=self._opts.model, client=self._client, chat_ctx=chat_ctx, tools=tools or [], conn_options=conn_options, extra_kwargs=extra, )


Create a new instance of OpenAI LLM.

api_key must be set to your OpenAI API key, either using the argument or by setting the OPENAI_API_KEY environmental variable.
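
A minimal construction sketch, assuming OPENAI_API_KEY is set in the environment:

from livekit.plugins import openai

# Options left unset stay NOT_GIVEN and fall back to server-side defaults.
llm = openai.LLM(model="gpt-4o", temperature=0.4)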

Ancestors

  • livekit.agents.llm.llm.LLM
  • abc.ABC
  • EventEmitter
  • typing.Generic

Static methods

def with_azure(*,
model: str | ChatModels = 'gpt-4o',
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
timeout: httpx.Timeout | None = None) ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_azure(
    *,
    model: str | ChatModels = "gpt-4o",
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    timeout: httpx.Timeout | None = None,
) -> LLM:
    
""" This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` """
# noqa: E501 azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, azure_ad_token_provider=azure_ad_token_provider, organization=organization, project=project, base_url=base_url, timeout=timeout if timeout else httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), ) # type: ignore return LLM( model=model, client=azure_client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

This automatically infers the following arguments from their corresponding environment variables if they are not provided:

  • api_key from AZURE_OPENAI_API_KEY
  • organization from OPENAI_ORG_ID
  • project from OPENAI_PROJECT_ID
  • azure_ad_token from AZURE_OPENAI_AD_TOKEN
  • api_version from OPENAI_API_VERSION
  • azure_endpoint from AZURE_OPENAI_ENDPOINT
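
For example, a configuration sketch relying on those environment variables; the deployment name here is hypothetical.

from livekit.plugins import openai

# AZURE_OPENAI_API_KEY, AZURE_OPENAI_ENDPOINT, and OPENAI_API_VERSION are
# read from the environment variables listed above.
azure_llm = openai.LLM.with_azure(
    model="gpt-4o",
    azure_deployment="my-gpt4o-deployment",  # hypothetical deployment name
    temperature=0.3,
)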

def with_cerebras(*,
model: str | CerebrasChatModels = 'llama3.1-8b',
api_key: str | None = None,
base_url: str = 'https://api.cerebras.ai/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN) ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_cerebras(
    *,
    model: str | CerebrasChatModels = "llama3.1-8b",
    api_key: str | None = None,
    base_url: str = "https://api.cerebras.ai/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
) -> LLM:
    
""" Create a new instance of Cerebras LLM. ``api_key`` must be set to your Cerebras API key, either using the argument or by setting the ``CEREBRAS_API_KEY`` environmental variable. @integrations:cerebras:llm """
api_key = api_key or os.environ.get("CEREBRAS_API_KEY") if api_key is None: raise ValueError( "Cerebras API key is required, either as argument or set CEREBAAS_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

Create a new instance of Cerebras LLM.

api_key must be set to your Cerebras API key, either using the argument or by setting the CEREBRAS_API_KEY environmental variable.

def with_deepseek(*,
model: str | DeepSeekChatModels = 'deepseek-chat',
api_key: str | None = None,
base_url: str = 'https://api.deepseek.com/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_deepseek(
    *,
    model: str | DeepSeekChatModels = "deepseek-chat",
    api_key: str | None = None,
    base_url: str = "https://api.deepseek.com/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    
""" Create a new instance of DeepSeek LLM. ``api_key`` must be set to your DeepSeek API key, either using the argument or by setting the ``DEEPSEEK_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("DEEPSEEK_API_KEY") if api_key is None: raise ValueError( "DeepSeek API key is required, either as argument or set DEEPSEEK_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

Create a new instance of DeepSeek LLM.

api_key must be set to your DeepSeek API key, either using the argument or by setting the DEEPSEEK_API_KEY environmental variable.

def with_fireworks(*,
model: str = 'accounts/fireworks/models/llama-v3p3-70b-instruct',
api_key: str | None = None,
base_url: str = 'https://api.fireworks.ai/inference/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_fireworks(
    *,
    model: str = "accounts/fireworks/models/llama-v3p3-70b-instruct",
    api_key: str | None = None,
    base_url: str = "https://api.fireworks.ai/inference/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    
""" Create a new instance of Fireworks LLM. ``api_key`` must be set to your Fireworks API key, either using the argument or by setting the ``FIREWORKS_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("FIREWORKS_API_KEY") if api_key is None: raise ValueError( "Fireworks API key is required, either as argument or set FIREWORKS_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

Create a new instance of Fireworks LLM.

api_key must be set to your Fireworks API key, either using the argument or by setting the FIREWORKS_API_KEY environmental variable.

def with_octo(*,
model: str | OctoChatModels = 'llama-2-13b-chat',
api_key: str | None = None,
base_url: str = 'https://text.octoai.run/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_octo(
    *,
    model: str | OctoChatModels = "llama-2-13b-chat",
    api_key: str | None = None,
    base_url: str = "https://text.octoai.run/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    """
    Create a new instance of OctoAI LLM.

    ``api_key`` must be set to your OctoAI API key, either using the argument or by setting
    the ``OCTOAI_TOKEN`` environmental variable.
    """

    api_key = api_key or os.environ.get("OCTOAI_TOKEN")
    if api_key is None:
        raise ValueError(
            "OctoAI API key is required, either as argument or set OCTOAI_TOKEN environmental variable"  # noqa: E501
        )

    return LLM(
        model=model,
        api_key=api_key,
        base_url=base_url,
        client=client,
        user=user,
        temperature=temperature,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )

Create a new instance of OctoAI LLM.

api_key must be set to your OctoAI API key, either using the argument or by setting the OCTOAI_TOKEN environmental variable.

def with_ollama(*,
model: str = 'llama3.1',
base_url: str = 'http://localhost:11434/v1',
client: openai.AsyncClient | None = None,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_ollama(
    *,
    model: str = "llama3.1",
    base_url: str = "http://localhost:11434/v1",
    client: openai.AsyncClient | None = None,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    """
    Create a new instance of Ollama LLM.
    """

    return LLM(
        model=model,
        api_key="ollama",
        base_url=base_url,
        client=client,
        temperature=temperature,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )

Create a new instance of Ollama LLM.
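
No API key is required: the client sends the placeholder "ollama" instead, since Ollama exposes an OpenAI-compatible endpoint locally. A sketch against a default local install (assumes the llama3.1 model has been pulled):

from livekit.plugins import openai

local_llm = openai.LLM.with_ollama(
    model="llama3.1",
    base_url="http://localhost:11434/v1",
)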

def with_perplexity(*,
model: str | PerplexityChatModels = 'llama-3.1-sonar-small-128k-chat',
api_key: str | None = None,
base_url: str = 'https://api.perplexity.ai',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_perplexity(
    *,
    model: str | PerplexityChatModels = "llama-3.1-sonar-small-128k-chat",
    api_key: str | None = None,
    base_url: str = "https://api.perplexity.ai",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    
""" Create a new instance of PerplexityAI LLM. ``api_key`` must be set to your TogetherAI API key, either using the argument or by setting the ``PERPLEXITY_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("PERPLEXITY_API_KEY") if api_key is None: raise ValueError( "Perplexity AI API key is required, either as argument or set PERPLEXITY_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

Create a new instance of PerplexityAI LLM.

api_key must be set to your Perplexity API key, either using the argument or by setting the PERPLEXITY_API_KEY environmental variable.

def with_telnyx(*,
model: str | TelnyxChatModels = 'meta-llama/Meta-Llama-3.1-70B-Instruct',
api_key: str | None = None,
base_url: str = 'https://api.telnyx.com/v2/ai',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_telnyx(
    *,
    model: str | TelnyxChatModels = "meta-llama/Meta-Llama-3.1-70B-Instruct",
    api_key: str | None = None,
    base_url: str = "https://api.telnyx.com/v2/ai",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    """
    Create a new instance of Telnyx LLM.

    ``api_key`` must be set to your Telnyx API key, either using the argument or by setting
    the ``TELNYX_API_KEY`` environmental variable.
    """

    api_key = api_key or os.environ.get("TELNYX_API_KEY")
    if api_key is None:
        raise ValueError(
            "Telnyx AI API key is required, either as argument or set TELNYX_API_KEY environmental variable"  # noqa: E501
        )

    return LLM(
        model=model,
        api_key=api_key,
        base_url=base_url,
        client=client,
        user=user,
        temperature=temperature,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )

Create a new instance of Telnyx LLM.

api_key must be set to your Telnyx API key, either using the argument or by setting the TELNYX_API_KEY environmental variable.

def with_together(*,
model: str | TogetherChatModels = 'meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo',
api_key: str | None = None,
base_url: str = 'https://api.together.xyz/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto') ‑> livekit.plugins.openai.llm.LLM
@staticmethod
def with_together(
    *,
    model: str | TogetherChatModels = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
    api_key: str | None = None,
    base_url: str = "https://api.together.xyz/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
) -> LLM:
    
""" Create a new instance of TogetherAI LLM. ``api_key`` must be set to your TogetherAI API key, either using the argument or by setting the ``TOGETHER_API_KEY`` environmental variable. """
api_key = api_key or os.environ.get("TOGETHER_API_KEY") if api_key is None: raise ValueError( "Together AI API key is required, either as argument or set TOGETHER_API_KEY environmental variable" # noqa: E501 ) return LLM( model=model, api_key=api_key, base_url=base_url, client=client, user=user, temperature=temperature, parallel_tool_calls=parallel_tool_calls, tool_choice=tool_choice, )

Create a new instance of TogetherAI LLM.

api_key must be set to your TogetherAI API key, either using the argument or by setting the TOGETHER_API_KEY environmental variable.

def with_x_ai(*,
model: str | XAIChatModels = 'grok-2-public',
api_key: str | None = None,
base_url: str = 'https://api.x.ai/v1',
client: openai.AsyncClient | None = None,
user: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: ToolChoice = 'auto')
@staticmethod
def with_x_ai(
    *,
    model: str | XAIChatModels = "grok-2-public",
    api_key: str | None = None,
    base_url: str = "https://api.x.ai/v1",
    client: openai.AsyncClient | None = None,
    user: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: ToolChoice = "auto",
):
    """
    Create a new instance of XAI LLM.

    ``api_key`` must be set to your XAI API key, either using the argument or by setting
    the ``XAI_API_KEY`` environmental variable.
    """
    api_key = api_key or os.environ.get("XAI_API_KEY")
    if api_key is None:
        raise ValueError(
            "XAI API key is required, either as argument or set XAI_API_KEY environmental variable"  # noqa: E501
        )

    return LLM(
        model=model,
        api_key=api_key,
        base_url=base_url,
        client=client,
        user=user,
        temperature=temperature,
        parallel_tool_calls=parallel_tool_calls,
        tool_choice=tool_choice,
    )

Create a new instance of XAI LLM.

api_key must be set to your XAI API key, either using the argument or by setting the XAI_API_KEY environmental variable.

Methods

def chat(self,
*,
chat_ctx: ChatContext,
tools: list[FunctionTool] | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0),
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
response_format: NotGivenOr[completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]] = NOT_GIVEN,
extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN) ‑> livekit.plugins.openai.llm.LLMStream
def chat(
    self,
    *,
    chat_ctx: ChatContext,
    tools: list[FunctionTool] | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
    tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
    response_format: NotGivenOr[
        completion_create_params.ResponseFormat | type[llm_utils.ResponseFormatT]
    ] = NOT_GIVEN,
    extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
) -> LLMStream:
    extra = {}
    if is_given(extra_kwargs):
        extra.update(extra_kwargs)

    if is_given(self._opts.metadata):
        extra["metadata"] = self._opts.metadata

    if is_given(self._opts.user):
        extra["user"] = self._opts.user

    parallel_tool_calls = (
        parallel_tool_calls if is_given(parallel_tool_calls) else self._opts.parallel_tool_calls
    )
    if is_given(parallel_tool_calls):
        extra["parallel_tool_calls"] = parallel_tool_calls

    tool_choice = tool_choice if is_given(tool_choice) else self._opts.tool_choice  # type: ignore
    if is_given(tool_choice):
        oai_tool_choice: ChatCompletionToolChoiceOptionParam
        if isinstance(tool_choice, dict):
            oai_tool_choice = {
                "type": "function",
                "function": {"name": tool_choice["function"]["name"]},
            }
            extra["tool_choice"] = oai_tool_choice
        elif tool_choice in ("auto", "required", "none"):
            oai_tool_choice = tool_choice
            extra["tool_choice"] = oai_tool_choice

    if is_given(response_format):
        extra["response_format"] = llm_utils.to_response_format_param(response_format)

    return LLMStream(
        self,
        model=self._opts.model,
        client=self._client,
        chat_ctx=chat_ctx,
        tools=tools or [],
        conn_options=conn_options,
        extra_kwargs=extra,
    )
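
The returned LLMStream yields ChatChunk objects as they arrive; usage-only chunks carry no delta. A consumption sketch (ChatContext.add_message() and the stream's aclose() are assumed from the livekit-agents API):

from livekit.agents import llm as agents_llm

from livekit.plugins import openai


async def complete(oai_llm: openai.LLM) -> str:
    chat_ctx = agents_llm.ChatContext()
    chat_ctx.add_message(role="user", content="Say hello in five words.")

    stream = oai_llm.chat(chat_ctx=chat_ctx)
    parts: list[str] = []
    try:
        async for chunk in stream:
            # Usage-only chunks have no delta; content can also be empty.
            if chunk.delta and chunk.delta.content:
                parts.append(chunk.delta.content)
    finally:
        await stream.aclose()  # assumed from the base LLMStream interface
    return "".join(parts)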


class LLMStream (llm: LLM,
*,
model: str | ChatModels,
client: openai.AsyncClient,
chat_ctx: llm.ChatContext,
tools: list[FunctionTool],
conn_options: APIConnectOptions,
extra_kwargs: dict[str, Any])
class LLMStream(llm.LLMStream):
    def __init__(
        self,
        llm: LLM,
        *,
        model: str | ChatModels,
        client: openai.AsyncClient,
        chat_ctx: llm.ChatContext,
        tools: list[FunctionTool],
        conn_options: APIConnectOptions,
        extra_kwargs: dict[str, Any],
    ) -> None:
        super().__init__(llm, chat_ctx=chat_ctx, tools=tools, conn_options=conn_options)
        self._model = model
        self._client = client
        self._llm = llm
        self._extra_kwargs = extra_kwargs

    async def _run(self) -> None:
        # current function call that we're waiting for full completion (args are streamed)
        # (defined inside the _run method to make sure the state is reset for each run/attempt)
        self._oai_stream: openai.AsyncStream[ChatCompletionChunk] | None = None
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None
        self._tool_index: int | None = None
        retryable = True

        try:
            self._oai_stream = stream = await self._client.chat.completions.create(
                messages=to_chat_ctx(self._chat_ctx, id(self._llm)),
                tools=to_fnc_ctx(self._tools) if self._tools else openai.NOT_GIVEN,
                model=self._model,
                stream_options={"include_usage": True},
                stream=True,
                **self._extra_kwargs,
            )

            async with stream:
                async for chunk in stream:
                    for choice in chunk.choices:
                        chat_chunk = self._parse_choice(chunk.id, choice)
                        if chat_chunk is not None:
                            retryable = False
                            self._event_ch.send_nowait(chat_chunk)

                    if chunk.usage is not None:
                        retryable = False
                        chunk = llm.ChatChunk(
                            id=chunk.id,
                            usage=llm.CompletionUsage(
                                completion_tokens=chunk.usage.completion_tokens,
                                prompt_tokens=chunk.usage.prompt_tokens,
                                total_tokens=chunk.usage.total_tokens,
                            ),
                        )
                        self._event_ch.send_nowait(chunk)

        except openai.APITimeoutError:
            raise APITimeoutError(retryable=retryable) from None
        except openai.APIStatusError as e:
            raise APIStatusError(
                e.message,
                status_code=e.status_code,
                request_id=e.request_id,
                body=e.body,
                retryable=retryable,
            ) from None
        except Exception as e:
            raise APIConnectionError(retryable=retryable) from e

    def _parse_choice(self, id: str, choice: Choice) -> llm.ChatChunk | None:
        delta = choice.delta

        # https://github.com/livekit/agents/issues/688
        # the delta can be None when using Azure OpenAI (content filtering)
        if delta is None:
            return None

        if delta.tool_calls:
            for tool in delta.tool_calls:
                if not tool.function:
                    continue

                call_chunk = None
                if self._tool_call_id and tool.id and tool.index != self._tool_index:
                    call_chunk = llm.ChatChunk(
                        id=id,
                        delta=llm.ChoiceDelta(
                            role="assistant",
                            content=delta.content,
                            tool_calls=[
                                llm.FunctionToolCall(
                                    arguments=self._fnc_raw_arguments or "",
                                    name=self._fnc_name or "",
                                    call_id=self._tool_call_id or "",
                                )
                            ],
                        ),
                    )
                    self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None

                if tool.function.name:
                    self._tool_index = tool.index
                    self._tool_call_id = tool.id
                    self._fnc_name = tool.function.name
                    self._fnc_raw_arguments = tool.function.arguments or ""
                elif tool.function.arguments:
                    self._fnc_raw_arguments += tool.function.arguments  # type: ignore

                if call_chunk is not None:
                    return call_chunk

        if choice.finish_reason in ("tool_calls", "stop") and self._tool_call_id:
            call_chunk = llm.ChatChunk(
                id=id,
                delta=llm.ChoiceDelta(
                    role="assistant",
                    content=delta.content,
                    tool_calls=[
                        llm.FunctionToolCall(
                            arguments=self._fnc_raw_arguments or "",
                            name=self._fnc_name or "",
                            call_id=self._tool_call_id or "",
                        )
                    ],
                ),
            )
            self._tool_call_id = self._fnc_name = self._fnc_raw_arguments = None
            return call_chunk

        return llm.ChatChunk(
            id=id,
            delta=llm.ChoiceDelta(content=delta.content, role="assistant"),
        )


Ancestors

  • livekit.agents.llm.llm.LLMStream
  • abc.ABC
class STT (*,
language: str = 'en',
detect_language: bool = False,
model: STTModels | str = 'gpt-4o-mini-transcribe',
prompt: NotGivenOr[str] = NOT_GIVEN,
turn_detection: NotGivenOr[SessionTurnDetection] = NOT_GIVEN,
noise_reduction_type: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None,
use_realtime: bool = True)
class STT(stt.STT):
    def __init__(
        self,
        *,
        language: str = "en",
        detect_language: bool = False,
        model: STTModels | str = "gpt-4o-mini-transcribe",
        prompt: NotGivenOr[str] = NOT_GIVEN,
        turn_detection: NotGivenOr[SessionTurnDetection] = NOT_GIVEN,
        noise_reduction_type: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        client: openai.AsyncClient | None = None,
        use_realtime: bool = True,
    ):
        
""" Create a new instance of OpenAI STT. Args: language: The language code to use for transcription (e.g., "en" for English). detect_language: Whether to automatically detect the language. model: The OpenAI model to use for transcription. prompt: Optional text prompt to guide the transcription. Only supported for whisper-1. turn_detection: When using realtime transcription, this controls how model detects the user is done speaking. Final transcripts are generated only after the turn is over. See: https://platform.openai.com/docs/guides/realtime-vad noise_reduction_type: Type of noise reduction to apply. "near_field" or "far_field" This isn't needed when using LiveKit's noise cancellation. base_url: Custom base URL for OpenAI API. api_key: Your OpenAI API key. If not provided, will use the OPENAI_API_KEY environment variable. client: Optional pre-configured OpenAI AsyncClient instance. """
# noqa: E501 super().__init__( capabilities=stt.STTCapabilities(streaming=use_realtime, interim_results=use_realtime) ) if detect_language: language = "" if not is_given(turn_detection): turn_detection = { "type": "server_vad", "threshold": 0.5, "prefix_padding_ms": 600, "silence_duration_ms": 350, } self._opts = _STTOptions( language=language, detect_language=detect_language, model=model, prompt=prompt, turn_detection=turn_detection, ) if is_given(noise_reduction_type): self._opts.noise_reduction_type = noise_reduction_type self._client = client or openai.AsyncClient( max_retries=0, api_key=api_key if is_given(api_key) else None, base_url=base_url if is_given(base_url) else None, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) self._streams = weakref.WeakSet[SpeechStream]() self._session: aiohttp.ClientSession | None = None self._pool = utils.ConnectionPool[aiohttp.ClientWebSocketResponse]( max_session_duration=_max_session_duration, connect_cb=self._connect_ws, close_cb=self._close_ws, ) @staticmethod def with_groq( *, model: GroqAudioModels | str = "whisper-large-v3-turbo", api_key: NotGivenOr[str] = NOT_GIVEN, base_url: NotGivenOr[str] = NOT_GIVEN, client: openai.AsyncClient | None = None, language: str = "en", detect_language: bool = False, prompt: NotGivenOr[str] = NOT_GIVEN, ) -> STT:
""" Create a new instance of Groq STT. ``api_key`` must be set to your Groq API key, either using the argument or by setting the ``GROQ_API_KEY`` environmental variable. """
groq_api_key = api_key if is_given(api_key) else os.environ.get("GROQ_API_KEY") if not groq_api_key: raise ValueError("Groq API key is required") if not is_given(base_url): base_url = "https://api.groq.com/openai/v1" return STT( model=model, api_key=groq_api_key, base_url=base_url, client=client, language=language, detect_language=detect_language, prompt=prompt, use_realtime=False, ) def stream( self, *, language: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SpeechStream: if is_given(language): self._opts.language = language stream = SpeechStream( stt=self, pool=self._pool, ) self._streams.add(stream) return stream def update_options( self, *, model: NotGivenOr[STTModels | GroqAudioModels | str] = NOT_GIVEN, language: NotGivenOr[str] = NOT_GIVEN, detect_language: NotGivenOr[bool] = NOT_GIVEN, prompt: NotGivenOr[str] = NOT_GIVEN, turn_detection: NotGivenOr[SessionTurnDetection] = NOT_GIVEN, noise_reduction_type: NotGivenOr[str] = NOT_GIVEN, ) -> None:
""" Update the options for the speech stream. Most options are updated at the connection level. SpeechStreams will be recreated when options are updated. Args: language: The language to transcribe in. detect_language: Whether to automatically detect the language. model: The model to use for transcription. prompt: Optional text prompt to guide the transcription. Only supported for whisper-1. turn_detection: When using realtime, this controls how model detects the user is done speaking. noise_reduction_type: Type of noise reduction to apply. "near_field" or "far_field" """
# noqa: E501 if is_given(model): self._opts.model = model if is_given(language): self._opts.language = language if is_given(detect_language): self._opts.detect_language = detect_language self._opts.language = "" if is_given(prompt): self._opts.prompt = prompt if is_given(turn_detection): self._opts.turn_detection = turn_detection if is_given(noise_reduction_type): self._opts.noise_reduction_type = noise_reduction_type for stream in self._streams: if is_given(language): stream.update_options(language=language) async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse: prompt = self._opts.prompt if is_given(self._opts.prompt) else "" realtime_config: dict[str, Any] = { "type": "transcription_session.update", "session": { "input_audio_format": "pcm16", "input_audio_transcription": { "model": self._opts.model, "prompt": prompt, }, "turn_detection": self._opts.turn_detection, }, } if self._opts.language: realtime_config["session"]["input_audio_transcription"]["language"] = ( self._opts.language ) if self._opts.noise_reduction_type: realtime_config["session"]["input_audio_noise_reduction"] = { "type": self._opts.noise_reduction_type } query_params: dict[str, str] = { "intent": "transcription", } headers = { "User-Agent": "LiveKit Agents", "Authorization": f"Bearer {self._client.api_key}", "OpenAI-Beta": "realtime=v1", } url = f"{str(self._client.base_url).rstrip('/')}/realtime?{urlencode(query_params)}" if url.startswith("http"): url = url.replace("http", "ws", 1) session = self._ensure_session() ws = await asyncio.wait_for( session.ws_connect(url, headers=headers), DEFAULT_API_CONNECT_OPTIONS.timeout, ) await ws.send_json(realtime_config) return ws async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse): await ws.close() def _ensure_session(self) -> aiohttp.ClientSession: if not self._session: self._session = utils.http_context.http_session() return self._session async def _recognize_impl( self, buffer: AudioBuffer, *, language: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions, ) -> stt.SpeechEvent: try: if is_given(language): self._opts.language = language data = rtc.combine_audio_frames(buffer).to_wav_bytes() prompt = self._opts.prompt if is_given(self._opts.prompt) else openai.NOT_GIVEN format = "json" if self._opts.model == "whisper-1": # verbose_json returns language and other details, only supported for whisper-1 format = "verbose_json" resp = await self._client.audio.transcriptions.create( file=( "file.wav", data, "audio/wav", ), model=self._opts.model, # type: ignore language=self._opts.language, prompt=prompt, response_format=format, timeout=httpx.Timeout(30, connect=conn_options.timeout), ) sd = stt.SpeechData(text=resp.text, language=self._opts.language) if isinstance(resp, TranscriptionVerbose) and resp.language: sd.language = resp.language return stt.SpeechEvent( type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[sd], ) except openai.APITimeoutError: raise APITimeoutError() # noqa: B904 except openai.APIStatusError as e: raise APIStatusError( # noqa: B904 e.message, status_code=e.status_code, request_id=e.request_id, body=e.body, ) except Exception as e: raise APIConnectionError() from e


Create a new instance of OpenAI STT.

Args

language
The language code to use for transcription (e.g., "en" for English).
detect_language
Whether to automatically detect the language.
model
The OpenAI model to use for transcription.
prompt
Optional text prompt to guide the transcription. Only supported for whisper-1.
turn_detection
When using realtime transcription, this controls how the model detects the user is done speaking. Final transcripts are generated only after the turn is over. See: https://platform.openai.com/docs/guides/realtime-vad
noise_reduction_type
Type of noise reduction to apply: "near_field" or "far_field". This isn't needed when using LiveKit's noise cancellation.
base_url
Custom base URL for OpenAI API.
api_key
Your OpenAI API key. If not provided, will use the OPENAI_API_KEY environment variable.
client
Optional pre-configured OpenAI AsyncClient instance.
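
A realtime streaming sketch. push_frame() and end_input() are assumed from the livekit-agents stream interface; in practice the frames come from a LiveKit audio track rather than a list.

from livekit import rtc
from livekit.agents import stt as agents_stt

from livekit.plugins import openai


async def transcribe(frames: list[rtc.AudioFrame]) -> None:
    speech = openai.STT(model="gpt-4o-mini-transcribe", use_realtime=True)
    stream = speech.stream()

    for frame in frames:
        stream.push_frame(frame)
    stream.end_input()  # assumed: signals that no more audio is coming

    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)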

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Static methods

def with_groq(*,
model: GroqAudioModels | str = 'whisper-large-v3-turbo',
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None,
language: str = 'en',
detect_language: bool = False,
prompt: NotGivenOr[str] = NOT_GIVEN) ‑> livekit.plugins.openai.stt.STT
@staticmethod
def with_groq(
    *,
    model: GroqAudioModels | str = "whisper-large-v3-turbo",
    api_key: NotGivenOr[str] = NOT_GIVEN,
    base_url: NotGivenOr[str] = NOT_GIVEN,
    client: openai.AsyncClient | None = None,
    language: str = "en",
    detect_language: bool = False,
    prompt: NotGivenOr[str] = NOT_GIVEN,
) -> STT:
    """
    Create a new instance of Groq STT.

    ``api_key`` must be set to your Groq API key, either using the argument or by setting
    the ``GROQ_API_KEY`` environmental variable.
    """
    groq_api_key = api_key if is_given(api_key) else os.environ.get("GROQ_API_KEY")
    if not groq_api_key:
        raise ValueError("Groq API key is required")

    if not is_given(base_url):
        base_url = "https://api.groq.com/openai/v1"

    return STT(
        model=model,
        api_key=groq_api_key,
        base_url=base_url,
        client=client,
        language=language,
        detect_language=detect_language,
        prompt=prompt,
        use_realtime=False,
    )

Create a new instance of Groq STT.

api_key must be set to your Groq API key, either using the argument or by setting the GROQ_API_KEY environmental variable.
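
For example (a sketch assuming GROQ_API_KEY is exported). The Groq-backed client is created with use_realtime=False, so it transcribes buffered audio rather than streaming over the realtime websocket.

from livekit.plugins import openai

groq_stt = openai.STT.with_groq(
    model="whisper-large-v3-turbo",
    language="en",
)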

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.openai.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    if is_given(language):
        self._opts.language = language
    stream = SpeechStream(
        stt=self,
        pool=self._pool,
    )
    self._streams.add(stream)
    return stream
def update_options(self,
*,
model: NotGivenOr[STTModels | GroqAudioModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
detect_language: NotGivenOr[bool] = NOT_GIVEN,
prompt: NotGivenOr[str] = NOT_GIVEN,
turn_detection: NotGivenOr[SessionTurnDetection] = NOT_GIVEN,
noise_reduction_type: NotGivenOr[str] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | GroqAudioModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    detect_language: NotGivenOr[bool] = NOT_GIVEN,
    prompt: NotGivenOr[str] = NOT_GIVEN,
    turn_detection: NotGivenOr[SessionTurnDetection] = NOT_GIVEN,
    noise_reduction_type: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    
""" Update the options for the speech stream. Most options are updated at the connection level. SpeechStreams will be recreated when options are updated. Args: language: The language to transcribe in. detect_language: Whether to automatically detect the language. model: The model to use for transcription. prompt: Optional text prompt to guide the transcription. Only supported for whisper-1. turn_detection: When using realtime, this controls how model detects the user is done speaking. noise_reduction_type: Type of noise reduction to apply. "near_field" or "far_field" """
# noqa: E501 if is_given(model): self._opts.model = model if is_given(language): self._opts.language = language if is_given(detect_language): self._opts.detect_language = detect_language self._opts.language = "" if is_given(prompt): self._opts.prompt = prompt if is_given(turn_detection): self._opts.turn_detection = turn_detection if is_given(noise_reduction_type): self._opts.noise_reduction_type = noise_reduction_type for stream in self._streams: if is_given(language): stream.update_options(language=language)

Update the options for the speech stream. Most options are updated at the connection level. SpeechStreams will be recreated when options are updated.

Args

language
The language to transcribe in.
detect_language
Whether to automatically detect the language.
model
The model to use for transcription.
prompt
Optional text prompt to guide the transcription. Only supported for whisper-1.
turn_detection
When using realtime, this controls how the model detects the user is done speaking.
noise_reduction_type
Type of noise reduction to apply: "near_field" or "far_field".


class TTS (*,
model: TTSModels | str = 'gpt-4o-mini-tts',
voice: TTSVoices | str = 'ash',
speed: float = 1.0,
instructions: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
client: openai.AsyncClient | None = None)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        model: TTSModels | str = DEFAULT_MODEL,
        voice: TTSVoices | str = DEFAULT_VOICE,
        speed: float = 1.0,
        instructions: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        client: openai.AsyncClient | None = None,
    ) -> None:
        
""" Create a new instance of OpenAI TTS. ``api_key`` must be set to your OpenAI API key, either using the argument or by setting the ``OPENAI_API_KEY`` environmental variable. """
super().__init__( capabilities=tts.TTSCapabilities( streaming=False, ), sample_rate=OPENAI_TTS_SAMPLE_RATE, num_channels=OPENAI_TTS_CHANNELS, ) self._opts = _TTSOptions( model=model, voice=voice, speed=speed, instructions=instructions if is_given(instructions) else None, ) self._client = client or openai.AsyncClient( max_retries=0, api_key=api_key if is_given(api_key) else None, base_url=base_url if is_given(base_url) else None, http_client=httpx.AsyncClient( timeout=httpx.Timeout(connect=15.0, read=5.0, write=5.0, pool=5.0), follow_redirects=True, limits=httpx.Limits( max_connections=50, max_keepalive_connections=50, keepalive_expiry=120, ), ), ) def update_options( self, *, model: NotGivenOr[TTSModels | str] = NOT_GIVEN, voice: NotGivenOr[TTSVoices | str] = NOT_GIVEN, speed: NotGivenOr[float] = NOT_GIVEN, instructions: NotGivenOr[str] = NOT_GIVEN, ) -> None: if is_given(model): self._opts.model = model if is_given(voice): self._opts.voice = voice if is_given(speed): self._opts.speed = speed if is_given(instructions): self._opts.instructions = instructions @staticmethod def create_azure_client( *, model: TTSModels | str = DEFAULT_MODEL, voice: TTSVoices | str = DEFAULT_VOICE, speed: float = 1.0, instructions: NotGivenOr[str] = NOT_GIVEN, azure_endpoint: str | None = None, azure_deployment: str | None = None, api_version: str | None = None, api_key: str | None = None, azure_ad_token: str | None = None, azure_ad_token_provider: AsyncAzureADTokenProvider | None = None, organization: str | None = None, project: str | None = None, base_url: str | None = None, ) -> TTS:
""" This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` """
# noqa: E501 azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, azure_ad_token_provider=azure_ad_token_provider, organization=organization, project=project, base_url=base_url, ) # type: ignore return TTS( model=model, voice=voice, speed=speed, instructions=instructions, client=azure_client, ) def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> ChunkedStream: return ChunkedStream( tts=self, input_text=text, conn_options=conn_options, opts=self._opts, client=self._client, )


Create a new instance of OpenAI TTS.

api_key must be set to your OpenAI API key, either using the argument or by setting the OPENAI_API_KEY environmental variable.
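
A construction sketch, assuming OPENAI_API_KEY is exported; instructions is only meaningful for models that support voice steering, such as gpt-4o-mini-tts.

from livekit.plugins import openai

tts = openai.TTS(
    model="gpt-4o-mini-tts",
    voice="ash",
    instructions="Speak in a calm, friendly tone.",
)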

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Static methods

def create_azure_client(*,
model: TTSModels | str = 'gpt-4o-mini-tts',
voice: TTSVoices | str = 'ash',
speed: float = 1.0,
instructions: NotGivenOr[str] = NOT_GIVEN,
azure_endpoint: str | None = None,
azure_deployment: str | None = None,
api_version: str | None = None,
api_key: str | None = None,
azure_ad_token: str | None = None,
azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
organization: str | None = None,
project: str | None = None,
base_url: str | None = None) ‑> livekit.plugins.openai.tts.TTS
@staticmethod
def create_azure_client(
    *,
    model: TTSModels | str = DEFAULT_MODEL,
    voice: TTSVoices | str = DEFAULT_VOICE,
    speed: float = 1.0,
    instructions: NotGivenOr[str] = NOT_GIVEN,
    azure_endpoint: str | None = None,
    azure_deployment: str | None = None,
    api_version: str | None = None,
    api_key: str | None = None,
    azure_ad_token: str | None = None,
    azure_ad_token_provider: AsyncAzureADTokenProvider | None = None,
    organization: str | None = None,
    project: str | None = None,
    base_url: str | None = None,
) -> TTS:
    
""" This automatically infers the following arguments from their corresponding environment variables if they are not provided: - `api_key` from `AZURE_OPENAI_API_KEY` - `organization` from `OPENAI_ORG_ID` - `project` from `OPENAI_PROJECT_ID` - `azure_ad_token` from `AZURE_OPENAI_AD_TOKEN` - `api_version` from `OPENAI_API_VERSION` - `azure_endpoint` from `AZURE_OPENAI_ENDPOINT` """
# noqa: E501 azure_client = openai.AsyncAzureOpenAI( max_retries=0, azure_endpoint=azure_endpoint, azure_deployment=azure_deployment, api_version=api_version, api_key=api_key, azure_ad_token=azure_ad_token, azure_ad_token_provider=azure_ad_token_provider, organization=organization, project=project, base_url=base_url, ) # type: ignore return TTS( model=model, voice=voice, speed=speed, instructions=instructions, client=azure_client, )

This automatically infers the following arguments from their corresponding environment variables if they are not provided:

  • api_key from AZURE_OPENAI_API_KEY
  • organization from OPENAI_ORG_ID
  • project from OPENAI_PROJECT_ID
  • azure_ad_token from AZURE_OPENAI_AD_TOKEN
  • api_version from OPENAI_API_VERSION
  • azure_endpoint from AZURE_OPENAI_ENDPOINT

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.openai.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(
        tts=self,
        input_text=text,
        conn_options=conn_options,
        opts=self._opts,
        client=self._client,
    )
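
The returned ChunkedStream is an async iterable of synthesized audio chunks. A consumption sketch; the SynthesizedAudio.frame field and aclose() are assumed from the livekit-agents TTS interface.

from livekit import rtc

from livekit.plugins import openai


async def speak(tts: openai.TTS) -> list[rtc.AudioFrame]:
    frames: list[rtc.AudioFrame] = []
    stream = tts.synthesize("Hello from LiveKit!")
    try:
        async for audio in stream:
            frames.append(audio.frame)  # each chunk carries an rtc.AudioFrame
    finally:
        await stream.aclose()  # assumed from the base stream interface
    return frames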
def update_options(self,
*,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
voice: NotGivenOr[TTSVoices | str] = NOT_GIVEN,
speed: NotGivenOr[float] = NOT_GIVEN,
instructions: NotGivenOr[str] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    voice: NotGivenOr[TTSVoices | str] = NOT_GIVEN,
    speed: NotGivenOr[float] = NOT_GIVEN,
    instructions: NotGivenOr[str] = NOT_GIVEN,
) -> None:
    if is_given(model):
        self._opts.model = model
    if is_given(voice):
        self._opts.voice = voice
    if is_given(speed):
        self._opts.speed = speed
    if is_given(instructions):
        self._opts.instructions = instructions
