Module livekit.agents.tts.stream_adapter

Classes

class StreamAdapter (*, tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)

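Adapter that exposes a streaming interface on top of a wrapped TTS: streamed text is split into sentences by the given sentence tokenizer, and each sentence is synthesized with the wrapped TTS.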

Expand source code
class StreamAdapter(TTS):
    """Expose a streaming interface on top of a wrapped TTS.

    The adapter reports ``streaming=True`` in its capabilities and reuses
    the wrapped TTS's sample rate and channel count. One-shot synthesis is
    delegated to the wrapped TTS; streaming is handled by a
    ``StreamAdapterWrapper`` built from the wrapped TTS and the sentence
    tokenizer.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        """Store the wrapped TTS and the tokenizer used for streaming.

        Args:
            tts: The TTS instance that performs the actual synthesis.
            sentence_tokenizer: Tokenizer handed to every stream created
                by :meth:`stream`.
        """
        # The adapter always advertises streaming support.
        streaming_caps = TTSCapabilities(streaming=True)
        # Audio format is taken unchanged from the wrapped TTS.
        super().__init__(
            capabilities=streaming_caps,
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._tts = tts
        self._sentence_tokenizer = sentence_tokenizer

    def synthesize(self, text: str) -> ChunkedStream:
        """Synthesize ``text`` by delegating to the wrapped TTS."""
        wrapped = self._tts
        return wrapped.synthesize(text=text)

    def stream(self) -> SynthesizeStream:
        """Create a new ``StreamAdapterWrapper`` over the wrapped TTS."""
        adapter_stream = StreamAdapterWrapper(
            tts=self._tts,
            sentence_tokenizer=self._sentence_tokenizer,
        )
        return adapter_stream

Ancestors

Methods

def stream(self) ‑> SynthesizeStream
def synthesize(self, text: str) ‑> ChunkedStream
class StreamAdapterWrapper (*, tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)

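Synthesis stream returned by StreamAdapter.stream(): it forwards pushed text to a sentence tokenizer stream, synthesizes each tokenized sentence with the wrapped TTS, and emits the resulting audio events.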

Expand source code
class StreamAdapterWrapper(SynthesizeStream):
    """Synthesis stream that feeds tokenized sentences to a wrapped TTS.

    Text arriving on the input channel is pushed into a sentence tokenizer
    stream; every sentence the tokenizer yields is synthesized with the
    wrapped TTS and the resulting audio is sent on the event channel.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        """Store the wrapped TTS and open a tokenizer stream.

        Args:
            tts: The TTS instance used to synthesize each sentence.
            sentence_tokenizer: Tokenizer whose ``stream()`` is opened once
                and used for the lifetime of this object.
        """
        super().__init__()
        self._tts = tts
        self._sent_stream = sentence_tokenizer.stream()

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Run the input-forwarding and synthesis loops concurrently.

        Both loops are started as tasks and awaited together; the
        ``finally`` clause cancels them via ``gracefully_cancel`` however
        the gather ends.
        """

        async def _forward_input() -> None:
            """Forward pushed text to the sentence tokenizer stream."""
            # ``item`` rather than ``input`` to avoid shadowing the builtin.
            async for item in self._input_ch:
                if isinstance(item, self._FlushSentinel):
                    # A flush request is passed through to the tokenizer
                    # instead of being treated as text.
                    self._sent_stream.flush()
                    continue
                self._sent_stream.push_text(item)

            # Input channel is exhausted: tell the tokenizer no more text
            # is coming.
            self._sent_stream.end_input()

        async def _synthesize() -> None:
            """Synthesize each tokenized sentence and emit its audio."""
            async for ev in self._sent_stream:
                async for audio in self._tts.synthesize(ev.token):
                    self._event_ch.send_nowait(audio)

        tasks = [
            asyncio.create_task(_forward_input()),
            asyncio.create_task(_synthesize()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

Ancestors

Inherited members