Module livekit.agents.tts
Sub-modules
livekit.agents.tts.stream_adapter
livekit.agents.tts.tts
Classes
class ChunkedStream
-
Used by the non-streamed synthesize API; some providers support chunked HTTP responses, so the audio may arrive as several events.
Expand source code
class ChunkedStream(ABC):
    """Async iterator over the audio events of one non-streamed synthesize call.

    Used by the non-streamed synthesize API; some providers support chunked
    HTTP responses, so the audio may arrive as several events.

    Subclasses implement `_main_task`, which is started as an asyncio task
    when the stream is constructed (a running event loop is therefore
    required at construction time).
    """

    def __init__(self):
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._task = asyncio.create_task(self._main_task())
        # Close the event channel once the producer task is done.
        self._task.add_done_callback(lambda _done: self._event_ch.close())

    @abstractmethod
    async def _main_task(self) -> None:
        """Producer coroutine implemented by subclasses."""
        ...

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __anext__(self) -> SynthesizedAudio:
        """Return the next event read from the event channel."""
        return await self._event_ch.__anext__()

    async def collect(self) -> rtc.AudioFrame:
        """Iterate the stream to its end and return every frame merged into one."""
        collected = [event.frame async for event in self]
        return audio.merge_frames(collected)

    async def aclose(self) -> None:
        """Cancel the producer task and close the event channel.

        When the producer task finishes on its own, the event channel is
        already closed by the done-callback registered in `__init__`.
        """
        await aio.gracefully_cancel(self._task)
        self._event_ch.close()
Ancestors
- abc.ABC
Subclasses
- livekit.plugins.azure.tts.ChunkedStream
- livekit.plugins.cartesia.tts.ChunkedStream
- livekit.plugins.elevenlabs.tts.ChunkedStream
- livekit.plugins.google.tts.ChunkedStream
- livekit.plugins.openai.tts.ChunkedStream
Methods
async def aclose(self) ‑> None
-
Close is automatically called if the stream is completely collected
async def collect(self) ‑> AudioFrame
-
Utility method to collect every frame in a single call
class StreamAdapter (*, tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)
-
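Adapter that gives a wrapped TTS a streaming interface: stream() returns a StreamAdapterWrapper that splits pushed text into sentences with the given sentence tokenizer and synthesizes each sentence with the wrapped TTS, while synthesize() is delegated to the wrapped TTS unchanged.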
Expand source code
class StreamAdapter(TTS):
    """TTS wrapper that adds a `stream()` implementation to another TTS.

    `synthesize()` is delegated to the wrapped TTS unchanged, while
    `stream()` returns a `StreamAdapterWrapper` built from the wrapped TTS
    and the given sentence tokenizer. The adapter reports `streaming=True`
    and reuses the wrapped TTS's sample rate and channel count.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        """Wrap `tts`, using `sentence_tokenizer` for the streams created later."""
        streaming_caps = TTSCapabilities(streaming=True)
        super().__init__(
            capabilities=streaming_caps,
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._sentence_tokenizer = sentence_tokenizer
        self._tts = tts

    def synthesize(self, text: str) -> ChunkedStream:
        """Synthesize `text` with the wrapped TTS."""
        return self._tts.synthesize(text=text)

    def stream(self) -> SynthesizeStream:
        """Create a new stream backed by the wrapped TTS and the sentence tokenizer."""
        wrapper = StreamAdapterWrapper(
            tts=self._tts,
            sentence_tokenizer=self._sentence_tokenizer,
        )
        return wrapper
Ancestors
- TTS
- abc.ABC
Methods
def stream(self) ‑> SynthesizeStream
def synthesize(self, text: str) ‑> ChunkedStream
class StreamAdapterWrapper (*, tts: TTS, sentence_tokenizer: tokenize.SentenceTokenizer)
-
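SynthesizeStream implementation used by StreamAdapter: pushed text is forwarded to a sentence tokenizer stream, and each sentence it emits is synthesized with the wrapped TTS's synthesize(), whose audio events are forwarded to this stream's consumers.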
Expand source code
class StreamAdapterWrapper(SynthesizeStream):
    """Streaming synthesis built on a TTS's `synthesize()` call.

    Text pushed into this stream is forwarded to a sentence tokenizer stream;
    every token that stream emits is synthesized with the wrapped TTS and the
    resulting audio events are sent to this stream's event channel.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        """Create the stream for `tts`, tokenizing input with `sentence_tokenizer`."""
        # super().__init__() already schedules `_main_task` as an asyncio task;
        # a task does not start running until the event loop gets control, and
        # this constructor is synchronous, so the attributes below are set
        # before `_main_task` reads them.
        super().__init__()
        self._tts = tts
        self._sent_stream = sentence_tokenizer.stream()

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Run the input-forwarding and synthesis loops until both finish."""

        async def _forward_input() -> None:
            """Forward pushed text and flush markers to the sentence tokenizer."""
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    self._sent_stream.flush()
                    continue
                self._sent_stream.push_text(data)

            # The input channel is exhausted: no more text will arrive.
            self._sent_stream.end_input()

        async def _synthesize() -> None:
            """Synthesize each tokenized sentence and forward its audio events."""
            async for ev in self._sent_stream:
                chunked = self._tts.synthesize(ev.token)
                try:
                    async for synthesized in chunked:
                        self._event_ch.send_nowait(synthesized)
                finally:
                    # Always release the per-sentence stream, including when
                    # this task is cancelled mid-iteration; otherwise the
                    # stream's producer task would be left running.
                    await chunked.aclose()

        workers = [
            asyncio.create_task(_forward_input()),
            asyncio.create_task(_synthesize()),
        ]
        try:
            await asyncio.gather(*workers)
        finally:
            # If one worker failed or this task was cancelled, stop the other.
            await utils.aio.gracefully_cancel(*workers)
Ancestors
- SynthesizeStream
- abc.ABC
Inherited members
class SynthesizeStream
-
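Abstract base class for streaming synthesis: text is pushed in with push_text(), segments are delimited with flush(), end_input() marks the end of the text, and the resulting SynthesizedAudio events are consumed by async iteration.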
Expand source code
class SynthesizeStream(ABC):
    """Abstract base class for streaming synthesis.

    Text is pushed in with `push_text()`, segments are delimited with
    `flush()`, and `end_input()` marks the end of the text. The resulting
    `SynthesizedAudio` events are consumed by async iteration.

    Subclasses implement `_main_task`, which is started as an asyncio task
    when the stream is constructed (a running event loop is therefore
    required at construction time).
    """

    class _FlushSentinel:
        """Marker placed on the input channel by `flush()`."""

    def __init__(self):
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        # Close the event channel once the main task is done.
        self._task.add_done_callback(lambda _done: self._event_ch.close())

    @abstractmethod
    async def _main_task(self) -> None:
        """Worker coroutine implemented by subclasses."""
        ...

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __anext__(self) -> SynthesizedAudio:
        """Return the next event read from the event channel."""
        return await self._event_ch.__anext__()

    def _check_input_not_ended(self) -> None:
        """Raise RuntimeError if the input channel has been closed."""
        if not self._input_ch.closed:
            return
        kind = type(self)
        raise RuntimeError(f"{kind.__module__}.{kind.__name__} input ended")

    def _check_not_closed(self) -> None:
        """Raise RuntimeError if the event channel has been closed."""
        if not self._event_ch.closed:
            return
        kind = type(self)
        raise RuntimeError(f"{kind.__module__}.{kind.__name__} is closed")

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized.

        Raises:
            RuntimeError: if the input has ended or the stream is closed.
        """
        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment.

        Raises:
            RuntimeError: if the input has ended or the stream is closed.
        """
        self._check_input_not_ended()
        self._check_not_closed()
        marker = self._FlushSentinel()
        self._input_ch.send_nowait(marker)

    def end_input(self) -> None:
        """Mark the end of input; no more text will be pushed.

        The current segment is flushed before the input channel is closed.
        """
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately.

        Closes the input channel, cancels the main task, then closes the
        event channel.
        """
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)
        self._event_ch.close()
Ancestors
- abc.ABC
Subclasses
- StreamAdapterWrapper
- livekit.plugins.cartesia.tts.SynthesizeStream
- livekit.plugins.elevenlabs.tts.SynthesizeStream
Methods
async def aclose(self) ‑> None
-
Close this stream immediately
def end_input(self) ‑> None
-
Mark the end of input, no more text will be pushed
def flush(self) ‑> None
-
Mark the end of the current segment
def push_text(self, token: str) ‑> None
-
Push some text to be synthesized
class SynthesizedAudio (request_id: str, segment_id: str, frame: rtc.AudioFrame, delta_text: str = '')
-
SynthesizedAudio(request_id: 'str', segment_id: 'str', frame: 'rtc.AudioFrame', delta_text: 'str' = '')
Expand source code
@dataclass
class SynthesizedAudio:
    """One audio event produced by a TTS stream."""

    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""

    segment_id: str
    """Segment ID, each segment is separated by a flush"""

    frame: rtc.AudioFrame
    """Synthesized audio frame"""

    delta_text: str = ""
    """Current segment of the synthesized audio"""
Class variables
var delta_text : str
-
Current segment of the synthesized audio
var frame : AudioFrame
-
Synthesized audio frame
var request_id : str
-
Request ID (one segment could be made up of multiple requests)
var segment_id : str
-
Segment ID, each segment is separated by a flush
class TTS (*, capabilities: TTSCapabilities, sample_rate: int, num_channels: int)
-
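Abstract base class for text-to-speech providers. Subclasses implement synthesize(); stream() raises NotImplementedError unless a subclass overrides it, and StreamAdapter can be used to add streaming on top of synthesize().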
Expand source code
class TTS(ABC):
    """Abstract base class for text-to-speech providers.

    Subclasses must implement `synthesize()`. `stream()` raises
    `NotImplementedError` unless a subclass overrides it.
    """

    def __init__(
        self,
        *,
        capabilities: TTSCapabilities,
        sample_rate: int,
        num_channels: int,
    ) -> None:
        """Store the capabilities and audio format reported by this TTS."""
        self._capabilities = capabilities
        self._sample_rate = sample_rate
        self._num_channels = num_channels

    @property
    def capabilities(self) -> TTSCapabilities:
        """Capabilities passed to the constructor."""
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        """Sample rate passed to the constructor."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Number of channels passed to the constructor."""
        return self._num_channels

    @abstractmethod
    def synthesize(self, text: str) -> ChunkedStream:
        """Synthesize `text`, returning a stream of the resulting audio."""
        ...

    def stream(self) -> SynthesizeStream:
        """Create a streaming synthesis session.

        Raises:
            NotImplementedError: always, unless overridden by a subclass.
        """
        message = "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        raise NotImplementedError(message)

    async def aclose(self) -> None:
        """Close the TTS; the base implementation does nothing."""
Ancestors
- abc.ABC
Subclasses
- StreamAdapter
- livekit.plugins.azure.tts.TTS
- livekit.plugins.cartesia.tts.TTS
- livekit.plugins.elevenlabs.tts.TTS
- livekit.plugins.google.tts.TTS
- livekit.plugins.openai.tts.TTS
Instance variables
prop capabilities : TTSCapabilities
-
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    """Capabilities stored on this instance (`_capabilities`)."""
    return self._capabilities
prop num_channels : int
-
Expand source code
@property
def num_channels(self) -> int:
    """Number of channels stored on this instance (`_num_channels`)."""
    return self._num_channels
prop sample_rate : int
-
Expand source code
@property
def sample_rate(self) -> int:
    """Sample rate stored on this instance (`_sample_rate`)."""
    return self._sample_rate
Methods
async def aclose(self) ‑> None
def stream(self) ‑> SynthesizeStream
def synthesize(self, text: str) ‑> ChunkedStream
class TTSCapabilities (streaming: bool)
-
TTSCapabilities(streaming: 'bool')
Expand source code
@dataclass
class TTSCapabilities:
    """Feature flags reported by a TTS implementation."""

    # Streaming flag; StreamAdapter sets it to True for the TTS it wraps.
    streaming: bool
Class variables
var streaming : bool