Module livekit.agents.tts.tts

Classes

class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)
Expand source code
class ChunkedStream(ABC):
    """Used by the non-streamed synthesize API; some providers support chunked HTTP responses.

    Subclasses implement ``_run`` to perform the actual synthesis and push
    ``SynthesizedAudio`` events onto ``self._event_ch``. Consumers either
    async-iterate the stream or call ``collect`` for a single combined frame.
    """

    def __init__(
        self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions
    ) -> None:
        self._input_text = input_text
        self._tts = tts
        self._conn_options = conn_options
        self._event_ch = aio.Chan[SynthesizedAudio]()

        # Duplicate the event channel: one iterator feeds the public
        # __anext__, the other feeds the metrics monitor.
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )
        self._synthesize_task = asyncio.create_task(
            self._main_task(), name="TTS._synthesize_task"
        )
        # Closing the channel ends both iterators once synthesis finishes
        # (successfully or not).
        self._synthesize_task.add_done_callback(lambda _: self._event_ch.close())

    @property
    def input_text(self) -> str:
        """The text being synthesized."""
        return self._input_text

    @property
    def done(self) -> bool:
        """True once the synthesis task has finished (success, failure, or cancellation)."""
        return self._synthesize_task.done()

    @property
    def exception(self) -> BaseException | None:
        """Exception raised by the synthesis task, if any.

        NOTE(review): per asyncio semantics, ``Task.exception()`` raises
        ``InvalidStateError`` if the task is not done yet and
        ``CancelledError`` if it was cancelled — check ``done`` first.
        """
        return self._synthesize_task.exception()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics.

        Consumes the duplicated event stream and, once it is exhausted,
        emits a single ``TTSMetrics`` via the TTS "metrics_collected" event.
        """

        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # time-to-first-byte; stays -1.0 until the first event arrives
        request_id = ""

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttfb == -1.0:
                ttfb = time.perf_counter() - start_time

            audio_duration += ev.frame.duration

        duration = time.perf_counter() - start_time
        metrics = TTSMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttfb=ttfb,
            duration=duration,
            characters_count=len(self._input_text),
            audio_duration=audio_duration,
            cancelled=self._synthesize_task.cancelled(),
            label=self._tts._label,
            streamed=False,
            error=None,
        )
        self._tts.emit("metrics_collected", metrics)

    async def collect(self) -> rtc.AudioFrame:
        """Utility method to collect every frame in a single call"""
        frames = []
        async for ev in self:
            frames.append(ev.frame)

        return rtc.combine_audio_frames(frames)

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        # Retry loop around the provider-specific _run, honoring conn_options.
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    # Out of retries: surface a single connection error
                    # chained to the last APIError.
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": False,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    async def aclose(self) -> None:
        """Close is automatically called if the stream is completely collected"""
        await aio.gracefully_cancel(self._synthesize_task)
        self._event_ch.close()
        # Wait for the metrics task so it can observe the closed channel and
        # finish cleanly.
        await self._metrics_task

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # Re-raise the synthesis task's failure (if any) instead of
            # silently ending iteration.
            if not self._synthesize_task.cancelled() and (
                exc := self._synthesize_task.exception()
            ):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> ChunkedStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Used by the non-streamed synthesize API; some providers support chunked HTTP responses.

Ancestors

  • abc.ABC

Subclasses

  • FallbackChunkedStream
  • livekit.plugins.azure.tts.ChunkedStream
  • livekit.plugins.cartesia.tts.ChunkedStream
  • livekit.plugins.deepgram.tts.ChunkedStream
  • livekit.plugins.elevenlabs.tts.ChunkedStream
  • livekit.plugins.google.tts.ChunkedStream
  • livekit.plugins.openai.tts.ChunkedStream

Instance variables

prop done : bool
Expand source code
@property
def done(self) -> bool:
    return self._synthesize_task.done()
prop exception : BaseException | None
Expand source code
@property
def exception(self) -> BaseException | None:
    return self._synthesize_task.exception()
prop input_text : str
Expand source code
@property
def input_text(self) -> str:
    return self._input_text

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close is automatically called if the stream is completely collected"""
    await aio.gracefully_cancel(self._synthesize_task)
    self._event_ch.close()
    await self._metrics_task

