Module livekit.agents.tts

Sub-modules

livekit.agents.tts.stream_adapter
livekit.agents.tts.tts

Classes

class ChunkedStream (tts: TTS, text: str)

Used by the non-streamed synthesize API, some providers support chunked http responses

Expand source code
class ChunkedStream(ABC):
    """Used by the non-streamed synthesize API, some providers support chunked http responses"""

    def __init__(self, tts: TTS, text: str) -> None:
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._tts = tts
        self._input_text = text

        # tee the event channel: one branch is consumed by __anext__,
        # the other feeds the metrics monitor task.
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )

        # closing the channel when synthesis ends (success, error or cancel)
        # terminates both iterator branches.
        self._task = asyncio.create_task(self._main_task())
        self._task.add_done_callback(lambda _: self._event_ch.close())

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics"""

        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # -1.0 means "no frame received yet"
        request_id = ""

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttfb == -1.0:
                # first frame observed: record time-to-first-byte
                ttfb = time.perf_counter() - start_time

            audio_duration += ev.frame.duration

        # channel closed -> synthesis finished; emit a single summary metric
        duration = time.perf_counter() - start_time
        metrics = TTSMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttfb=ttfb,
            duration=duration,
            characters_count=len(self._input_text),
            audio_duration=audio_duration,
            cancelled=self._task.cancelled(),
            label=self._tts._label,
            streamed=False,
            error=None,
        )
        self._tts.emit("metrics_collected", metrics)

    async def collect(self) -> rtc.AudioFrame:
        """Utility method to collect every frame in a single call"""
        frames = []
        async for ev in self:
            frames.append(ev.frame)
        return audio.merge_frames(frames)

    @abstractmethod
    async def _main_task(self) -> None: ...

    async def aclose(self) -> None:
        """Close is automatically called if the stream is completely collected"""
        await aio.gracefully_cancel(self._task)
        self._event_ch.close()
        await self._metrics_task

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # surface the synthesis error (if any) instead of a bare stop
            if self._task.done() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.azure.tts.ChunkedStream
  • livekit.plugins.cartesia.tts.ChunkedStream
  • livekit.plugins.elevenlabs.tts.ChunkedStream
  • livekit.plugins.google.tts.ChunkedStream
  • livekit.plugins.openai.tts.ChunkedStream

Methods

async def aclose(self) ‑> None

Close is automatically called if the stream is completely collected

async def collect(self) ‑> AudioFrame

Utility method to collect every frame in a single call

class StreamAdapter (*, tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)

Adapter that exposes a streaming interface on top of a non-streaming TTS by tokenizing pushed text into sentences and synthesizing them one at a time.

Expand source code
class StreamAdapter(TTS):
    """Expose a streaming interface on top of a non-streaming TTS.

    Text pushed to a stream is split into sentences by *sentence_tokenizer*
    and each sentence is synthesized with the wrapped *tts*.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        adapter_caps = TTSCapabilities(streaming=True)
        super().__init__(
            capabilities=adapter_caps,
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._tts = tts
        self._sentence_tokenizer = sentence_tokenizer

        # re-emit the wrapped engine's metrics as if they were our own
        @self._tts.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            self.emit("metrics_collected", *args, **kwargs)

    def synthesize(self, text: str) -> ChunkedStream:
        """Non-streamed synthesis is delegated directly to the wrapped TTS."""
        return self._tts.synthesize(text=text)

    def stream(self) -> SynthesizeStream:
        """Create a streaming session backed by sentence-chunked synthesis."""
        wrapper = StreamAdapterWrapper(
            self,
            wrapped_tts=self._tts,
            sentence_tokenizer=self._sentence_tokenizer,
        )
        return wrapper

Ancestors

Methods

def stream(self) ‑> SynthesizeStream
def synthesize(self, text: str) ‑> ChunkedStream

Inherited members

class StreamAdapterWrapper (tts: TTS, *, wrapped_tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)

SynthesizeStream implementation backing StreamAdapter: forwards pushed text to a sentence tokenizer and synthesizes each sentence with the wrapped non-streaming TTS.

Expand source code
class StreamAdapterWrapper(SynthesizeStream):
    """SynthesizeStream created by StreamAdapter: accumulates pushed text
    into sentences and synthesizes each one with the wrapped non-streaming TTS."""

    def __init__(
        self,
        tts: TTS,
        *,
        wrapped_tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        super().__init__(tts)
        self._wrapped_tts = wrapped_tts
        self._sent_stream = sentence_tokenizer.stream()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        # metrics are already emitted by the wrapped TTS (and re-emitted by
        # StreamAdapter), so the base-class monitor is intentionally disabled
        pass  # do nothing

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        async def _forward_input():
            """forward pushed text (and flush markers) to the sentence tokenizer"""
            async for input in self._input_ch:
                if isinstance(input, self._FlushSentinel):
                    self._sent_stream.flush()
                    continue
                self._sent_stream.push_text(input)

            self._sent_stream.end_input()

        async def _synthesize():
            # synthesize sentence-by-sentence, forwarding audio to consumers
            async for ev in self._sent_stream:
                async for audio in self._wrapped_tts.synthesize(ev.token):
                    self._event_ch.send_nowait(audio)

        tasks = [
            asyncio.create_task(_forward_input()),
            asyncio.create_task(_synthesize()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # ensure the sibling task is cancelled if either one fails
            await utils.aio.gracefully_cancel(*tasks)

Ancestors

Inherited members

class SynthesizeStream (tts: TTS)

Base class for bidirectional streaming synthesis: push text incrementally, mark segment boundaries with flush, and async-iterate the synthesized audio.

Expand source code
class SynthesizeStream(ABC):
    """Bidirectional streaming synthesis: push text with `push_text`, mark
    segment boundaries with `flush`, and async-iterate the stream to consume
    SynthesizedAudio events."""

    class _FlushSentinel: ...

    def __init__(self, tts: TTS) -> None:
        self._tts = tts
        # input channel carries text tokens plus flush markers
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        # tee: one branch for consumers (__anext__), one for the metrics monitor
        self._event_aiter, self._monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task | None = None  # started on first push

        # used to track metrics
        self._mtc_pending_texts: list[str] = []  # one entry per flushed segment
        self._mtc_text = ""  # text of the segment currently being pushed

    @abstractmethod
    async def _main_task(self) -> None: ...

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics"""
        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # -1.0 means "no frame received for the current segment yet"
        request_id = ""

        def _emit_metrics():
            # emit one TTSMetrics per flushed segment, then reset the
            # per-segment accumulators
            nonlocal start_time, audio_duration, ttfb, request_id
            duration = time.perf_counter() - start_time

            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
                error=None,
            )
            self._tts.emit("metrics_collected", metrics)

            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            start_time = time.perf_counter()

        async for ev in event_aiter:
            if ttfb == -1.0:
                # first frame of this segment: record time-to-first-byte
                ttfb = time.perf_counter() - start_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id

            if ev.is_final:
                # segment boundary reached
                _emit_metrics()

        # flush metrics for a trailing segment that never saw is_final
        if request_id:
            _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized"""
        if self._metrics_task is None:
            # lazily start the metrics monitor on the first push
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter),
                name="TTS._metrics_task",
            )

        self._mtc_text += token
        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        if self._mtc_text:
            # snapshot the segment text for the metrics monitor
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close the stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    def _check_not_closed(self) -> None:
        # raises if the output side has already been closed
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        # raises if end_input() was already called
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # surface the main-task error (if any) instead of a bare stop
            if self._task.done() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

Ancestors

  • abc.ABC

Subclasses

  • StreamAdapterWrapper
  • livekit.plugins.cartesia.tts.SynthesizeStream
  • livekit.plugins.elevenlabs.tts.SynthesizeStream

Methods

async def aclose(self) ‑> None

Close the stream immediately

def end_input(self) ‑> None

Mark the end of input, no more text will be pushed

def flush(self) ‑> None

Mark the end of the current segment

def push_text(self, token: str) ‑> None

Push some text to be synthesized

class SynthesizedAudio (frame: rtc.AudioFrame, request_id: str, is_final: bool = False, segment_id: str = '', delta_text: str = '')

SynthesizedAudio(frame: 'rtc.AudioFrame', request_id: 'str', is_final: 'bool' = False, segment_id: 'str' = '', delta_text: 'str' = '')

Expand source code
@dataclass
class SynthesizedAudio:
    """A chunk of synthesized audio emitted by ChunkedStream / SynthesizeStream."""

    frame: rtc.AudioFrame
    """Synthesized audio frame"""
    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""
    is_final: bool = False
    """Whether this is latest frame of the segment (streaming only)"""
    segment_id: str = ""
    """Segment ID, each segment is separated by a flush (streaming only)"""
    delta_text: str = ""
    """Current segment of the synthesized audio (streaming only)"""

Class variables

var delta_text : str

Current segment of the synthesized audio (streaming only)

var frame : rtc.AudioFrame

Synthesized audio frame

var is_final : bool

Whether this is latest frame of the segment (streaming only)

var request_id : str

Request ID (one segment could be made up of multiple requests)

var segment_id : str

Segment ID, each segment is separated by a flush (streaming only)

class TTS (*, capabilities: TTSCapabilities, sample_rate: int, num_channels: int)

Abstract base class for text-to-speech implementations; emits a "metrics_collected" event for each synthesis request.

Expand source code
class TTS(ABC, rtc.EventEmitter[Literal["metrics_collected"]]):
    """Abstract base class for text-to-speech engines.

    Implementations emit a ``"metrics_collected"`` event carrying TTSMetrics
    for each synthesis request.
    """

    def __init__(
        self, *, capabilities: TTSCapabilities, sample_rate: int, num_channels: int
    ) -> None:
        super().__init__()
        # label identifies the concrete implementation in emitted metrics
        self._label = f"{type(self).__module__}.{type(self).__name__}"
        self._capabilities = capabilities
        self._num_channels = num_channels
        self._sample_rate = sample_rate

    @property
    def capabilities(self) -> TTSCapabilities:
        """Feature set supported by this implementation."""
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        """Configured output sample rate."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Configured output channel count."""
        return self._num_channels

    @abstractmethod
    def synthesize(self, text: str) -> ChunkedStream: ...

    def stream(self) -> SynthesizeStream:
        """Streaming synthesis; concrete classes without native streaming raise."""
        raise NotImplementedError(
            "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        )

    async def aclose(self) -> None: ...

Ancestors

Subclasses

  • StreamAdapter
  • livekit.plugins.azure.tts.TTS
  • livekit.plugins.cartesia.tts.TTS
  • livekit.plugins.elevenlabs.tts.TTS
  • livekit.plugins.google.tts.TTS
  • livekit.plugins.openai.tts.TTS

Instance variables

prop capabilities : TTSCapabilities
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    return self._capabilities
prop num_channels : int
Expand source code
@property
def num_channels(self) -> int:
    return self._num_channels
prop sample_rate : int
Expand source code
@property
def sample_rate(self) -> int:
    return self._sample_rate

Methods

async def aclose(self) ‑> None
def stream(self) ‑> SynthesizeStream
def synthesize(self, text: str) ‑> ChunkedStream

Inherited members

class TTSCapabilities (streaming: bool)

TTSCapabilities(streaming: 'bool')

Expand source code
@dataclass
class TTSCapabilities:
    """Describes the features supported by a TTS implementation."""

    # whether stream() is natively supported (otherwise wrap with StreamAdapter)
    streaming: bool

Class variables

var streaming : bool