Module livekit.plugins.baseten
Classes
class LLM (*,
model: str | LLMModels = 'meta-llama/Llama-4-Maverick-17B-128E-Instruct',
api_key: NotGivenOr[str] = NOT_GIVEN,
user: NotGivenOr[str] = NOT_GIVEN,
safety_identifier: NotGivenOr[str] = NOT_GIVEN,
prompt_cache_key: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN,
top_p: NotGivenOr[float] = NOT_GIVEN,
parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
reasoning_effort: NotGivenOr[ReasoningEffort] = NOT_GIVEN,
base_url: NotGivenOr[str] = 'https://inference.baseten.co/v1',
client: openai.AsyncClient | None = None,
timeout: httpx.Timeout | None = None)-
Expand source code
class LLM(OpenAILLM):
    def __init__(
        self,
        *,
        model: str | LLMModels = "meta-llama/Llama-4-Maverick-17B-128E-Instruct",
        api_key: NotGivenOr[str] = NOT_GIVEN,
        user: NotGivenOr[str] = NOT_GIVEN,
        safety_identifier: NotGivenOr[str] = NOT_GIVEN,
        prompt_cache_key: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
        top_p: NotGivenOr[float] = NOT_GIVEN,
        parallel_tool_calls: NotGivenOr[bool] = NOT_GIVEN,
        tool_choice: NotGivenOr[ToolChoice] = NOT_GIVEN,
        reasoning_effort: NotGivenOr[ReasoningEffort] = NOT_GIVEN,
        base_url: NotGivenOr[str] = "https://inference.baseten.co/v1",
        client: openai.AsyncClient | None = None,
        timeout: httpx.Timeout | None = None,
    ):
        """
        Create a new instance of Baseten LLM.

        ``api_key`` must be set to your Baseten API key, either using the argument or by
        setting the ``BASETEN_API_KEY`` environmental variable.

        When ``reasoning_effort`` is not given and ``model`` is ``"openai/gpt-oss-120b"``,
        the effort is set to ``"low"``. Every other argument is handed to the parent
        ``OpenAILLM`` constructor unchanged.

        Raises:
            ValueError: if no API key is passed and ``BASETEN_API_KEY`` is unset or empty.
        """
        # An explicitly passed key wins; otherwise fall back to the environment.
        resolved_key = api_key if is_given(api_key) else os.environ.get("BASETEN_API_KEY", "")
        if not resolved_key:
            raise ValueError(
                "BASETEN_API_KEY is required, either as argument or set BASETEN_API_KEY environmental variable"  # noqa: E501
            )

        # Only pick a default effort when the caller expressed no preference.
        if not is_given(reasoning_effort) and model == "openai/gpt-oss-120b":
            reasoning_effort = "low"

        super().__init__(
            model=model,
            api_key=resolved_key,
            base_url=base_url,
            client=client,
            user=user,
            safety_identifier=safety_identifier,
            prompt_cache_key=prompt_cache_key,
            temperature=temperature,
            top_p=top_p,
            parallel_tool_calls=parallel_tool_calls,
            tool_choice=tool_choice,
            timeout=timeout,
            reasoning_effort=reasoning_effort,
        )
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Baseten LLM.
api_key
must be set to your Baseten API key, either using the argument or by setting the BASETEN_API_KEY
environmental variable.
Ancestors
- livekit.plugins.openai.llm.LLM
- livekit.agents.llm.llm.LLM
- abc.ABC
- EventEmitter
- typing.Generic
Inherited members
class STT (*,
api_key: str | None = None,
model_endpoint: str | None = None,
sample_rate: int = 16000,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
buffer_size_seconds: float = 0.032,
vad_threshold: float = 0.5,
vad_min_silence_duration_ms: int = 300,
vad_speech_pad_ms: int = 30,
language: str = 'en',
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_endpoint: str | None = None,
        sample_rate: int = 16000,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        buffer_size_seconds: float = 0.032,
        vad_threshold: float = 0.5,
        vad_min_silence_duration_ms: int = 300,
        vad_speech_pad_ms: int = 30,
        language: str = "en",
        http_session: aiohttp.ClientSession | None = None,
    ):
        """
        Create a Baseten streaming speech-to-text client.

        Args:
            api_key: Baseten API key; falls back to the ``BASETEN_API_KEY`` environment
                variable.
            model_endpoint: Baseten model endpoint; falls back to the
                ``BASETEN_MODEL_ENDPOINT`` environment variable.
            sample_rate: Sample rate stored in the options and given to every stream.
            encoding: Stored in the options only when given; otherwise the ``STTOptions``
                default is kept.
            buffer_size_seconds: Stored in the options.
            vad_threshold: Stored in the options.
            vad_min_silence_duration_ms: Stored in the options.
            vad_speech_pad_ms: Stored in the options.
            language: Default language for streams created by :meth:`stream`.
            http_session: Session to use for connections; when omitted one is obtained
                lazily through the ``session`` property.

        Raises:
            ValueError: if the API key or the model endpoint cannot be resolved.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                # NOTE(review): SpeechStream._connect_ws sends
                # "enable_partial_transcripts": False, so interim results may never
                # actually be produced -- confirm whether this flag should be False.
                interim_results=True,
            ),
        )

        api_key = api_key or os.environ.get("BASETEN_API_KEY")
        if not api_key:
            raise ValueError(
                "Baseten API key is required. "
                "Pass one in via the `api_key` parameter, "
                "or set it as the `BASETEN_API_KEY` environment variable"
            )
        self._api_key = api_key

        model_endpoint = model_endpoint or os.environ.get("BASETEN_MODEL_ENDPOINT")
        if not model_endpoint:
            raise ValueError(
                "The model endpoint is required, you can find it in the Baseten dashboard"
            )
        self._model_endpoint = model_endpoint

        self._opts = STTOptions(
            sample_rate=sample_rate,
            buffer_size_seconds=buffer_size_seconds,
            vad_threshold=vad_threshold,
            vad_min_silence_duration_ms=vad_min_silence_duration_ms,
            vad_speech_pad_ms=vad_speech_pad_ms,
            language=language,
        )
        if is_given(encoding):
            self._opts.encoding = encoding

        self._session = http_session
        # Weak references: streams drop out of the set once nothing else holds them.
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def session(self) -> aiohttp.ClientSession:
        """HTTP session used for connections, created on first access if none was given."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """Non-streaming recognition is not supported; always raises ``NotImplementedError``."""
        raise NotImplementedError("Not implemented")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """
        Open a new streaming recognition session.

        Args:
            language: When given, overrides the configured language for this stream only.
            conn_options: Connection options handed to the stream.

        Returns:
            A ``SpeechStream`` that is also tracked so that :meth:`update_options`
            can reach it.
        """
        # Each stream gets its own copy so per-stream changes do not leak back here.
        config = dataclasses.replace(self._opts)
        if is_given(language):
            # Previously the argument was accepted but never applied.
            config.language = language
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            model_endpoint=self._model_endpoint,
            http_session=self.session,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Update the stored options and propagate the same values to every tracked stream.

        Only arguments that are given are written to the stored options; the call is
        forwarded to each live stream with all arguments as received.
        """
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        for stream in self._streams:
            stream.update_options(
                vad_threshold=vad_threshold,
                vad_min_silence_duration_ms=vad_min_silence_duration_ms,
                vad_speech_pad_ms=vad_speech_pad_ms,
                language=language,
                buffer_size_seconds=buffer_size_seconds,
            )
Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop session : aiohttp.ClientSession
-
Expand source code
@property
def session(self) -> aiohttp.ClientSession:
    """Return the HTTP session, obtaining one from the shared HTTP context on first use."""
    if self._session:
        return self._session
    # No session yet: fetch one and remember it for later calls.
    self._session = utils.http_context.http_session()
    return self._session
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.baseten.stt.SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """
    Open a new streaming recognition session.

    Args:
        language: When given, overrides the configured language for this stream only.
        conn_options: Connection options handed to the stream.

    Returns:
        A ``SpeechStream`` that is also added to the set of tracked streams.
    """
    # Each stream gets its own copy so per-stream changes do not leak back here.
    config = dataclasses.replace(self._opts)
    if is_given(language):
        # Previously the argument was accepted but never applied.
        config.language = language
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        api_key=self._api_key,
        model_endpoint=self._model_endpoint,
        http_session=self.session,
    )
    self._streams.add(stream)
    return stream
