Module livekit.agents.tts

Sub-modules

livekit.agents.tts.fallback_adapter
livekit.agents.tts.stream_adapter
livekit.agents.tts.tts

Classes

class AvailabilityChangedEvent (tts: TTS,
available: bool)
Expand source code
@dataclass
class AvailabilityChangedEvent:
    """Payload of the "tts_availability_changed" event emitted on the fallback adapter."""

    # the wrapped TTS instance whose availability just changed
    tts: TTS
    # True after a successful recovery attempt, False once the instance failed
    available: bool

AvailabilityChangedEvent(tts: 'TTS', available: 'bool')

Instance variables

var available : bool
var tts : TTS
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: Optional[APIConnectOptions] = None)
Expand source code
class ChunkedStream(ABC):
    """Async iterator over the audio produced by one non-streamed synthesize call.

    Some providers support chunked HTTP responses, so audio may become
    available progressively. Subclasses implement ``_run``; the events sent to
    ``self._event_ch`` are what iteration yields.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> None:
        self._tts = tts
        self._input_text = input_text
        self._conn_options = conn_options or DEFAULT_API_CONNECT_OPTIONS

        # the channel is tee'd: one iterator for the consumer, one for metrics
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )
        self._synthesize_task = asyncio.create_task(
            self._main_task(), name="TTS._synthesize_task"
        )
        # whatever the outcome of the synthesis task, close the channel so
        # that both iterators terminate
        self._synthesize_task.add_done_callback(lambda _: self._event_ch.close())

    @property
    def input_text(self) -> str:
        """The text passed to this stream at construction."""
        return self._input_text

    @property
    def done(self) -> bool:
        """Whether the synthesis task has finished."""
        return self._synthesize_task.done()

    @property
    def exception(self) -> BaseException | None:
        """The exception set on the synthesis task, if any.

        This is a direct call to ``asyncio.Task.exception()``, so it raises
        when the task is not done yet or was cancelled.
        """
        return self._synthesize_task.exception()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Drain ``event_aiter`` and emit a single ``TTSMetrics`` once it ends."""
        started_at = time.perf_counter()
        first_frame_delay = -1.0  # remains -1.0 when no event is ever received
        total_audio = 0.0
        last_request_id = ""

        async for event in event_aiter:
            last_request_id = event.request_id
            if first_frame_delay == -1.0:
                first_frame_delay = time.perf_counter() - started_at

            total_audio += event.frame.duration

        elapsed = time.perf_counter() - started_at
        self._tts.emit(
            "metrics_collected",
            TTSMetrics(
                timestamp=time.time(),
                request_id=last_request_id,
                ttfb=first_frame_delay,
                duration=elapsed,
                characters_count=len(self._input_text),
                audio_duration=total_audio,
                cancelled=self._synthesize_task.cancelled(),
                label=self._tts._label,
                streamed=False,
                error=None,
            ),
        )

    async def collect(self) -> rtc.AudioFrame:
        """Iterate the stream to its end and return all frames merged into one."""
        collected = [event.frame async for event in self]
        return rtc.combine_audio_frames(collected)

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        """Call ``_run``, retrying on ``APIError`` as allowed by the connect options.

        With ``max_retry == 0`` the original error propagates; otherwise the
        last failed attempt is reported as an ``APIConnectionError``.
        """
        for attempt in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as err:
                retry_interval = self._conn_options._interval_for_retry(attempt)

                if self._conn_options.max_retry == 0:
                    raise

                if attempt == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from err

                logger.warning(
                    f"failed to synthesize speech, retrying in {retry_interval}s",
                    exc_info=err,
                    extra={
                        "tts": self._tts._label,
                        "attempt": attempt + 1,
                        "streamed": False,
                    },
                )

            # only reached after a failed attempt that is going to be retried
            await asyncio.sleep(retry_interval)

    async def aclose(self) -> None:
        """Cancel the synthesis task, close the event channel and await the metrics task."""
        await aio.gracefully_cancel(self._synthesize_task)
        self._event_ch.close()
        await self._metrics_task

    async def __anext__(self) -> SynthesizedAudio:
        try:
            return await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # the channel is exhausted: surface a synthesis failure, if there
            # was one, instead of ending the iteration silently
            task = self._synthesize_task
            if not task.cancelled():
                failure = task.exception()
                if failure:
                    raise failure from None

            raise StopAsyncIteration

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> ChunkedStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.

Ancestors

  • abc.ABC

Subclasses

  • FallbackChunkedStream
  • ChunkedStream
  • livekit.plugins.azure.tts.ChunkedStream
  • livekit.plugins.cartesia.tts.ChunkedStream
  • livekit.plugins.deepgram.tts.ChunkedStream
  • livekit.plugins.elevenlabs.tts.ChunkedStream
  • livekit.plugins.google.tts.ChunkedStream
  • ChunkedStream
  • livekit.plugins.neuphonic.tts.ChunkedStream
  • livekit.plugins.openai.tts.ChunkedStream
  • livekit.plugins.playai.tts.ChunkedStream
  • livekit.plugins.resemble.tts.ChunkedStream
  • livekit.plugins.rime.tts.ChunkedStream

Instance variables

prop done : bool
Expand source code
@property
def done(self) -> bool:
    """Whether the synthesis task has finished."""
    task = self._synthesize_task
    return task.done()
prop exception : BaseException | None
Expand source code
@property
def exception(self) -> BaseException | None:
    """The exception set on the synthesis task, or ``None`` when there is none.

    The call is forwarded as-is to the task's ``exception()`` method; for an
    asyncio task that method raises when the task is not done or was cancelled.
    """
    task = self._synthesize_task
    return task.exception()
