Module livekit.plugins.xai
xAI plugin for LiveKit Agents
Sub-modules
livekit.plugins.xai.realtime
livekit.plugins.xai.responses
Classes
class FileSearch (vector_store_ids: list[str] = <factory>,
max_num_results: int | None = None)-
Expand source code
@dataclass
class FileSearch(XAITool):
    """Enable file search tool for searching uploaded document collections.

    The tool is serialized with :meth:`to_dict`; ``max_num_results`` is only
    included in the serialized form when it has been set.
    """

    # IDs of the vector stores to search; every instance gets its own list.
    vector_store_ids: list[str] = field(default_factory=list)
    # Optional cap on the number of results; left out of the payload if None.
    max_num_results: int | None = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the parent
        # initializer, so the fixed tool id is passed to it here.
        super().__init__(id="xai_file_search")

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of this tool.

        Returns:
            A dict with ``"type"`` and ``"vector_store_ids"``, plus
            ``"max_num_results"`` when that field is not ``None``.
        """
        payload: dict[str, Any] = {
            "type": "file_search",
            "vector_store_ids": self.vector_store_ids,
        }
        limit = self.max_num_results
        if limit is None:
            return payload
        payload["max_num_results"] = limit
        return payload
Ancestors
- livekit.plugins.xai.tools.XAITool
- livekit.agents.llm.tool_context.ProviderTool
- livekit.agents.llm.tool_context.Tool
- abc.ABC
Instance variables
var max_num_results : int | None
var vector_store_ids : list[str]
Methods
def to_dict(self) ‑> dict[str, typing.Any]-
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary representation of the file search tool.

    Returns:
        A dict with ``"type"`` and ``"vector_store_ids"``, plus
        ``"max_num_results"`` when that attribute is not ``None``.
    """
    payload: dict[str, Any] = {
        "type": "file_search",
        "vector_store_ids": self.vector_store_ids,
    }
    limit = self.max_num_results
    if limit is None:
        return payload
    # An explicit limit (including 0) is always forwarded.
    payload["max_num_results"] = limit
    return payload
