Module livekit.agents.inference.stt
Classes
class AssemblyaiOptions (*args, **kwargs)
-
Expand source code
class AssemblyaiOptions(TypedDict, total=False):
    """Provider-specific settings accepted as ``extra_kwargs`` for AssemblyAI models.

    Declared with ``total=False``, so every key may be omitted. The defaults
    noted below are the ones recorded alongside each field; units are not
    stated in this file.
    """

    # Default: False.
    format_turns: bool
    # Default: 0.01.
    end_of_turn_confidence_threshold: float
    # Default: 0.
    min_end_of_turn_silence_when_confident: int
    # Default: not specified.
    max_turn_silence: int
    # Default: not specified.
    keyterms_prompt: list[str]
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var end_of_turn_confidence_threshold : float
var format_turns : bool
var keyterms_prompt : list[str]
var max_turn_silence : int
var min_end_of_turn_silence_when_confident : int
class CartesiaOptions (*args, **kwargs)
-
Expand source code
class CartesiaOptions(TypedDict, total=False):
    """Provider-specific settings accepted as ``extra_kwargs`` for Cartesia models.

    Declared with ``total=False``, so every key may be omitted.
    """

    # Default: not specified.
    min_volume: float
    # Default: not specified.
    max_silence_duration_secs: float
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var max_silence_duration_secs : float
var min_volume : float
class DeepgramOptions (*args, **kwargs)
-
Expand source code
class DeepgramOptions(TypedDict, total=False):
    """Provider-specific settings accepted as ``extra_kwargs`` for Deepgram models.

    Declared with ``total=False``, so every key may be omitted. Defaults are
    listed only where the original field comments recorded one.
    """

    # Default: True.
    filler_words: bool
    # Default: True.
    interim_results: bool
    # Default: 25 (ms).
    endpointing: int
    # Default: False.
    punctuate: bool
    smart_format: bool
    # Pairs of (str, float); presumably keyword and boost weight - TODO confirm.
    keywords: list[tuple[str, float]]
    keyterms: list[str]
    profanity_filter: bool
    numerals: bool
    mip_opt_out: bool
