Module livekit.agents.tts.tts
Classes
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)-
Expand source code
class ChunkedStream(ABC):
    """Used by the non-streamed synthesize API, some providers support chunked http responses

    Instances are async iterators of ``SynthesizedAudio``.  Construction
    starts two tasks: one that drives ``_run`` (with retries) and one that
    observes the produced events to emit a ``metrics_collected`` event.
    """

    def __init__(
        self, *, tts: TTS, input_text: str, conn_options: APIConnectOptions
    ) -> None:
        self._input_text = input_text
        self._tts = tts
        self._conn_options = conn_options

        # One channel, two readers: the consumer of this stream and the
        # metrics monitor each get their own iterator over the same events.
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )
        self._synthesize_task = asyncio.create_task(
            self._main_task(), name="TTS._synthesize_task"
        )
        # Closing the channel when synthesis ends (for any reason) lets both
        # readers terminate.
        self._synthesize_task.add_done_callback(lambda _: self._event_ch.close())

    @property
    def input_text(self) -> str:
        """The text this stream was created to synthesize."""
        return self._input_text

    @property
    def done(self) -> bool:
        """Whether the synthesize task has finished."""
        return self._synthesize_task.done()

    @property
    def exception(self) -> BaseException | None:
        """Result of calling ``exception()`` on the synthesize task."""
        return self._synthesize_task.exception()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics"""
        started_at = time.perf_counter()
        total_audio = 0.0
        first_event_delay = -1.0  # -1.0 means "no event received yet"
        last_request_id = ""

        async for event in event_aiter:
            last_request_id = event.request_id
            if first_event_delay == -1.0:
                first_event_delay = time.perf_counter() - started_at
            total_audio += event.frame.duration

        elapsed = time.perf_counter() - started_at
        metrics = TTSMetrics(
            timestamp=time.time(),
            request_id=last_request_id,
            ttfb=first_event_delay,
            duration=elapsed,
            characters_count=len(self._input_text),
            audio_duration=total_audio,
            cancelled=self._synthesize_task.cancelled(),
            label=self._tts._label,
            streamed=False,
            error=None,
        )
        self._tts.emit("metrics_collected", metrics)

    async def collect(self) -> rtc.AudioFrame:
        """Utility method to collect every frame in a single call"""
        frames = [event.frame async for event in self]
        return rtc.combine_audio_frames(frames)

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        """Run ``_run``, retrying on ``APIError`` according to the connect options."""
        for attempt in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as err:
                # Retries disabled: surface the original error untouched.
                if self._conn_options.max_retry == 0:
                    raise

                # Last allowed attempt failed: report it as a connection error.
                if attempt == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from err

                logger.warning(
                    f"failed to synthesize speech, retrying in {self._conn_options.retry_interval}s",
                    exc_info=err,
                    extra={
                        "tts": self._tts._label,
                        "attempt": attempt + 1,
                        "streamed": False,
                    },
                )
                await asyncio.sleep(self._conn_options.retry_interval)

    async def aclose(self) -> None:
        """Close is automatically called if the stream is completely collected"""
        await aio.gracefully_cancel(self._synthesize_task)
        self._event_ch.close()
        await self._metrics_task

    async def __anext__(self) -> SynthesizedAudio:
        try:
            return await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # The channel is exhausted; if synthesis failed (and was not
            # cancelled), surface that failure instead of a clean stop.
            task = self._synthesize_task
            failure = None if task.cancelled() else task.exception()
            if failure:
                raise failure from None
            raise StopAsyncIteration

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> ChunkedStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
Ancestors
- abc.ABC
Subclasses
- FallbackChunkedStream
- livekit.plugins.azure.tts.ChunkedStream
- livekit.plugins.cartesia.tts.ChunkedStream
- livekit.plugins.deepgram.tts.ChunkedStream
- livekit.plugins.elevenlabs.tts.ChunkedStream
- livekit.plugins.google.tts.ChunkedStream
- livekit.plugins.openai.tts.ChunkedStream
Instance variables
prop done : bool
-
Expand source code
@property
def done(self) -> bool:
    """Whether the synthesize task has finished (``self._synthesize_task.done()``)."""
    return self._synthesize_task.done()
prop exception : BaseException | None
-
Expand source code
@property
def exception(self) -> BaseException | None:
    """Return the result of ``exception()`` on the synthesize task.

    NOTE(review): this delegates directly to the task, so for an
    ``asyncio.Task`` the call itself raises while the task is still running
    or after it was cancelled -- confirm callers check ``done`` first.
    """
    return self._synthesize_task.exception()
prop input_text : str
-
Expand source code
@property
def input_text(self) -> str:
    """The text held in ``self._input_text`` for this stream."""
    return self._input_text
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close is automatically called if the stream is completely collected

    Hands the synthesize task to ``aio.gracefully_cancel``, closes the event
    channel, then waits for the metrics task to finish.
    """
    await aio.gracefully_cancel(self._synthesize_task)
    # Closing the channel ends iteration for every reader of the events,
    # which is what allows the metrics task awaited below to complete.
    self._event_ch.close()
    await self._metrics_task
Close is automatically called if the stream is completely collected
async def collect(self) ‑> AudioFrame
-
Expand source code
async def collect(self) -> rtc.AudioFrame:
    """Utility method to collect every frame in a single call

    Iterates the stream to exhaustion and returns the frames merged by
    ``rtc.combine_audio_frames``.
    """
    frames = [event.frame async for event in self]
    return rtc.combine_audio_frames(frames)
Utility method to collect every frame in a single call
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)-
Expand source code
class SynthesizeStream(ABC):
    """Base class for streamed synthesis.

    Text is pushed incrementally with ``push_text``; ``flush`` marks the end
    of a segment and ``end_input`` marks the end of all input.  Instances are
    async iterators of ``SynthesizedAudio``.  A metrics task, started on the
    first ``push_text``, emits one ``metrics_collected`` event per flushed
    segment of text.
    """

    class _FlushSentinel: ...

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions) -> None:
        super().__init__()
        self._tts = tts
        self._conn_options = conn_options
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        # One event channel, two readers: the stream consumer and the metrics monitor.
        self._event_aiter, self._monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task | None = None  # started on first push

        # used to track metrics
        self._mtc_pending_texts: list[str] = []  # flushed segments awaiting a metrics event
        self._mtc_text = ""  # text pushed since the last flush

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        """Run ``_run``, retrying on ``APIError`` according to the connect options."""
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                if self._conn_options.max_retry == 0:
                    # Retries disabled: surface the original error untouched.
                    raise
                elif i == self._conn_options.max_retry:
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {self._conn_options.retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": True,
                        },
                    )

                await asyncio.sleep(self._conn_options.retry_interval)

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics"""
        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # -1.0 means "no event received yet for this segment"
        request_id = ""

        def _emit_metrics():
            """Emit metrics for the oldest pending segment and reset the counters."""
            nonlocal start_time, audio_duration, ttfb, request_id

            duration = time.perf_counter() - start_time

            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
                error=None,
            )
            self._tts.emit("metrics_collected", metrics)

            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            start_time = time.perf_counter()

        async for ev in event_aiter:
            if ttfb == -1.0:
                ttfb = time.perf_counter() - start_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id

            if ev.is_final:
                _emit_metrics()

        # Events were received after the last final frame: report them too.
        if request_id:
            _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized

        Raises:
            RuntimeError: if input has already ended or the stream is closed.
        """
        # Validate before touching any metrics state, so a rejected call
        # neither starts the metrics task nor records text that was never sent.
        self._check_input_not_ended()
        self._check_not_closed()

        if self._metrics_task is None:
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter),
                name="TTS._metrics_task",
            )

        self._mtc_text += token
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment

        Raises:
            RuntimeError: if input has already ended or the stream is closed.
        """
        # Validate first: a rejected flush must not queue a pending segment
        # for which no flush marker was ever sent.
        self._check_input_not_ended()
        self._check_not_closed()

        if self._mtc_text:
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` if the event channel is closed."""
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        """Raise ``RuntimeError`` if the input channel is closed."""
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # The channel is exhausted; if the main task failed (and was not
            # cancelled), surface that failure instead of a clean stop.
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> SynthesizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
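Abstract base class for streamed synthesis: text is pushed incrementally with push_text, segments are delimited with flush, and the resulting SynthesizedAudio events are consumed by async iteration.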
Ancestors
- abc.ABC
Subclasses
- FallbackSynthesizeStream
- StreamAdapterWrapper
- livekit.plugins.cartesia.tts.SynthesizeStream
- livekit.plugins.deepgram.tts.SynthesizeStream
- livekit.plugins.elevenlabs.tts.SynthesizeStream
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close this stream immediately

    Closes the input channel, hands the main task to
    ``aio.gracefully_cancel`` and, if the metrics task was ever started,
    waits for it to finish.
    """
    self._input_ch.close()
    await aio.gracefully_cancel(self._task)
    # The metrics task only exists once text has been pushed.
    if self._metrics_task is not None:
        await self._metrics_task
Close this stream immediately
def end_input(self) ‑> None
-
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more text will be pushed

    Flushes the current segment first, then closes the input channel.
    """
    self.flush()
    self._input_ch.close()
Mark the end of input, no more text will be pushed
def flush(self) ‑> None
-
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment

    Any text accumulated since the previous flush is queued in
    ``_mtc_pending_texts`` for metrics, then a flush sentinel is sent on the
    input channel.

    Raises:
        Whatever ``_check_input_not_ended`` / ``_check_not_closed`` raise
        when the stream can no longer accept input.
    """
    # Validate first: a rejected flush must not queue a pending segment for
    # which no flush marker was ever sent.
    self._check_input_not_ended()
    self._check_not_closed()

    if self._mtc_text:
        self._mtc_pending_texts.append(self._mtc_text)
        self._mtc_text = ""

    self._input_ch.send_nowait(self._FlushSentinel())
Mark the end of the current segment
def push_text(self, token: str) ‑> None
-
Expand source code
def push_text(self, token: str) -> None:
    """Push some text to be synthesized

    The metrics task is started on the first accepted push, and the token is
    appended to the text accumulated for the current segment before being
    sent on the input channel.

    Raises:
        Whatever ``_check_input_not_ended`` / ``_check_not_closed`` raise
        when the stream can no longer accept input.
    """
    # Validate before touching any metrics state, so a rejected call neither
    # starts the metrics task nor records text that was never sent.
    self._check_input_not_ended()
    self._check_not_closed()

    if self._metrics_task is None:
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(self._monitor_aiter),
            name="TTS._metrics_task",
        )

    self._mtc_text += token
    self._input_ch.send_nowait(token)
Push some text to be synthesized
class SynthesizedAudio (frame: rtc.AudioFrame,
request_id: str,
is_final: bool = False,
segment_id: str = '',
delta_text: str = '')-
Expand source code
@dataclass
class SynthesizedAudio:
    """A synthesized audio frame together with its request and segment metadata."""

    frame: rtc.AudioFrame
    """Synthesized audio frame"""
    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""
    is_final: bool = False
    """Whether this is latest frame of the segment (streaming only)"""
    segment_id: str = ""
    """Segment ID, each segment is separated by a flush (streaming only)"""
    delta_text: str = ""
    """Current segment of the synthesized audio (streaming only)"""
SynthesizedAudio(frame: 'rtc.AudioFrame', request_id: 'str', is_final: 'bool' = False, segment_id: 'str' = '', delta_text: 'str' = '')
Class variables
var delta_text : str
-
Current segment of the synthesized audio (streaming only)
var frame : AudioFrame
-
Synthesized audio frame
var is_final : bool
-
Whether this is the last frame of the segment (streaming only)
var request_id : str
-
Request ID (one segment could be made up of multiple requests)
var segment_id : str
-
Segment ID, each segment is separated by a flush (streaming only)
class TTS (*,
capabilities: TTSCapabilities,
sample_rate: int,
num_channels: int)-
Expand source code
class TTS(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    """Abstract base class for text-to-speech implementations.

    Subclasses must implement ``synthesize``.  ``stream`` raises
    ``NotImplementedError`` unless overridden.  The event emitter is typed to
    accept a ``"metrics_collected"`` event in addition to ``TEvent``.
    """

    def __init__(
        self, *, capabilities: TTSCapabilities, sample_rate: int, num_channels: int
    ) -> None:
        super().__init__()
        self._capabilities = capabilities
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        # "<module>.<class name>" of the concrete subclass.
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        """``<module>.<class name>`` of the concrete TTS class."""
        return self._label

    @property
    def capabilities(self) -> TTSCapabilities:
        """The capabilities passed to the constructor."""
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        """The sample rate passed to the constructor."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """The number of channels passed to the constructor."""
        return self._num_channels

    @abstractmethod
    def synthesize(
        self,
        text: str,
        *,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> ChunkedStream:
        """Synthesize ``text`` and return a ``ChunkedStream``; implemented by subclasses."""
        ...

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Return a ``SynthesizeStream``; the base implementation always raises.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError(
            "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        )

    async def aclose(self) -> None:
        """Close the TTS; the base implementation does nothing."""
        ...

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Leaving the ``async with`` block closes the TTS.
        await self.aclose()
Abstract base class for text-to-speech implementations: subclasses implement synthesize, and stream raises NotImplementedError unless overridden.
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- FallbackAdapter
- StreamAdapter
- livekit.plugins.azure.tts.TTS
- livekit.plugins.cartesia.tts.TTS
- livekit.plugins.deepgram.tts.TTS
- livekit.plugins.elevenlabs.tts.TTS
- livekit.plugins.google.tts.TTS
- livekit.plugins.openai.tts.TTS
Instance variables
prop capabilities : TTSCapabilities
-
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    """The ``TTSCapabilities`` stored in ``self._capabilities``."""
    return self._capabilities
prop label : str
-
Expand source code
@property
def label(self) -> str:
    """The label stored in ``self._label``."""
    return self._label
prop num_channels : int
-
Expand source code
@property
def num_channels(self) -> int:
    """The channel count stored in ``self._num_channels``."""
    return self._num_channels
prop sample_rate : int
-
Expand source code
@property
def sample_rate(self) -> int:
    """The sample rate stored in ``self._sample_rate``."""
    return self._sample_rate
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close the TTS; this implementation does nothing."""
    ...
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> SynthesizeStream-
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Return a ``SynthesizeStream``; this implementation always raises.

    Raises:
        NotImplementedError: always, with a message pointing to a different
            TTS or a StreamAdapter.
    """
    raise NotImplementedError(
        "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
    )
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=5.0, timeout=10.0)) ‑> ChunkedStream-
Expand source code
@abstractmethod
def synthesize(
    self,
    text: str,
    *,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> ChunkedStream:
    """Synthesize ``text`` and return a ``ChunkedStream``.

    Abstract: there is no implementation here, subclasses provide it.
    """
    ...
Inherited members
class TTSCapabilities (streaming: bool)
-
Expand source code
@dataclass
class TTSCapabilities:
    """Feature flags describing what a TTS implementation supports."""

    streaming: bool
    """Whether this TTS supports streaming (generally using websockets)"""
TTSCapabilities(streaming: 'bool')
Class variables
var streaming : bool
-
Whether this TTS supports streaming (generally using websockets)