class STT (*,
enable_interim_results: bool = True,
sample_rate: int = 16000,
enable_diarization: bool = False,
language: STTLanguages | str = 'en',
endpointing: int = 100,
api_key: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class STT(stt.STT):
    """Speech-to-text client for xAI.

    Offers streaming recognition through :meth:`stream` and recognition of a
    buffered utterance through an HTTP POST in ``_recognize_impl``.
    """

    def __init__(
        self,
        *,
        enable_interim_results: bool = True,
        sample_rate: int = SAMPLE_RATE,
        enable_diarization: bool = False,
        language: STTLanguages | str = "en",
        endpointing: int = 100,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """Create a new instance of xAI STT.

        Args:
            enable_interim_results (bool, optional): Whether to return interim (non-final) transcription results. Defaults to True.
            sample_rate: The sample rate of the audio in Hz. Defaults to 16000.
            enable_diarization: Whether to enable speaker diarization. Words will include a speaker field. Defaults to False.
            language: BCP-47 language code for transcription (e.g. "en", "fr", "de"). Defaults to "en".
            endpointing: Silence duration in milliseconds before an utterance-final event is fired. xAI's default is 10ms, but we default to 100ms for better compatibility with LK EOT models.
            api_key: Your xAI API key. If not provided, will look for XAI_API_KEY environment variable.
            http_session: Optional aiohttp ClientSession to use for requests.

        Raises:
            ValueError: If no API key is provided or found in environment variables.

        Note:
            The api_key must be set either through the constructor argument or by setting
            the XAI_API_KEY environment variable.
        """  # noqa: E501
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=enable_interim_results,
                diarization=enable_diarization,
                aligned_transcript="word",
            )
        )

        # An explicit argument wins; otherwise fall back to the environment.
        xai_api_key = api_key if is_given(api_key) else os.environ.get("XAI_API_KEY")
        if not xai_api_key:
            raise ValueError("xAI API key is required")
        self._api_key = xai_api_key

        self._opts = STTOptions(
            enable_interim_results=enable_interim_results,
            sample_rate=sample_rate,
            enable_diarization=enable_diarization,
            language=language,
            endpointing=endpointing,
        )
        self._session = http_session
        # Weak references: streams are tracked so update_options can reach
        # them, without this set keeping them alive.
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, obtaining one from ``utils.http_context`` if unset."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Transcribe a buffered utterance with a single HTTP POST.

        Args:
            buffer: Audio to transcribe; it is combined and encoded as WAV.
            language: Language override for this call; the configured
                language is used when not given.
            conn_options: Connection options; ``timeout`` is used as the
                socket connect timeout.

        Returns:
            The speech event built from the JSON response.

        Raises:
            APITimeoutError: If the request times out.
            APIStatusError: If the server answers with an HTTP error status.
            APIConnectionError: On any other failure.
        """
        lang = language if is_given(language) else self._opts.language

        form = aiohttp.FormData()
        form.add_field(
            "file",
            rtc.combine_audio_frames(buffer).to_wav_bytes(),
            filename="audio.wav",
            content_type="audio/wav",
        )
        form.add_field("language", lang)
        form.add_field("format", "true")

        try:
            async with self._ensure_session().post(
                url=XAI_REST_URL,
                data=form,
                headers={
                    "Authorization": f"Bearer {self._api_key}",
                    "Accept": "application/json",
                },
                timeout=aiohttp.ClientTimeout(
                    total=30,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                # aiohttp only raises on 4xx/5xx when asked to (or when the
                # session was created with raise_for_status=True). Without
                # this call the ClientResponseError handler below is never
                # reached and an error body would be parsed as a transcript.
                res.raise_for_status()
                return _prerecorded_transcription_to_speech_event(
                    await res.json(), enable_diarization=self._opts.enable_diarization
                )
        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

    def _sanitize_options(self, *, language: NotGivenOr[str] = NOT_GIVEN) -> STTOptions:
        """Return a copy of the current options, optionally overriding the language."""
        # dataclasses.replace with no changes yields a copy, so per-stream
        # overrides never touch the options stored on this instance.
        config = dataclasses.replace(self._opts)
        if is_given(language):
            config.language = language
        return config

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a streaming recognition session and track it on this instance.

        Args:
            language: Language override for this stream only.
            conn_options: Connection options passed to the stream.

        Returns:
            The newly created ``SpeechStream``.
        """
        config = self._sanitize_options(language=language)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self._ensure_session(),
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        interim_results: NotGivenOr[bool] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        endpointing: NotGivenOr[int] = NOT_GIVEN,
    ) -> None:
        """Update the stored options and forward them to every tracked stream.

        Only arguments that are given change the stored options; all
        arguments are passed through to each stream's ``update_options``.
        """
        if is_given(interim_results):
            self._opts.enable_interim_results = interim_results
        if is_given(sample_rate):
            self._opts.sample_rate = sample_rate
        if is_given(enable_diarization):
            self._opts.enable_diarization = enable_diarization
        if is_given(language):
            self._opts.language = language
        if is_given(endpointing):
            self._opts.endpointing = endpointing

        for stream in self._streams:
            stream.update_options(
                enable_interim_results=interim_results,
                sample_rate=sample_rate,
                enable_diarization=enable_diarization,
                language=language,
                endpointing=endpointing,
            )
