Module livekit.plugins.assemblyai
Sub-modules
livekit.plugins.assemblyai.log
livekit.plugins.assemblyai.stt
livekit.plugins.assemblyai.version
Classes
class STT (*,
api_key: Optional[str] = None,
sample_rate: int = 16000,
word_boost: Optional[List[str]] = None,
encoding: "Optional[Literal['pcm_s16le', 'pcm_mulaw']]" = 'pcm_s16le',
disable_partial_transcripts: bool = False,
enable_extra_session_information: bool = False,
end_utterance_silence_threshold: Optional[int] = 500,
http_session: Optional[aiohttp.ClientSession] = None,
buffer_size_seconds: float = 0.05)
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: Optional[str] = None,
        sample_rate: int = 16000,
        word_boost: Optional[List[str]] = None,
        encoding: Optional[Literal["pcm_s16le", "pcm_mulaw"]] = "pcm_s16le",
        disable_partial_transcripts: bool = False,
        enable_extra_session_information: bool = False,
        end_utterance_silence_threshold: Optional[int] = 500,
        http_session: Optional[aiohttp.ClientSession] = None,
        buffer_size_seconds: float = 0.05,
    ):
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=True,
            ),
        )
        api_key = api_key or os.environ.get("ASSEMBLYAI_API_KEY")
        if api_key is None:
            raise ValueError(
                "AssemblyAI API key is required. "
                "Pass one in via the `api_key` parameter, "
                "or set it as the `ASSEMBLYAI_API_KEY` environment variable"
            )
        self._api_key = api_key

        self._opts = STTOptions(
            sample_rate=sample_rate,
            word_boost=word_boost,
            encoding=encoding,
            disable_partial_transcripts=disable_partial_transcripts,
            enable_extra_session_information=enable_extra_session_information,
            buffer_size_seconds=buffer_size_seconds,
            end_utterance_silence_threshold=end_utterance_silence_threshold,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Not implemented")

    def stream(
        self,
        *,
        language: Optional[str] = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        config = dataclasses.replace(self._opts)
        stream = SpeechStream(
            stt=self,
            conn_options=conn_options,
            opts=config,
            api_key=self._api_key,
            http_session=self.session,
        )
        self._streams.add(stream)
        return stream

    def update_options(
        self,
        *,
        disable_partial_transcripts: Optional[bool] = None,
        word_boost: Optional[List[str]] = None,
        end_utterance_silence_threshold: Optional[int] = None,
        enable_extra_session_information: Optional[bool] = None,
        buffer_size_seconds: Optional[float] = None,
    ):
        if disable_partial_transcripts is not None:
            self._opts.disable_partial_transcripts = disable_partial_transcripts
        if word_boost is not None:
            self._opts.word_boost = word_boost
        if end_utterance_silence_threshold is not None:
            self._opts.end_utterance_silence_threshold = end_utterance_silence_threshold
        if enable_extra_session_information is not None:
            self._opts.enable_extra_session_information = (
                enable_extra_session_information
            )
        if buffer_size_seconds is not None:
            self._opts.buffer_size_seconds = buffer_size_seconds

        for stream in self._streams:
            stream.update_options(
                disable_partial_transcripts=disable_partial_transcripts,
                word_boost=word_boost,
                end_utterance_silence_threshold=end_utterance_silence_threshold,
                enable_extra_session_information=enable_extra_session_information,
                buffer_size_seconds=buffer_size_seconds,
            )
Streaming speech-to-text backed by AssemblyAI's realtime transcription API. Only streaming with interim results is supported; one-shot recognition (_recognize_impl) is not implemented. The API key is taken from the api_key parameter or, failing that, the ASSEMBLYAI_API_KEY environment variable.
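A minimal construction sketch (assuming the livekit-plugins-assemblyai package is installed and the key is available in the environment; the parameter values shown are the documented defaults):

import os

from livekit.plugins import assemblyai

# The constructor falls back to ASSEMBLYAI_API_KEY when api_key is omitted
# and raises ValueError if neither is set.
assert "ASSEMBLYAI_API_KEY" in os.environ

stt_engine = assemblyai.STT(
    sample_rate=16000,                    # 16 kHz mono PCM
    end_utterance_silence_threshold=500,  # ms of trailing silence that ends an utterance
)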
Ancestors
- STT
- abc.ABC
- EventEmitter
- typing.Generic
Instance variables
prop session : aiohttp.ClientSession
@property
def session(self) -> aiohttp.ClientSession:
    if not self._session:
        self._session = utils.http_context.http_session()
    return self._session
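If no session is supplied at construction time, the property lazily pulls one from livekit's shared http_context. To reuse a session you already manage, pass it in via http_session (a sketch; closing the session remains the caller's responsibility):

import aiohttp

from livekit.plugins import assemblyai

async def build_engine() -> assemblyai.STT:
    # The plugin uses this session for its realtime websocket connection.
    session = aiohttp.ClientSession()
    return assemblyai.STT(http_session=session)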
Methods
def stream(self,
*,
language: Optional[str] = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) -> SpeechStream
def stream(
    self,
    *,
    language: Optional[str] = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    config = dataclasses.replace(self._opts)
    stream = SpeechStream(
        stt=self,
        conn_options=conn_options,
        opts=config,
        api_key=self._api_key,
        http_session=self.session,
    )
    self._streams.add(stream)
    return stream
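A usage sketch, assuming stt_engine is the STT instance from the earlier example and frames is an async iterable of rtc.AudioFrame. push_frame, end_input, and async iteration over events are provided by the RecognizeStream base class in livekit-agents:

import asyncio

from livekit.agents import stt

async def transcribe(stt_engine, frames) -> None:
    # stt_engine: an assemblyai.STT instance; frames: async iterable of
    # rtc.AudioFrame (both assumed to exist in the caller's code).
    stream = stt_engine.stream()

    async def _push() -> None:
        async for frame in frames:
            stream.push_frame(frame)  # forwarded to AssemblyAI in buffered chunks
        stream.end_input()            # triggers the terminate_session message

    push_task = asyncio.create_task(_push())
    async for event in stream:
        if event.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            print("final:", event.alternatives[0].text)
    await push_task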
def update_options(self,
*,
disable_partial_transcripts: Optional[bool] = None,
word_boost: Optional[List[str]] = None,
end_utterance_silence_threshold: Optional[int] = None,
enable_extra_session_information: Optional[bool] = None,
buffer_size_seconds: Optional[float] = None)
def update_options(
    self,
    *,
    disable_partial_transcripts: Optional[bool] = None,
    word_boost: Optional[List[str]] = None,
    end_utterance_silence_threshold: Optional[int] = None,
    enable_extra_session_information: Optional[bool] = None,
    buffer_size_seconds: Optional[float] = None,
):
    if disable_partial_transcripts is not None:
        self._opts.disable_partial_transcripts = disable_partial_transcripts
    if word_boost is not None:
        self._opts.word_boost = word_boost
    if end_utterance_silence_threshold is not None:
        self._opts.end_utterance_silence_threshold = end_utterance_silence_threshold
    if enable_extra_session_information is not None:
        self._opts.enable_extra_session_information = (
            enable_extra_session_information
        )
    if buffer_size_seconds is not None:
        self._opts.buffer_size_seconds = buffer_size_seconds

    for stream in self._streams:
        stream.update_options(
            disable_partial_transcripts=disable_partial_transcripts,
            word_boost=word_boost,
            end_utterance_silence_threshold=end_utterance_silence_threshold,
            enable_extra_session_information=enable_extra_session_information,
            buffer_size_seconds=buffer_size_seconds,
        )
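Updates apply to the stored options and are pushed to every open stream, each of which reconnects its websocket to pick up the change. For instance, tightening endpointing mid-session (a sketch; stt_engine is an existing STT instance):

# Only the parameters you pass change; arguments left as None keep their
# current values.
stt_engine.update_options(
    end_utterance_silence_threshold=300,  # ms
    disable_partial_transcripts=True,
)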
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
api_key: str,
http_session: aiohttp.ClientSession)
class SpeechStream(stt.SpeechStream):
    # Used to close the websocket
    _CLOSE_MSG: str = json.dumps({"terminate_session": True})

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        api_key: str,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )

        self._opts = opts
        self._api_key = api_key
        self._session = http_session
        self._speech_duration: float = 0

        # keep a list of final transcripts to combine them inside the END_OF_SPEECH event
        self._final_events: List[SpeechEvent] = []
        self._reconnect_event = asyncio.Event()

    def update_options(
        self,
        *,
        disable_partial_transcripts: Optional[bool] = None,
        word_boost: Optional[List[str]] = None,
        end_utterance_silence_threshold: Optional[int] = None,
        enable_extra_session_information: Optional[bool] = None,
        buffer_size_seconds: Optional[float] = None,
    ):
        if disable_partial_transcripts is not None:
            self._opts.disable_partial_transcripts = disable_partial_transcripts
        if word_boost is not None:
            self._opts.word_boost = word_boost
        if end_utterance_silence_threshold is not None:
            self._opts.end_utterance_silence_threshold = end_utterance_silence_threshold
        if enable_extra_session_information is not None:
            self._opts.enable_extra_session_information = (
                enable_extra_session_information
            )
        if buffer_size_seconds is not None:
            self._opts.buffer_size_seconds = buffer_size_seconds

        self._reconnect_event.set()

    async def _run(self) -> None:
        """
        Run a single websocket connection to AssemblyAI and make sure to
        reconnect when something goes wrong.
        """
        closing_ws = False

        async def send_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws

            if self._opts.end_utterance_silence_threshold:
                await ws.send_str(
                    json.dumps(
                        {
                            "end_utterance_silence_threshold": self._opts.end_utterance_silence_threshold
                        }
                    )
                )

            samples_per_buffer = self._opts.sample_rate // round(
                1 / self._opts.buffer_size_seconds
            )
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=samples_per_buffer,
            )

            # forward inputs to AssemblyAI
            # if we receive a close message, signal it to AssemblyAI and break.
            # the recv task will then make sure to process the remaining audio and stop
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    frames = audio_bstream.flush()
                else:
                    frames = audio_bstream.write(data.data.tobytes())

                for frame in frames:
                    self._speech_duration += frame.duration
                    await ws.send_bytes(frame.data.tobytes())

            closing_ws = True
            await ws.send_str(SpeechStream._CLOSE_MSG)

        async def recv_task(ws: aiohttp.ClientWebSocketResponse):
            nonlocal closing_ws
            while True:
                try:
                    msg = await asyncio.wait_for(ws.receive(), timeout=5)
                except asyncio.TimeoutError:
                    if closing_ws:
                        break
                    continue

                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:  # close is expected, see SpeechStream.aclose
                        return

                    # this will trigger a reconnection, see the _run loop
                    raise Exception("AssemblyAI connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.error("unexpected AssemblyAI message type %s", msg.type)
                    continue

                try:
                    # received a message from AssemblyAI
                    data = json.loads(msg.data)
                    self._process_stream_event(data, closing_ws)
                except Exception:
                    logger.exception("failed to process AssemblyAI message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                ]
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [asyncio.gather(*tasks), wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )  # type: ignore
                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        live_config = {
            "sample_rate": self._opts.sample_rate,
            "word_boost": self._opts.word_boost,
            "encoding": self._opts.encoding,
            "disable_partial_transcripts": self._opts.disable_partial_transcripts,
            "enable_extra_session_information": self._opts.enable_extra_session_information,
        }

        headers = {
            "Authorization": self._api_key,
            "Content-Type": "application/json",
        }

        ws_url = "wss://api.assemblyai.com/v2/realtime/ws"
        filtered_config = {k: v for k, v in live_config.items() if v is not None}
        url = f"{ws_url}?{urlencode(filtered_config).lower()}"
        ws = await self._session.ws_connect(url, headers=headers)
        return ws

    def _process_stream_event(self, data: dict, closing_ws: bool) -> None:
        # see this page:
        # https://www.assemblyai.com/docs/api-reference/streaming/realtime
        # for more information about the different types of events
        if data["message_type"] == "SessionBegins":
            start_event = stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH)
            self._event_ch.send_nowait(start_event)

        elif data["message_type"] == "PartialTranscript":
            alts = live_transcription_to_speech_data(ENGLISH, data)
            if len(alts) > 0 and alts[0].text:
                interim_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=alts,
                )
                self._event_ch.send_nowait(interim_event)

        elif data["message_type"] == "FinalTranscript":
            alts = live_transcription_to_speech_data(ENGLISH, data)
            if len(alts) > 0 and alts[0].text:
                final_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=alts,
                )
                self._final_events.append(final_event)
                self._event_ch.send_nowait(final_event)

            # log metrics
            if self._speech_duration > 0:
                usage_event = stt.SpeechEvent(
                    type=stt.SpeechEventType.RECOGNITION_USAGE,
                    # request_id="",
                    alternatives=[],
                    recognition_usage=stt.RecognitionUsage(
                        audio_duration=self._speech_duration
                    ),
                )
                self._event_ch.send_nowait(usage_event)
                self._speech_duration = 0

        elif data["message_type"] == "SessionTerminated":
            if closing_ws:
                pass
            else:
                raise Exception("AssemblyAI connection closed unexpectedly")

        elif data["message_type"] == "SessionInformation":
            logger.debug("AssemblyAI Session Information: %s", str(data))

        elif data["message_type"] == "RealtimeError":
            logger.error("Received unexpected error from AssemblyAI %s", data)

        else:
            logger.warning(
                "Received unexpected message from AssemblyAI %s", data["message_type"]
            )
A single realtime transcription session over AssemblyAI's websocket API. Instances are created via STT.stream(); the stream reconnects automatically when options are updated or the connection drops unexpectedly.
Args:
    sample_rate : int or None, optional
        The desired sample rate for the audio input. If specified, the audio input
        will be automatically resampled to match the given sample rate before being
        processed for Speech-to-Text. If not provided (None), the input will retain
        its original sample rate.
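For example, 48 kHz room audio can be pushed as-is; per the note above it is resampled to the stream's configured rate before transcription (a sketch; stt_engine is an existing STT instance):

from livekit import rtc

stream = stt_engine.stream()

# One 10 ms frame of silence at 48 kHz mono: 480 samples of int16.
frame = rtc.AudioFrame(
    data=b"\x00" * 960,  # 480 samples * 2 bytes each
    sample_rate=48000,
    num_channels=1,
    samples_per_channel=480,
)
stream.push_frame(frame)  # resampled to the configured 16 kHz before sending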
Ancestors
- RecognizeStream
- abc.ABC
Methods
def update_options(self,
*,
disable_partial_transcripts: Optional[bool] = None,
word_boost: Optional[List[str]] = None,
end_utterance_silence_threshold: Optional[int] = None,
enable_extra_session_information: Optional[bool] = None,
buffer_size_seconds: Optional[float] = None)
def update_options(
    self,
    *,
    disable_partial_transcripts: Optional[bool] = None,
    word_boost: Optional[List[str]] = None,
    end_utterance_silence_threshold: Optional[int] = None,
    enable_extra_session_information: Optional[bool] = None,
    buffer_size_seconds: Optional[float] = None,
):
    if disable_partial_transcripts is not None:
        self._opts.disable_partial_transcripts = disable_partial_transcripts
    if word_boost is not None:
        self._opts.word_boost = word_boost
    if end_utterance_silence_threshold is not None:
        self._opts.end_utterance_silence_threshold = end_utterance_silence_threshold
    if enable_extra_session_information is not None:
        self._opts.enable_extra_session_information = (
            enable_extra_session_information
        )
    if buffer_size_seconds is not None:
        self._opts.buffer_size_seconds = buffer_size_seconds

    self._reconnect_event.set()