Module livekit.plugins.azure

Classes

class STT (*,
speech_key: NotGivenOr[str] = NOT_GIVEN,
speech_region: NotGivenOr[str] = NOT_GIVEN,
speech_host: NotGivenOr[str] = NOT_GIVEN,
speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
sample_rate: int = 16000,
num_channels: int = 1,
segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN)
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        speech_key: NotGivenOr[str] = NOT_GIVEN,
        speech_region: NotGivenOr[str] = NOT_GIVEN,
        speech_host: NotGivenOr[str] = NOT_GIVEN,
        speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
        sample_rate: int = 16000,
        num_channels: int = 1,
        segmentation_silence_timeout_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_max_time_ms: NotGivenOr[int] = NOT_GIVEN,
        segmentation_strategy: NotGivenOr[str] = NOT_GIVEN,
        # Azure handles multiple languages and can auto-detect the language used. It requires the candidate set to be set.  # noqa: E501
        language: NotGivenOr[str | list[str] | None] = NOT_GIVEN,
        profanity: NotGivenOr[speechsdk.enums.ProfanityOption] = NOT_GIVEN,
    ):
        """
        Create a new instance of Azure STT.

        Either ``speech_host`` or ``speech_key`` and ``speech_region`` or
        ``speech_auth_token`` and ``speech_region`` must be set using arguments.
         Alternatively,  set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY``
        and ``AZURE_SPEECH_REGION`` environmental variables, respectively.
        ``speech_auth_token`` must be set using the arguments as it's an ephemeral token.
        """

        super().__init__(capabilities=stt.STTCapabilities(streaming=True, interim_results=True))
        if not language or not is_given(language):
            language = ["en-US"]

        if isinstance(language, str):
            language = [language]

        if not is_given(speech_host):
            speech_host = os.environ.get("AZURE_SPEECH_HOST")

        if not is_given(speech_key):
            speech_key = os.environ.get("AZURE_SPEECH_KEY")

        if not is_given(speech_region):
            speech_region = os.environ.get("AZURE_SPEECH_REGION")

        if not (
            is_given(speech_host)
            or (is_given(speech_key) and is_given(speech_region))
            or (is_given(speech_auth_token) and is_given(speech_region))
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION or speech_auth_token and AZURE_SPEECH_REGION must be set"  # noqa: E501
            )

        self._config = STTOptions(
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            language=language,
            sample_rate=sample_rate,
            num_channels=num_channels,
            segmentation_silence_timeout_ms=segmentation_silence_timeout_ms,
            segmentation_max_time_ms=segmentation_max_time_ms,
            segmentation_strategy=segmentation_strategy,
            profanity=profanity,
        )
        self._streams = weakref.WeakSet[SpeechStream]()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Azure STT does not support single frame recognition")

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        config = deepcopy(self._config)
        if is_given(language):
            config.language = [language]
        stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN):
        if is_given(language):
            if isinstance(language, str):
                language = [language]
            self._config.language = language
            for stream in self._streams:
                stream.update_options(language=language)

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Azure STT.

Either speech_host or speech_key and speech_region or speech_auth_token and speech_region must be set using arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environmental variables, respectively. speech_auth_token must be set using the arguments as it's an ephemeral token.

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.azure.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    config = deepcopy(self._config)
    if is_given(language):
        config.language = [language]
    stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
    self._streams.add(stream)
    return stream
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN)
Expand source code
def update_options(self, *, language: NotGivenOr[list[str] | str] = NOT_GIVEN):
    if is_given(language):
        if isinstance(language, str):
            language = [language]
        self._config.language = language
        for stream in self._streams:
            stream.update_options(language=language)