Create a new instance of xAI STT.
Args
enable_interim_results:bool, optional- Whether to return interim (non-final) transcription results. Defaults to True.
sample_rate- The sample rate of the audio in Hz. Defaults to 16000.
enable_diarization- Whether to enable speaker diarization. Words will include a speaker field. Defaults to False.
language- BCP-47 language code for transcription (e.g. "en", "fr", "de"). Defaults to "en".
endpointing- Silence duration in milliseconds before an utterance-final event is fired. xAI's default is 10ms, but we default to 100ms for better compatibility with LK EOT models.
api_key- Your xAI API key. If not provided, will look for XAI_API_KEY environment variable.
http_session- Optional aiohttp ClientSession to use for requests.
Raises
ValueError- If no API key is provided or found in environment variables.
Note
The api_key must be set either through the constructor argument or by setting the XAI_API_KEY environment variable.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.xai.stt.SpeechStream-
Expand source code
def stream( self, *, language: NotGivenOr[str] = NOT_GIVEN, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> SpeechStream: config = self._sanitize_options(language=language) stream = SpeechStream( stt=self, conn_options=conn_options, opts=config, api_key=self._api_key, http_session=self._ensure_session(), ) self._streams.add(stream) return stream
def update_options(self,
*,
interim_results: NotGivenOr[bool] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
endpointing: NotGivenOr[int] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    interim_results: NotGivenOr[bool] = NOT_GIVEN,
    sample_rate: NotGivenOr[int] = NOT_GIVEN,
    enable_diarization: NotGivenOr[bool] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    endpointing: NotGivenOr[int] = NOT_GIVEN,
) -> None:
    """Update the stored STT options and forward them to every tracked stream.

    Only arguments that are given change the stored options; all arguments
    are passed through unchanged to each stream's ``update_options``.
    """
    # Keyed by the option/keyword name shared by the stored options object
    # and the streams' update_options.
    requested = {
        "enable_interim_results": interim_results,
        "sample_rate": sample_rate,
        "enable_diarization": enable_diarization,
        "language": language,
        "endpointing": endpointing,
    }

    for option_name, new_value in requested.items():
        if is_given(new_value):
            setattr(self._opts, option_name, new_value)

    for active_stream in self._streams:
        active_stream.update_options(**requested)
Inherited members
class TTS (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
voice: GrokVoices | str = 'ara',
language: TTSLanguages | str = 'auto',
tokenizer: tokenize.WordTokenizer | None = None,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS):
    """Text-to-speech client for xAI; connects to the service over a WebSocket."""

    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        voice: GrokVoices | str = DEFAULT_VOICE,
        language: TTSLanguages | str = "auto",
        tokenizer: tokenize.WordTokenizer | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of the xAI TTS.

        Args:
            api_key (str, optional): The xAI API key. If not provided, it is read from the XAI_API_KEY environment variable.
            voice (str, optional): The voice ID for the desired voice. Defaults to "ara".
            language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja"). Defaults to "auto".
            tokenizer (tokenize.WordTokenizer | None, optional): Word tokenizer to use. If not provided, a basic WordTokenizer created with ignore_punctuation=False is used.
            http_session (aiohttp.ClientSession | None, optional): An existing aiohttp ClientSession to use. If not provided, a new session will be created.

        Raises:
            ValueError: If no API key is given and XAI_API_KEY is not set.
        """  # noqa: E501
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=SAMPLE_RATE,
            num_channels=NUM_CHANNELS,
        )

        # An explicit argument wins; otherwise fall back to the environment.
        if is_given(api_key):
            resolved_key: str | None = api_key
        else:
            resolved_key = os.environ.get("XAI_API_KEY")
        if not resolved_key:
            raise ValueError(
                "xAI API key is required, either as argument or set XAI_API_KEY"
                " environment variable"
            )
        self._api_key = resolved_key

        word_tokenizer = (
            tokenize.basic.WordTokenizer(ignore_punctuation=False)
            if tokenizer is None
            else tokenizer
        )
        self._opts = _TTSOptions(
            voice=voice,
            language=language,
            tokenizer=word_tokenizer,
        )
        self._session = http_session
        # Weak references so tracked streams are not kept alive by this set.
        self._streams = weakref.WeakSet[SynthesizeStream]()

    @property
    def model(self) -> str:
        """Model identifier; this implementation always reports "unknown"."""
        return "unknown"

    @property
    def provider(self) -> str:
        """Provider identifier, "xAI"."""
        return "xAI"

    async def _connect_ws(self, timeout: float) -> aiohttp.ClientWebSocketResponse:
        """Open a WebSocket to xAI using the current voice and language.

        Args:
            timeout: Maximum number of seconds to wait for the connection.

        Raises:
            APIConnectionError: If connecting fails or times out.
        """
        query = urlencode(
            {
                "voice": self._opts.voice,
                "language": self._opts.language,
                "codec": "pcm",
                "sample_rate": SAMPLE_RATE,
            }
        )
        endpoint = f"{XAI_WEBSOCKET_URL}?{query}"
        auth_headers = {"Authorization": f"Bearer {self._api_key}"}

        try:
            return await asyncio.wait_for(
                self._ensure_session().ws_connect(endpoint, headers=auth_headers),
                timeout,
            )
        except (
            aiohttp.ClientConnectorError,
            aiohttp.ClientConnectionResetError,
            asyncio.TimeoutError,
        ) as e:
            raise APIConnectionError("failed to connect to xAI") from e

    async def _close_ws(self, ws: aiohttp.ClientWebSocketResponse) -> None:
        """Close the given WebSocket."""
        await ws.close()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, obtaining one from ``utils.http_context`` if unset."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice: str | None = None,
        language: TTSLanguages | str | None = None,
    ) -> None:
        """
        Update the Text-to-Speech (TTS) configuration options.

        Arguments that are omitted (or otherwise falsy) leave the current
        value in place.

        Args:
            voice (str, optional): The voice ID for the desired voice.
            language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja").
        """  # noqa: E501
        if voice:
            self._opts.voice = voice
        if language:
            self._opts.language = language

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> tts.ChunkedStream:
        """Synthesize ``text`` by delegating to ``_synthesize_with_stream``."""
        return self._synthesize_with_stream(text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a synthesis stream and track it on this instance."""
        new_stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(new_stream)
        return new_stream

    async def aclose(self) -> None:
        """Close every tracked stream, then clear the tracking set."""
        # Snapshot first: the weak set may change while each close is awaited.
        tracked = list(self._streams)
        for open_stream in tracked:
            await open_stream.aclose()
        self._streams.clear()
Create a new instance of the xAI TTS.
See the xAI TTS documentation for more details on all of these options.
Args
voice:str, optional- The voice ID for the desired voice. Defaults to "ara".
language:TTSLanguages | str, optional- Language code for synthesis (e.g., "en", "fr", "ja"). Defaults to "auto".
api_key:str | None, optional- The xAI API key. If not provided, it will be read from the XAI_API_KEY environment variable.
http_session:aiohttp.ClientSession | None, optional- An existing aiohttp ClientSession to use. If not provided, a new session will be created.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop model : str-
Expand source code
@property
def model(self) -> str:
    """Get the model name/identifier for this TTS instance.

    This implementation always reports ``"unknown"``.
    """
    return "unknown"
Returns
The model name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their model information.
prop provider : str-
Expand source code
@property
def provider(self) -> str:
    """Get the provider name/identifier for this TTS instance.

    This implementation always reports ``"xAI"``.
    """
    return "xAI"
Returns
The provider name if available, "unknown" otherwise.
Note
Plugins should override this property to provide their provider information.
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: for stream in list(self._streams): await stream.aclose() self._streams.clear()
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.xai.tts.SynthesizeStream-
Expand source code
def stream( self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS ) -> SynthesizeStream: stream = SynthesizeStream(tts=self, conn_options=conn_options) self._streams.add(stream) return stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream-
Expand source code
def synthesize( self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS, ) -> tts.ChunkedStream: return self._synthesize_with_stream(text, conn_options=conn_options)
def update_options(self, *, voice: str | None = None, language: TTSLanguages | str | None = None) ‑> None-
Expand source code
def update_options(
    self,
    *,
    voice: str | None = None,
    language: TTSLanguages | str | None = None,
) -> None:
    """
    Update the Text-to-Speech (TTS) configuration options.

    Arguments that are omitted (or otherwise falsy) leave the current value
    in place.

    Args:
        voice (str, optional): The voice ID for the desired voice.
        language (TTSLanguages | str, optional): Language code for synthesis (e.g., "en", "fr", "ja").
    """  # noqa: E501
    options = self._opts
    if voice:
        options.voice = voice
    if language:
        options.language = language
Args
voice:str, optional- The voice ID for the desired voice.
language:TTSLanguages | str, optional- Language code for synthesis (e.g., "en", "fr", "ja").
Inherited members
class WebSearch-
Expand source code
@dataclass
class WebSearch(XAITool):
    """Enable web search tool for real-time internet searches.

    The tool has no configurable fields; :meth:`to_dict` returns a fixed
    payload.
    """

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the parent
        # initializer, so the fixed tool id is passed to it here.
        super().__init__(id="xai_web_search")

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of this tool."""
        return dict(type="web_search")
Ancestors
- livekit.plugins.xai.tools.XAITool
- livekit.agents.llm.tool_context.ProviderTool
- livekit.agents.llm.tool_context.Tool
- abc.ABC
Methods
def to_dict(self) ‑> dict[str, typing.Any]-
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary representation of the web search tool.

    Returns:
        A new dict containing only ``"type": "web_search"``.
    """
    return dict(type="web_search")
class XSearch (allowed_x_handles: list[str] | None = None)-
Expand source code
@dataclass
class XSearch(XAITool):
    """Enable X (Twitter) search tool for searching posts.

    ``allowed_x_handles`` is only serialized by :meth:`to_dict` when it is a
    non-empty list.
    """

    # Optional list of X handles; None or an empty list is not serialized.
    allowed_x_handles: list[str] | None = None

    def __post_init__(self) -> None:
        # The dataclass-generated __init__ does not call the parent
        # initializer, so the fixed tool id is passed to it here.
        super().__init__(id="xai_x_search")

    def to_dict(self) -> dict[str, Any]:
        """Return the dictionary representation of this tool."""
        handles = self.allowed_x_handles
        if not handles:
            return {"type": "x_search"}
        return {"type": "x_search", "allowed_x_handles": handles}
Ancestors
- livekit.plugins.xai.tools.XAITool
- livekit.agents.llm.tool_context.ProviderTool
- livekit.agents.llm.tool_context.Tool
- abc.ABC
Instance variables
var allowed_x_handles : list[str] | None
Methods
def to_dict(self) ‑> dict[str, typing.Any]-
Expand source code
def to_dict(self) -> dict[str, Any]:
    """Return the dictionary representation of the X search tool.

    Returns:
        A dict with ``"type": "x_search"``, plus ``"allowed_x_handles"`` when
        that attribute is a non-empty list.
    """
    handles = self.allowed_x_handles
    # None and an empty list are both treated as "no handle filter".
    if not handles:
        return {"type": "x_search"}
    return {"type": "x_search", "allowed_x_handles": handles}