Module livekit.plugins.azure
Azure plugin for LiveKit Agents
Support for Azure AI, including Azure Speech. For Azure OpenAI, see the OpenAI plugin.
See https://docs.livekit.io/agents/integrations/azure/ for more information.
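A minimal usage sketch (assuming the AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environment variables are set; the voice name is only an example):

from livekit.plugins import azure

# Credentials fall back to AZURE_SPEECH_KEY / AZURE_SPEECH_REGION when not passed explicitly.
stt = azure.STT()
tts = azure.TTS(voice="en-US-JennyNeural")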
Classes
class STT (*,
speech_key: NotGivenOr[str] = NOT_GIVEN,
speech_region: NotGivenOr[str] = NOT_GIVEN,
speech_host: NotGivenOr[str] = NOT_GIVEN,
speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 16000,
num_channels: int = 1,
segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN,
speech_endpoint: NotGivenOr[str] = NOT_GIVEN,
phrase_list: NotGivenOr[list[str] | None] = NOT_GIVEN,
explicit_punctuation: bool = False)
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        speech_key: NotGivenOr[str] = NOT_GIVEN,
        speech_region: NotGivenOr[str] = NOT_GIVEN,
        speech_host: NotGivenOr[str] = NOT_GIVEN,
        speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 16000,
        num_channels: int = 1,
        segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
        # Azure handles multiple languages and can auto-detect the language used. It requires the candidate set to be set.  # noqa: E501
        language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
        profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN,
        speech_endpoint: NotGivenOr[str] = NOT_GIVEN,
        phrase_list: NotGivenOr[list[str] | None] = NOT_GIVEN,
        explicit_punctuation: bool = False,
    ):
        """
        Create a new instance of Azure STT.

        Either ``speech_host`` or ``speech_key`` and ``speech_region`` or
        ``speech_auth_token`` and ``speech_region`` or ``speech_key`` and ``speech_endpoint``
        must be set using arguments. Alternatively, set the ``AZURE_SPEECH_HOST``,
        ``AZURE_SPEECH_KEY`` and ``AZURE_SPEECH_REGION`` environmental variables, respectively.
        ``speech_auth_token`` must be set using the arguments as it's an ephemeral token.

        Args:
            phrase_list: List of words or phrases to boost recognition accuracy.
                Azure will give higher priority to these phrases during recognition.
            explicit_punctuation: Controls punctuation behavior. If True, enables explicit
                punctuation mode where punctuation marks are added explicitly.
                If False (default), uses Azure's default punctuation behavior.
        """
        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))
        if not language or not is_given(language):
            language = ["en-US"]
        if isinstance(language, str):
            language = [language]
        if not is_given(speech_host):
            speech_host = os.environ.get("AZURE_SPEECH_HOST") or NOT_GIVEN
        if not is_given(speech_key):
            speech_key = os.environ.get("AZURE_SPEECH_KEY") or NOT_GIVEN
        if not is_given(speech_region):
            speech_region = os.environ.get("AZURE_SPEECH_REGION") or NOT_GIVEN
        if not (
            is_given(speech_host)
            or (is_given(speech_key) and is_given(speech_region))
            or (is_given(speech_auth_token) and is_given(speech_region))
            or (is_given(speech_key) and is_given(speech_endpoint))
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION or speech_auth_token and AZURE_SPEECH_REGION or AZURE_SPEECH_KEY and speech_endpoint must be set"  # noqa: E501
            )
        if speech_region and speech_endpoint:
            logger.warning("speech_region and speech_endpoint both are set, using speech_endpoint")
            speech_region = NOT_GIVEN
        self._config = STTOptions(
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            language=language,
            sample_rate=sample_rate,
            num_channels=num_channels,
            segmentation_silence_timeout_ms=segmentation_silence_timeout_ms,
            segmentation_max_time_ms=segmentation_max_time_ms,
            segmentation_strategy=segmentation_strategy,
            profanity=profanity,
            speech_endpoint=speech_endpoint,
            phrase_list=phrase_list,
            explicit_punctuation=explicit_punctuation,
        )
        self._streams = weakref.WeakSet[SpeechStream]()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Azure STT does not support single frame recognition")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = deepcopy(self._config)
        if is_given(language):
            config.language = [language]
        stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN) -> None:
        if is_given(language):
            if isinstance(language, str):
                language = [language]
            language = cast(list[str], language)
            self._config.language = language
            for stream in self._streams:
                stream.update_options(language=language)
Helper class that provides a standard way to create an ABC using inheritance.
Create a new instance of Azure STT.
Either speech_host, or speech_key and speech_region, or speech_auth_token and speech_region, or speech_key and speech_endpoint must be set using arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY, and AZURE_SPEECH_REGION environment variables, respectively. speech_auth_token must be set using the arguments as it's an ephemeral token. A construction sketch follows the argument list below.
Args
phrase_list
- List of words or phrases to boost recognition accuracy. Azure will give higher priority to these phrases during recognition.
explicit_punctuation
- Controls punctuation behavior. If True, enables explicit punctuation mode where punctuation marks are added explicitly. If False (default), uses Azure's default punctuation behavior.
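A hedged construction sketch of the credential combinations above (the region, endpoint, and phrase list values are placeholders):

import os
from livekit.plugins import azure

# Key + region, with a candidate language set for auto-detection
stt = azure.STT(
    speech_key=os.environ["AZURE_SPEECH_KEY"],
    speech_region="eastus",                   # placeholder region
    language=["en-US", "fr-FR"],              # Azure auto-detects among these candidates
    phrase_list=["LiveKit", "Azure Speech"],  # boosted phrases (placeholders)
)

# Alternative: key + custom endpoint instead of a region
# stt = azure.STT(speech_key=os.environ["AZURE_SPEECH_KEY"], speech_endpoint="<your-endpoint>")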
Ancestors
- livekit.agents.stt.stt.STT
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.azure.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = deepcopy(self._config)
    if is_given(language):
        config.language = [language]
    stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
    self._streams.add(stream)
    return stream
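For example (a hedged sketch; the language override applies only to this stream, since the stored config is deep-copied):

# Open a streaming recognizer, overriding the candidate language for this stream only.
speech_stream = stt.stream(language="de-DE")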
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN) ‑> None
Expand source code
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN) -> None:
    if is_given(language):
        if isinstance(language, str):
            language = [language]
        language = cast(list[str], language)
        self._config.language = language
        for stream in self._streams:
            stream.update_options(language=language)
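A usage sketch: the new candidate languages are stored on the STT config and pushed to every active SpeechStream, which reconnects to apply them.

stt.update_options(language=["es-ES", "pt-BR"])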
Inherited members
class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(self, *, stt: STT, opts: STTOptions, conn_options: APIConnectOptions) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._speaking = False
        self._session_stopped_event = asyncio.Event()
        self._session_started_event = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        self._reconnect_event = asyncio.Event()

    def update_options(self, *, language: list[str]) -> None:
        self._opts.language = language
        self._reconnect_event.set()

    async def _run(self) -> None:
        while True:
            self._session_stopped_event.clear()
            self._stream = speechsdk.audio.PushAudioInputStream(
                stream_format=speechsdk.audio.AudioStreamFormat(
                    samples_per_second=self._opts.sample_rate,
                    bits_per_sample=16,
                    channels=self._opts.num_channels,
                )
            )
            self._recognizer = _create_speech_recognizer(config=self._opts, stream=self._stream)
            self._recognizer.recognizing.connect(self._on_recognizing)
            self._recognizer.recognized.connect(self._on_recognized)
            self._recognizer.speech_start_detected.connect(self._on_speech_start)
            self._recognizer.speech_end_detected.connect(self._on_speech_end)
            self._recognizer.session_started.connect(self._on_session_started)
            self._recognizer.session_stopped.connect(self._on_session_stopped)
            self._recognizer.canceled.connect(self._on_canceled)
            self._recognizer.start_continuous_recognition()
            try:
                await asyncio.wait_for(
                    self._session_started_event.wait(), self._conn_options.timeout
                )

                async def process_input() -> None:
                    async for input in self._input_ch:
                        if isinstance(input, rtc.AudioFrame):
                            self._stream.write(input.data.tobytes())

                process_input_task = asyncio.create_task(process_input())
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())
                wait_stopped_task = asyncio.create_task(self._session_stopped_event.wait())
                try:
                    done, _ = await asyncio.wait(
                        [process_input_task, wait_reconnect_task, wait_stopped_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task not in [wait_reconnect_task, wait_stopped_task]:
                            task.result()
                    if wait_stopped_task in done:
                        raise APIConnectionError("SpeechRecognition session stopped")
                    if wait_reconnect_task not in done:
                        break
                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(process_input_task, wait_reconnect_task)
                    self._stream.close()
                    await self._session_stopped_event.wait()
            finally:

                def _cleanup() -> None:
                    self._recognizer.stop_continuous_recognition()
                    del self._recognizer

                await asyncio.to_thread(_cleanup)

    def _on_recognized(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return
        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]
        final_data = stt.SpeechData(language=detected_lg, confidence=1.0, text=evt.result.text)
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[final_data]
                ),
            )

    def _on_recognizing(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return
        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]
        interim_data = stt.SpeechData(language=detected_lg, confidence=0.0, text=evt.result.text)
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[interim_data],
                ),
            )

    def _on_speech_start(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        if self._speaking:
            return
        self._speaking = True
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH),
            )

    def _on_speech_end(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        if not self._speaking:
            return
        self._speaking = False
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH),
            )

    def _on_session_started(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        self._session_started_event.set()
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_started_event.set)

    def _on_session_stopped(self, evt: speechsdk.SpeechRecognitionEventArgs) -> None:
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_stopped_event.set)

    def _on_canceled(self, evt: speechsdk.SpeechRecognitionCanceledEventArgs) -> None:
        if evt.cancellation_details.reason == speechsdk.CancellationReason.Error:
            logger.warning(
                f"Speech recognition canceled: {evt.cancellation_details}",
                extra={
                    "code": evt.cancellation_details.code,
                    "reason": evt.cancellation_details.reason,
                    "error_details": evt.cancellation_details.error_details,
                },
            )
Helper class that provides a standard way to create an ABC using inheritance.
Args
sample_rate
- int or None, optional. The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.
Ancestors
- livekit.agents.stt.stt.RecognizeStream
- abc.ABC
Methods
def update_options(self, *, language: list[str]) ‑> None
Expand source code
def update_options(self, *, language: list[str]) -> None:
    self._opts.language = language
    self._reconnect_event.set()
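A hedged end-to-end sketch, assuming the base RecognizeStream interface (push_frame/end_input and async iteration over SpeechEvents) behaves as in other LiveKit Agents STT plugins:

from livekit import rtc
from livekit.plugins import azure

async def transcribe(stt: azure.STT, frames: list[rtc.AudioFrame]) -> None:
    stream = stt.stream()
    for frame in frames:
        stream.push_frame(frame)  # frames are resampled to the configured rate if needed
    stream.end_input()
    async for event in stream:
        if event.alternatives:
            print(event.type, event.alternatives[0].text)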
class TTS (*,
voice: str = 'en-US-JennyNeural',
language: str | None = None,
sample_rate: int = 24000,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN,
speech_key: str | None = None,
speech_region: str | None = None,
speech_endpoint: str | None = None,
deployment_id: str | None = None,
speech_auth_token: str | None = None,
http_session: aiohttp.ClientSession | None = None)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice: str = "en-US-JennyNeural",
        language: str | None = None,
        sample_rate: int = 24000,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
        speech_key: str | None = None,
        speech_region: str | None = None,
        speech_endpoint: str | None = None,
        deployment_id: str | None = None,
        speech_auth_token: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=sample_rate,
            num_channels=1,
        )
        if sample_rate not in SUPPORTED_OUTPUT_FORMATS:
            raise ValueError(
                f"Unsupported sample rate {sample_rate}. Supported: {list(SUPPORTED_OUTPUT_FORMATS)}"  # noqa: E501
            )
        if not speech_key:
            speech_key = os.environ.get("AZURE_SPEECH_KEY")
        if not speech_region:
            speech_region = os.environ.get("AZURE_SPEECH_REGION")
        if not speech_endpoint:
            speech_endpoint = os.environ.get("AZURE_SPEECH_ENDPOINT")
        has_endpoint = bool(speech_endpoint)
        has_key_and_region = bool(speech_key and speech_region)
        has_token_and_region = bool(speech_auth_token and speech_region)
        if not (has_endpoint or has_key_and_region or has_token_and_region):
            raise ValueError(
                "Authentication requires one of: speech_endpoint (AZURE_SPEECH_ENDPOINT), "
                "speech_key & speech_region (AZURE_SPEECH_KEY & AZURE_SPEECH_REGION), "
                "or speech_auth_token & speech_region."
            )
        if is_given(prosody):
            prosody.validate()
        if is_given(style):
            style.validate()
        self._session = http_session
        self._opts = _TTSOptions(
            sample_rate=sample_rate,
            subscription_key=speech_key,
            region=speech_region,
            speech_endpoint=speech_endpoint,
            voice=voice,
            deployment_id=deployment_id,
            language=language,
            prosody=prosody,
            style=style,
            auth_token=speech_auth_token,
        )

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
    ) -> None:
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language
        if is_given(prosody):
            prosody.validate()
            self._opts.prosody = prosody
        if is_given(style):
            style.validate()
            self._opts.style = style

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> tts.ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
Helper class that provides a standard way to create an ABC using inheritance.
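A hedged construction sketch (the voice name is an example; credentials fall back to AZURE_SPEECH_KEY and AZURE_SPEECH_REGION, or AZURE_SPEECH_ENDPOINT):

from livekit.plugins import azure

tts = azure.TTS(
    voice="en-US-AvaNeural",  # any Azure neural voice name
    language="en-US",
    sample_rate=24000,        # must be one of the supported output sample rates
)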
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> tts.ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
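A usage sketch, assuming the returned ChunkedStream is consumed as an async iterator of synthesized audio chunks (the frame attribute is an assumption carried over from the base TTS interface in LiveKit Agents):

from livekit.plugins import azure

async def speak(tts: azure.TTS, text: str) -> list:
    stream = tts.synthesize(text)
    frames = []
    async for chunk in stream:
        frames.append(chunk.frame)  # chunk.frame is assumed to be an rtc.AudioFrame
    return frames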
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
    style: NotGivenOr[StyleConfig] = NOT_GIVEN,
) -> None:
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language
    if is_given(prosody):
        prosody.validate()
        self._opts.prosody = prosody
    if is_given(style):
        style.validate()
        self._opts.style = style
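For example (the voice value is a placeholder; updated options apply to subsequent synthesize() calls):

tts.update_options(voice="en-US-GuyNeural", language="en-US")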
Inherited members