Inherited members

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(self, *, stt: STT, opts: STTOptions, conn_options: APIConnectOptions) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._speaking = False

        self._session_stopped_event = asyncio.Event()
        self._session_started_event = asyncio.Event()

        self._loop = asyncio.get_running_loop()
        self._reconnect_event = asyncio.Event()

    def update_options(self, *, language: list[str]):
        self._opts.language = language
        self._reconnect_event.set()

    async def _run(self) -> None:
        while True:
            self._stream = speechsdk.audio.PushAudioInputStream(
                stream_format=speechsdk.audio.AudioStreamFormat(
                    samples_per_second=self._opts.sample_rate,
                    bits_per_sample=16,
                    channels=self._opts.num_channels,
                )
            )
            self._recognizer = _create_speech_recognizer(config=self._opts, stream=self._stream)
            self._recognizer.recognizing.connect(self._on_recognizing)
            self._recognizer.recognized.connect(self._on_recognized)
            self._recognizer.speech_start_detected.connect(self._on_speech_start)
            self._recognizer.speech_end_detected.connect(self._on_speech_end)
            self._recognizer.session_started.connect(self._on_session_started)
            self._recognizer.session_stopped.connect(self._on_session_stopped)
            self._recognizer.start_continuous_recognition()

            try:
                await asyncio.wait_for(
                    self._session_started_event.wait(), self._conn_options.timeout
                )

                async def process_input():
                    async for input in self._input_ch:
                        if isinstance(input, rtc.AudioFrame):
                            self._stream.write(input.data.tobytes())

                process_input_task = asyncio.create_task(process_input())
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [process_input_task, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()
                    if wait_reconnect_task not in done:
                        break
                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(process_input_task, wait_reconnect_task)

                self._stream.close()
                await self._session_stopped_event.wait()
            finally:

                def _cleanup():
                    self._recognizer.stop_continuous_recognition()
                    del self._recognizer

                await asyncio.to_thread(_cleanup)

    def _on_recognized(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return

        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]

        final_data = stt.SpeechData(language=detected_lg, confidence=1.0, text=evt.result.text)

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[final_data]
                ),
            )

    def _on_recognizing(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return

        if not detected_lg and self._opts.language:
            detected_lg = self._opts.language[0]

        interim_data = stt.SpeechData(language=detected_lg, confidence=0.0, text=evt.result.text)

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[interim_data],
                ),
            )

    def _on_speech_start(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if self._speaking:
            return

        self._speaking = True

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH),
            )

    def _on_speech_end(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if not self._speaking:
            return

        self._speaking = False

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH),
            )

    def _on_session_started(self, evt: speechsdk.SpeechRecognitionEventArgs):
        self._session_started_event.set()

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_started_event.set)

    def _on_session_stopped(self, evt: speechsdk.SpeechRecognitionEventArgs):
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_stopped_event.set)

Helper class that provides a standard way to create an ABC using inheritance.

Args: sample_rate : int or None, optional The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for Speech-to-Text. If not provided (None), the input will retain its original sample rate.

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self, *, language: list[str])
Expand source code
def update_options(self, *, language: list[str]):
    self._opts.language = language
    self._reconnect_event.set()