prop input_text : str
Expand source code
@property
def input_text(self) -> str:
    """The text stored on this stream as ``_input_text``."""
    text = self._input_text
    return text

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel the synthesis task, close the event channel, then await the metrics task."""
    synthesize_task = self._synthesize_task
    await aio.gracefully_cancel(synthesize_task)

    event_channel = self._event_ch
    event_channel.close()

    await self._metrics_task

Close is automatically called if the stream is completely collected

async def collect(self) ‑> AudioFrame
Expand source code
async def collect(self) -> rtc.AudioFrame:
    """Iterate the stream to its end and return all frames merged into one."""
    collected = [event.frame async for event in self]
    return rtc.combine_audio_frames(collected)

Utility method to collect every frame in a single call

class FallbackAdapter (tts: list[TTS],
*,
attempt_timeout: float = 10.0,
max_retry_per_tts: int = 1,
retry_interval: float = 5,
no_fallback_after_audio_duration: float | None = 3.0,
sample_rate: int | None = None)
Expand source code
class FallbackAdapter(
    TTS[Literal["tts_availability_changed"]],
):
    """
    TTS that wraps several TTS instances and falls back from one to the next when synthesis fails.
    """

    def __init__(
        self,
        tts: list[TTS],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_tts: int = 1,  # only retry once by default
        retry_interval: float = 5,
        no_fallback_after_audio_duration: float | None = 3.0,
        sample_rate: int | None = None,
    ) -> None:
        """
        Create a FallbackAdapter on top of the given TTS instances.

        Args:
            tts (list[TTS]): The TTS instances to use, tried in list order.
            attempt_timeout (float, optional): Timeout for each synthesis attempt in seconds. Defaults to 10.0.
            max_retry_per_tts (int, optional): Maximum number of retries per TTS instance. Defaults to 1.
            retry_interval (float, optional): Retry interval set on the connect options handed to each wrapped TTS. Defaults to 5.
            no_fallback_after_audio_duration (float | None, optional): Once this much audio has been synthesized, a failure no longer triggers a fallback. Defaults to 3.0.
                This avoids saying the same text twice when an instance fails
                part-way through. None disables the check.
            sample_rate (int | None, optional): Output sample rate. If None, the highest sample rate among the TTS instances is used.

        Raises:
            ValueError: If no TTS instance is provided.
            ValueError: If the TTS instances do not all have the same number of channels.
        """

        if len(tts) == 0:
            raise ValueError("at least one TTS instance must be provided.")

        channel_counts = {t.num_channels for t in tts}
        if len(channel_counts) != 1:
            raise ValueError("all TTS must have the same number of channels")

        if sample_rate is None:
            sample_rate = max(t.sample_rate for t in tts)

        super().__init__(
            capabilities=TTSCapabilities(
                # streaming is only advertised when every instance supports it
                streaming=all(t.capabilities.streaming for t in tts),
            ),
            sample_rate=sample_rate,
            num_channels=tts[0].num_channels,
        )

        self._tts_instances = tts
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_tts = max_retry_per_tts
        self._retry_interval = retry_interval
        self._no_fallback_after_audio_duration = no_fallback_after_audio_duration

        # one status entry per instance, in the same order as `tts`
        self._status: list[_TTSStatus] = []
        for t in tts:
            needs_resampling = sample_rate != t.sample_rate
            if needs_resampling:
                logger.info(
                    f"resampling {t.label} from {t.sample_rate}Hz to {sample_rate}Hz"
                )
                resampler = rtc.AudioResampler(
                    input_rate=t.sample_rate, output_rate=sample_rate
                )
            else:
                resampler = None

            status = _TTSStatus(
                available=True, recovering_task=None, resampler=resampler
            )
            self._status.append(status)

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "FallbackChunkedStream":
        """Return a chunked stream that synthesizes `text` through the fallback chain."""
        effective_options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
        return FallbackChunkedStream(
            tts=self, input_text=text, conn_options=effective_options
        )

    def stream(
        self,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "FallbackSynthesizeStream":
        """Return a streaming synthesis session backed by the fallback chain."""
        effective_options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
        return FallbackSynthesizeStream(tts=self, conn_options=effective_options)

    def prewarm(self) -> None:
        """Prewarm the first TTS instance only."""
        if not self._tts_instances:
            return

        self._tts_instances[0].prewarm()

    async def aclose(self) -> None:
        """Cancel every recovery task that has been started."""
        for status in self._status:
            task = status.recovering_task
            if task is None:
                continue

            await aio.gracefully_cancel(task)

Manages multiple TTS instances, providing a fallback mechanism to ensure continuous TTS service.

Initialize a FallbackAdapter that manages multiple TTS instances.

Args

tts : list[TTS]
A list of TTS instances to use for fallback.
attempt_timeout : float, optional
Timeout for each synthesis attempt in seconds. Defaults to 10.0.
max_retry_per_tts : int, optional
Maximum number of retries per TTS instance. Defaults to 1.
no_fallback_after_audio_duration : float | None, optional
Disables fallback after this duration of audio is synthesized. Defaults to 3.0. This is used to prevent unnaturally resaying the same text when the first TTS instance fails.
sample_rate : int | None, optional
Desired sample rate for the synthesized audio. If None, uses the maximum sample rate among the TTS instances.

Raises

ValueError
If less than one TTS instance is provided.
ValueError
If TTS instances have different numbers of channels.

Ancestors

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel every recovery task that has been started."""
    for status in self._status:
        task = status.recovering_task
        if task is None:
            continue

        await aio.gracefully_cancel(task)
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> FallbackSynthesizeStream
Expand source code
def stream(
    self,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "FallbackSynthesizeStream":
    """Return a streaming synthesis session backed by the fallback chain.

    Falls back to ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` when no connect
    options are given.
    """
    effective_options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    return FallbackSynthesizeStream(tts=self, conn_options=effective_options)
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> FallbackChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "FallbackChunkedStream":
    """Return a chunked stream that synthesizes `text` through the fallback chain.

    Falls back to ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` when no connect
    options are given.
    """
    effective_options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    return FallbackChunkedStream(
        tts=self, input_text=text, conn_options=effective_options
    )

Inherited members

class FallbackChunkedStream (*,
tts: FallbackAdapter,
input_text: str,
conn_options: Optional[APIConnectOptions])
Expand source code
class FallbackChunkedStream(ChunkedStream):
    """Chunked stream returned by ``FallbackAdapter.synthesize``.

    ``_run`` tries the adapter's TTS instances in order and forwards the audio
    of the first one that completes, resampling it when the instance has a
    resampler attached to its status.
    """

    def __init__(
        self,
        *,
        tts: FallbackAdapter,
        input_text: str,
        conn_options: Optional[APIConnectOptions],
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._fallback_adapter = tts

    async def _try_synthesize(
        self, *, tts: TTS, recovering: bool = False
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        """Synthesize the input text with a single TTS and yield its audio.

        Args:
            tts: The TTS instance to try.
            recovering: Only changes the wording of the log messages.

        Raises:
            asyncio.TimeoutError: If the first frame does not arrive within the
                adapter's attempt timeout.
            APIConnectionError: If the stream ends without producing any audio.
            Exception: Any other error of the underlying stream, re-raised
                after being logged.
        """
        try:
            audio_duration = 0.0
            # the per-attempt settings of the adapter override the stream's own
            async with tts.synthesize(
                self._input_text,
                conn_options=dataclasses.replace(
                    self._conn_options,
                    max_retry=self._fallback_adapter._max_retry_per_tts,
                    timeout=self._fallback_adapter._attempt_timeout,
                    retry_interval=self._fallback_adapter._retry_interval,
                ),
            ) as stream:
                while True:
                    try:
                        # the timeout only bounds the wait for the first frame;
                        # once some audio was received the wait is unbounded
                        audio = await asyncio.wait_for(
                            stream.__anext__(),
                            self._fallback_adapter._attempt_timeout
                            if audio_duration == 0.0
                            else None,
                        )

                        audio_duration += audio.frame.duration
                        yield audio
                    except StopAsyncIteration:
                        break

            # a stream that ends without any audio counts as a failure
            if audio_duration == 0.0:
                raise APIConnectionError("no audio received")

        # every handler below logs and re-raises; the caller decides what to do
        except asyncio.TimeoutError:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery timed out", extra={"streamed": False}
                )
                raise

            logger.warning(
                f"{tts.label} timed out, switching to next TTS",
                extra={"streamed": False},
            )

            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed",
                    exc_info=e,
                    extra={"streamed": False},
                )
                raise

            logger.warning(
                f"{tts.label} failed, switching to next TTS",
                exc_info=e,
                extra={"streamed": False},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{tts.label} recovery unexpected error", extra={"streamed": False}
                )
                raise

            logger.exception(
                f"{tts.label} unexpected error, switching to next TTS",
                extra={"streamed": False},
            )
            raise

    def _try_recovery(self, tts: TTS) -> None:
        """Start a background task that checks whether `tts` works again.

        Nothing is started while a previous recovery task of this instance is
        still running.
        """
        assert isinstance(self._tts, FallbackAdapter)

        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    # re-synthesize the same input and discard the audio
                    async for _ in self._try_synthesize(tts=tts, recovering=True):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    # already logged by _try_synthesize; the instance stays unavailable
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))

    async def _run(self) -> None:
        """Try the TTS instances in order until one synthesizes the whole input.

        Raises:
            APIConnectionError: If none of the tried instances succeeded.
        """
        assert isinstance(self._tts, FallbackAdapter)

        start_time = time.time()

        # when every instance is marked unavailable, all of them are tried anyway
        all_failed = all(not tts_status.available for tts_status in self._tts._status)
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        for i, tts in enumerate(self._tts._tts_instances):
            tts_status = self._tts._status[i]
            if tts_status.available or all_failed:
                audio_duration = 0.0
                try:
                    request_id: str | None = None
                    resampler = tts_status.resampler
                    async for synthesized_audio in self._try_synthesize(
                        tts=tts, recovering=False
                    ):
                        audio_duration += synthesized_audio.frame.duration
                        request_id = synthesized_audio.request_id

                        if resampler is not None:
                            # forward the resampled frames instead of the original one
                            for rf in resampler.push(synthesized_audio.frame):
                                self._event_ch.send_nowait(
                                    SynthesizedAudio(
                                        frame=rf,
                                        request_id=synthesized_audio.request_id,
                                    )
                                )

                            continue

                        self._event_ch.send_nowait(synthesized_audio)

                    # request_id is only set once at least one frame was received
                    if resampler is not None and request_id is not None:
                        for rf in resampler.flush():
                            self._event_ch.send_nowait(
                                SynthesizedAudio(
                                    frame=rf,
                                    request_id=request_id,
                                )
                            )

                    return
                except Exception:  # exceptions already logged inside _try_synthesize
                    if tts_status.available:
                        tts_status.available = False
                        self._tts.emit(
                            "tts_availability_changed",
                            AvailabilityChangedEvent(tts=tts, available=False),
                        )

                    if self._tts._no_fallback_after_audio_duration is not None:
                        # enough audio was already forwarded: stop here rather
                        # than synthesize the same text again with another TTS
                        if (
                            audio_duration
                            >= self._tts._no_fallback_after_audio_duration
                        ):
                            logger.warning(
                                f"{tts.label} already synthesized {audio_duration}s of audio, ignoring fallback"
                            )
                            return

            # reached when this instance was skipped or has just failed
            self._try_recovery(tts)

        raise APIConnectionError(
            "all TTSs failed (%s) after %s seconds"
            % (
                [tts.label for tts in self._tts._tts_instances],
                time.time() - start_time,
            )
        )

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.

