Module livekit.plugins.azure
Classes
class STT (*,
speech_key: NotGivenOr[str] = NOT_GIVEN,
speech_region: NotGivenOr[str] = NOT_GIVEN,
speech_host: NotGivenOr[str] = NOT_GIVEN,
speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 16000,
num_channels: int = 1,
segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN)
class STT(stt.STT):
    def __init__(
        self,
        *,
        speech_key: NotGivenOr[str] = NOT_GIVEN,
        speech_region: NotGivenOr[str] = NOT_GIVEN,
        speech_host: NotGivenOr[str] = NOT_GIVEN,
        speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 16000,
        num_channels: int = 1,
        segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
        # Azure handles multiple languages and can auto-detect the language used.
        # It requires the candidate set to be set.
        language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
        profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN,
    ):
        """
        Create a new instance of Azure STT.

        Either ``speech_host`` or ``speech_key`` and ``speech_region`` or
        ``speech_auth_token`` and ``speech_region`` must be set using arguments.
        Alternatively, set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY`` and
        ``AZURE_SPEECH_REGION`` environmental variables, respectively.
        ``speech_auth_token`` must be set using the arguments as it's an ephemeral token.
        """
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))
        if not language or not is_given(language):
            language = ["en-US"]
        if isinstance(language, str):
            language = [language]
        if not is_given(speech_host):
            speech_host = os.environ.get("AZURE_SPEECH_HOST")
        if not is_given(speech_key):
            speech_key = os.environ.get("AZURE_SPEECH_KEY")
        if not is_given(speech_region):
            speech_region = os.environ.get("AZURE_SPEECH_REGION")
        if not (
            is_given(speech_host)
            or (is_given(speech_key) and is_given(speech_region))
            or (is_given(speech_auth_token) and is_given(speech_region))
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION "
                "or speech_auth_token and AZURE_SPEECH_REGION must be set"
            )
        self._config = STTOptions(
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            language=language,
            sample_rate=sample_rate,
            num_channels=num_channels,
            segmentation_silence_timeout_ms=segmentation_silence_timeout_ms,
            segmentation_max_time_ms=segmentation_max_time_ms,
            segmentation_strategy=segmentation_strategy,
            profanity=profanity,
        )
        self._streams = weakref.WeakSet[SpeechStream]()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Azure STT does not support single frame recognition")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = deepcopy(self._config)
        if is_given(language):
            config.language = [language]
        stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN):
        if is_given(language):
            if isinstance(language, str):
                language = [language]
            self._config.language = language
            for stream in self._streams:
                stream.update_options(language=language)
Create a new instance of Azure STT.
Either speech_host, or speech_key and speech_region, or speech_auth_token and speech_region must be set via arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY, and AZURE_SPEECH_REGION environment variables, respectively. speech_auth_token must be passed as an argument, as it is an ephemeral token.
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
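For reference, a minimal construction sketch covering the three credential combinations the constructor accepts (the key, region, host, and token values below are placeholders):

from livekit.plugins import azure

# Any one of these combinations satisfies the credential check:
stt_1 = azure.STT(speech_key="<key>", speech_region="<region>")
stt_2 = azure.STT(speech_host="<host>")
stt_3 = azure.STT(speech_auth_token="<ephemeral-token>", speech_region="<region>")

# Or rely on AZURE_SPEECH_KEY / AZURE_SPEECH_REGION (or AZURE_SPEECH_HOST)
# set in the environment; the language list is the candidate set for
# Azure's automatic language detection:
stt_4 = azure.STT(language=["en-US", "de-DE"])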
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.azure.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = deepcopy(self._config)
    if is_given(language):
        config.language = [language]
    stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
    self._streams.add(stream)
    return stream
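A streaming usage sketch. It assumes the push_frame()/end_input() interface inherited from the base RecognizeStream, credentials in the environment, and frames as an async iterator of 16 kHz mono rtc.AudioFrame (frames and the surrounding setup are hypothetical):

import asyncio

from livekit.agents import stt as agents_stt
from livekit.plugins import azure

async def transcribe(frames) -> None:
    azure_stt = azure.STT()  # credentials read from the environment
    stream = azure_stt.stream(language="en-US")

    async def push() -> None:
        async for frame in frames:  # frames: async iterator of rtc.AudioFrame
            stream.push_frame(frame)
        stream.end_input()

    push_task = asyncio.create_task(push())
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)
    await push_task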
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN)
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN):
    if is_given(language):
        if isinstance(language, str):
            language = [language]
        self._config.language = language
        for stream in self._streams:
            stream.update_options(language=language)
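As the loop above shows, updating the language propagates to every live stream, each of which reconnects its Azure session with the new candidate set. A sketch, reusing the azure_stt instance from the earlier example:

# Runtime switch of the recognition languages; affects the stored
# config and every stream in self._streams.
azure_stt.update_options(language=["fr-FR", "en-US"])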
Inherited members
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)
class SpeechStream(stt.SpeechStream):
    def __init__(self, *, stt: STT, opts: STTOptions, conn_options: APIConnectOptions) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._speaking = False
        self._session_stopped_event = asyncio.Event()
        self._session_started_event = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        self._reconnect_event = asyncio.Event()

    def update_options(self, *, language: list[str]):
        self._opts.language = language
        self._reconnect_event.set()

    async def _run(self) -> None:
        while True:
            self._stream = speechsdk.audio.PushAudioInputStream(
                stream_format=speechsdk.audio.AudioStreamFormat(
                    samples_per_second=self._opts.sample_rate,
                    bits_per_sample=16,
                    channels=self._opts.num_channels,
                )
            )
            self._recognizer = _create_speech_recognizer(config=self._opts, stream=self._stream)
            self._recognizer.recognizing.connect(self._on_recognizing)
            self._recognizer.recognized.connect(self._on_recognized)
            self._recognizer.speech_start_detected.connect(self._on_speech_start)
            self._recognizer.speech_end_detected.connect(self._on_speech_end)
            self._recognizer.session_started.connect(self._on_session_started)
            self._recognizer.session_stopped.connect(self._on_session_stopped)
            self._recognizer.start_continuous_recognition()
            try:
                await asyncio.wait_for(
                    self._session_started_event.wait(), self._conn_options.timeout
                )

                async def process_input():
                    async for input in self._input_ch:
                        if isinstance(input, rtc.AudioFrame):
                            self._stream.write(input.data.tobytes())

                process_input_task = asyncio.create_task(process_input())
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        [process_input_task, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()
                    if wait_reconnect_task not in done:
                        break
                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(process_input_task, wait_reconnect_task)
                    self._stream.close()
                    await self._session_stopped_event.wait()
            finally:

                def _cleanup():
                    self._recognizer.stop_continuous_recognition()
                    del self._recognizer

                await asyncio.to_thread(_cleanup)

    def _on_recognized(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return
        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]
        final_data = stt.SpeechData(language=detected_lg, confidence=1.0, text=evt.result.text)
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[final_data]
                ),
            )

    def _on_recognizing(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return
        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]
        interim_data = stt.SpeechData(language=detected_lg, confidence=0.0, text=evt.result.text)
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[interim_data],
                ),
            )

    def _on_speech_start(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if self._speaking:
            return
        self._speaking = True
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH),
            )

    def _on_speech_end(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if not self._speaking:
            return
        self._speaking = False
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH),
            )

    def _on_session_started(self, evt: speechsdk.SpeechRecognitionEventArgs):
        self._session_started_event.set()
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_started_event.set)

    def _on_session_stopped(self, evt: speechsdk.SpeechRecognitionEventArgs):
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_stopped_event.set)
Args:
    sample_rate (int or None, optional): The desired sample rate for the
        audio input. If specified, the audio input is automatically
        resampled to match the given sample rate before being processed
        for Speech-to-Text. If not provided (None), the input retains
        its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self, *, language: list[str])
def update_options(self, *, language: list[str]):
    self._opts.language = language
    self._reconnect_event.set()
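Setting the reconnect event makes _run() close the current PushAudioInputStream, wait for the Azure session to stop, and start a fresh recognizer with the updated candidate set. A caller-side sketch (stream as returned by STT.stream()):

# The running Azure session is torn down and rebuilt with the new languages.
stream.update_options(language=["es-ES", "pt-BR"])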
class TTS (*,
sample_rate: int = 24000,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
speech_key: NotGivenOr[str] = NOT_GIVEN,
speech_region: NotGivenOr[str] = NOT_GIVEN,
speech_host: NotGivenOr[str] = NOT_GIVEN,
speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
endpoint_id: NotGivenOr[str] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN,
on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        sample_rate: int = 24000,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        speech_key: NotGivenOr[str] = NOT_GIVEN,
        speech_region: NotGivenOr[str] = NOT_GIVEN,
        speech_host: NotGivenOr[str] = NOT_GIVEN,
        speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
        endpoint_id: NotGivenOr[str] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
        on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Azure TTS.

        Either ``speech_host`` or ``speech_key`` and ``speech_region`` or
        ``speech_auth_token`` and ``speech_region`` must be set using arguments.
        Alternatively, set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY`` and
        ``AZURE_SPEECH_REGION`` environmental variables, respectively.
        ``speech_auth_token`` must be set using the arguments as it's an ephemeral token.
        """
        if sample_rate not in SUPPORTED_SAMPLE_RATE:
            raise ValueError(
                f"Unsupported sample rate {sample_rate}. "
                f"Supported sample rates: {list(SUPPORTED_SAMPLE_RATE.keys())}"
            )
        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )
        if not is_given(speech_host):
            speech_host = os.environ.get("AZURE_SPEECH_HOST")
        if not is_given(speech_key):
            speech_key = os.environ.get("AZURE_SPEECH_KEY")
        if not is_given(speech_region):
            speech_region = os.environ.get("AZURE_SPEECH_REGION")
        if not (
            is_given(speech_host)
            or (is_given(speech_key) and is_given(speech_region))
            or (is_given(speech_auth_token) and is_given(speech_region))
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION "
                "or speech_auth_token and AZURE_SPEECH_REGION must be set"
            )
        if is_given(prosody):
            prosody.validate()
        if is_given(style):
            style.validate()
        self._opts = _TTSOptions(
            sample_rate=sample_rate,
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            voice=voice,
            endpoint_id=endpoint_id,
            language=language,
            prosody=prosody,
            style=style,
            on_bookmark_reached_event=on_bookmark_reached_event,
            on_synthesis_canceled_event=on_synthesis_canceled_event,
            on_synthesis_completed_event=on_synthesis_completed_event,
            on_synthesis_started_event=on_synthesis_started_event,
            on_synthesizing_event=on_synthesizing_event,
            on_viseme_event=on_viseme_event,
            on_word_boundary_event=on_word_boundary_event,
        )

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
        on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
    ) -> None:
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language
        if is_given(prosody):
            self._opts.prosody = prosody
        if is_given(style):
            self._opts.style = style
        if is_given(on_bookmark_reached_event):
            self._opts.on_bookmark_reached_event = on_bookmark_reached_event
        if is_given(on_synthesis_canceled_event):
            self._opts.on_synthesis_canceled_event = on_synthesis_canceled_event
        if is_given(on_synthesis_completed_event):
            self._opts.on_synthesis_completed_event = on_synthesis_completed_event
        if is_given(on_synthesis_started_event):
            self._opts.on_synthesis_started_event = on_synthesis_started_event
        if is_given(on_synthesizing_event):
            self._opts.on_synthesizing_event = on_synthesizing_event
        if is_given(on_viseme_event):
            self._opts.on_viseme_event = on_viseme_event
        if is_given(on_word_boundary_event):
            self._opts.on_word_boundary_event = on_word_boundary_event

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options, opts=self._opts)
Create a new instance of Azure TTS.
Either speech_host, or speech_key and speech_region, or speech_auth_token and speech_region must be set via arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY, and AZURE_SPEECH_REGION environment variables, respectively. speech_auth_token must be passed as an argument, as it is an ephemeral token.
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) -> livekit.plugins.azure.tts.ChunkedStream
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options, opts=self._opts)
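A minimal synthesis sketch. It assumes the async-iterator interface of the base ChunkedStream (yielding chunks whose .frame is an rtc.AudioFrame); the voice name and the play() sink are placeholders:

from livekit.plugins import azure

async def speak(play) -> None:
    azure_tts = azure.TTS(voice="en-US-JennyNeural")  # credentials from the environment
    stream = azure_tts.synthesize("Hello from Azure!")
    async for chunk in stream:
        await play(chunk.frame)  # hand each audio frame to a playback sink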
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN,
on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN) -> None
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
    style: NotGivenOr[StyleConfig] = NOT_GIVEN,
    on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
) -> None:
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language
    if is_given(prosody):
        self._opts.prosody = prosody
    if is_given(style):
        self._opts.style = style
    if is_given(on_bookmark_reached_event):
        self._opts.on_bookmark_reached_event = on_bookmark_reached_event
    if is_given(on_synthesis_canceled_event):
        self._opts.on_synthesis_canceled_event = on_synthesis_canceled_event
    if is_given(on_synthesis_completed_event):
        self._opts.on_synthesis_completed_event = on_synthesis_completed_event
    if is_given(on_synthesis_started_event):
        self._opts.on_synthesis_started_event = on_synthesis_started_event
    if is_given(on_synthesizing_event):
        self._opts.on_synthesizing_event = on_synthesizing_event
    if is_given(on_viseme_event):
        self._opts.on_viseme_event = on_viseme_event
    if is_given(on_word_boundary_event):
        self._opts.on_word_boundary_event = on_word_boundary_event
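For example, changing the voice and prosody for subsequent synthesize() calls. The ProsodyConfig fields shown (rate, pitch) are assumptions based on SSML prosody attributes, and note that, unlike the constructor, update_options() does not call prosody.validate():

from livekit.plugins.azure import ProsodyConfig  # assumed to be exported, per the signature

azure_tts.update_options(
    voice="en-US-GuyNeural",  # example voice name
    prosody=ProsodyConfig(rate="slow", pitch="high"),  # hypothetical field values
)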
Inherited members