Module livekit.plugins.xai

xAI plugin for LiveKit Agents

Sub-modules

livekit.plugins.xai.realtime
livekit.plugins.xai.responses

Classes

class FileSearch (vector_store_ids: list[str] = <factory>,
max_num_results: int | None = None)
Expand source code
@dataclass
class FileSearch(XAITool):
    """Enable file search tool for searching uploaded document collections."""

    # Vector stores to search; defaults to an empty collection.
    vector_store_ids: list[str] = field(default_factory=list)
    # Optional cap on the number of results returned by the tool.
    max_num_results: int | None = None

    def __post_init__(self) -> None:
        # Register under the provider-tool id the xAI API expects.
        super().__init__(id="xai_file_search")

    def to_dict(self) -> dict[str, Any]:
        """Serialize the tool to the xAI request payload format."""
        payload: dict[str, Any] = {
            "type": "file_search",
            "vector_store_ids": self.vector_store_ids,
        }
        if self.max_num_results is None:
            return payload
        return {**payload, "max_num_results": self.max_num_results}

Enable file search tool for searching uploaded document collections.

Ancestors

  • livekit.plugins.xai.tools.XAITool
  • livekit.agents.llm.tool_context.ProviderTool
  • livekit.agents.llm.tool_context.Tool
  • abc.ABC

Instance variables

var max_num_results : int | None
var vector_store_ids : list[str]

Methods