Ancestors

Inherited members

class FallbackSynthesizeStream (*,
tts: FallbackAdapter,
conn_options: Optional[APIConnectOptions] = None)
Expand source code
class FallbackSynthesizeStream(SynthesizeStream):
    """Streaming session returned by ``FallbackAdapter.stream``.

    The pushed text is recorded while it is forwarded to the TTS currently
    being tried, so that segments which were not fully synthesized can be
    replayed on the next TTS instance after a failure.
    """

    def __init__(
        self,
        *,
        tts: FallbackAdapter,
        conn_options: Optional[APIConnectOptions] = None,
    ):
        super().__init__(
            tts=tts, conn_options=conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
        )
        self._fallback_adapter = tts

        # every non-empty segment that was flushed so far
        self._total_segments: list[list[str]] = []
        # flushed segments that have not been reported as synthesized yet
        self._pending_segments_chunks: list[list[str]] = []
        # text chunks received since the last flush
        self._current_segment_text: list[str] = []

    async def _try_synthesize(
        self,
        *,
        tts: TTS,
        input_ch: aio.ChanReceiver[str | SynthesizeStream._FlushSentinel],
        conn_options: APIConnectOptions,
        recovering: bool = False,
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        """Feed `input_ch` into a stream of a single TTS and yield its audio.

        The adapter's attempt timeout is only applied once a non-empty segment
        has been flushed, and it is re-armed after every final frame.

        Args:
            tts: The TTS instance to try.
            input_ch: Text chunks and flush sentinels to send to the stream.
            conn_options: Connect options used to open the stream.
            recovering: Only changes the wording of the log messages.

        Raises:
            asyncio.TimeoutError: If a frame does not arrive within the attempt
                timeout after a segment was flushed.
            APIConnectionError: If the stream ends without audio although a
                segment was flushed.
            Exception: Any other error of the underlying stream, re-raised
                after being logged.
        """
        stream = tts.stream(conn_options=conn_options)
        # resolved with True when a non-empty segment was flushed, with False
        # when the input ended first
        input_sent_fut = asyncio.Future()  # type: ignore

        @utils.log_exceptions(logger=logger)
        async def _input_task() -> None:
            try:
                segment = ""
                async for data in input_ch:
                    if isinstance(data, str):
                        segment += data
                        stream.push_text(data)
                    elif isinstance(data, self._FlushSentinel):
                        # start the timeout on flush
                        if segment:
                            segment = ""
                            with contextlib.suppress(asyncio.InvalidStateError):
                                input_sent_fut.set_result(True)

                        stream.flush()
            finally:
                with contextlib.suppress(RuntimeError):
                    stream.end_input()

                # no-op when the future was already resolved by a flush
                with contextlib.suppress(asyncio.InvalidStateError):
                    input_sent_fut.set_result(False)

        input_task = asyncio.create_task(_input_task())
        next_audio_task: asyncio.Future[SynthesizedAudio] | None = None

        try:
            audio_duration = 0.0
            async with stream:
                while True:
                    # a pending task is reused until it completes
                    if next_audio_task is None or next_audio_task.done():
                        next_audio_task = asyncio.ensure_future(stream.__anext__())

                    try:
                        if not input_sent_fut.done():
                            # nothing flushed yet: wait without a timeout for
                            # either a flush or some audio
                            await asyncio.wait(
                                [input_sent_fut, next_audio_task],
                                return_when=asyncio.FIRST_COMPLETED,
                            )

                            if not next_audio_task.done():
                                continue

                            audio = next_audio_task.result()
                        else:
                            # a segment was flushed: the next frame has to
                            # arrive within the attempt timeout
                            audio = await asyncio.wait_for(
                                next_audio_task, self._fallback_adapter._attempt_timeout
                            )

                        audio_duration += audio.frame.duration
                        if audio.is_final:
                            # segment finished: wait for the next flush before
                            # applying the timeout again
                            input_sent_fut = asyncio.Future()
                            audio_duration = 0.0

                        yield audio
                    except StopAsyncIteration:
                        break

            # text was flushed but the stream ended without any audio for it
            if (
                audio_duration == 0.0
                and input_sent_fut.done()
                and input_sent_fut.result()
            ):
                raise APIConnectionError("no audio received")

        # every handler below logs and re-raises; the caller decides what to do
        except asyncio.TimeoutError:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery timed out", extra={"streamed": True}
                )
                raise

            logger.warning(
                f"{tts.label} timed out, switching to next TTS",
                extra={"streamed": True},
            )
            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed", exc_info=e, extra={"streamed": True}
                )
                raise

            logger.warning(
                f"{tts.label} failed, switching to next TTS",
                exc_info=e,
                extra={"streamed": True},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{tts.label} recovery unexpected error",
                    extra={"streamed": True},
                )
                raise

            logger.exception(
                f"{tts.label} unexpected error, switching to next TTS",
                extra={"streamed": True},
            )
            raise
        finally:
            # stop both helper tasks however the attempt ended
            if next_audio_task is not None:
                await utils.aio.gracefully_cancel(next_audio_task)

            await utils.aio.gracefully_cancel(input_task)

    async def _run(self) -> None:
        """Try the TTS instances in order until one synthesizes the whole input.

        Raises:
            APIConnectionError: If none of the tried instances succeeded.
        """
        start_time = time.time()

        # when every instance is marked unavailable, all of them are tried anyway
        all_failed = all(
            not tts_status.available for tts_status in self._fallback_adapter._status
        )
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        # input channel of the attempt in progress; replaced for every attempt
        new_input_ch: aio.Chan[str | SynthesizeStream._FlushSentinel] | None = None

        async def _forward_input_task():
            nonlocal new_input_ch

            async for data in self._input_ch:
                if new_input_ch:
                    new_input_ch.send_nowait(data)

                # record the text so it can be replayed on another TTS
                if isinstance(data, str) and data:
                    self._current_segment_text.append(data)

                elif (
                    isinstance(data, self._FlushSentinel) and self._current_segment_text
                ):
                    self._total_segments.append(self._current_segment_text)
                    self._pending_segments_chunks.append(self._current_segment_text)
                    self._current_segment_text = []

            if new_input_ch:
                new_input_ch.close()

        input_task = asyncio.create_task(_forward_input_task())

        try:
            for i, tts in enumerate(self._fallback_adapter._tts_instances):
                tts_status = self._fallback_adapter._status[i]
                if tts_status.available or all_failed:
                    audio_duration = 0.0
                    try:
                        new_input_ch = aio.Chan[
                            Union[str, SynthesizeStream._FlushSentinel]
                        ]()

                        # replay the segments that are still pending, each
                        # followed by a flush
                        for text in self._pending_segments_chunks:
                            for chunk in text:
                                new_input_ch.send_nowait(chunk)

                            new_input_ch.send_nowait(self._FlushSentinel())

                        # then the text received since the last flush
                        for chunk in self._current_segment_text:
                            new_input_ch.send_nowait(chunk)

                        # the forwarding task is over, so it will not close
                        # this channel itself
                        if input_task.done():
                            new_input_ch.close()

                        last_segment_id: str | None = None
                        resampler = tts_status.resampler

                        async for synthesized_audio in self._try_synthesize(
                            tts=tts,
                            input_ch=new_input_ch,
                            conn_options=dataclasses.replace(
                                self._conn_options,
                                max_retry=self._fallback_adapter._max_retry_per_tts,
                                timeout=self._fallback_adapter._attempt_timeout,
                                retry_interval=self._fallback_adapter._retry_interval,
                            ),
                            recovering=False,
                        ):
                            audio_duration += synthesized_audio.frame.duration

                            if resampler is not None:
                                # forward the resampled frames instead of the original one
                                for resampled_frame in resampler.push(
                                    synthesized_audio.frame
                                ):
                                    self._event_ch.send_nowait(
                                        dataclasses.replace(
                                            synthesized_audio, frame=resampled_frame
                                        )
                                    )

                                if synthesized_audio.is_final:
                                    for resampled_frame in resampler.flush():
                                        self._event_ch.send_nowait(
                                            dataclasses.replace(
                                                synthesized_audio, frame=resampled_frame
                                            )
                                        )
                            else:
                                self._event_ch.send_nowait(synthesized_audio)

                            # a final frame or a change of segment id means the
                            # oldest pending segment is done
                            if (
                                synthesized_audio.is_final
                                or (
                                    last_segment_id is not None
                                    and synthesized_audio.segment_id != last_segment_id
                                )
                            ) and self._pending_segments_chunks:
                                audio_duration = 0.0
                                self._pending_segments_chunks.pop(0)

                            last_segment_id = synthesized_audio.segment_id

                        return
                    except Exception:
                        # exceptions already logged inside _try_synthesize
                        if tts_status.available:
                            tts_status.available = False
                            self._tts.emit(
                                "tts_availability_changed",
                                AvailabilityChangedEvent(tts=tts, available=False),
                            )

                        if (
                            self._fallback_adapter._no_fallback_after_audio_duration
                            is not None
                        ):
                            # enough audio of the current segment was already
                            # forwarded: stop here rather than synthesize it again
                            if (
                                audio_duration
                                >= self._fallback_adapter._no_fallback_after_audio_duration
                                and self._pending_segments_chunks
                            ):
                                logger.warning(
                                    f"{tts.label} already synthesized {audio_duration}s of audio, ignoring the current segment for the tts fallback"
                                )
                                return

                # reached when this instance was skipped or has just failed
                self._try_recovery(tts)

            raise APIConnectionError(
                "all TTSs failed (%s) after %s seconds"
                % (
                    [tts.label for tts in self._fallback_adapter._tts_instances],
                    time.time() - start_time,
                )
            )
        finally:
            await utils.aio.gracefully_cancel(input_task)

    def _try_recovery(self, tts: TTS) -> None:
        """Start a background task that checks whether `tts` works again.

        The check replays the last flushed segment, if any, followed by the
        text received since the last flush. Nothing is started while a previous
        recovery task of this instance is still running.
        """
        assert isinstance(self._tts, FallbackAdapter)

        # copy: the live list keeps growing while the recovery task runs
        retry_segments = [self._current_segment_text.copy()]
        if self._total_segments:
            retry_segments.insert(0, self._total_segments[-1])

        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    # the whole input is known up front, so the channel is
                    # filled and closed before synthesis starts
                    input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
                    for segment in retry_segments:
                        for t in segment:
                            input_ch.send_nowait(t)

                        input_ch.send_nowait(self._FlushSentinel())

                    input_ch.close()

                    # the audio is discarded; max_retry=0 is passed for recovery
                    async for _ in self._try_synthesize(
                        tts=tts,
                        input_ch=input_ch,
                        recovering=True,
                        conn_options=dataclasses.replace(
                            self._conn_options,
                            max_retry=0,
                            timeout=self._fallback_adapter._attempt_timeout,
                            retry_interval=self._fallback_adapter._retry_interval,
                        ),
                    ):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    # already logged by _try_synthesize; the instance stays unavailable
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Inherited members