dict() -> new empty dictionary dict(mapping) -> new dictionary initialized from a mapping object's (key, value) pairs dict(iterable) -> new dictionary initialized as if via: d = {} for k, v in iterable: d[k] = v dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
Ancestors
- builtins.dict
Class variables
var endpointing : int
var filler_words : bool
var interim_results : bool
var keyterms : list[str]
var keywords : list[tuple[str, float]]
var mip_opt_out : bool
var numerals : bool
var profanity_filter : bool
var punctuate : bool
var smart_format : bool
class STT (model: NotGivenOr[STTModels | str] = NOT_GIVEN,
*,
language: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
sample_rate: NotGivenOr[int] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
api_secret: NotGivenOr[str] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
extra_kwargs: NotGivenOr[dict[str, Any] | CartesiaOptions | DeepgramOptions | AssemblyaiOptions] = NOT_GIVEN)-
Expand source code
class STT(stt.STT):
    """Streaming-only speech-to-text client for LiveKit Cloud Inference."""

    @overload
    def __init__(
        self,
        model: CartesiaModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[CartesiaOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: DeepgramModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[DeepgramOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: AssemblyaiModels,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[AssemblyaiOptions] = NOT_GIVEN,
    ) -> None: ...

    @overload
    def __init__(
        self,
        model: str,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[dict[str, Any]] = NOT_GIVEN,
    ) -> None: ...

    def __init__(
        self,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        encoding: NotGivenOr[STTEncoding] = NOT_GIVEN,
        sample_rate: NotGivenOr[int] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        api_secret: NotGivenOr[str] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
        extra_kwargs: NotGivenOr[
            dict[str, Any] | CartesiaOptions | DeepgramOptions | AssemblyaiOptions
        ] = NOT_GIVEN,
    ) -> None:
        """Livekit Cloud Inference STT

        Args:
            model (STTModels | str, optional): STT model to use.
            language (str, optional): Language of the STT model.
            encoding (STTEncoding, optional): Audio encoding; falls back to
                DEFAULT_ENCODING when not given.
            sample_rate (int, optional): Audio sample rate; falls back to
                DEFAULT_SAMPLE_RATE when not given.
            base_url (str, optional): Inference endpoint. When not given, read from
                the LIVEKIT_INFERENCE_URL environment variable, else DEFAULT_BASE_URL.
            api_key (str, optional): When not given, read from
                LIVEKIT_INFERENCE_API_KEY, then LIVEKIT_API_KEY.
            api_secret (str, optional): When not given, read from
                LIVEKIT_INFERENCE_API_SECRET, then LIVEKIT_API_SECRET.
            http_session (aiohttp.ClientSession, optional): HTTP session to use.
            extra_kwargs (dict, optional): Extra kwargs to pass to the STT model.

        Raises:
            ValueError: If no API key or no API secret can be resolved.
        """
        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True),
        )

        # Explicit arguments win; environment variables are only a fallback.
        if is_given(base_url):
            resolved_url = base_url
        else:
            resolved_url = os.environ.get("LIVEKIT_INFERENCE_URL", DEFAULT_BASE_URL)

        if is_given(api_key):
            resolved_key = api_key
        else:
            resolved_key = os.getenv(
                "LIVEKIT_INFERENCE_API_KEY", os.getenv("LIVEKIT_API_KEY", "")
            )
        if not resolved_key:
            raise ValueError(
                "api_key is required, either as argument or set LIVEKIT_API_KEY environmental variable"
            )

        if is_given(api_secret):
            resolved_secret = api_secret
        else:
            resolved_secret = os.getenv(
                "LIVEKIT_INFERENCE_API_SECRET", os.getenv("LIVEKIT_API_SECRET", "")
            )
        if not resolved_secret:
            raise ValueError(
                "api_secret is required, either as argument or set LIVEKIT_API_SECRET environmental variable"
            )

        self._opts = STTOptions(
            model=model,
            language=language,
            encoding=encoding if is_given(encoding) else DEFAULT_ENCODING,
            sample_rate=sample_rate if is_given(sample_rate) else DEFAULT_SAMPLE_RATE,
            base_url=resolved_url,
            api_key=resolved_key,
            api_secret=resolved_secret,
            # Copy so later mutation of the caller's mapping does not leak in.
            extra_kwargs=dict(extra_kwargs) if is_given(extra_kwargs) else {},
        )
        self._session = http_session
        # Weak references: streams are tracked only while something else keeps them alive.
        self._streams = weakref.WeakSet[SpeechStream]()

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, obtaining one from the http context on first use."""
        if self._session:
            return self._session
        self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        """Batch recognition entry point; always raises because only streaming is supported."""
        raise NotImplementedError(
            "LiveKit STT does not support batch recognition, use stream() instead"
        )

    def stream(
        self,
        *,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a streaming transcription session."""
        speech_stream = SpeechStream(
            stt=self,
            opts=self._sanitize_options(language=language),
            conn_options=conn_options,
        )
        self._streams.add(speech_stream)
        return speech_stream

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    ) -> None:
        """Update STT configuration options."""
        if is_given(language):
            self._opts.language = language
        if is_given(model):
            self._opts.model = model

        # Forward the same values to every stream that is still alive.
        for active_stream in self._streams:
            active_stream.update_options(model=model, language=language)

    def _sanitize_options(
        self, *, language: NotGivenOr[STTLanguages | str] = NOT_GIVEN
    ) -> STTOptions:
        """Create a sanitized copy of options with language override if provided."""
        if is_given(language):
            return replace(self._opts, language=language)
        return replace(self._opts)