class TTS (*,
sample_rate: int = 24000,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
speech_key: NotGivenOr[str] = NOT_GIVEN,
speech_region: NotGivenOr[str] = NOT_GIVEN,
speech_host: NotGivenOr[str] = NOT_GIVEN,
speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
endpoint_id: NotGivenOr[str] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN,
on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        sample_rate: int = 24000,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        speech_key: NotGivenOr[str] = NOT_GIVEN,
        speech_region: NotGivenOr[str] = NOT_GIVEN,
        speech_host: NotGivenOr[str] = NOT_GIVEN,
        speech_auth_token: NotGivenOr[str] = NOT_GIVEN,
        endpoint_id: NotGivenOr[str] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
        on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of Azure TTS.

        Either ``speech_host`` or ``speech_key`` and ``speech_region`` or
        ``speech_auth_token`` and ``speech_region`` must be set using arguments.
         Alternatively,  set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY``
        and ``AZURE_SPEECH_REGION`` environmental variables, respectively.
        ``speech_auth_token`` must be set using the arguments as it's an ephemeral token.
        """

        if sample_rate not in SUPPORTED_SAMPLE_RATE:
            raise ValueError(
                f"Unsupported sample rate {sample_rate}. Supported sample rates: {list(SUPPORTED_SAMPLE_RATE.keys())}"  # noqa: E501
            )

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        if not is_given(speech_host):
            speech_host = os.environ.get("AZURE_SPEECH_HOST")

        if not is_given(speech_key):
            speech_key = os.environ.get("AZURE_SPEECH_KEY")

        if not is_given(speech_region):
            speech_region = os.environ.get("AZURE_SPEECH_REGION")

        if not (
            is_given(speech_host)
            or (is_given(speech_key) and is_given(speech_region))
            or (is_given(speech_auth_token) and is_given(speech_region))
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION or speech_auth_token and AZURE_SPEECH_REGION must be set"  # noqa: E501
            )

        if is_given(prosody):
            prosody.validate()

        if is_given(style):
            style.validate()

        self._opts = _TTSOptions(
            sample_rate=sample_rate,
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            voice=voice,
            endpoint_id=endpoint_id,
            language=language,
            prosody=prosody,
            style=style,
            on_bookmark_reached_event=on_bookmark_reached_event,
            on_synthesis_canceled_event=on_synthesis_canceled_event,
            on_synthesis_completed_event=on_synthesis_completed_event,
            on_synthesis_started_event=on_synthesis_started_event,
            on_synthesizing_event=on_synthesizing_event,
            on_viseme_event=on_viseme_event,
            on_word_boundary_event=on_word_boundary_event,
        )

    def update_options(
        self,
        *,
        voice: NotGivenOr[str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
        style: NotGivenOr[StyleConfig] = NOT_GIVEN,
        on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
        on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
    ) -> None:
        if is_given(voice):
            self._opts.voice = voice
        if is_given(language):
            self._opts.language = language
        if is_given(prosody):
            self._opts.prosody = prosody
        if is_given(style):
            self._opts.style = style

        if is_given(on_bookmark_reached_event):
            self._opts.on_bookmark_reached_event = on_bookmark_reached_event
        if is_given(on_synthesis_canceled_event):
            self._opts.on_synthesis_canceled_event = on_synthesis_canceled_event
        if is_given(on_synthesis_completed_event):
            self._opts.on_synthesis_completed_event = on_synthesis_completed_event
        if is_given(on_synthesis_started_event):
            self._opts.on_synthesis_started_event = on_synthesis_started_event
        if is_given(on_synthesizing_event):
            self._opts.on_synthesizing_event = on_synthesizing_event
        if is_given(on_viseme_event):
            self._opts.on_viseme_event = on_viseme_event
        if is_given(on_word_boundary_event):
            self._opts.on_word_boundary_event = on_word_boundary_event

    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options, opts=self._opts)

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Azure TTS.

Either speech_host or speech_key and speech_region or speech_auth_token and speech_region must be set using arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY and AZURE_SPEECH_REGION environmental variables, respectively. speech_auth_token must be set using the arguments as it's an ephemeral token.

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Methods

def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.azure.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options, opts=self._opts)
def update_options(self,
*,
voice: NotGivenOr[str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
style: NotGivenOr[StyleConfig] = NOT_GIVEN,
on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: NotGivenOr[str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    prosody: NotGivenOr[ProsodyConfig] = NOT_GIVEN,
    style: NotGivenOr[StyleConfig] = NOT_GIVEN,
    on_bookmark_reached_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_canceled_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_completed_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesis_started_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_synthesizing_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_viseme_event: NotGivenOr[Callable] = NOT_GIVEN,
    on_word_boundary_event: NotGivenOr[Callable] = NOT_GIVEN,
) -> None:
    if is_given(voice):
        self._opts.voice = voice
    if is_given(language):
        self._opts.language = language
    if is_given(prosody):
        self._opts.prosody = prosody
    if is_given(style):
        self._opts.style = style

    if is_given(on_bookmark_reached_event):
        self._opts.on_bookmark_reached_event = on_bookmark_reached_event
    if is_given(on_synthesis_canceled_event):
        self._opts.on_synthesis_canceled_event = on_synthesis_canceled_event
    if is_given(on_synthesis_completed_event):
        self._opts.on_synthesis_completed_event = on_synthesis_completed_event
    if is_given(on_synthesis_started_event):
        self._opts.on_synthesis_started_event = on_synthesis_started_event
    if is_given(on_synthesizing_event):
        self._opts.on_synthesizing_event = on_synthesizing_event
    if is_given(on_viseme_event):
        self._opts.on_viseme_event = on_viseme_event
    if is_given(on_word_boundary_event):
        self._opts.on_word_boundary_event = on_word_boundary_event

Inherited members