Module livekit.plugins.azure

Classes

class STT (*,
speech_key: str | None = None,
speech_region: str | None = None,
speech_host: str | None = None,
speech_auth_token: str | None = None,
sample_rate: int = 16000,
num_channels: int = 1,
segmentation_silence_timeout_ms: int | None = None,
segmentation_max_time_ms: int | None = None,
segmentation_strategy: str | None = None,
languages: list[str] = ['en-US'],
language: str | None = None,
profanity: speechsdk.enums.ProfanityOption | None = None)
Expand source code
class STT(stt.STT):
    def __init__(
        self,
        *,
        speech_key: str | None = None,
        speech_region: str | None = None,
        speech_host: str | None = None,
        speech_auth_token: str | None = None,
        sample_rate: int = 16000,
        num_channels: int = 1,
        segmentation_silence_timeout_ms: int | None = None,
        segmentation_max_time_ms: int | None = None,
        segmentation_strategy: str | None = None,
        # Azure handles multiple languages and can auto-detect the language used. It requires the candidate set to be set.
        languages: list[str] = ["en-US"],
        # for compatibility with other STT plugins
        language: str | None = None,
        profanity: speechsdk.enums.ProfanityOption | None = None,
    ):
        """
        Create a new instance of Azure STT.

        Either ``speech_host``, or ``speech_key`` and ``speech_region``, or
        ``speech_auth_token`` and ``speech_region`` must be set via arguments.
        Alternatively, set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY``, and
        ``AZURE_SPEECH_REGION`` environment variables.
        ``speech_auth_token`` must be passed as an argument, since it is an ephemeral token.
        """

        super().__init__(
            capabilities=stt.STTCapabilities(streaming=True, interim_results=True)
        )
        speech_host = speech_host or os.environ.get("AZURE_SPEECH_HOST")
        speech_key = speech_key or os.environ.get("AZURE_SPEECH_KEY")
        speech_region = speech_region or os.environ.get("AZURE_SPEECH_REGION")

        if not (
            speech_host
            or (speech_key and speech_region)
            or (speech_auth_token and speech_region)
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION or speech_auth_token and AZURE_SPEECH_REGION must be set"
            )

        if language:
            languages = [language]

        self._config = STTOptions(
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            languages=languages,
            sample_rate=sample_rate,
            num_channels=num_channels,
            segmentation_silence_timeout_ms=segmentation_silence_timeout_ms,
            segmentation_max_time_ms=segmentation_max_time_ms,
            segmentation_strategy=segmentation_strategy,
            profanity=profanity,
        )
        self._streams = weakref.WeakSet[SpeechStream]()

    async def _recognize_impl(
        self,
        buffer: utils.AudioBuffer,
        *,
        language: str | None,
        conn_options: APIConnectOptions,
    ) -> stt.SpeechEvent:
        raise NotImplementedError("Azure STT does not support single frame recognition")

    def stream(
        self,
        *,
        languages: list[str] | None = None,
        language: str | None = None,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> "SpeechStream":
        config = deepcopy(self._config)
        if language and not languages:
            languages = [language]
        if languages:
            config.languages = languages
        stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    def update_options(
        self, *, language: str | None = None, languages: list[str] | None = None
    ):
        if language and not languages:
            languages = [language]
        if languages is not None:
            self._config.languages = languages
            for stream in self._streams:
                stream.update_options(languages=languages)

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Azure STT.

Either speech_host, or speech_key and speech_region, or speech_auth_token and speech_region must be set via arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY, and AZURE_SPEECH_REGION environment variables. speech_auth_token must be passed as an argument, since it is an ephemeral token.
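
A minimal construction sketch (an assumption, not taken verbatim from the library docs: it presumes the plugin is imported as livekit.plugins.azure and that credentials come either from arguments or from the environment variables above):

import os

from livekit.plugins import azure

# Keys may also be picked up from AZURE_SPEECH_KEY / AZURE_SPEECH_REGION;
# speech_auth_token, being ephemeral, must always be passed explicitly.
stt = azure.STT(
    speech_key=os.environ["AZURE_SPEECH_KEY"],
    speech_region=os.environ["AZURE_SPEECH_REGION"],
    languages=["en-US", "es-ES"],  # candidate set for automatic language detection
)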

Ancestors

Methods

def stream(self,
*,
languages: list[str] | None = None,
language: str | None = None,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> livekit.plugins.azure.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    languages: list[str] | None = None,
    language: str | None = None,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> "SpeechStream":
    config = deepcopy(self._config)
    if language and not languages:
        languages = [language]
    if languages:
        config.languages = languages
    stream = SpeechStream(stt=self, opts=config, conn_options=conn_options)
    self._streams.add(stream)
    return stream
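
A usage sketch for stream(), assuming the push_frame()/end_input() and async-iteration interface inherited from the agents' base stt.SpeechStream (names outside this page are assumptions):

from livekit import rtc
from livekit.agents import stt as agents_stt
from livekit.plugins import azure

async def transcribe(frames: list[rtc.AudioFrame], azure_stt: azure.STT) -> None:
    stream = azure_stt.stream(languages=["en-US"])
    for frame in frames:
        stream.push_frame(frame)  # 16 kHz mono PCM, per the default STT options
    stream.end_input()
    async for event in stream:
        if event.type == agents_stt.SpeechEventType.FINAL_TRANSCRIPT:
            print(event.alternatives[0].text)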
def update_options(self, *, language: str | None = None, languages: list[str] | None = None)
Expand source code
def update_options(
    self, *, language: str | None = None, languages: list[str] | None = None
):
    if language and not languages:
        languages = [language]
    if languages is not None:
        self._config.languages = languages
        for stream in self._streams:
            stream.update_options(languages=languages)
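
A short sketch of switching languages at runtime, given an azure_stt instance as above; as the code shows, the new candidate set is stored on the config and pushed to every live stream, which reconnects its recognizer:

# Replace the candidate set on the STT instance and all active streams.
azure_stt.update_options(languages=["de-DE", "fr-FR"])

# A single language is also accepted, for compatibility with other STT plugins.
azure_stt.update_options(language="de-DE")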

Inherited members

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions)
Expand source code
class SpeechStream(stt.SpeechStream):
    def __init__(
        self, *, stt: STT, opts: STTOptions, conn_options: APIConnectOptions
    ) -> None:
        super().__init__(
            stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate
        )
        self._opts = opts
        self._speaking = False

        self._session_stopped_event = asyncio.Event()
        self._session_started_event = asyncio.Event()

        self._loop = asyncio.get_running_loop()
        self._reconnect_event = asyncio.Event()

    def update_options(
        self, *, language: str | None = None, languages: list[str] | None = None
    ):
        if language and not languages:
            languages = [language]
        if languages:
            self._opts.languages = languages
            self._reconnect_event.set()

    async def _run(self) -> None:
        while True:
            self._stream = speechsdk.audio.PushAudioInputStream(
                stream_format=speechsdk.audio.AudioStreamFormat(
                    samples_per_second=self._opts.sample_rate,
                    bits_per_sample=16,
                    channels=self._opts.num_channels,
                )
            )
            self._recognizer = _create_speech_recognizer(
                config=self._opts, stream=self._stream
            )
            self._recognizer.recognizing.connect(self._on_recognizing)
            self._recognizer.recognized.connect(self._on_recognized)
            self._recognizer.speech_start_detected.connect(self._on_speech_start)
            self._recognizer.speech_end_detected.connect(self._on_speech_end)
            self._recognizer.session_started.connect(self._on_session_started)
            self._recognizer.session_stopped.connect(self._on_session_stopped)
            self._recognizer.start_continuous_recognition()

            try:
                await asyncio.wait_for(
                    self._session_started_event.wait(), self._conn_options.timeout
                )

                async def process_input():
                    async for input in self._input_ch:
                        if isinstance(input, rtc.AudioFrame):
                            self._stream.write(input.data.tobytes())

                process_input_task = asyncio.create_task(process_input())
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        [process_input_task, wait_reconnect_task],
                        return_when=asyncio.FIRST_COMPLETED,
                    )
                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()
                    if wait_reconnect_task not in done:
                        break
                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(
                        process_input_task, wait_reconnect_task
                    )

                self._stream.close()
                await self._session_stopped_event.wait()
            finally:

                def _cleanup():
                    self._recognizer.stop_continuous_recognition()
                    del self._recognizer

                await asyncio.to_thread(_cleanup)

    def _on_recognized(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return

        if not detected_lg and self._opts.languages:
            detected_lg = self._opts.languages[0]

        final_data = stt.SpeechData(
            language=detected_lg, confidence=1.0, text=evt.result.text
        )

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT, alternatives=[final_data]
                ),
            )

    def _on_recognizing(self, evt: speechsdk.SpeechRecognitionEventArgs):
        detected_lg = speechsdk.AutoDetectSourceLanguageResult(evt.result).language
        text = evt.result.text.strip()
        if not text:
            return

        if not detected_lg and self._opts.languages:
            detected_lg = self._opts.languages[0]

        interim_data = stt.SpeechData(
            language=detected_lg, confidence=0.0, text=evt.result.text
        )

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(
                    type=stt.SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[interim_data],
                ),
            )

    def _on_speech_start(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if self._speaking:
            return

        self._speaking = True

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.START_OF_SPEECH),
            )

    def _on_speech_end(self, evt: speechsdk.SpeechRecognitionEventArgs):
        if not self._speaking:
            return

        self._speaking = False

        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(
                self._event_ch.send_nowait,
                stt.SpeechEvent(type=stt.SpeechEventType.END_OF_SPEECH),
            )

    def _on_session_started(self, evt: speechsdk.SpeechRecognitionEventArgs):
        # This callback fires on the Speech SDK's worker thread, so set the
        # asyncio event via the loop rather than directly.
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_started_event.set)

    def _on_session_stopped(self, evt: speechsdk.SpeechRecognitionEventArgs):
        with contextlib.suppress(RuntimeError):
            self._loop.call_soon_threadsafe(self._session_stopped_event.set)

Helper class that provides a standard way to create an ABC using inheritance.

Args:
    sample_rate (int or None, optional): The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for speech-to-text. If not provided (None), the input will retain its original sample rate.

Ancestors

Methods

def update_options(self, *, language: str | None = None, languages: list[str] | None = None)
Expand source code
def update_options(
    self, *, language: str | None = None, languages: list[str] | None = None
):
    if language and not languages:
        languages = [language]
    if languages:
        self._opts.languages = languages
        self._reconnect_event.set()

Inherited members

class TTS (*,
sample_rate: int = 24000,
voice: str | None = None,
language: str | None = None,
prosody: ProsodyConfig | None = None,
speech_key: str | None = None,
speech_region: str | None = None,
speech_host: str | None = None,
speech_auth_token: str | None = None,
endpoint_id: str | None = None,
style: StyleConfig | None = None,
on_bookmark_reached_event: Callable | None = None,
on_synthesis_canceled_event: Callable | None = None,
on_synthesis_completed_event: Callable | None = None,
on_synthesis_started_event: Callable | None = None,
on_synthesizing_event: Callable | None = None,
on_viseme_event: Callable | None = None,
on_word_boundary_event: Callable | None = None)
Expand source code
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        sample_rate: int = 24000,
        voice: str | None = None,
        language: str | None = None,
        prosody: ProsodyConfig | None = None,
        speech_key: str | None = None,
        speech_region: str | None = None,
        speech_host: str | None = None,
        speech_auth_token: str | None = None,
        endpoint_id: str | None = None,
        style: StyleConfig | None = None,
        on_bookmark_reached_event: Callable | None = None,
        on_synthesis_canceled_event: Callable | None = None,
        on_synthesis_completed_event: Callable | None = None,
        on_synthesis_started_event: Callable | None = None,
        on_synthesizing_event: Callable | None = None,
        on_viseme_event: Callable | None = None,
        on_word_boundary_event: Callable | None = None,
    ) -> None:
        """
        Create a new instance of Azure TTS.

        Either ``speech_host``, or ``speech_key`` and ``speech_region``, or
        ``speech_auth_token`` and ``speech_region`` must be set via arguments.
        Alternatively, set the ``AZURE_SPEECH_HOST``, ``AZURE_SPEECH_KEY``, and
        ``AZURE_SPEECH_REGION`` environment variables.
        ``speech_auth_token`` must be passed as an argument, since it is an ephemeral token.
        """

        if sample_rate not in SUPPORTED_SAMPLE_RATE:
            raise ValueError(
                f"Unsupported sample rate {sample_rate}. Supported sample rates: {list(SUPPORTED_SAMPLE_RATE.keys())}"
            )

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=False,
            ),
            sample_rate=sample_rate,
            num_channels=1,
        )

        speech_host = speech_host or os.environ.get("AZURE_SPEECH_HOST")
        speech_key = speech_key or os.environ.get("AZURE_SPEECH_KEY")
        speech_region = speech_region or os.environ.get("AZURE_SPEECH_REGION")

        if not (
            speech_host
            or (speech_key and speech_region)
            or (speech_auth_token and speech_region)
        ):
            raise ValueError(
                "AZURE_SPEECH_HOST or AZURE_SPEECH_KEY and AZURE_SPEECH_REGION or speech_auth_token and AZURE_SPEECH_REGION must be set"
            )

        if prosody:
            prosody.validate()

        if style:
            style.validate()

        self._opts = _TTSOptions(
            sample_rate=sample_rate,
            speech_key=speech_key,
            speech_region=speech_region,
            speech_host=speech_host,
            speech_auth_token=speech_auth_token,
            voice=voice,
            endpoint_id=endpoint_id,
            language=language,
            prosody=prosody,
            style=style,
            on_bookmark_reached_event=on_bookmark_reached_event,
            on_synthesis_canceled_event=on_synthesis_canceled_event,
            on_synthesis_completed_event=on_synthesis_completed_event,
            on_synthesis_started_event=on_synthesis_started_event,
            on_synthesizing_event=on_synthesizing_event,
            on_viseme_event=on_viseme_event,
            on_word_boundary_event=on_word_boundary_event,
        )

    def update_options(
        self,
        *,
        voice: str | None = None,
        language: str | None = None,
        prosody: ProsodyConfig | None = None,
        style: StyleConfig | None = None,
        on_bookmark_reached_event: Callable | None = None,
        on_synthesis_canceled_event: Callable | None = None,
        on_synthesis_completed_event: Callable | None = None,
        on_synthesis_started_event: Callable | None = None,
        on_synthesizing_event: Callable | None = None,
        on_viseme_event: Callable | None = None,
        on_word_boundary_event: Callable | None = None,
    ) -> None:
        self._opts.voice = voice or self._opts.voice
        self._opts.language = language or self._opts.language
        self._opts.prosody = prosody or self._opts.prosody
        self._opts.style = style or self._opts.style

        self._opts.on_bookmark_reached_event = (
            on_bookmark_reached_event or self._opts.on_bookmark_reached_event
        )
        self._opts.on_synthesis_canceled_event = (
            on_synthesis_canceled_event or self._opts.on_synthesis_canceled_event
        )
        self._opts.on_synthesis_completed_event = (
            on_synthesis_completed_event or self._opts.on_synthesis_completed_event
        )
        self._opts.on_synthesis_started_event = (
            on_synthesis_started_event or self._opts.on_synthesis_started_event
        )
        self._opts.on_synthesizing_event = (
            on_synthesizing_event or self._opts.on_synthesizing_event
        )
        self._opts.on_viseme_event = on_viseme_event or self._opts.on_viseme_event
        self._opts.on_word_boundary_event = (
            on_word_boundary_event or self._opts.on_word_boundary_event
        )

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        return ChunkedStream(
            tts=self, input_text=text, conn_options=conn_options, opts=self._opts
        )

Helper class that provides a standard way to create an ABC using inheritance.

Create a new instance of Azure TTS.

Either speech_host, or speech_key and speech_region, or speech_auth_token and speech_region must be set via arguments. Alternatively, set the AZURE_SPEECH_HOST, AZURE_SPEECH_KEY, and AZURE_SPEECH_REGION environment variables. speech_auth_token must be passed as an argument, since it is an ephemeral token.
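
A minimal construction sketch (assumptions: the livekit.plugins.azure import path and the illustrative voice name; credentials resolve from the environment as described above):

from livekit.plugins import azure

tts = azure.TTS(
    voice="en-US-JennyNeural",  # illustrative Azure neural voice name
    language="en-US",
    sample_rate=24000,          # must be one of SUPPORTED_SAMPLE_RATE
)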

Ancestors

Methods

def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> livekit.plugins.azure.tts.ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    return ChunkedStream(
        tts=self, input_text=text, conn_options=conn_options, opts=self._opts
    )
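
A usage sketch for synthesize(), assuming ChunkedStream follows the agents' base tts.ChunkedStream interface of async-iterating SynthesizedAudio events that carry an rtc.AudioFrame:

from livekit import rtc
from livekit.plugins import azure

async def speak(azure_tts: azure.TTS, text: str) -> list[rtc.AudioFrame]:
    frames: list[rtc.AudioFrame] = []
    async for audio in azure_tts.synthesize(text):
        frames.append(audio.frame)  # 24 kHz mono PCM at the default sample rate
    return frames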
def update_options(self,
*,
voice: str | None = None,
language: str | None = None,
prosody: ProsodyConfig | None = None,
style: StyleConfig | None = None,
on_bookmark_reached_event: Callable | None = None,
on_synthesis_canceled_event: Callable | None = None,
on_synthesis_completed_event: Callable | None = None,
on_synthesis_started_event: Callable | None = None,
on_synthesizing_event: Callable | None = None,
on_viseme_event: Callable | None = None,
on_word_boundary_event: Callable | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: str | None = None,
    language: str | None = None,
    prosody: ProsodyConfig | None = None,
    style: StyleConfig | None = None,
    on_bookmark_reached_event: Callable | None = None,
    on_synthesis_canceled_event: Callable | None = None,
    on_synthesis_completed_event: Callable | None = None,
    on_synthesis_started_event: Callable | None = None,
    on_synthesizing_event: Callable | None = None,
    on_viseme_event: Callable | None = None,
    on_word_boundary_event: Callable | None = None,
) -> None:
    self._opts.voice = voice or self._opts.voice
    self._opts.language = language or self._opts.language
    self._opts.prosody = prosody or self._opts.prosody
    self._opts.style = style or self._opts.style

    self._opts.on_bookmark_reached_event = (
        on_bookmark_reached_event or self._opts.on_bookmark_reached_event
    )
    self._opts.on_synthesis_canceled_event = (
        on_synthesis_canceled_event or self._opts.on_synthesis_canceled_event
    )
    self._opts.on_synthesis_completed_event = (
        on_synthesis_completed_event or self._opts.on_synthesis_completed_event
    )
    self._opts.on_synthesis_started_event = (
        on_synthesis_started_event or self._opts.on_synthesis_started_event
    )
    self._opts.on_synthesizing_event = (
        on_synthesizing_event or self._opts.on_synthesizing_event
    )
    self._opts.on_viseme_event = on_viseme_event or self._opts.on_viseme_event
    self._opts.on_word_boundary_event = (
        on_word_boundary_event or self._opts.on_word_boundary_event
    )
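
As the code above shows, each keyword falls back to the previously stored value when omitted, so options can be updated piecemeal (voice name illustrative):

# Only the voice changes; language, prosody, style, and callbacks are kept.
azure_tts.update_options(voice="en-US-GuyNeural")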

Inherited members