Close is automatically called if the stream is completely collected

async def collect(self) ‑> AudioFrame
Expand source code
async def collect(self) -> rtc.AudioFrame:
    """Utility method to collect every frame in a single call"""
    frames = []
    async for ev in self:
        frames.append(ev.frame)

    return rtc.combine_audio_frames(frames)

Utility method to collect every frame in a single call

class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)
Expand source code
class SynthesizeStream(ABC):
    """Push-based synthesis stream used by the streamed synthesize API.

    Text is pushed incrementally with ``push_text``; ``flush`` marks the end
    of the current segment and ``end_input`` marks the end of all input.
    Synthesized audio is consumed by async-iterating the stream.
    """

    # Sentinel pushed onto the input channel to mark a segment boundary.
    class _FlushSentinel: ...

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions) -> None:
        super().__init__()
        self._tts = tts
        self._conn_options = conn_options
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        # Duplicate the event channel: one iterator feeds the public
        # __anext__, the other feeds the metrics monitor.
        self._event_aiter, self._monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        # Closing the channel ends both iterators once the task finishes
        # (successfully or not).
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task | None = None  # started on first push

        # used to track metrics
        self._mtc_pending_texts: list[str] = []  # one entry per flushed segment
        self._mtc_text = ""  # text accumulated for the current segment

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        # Retry loop around the provider-specific _run, honoring conn_options.
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    raise
                elif i == self._conn_options.max_retry:
                    # Out of retries: surface a single connection error
                    # chained to the last APIError.
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": True,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics.

        Emits one ``TTSMetrics`` per completed segment (delimited by events
        flagged ``is_final``) via the TTS "metrics_collected" event.
        """
        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # time-to-first-byte; stays -1.0 until the first event arrives
        request_id = ""

        def _emit_metrics():
            nonlocal start_time, audio_duration, ttfb, request_id
            duration = time.perf_counter() - start_time

            # No flushed text recorded for this segment: nothing to report.
            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
                error=None,
            )
            self._tts.emit("metrics_collected", metrics)

            # Reset accumulators for the next segment.
            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            start_time = time.perf_counter()

        async for ev in event_aiter:
            if ttfb == -1.0:
                ttfb = time.perf_counter() - start_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id

            if ev.is_final:
                _emit_metrics()

        # Emit metrics for a trailing segment that never received an
        # is_final event.
        if request_id:
            _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized.

        Raises:
            RuntimeError: if the stream is closed or input already ended.
        """
        # Validate BEFORE mutating any state, so a rejected push neither
        # accumulates text nor spawns the metrics task.
        self._check_input_not_ended()
        self._check_not_closed()

        # The metrics monitor is started lazily on the first push.
        if self._metrics_task is None:
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter),
                name="TTS._metrics_task",
            )

        self._mtc_text += token
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment.

        Raises:
            RuntimeError: if the stream is closed or input already ended.
        """
        # Validate BEFORE mutating metrics state, so a rejected flush leaves
        # the pending-segment bookkeeping untouched.
        self._check_input_not_ended()
        self._check_not_closed()

        if self._mtc_text:
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close the stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        # Only await the metrics task if it was ever started (lazy start).
        if self._metrics_task is not None:
            await self._metrics_task

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # Re-raise the task's failure (if any) instead of silently
            # ending iteration.
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> SynthesizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Push-based stream for the streamed synthesize API: text is pushed incrementally via push_text and synthesized audio is consumed by iterating the stream.

Ancestors

  • abc.ABC

Subclasses

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close ths stream immediately"""
    self._input_ch.close()
    await aio.gracefully_cancel(self._task)

    if self._metrics_task is not None:
        await self._metrics_task

Close the stream immediately

def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more text will be pushed"""
    self.flush()
    self._input_ch.close()

Mark the end of input, no more text will be pushed

def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment"""
    if self._mtc_text:
        self._mtc_pending_texts.append(self._mtc_text)
        self._mtc_text = ""

    self._check_input_not_ended()
    self._check_not_closed()
    self._input_ch.send_nowait(self._FlushSentinel())