class StreamAdapter (*,
tts: TTS,
sentence_tokenizer: tokenize.SentenceTokenizer)
Expand source code
class StreamAdapter(TTS):
    """Expose a streaming interface on top of another TTS instance.

    The adapter advertises ``streaming=True`` and copies the sample rate and
    channel count of the wrapped instance. Streams it creates split the pushed
    text with ``sentence_tokenizer`` and hand the pieces to the wrapped TTS.
    ``metrics_collected`` events of the wrapped instance are re-emitted on the
    adapter.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        """Wrap ``tts``.

        Args:
            tts: The TTS instance that performs the actual synthesis.
            sentence_tokenizer: Tokenizer used by ``stream()`` to split text.
        """
        streaming_capabilities = TTSCapabilities(streaming=True)
        super().__init__(
            capabilities=streaming_capabilities,
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._sentence_tokenizer = sentence_tokenizer
        self._tts = tts

        # Re-emit the wrapped instance's metrics so listeners only have to
        # subscribe to the adapter.
        @self._tts.on("metrics_collected")
        def _relay_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        """Delegate non-streamed synthesis of ``text`` to the wrapped TTS."""
        wrapped = self._tts
        return wrapped.synthesize(text=text, conn_options=conn_options)

    def stream(
        self,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "StreamAdapterWrapper":
        """Create a stream that synthesizes tokenized text with the wrapped TTS."""
        wrapper = StreamAdapterWrapper(
            tts=self,
            conn_options=conn_options,
            wrapped_tts=self._tts,
            sentence_tokenizer=self._sentence_tokenizer,
        )
        return wrapper

    def prewarm(self) -> None:
        """Forward the pre-warm request to the wrapped TTS."""
        self._tts.prewarm()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Methods

def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> StreamAdapterWrapper
Expand source code
def stream(
    self,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "StreamAdapterWrapper":
    """Create a stream that synthesizes tokenized text with the wrapped TTS.

    The returned wrapper reports this adapter as its ``tts`` while the wrapped
    instance and the adapter's sentence tokenizer do the actual work.

    Args:
        conn_options: Connection options forwarded unchanged to the wrapper.
    """
    return StreamAdapterWrapper(
        tts=self,
        conn_options=conn_options,
        wrapped_tts=self._tts,
        sentence_tokenizer=self._sentence_tokenizer,
    )
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> ChunkedStream
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    """Delegate non-streamed synthesis of ``text`` to the wrapped TTS.

    Args:
        text: The text to synthesize.
        conn_options: Connection options forwarded unchanged to the wrapped TTS.

    Returns:
        Whatever the wrapped instance's ``synthesize`` returns.
    """
    return self._tts.synthesize(text=text, conn_options=conn_options)

Inherited members

class StreamAdapterWrapper (*,
tts: TTS,
wrapped_tts: TTS,
sentence_tokenizer: tokenize.SentenceTokenizer,
conn_options: Optional[APIConnectOptions])
Expand source code
class StreamAdapterWrapper(SynthesizeStream):
    """Stream that feeds pushed text through a sentence tokenizer and
    synthesizes every token it yields with the wrapped TTS's ``synthesize``.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        wrapped_tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
        conn_options: Optional[APIConnectOptions],
    ) -> None:
        """Create the stream.

        Args:
            tts: The TTS this stream is reported under (passed to the base class).
            wrapped_tts: The TTS whose ``synthesize`` produces the audio.
            sentence_tokenizer: Tokenizer whose ``stream()`` splits the input.
            conn_options: Connection options passed to the base class.
        """
        super().__init__(tts=tts, conn_options=conn_options)
        self._wrapped_tts = wrapped_tts
        self._sent_stream = sentence_tokenizer.stream()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Intentionally collect nothing: this override disables the base collector."""
        return

    async def _run(self) -> None:
        """Pump input into the tokenizer and tokenized text into the wrapped TTS."""

        async def _feed_tokenizer():
            """Relay pushed text and flush markers to the sentence tokenizer stream."""
            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    self._sent_stream.flush()
                else:
                    self._sent_stream.push_text(item)

            self._sent_stream.end_input()

        async def _synthesize_tokens():
            """Synthesize each token and flag the last audio event of each as final."""
            async for sentence in self._sent_stream:
                # Hold every event back by one step: it can only be flagged
                # final once we know that no further event follows it.
                held: SynthesizedAudio | None = None
                async for chunk in self._wrapped_tts.synthesize(sentence.token):
                    if held is not None:
                        self._event_ch.send_nowait(held)

                    held = chunk

                if held is None:
                    # no audio was produced for this token
                    continue

                held.is_final = True
                self._event_ch.send_nowait(held)

        feeder = asyncio.create_task(_feed_tokenizer())
        synthesizer = asyncio.create_task(_synthesize_tokens())
        try:
            await asyncio.gather(feeder, synthesizer)
        finally:
            await utils.aio.gracefully_cancel(feeder, synthesizer)
            # NOTE(review): this closes the wrapped TTS object itself, which the
            # adapter hands to every stream it creates - confirm this is intended.
            await self._wrapped_tts.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Inherited members

class SynthesizeStream (*,
tts: TTS,
conn_options: Optional[APIConnectOptions] = None)
Expand source code
class SynthesizeStream(ABC):
    """Base class for streamed synthesis.

    Text is pushed with ``push_text``; ``flush`` ends the current segment and
    ``end_input`` signals that no more text will follow. Synthesized audio is
    consumed by async-iterating the stream. Subclasses implement ``_run``,
    which reads ``self._input_ch`` and writes ``SynthesizedAudio`` events to
    ``self._event_ch``.
    """

    # Marker object placed on the input channel by ``flush``.
    class _FlushSentinel: ...

    def __init__(
        self, *, tts: TTS, conn_options: Optional[APIConnectOptions] = None
    ) -> None:
        """Create the stream and immediately start its main task.

        Args:
            tts: The TTS instance this stream belongs to; metrics are emitted on it.
            conn_options: Retry/connection options, defaults to
                ``DEFAULT_API_CONNECT_OPTIONS``.
        """
        super().__init__()
        self._tts = tts
        self._conn_options = conn_options or DEFAULT_API_CONNECT_OPTIONS
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        # One copy of the events for the consumer, one for the metrics collector.
        self._event_aiter, self._monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        # Closing the event channel is what ends iteration for the consumers.
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task | None = None  # started on first push
        self._started_time: float = 0

        # used to track metrics
        self._mtc_pending_texts: list[str] = []
        self._mtc_text = ""

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        """Run ``_run``, retrying on ``APIError`` as configured.

        Raises:
            APIError: Re-raised unchanged when ``max_retry`` is 0.
            APIConnectionError: When the last allowed attempt failed.
        """
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                retry_interval = self._conn_options._interval_for_retry(i)
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": True,
                        },
                    )

                await asyncio.sleep(retry_interval)

    def _mark_started(self) -> None:
        """Record the start time used for the ``ttfb`` and ``duration`` metrics."""
        # only set the started time once, it'll get reset after we emit metrics
        if self._started_time == 0:
            self._started_time = time.perf_counter()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics"""
        audio_duration = 0.0
        ttfb = -1.0
        request_id = ""

        def _emit_metrics():
            """Emit ``metrics_collected`` for the oldest pending text, then reset."""
            nonlocal audio_duration, ttfb, request_id

            if not self._started_time:
                return

            duration = time.perf_counter() - self._started_time

            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
                error=None,
            )
            self._tts.emit("metrics_collected", metrics)

            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            self._started_time = 0

        async for ev in event_aiter:
            # -1.0 means "no event seen yet since the last emission"
            if ttfb == -1.0:
                ttfb = time.perf_counter() - self._started_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id

            if ev.is_final:
                _emit_metrics()

        # Events were received after the last emission: report them as well.
        if request_id:
            _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized

        Raises:
            RuntimeError: If the input was already ended or the stream is closed.
        """
        # Validate before touching any state, so that a rejected push neither
        # starts the metrics task nor records text that is never sent.
        self._check_input_not_ended()
        self._check_not_closed()

        if self._metrics_task is None:
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter),
                name="TTS._metrics_task",
            )

        self._mtc_text += token
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment

        Raises:
            RuntimeError: If the input was already ended or the stream is closed.
        """
        # Validate before touching any state, so that a rejected flush does not
        # queue a pending text for a segment that is never sent.
        self._check_input_not_ended()
        self._check_not_closed()

        if self._mtc_text:
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` if the event channel is closed."""
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        """Raise ``RuntimeError`` if the input channel is closed."""
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __anext__(self) -> SynthesizedAudio:
        """Return the next event; surface the main task's exception at the end."""
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # The channel is exhausted: if the main task failed, raise its
            # error instead of ending the iteration silently.
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> SynthesizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

  • abc.ABC

Subclasses

  • FallbackSynthesizeStream
  • StreamAdapterWrapper
  • livekit.plugins.cartesia.tts.SynthesizeStream
  • livekit.plugins.deepgram.tts.SynthesizeStream
  • livekit.plugins.elevenlabs.tts.SynthesizeStream
  • livekit.plugins.neuphonic.tts.SynthesizeStream
  • livekit.plugins.playai.tts.SynthesizeStream
  • livekit.plugins.resemble.tts.SynthesizeStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close this stream immediately.

    Closes the input channel, cancels the main task and then waits for the
    metrics task when one was started.
    """
    self._input_ch.close()
    await aio.gracefully_cancel(self._task)

    metrics_task = self._metrics_task
    if metrics_task is None:
        # no metrics collector was ever started for this stream
        return

    await metrics_task

Close this stream immediately

def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more text will be pushed"""
    # Flush first: flush() sends on the input channel, so it has to run before
    # the channel is closed.
    self.flush()
    self._input_ch.close()

Mark the end of input, no more text will be pushed

def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment

    The text accumulated since the previous flush is queued for the metrics
    collector and a flush sentinel is sent on the input channel.

    Raises:
        RuntimeError: Propagated from the input-ended / closed checks.
    """
    # Validate before touching any state, so that a rejected flush does not
    # queue a pending text for a segment that is never sent.
    self._check_input_not_ended()
    self._check_not_closed()

    if self._mtc_text:
        self._mtc_pending_texts.append(self._mtc_text)
        self._mtc_text = ""

    self._input_ch.send_nowait(self._FlushSentinel())

Mark the end of the current segment

def push_text(self, token: str) ‑> None
Expand source code
def push_text(self, token: str) -> None:
    """Push some text to be synthesized

    The metrics task is started on the first accepted push.

    Args:
        token: The text to append to the current segment.

    Raises:
        RuntimeError: Propagated from the input-ended / closed checks.
    """
    # Validate before touching any state, so that a rejected push neither
    # starts the metrics task nor records text that is never sent.
    self._check_input_not_ended()
    self._check_not_closed()

    if self._metrics_task is None:
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(self._monitor_aiter),
            name="TTS._metrics_task",
        )

    self._mtc_text += token
    self._input_ch.send_nowait(token)

Push some text to be synthesized

class SynthesizedAudio (frame: rtc.AudioFrame,
request_id: str,
is_final: bool = False,
segment_id: str = '',
delta_text: str = '')
Expand source code
@dataclass
class SynthesizedAudio:
    """One synthesized audio frame together with the metadata describing it."""

    frame: rtc.AudioFrame
    """Synthesized audio frame"""
    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""
    is_final: bool = False
    """Whether this is latest frame of the segment (streaming only)"""
    segment_id: str = ""
    """Segment ID, each segment is separated by a flush (streaming only)"""
    delta_text: str = ""
    """Current segment of the synthesized audio (streaming only)"""

SynthesizedAudio(frame: 'rtc.AudioFrame', request_id: 'str', is_final: 'bool' = False, segment_id: 'str' = '', delta_text: 'str' = '')

Instance variables

var delta_text : str

Current segment of the synthesized audio (streaming only)

var frame : AudioFrame

Synthesized audio frame

var is_final : bool

Whether this is the last frame of the segment (streaming only)

var request_id : str

Request ID (one segment could be made up of multiple requests)

var segment_id : str

Segment ID, each segment is separated by a flush (streaming only)

class SynthesizedAudioEmitter (*,
event_ch: aio.Chan[SynthesizedAudio],
request_id: str,
segment_id: str = '')
Expand source code
class SynthesizedAudioEmitter:
    """Hold back one audio frame so the last one can be flagged ``is_final``.

    A frame handed to ``push`` is not sent right away: it is sent as non-final
    by the next ``push``, or as final by ``flush``. This helps TTS implementers
    to get the ``is_final`` logic right when streaming responses.
    """

    def __init__(
        self,
        *,
        event_ch: aio.Chan[SynthesizedAudio],
        request_id: str,
        segment_id: str = "",
    ) -> None:
        """Create an emitter that writes to ``event_ch``.

        Args:
            event_ch: Channel the ``SynthesizedAudio`` events are sent to.
            request_id: Request ID stamped on every emitted event.
            segment_id: Segment ID stamped on every emitted event.
        """
        self._request_id = request_id
        self._segment_id = segment_id
        self._event_ch = event_ch
        # the frame currently held back, if any
        self._frame: rtc.AudioFrame | None = None

    def push(self, frame: Optional[rtc.AudioFrame]):
        """Send the held-back frame (as not final) and hold back ``frame`` instead."""
        self._emit_frame(is_final=False)
        self._frame = frame

    def _emit_frame(self, is_final: bool = False):
        """Send the held-back frame to the event channel; no-op when there is none."""
        held = self._frame
        if held is not None:
            event = SynthesizedAudio(
                frame=held,
                request_id=self._request_id,
                segment_id=self._segment_id,
                is_final=is_final,
            )
            self._event_ch.send_nowait(event)
            self._frame = None

    def flush(self):
        """Send the held-back frame, if any, flagged as final."""
        self._emit_frame(is_final=True)

Utility for buffering and emitting audio frames with metadata to a channel.

This class helps TTS implementers to correctly handle is_final logic when streaming responses.

Methods

def flush(self)
Expand source code
def flush(self):
    """Emits any buffered frame as final.

    Delegates to ``_emit_frame(is_final=True)``.
    """
    self._emit_frame(is_final=True)

Emits any buffered frame as final.

def push(self, frame: Optional[rtc.AudioFrame])
Expand source code
def push(self, frame: Optional[rtc.AudioFrame]):
    """Emits any buffered frame and stores the new frame for later emission.

    The buffered frame is emitted as not final.

    Args:
        frame: The frame to buffer; it replaces the previously buffered one.
    """
    # Emit before overwriting, otherwise the previously buffered frame is lost.
    self._emit_frame(is_final=False)
    self._frame = frame

Emits any buffered frame and stores the new frame for later emission.

The buffered frame is emitted as not final.

class TTS (*,
capabilities: TTSCapabilities,
sample_rate: int,
num_channels: int,
conn_options: Optional[APIConnectOptions] = None)
Expand source code
class TTS(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    """Abstract base class of the text-to-speech implementations.

    Subclasses must implement ``synthesize``; ``stream`` raises
    ``NotImplementedError`` unless it is overridden. Instances can be used as
    async context managers, in which case ``aclose`` is awaited on exit.
    """

    def __init__(
        self,
        *,
        capabilities: TTSCapabilities,
        sample_rate: int,
        num_channels: int,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> None:
        """Store the static description of this TTS.

        Args:
            capabilities: What this TTS supports.
            sample_rate: Value exposed through the ``sample_rate`` property.
            num_channels: Value exposed through the ``num_channels`` property.
            conn_options: Connection options, defaults to
                ``DEFAULT_API_CONNECT_OPTIONS``.
        """
        super().__init__()
        cls = type(self)
        # fully qualified name of the concrete class
        self._label = f"{cls.__module__}.{cls.__name__}"
        self._capabilities = capabilities
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._conn_options = (
            conn_options if conn_options else DEFAULT_API_CONNECT_OPTIONS
        )

    @property
    def label(self) -> str:
        """``module.ClassName`` of the concrete TTS class."""
        return self._label

    @property
    def capabilities(self) -> TTSCapabilities:
        """The capabilities passed at construction."""
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        """The sample rate passed at construction."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """The channel count passed at construction."""
        return self._num_channels

    @abstractmethod
    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> ChunkedStream:
        """Synthesize ``text`` through the non-streamed API (implemented by subclasses)."""

    def stream(
        self, *, conn_options: Optional[APIConnectOptions] = None
    ) -> SynthesizeStream:
        """Open a synthesis stream; the base implementation always raises.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        message = "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        raise NotImplementedError(message)

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service"""

    async def aclose(self) -> None:
        """Close hook awaited by ``__aexit__``; the base implementation does nothing."""

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Subclasses

  • FallbackAdapter
  • StreamAdapter
  • TTS
  • livekit.plugins.azure.tts.TTS
  • livekit.plugins.cartesia.tts.TTS
  • livekit.plugins.deepgram.tts.TTS
  • livekit.plugins.elevenlabs.tts.TTS
  • livekit.plugins.google.tts.TTS
  • TTS
  • livekit.plugins.neuphonic.tts.TTS
  • livekit.plugins.openai.tts.TTS
  • livekit.plugins.playai.tts.TTS
  • livekit.plugins.resemble.tts.TTS
  • livekit.plugins.rime.tts.TTS

Instance variables

prop capabilities : TTSCapabilities
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    """Return the capabilities stored on this instance (``self._capabilities``)."""
    return self._capabilities
prop label : str
Expand source code
@property
def label(self) -> str:
    """Return the label stored on this instance (``self._label``)."""
    return self._label
prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    """Return the channel count stored on this instance (``self._num_channels``)."""
    return self._num_channels
prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    """Return the sample rate stored on this instance (``self._sample_rate``)."""
    return self._sample_rate

Methods

async def aclose(self) ‑> None
Expand source code
# Close hook of the TTS base class; this base implementation does nothing.
async def aclose(self) -> None: ...
def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service

    The base implementation does nothing.
    """
    return None

Pre-warm connection to the TTS service

def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: Optional[APIConnectOptions] = None
) -> SynthesizeStream:
    """Open a synthesis stream; this base implementation always raises.

    Args:
        conn_options: Unused by the base implementation.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError(
        "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
    )
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> ChunkedStream
Expand source code
# Abstract: every concrete TTS has to provide the non-streamed synthesis of
# `text`, returning a ChunkedStream. `conn_options` is optional and keyword-only.
@abstractmethod
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> ChunkedStream: ...

Inherited members

class TTSCapabilities (streaming: bool)
Expand source code
@dataclass
class TTSCapabilities:
    """Describes what a TTS implementation supports."""

    streaming: bool
    """Whether this TTS supports streaming (generally using websockets)"""

TTSCapabilities(streaming: 'bool')

Instance variables

var streaming : bool

Whether this TTS supports streaming (generally using websockets)