def to_dict(self) ‑> dict[str, typing.Any]
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Serialize the file-search tool to the xAI request payload format."""
    payload: dict[str, Any] = {
        "type": "file_search",
        "vector_store_ids": self.vector_store_ids,
    }
    # The results cap is only included when explicitly configured.
    if self.max_num_results is None:
        return payload
    return {**payload, "max_num_results": self.max_num_results}
class STT (*,
enable_interim_results: bool = True,
sample_rate: int = 16000,
enable_diarization: bool = False,
language: STTLanguages | str = 'en',
endpointing: int = 100,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class STT(stt.STT):
    """Speech-to-text backed by xAI's transcription API (REST for prerecorded
    audio, websocket streams via :class:`SpeechStream`)."""

    def __init__(
        self,
        *,
        enable_interim_results: bool = True,
        sample_rate: int = SAMPLE_RATE,
        enable_diarization: bool = False,
        language: STTLanguages | str = "en",
        endpointing: int = 100,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Create a new instance of xAI STT.

        Args:
            enable_interim_results (bool, optional): Whether to return interim (non-final) transcription results. Defaults to True.
            sample_rate: The sample rate of the audio in Hz. Defaults to 16000.
            enable_diarization: Whether to enable speaker diarization. Words will include a speaker field. Defaults to False.
            language: BCP-47 language code for transcription (e.g. "en", "fr", "de"). Defaults to "en".
            endpointing: Silence duration in milliseconds before an utterance-final event is fired. xAI's default is 10ms, but we default to 100ms for better compatibility with LK EOT models.
            api_key: Your xAI API key. If not provided, will look for XAI_API_KEY environment variable.
            http_session: Optional aiohttp ClientSession to use for requests.

        Raises:
            ValueError: If no API key is provided or found in environment variables.

        Note:
            The api_key must be set either through the constructor argument or by setting
            the XAI_API_KEY environmental variable.
        """  # noqa: E501

        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=enable_interim_results,
                diarization=enable_diarization,
                aligned_transcript="word",
            )
        )

        xai_api_key = api_key if is_given(api_key) else os.environ.get("XAI_API_KEY")
        if not xai_api_key:
            raise ValueError("xAI API key is required")
        self._api_key = xai_api_key

        self._opts = STTOptions(
            enable_interim_results=enable_interim_results,
            sample_rate=sample_rate,
            enable_diarization=enable_diarization,
            language=language,
            endpointing=endpointing,
        )
        self._session = http_session
        # Weak references only: streams are dropped from the set once they are
        # garbage collected, so closed streams do not accumulate here.
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared session from the agents http context
        # when the caller did not supply one.
        if not self._session:
            self._session = utils.http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Transcribe a complete audio buffer via the xAI REST endpoint.

        Raises:
            APITimeoutError: If the request times out.
            APIStatusError: If the server responds with an HTTP error status.
            APIConnectionError: For any other connection-level failure.
        """
        lang = language if is_given(language) else self._opts.language
        form = aiohttp.FormData()
        form.add_field(
            "file",
            rtc.combine_audio_frames(buffer).to_wav_bytes(),
            filename="audio.wav",
            content_type="audio/wav",
        )
        form.add_field("language", lang)
        # NOTE(review): "format" is sent as the string "true" — confirm the
        # expected field name/value against the xAI REST API docs.
        form.add_field("format", "true")

        try:
            async with self._ensure_session().post(
                url=XAI_REST_URL,
                data=form,
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "Accept": "application/json",
                },
                timeout=aiohttp.ClientTimeout(
                    # 30s cap for the whole request; conn_options.timeout only
                    # bounds establishing the connection.
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                # Surface HTTP error responses as APIStatusError instead of
                # attempting to parse an error payload as a transcription
                # result. Without this call, aiohttp never raises
                # ClientResponseError and the handler below is unreachable.
                res.raise_for_status()
                return _prerecorded_transcription_to_speech_event(
                    await res.json(), enable_diarization=self._opts.enable_diarization
                )
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> STTOptions:
        """Return a copy of the current options with per-call overrides applied."""
        config = dataclasses.replace(self._opts)
        if is_given(language):
            config.language = language
        return config

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Open a new streaming recognition session.

        Args:
            language: Optional per-stream language override.
            conn_options: Connection/retry behavior for the stream.
        """
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
        )
        # Track the stream so update_options() can reconfigure it later.
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        endpointing: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Update recognition options; only explicitly given values are applied.

        Changes are stored on this instance and forwarded to all live streams.
        """
        if is_given(interim_results):
            self._opts.enable_interim_results = interim_results

        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate

        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization

        if is_given(language):
            self._opts.language = language

        if is_given(endpointing):
            self._opts.endpointing = endpointing

        # Forward the (possibly NOT_GIVEN) values; each stream applies only
        # the ones that were actually provided.
        for stream in self._streams:
            stream.update_options(
                enable_interim_results=interim_results,
                sample_rate=sample_rate,
                enable_diarization=enable_diarization,
                language=language,
                endpointing=endpointing,
            )

Speech-to-text implementation backed by the xAI transcription API.

Create a new instance of xAI STT.

Args

enable_interim_results : bool, optional
Whether to return interim (non-final) transcription results. Defaults to True.
sample_rate
The sample rate of the audio in Hz. Defaults to 16000.
enable_diarization
Whether to enable speaker diarization. Words will include a speaker field. Defaults to False.
language
BCP-47 language code for transcription (e.g. "en", "fr", "de"). Defaults to "en".
endpointing
Silence duration in milliseconds before an utterance-final event is fired. xAI's default is 10ms, but we default to 100ms for better compatibility with LK EOT models.
api_key
Your xAI API key. If not provided, will look for XAI_API_KEY environment variable.
http_session
Optional aiohttp ClientSession to use for requests.

Raises

ValueError
If no API key is provided or found in environment variables.

Note

The api_key must be set either through the constructor argument or by setting the XAI_API_KEY environmental variable.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.xai.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Open a new streaming recognition session.

    Args:
        language: Optional per-stream language override.
        conn_options: Connection/retry behavior for the stream.
    """
    opts = self._sanitize_options(language=language)
    new_stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=opts,
        api_key=self._api_key,
        http_session=self._ensure_session(),
    )
    # Track the stream so later update_options() calls can reconfigure it.
    self._streams.add(new_stream)
    return new_stream