def update_options(self,
*,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Update the stored options and push the same values to every tracked stream.

    Arguments left at ``NOT_GIVEN`` do not touch the stored options; the call is still
    forwarded to each stream with all arguments as received.
    """
    requested = {
        "vad_threshold": vad_threshold,
        "vad_min_silence_duration_ms": vad_min_silence_duration_ms,
        "vad_speech_pad_ms": vad_speech_pad_ms,
        "language": language,
        "buffer_size_seconds": buffer_size_seconds,
    }
    for option_name, option_value in requested.items():
        if is_given(option_value):
            setattr(self._opts, option_name, option_value)

    for live_stream in self._streams:
        live_stream.update_options(**requested)
Inherited members
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
model_endpoint: str,
http_session: aiohttp.ClientSession)-
Expand source code
class SpeechStream(stt.SpeechStream):
    """Streaming recognition session that talks to a Baseten endpoint over a websocket."""

    # Used to close websocket
    # NOTE(review): this message is never sent by the code in this class -- confirm
    # whether the Baseten endpoint expects it before wiring it in.
    _CLOSE_MSG: str = json.dumps({"terminate_session": True})

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        model_endpoint: str,
        http_session: aiohttp.ClientSession,
    ) -> None:
        """
        Args:
            stt: The owning ``STT`` instance.
            opts: Options for this stream; mutated in place by :meth:`update_options`.
            conn_options: Connection options passed to the base class.
            api_key: Baseten API key sent in the ``Authorization`` header.
            model_endpoint: Websocket URL to connect to.
            http_session: Session used to open the websocket.
        """
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)

        self._opts = opts
        self._api_key = api_key
        self._model_endpoint = model_endpoint
        self._session = http_session
        self._speech_duration: float = 0

        # keep a list of final transcripts to combine them inside the END_OF_SPEECH event
        # NOTE(review): nothing in this class reads or clears this list, so it grows for
        # the lifetime of the stream.
        self._final_events: list[SpeechEvent] = []
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        vad_threshold: NotGivenOr[float] = NOT_GIVEN,
        vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
        vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Apply the given options to this stream and request a reconnect.

        The reconnect event is set unconditionally so that ``_run`` opens a new
        websocket, which re-sends the metadata built from the updated options.
        """
        if is_given(vad_threshold):
            self._opts.vad_threshold = vad_threshold
        if is_given(vad_min_silence_duration_ms):
            self._opts.vad_min_silence_duration_ms = vad_min_silence_duration_ms
        if is_given(vad_speech_pad_ms):
            self._opts.vad_speech_pad_ms = vad_speech_pad_ms
        if is_given(language):
            self._opts.language = language
        if is_given(buffer_size_seconds):
            self._opts.buffer_size_seconds = buffer_size_seconds

        self._reconnect_event.set()

    async def _run(self) -> None:
        """
        Run a single websocket connection to Baseten and make sure to reconnect when
        something went wrong.

        Two tasks share each connection: ``send_task`` forwards audio from the input
        channel and ``recv_task`` turns server messages into speech events. The loop
        reconnects whenever ``update_options`` sets the reconnect event and stops once
        both tasks finish.
        """
        # Set by send_task once the input is exhausted; tells recv_task that a closed
        # or idle socket is expected rather than an error.
        closing_ws = False

        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            # NOTE(review): the chunk size is fixed here; `buffer_size_seconds` from the
            # options is not consulted -- confirm whether that is intended.
            samples_per_buffer = 512

            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=samples_per_buffer,
            )

            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    if len(frame.data) % 2 != 0:
                        # Message now matches the check (the old text spoke of float32
                        # and multiples of 4).
                        logger.warning("Frame data length is not a multiple of 2")

                    int16_array = np.frombuffer(frame.data, dtype=np.int16)
                    await ws.send_bytes(int16_array.tobytes())

            # No more audio will be sent. Without this flag recv_task could never
            # terminate cleanly: it would keep polling, or report a server-side close
            # as an unexpected error.
            closing_ws = True

        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws
            while True:
                try:
                    msg = await asyncio.wait_for(ws.receive(), timeout=5)
                except asyncio.TimeoutError:
                    # Idle socket: done if we are closing, otherwise keep waiting.
                    if closing_ws:
                        break
                    continue

                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:
                        return
                    raise APIStatusError("Baseten connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.error("Unexpected Baseten message type: %s", msg.type)
                    continue

                try:
                    data = json.loads(msg.data)

                    is_final = data.get("is_final", True)
                    segments = data.get("segments", [])
                    text = data.get("transcript", "")
                    confidence = data.get("confidence", 0.0)

                    # Messages without a transcript produce no event.
                    if not text:
                        continue

                    start_time = segments[0].get("start", 0.0) if segments else 0.0
                    end_time = segments[-1].get("end", 0.0) if segments else 0.0

                    if is_final:
                        event_type = stt.SpeechEventType.FINAL_TRANSCRIPT
                        language = data.get("language_code", self._opts.language)
                    else:
                        event_type = stt.SpeechEventType.INTERIM_TRANSCRIPT
                        language = ""

                    event = stt.SpeechEvent(
                        type=event_type,
                        alternatives=[
                            stt.SpeechData(
                                language=language,
                                text=text,
                                confidence=confidence,
                                start_time=start_time,
                                end_time=end_time,
                            )
                        ],
                    )
                    if is_final:
                        self._final_events.append(event)
                    self._event_ch.send_nowait(event)

                except Exception:
                    # A malformed message must not tear down the connection.
                    logger.exception("Failed to process message from Baseten")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore
                    for task in done:
                        if task != wait_reconnect_task:
                            # Re-raise any exception from the send/recv pair.
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Open the websocket and send the metadata payload built from the current options."""
        headers = {
            "Authorization": f"Api-Key {self._api_key}",
        }

        ws = await self._session.ws_connect(self._model_endpoint, headers=headers, ssl=ssl_context)

        # Build and send the metadata payload as the first message
        metadata = {
            "streaming_vad_config": {
                "threshold": self._opts.vad_threshold,
                "min_silence_duration_ms": self._opts.vad_min_silence_duration_ms,
                "speech_pad_ms": self._opts.vad_speech_pad_ms,
            },
            "streaming_params": {
                "encoding": self._opts.encoding,
                "sample_rate": self._opts.sample_rate,
                "enable_partial_transcripts": False,
            },
            "whisper_params": {"audio_language": self._opts.language},
        }

        await ws.send_str(json.dumps(metadata))
        return ws
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
vad_threshold: NotGivenOr[float] = NOT_GIVEN,
vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    vad_threshold: NotGivenOr[float] = NOT_GIVEN,
    vad_min_silence_duration_ms: NotGivenOr[int] = NOT_GIVEN,
    vad_speech_pad_ms: NotGivenOr[int] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    buffer_size_seconds: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """
    Write every given option onto this stream's options, then set the reconnect event.

    The event is set even when no argument was given.
    """
    candidates = (
        ("vad_threshold", vad_threshold),
        ("vad_min_silence_duration_ms", vad_min_silence_duration_ms),
        ("vad_speech_pad_ms", vad_speech_pad_ms),
        ("language", language),
        ("buffer_size_seconds", buffer_size_seconds),
    )
    for attr_name, new_value in candidates:
        if is_given(new_value):
            setattr(self._opts, attr_name, new_value)

    self._reconnect_event.set()
class TTS (*,
api_key: str | None = None,
model_endpoint: str | None = None,
voice: str = 'tara',
language: str = 'en',
temperature: float = 0.6,
http_session: aiohttp.ClientSession | None = None)-
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        api_key: str | None = None,
        model_endpoint: str | None = None,
        voice: str = "tara",
        language: str = "en",
        temperature: float = 0.6,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Initialize the Baseten TTS.

        Args:
            api_key (str): Baseten API key, or `BASETEN_API_KEY` env var.
            model_endpoint (str): Baseten model endpoint, or `BASETEN_MODEL_ENDPOINT` env var.
            voice (str): Speaker voice.
            language (str): Language, defaults to "en".
            temperature (float): Stored in the synthesis options, defaults to 0.6.
            http_session (aiohttp.ClientSession): Optional session to use; when omitted
                one is obtained on demand from the shared HTTP context.

        Raises:
            ValueError: If the API key or the model endpoint cannot be resolved.
        """
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=24000,
            num_channels=1,
        )

        # Explicit arguments take precedence over the environment.
        resolved_key = api_key or os.environ.get("BASETEN_API_KEY")
        if not resolved_key:
            raise ValueError(
                "Baseten API key is required. "
                "Pass one in via the `api_key` parameter, "
                "or set it as the `BASETEN_API_KEY` environment variable"
            )

        resolved_endpoint = model_endpoint or os.environ.get("BASETEN_MODEL_ENDPOINT")
        if not resolved_endpoint:
            raise ValueError(
                "The model endpoint is required, you can find it in the Baseten dashboard"
            )

        self._api_key = resolved_key
        self._model_endpoint = resolved_endpoint
        self._opts = _TTSOptions(voice=voice, language=language, temperature=temperature)
        self._session = http_session

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching one from the shared HTTP context if unset."""
        if self._session:
            return self._session
        self._session = utils.http_context.http_session()
        return self._session

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        temperature: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """Overwrite the stored synthesis options for every argument that is given."""
        for attr_name, new_value in (
            ("voice", voice),
            ("language", language),
            ("temperature", temperature),
        ):
            if is_given(new_value):
                setattr(self._opts, attr_name, new_value)

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        """Create a ``ChunkedStream`` that synthesizes ``text`` against the configured endpoint."""
        chunked = ChunkedStream(
            tts=self,
            api_key=self._api_key,
            input_text=text,
            model_endpoint=self._model_endpoint,
            conn_options=conn_options,
        )
        return chunked
Helper class that provides a standard way to create an ABC using inheritance.
Initialize the Baseten TTS.
Args
api_key
:str
- Baseten API key, or
BASETEN_API_KEY
env var.
model_endpoint
:str
- Baseten model endpoint, or
BASETEN_MODEL_ENDPOINT
env var.
voice
:str
- Speaker voice.
language
:str
- language, defaults to "en".
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.baseten.tts.ChunkedStream-
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    """
    Create a ``ChunkedStream`` for ``text``.

    The stream receives this TTS instance, the stored API key and model endpoint,
    and the given connection options.
    """
    chunked = ChunkedStream(
        tts=self,
        api_key=self._api_key,
        input_text=text,
        model_endpoint=self._model_endpoint,
        conn_options=conn_options,
    )
    return chunked
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
temperature: NotGivenOr[float] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    temperature: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """Overwrite the stored synthesis options for every argument that is given."""
    for attr_name, new_value in (
        ("voice", voice),
        ("language", language),
        ("temperature", temperature),
    ):
        if is_given(new_value):
            setattr(self._opts, attr_name, new_value)
Inherited members