Mark the end of the current segment

def push_text(self, token: str) ‑> None
Expand source code
def push_text(self, token: str) -> None:
    """Push some text to be synthesized"""
    if self._metrics_task is None:
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(self._monitor_aiter),
            name="TTS._metrics_task",
        )

    self._mtc_text += token
    self._check_input_not_ended()
    self._check_not_closed()
    self._input_ch.send_nowait(token)

Push some text to be synthesized

class SynthesizedAudio (frame: rtc.AudioFrame,
request_id: str,
is_final: bool = False,
segment_id: str = '',
delta_text: str = '')
Expand source code
@dataclass
class SynthesizedAudio:
    frame: rtc.AudioFrame
    """Synthesized audio frame"""
    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""
    is_final: bool = False
    """Whether this is latest frame of the segment (streaming only)"""
    segment_id: str = ""
    """Segment ID, each segment is separated by a flush (streaming only)"""
    delta_text: str = ""
    """Current segment of the synthesized audio (streaming only)"""

SynthesizedAudio(frame: 'rtc.AudioFrame', request_id: 'str', is_final: 'bool' = False, segment_id: 'str' = '', delta_text: 'str' = '')

Class variables

var delta_text : str

Current segment of the synthesized audio (streaming only)

var frame : AudioFrame

Synthesized audio frame

var is_final : bool

Whether this is the latest frame of the segment (streaming only)

var request_id : str

Request ID (one segment could be made up of multiple requests)

var segment_id : str

Segment ID, each segment is separated by a flush (streaming only)

class TTS (*,
capabilities: TTSCapabilities,
sample_rate: int,
num_channels: int)
Expand source code
class TTS(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    """Abstract base class for text-to-speech providers.

    Concrete implementations must provide ``synthesize``; providers that
    support streaming additionally override ``stream``.
    """

    def __init__(
        self, *, capabilities: TTSCapabilities, sample_rate: int, num_channels: int
    ) -> None:
        super().__init__()
        cls = type(self)
        self._label = f"{cls.__module__}.{cls.__name__}"
        self._capabilities = capabilities
        self._num_channels = num_channels
        self._sample_rate = sample_rate

    @property
    def capabilities(self) -> TTSCapabilities:
        """Feature set supported by this TTS implementation."""
        return self._capabilities

    @property
    def label(self) -> str:
        """Fully qualified name of the concrete TTS class."""
        return self._label

    @property
    def num_channels(self) -> int:
        """Number of channels of the synthesized audio."""
        return self._num_channels

    @property
    def sample_rate(self) -> int:
        """Sample rate of the synthesized audio."""
        return self._sample_rate

    @abstractmethod
    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream: ...

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Open a streaming synthesis session; overridden by providers that support it."""
        raise NotImplementedError(
            "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        )

    async def aclose(self) -> None:
        """Release provider resources; the default implementation is a no-op."""
        ...

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()

Abstract base class for text-to-speech providers; concrete implementations provide synthesize and may override stream for streaming support.

Ancestors

Subclasses

  • FallbackAdapter
  • StreamAdapter
  • livekit.plugins.azure.tts.TTS
  • livekit.plugins.cartesia.tts.TTS
  • livekit.plugins.deepgram.tts.TTS
  • livekit.plugins.elevenlabs.tts.TTS
  • livekit.plugins.google.tts.TTS
  • livekit.plugins.openai.tts.TTS

Instance variables

prop capabilities : TTSCapabilities
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    return self._capabilities
prop label : str
Expand source code
@property
def label(self) -> str:
    return self._label
prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    return self._num_channels
prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    return self._sample_rate

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None: ...
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    raise NotImplementedError(
        "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
    )
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> ChunkedStream
Expand source code
@abstractmethod
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream: ...

Inherited members

class TTSCapabilities (streaming: bool)
Expand source code
@dataclass
class TTSCapabilities:
    """Describes optional features supported by a TTS implementation."""

    streaming: bool
    """Whether this TTS supports streaming (generally using websockets)"""

TTSCapabilities(streaming: 'bool')

Class variables

var streaming : bool

Whether this TTS supports streaming (generally using websockets)