def update_options(self,
*,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
endpointing: NotGivenOr[int] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    endpointing: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Update recognition options; only explicitly given values are applied.

    Changes are stored on this instance and forwarded to all live streams.
    """
    updates = {
        "enable_interim_results": interim_results,
        "sample_rate": sample_rate,
        "enable_diarization": enable_diarization,
        "language": language,
        "endpointing": endpointing,
    }
    for attr, value in updates.items():
        if is_given(value):
            setattr(self._opts, attr, value)

    # Forward the (possibly NOT_GIVEN) values; each stream applies only
    # the ones that were actually provided.
    for active in self._streams:
        active.update_options(
            enable_interim_results=interim_results,
            sample_rate=sample_rate,
            enable_diarization=enable_diarization,
            language=language,
            endpointing=endpointing,
        )

Inherited members

class TTS (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
voice: GrokVoices | str = 'ara',
language: TTSLanguages | str = 'auto',
tokenizer: tokenize.WordTokenizer | None = None,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    """Streaming text-to-speech client for the xAI synthesis websocket API."""

    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        voice: GrokVoices | str = DEFAULT_VOICE,
        language: TTSLanguages | str = "auto",
        tokenizer: tokenize.WordTokenizer | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of the xAI TTS.

        Args:
            voice (str, optional): The voice ID for the desired voice. Defaults to "ara".
            language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja"). Defaults to "auto".
            api_key (str | None, optional): The xAI API key. If not provided, it will be read from the XAI_API_KEY environment variable.
            tokenizer (tokenize.WordTokenizer | None, optional): Word tokenizer used for streamed text. Defaults to a basic word tokenizer.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
        )

        key: str | None = api_key if is_given(api_key) else os.environ.get("XAI_API_KEY")
        if not key:
            raise ValueError(
                "xAI API key is required, either as argument or set XAI_API_KEY"
                " environment variable"
            )
        self._api_key = key

        # Fall back to a basic word tokenizer when none was supplied.
        tok = tokenizer if tokenizer is not None else tokenize.basic.WordTokenizer(ignore_punctuation=False)
        self._opts = _TTSOptions(
            voice=voice,
            language=language,
            tokenizer=tok,
        )

        self._session = http_session
        # Weak references only: streams drop out of the set once collected.
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @property
    def model(self) -> str:
        """Return the model identifier ("unknown"; none is tracked here)."""
        return "unknown"

    @property
    def provider(self) -> str:
        """Return the provider name for this TTS instance."""
        return "xAI"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open the synthesis websocket; raise APIConnectionError on failure."""
        query = {
            "voice": self._opts.voice,
            "language": self._opts.language,
            "codec": "pcm",
            "sample_rate": SAMPLE_RATE,
        }
        endpoint = f"{XAI_WEBSOCKET_URL}?{urlencode(query)}"
        try:
            return await asyncio.wait_for(
                self._ensure_session().ws_connect(
                    endpoint,
                    headers={"Authorization": f"Bearer {self._api_key}"},
                ),
                timeout,
            )
        except (
            aiohttp.ClientConnectorError,
            aiohttp.ClientConnectionResetError,
            asyncio.TimeoutError,
        ) as e:
            raise APIConnectionError("failed to connect to xAI") from e

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close a previously opened synthesis websocket."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        # Lazily fall back to the shared session from the agents http context.
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice: str | None = None,
        language: TTSLanguages | str | None = None,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        Args:
            voice (str, optional): The voice ID for the desired voice.
            language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja").
        """  # noqa: E501
        if voice:
            self._opts.voice = voice
        if language:
            self._opts.language = language

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> tts.ChunkedStream:
        """Synthesize text by delegating to the internal streaming path."""
        return self._synthesize_with_stream(text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a new synthesis stream and track it for later cleanup."""
        synth = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(synth)
        return synth

    async def aclose(self) -> None:
        """Close every tracked synthesis stream and forget them."""
        for active in list(self._streams):
            await active.aclose()
        self._streams.clear()

Text-to-speech implementation backed by the xAI synthesis API.

Create a new instance of the xAI TTS.

See the xAI text-to-speech API documentation for more detail on all of these options.

Args

voice : str, optional
The voice ID for the desired voice. Defaults to "ara".
language : TTSLanguages | str, optional
Language code for synthesis (e.g., "en", "fr", "ja"). Defaults to "auto".
api_key : str | None, optional
The xAI API key. If not provided, it will be read from the xAI environment variable.
http_session : aiohttp.ClientSession | None, optional
An existing aiohttp ClientSession to use. If not provided, a new session will be created.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Return the model identifier ("unknown"; no model id is tracked by this plugin)."""
    return "unknown"

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Return the provider name for this TTS instance."""
    return "xAI"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close every tracked synthesis stream and forget them."""
    # Snapshot first: closing a stream may mutate the underlying set.
    pending = list(self._streams)
    for active in pending:
        await active.aclose()
    self._streams.clear()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.xai.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a new synthesis stream and track it for later cleanup."""
    synth = SynthesizeStream(tts=self, conn_options=conn_options)
    # Track the stream so aclose() can shut it down.
    self._streams.add(synth)
    return synth
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> tts.ChunkedStream:
    """Synthesize text by delegating to the internal streaming path."""
    chunked = self._synthesize_with_stream(text, conn_options=conn_options)
    return chunked
def update_options(self, *, voice: str | None = None, language: TTSLanguages | str | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: str | None = None,
    language: TTSLanguages | str | None = None,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    Only truthy arguments are applied; passing None (or an empty string)
    leaves the current setting unchanged.

    Args:
        voice (str, optional): The voice ID for the desired voice.
        language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja").
    """  # noqa: E501
    if voice:
        self._opts.voice = voice
    if language:
        self._opts.language = language

Update the Text-to-Speech (TTS) configuration options.

Args

voice : str, optional
The voice ID for the desired voice.
language : TTSLanguages | str, optional
Language code for synthesis (e.g., "en", "fr", "ja").

Inherited members

class WebSearch
Expand source code
@dataclass
class WebSearch(XAITool):
    """Enable web search tool for real-time internet searches."""

    def __post_init__(self) -> None:
        # Register under the provider-tool id the xAI API expects.
        super().__init__(id="xai_web_search")

    def to_dict(self) -> dict[str, Any]:
        """Serialize to the xAI request payload; web search takes no options."""
        return {"type": "web_search"}

Enable web search tool for real-time internet searches.

Ancestors

  • livekit.plugins.xai.tools.XAITool
  • livekit.agents.llm.tool_context.ProviderTool
  • livekit.agents.llm.tool_context.Tool
  • abc.ABC

Methods

def to_dict(self) ‑> dict[str, typing.Any]
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Serialize to the xAI request payload; web search takes no options."""
    return {"type": "web_search"}
class XSearch (allowed_x_handles: list[str] | None = None)
Expand source code
@dataclass
class XSearch(XAITool):
    """Enable X (Twitter) search tool for searching posts."""

    # When set to a non-empty list, restricts results to these handles.
    allowed_x_handles: list[str] | None = None

    def __post_init__(self) -> None:
        # Register under the provider-tool id the xAI API expects.
        super().__init__(id="xai_x_search")

    def to_dict(self) -> dict[str, Any]:
        """Serialize the tool to the xAI request payload format."""
        extra: dict[str, Any] = {}
        if self.allowed_x_handles:
            extra["allowed_x_handles"] = self.allowed_x_handles
        return {"type": "x_search", **extra}

Enable X (Twitter) search tool for searching posts.

Ancestors

  • livekit.plugins.xai.tools.XAITool
  • livekit.agents.llm.tool_context.ProviderTool
  • livekit.agents.llm.tool_context.Tool
  • abc.ABC

Instance variables

var allowed_x_handles : list[str] | None

Methods

def to_dict(self) ‑> dict[str, typing.Any]
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Serialize the X-search tool to the xAI request payload format."""
    # The handle allow-list is only included when non-empty.
    extra = (
        {"allowed_x_handles": self.allowed_x_handles} if self.allowed_x_handles else {}
    )
    return {"type": "x_search", **extra}