Module livekit.agents.tts
Sub-modules
livekit.agents.tts.fallback_adapter
livekit.agents.tts.stream_adapter
livekit.agents.tts.tts
Classes
class AvailabilityChangedEvent (tts: TTS,
available: bool)-
Expand source code
@dataclass
class AvailabilityChangedEvent:
    """Payload of the ``tts_availability_changed`` event.

    The fallback adapter emits it when one of its TTS instances is marked
    unavailable after a failure, or available again after a successful
    recovery attempt.
    """

    # the TTS instance whose availability changed
    tts: "TTS"
    # True when the instance became available, False when it became unavailable
    available: bool
AvailabilityChangedEvent(tts: 'TTS', available: 'bool')
Instance variables
var available : bool
var tts : TTS
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: Optional[APIConnectOptions] = None)-
Expand source code
class ChunkedStream(ABC):
    """Used by the non-streamed synthesize API, some providers support chunked http responses

    Subclasses implement ``_run()`` and send ``SynthesizedAudio`` events into
    ``self._event_ch``; consumers iterate the stream (or call ``collect()``).
    ``_run()`` is retried on ``APIError`` according to ``conn_options``, and a
    single ``metrics_collected`` event is emitted on the owning TTS once the
    event channel is exhausted.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> None:
        self._input_text = input_text
        self._tts = tts
        self._conn_options = conn_options or DEFAULT_API_CONNECT_OPTIONS
        self._event_ch = aio.Chan[SynthesizedAudio]()

        # split the event channel in two: one iterator feeds the consumer
        # (__anext__), the other feeds the metrics monitor
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )
        # synthesis starts as soon as the stream is constructed
        self._synthesize_task = asyncio.create_task(
            self._main_task(), name="TTS._synthesize_task"
        )
        # close the channel when synthesis finishes (success, error or
        # cancellation) so that iteration over it can end
        self._synthesize_task.add_done_callback(lambda _: self._event_ch.close())

    @property
    def input_text(self) -> str:
        # the text this stream synthesizes
        return self._input_text

    @property
    def done(self) -> bool:
        # True once the synthesis task has finished
        return self._synthesize_task.done()

    @property
    def exception(self) -> BaseException | None:
        # NOTE(review): asyncio.Task.exception() raises InvalidStateError while
        # the task is still running and CancelledError if it was cancelled
        return self._synthesize_task.exception()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics

        Consumes its copy of the event stream and emits one ``TTSMetrics``
        (``streamed=False``) on the owning TTS when the stream ends.
        """
        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0  # -1.0 means "no audio received yet"
        request_id = ""

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttfb == -1.0:
                # time to first audio frame since the stream was created
                ttfb = time.perf_counter() - start_time
            audio_duration += ev.frame.duration

        duration = time.perf_counter() - start_time
        metrics = TTSMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttfb=ttfb,
            duration=duration,
            characters_count=len(self._input_text),
            audio_duration=audio_duration,
            cancelled=self._synthesize_task.cancelled(),
            label=self._tts._label,
            streamed=False,
            error=None,
        )
        self._tts.emit("metrics_collected", metrics)

    async def collect(self) -> rtc.AudioFrame:
        """Utility method to collect every frame in a single call"""
        frames = []
        async for ev in self:
            frames.append(ev.frame)
        return rtc.combine_audio_frames(frames)

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        # run _run(), retrying on APIError up to max_retry times; any other
        # exception propagates immediately
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                retry_interval = self._conn_options._interval_for_retry(i)
                if self._conn_options.max_retry == 0:
                    # retries disabled: surface the original error unchanged
                    raise
                elif i == self._conn_options.max_retry:
                    # out of attempts: wrap the last error
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": False,
                        },
                    )

                await asyncio.sleep(retry_interval)

    async def aclose(self) -> None:
        """Close is automatically called if the stream is completely collected"""
        # NOTE(review): nothing in this class calls aclose() on its own (the
        # done callback only closes the event channel); callers using
        # ``async with`` get it via __aexit__ — confirm the docstring's claim
        await aio.gracefully_cancel(self._synthesize_task)
        self._event_ch.close()
        # wait for the metrics event to be emitted
        await self._metrics_task

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # the channel is exhausted: re-raise the synthesis error (if any)
            # so the consumer sees it instead of a silent end of iteration
            if not self._synthesize_task.cancelled() and (
                exc := self._synthesize_task.exception()
            ):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> ChunkedStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Used by the non-streamed synthesize API, some providers support chunked http responses
Ancestors
- abc.ABC
Subclasses
- FallbackChunkedStream
- ChunkedStream
- livekit.plugins.azure.tts.ChunkedStream
- livekit.plugins.cartesia.tts.ChunkedStream
- livekit.plugins.deepgram.tts.ChunkedStream
- livekit.plugins.elevenlabs.tts.ChunkedStream
- livekit.plugins.google.tts.ChunkedStream
- ChunkedStream
- livekit.plugins.neuphonic.tts.ChunkedStream
- livekit.plugins.openai.tts.ChunkedStream
- livekit.plugins.playai.tts.ChunkedStream
- livekit.plugins.resemble.tts.ChunkedStream
- livekit.plugins.rime.tts.ChunkedStream
Instance variables
prop done : bool
-
Expand source code
@property
def done(self) -> bool:
    """Whether the underlying synthesis task has finished."""
    task = self._synthesize_task
    return task.done()
prop exception : BaseException | None
-
Expand source code
@property def exception(self) -> BaseException | None: return self._synthesize_task.exception()
prop input_text : str
-
Expand source code
@property
def input_text(self) -> str:
    """The text this stream was asked to synthesize."""
    text = self._input_text
    return text
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close is automatically called if the stream is completely collected"""
    # NOTE(review): this method itself only cancels synthesis, closes the
    # event channel and waits for metrics; confirm who calls it automatically
    await aio.gracefully_cancel(self._synthesize_task)
    # closing the channel lets the metrics monitor finish iterating
    self._event_ch.close()
    # wait until the metrics event has been emitted
    await self._metrics_task
Close is automatically called if the stream is completely collected
async def collect(self) ‑> AudioFrame
-
Expand source code
async def collect(self) -> rtc.AudioFrame:
    """Utility method to collect every frame in a single call.

    Drains the whole stream and merges the received frames with
    ``rtc.combine_audio_frames``.
    """
    return rtc.combine_audio_frames([ev.frame async for ev in self])
Utility method to collect every frame in a single call
class FallbackAdapter (tts: list[TTS],
*,
attempt_timeout: float = 10.0,
max_retry_per_tts: int = 1,
retry_interval: float = 5,
no_fallback_after_audio_duration: float | None = 3.0,
sample_rate: int | None = None)-
Expand source code
class FallbackAdapter(
    TTS[Literal["tts_availability_changed"]],
):
    """
    Manages multiple TTS instances, providing a fallback mechanism to ensure continuous TTS service.

    Instances are tried in list order. An instance that fails is marked
    unavailable (emitting ``tts_availability_changed``), and a background
    recovery attempt is scheduled for it.
    """

    def __init__(
        self,
        tts: list[TTS],
        *,
        attempt_timeout: float = 10.0,
        max_retry_per_tts: int = 1,  # only retry once by default
        retry_interval: float = 5,
        no_fallback_after_audio_duration: float | None = 3.0,
        sample_rate: int | None = None,
    ) -> None:
        """
        Initialize a FallbackAdapter that manages multiple TTS instances.

        Args:
            tts (list[TTS]): A list of TTS instances to use for fallback, in order of preference.
            attempt_timeout (float, optional): Timeout for each synthesis attempt in seconds. Defaults to 10.0.
            max_retry_per_tts (int, optional): Maximum number of retries per TTS instance. Defaults to 1.
            retry_interval (float, optional): Retry interval in seconds, forwarded to each TTS
                instance's connection options. Defaults to 5.
            no_fallback_after_audio_duration (float | None, optional): Disables fallback once this many
                seconds of audio have been synthesized. Defaults to 3.0. This prevents the same text
                from being unnaturally repeated when the first TTS instance fails.
            sample_rate (int | None, optional): Desired sample rate for the synthesized audio. If None,
                uses the maximum sample rate among the TTS instances. Instances with a different
                sample rate are resampled.

        Raises:
            ValueError: If no TTS instances are provided.
            ValueError: If TTS instances have different numbers of channels.
        """
        if len(tts) < 1:
            raise ValueError("at least one TTS instance must be provided.")

        if len({t.num_channels for t in tts}) != 1:
            raise ValueError("all TTS must have the same number of channels")

        if sample_rate is None:
            sample_rate = max(t.sample_rate for t in tts)

        num_channels = tts[0].num_channels

        super().__init__(
            capabilities=TTSCapabilities(
                # the adapter can only stream if every fallback instance can
                streaming=all(t.capabilities.streaming for t in tts),
            ),
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

        self._tts_instances = tts
        self._attempt_timeout = attempt_timeout
        self._max_retry_per_tts = max_retry_per_tts
        self._retry_interval = retry_interval
        self._no_fallback_after_audio_duration = no_fallback_after_audio_duration

        # one status entry per TTS instance, same order as self._tts_instances
        self._status: list[_TTSStatus] = []
        for t in tts:
            resampler = None
            if sample_rate != t.sample_rate:
                logger.info(
                    f"resampling {t.label} from {t.sample_rate}Hz to {sample_rate}Hz"
                )
                resampler = rtc.AudioResampler(
                    input_rate=t.sample_rate, output_rate=sample_rate
                )

            self._status.append(
                _TTSStatus(available=True, recovering_task=None, resampler=resampler)
            )

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "FallbackChunkedStream":
        """Synthesize ``text`` in one request, falling back between TTS instances.

        When ``conn_options`` is not given, ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` is used.
        """
        return FallbackChunkedStream(
            tts=self,
            input_text=text,
            conn_options=conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
        )

    def stream(
        self,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "FallbackSynthesizeStream":
        """Open a streaming synthesis session that falls back between TTS instances.

        When ``conn_options`` is not given, ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` is used.
        """
        return FallbackSynthesizeStream(
            tts=self,
            conn_options=conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS,
        )

    def prewarm(self) -> None:
        """Prewarm the primary (first) TTS instance only."""
        if self._tts_instances:
            self._tts_instances[0].prewarm()

    async def aclose(self) -> None:
        """Cancel any in-flight recovery tasks.

        The wrapped TTS instances themselves are not closed here.
        """
        for tts_status in self._status:
            if tts_status.recovering_task is not None:
                await aio.gracefully_cancel(tts_status.recovering_task)
Manages multiple TTS instances, providing a fallback mechanism to ensure continuous TTS service.
Initialize a FallbackAdapter that manages multiple TTS instances.
Args
tts
:list[TTS]
- A list of TTS instances to use for fallback.
attempt_timeout
:float
, optional- Timeout for each synthesis attempt in seconds. Defaults to 10.0.
max_retry_per_tts
:int
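, optional- Maximum number of retries per TTS instance. Defaults to 1.
retry_interval
:float
, optional- Retry interval in seconds, forwarded to each TTS instance's connection options. Defaults to 5.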
no_fallback_after_audio_duration
:float | None
, optional- Disables fallback once this many seconds of audio have been synthesized. Defaults to 3.0. This prevents the same text from being unnaturally repeated when the first TTS instance fails.
sample_rate
:int | None
, optional- Desired sample rate for the synthesized audio. If None, uses the maximum sample rate among the TTS instances.
Raises
ValueError
- If no TTS instances are provided.
ValueError
- If TTS instances have different numbers of channels.
Ancestors
- TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Cancel every in-flight recovery task started for a failed TTS instance."""
    for status in self._status:
        if (task := status.recovering_task) is not None:
            await aio.gracefully_cancel(task)
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> FallbackSynthesizeStream
-
Expand source code
def stream(
    self,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "FallbackSynthesizeStream":
    """Open a streaming synthesis session that falls back between TTS instances.

    When ``conn_options`` is not given, ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` is used.
    """
    options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    return FallbackSynthesizeStream(tts=self, conn_options=options)
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> FallbackChunkedStream
-
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "FallbackChunkedStream":
    """Synthesize ``text`` in one request, falling back between TTS instances.

    When ``conn_options`` is not given, ``DEFAULT_FALLBACK_API_CONNECT_OPTIONS`` is used.
    """
    options = conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    return FallbackChunkedStream(tts=self, input_text=text, conn_options=options)
Inherited members
class FallbackChunkedStream (*,
tts: FallbackAdapter,
input_text: str,
conn_options: Optional[APIConnectOptions])-
Expand source code
class FallbackChunkedStream(ChunkedStream):
    """Non-streamed synthesis that tries each FallbackAdapter TTS instance in order.

    The whole input text is synthesized by the first available instance; on
    failure the instance is marked unavailable, a recovery task is scheduled,
    and the next instance is tried.
    """

    def __init__(
        self,
        *,
        tts: FallbackAdapter,
        input_text: str,
        conn_options: Optional[APIConnectOptions],
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._fallback_adapter = tts

    async def _try_synthesize(
        self, *, tts: TTS, recovering: bool = False
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        """Yield the audio produced by ``tts`` for the input text.

        Raises (after logging) on timeout, on APIError, on any other error,
        or with APIConnectionError when no audio at all was produced.
        """
        try:
            audio_duration = 0.0
            async with tts.synthesize(
                self._input_text,
                conn_options=dataclasses.replace(
                    self._conn_options,
                    max_retry=self._fallback_adapter._max_retry_per_tts,
                    timeout=self._fallback_adapter._attempt_timeout,
                    retry_interval=self._fallback_adapter._retry_interval,
                ),
            ) as stream:
                while True:
                    try:
                        # only the first frame is bounded by attempt_timeout;
                        # once audio has started, later frames wait indefinitely
                        audio = await asyncio.wait_for(
                            stream.__anext__(),
                            self._fallback_adapter._attempt_timeout
                            if audio_duration == 0.0
                            else None,
                        )

                        audio_duration += audio.frame.duration
                        yield audio
                    except StopAsyncIteration:
                        break

            if audio_duration == 0.0:
                raise APIConnectionError("no audio received")

        except asyncio.TimeoutError:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery timed out", extra={"streamed": False}
                )
                raise

            logger.warning(
                f"{tts.label} timed out, switching to next TTS",
                extra={"streamed": False},
            )

            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed",
                    exc_info=e,
                    extra={"streamed": False},
                )
                raise

            logger.warning(
                f"{tts.label} failed, switching to next TTS",
                exc_info=e,
                extra={"streamed": False},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{tts.label} recovery unexpected error", extra={"streamed": False}
                )
                raise

            logger.exception(
                f"{tts.label} unexpected error, switching to next TTS",
                extra={"streamed": False},
            )
            raise

    def _try_recovery(self, tts: TTS) -> None:
        """Start a background task that re-synthesizes the input with ``tts``.

        On success the instance is marked available again and
        ``tts_availability_changed`` is emitted. A new task is only started if
        none is running for this instance.
        """
        assert isinstance(self._tts, FallbackAdapter)
        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    # the recovery audio is discarded; only success matters
                    async for _ in self._try_synthesize(tts=tts, recovering=True):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    # failures were already logged by _try_synthesize
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))

    async def _run(self) -> None:
        assert isinstance(self._tts, FallbackAdapter)

        start_time = time.time()

        # when every instance is marked unavailable, try all of them anyway
        all_failed = all(not tts_status.available for tts_status in self._tts._status)
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        for i, tts in enumerate(self._tts._tts_instances):
            tts_status = self._tts._status[i]
            if tts_status.available or all_failed:
                audio_duration = 0.0
                try:
                    request_id: str | None = None
                    resampler = tts_status.resampler
                    async for synthesized_audio in self._try_synthesize(
                        tts=tts, recovering=False
                    ):
                        audio_duration += synthesized_audio.frame.duration
                        request_id = synthesized_audio.request_id

                        if resampler is not None:
                            # convert to the adapter's sample rate
                            for rf in resampler.push(synthesized_audio.frame):
                                self._event_ch.send_nowait(
                                    SynthesizedAudio(
                                        frame=rf,
                                        request_id=synthesized_audio.request_id,
                                    )
                                )

                            continue

                        self._event_ch.send_nowait(synthesized_audio)

                    # drain whatever the resampler still buffers
                    if resampler is not None and request_id is not None:
                        for rf in resampler.flush():
                            self._event_ch.send_nowait(
                                SynthesizedAudio(
                                    frame=rf,
                                    request_id=request_id,
                                )
                            )

                    return
                except Exception:  # exceptions already logged inside _try_synthesize
                    if tts_status.available:
                        tts_status.available = False
                        self._tts.emit(
                            "tts_availability_changed",
                            AvailabilityChangedEvent(tts=tts, available=False),
                        )

                    if self._tts._no_fallback_after_audio_duration is not None:
                        if (
                            audio_duration
                            >= self._tts._no_fallback_after_audio_duration
                        ):
                            # enough audio was already sent; falling back would
                            # repeat it, so end with the partial audio instead
                            logger.warning(
                                f"{tts.label} already synthesized {audio_duration}s of audio, ignoring fallback"
                            )
                            return

            # reached for instances that were skipped or just failed
            self._try_recovery(tts)

        raise APIConnectionError(
            "all TTSs failed (%s) after %s seconds"
            % (
                [tts.label for tts in self._tts._tts_instances],
                time.time() - start_time,
            )
        )
Used by the non-streamed synthesize API, some providers support chunked http responses
Ancestors
- ChunkedStream
- abc.ABC
Inherited members
class FallbackSynthesizeStream (*,
tts: FallbackAdapter,
conn_options: Optional[APIConnectOptions] = None)-
Expand source code
class FallbackSynthesizeStream(SynthesizeStream):
    """Streaming synthesis that falls back between FallbackAdapter TTS instances.

    Pushed text is forwarded to the current instance's stream. When it fails,
    the segments whose audio is still pending (plus the unflushed text) are
    replayed to the next available instance.
    """

    def __init__(
        self,
        *,
        tts: FallbackAdapter,
        conn_options: Optional[APIConnectOptions] = None,
    ):
        super().__init__(
            tts=tts, conn_options=conn_options or DEFAULT_FALLBACK_API_CONNECT_OPTIONS
        )
        self._fallback_adapter = tts

        # every flushed, non-empty segment (as its list of pushed chunks)
        self._total_segments: list[list[str]] = []
        # flushed segments whose audio is not yet complete; replayed on fallback
        self._pending_segments_chunks: list[list[str]] = []
        # chunks of the segment currently being pushed (not flushed yet)
        self._current_segment_text: list[str] = []

    async def _try_synthesize(
        self,
        *,
        tts: TTS,
        input_ch: aio.ChanReceiver[str | SynthesizeStream._FlushSentinel],
        conn_options: APIConnectOptions,
        recovering: bool = False,
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        """Feed ``input_ch`` into a stream of ``tts`` and yield its audio.

        Raises (after logging) on timeout, on APIError, on any other error,
        or with APIConnectionError when a flushed segment produced no audio.
        """
        stream = tts.stream(conn_options=conn_options)
        # resolved True once a non-empty segment has been flushed into the
        # stream (this arms the attempt timeout), or False when input ends
        # without one; replaced by a fresh future after every final frame
        input_sent_fut = asyncio.Future()  # type: ignore

        @utils.log_exceptions(logger=logger)
        async def _input_task() -> None:
            try:
                segment = ""
                async for data in input_ch:
                    if isinstance(data, str):
                        segment += data
                        stream.push_text(data)
                    elif isinstance(data, self._FlushSentinel):
                        # start the timeout on flush
                        if segment:
                            segment = ""
                            with contextlib.suppress(asyncio.InvalidStateError):
                                input_sent_fut.set_result(True)

                        stream.flush()
            finally:
                with contextlib.suppress(RuntimeError):
                    stream.end_input()

                with contextlib.suppress(asyncio.InvalidStateError):
                    input_sent_fut.set_result(False)

        input_task = asyncio.create_task(_input_task())
        # kept across loop iterations so a pending __anext__ is not lost when
        # input_sent_fut wins the wait below
        next_audio_task: asyncio.Future[SynthesizedAudio] | None = None

        try:
            audio_duration = 0.0
            async with stream:
                while True:
                    if next_audio_task is None or next_audio_task.done():
                        next_audio_task = asyncio.ensure_future(stream.__anext__())

                    try:
                        if not input_sent_fut.done():
                            # no timeout until some text has been flushed
                            await asyncio.wait(
                                [input_sent_fut, next_audio_task],
                                return_when=asyncio.FIRST_COMPLETED,
                            )
                            if not next_audio_task.done():
                                continue
                            audio = next_audio_task.result()
                        else:
                            audio = await asyncio.wait_for(
                                next_audio_task, self._fallback_adapter._attempt_timeout
                            )

                        audio_duration += audio.frame.duration
                        if audio.is_final:
                            # segment finished: wait for the next flush again
                            input_sent_fut = asyncio.Future()
                            audio_duration = 0.0

                        yield audio
                    except StopAsyncIteration:
                        break

                if (
                    audio_duration == 0.0
                    and input_sent_fut.done()
                    and input_sent_fut.result()
                ):
                    raise APIConnectionError("no audio received")

        except asyncio.TimeoutError:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery timed out", extra={"streamed": True}
                )
                raise

            logger.warning(
                f"{tts.label} timed out, switching to next TTS",
                extra={"streamed": True},
            )
            raise
        except APIError as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed", exc_info=e, extra={"streamed": True}
                )
                raise

            logger.warning(
                f"{tts.label} failed, switching to next TTS",
                exc_info=e,
                extra={"streamed": True},
            )
            raise
        except Exception:
            if recovering:
                logger.exception(
                    f"{tts.label} recovery unexpected error",
                    extra={"streamed": True},
                )
                raise

            logger.exception(
                f"{tts.label} unexpected error, switching to next TTS",
                extra={"streamed": True},
            )
            raise
        finally:
            if next_audio_task is not None:
                await utils.aio.gracefully_cancel(next_audio_task)
            await utils.aio.gracefully_cancel(input_task)

    async def _run(self) -> None:
        start_time = time.time()

        # when every instance is marked unavailable, try all of them anyway
        all_failed = all(
            not tts_status.available for tts_status in self._fallback_adapter._status
        )
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        # input channel of the TTS instance currently in use (replaced on fallback)
        new_input_ch: aio.Chan[str | SynthesizeStream._FlushSentinel] | None = None

        async def _forward_input_task():
            # forward user input to the active instance while recording it so
            # it can be replayed to the next instance on fallback
            nonlocal new_input_ch

            async for data in self._input_ch:
                if new_input_ch:
                    new_input_ch.send_nowait(data)

                if isinstance(data, str) and data:
                    self._current_segment_text.append(data)
                elif (
                    isinstance(data, self._FlushSentinel) and self._current_segment_text
                ):
                    self._total_segments.append(self._current_segment_text)
                    self._pending_segments_chunks.append(self._current_segment_text)
                    self._current_segment_text = []

            if new_input_ch:
                new_input_ch.close()

        input_task = asyncio.create_task(_forward_input_task())

        try:
            for i, tts in enumerate(self._fallback_adapter._tts_instances):
                tts_status = self._fallback_adapter._status[i]
                if tts_status.available or all_failed:
                    audio_duration = 0.0
                    try:
                        new_input_ch = aio.Chan[
                            Union[str, SynthesizeStream._FlushSentinel]
                        ]()

                        # replay segments whose audio is still pending ...
                        for text in self._pending_segments_chunks:
                            for chunk in text:
                                new_input_ch.send_nowait(chunk)
                            new_input_ch.send_nowait(self._FlushSentinel())

                        # ... and the text of the unflushed segment
                        for chunk in self._current_segment_text:
                            new_input_ch.send_nowait(chunk)

                        if input_task.done():
                            new_input_ch.close()

                        last_segment_id: str | None = None
                        resampler = tts_status.resampler

                        async for synthesized_audio in self._try_synthesize(
                            tts=tts,
                            input_ch=new_input_ch,
                            conn_options=dataclasses.replace(
                                self._conn_options,
                                max_retry=self._fallback_adapter._max_retry_per_tts,
                                timeout=self._fallback_adapter._attempt_timeout,
                                retry_interval=self._fallback_adapter._retry_interval,
                            ),
                            recovering=False,
                        ):
                            audio_duration += synthesized_audio.frame.duration

                            if resampler is not None:
                                for resampled_frame in resampler.push(
                                    synthesized_audio.frame
                                ):
                                    self._event_ch.send_nowait(
                                        dataclasses.replace(
                                            synthesized_audio, frame=resampled_frame
                                        )
                                    )

                                # drain the resampler at each segment boundary
                                if synthesized_audio.is_final:
                                    for resampled_frame in resampler.flush():
                                        self._event_ch.send_nowait(
                                            dataclasses.replace(
                                                synthesized_audio, frame=resampled_frame
                                            )
                                        )
                            else:
                                self._event_ch.send_nowait(synthesized_audio)

                            # a segment is complete (final frame or new segment
                            # id): it no longer needs to be replayed
                            if (
                                synthesized_audio.is_final
                                or (
                                    last_segment_id is not None
                                    and synthesized_audio.segment_id != last_segment_id
                                )
                            ) and self._pending_segments_chunks:
                                audio_duration = 0.0
                                self._pending_segments_chunks.pop(0)

                            last_segment_id = synthesized_audio.segment_id

                        return
                    except Exception:
                        if tts_status.available:
                            tts_status.available = False
                            self._tts.emit(
                                "tts_availability_changed",
                                AvailabilityChangedEvent(tts=tts, available=False),
                            )

                        if (
                            self._fallback_adapter._no_fallback_after_audio_duration
                            is not None
                        ):
                            if (
                                audio_duration
                                >= self._fallback_adapter._no_fallback_after_audio_duration
                                and self._pending_segments_chunks
                            ):
                                # NOTE(review): the warning says only the current
                                # segment is ignored, but this returns from _run,
                                # so any later input is not synthesized either —
                                # confirm this is intended
                                logger.warning(
                                    f"{tts.label} already synthesized {audio_duration}s of audio, ignoring the current segment for the tts fallback"
                                )
                                return

                # reached for instances that were skipped or just failed
                self._try_recovery(tts)

            raise APIConnectionError(
                "all TTSs failed (%s) after %s seconds"
                % (
                    [tts.label for tts in self._fallback_adapter._tts_instances],
                    time.time() - start_time,
                )
            )
        finally:
            await utils.aio.gracefully_cancel(input_task)

    def _try_recovery(self, tts: TTS) -> None:
        """Start a background task that checks whether ``tts`` works again.

        The last flushed segment and the current unflushed text are replayed
        with retries disabled. On success the instance is marked available
        and ``tts_availability_changed`` is emitted. A new task is only
        started if none is running for this instance.
        """
        assert isinstance(self._tts, FallbackAdapter)

        retry_segments = [self._current_segment_text.copy()]
        if self._total_segments:
            retry_segments.insert(0, self._total_segments[-1])

        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
                    for segment in retry_segments:
                        for t in segment:
                            input_ch.send_nowait(t)
                        input_ch.send_nowait(self._FlushSentinel())
                    input_ch.close()

                    # the recovery audio is discarded; only success matters
                    async for _ in self._try_synthesize(
                        tts=tts,
                        input_ch=input_ch,
                        recovering=True,
                        conn_options=dataclasses.replace(
                            self._conn_options,
                            max_retry=0,
                            timeout=self._fallback_adapter._attempt_timeout,
                            retry_interval=self._fallback_adapter._retry_interval,
                        ),
                    ):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    # failures were already logged by _try_synthesize
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))
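Streaming counterpart of FallbackChunkedStream: pushed text is forwarded to the first available TTS instance's stream, and when that instance fails, the segments whose audio is still pending are replayed to the next available instance.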
Ancestors
- SynthesizeStream
- abc.ABC
Inherited members
class StreamAdapter (*,
tts: TTS,
sentence_tokenizer: tokenize.SentenceTokenizer)-
Expand source code
class StreamAdapter(TTS):
    """Gives a non-streaming TTS a streaming interface.

    ``stream()`` returns a ``StreamAdapterWrapper`` that splits incoming text
    into sentences with ``sentence_tokenizer`` and synthesizes each sentence
    with the wrapped TTS's ``synthesize()``. ``synthesize()`` is delegated
    directly to the wrapped TTS, whose ``metrics_collected`` events are
    re-emitted by this adapter.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
    ) -> None:
        super().__init__(
            capabilities=TTSCapabilities(streaming=True),
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._tts = tts
        self._sentence_tokenizer = sentence_tokenizer

        @self._tts.on("metrics_collected")
        def _forward_metrics(*args, **kwargs):
            # re-publish the wrapped TTS's metrics as this adapter's own
            self.emit("metrics_collected", *args, **kwargs)

    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "ChunkedStream":
        """Delegate non-streaming synthesis straight to the wrapped TTS."""
        wrapped = self._tts
        return wrapped.synthesize(text=text, conn_options=conn_options)

    def stream(
        self,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> "StreamAdapterWrapper":
        """Open a sentence-by-sentence streaming session over the wrapped TTS."""
        return StreamAdapterWrapper(
            tts=self,
            wrapped_tts=self._tts,
            sentence_tokenizer=self._sentence_tokenizer,
            conn_options=conn_options,
        )

    def prewarm(self) -> None:
        """Prewarm the wrapped TTS."""
        wrapped = self._tts
        wrapped.prewarm()
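Gives a non-streaming TTS a streaming interface: stream() splits incoming text into sentences with a sentence tokenizer and synthesizes each sentence with the wrapped TTS's synthesize(); synthesize() is delegated directly to the wrapped TTS.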
Ancestors
- TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> StreamAdapterWrapper
-
Expand source code
def stream(
    self,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "StreamAdapterWrapper":
    """Open a sentence-by-sentence streaming session over the wrapped TTS."""
    return StreamAdapterWrapper(
        tts=self,
        wrapped_tts=self._tts,
        sentence_tokenizer=self._sentence_tokenizer,
        conn_options=conn_options,
    )
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> ChunkedStream
-
Expand source code
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> "ChunkedStream":
    """Delegate non-streaming synthesis directly to the wrapped TTS."""
    wrapped_tts = self._tts
    return wrapped_tts.synthesize(text=text, conn_options=conn_options)
Inherited members
class StreamAdapterWrapper (*,
tts: TTS,
wrapped_tts: TTS,
sentence_tokenizer: tokenize.SentenceTokenizer,
conn_options: Optional[APIConnectOptions])-
Expand source code
class StreamAdapterWrapper(SynthesizeStream):
    """SynthesizeStream returned by ``StreamAdapter.stream()``.

    Text pushed into the stream is split into sentences by a sentence
    tokenizer stream. Each sentence is synthesized with the wrapped TTS's
    non-streaming ``synthesize()``, and the last frame of every sentence is
    marked ``is_final``.
    """

    def __init__(
        self,
        *,
        tts: TTS,
        wrapped_tts: TTS,
        sentence_tokenizer: tokenize.SentenceTokenizer,
        conn_options: Optional[APIConnectOptions],
    ) -> None:
        super().__init__(tts=tts, conn_options=conn_options)
        self._wrapped_tts = wrapped_tts
        self._sent_stream = sentence_tokenizer.stream()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        # metrics come from the wrapped TTS's ChunkedStreams (StreamAdapter
        # forwards them), so this stream does not emit its own
        pass  # do nothing

    async def _run(self) -> None:
        async def _forward_input():
            """forward input to the sentence tokenizer stream"""
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    self._sent_stream.flush()
                    continue
                self._sent_stream.push_text(data)

            self._sent_stream.end_input()

        async def _synthesize():
            async for ev in self._sent_stream:
                last_audio: SynthesizedAudio | None = None
                # the context manager closes the per-sentence ChunkedStream
                # (cancelling its synthesis task) even if this task is
                # cancelled mid-sentence, instead of leaving it running
                async with self._wrapped_tts.synthesize(ev.token) as chunked:
                    async for audio in chunked:
                        # hold one frame back so the sentence's last frame
                        # can be flagged as final
                        if last_audio is not None:
                            self._event_ch.send_nowait(last_audio)

                        last_audio = audio

                if last_audio is not None:
                    last_audio.is_final = True
                    self._event_ch.send_nowait(last_audio)

        tasks = [
            asyncio.create_task(_forward_input()),
            asyncio.create_task(_synthesize()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)
            # NOTE(review): this closes the shared wrapped TTS whenever one
            # stream ends, even though StreamAdapter can open several streams
            # over it — confirm this is intended
            await self._wrapped_tts.aclose()
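SynthesizeStream returned by StreamAdapter.stream(): pushed text is split into sentences, each sentence is synthesized with the wrapped TTS, and the last frame of every sentence is marked is_final.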
Ancestors
- SynthesizeStream
- abc.ABC
Inherited members
class SynthesizeStream (*,
tts: TTS,
conn_options: Optional[APIConnectOptions] = None)-
Expand source code
class SynthesizeStream(ABC):
    """Base class for streaming TTS synthesis.

    Text is pushed with ``push_text()``, segments are delimited with
    ``flush()`` and input is finished with ``end_input()``. Subclasses
    implement ``_run()``, which reads ``self._input_ch`` and sends
    ``SynthesizedAudio`` to ``self._event_ch``; consumers iterate the stream.
    ``_run()`` is retried on ``APIError`` according to ``conn_options``.
    """

    # marker sent through the input channel by flush()
    class _FlushSentinel: ...

    def __init__(
        self, *, tts: TTS, conn_options: Optional[APIConnectOptions] = None
    ) -> None:
        super().__init__()
        self._tts = tts
        self._conn_options = conn_options or DEFAULT_API_CONNECT_OPTIONS

        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        # one copy of the events goes to the consumer, one to the metrics monitor
        self._event_aiter, self._monitor_aiter = aio.itertools.tee(self._event_ch, 2)

        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        # close the event channel when _main_task finishes so iteration can end
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task | None = None  # started on first push

        # used to track metrics
        self._started_time: float = 0
        # texts of flushed segments awaiting their metrics event, in order
        self._mtc_pending_texts: list[str] = []
        # text pushed since the last flush
        self._mtc_text = ""

    @abstractmethod
    async def _run(self) -> None: ...

    async def _main_task(self) -> None:
        # run _run(), retrying on APIError up to max_retry times; any other
        # exception propagates immediately
        for i in range(self._conn_options.max_retry + 1):
            try:
                return await self._run()
            except APIError as e:
                retry_interval = self._conn_options._interval_for_retry(i)
                if self._conn_options.max_retry == 0:
                    # retries disabled: surface the original error unchanged
                    raise
                elif i == self._conn_options.max_retry:
                    # out of attempts: wrap the last error
                    raise APIConnectionError(
                        f"failed to synthesize speech after {self._conn_options.max_retry + 1} attempts",
                    ) from e
                else:
                    logger.warning(
                        f"failed to synthesize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={
                            "tts": self._tts._label,
                            "attempt": i + 1,
                            "streamed": True,
                        },
                    )

                await asyncio.sleep(retry_interval)

    def _mark_started(self) -> None:
        # only set the started time once, it'll get reset after we emit metrics
        if self._started_time == 0:
            self._started_time = time.perf_counter()

    async def _metrics_monitor_task(
        self, event_aiter: AsyncIterable[SynthesizedAudio]
    ) -> None:
        """Task used to collect metrics

        Emits one ``TTSMetrics`` (``streamed=True``) per finished segment,
        i.e. on every ``is_final`` frame, plus one for leftover audio when the
        stream ends.
        """

        audio_duration = 0.0
        ttfb = -1.0  # -1.0 means "no audio for this segment yet"
        request_id = ""

        def _emit_metrics():
            nonlocal audio_duration, ttfb, request_id

            # nothing to report unless _mark_started() was called
            if not self._started_time:
                return

            duration = time.perf_counter() - self._started_time

            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
                error=None,
            )
            self._tts.emit("metrics_collected", metrics)

            # reset the per-segment counters
            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            self._started_time = 0

        async for ev in event_aiter:
            if ttfb == -1.0:
                # NOTE(review): if _mark_started() has not been called yet,
                # _started_time is 0 and this ttfb is not meaningful
                ttfb = time.perf_counter() - self._started_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id

            if ev.is_final:
                _emit_metrics()

        # report a trailing segment that never received a final frame
        if request_id:
            _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized"""
        # the metrics monitor is started lazily on the first push
        if self._metrics_task is None:
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter),
                name="TTS._metrics_task",
            )

        self._mtc_text += token
        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        if self._mtc_text:
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately"""
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)

        if self._metrics_task is not None:
            await self._metrics_task

    def _check_not_closed(self) -> None:
        # raise if the event channel (i.e. the stream) is already closed
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        # raise if end_input() or aclose() already closed the input
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            # the channel is exhausted: re-raise the synthesis error (if any)
            # so the consumer sees it instead of a silent end of iteration
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc from None

            raise StopAsyncIteration

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> SynthesizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
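Base class for streaming TTS synthesis: text is pushed with push_text(), segments are delimited with flush(), input is finished with end_input(), and the synthesized audio is consumed by iterating the stream.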
Ancestors
- abc.ABC
Subclasses
- FallbackSynthesizeStream
- StreamAdapterWrapper
- livekit.plugins.cartesia.tts.SynthesizeStream
- livekit.plugins.deepgram.tts.SynthesizeStream
- livekit.plugins.elevenlabs.tts.SynthesizeStream
- livekit.plugins.neuphonic.tts.SynthesizeStream
- livekit.plugins.playai.tts.SynthesizeStream
- livekit.plugins.resemble.tts.SynthesizeStream
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Close this stream immediately"""
    # refuse further input
    self._input_ch.close()
    # cancel synthesis; the task's done callback closes the event channel
    await aio.gracefully_cancel(self._task)
    # the metrics task only exists once push_text() has been called
    if self._metrics_task is not None:
        await self._metrics_task
Close this stream immediately
def end_input(self) ‑> None
-
Expand source code
def end_input(self) -> None:
    """Mark the end of input, no more text will be pushed"""
    # flush first so the last pushed text is closed off as a segment; this
    # also raises RuntimeError if input has already ended
    self.flush()
    self._input_ch.close()
Mark the end of input, no more text will be pushed
def flush(self) ‑> None
-
Expand source code
def flush(self) -> None:
    """Mark the end of the current segment.

    Text pushed since the previous flush is queued for metrics reporting,
    then a flush sentinel is sent to the input channel.
    """
    if segment_text := self._mtc_text:
        self._mtc_pending_texts.append(segment_text)
        self._mtc_text = ""

    self._check_input_not_ended()
    self._check_not_closed()
    sentinel = self._FlushSentinel()
    self._input_ch.send_nowait(sentinel)
Mark the end of the current segment
def push_text(self, token: str) ‑> None
-
Expand source code
def push_text(self, token: str) -> None:
    """Push some text to be synthesized.

    Args:
        token: text fragment appended to the current segment.

    Raises:
        RuntimeError: if the input was already ended or the stream is closed.
    """
    # Validate before mutating any state, so a rejected call has no side
    # effects: no stray metrics task, and no text counted that was never sent.
    self._check_input_not_ended()
    self._check_not_closed()

    # The metrics monitor is started lazily, on the first accepted token.
    if self._metrics_task is None:
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(self._monitor_aiter),
            name="TTS._metrics_task",
        )

    self._mtc_text += token
    self._input_ch.send_nowait(token)
Push some text to be synthesized
class SynthesizedAudio (frame: rtc.AudioFrame,
request_id: str,
is_final: bool = False,
segment_id: str = '',
delta_text: str = '')-
Expand source code
@dataclass
class SynthesizedAudio:
    """A single synthesized audio frame together with its identifying metadata."""

    frame: rtc.AudioFrame
    """Synthesized audio frame"""

    request_id: str
    """Request ID (a single segment may be made up of several requests)"""

    is_final: bool = False
    """Whether this is the last frame of the segment (streaming only)"""

    segment_id: str = ""
    """Segment ID; segments are separated by a flush (streaming only)"""

    delta_text: str = ""
    """Text associated with the current segment of the synthesized audio (streaming only)"""
SynthesizedAudio(frame: 'rtc.AudioFrame', request_id: 'str', is_final: 'bool' = False, segment_id: 'str' = '', delta_text: 'str' = '')
Instance variables
var delta_text : str
-
Current segment of the synthesized audio (streaming only)
var frame : AudioFrame
-
Synthesized audio frame
var is_final : bool
-
Whether this is the last frame of the segment (streaming only)
var request_id : str
-
Request ID (one segment could be made up of multiple requests)
var segment_id : str
-
Segment ID, each segment is separated by a flush (streaming only)
class SynthesizedAudioEmitter (*,
event_ch: aio.Chan[SynthesizedAudio],
request_id: str,
segment_id: str = '')-
Expand source code
class SynthesizedAudioEmitter:
    """Buffer audio frames one at a time and emit them, with metadata, to a channel.

    The most recent frame is always held back. That lets the emitter mark the
    last frame of a response as ``is_final`` when :meth:`flush` is called, which
    is the is_final handling TTS implementers need when streaming responses.
    """

    def __init__(
        self,
        *,
        event_ch: aio.Chan[SynthesizedAudio],
        request_id: str,
        segment_id: str = "",
    ) -> None:
        self._event_ch = event_ch
        self._request_id = request_id
        self._segment_id = segment_id
        # the single frame held back until we know whether it is the last one
        self._frame: rtc.AudioFrame | None = None

    def push(self, frame: Optional[rtc.AudioFrame]):
        """Emit the currently buffered frame as non-final, then buffer *frame*."""
        self._emit_frame(is_final=False)
        self._frame = frame

    def flush(self):
        """Emit the currently buffered frame, if there is one, as the final frame."""
        self._emit_frame(is_final=True)

    def _emit_frame(self, is_final: bool = False):
        """Send the buffered frame (if any) to the event channel and clear the buffer."""
        pending = self._frame
        if pending is None:
            return
        audio = SynthesizedAudio(
            frame=pending,
            request_id=self._request_id,
            segment_id=self._segment_id,
            is_final=is_final,
        )
        self._event_ch.send_nowait(audio)
        self._frame = None
Utility for buffering and emitting audio frames with metadata to a channel.
This class helps TTS implementers to correctly handle is_final logic when streaming responses.
Methods
def flush(self)
-
Expand source code
def flush(self):
    """Emit the buffered frame, if there is one, marked as the final frame."""
    self._emit_frame(is_final=True)
Emits any buffered frame as final.
def push(self, frame: Optional[rtc.AudioFrame])
-
Expand source code
def push(self, frame: Optional[rtc.AudioFrame]):
    """Emit the buffered frame as non-final, then buffer *frame* for later.

    The previously buffered frame cannot be the final one, because another
    value is now being pushed after it.
    """
    self._emit_frame(is_final=False)
    self._frame = frame
Emits any buffered frame and stores the new frame for later emission.
The buffered frame is emitted as not final.
class TTS (*,
capabilities: TTSCapabilities,
sample_rate: int,
num_channels: int,
conn_options: Optional[APIConnectOptions] = None)-
Expand source code
class TTS(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected"], TEvent]],
    Generic[TEvent],
):
    """Abstract base class for text-to-speech engines.

    Concrete engines must implement :meth:`synthesize`. :meth:`stream` raises
    ``NotImplementedError`` here; only engines that override it support
    streaming synthesis.
    """

    def __init__(
        self,
        *,
        capabilities: TTSCapabilities,
        sample_rate: int,
        num_channels: int,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> None:
        super().__init__()
        cls = type(self)
        # fully-qualified class name, used as the label on emitted metrics
        self._label = f"{cls.__module__}.{cls.__name__}"
        self._capabilities = capabilities
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        # falls back to DEFAULT_API_CONNECT_OPTIONS when none (or a falsy value) is given
        self._conn_options = conn_options or DEFAULT_API_CONNECT_OPTIONS

    # -- read-only configuration -------------------------------------------

    @property
    def label(self) -> str:
        """Fully-qualified class name identifying this TTS."""
        return self._label

    @property
    def capabilities(self) -> TTSCapabilities:
        """Capabilities this TTS was constructed with."""
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        """Sample rate this TTS was constructed with."""
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        """Number of audio channels this TTS was constructed with."""
        return self._num_channels

    # -- synthesis ---------------------------------------------------------

    @abstractmethod
    def synthesize(
        self,
        text: str,
        *,
        conn_options: Optional[APIConnectOptions] = None,
    ) -> ChunkedStream:
        """Synthesize *text* through the non-streamed API, returning a ChunkedStream."""

    def stream(
        self, *, conn_options: Optional[APIConnectOptions] = None
    ) -> SynthesizeStream:
        """Open a streaming synthesis session; unsupported in the base class."""
        raise NotImplementedError(
            "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"
        )

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service (a no-op in the base class)."""

    # -- lifecycle -----------------------------------------------------------

    async def aclose(self) -> None:
        """Release resources held by this TTS (a no-op in the base class)."""

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
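Abstract base class for text-to-speech engines. Subclasses implement synthesize(); stream() raises NotImplementedError unless it is overridden by an engine that supports streaming.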
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- FallbackAdapter
- StreamAdapter
- TTS
- livekit.plugins.azure.tts.TTS
- livekit.plugins.cartesia.tts.TTS
- livekit.plugins.deepgram.tts.TTS
- livekit.plugins.elevenlabs.tts.TTS
- livekit.plugins.google.tts.TTS
- TTS
- livekit.plugins.neuphonic.tts.TTS
- livekit.plugins.openai.tts.TTS
- livekit.plugins.playai.tts.TTS
- livekit.plugins.resemble.tts.TTS
- livekit.plugins.rime.tts.TTS
Instance variables
prop capabilities : TTSCapabilities
-
Expand source code
@property
def capabilities(self) -> TTSCapabilities:
    """Capabilities this TTS was constructed with."""
    return self._capabilities
prop label : str
-
Expand source code
@property
def label(self) -> str:
    """Label identifying this TTS (its fully-qualified class name)."""
    return self._label
prop num_channels : int
-
Expand source code
@property
def num_channels(self) -> int:
    """Number of audio channels this TTS was constructed with."""
    return self._num_channels
prop sample_rate : int
-
Expand source code
@property
def sample_rate(self) -> int:
    """Sample rate this TTS was constructed with."""
    return self._sample_rate
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Release resources held by this TTS; a no-op in the base class."""
def prewarm(self) ‑> None
-
Expand source code
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service.

    The base implementation does nothing.
    """
Pre-warm connection to the TTS service
def stream(self, *, conn_options: Optional[APIConnectOptions] = None) ‑> SynthesizeStream
-
Expand source code
def stream(
    self, *, conn_options: Optional[APIConnectOptions] = None
) -> SynthesizeStream:
    """Open a streaming synthesis session.

    Raises:
        NotImplementedError: always, in this base implementation.
    """
    message = (
        "streaming is not supported by this TTS, "
        "please use a different TTS or use a StreamAdapter"
    )
    raise NotImplementedError(message)
def synthesize(self, text: str, *, conn_options: Optional[APIConnectOptions] = None) ‑> ChunkedStream
-
Expand source code
@abstractmethod
def synthesize(
    self,
    text: str,
    *,
    conn_options: Optional[APIConnectOptions] = None,
) -> ChunkedStream:
    """Synthesize *text* through the non-streamed API, returning a ChunkedStream.

    Every concrete TTS must implement this method.
    """
Inherited members
class TTSCapabilities (streaming: bool)
-
Expand source code
@dataclass
class TTSCapabilities:
    """Capabilities advertised by a TTS implementation."""

    streaming: bool
    """Whether this TTS supports streaming (generally over websockets)"""
TTSCapabilities(streaming: 'bool')
Instance variables
var streaming : bool
-
Whether this TTS supports streaming (generally using websockets)