Helper class that provides a standard way to create an ABC using inheritance.
Livekit Cloud Inference STT
Args
model : STTModels | str, optional - STT model to use.
language : str, optional - Language of the STT model.
encoding : STTEncoding, optional - Encoding of the STT model.
sample_rate : int, optional - Sample rate of the STT model.
base_url : str, optional - Inference endpoint URL; if not provided, read from the LIVEKIT_INFERENCE_URL environment variable, falling back to the built-in default URL.
api_key : str, optional - API key; if not provided, read from the LIVEKIT_INFERENCE_API_KEY environment variable, then LIVEKIT_API_KEY.
api_secret : str, optional - API secret; if not provided, read from the LIVEKIT_INFERENCE_API_SECRET environment variable, then LIVEKIT_API_SECRET.
http_session : aiohttp.ClientSession, optional - HTTP session to use.
extra_kwargs : dict, optional - Extra kwargs to pass to the STT model.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self,
*,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> SpeechStream-
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Create a streaming transcription session.

    Args:
        language: Optional per-stream language override applied to a copy of
            the current options.
        conn_options: Connection options handed to the new stream.

    Returns:
        The new stream, which is also registered in ``self._streams``.
    """
    speech_stream = SpeechStream(
        stt=self,
        opts=self._sanitize_options(language=language),
        conn_options=conn_options,
    )
    self._streams.add(speech_stream)
    return speech_stream
Create a streaming transcription session.
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
) -> None:
    """Update STT configuration options.

    Only values that were actually given are stored; the same arguments are
    then forwarded to every stream tracked in ``self._streams``.
    """
    if is_given(language):
        self._opts.language = language
    if is_given(model):
        self._opts.model = model

    for active_stream in self._streams:
        active_stream.update_options(model=model, language=language)
Update STT configuration options.
Inherited members
class STTOptions (model: NotGivenOr[STTModels | str],
language: NotGivenOr[str],
encoding: STTEncoding,
sample_rate: int,
base_url: str,
api_key: str,
api_secret: str,
extra_kwargs: dict[str, Any])-
Expand source code
@dataclass
class STTOptions:
    """Resolved configuration held by an ``STT`` instance and copied for each stream."""

    # Model identifier; may still be the NOT_GIVEN sentinel.
    model: NotGivenOr[STTModels | str]
    # Transcription language; may still be the NOT_GIVEN sentinel.
    language: NotGivenOr[str]
    # Audio encoding sent in the session settings.
    encoding: STTEncoding
    # Audio sample rate sent in the session settings.
    sample_rate: int
    # Endpoint the websocket connection is opened against.
    base_url: str
    # Credentials used to create the access token for the connection.
    api_key: str
    api_secret: str
    # Provider-specific options passed through as the "extra" setting.
    extra_kwargs: dict[str, Any]
STTOptions(model: 'NotGivenOr[STTModels | str]', language: 'NotGivenOr[str]', encoding: 'STTEncoding', sample_rate: 'int', base_url: 'str', api_key: 'str', api_secret: 'str', extra_kwargs: 'dict[str, Any]')
Instance variables
var api_key : str
var api_secret : str
var base_url : str
var encoding : Literal['pcm_s16le']
var extra_kwargs : dict[str, typing.Any]
var language : str | livekit.agents.types.NotGiven
var model : Literal['deepgram', 'deepgram/nova-3', 'deepgram/nova-3-general', 'deepgram/nova-3-medical', 'deepgram/nova-2', 'deepgram/nova-2-general', 'deepgram/nova-2-medical', 'deepgram/nova-2-phonecall'] | Literal['cartesia', 'cartesia/ink-whisper'] | Literal['assemblyai'] | str | livekit.agents.types.NotGiven
var sample_rate : int
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)-
Expand source code
class SpeechStream(stt.SpeechStream):
    """Streaming transcription session backed by a LiveKit STT websocket."""

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
    ) -> None:
        """Bind the stream to its parent STT, its options and its HTTP session.

        Args:
            stt: Parent STT instance; also supplies the HTTP session.
            opts: Options used for this stream's session.
            conn_options: Connection options (the timeout is used when connecting).
        """
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._session = stt._ensure_session()
        self._request_id = str(utils.shortuuid("stt_request_"))
        # Set whenever the options change so that _run() re-opens the websocket.
        self._reconnect_event = asyncio.Event()
        self._speaking = False
        # Audio duration sent since the last usage event was emitted.
        self._speech_duration: float = 0

    def update_options(
        self,
        *,
        model: NotGivenOr[STTModels | str] = NOT_GIVEN,
        language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
    ) -> None:
        """Update streaming transcription options.

        Model and language are only sent when a session is created, so a
        change requests a reconnect to make the new values take effect.
        """
        changed = False
        if is_given(model):
            self._opts.model = model
            changed = True
        if is_given(language):
            self._opts.language = language
            changed = True
        if changed:
            self._reconnect_event.set()

    async def _run(self) -> None:
        """Main loop for streaming transcription.

        Connects, runs a sender and a receiver task, and reconnects whenever
        the reconnect event is set; otherwise returns once the tasks finish.
        """
        closing_ws = False

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            """Forward input audio as base64 messages, then finalize the session."""
            nonlocal closing_ws

            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=self._opts.sample_rate // 20,  # 50ms
            )

            async for ev in self._input_ch:
                frames: list[rtc.AudioFrame] = []
                if isinstance(ev, rtc.AudioFrame):
                    frames.extend(audio_bstream.push(ev.data))
                elif isinstance(ev, self._FlushSentinel):
                    frames.extend(audio_bstream.flush())

                for frame in frames:
                    self._speech_duration += frame.duration
                    audio_bytes = frame.data.tobytes()
                    base64_audio = base64.b64encode(audio_bytes).decode("utf-8")
                    audio_msg = {
                        "type": "input_audio",
                        "audio": base64_audio,
                    }
                    await ws.send_str(json.dumps(audio_msg))

            # Input is exhausted: a close from the server is now expected.
            closing_ws = True
            finalize_msg = {
                "type": "session.finalize",
            }
            await ws.send_str(json.dumps(finalize_msg))

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            """Read server messages and dispatch transcripts until the socket closes."""
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    # A close is fine once we finalized or the session itself is closed.
                    if closing_ws or self._session.closed:
                        return
                    raise APIStatusError(message="LiveKit STT connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected LiveKit STT message type %s", msg.type)
                    continue

                data = json.loads(msg.data)
                msg_type = data.get("type")
                if msg_type == "session.created":
                    pass
                elif msg_type == "interim_transcript":
                    self._process_transcript(data, is_final=False)
                elif msg_type == "final_transcript":
                    self._process_transcript(data, is_final=True)
                elif msg_type == "session.finalized":
                    pass
                elif msg_type == "session.closed":
                    pass
                elif msg_type == "error":
                    raise APIError(f"LiveKit STT returned error: {msg.data}")
                else:
                    logger.warning("received unexpected message from LiveKit STT: %s", data)

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        (tasks_group, wait_reconnect_task),
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    # Surface any exception raised by the send/recv tasks.
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    # No reconnect requested: the session finished normally.
                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    tasks_group.cancel()
                    tasks_group.exception()  # retrieve the exception
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Connect to the LiveKit STT WebSocket.

        Returns:
            The websocket, after the ``session.create`` message has been sent.

        Raises:
            APIStatusError: If the server rejects the connection with HTTP 429.
            APIConnectionError: On any other connection failure or timeout.
        """
        params: dict[str, Any] = {
            "settings": {
                "sample_rate": str(self._opts.sample_rate),
                "encoding": self._opts.encoding,
                "extra": self._opts.extra_kwargs,
            },
        }

        if self._opts.model:
            params["model"] = self._opts.model

        if self._opts.language:
            params["settings"]["language"] = self._opts.language

        # http(s):// becomes ws(s):// by swapping only the leading scheme prefix.
        base_url = self._opts.base_url
        if base_url.startswith(("http://", "https://")):
            base_url = base_url.replace("http", "ws", 1)
        headers = {
            "Authorization": f"Bearer {create_access_token(self._opts.api_key, self._opts.api_secret)}"
        }
        try:
            ws = await asyncio.wait_for(
                self._session.ws_connect(f"{base_url}/stt", headers=headers),
                self._conn_options.timeout,
            )
            params["type"] = "session.create"
            await ws.send_str(json.dumps(params))
        except (
            aiohttp.ClientConnectorError,
            asyncio.TimeoutError,
            # Handshake rejections are ClientResponseError subclasses; they must be
            # caught here for the 429 check below to be reachable.
            aiohttp.ClientResponseError,
        ) as e:
            if isinstance(e, aiohttp.ClientResponseError) and e.status == 429:
                raise APIStatusError("LiveKit STT quota exceeded", status_code=e.status) from e
            raise APIConnectionError("failed to connect to LiveKit STT") from e
        return ws

    def _process_transcript(self, data: dict, is_final: bool) -> None:
        """Turn one transcript message into speech events on the event channel.

        Args:
            data: Decoded transcript message from the server.
            is_final: True for ``final_transcript`` messages, False for interim ones.
        """
        request_id = data.get("request_id", self._request_id)
        text = data.get("transcript", "")
        language = data.get("language", self._opts.language or "en")

        # Empty interim results carry no information.
        if not text and not is_final:
            return

        # We'll have a more accurate way of detecting when speech started when we have VAD
        if not self._speaking:
            self._speaking = True
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        speech_data = stt.SpeechData(
            language=language,
            start_time=data.get("start", 0),
            end_time=data.get("duration", 0),  # This is the duration transcribed so far
            confidence=data.get("confidence", 1.0),
            text=text,
        )

        if is_final:
            # Report the audio sent since the last final transcript, then reset the counter.
            if self._speech_duration > 0:
                self._event_ch.send_nowait(
                    stt.SpeechEvent(
                        type=stt.SpeechEventType.RECOGNITION_USAGE,
                        request_id=request_id,
                        recognition_usage=stt.RecognitionUsage(
                            audio_duration=self._speech_duration,
                        ),
                    )
                )
                self._speech_duration = 0

            event = stt.SpeechEvent(
                type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                request_id=request_id,
                alternatives=[speech_data],
            )
            self._event_ch.send_nowait(event)

            if self._speaking:
                self._speaking = False
                end_event = stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH)
                self._event_ch.send_nowait(end_event)
        else:
            event = stt.SpeechEvent(
                type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                request_id=request_id,
                alternatives=[speech_data],
            )
            self._event_ch.send_nowait(event)
Helper class that provides a standard way to create an ABC using inheritance.
Args: sample_rate (int or None, optional): The desired sample rate for the audio input. If specified, the audio input is automatically resampled to that rate before being processed for Speech-to-Text. If not provided (None), the input retains its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
model: NotGivenOr[STTModels | str] = NOT_GIVEN,
language: NotGivenOr[STTLanguages | str] = NOT_GIVEN) ‑> None-
Expand source code
def update_options(
    self,
    *,
    model: NotGivenOr[STTModels | str] = NOT_GIVEN,
    language: NotGivenOr[STTLanguages | str] = NOT_GIVEN,
) -> None:
    """Update streaming transcription options.

    Model and language are only sent when a session is created, so a change
    sets the reconnect event to make the running stream re-open its connection
    with the new values.
    """
    changed = False
    if is_given(model):
        self._opts.model = model
        changed = True
    if is_given(language):
        self._opts.language = language
        changed = True
    if changed:
        self._reconnect_event.set()
Update streaming transcription options.