Module livekit.agents.tts
Classes
class AudioEmitter (*,
label: str,
dst_ch: aio.Chan[SynthesizedAudio])

class AudioEmitter:
    class _FlushSegment:
        pass

    @dataclass
    class _StartSegment:
        segment_id: str

    class _EndSegment:
        pass

    @dataclass
    class _SegmentContext:
        segment_id: str
        audio_duration: float = 0.0

    def __init__(
        self,
        *,
        label: str,
        dst_ch: aio.Chan[SynthesizedAudio],
    ) -> None:
        self._dst_ch = dst_ch
        self._label = label
        self._request_id: str = ""
        self._started = False
        self._num_segments = 0
        self._audio_durations: list[float] = []  # track durations per segment

    def pushed_duration(self, idx: int = -1) -> float:
        return (
            self._audio_durations[idx]
            if -len(self._audio_durations) <= idx < len(self._audio_durations)
            else 0.0
        )

    @property
    def num_segments(self) -> int:
        return self._num_segments

    def initialize(
        self,
        *,
        request_id: str,
        sample_rate: int,
        num_channels: int,
        mime_type: str,
        frame_size_ms: int = 200,
        stream: bool = False,
    ) -> None:
        if self._started:
            raise RuntimeError("AudioEmitter already started")

        self._is_raw_pcm = False
        if mime_type:
            mt = mime_type.lower().strip()
            self._is_raw_pcm = mt.startswith("audio/pcm") or mt.startswith("audio/raw")

        self._mime_type = mime_type

        if not request_id:
            logger.warning("no request_id provided for TTS %s", self._label)
            request_id = "unknown"

        self._started = True
        self._request_id = request_id
        self._frame_size_ms = frame_size_ms
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._streaming = stream

        from ..voice.io import TimedString

        self._write_ch = aio.Chan[
            Union[
                bytes,
                AudioEmitter._FlushSegment,
                AudioEmitter._StartSegment,
                AudioEmitter._EndSegment,
                TimedString,
            ]
        ]()
        self._main_atask = asyncio.create_task(self._main_task(), name="AudioEmitter._main_task")

        if not self._streaming:
            self.__start_segment(segment_id="")  # always start a segment with stream=False

    def start_segment(self, *, segment_id: str) -> None:
        if not self._streaming:
            raise RuntimeError(
                "start_segment() can only be called when SynthesizeStream is initialized "
                "with stream=True"
            )
        return self.__start_segment(segment_id=segment_id)

    def __start_segment(self, *, segment_id: str) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        self._num_segments += 1
        self._write_ch.send_nowait(self._StartSegment(segment_id=segment_id))

    def end_segment(self) -> None:
        if not self._streaming:
            raise RuntimeError(
                "end_segment() can only be called when SynthesizeStream is initialized "
                "with stream=True"
            )
        return self.__end_segment()

    def __end_segment(self) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        self._write_ch.send_nowait(self._EndSegment())

    def push(self, data: bytes) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        self._write_ch.send_nowait(data)

    def push_timed_transcript(self, delta_text: TimedString | list[TimedString]) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        if isinstance(delta_text, list):
            for text in delta_text:
                self._write_ch.send_nowait(text)
        else:
            self._write_ch.send_nowait(delta_text)

    def flush(self) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        self._write_ch.send_nowait(self._FlushSegment())

    def end_input(self) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        if self._write_ch.closed:
            return
        self.__end_segment()
        self._write_ch.close()

    async def join(self) -> None:
        if not self._started:
            raise RuntimeError("AudioEmitter isn't started")
        await self._main_atask

    async def aclose(self) -> None:
        if not self._started:
            return
        await aio.cancel_and_wait(self._main_atask)

    @log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        from ..voice.io import TimedString

        audio_decoder: codecs.AudioStreamDecoder | None = None
        decode_atask: asyncio.Task | None = None
        segment_ctx: AudioEmitter._SegmentContext | None = None
        last_frame: rtc.AudioFrame | None = None
        debug_frames: list[rtc.AudioFrame] = []
        timed_transcripts: list[TimedString] = []

        def _emit_frame(frame: rtc.AudioFrame | None = None, *, is_final: bool = False) -> None:
            nonlocal last_frame, segment_ctx, timed_transcripts
            assert segment_ctx is not None

            if last_frame is None:
                if not is_final:
                    last_frame = frame
                    return
                elif segment_ctx.audio_duration > 0:
                    if frame is None:
                        # NOTE: if end_input called after flush with no new audio frames pushed,
                        # it will create a 0.01s empty frame to indicate the end of the segment
                        frame = rtc.AudioFrame(
                            data=b"\0\0" * (self._sample_rate // 100 * self._num_channels),
                            sample_rate=self._sample_rate,
                            num_channels=self._num_channels,
                            samples_per_channel=self._sample_rate // 100,
                        )
                    else:
                        segment_ctx.audio_duration += frame.duration
                        self._audio_durations[-1] += frame.duration

                    if lk_dump_tts:
                        debug_frames.append(frame)

                    frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
                    self._dst_ch.send_nowait(
                        SynthesizedAudio(
                            frame=frame,
                            request_id=self._request_id,
                            segment_id=segment_ctx.segment_id,
                            is_final=True,
                        )
                    )
                    timed_transcripts = []
                return

            if last_frame is not None:
                last_frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
                self._dst_ch.send_nowait(
                    SynthesizedAudio(
                        frame=last_frame,
                        request_id=self._request_id,
                        segment_id=segment_ctx.segment_id,
                        is_final=is_final,
                    )
                )
                timed_transcripts = []
                segment_ctx.audio_duration += last_frame.duration
                self._audio_durations[-1] += last_frame.duration
                if lk_dump_tts:
                    debug_frames.append(last_frame)

            last_frame = frame

        def _flush_frame() -> None:
            nonlocal last_frame, segment_ctx, timed_transcripts
            assert segment_ctx is not None

            if last_frame is None:
                return

            last_frame.userdata[USERDATA_TIMED_TRANSCRIPT] = timed_transcripts
            self._dst_ch.send_nowait(
                SynthesizedAudio(
                    frame=last_frame,
                    request_id=self._request_id,
                    segment_id=segment_ctx.segment_id,
                    is_final=False,  # flush isn't final
                )
            )
            timed_transcripts = []
            segment_ctx.audio_duration += last_frame.duration
            self._audio_durations[-1] += last_frame.duration
            if lk_dump_tts:
                debug_frames.append(last_frame)
            last_frame = None

        def dump_segment() -> None:
            nonlocal segment_ctx
            assert segment_ctx is not None

            if not lk_dump_tts or not debug_frames:
                return

            ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
            fname = (
                f"lk_dump/{self._label}_{self._request_id}_{segment_ctx.segment_id}_{ts}.wav"
                if self._streaming
                else f"lk_dump/{self._label}_{self._request_id}_{ts}.wav"
            )
            with open(fname, "wb") as f:
                f.write(rtc.combine_audio_frames(debug_frames).to_wav_bytes())
            debug_frames.clear()

        @log_exceptions(logger=logger)
        async def _decode_task() -> None:
            nonlocal audio_decoder, segment_ctx
            assert segment_ctx is not None
            assert audio_decoder is not None

            audio_byte_stream: audio.AudioByteStream | None = None
            async for frame in audio_decoder:
                if audio_byte_stream is None:
                    audio_byte_stream = audio.AudioByteStream(
                        sample_rate=frame.sample_rate,
                        num_channels=frame.num_channels,
                        samples_per_channel=int(frame.sample_rate // 1000 * self._frame_size_ms),
                    )
                for f in audio_byte_stream.push(frame.data):
                    _emit_frame(f)

            if audio_byte_stream:
                for f in audio_byte_stream.flush():
                    _emit_frame(f)

            await audio_decoder.aclose()

        audio_byte_stream: audio.AudioByteStream | None = None
        try:
            async for data in self._write_ch:
                if isinstance(data, TimedString):
                    timed_transcripts.append(data)
                    continue

                if isinstance(data, AudioEmitter._StartSegment):
                    if segment_ctx:
                        raise RuntimeError(
                            "start_segment() called before the previous segment was ended"
                        )
                    self._audio_durations.append(0.0)
                    segment_ctx = AudioEmitter._SegmentContext(segment_id=data.segment_id)
                    continue

                if not segment_ctx:
                    if self._streaming:
                        if isinstance(data, (AudioEmitter._EndSegment, AudioEmitter._FlushSegment)):
                            continue  # empty segment, ignore
                        raise RuntimeError(
                            "start_segment() must be called before pushing audio data"
                        )

                if self._is_raw_pcm:
                    if isinstance(data, bytes):
                        if audio_byte_stream is None:
                            audio_byte_stream = audio.AudioByteStream(
                                sample_rate=self._sample_rate,
                                num_channels=self._num_channels,
                                samples_per_channel=int(
                                    self._sample_rate // 1000 * self._frame_size_ms
                                ),
                            )
                        for f in audio_byte_stream.push(data):
                            _emit_frame(f)
                    elif audio_byte_stream:
                        if isinstance(data, AudioEmitter._FlushSegment):
                            for f in audio_byte_stream.flush():
                                _emit_frame(f)
                            _flush_frame()
                        elif isinstance(data, AudioEmitter._EndSegment):
                            for f in audio_byte_stream.flush():
                                _emit_frame(f)
                            _emit_frame(is_final=True)
                            dump_segment()
                            segment_ctx = audio_byte_stream = last_frame = None
                        else:
                            logger.warning("unknown data type: %s", type(data))
                else:
                    if isinstance(data, bytes):
                        if not audio_decoder:
                            audio_decoder = codecs.AudioStreamDecoder(
                                sample_rate=self._sample_rate,
                                num_channels=self._num_channels,
                                format=self._mime_type,
                            )
                            decode_atask = asyncio.create_task(_decode_task())
                        audio_decoder.push(data)
                    elif decode_atask:
                        if isinstance(data, AudioEmitter._FlushSegment) and audio_decoder:
                            audio_decoder.end_input()
                            await decode_atask
                            _flush_frame()
                            audio_decoder = None
                        elif isinstance(data, AudioEmitter._EndSegment) and segment_ctx:
                            if audio_decoder:
                                audio_decoder.end_input()
                                await decode_atask
                            _emit_frame(is_final=True)
                            dump_segment()
                            audio_decoder = segment_ctx = audio_byte_stream = last_frame = None
                        else:
                            logger.warning("unknown data type: %s", type(data))
        finally:
            if audio_decoder and decode_atask:
                await audio_decoder.aclose()
                await aio.cancel_and_wait(decode_atask)
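To make the lifecycle concrete, here is a minimal sketch of a plugin-style _run implementation driving the emitter. It is illustrative only: fetch_audio_bytes is a hypothetical provider call, and the audio/mp3 MIME type is an assumption.

from livekit.agents import utils
from livekit.agents.tts import AudioEmitter, ChunkedStream


class MyChunkedStream(ChunkedStream):
    async def _run(self, output_emitter: AudioEmitter) -> None:
        # initialize() must be called exactly once before pushing any data
        output_emitter.initialize(
            request_id=utils.shortuuid(),
            sample_rate=self._tts.sample_rate,
            num_channels=self._tts.num_channels,
            mime_type="audio/mp3",  # assumption: the provider returns MP3; the emitter decodes it
        )
        # fetch_audio_bytes() stands in for the provider's HTTP request
        async for chunk in fetch_audio_bytes(self._input_text):
            output_emitter.push(chunk)
        output_emitter.flush()
        # the ChunkedStream base class calls end_input() and join() after _run() returns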
Instance variables
prop num_segments : int
@property
def num_segments(self) -> int:
    return self._num_segments
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    if not self._started:
        return
    await aio.cancel_and_wait(self._main_atask)
def end_input(self) ‑> None
def end_input(self) -> None:
    if not self._started:
        raise RuntimeError("AudioEmitter isn't started")
    if self._write_ch.closed:
        return
    self.__end_segment()
    self._write_ch.close()
def end_segment(self) ‑> None
def end_segment(self) -> None:
    if not self._streaming:
        raise RuntimeError(
            "end_segment() can only be called when SynthesizeStream is initialized "
            "with stream=True"
        )
    return self.__end_segment()
def flush(self) ‑> None
def flush(self) -> None:
    if not self._started:
        raise RuntimeError("AudioEmitter isn't started")
    if self._write_ch.closed:
        return
    self._write_ch.send_nowait(self._FlushSegment())
def initialize(self,
*,
request_id: str,
sample_rate: int,
num_channels: int,
mime_type: str,
frame_size_ms: int = 200,
stream: bool = False) ‑> None

def initialize(
    self,
    *,
    request_id: str,
    sample_rate: int,
    num_channels: int,
    mime_type: str,
    frame_size_ms: int = 200,
    stream: bool = False,
) -> None:
    if self._started:
        raise RuntimeError("AudioEmitter already started")

    self._is_raw_pcm = False
    if mime_type:
        mt = mime_type.lower().strip()
        self._is_raw_pcm = mt.startswith("audio/pcm") or mt.startswith("audio/raw")

    self._mime_type = mime_type

    if not request_id:
        logger.warning("no request_id provided for TTS %s", self._label)
        request_id = "unknown"

    self._started = True
    self._request_id = request_id
    self._frame_size_ms = frame_size_ms
    self._sample_rate = sample_rate
    self._num_channels = num_channels
    self._streaming = stream

    from ..voice.io import TimedString

    self._write_ch = aio.Chan[
        Union[
            bytes,
            AudioEmitter._FlushSegment,
            AudioEmitter._StartSegment,
            AudioEmitter._EndSegment,
            TimedString,
        ]
    ]()
    self._main_atask = asyncio.create_task(self._main_task(), name="AudioEmitter._main_task")

    if not self._streaming:
        self.__start_segment(segment_id="")  # always start a segment with stream=False
async def join(self) ‑> None
async def join(self) -> None:
    if not self._started:
        raise RuntimeError("AudioEmitter isn't started")
    await self._main_atask
def push(self, data: bytes) ‑> None
def push(self, data: bytes) -> None:
    if not self._started:
        raise RuntimeError("AudioEmitter isn't started")
    if self._write_ch.closed:
        return
    self._write_ch.send_nowait(data)
def push_timed_transcript(self, delta_text: TimedString | list[TimedString]) ‑> None
def push_timed_transcript(self, delta_text: TimedString | list[TimedString]) -> None:
    if not self._started:
        raise RuntimeError("AudioEmitter isn't started")
    if self._write_ch.closed:
        return
    if isinstance(delta_text, list):
        for text in delta_text:
            self._write_ch.send_nowait(text)
    else:
        self._write_ch.send_nowait(delta_text)
def pushed_duration(self, idx: int = -1) ‑> float
def pushed_duration(self, idx: int = -1) -> float:
    return (
        self._audio_durations[idx]
        if -len(self._audio_durations) <= idx < len(self._audio_durations)
        else 0.0
    )
def start_segment(self, *, segment_id: str) ‑> None
def start_segment(self, *, segment_id: str) -> None:
    if not self._streaming:
        raise RuntimeError(
            "start_segment() can only be called when SynthesizeStream is initialized "
            "with stream=True"
        )
    return self.__start_segment(segment_id=segment_id)
class AvailabilityChangedEvent (tts: TTS,
available: bool)

@dataclass
class AvailabilityChangedEvent:
    tts: TTS
    available: bool
Instance variables
var available : bool
var tts : livekit.agents.tts.tts.TTS
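FallbackAdapter emits this event whenever one of its TTS instances is marked unavailable or recovers. A short sketch of subscribing to it, assuming fallback is an existing FallbackAdapter:

from livekit.agents.tts import AvailabilityChangedEvent


def on_availability_changed(ev: AvailabilityChangedEvent) -> None:
    state = "available" if ev.available else "unavailable"
    print(f"{ev.tts.label} is now {state}")


fallback.on("tts_availability_changed", on_availability_changed)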
class ChunkedStream (*,
tts: TTS,
input_text: str,
conn_options: APIConnectOptions)

class ChunkedStream(ABC):
    """Used by the non-streamed synthesize API, some providers support chunked http responses"""

    def __init__(
        self,
        *,
        tts: TTS,
        input_text: str,
        conn_options: APIConnectOptions,
    ) -> None:
        self._input_text = input_text
        self._tts = tts
        self._conn_options = conn_options
        self._event_ch = aio.Chan[SynthesizedAudio]()

        self._tee = aio.itertools.tee(self._event_ch, 2)
        self._event_aiter, monitor_aiter = self._tee
        self._current_attempt_has_error = False
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="TTS._metrics_task"
        )
        self._synthesize_task = asyncio.create_task(self._main_task(), name="TTS._synthesize_task")
        self._synthesize_task.add_done_callback(lambda _: self._event_ch.close())
        self._tts_request_span: trace.Span | None = None

    @property
    def input_text(self) -> str:
        return self._input_text

    @property
    def done(self) -> bool:
        return self._synthesize_task.done()

    @property
    def exception(self) -> BaseException | None:
        return self._synthesize_task.exception()

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SynthesizedAudio]) -> None:
        """Task used to collect metrics"""
        start_time = time.perf_counter()
        audio_duration = 0.0
        ttfb = -1.0
        request_id = ""

        async for ev in event_aiter:
            request_id = ev.request_id
            if ttfb == -1.0:
                ttfb = time.perf_counter() - start_time
            audio_duration += ev.frame.duration

        duration = time.perf_counter() - start_time
        if self._current_attempt_has_error:
            return

        metrics = TTSMetrics(
            timestamp=time.time(),
            request_id=request_id,
            ttfb=ttfb,
            duration=duration,
            characters_count=len(self._input_text),
            audio_duration=audio_duration,
            cancelled=self._synthesize_task.cancelled(),
            label=self._tts._label,
            streamed=False,
        )
        if self._tts_request_span:
            self._tts_request_span.set_attribute(
                trace_types.ATTR_TTS_METRICS, metrics.model_dump_json()
            )
        self._tts.emit("metrics_collected", metrics)

    async def collect(self) -> rtc.AudioFrame:
        """Utility method to collect every frame in a single call"""
        frames = []
        async for ev in self:
            frames.append(ev.frame)
        return rtc.combine_audio_frames(frames)

    @abstractmethod
    async def _run(self, output_emitter: AudioEmitter) -> None: ...

    @tracer.start_as_current_span("tts_request", end_on_exit=False)
    async def _main_task(self) -> None:
        self._tts_request_span = current_span = trace.get_current_span()
        current_span.set_attributes(
            {
                trace_types.ATTR_TTS_STREAMING: False,
                trace_types.ATTR_TTS_LABEL: self._tts.label,
            }
        )
        for i in range(self._conn_options.max_retry + 1):
            output_emitter = AudioEmitter(label=self._tts.label, dst_ch=self._event_ch)
            try:
                with tracer.start_as_current_span("tts_request_run") as attempt_span:
                    attempt_span.set_attribute(trace_types.ATTR_RETRY_COUNT, i)
                    try:
                        await self._run(output_emitter)
                    except Exception as e:
                        telemetry_utils.record_exception(attempt_span, e)
                        raise

                    output_emitter.end_input()
                    # wait for all audio frames to be pushed & propagate errors
                    await output_emitter.join()

                    if output_emitter.pushed_duration() <= 0.0:
                        raise APIError("no audio frames were pushed")

                    current_span.set_attribute(trace_types.ATTR_TTS_INPUT_TEXT, self._input_text)
                    return
            except APIError as e:
                retry_interval = self._conn_options._interval_for_retry(i)
                if self._conn_options.max_retry == 0 or self._conn_options.max_retry == i:
                    self._emit_error(e, recoverable=False)
                    raise
                else:
                    self._emit_error(e, recoverable=True)
                    logger.warning(
                        f"failed to synthesize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={"tts": self._tts._label, "attempt": i + 1, "streamed": False},
                    )
                await asyncio.sleep(retry_interval)
                # Reset the flag when retrying
                self._current_attempt_has_error = False
            finally:
                await output_emitter.aclose()

    def _emit_error(self, api_error: Exception, recoverable: bool) -> None:
        self._current_attempt_has_error = True
        self._tts.emit(
            "error",
            TTSError(
                timestamp=time.time(),
                label=self._tts._label,
                error=api_error,
                recoverable=recoverable,
            ),
        )

    async def aclose(self) -> None:
        """Close is automatically called if the stream is completely collected"""
        await aio.cancel_and_wait(self._synthesize_task)
        self._event_ch.close()
        await self._metrics_task
        await self._tee.aclose()

        if self._tts_request_span:
            self._tts_request_span.end()
            self._tts_request_span = None

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._synthesize_task.cancelled() and (exc := self._synthesize_task.exception()):
                raise exc  # noqa: B904
            raise StopAsyncIteration from None
        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> ChunkedStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Used by the non-streamed synthesize API; some providers support chunked HTTP responses.
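In application code a ChunkedStream is normally obtained from TTS.synthesize() and consumed as an async iterator. A minimal sketch, where handle_frame is a hypothetical consumer of rtc.AudioFrame:

from livekit.agents.tts import TTS


async def speak(tts_instance: TTS) -> None:
    async with tts_instance.synthesize("Hello world") as stream:
        async for ev in stream:  # ev is a SynthesizedAudio
            handle_frame(ev.frame)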
Ancestors
- abc.ABC
Subclasses
- livekit.agents.tts.fallback_adapter.FallbackChunkedStream
- livekit.plugins.aws.tts.ChunkedStream
- livekit.plugins.azure.tts.ChunkedStream
- livekit.plugins.baseten.tts.ChunkedStream
- livekit.plugins.cartesia.tts.ChunkedStream
- livekit.plugins.deepgram.tts.ChunkedStream
- livekit.plugins.elevenlabs.tts.ChunkedStream
- livekit.plugins.google.beta.gemini_tts.ChunkedStream
- livekit.plugins.google.tts.ChunkedStream
- livekit.plugins.groq.tts.ChunkedStream
- livekit.plugins.hume.tts.ChunkedStream
- livekit.plugins.inworld.tts.ChunkedStream
- livekit.plugins.lmnt.tts.ChunkedStream
- livekit.plugins.neuphonic.tts.ChunkedStream
- livekit.plugins.openai.tts.ChunkedStream
- livekit.plugins.playai.tts.ChunkedStream
- livekit.plugins.resemble.tts.ChunkedStream
- livekit.plugins.rime.tts.ChunkedStream
- livekit.plugins.sarvam.tts.ChunkedStream
- livekit.plugins.smallestai.tts.ChunkedStream
- livekit.plugins.speechify.tts.ChunkedStream
- livekit.plugins.spitch.tts.ChunkedStream
Instance variables
prop done : bool
@property
def done(self) -> bool:
    return self._synthesize_task.done()
prop exception : BaseException | None
@property
def exception(self) -> BaseException | None:
    return self._synthesize_task.exception()
prop input_text : str
@property
def input_text(self) -> str:
    return self._input_text
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Close is automatically called if the stream is completely collected"""
    await aio.cancel_and_wait(self._synthesize_task)
    self._event_ch.close()
    await self._metrics_task
    await self._tee.aclose()

    if self._tts_request_span:
        self._tts_request_span.end()
        self._tts_request_span = None
Close is automatically called if the stream is completely collected
async def collect(self) ‑> AudioFrame
async def collect(self) -> rtc.AudioFrame:
    """Utility method to collect every frame in a single call"""
    frames = []
    async for ev in self:
        frames.append(ev.frame)
    return rtc.combine_audio_frames(frames)
Utility method to collect every frame in a single call
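For the common case where the whole clip is wanted at once, the call collapses to a single line (sketch, inside an async context, with tts_instance as in the example above):

frame = await tts_instance.synthesize("Hello world").collect()
# frame is one rtc.AudioFrame combining every synthesized chunk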
class FallbackAdapter (tts: list[TTS],
*,
max_retry_per_tts: int = 2,
sample_rate: int | None = None)

class FallbackAdapter(
    TTS[Literal["tts_availability_changed"]],
):
    """
    Manages multiple TTS instances, providing a fallback mechanism to ensure continuous TTS service.
    """

    def __init__(
        self,
        tts: list[TTS],
        *,
        max_retry_per_tts: int = 2,
        sample_rate: int | None = None,
    ) -> None:
        """
        Initialize a FallbackAdapter that manages multiple TTS instances.

        Args:
            tts (list[TTS]): A list of TTS instances to use for fallback.
            max_retry_per_tts (int, optional): Maximum number of retries per TTS instance. Defaults to 2.
            sample_rate (int | None, optional): Desired sample rate for the synthesized audio. If None, uses the maximum sample rate among the TTS instances.

        Raises:
            ValueError: If less than one TTS instance is provided.
            ValueError: If TTS instances have different numbers of channels.
        """  # noqa: E501
        if len(tts) < 1:
            raise ValueError("at least one TTS instance must be provided.")

        if len({t.num_channels for t in tts}) != 1:
            raise ValueError("all TTS must have the same number of channels")

        if sample_rate is None:
            sample_rate = max(t.sample_rate for t in tts)

        num_channels = tts[0].num_channels

        super().__init__(
            capabilities=TTSCapabilities(
                streaming=all(t.capabilities.streaming for t in tts),
                aligned_transcript=all(t.capabilities.aligned_transcript for t in tts),
            ),
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

        self._tts_instances = tts
        self._max_retry_per_tts = max_retry_per_tts

        self._status: list[_TTSStatus] = []
        for t in tts:
            resampler = None
            if sample_rate != t.sample_rate:
                logger.info(f"resampling {t.label} from {t.sample_rate}Hz to {sample_rate}Hz")
                resampler = rtc.AudioResampler(input_rate=t.sample_rate, output_rate=sample_rate)

            self._status.append(
                _TTSStatus(available=True, recovering_task=None, resampler=resampler)
            )
            t.on("metrics_collected", self._on_metrics_collected)

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    ) -> FallbackChunkedStream:
        return FallbackChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS
    ) -> FallbackSynthesizeStream:
        return FallbackSynthesizeStream(tts=self, conn_options=conn_options)

    def prewarm(self) -> None:
        if self._tts_instances:
            self._tts_instances[0].prewarm()

    def _on_metrics_collected(self, *args: Any, **kwargs: Any) -> None:
        self.emit("metrics_collected", *args, **kwargs)

    async def aclose(self) -> None:
        for tts_status in self._status:
            if tts_status.recovering_task is not None:
                await aio.cancel_and_wait(tts_status.recovering_task)

        for t in self._tts_instances:
            t.off("metrics_collected", self._on_metrics_collected)
Manages multiple TTS instances, providing a fallback mechanism to ensure continuous TTS service.
Initialize a FallbackAdapter that manages multiple TTS instances.
Args
tts : list[TTS]
- A list of TTS instances to use for fallback.
max_retry_per_tts : int, optional
- Maximum number of retries per TTS instance. Defaults to 2.
sample_rate : int | None, optional
- Desired sample rate for the synthesized audio. If None, uses the maximum sample rate among the TTS instances.
Raises
ValueError
- If fewer than one TTS instance is provided.
ValueError
- If the TTS instances have different numbers of channels.
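A construction sketch; the elevenlabs and openai plugins are illustrative choices here, and any TTS implementations work:

from livekit.agents import tts
from livekit.plugins import elevenlabs, openai

fallback = tts.FallbackAdapter(
    tts=[elevenlabs.TTS(), openai.TTS()],  # tried in order; the first is preferred
    max_retry_per_tts=2,
)
# fallback behaves like any other TTS: fallback.synthesize(...) or fallback.stream()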
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    for tts_status in self._status:
        if tts_status.recovering_task is not None:
            await aio.cancel_and_wait(tts_status.recovering_task)

    for t in self._tts_instances:
        t.off("metrics_collected", self._on_metrics_collected)
def prewarm(self) ‑> None
def prewarm(self) -> None:
    if self._tts_instances:
        self._tts_instances[0].prewarm()
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.fallback_adapter.FallbackSynthesizeStream

def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS
) -> FallbackSynthesizeStream:
    return FallbackSynthesizeStream(tts=self, conn_options=conn_options)
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=0, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.fallback_adapter.FallbackChunkedStream

def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_FALLBACK_API_CONNECT_OPTIONS
) -> FallbackChunkedStream:
    return FallbackChunkedStream(tts=self, input_text=text, conn_options=conn_options)
Inherited members
class FallbackChunkedStream (*,
tts: FallbackAdapter,
input_text: str,
conn_options: APIConnectOptions)

class FallbackChunkedStream(ChunkedStream):
    def __init__(
        self, *, tts: FallbackAdapter, input_text: str, conn_options: APIConnectOptions
    ) -> None:
        super().__init__(tts=tts, input_text=input_text, conn_options=conn_options)
        self._fallback_adapter = tts

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SynthesizedAudio]) -> None:
        pass  # do nothing

    async def _try_synthesize(
        self, *, tts: TTS, recovering: bool = False
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        try:
            async with tts.synthesize(
                self._input_text,
                conn_options=dataclasses.replace(
                    self._conn_options,
                    max_retry=self._fallback_adapter._max_retry_per_tts,
                    timeout=self._conn_options.timeout,
                    retry_interval=self._conn_options.retry_interval,
                ),
            ) as stream:
                async for audio in stream:
                    yield audio
        except Exception as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed", extra={"streamed": False}, exc_info=e
                )
                raise

            logger.warning(
                f"{tts.label} error, switching to next TTS",
                extra={"streamed": False},
            )
            raise

    def _try_recovery(self, tts: TTS) -> None:
        assert isinstance(self._tts, FallbackAdapter)

        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    async for _ in self._try_synthesize(tts=tts, recovering=True):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    # exceptions already logged inside _try_synthesize
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))

    async def _run(self, output_emitter: AudioEmitter) -> None:
        assert isinstance(self._tts, FallbackAdapter)

        start_time = time.time()

        all_failed = all(not tts_status.available for tts_status in self._tts._status)
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        output_emitter.initialize(
            request_id=utils.shortuuid(),
            sample_rate=self._tts.sample_rate,
            num_channels=self._tts.num_channels,
            mime_type="audio/pcm",
        )

        for i, tts in enumerate(self._tts._tts_instances):
            tts_status = self._tts._status[i]
            if tts_status.available or all_failed:
                try:
                    resampler = tts_status.resampler
                    async for synthesized_audio in self._try_synthesize(tts=tts, recovering=False):
                        if texts := synthesized_audio.frame.userdata.get(USERDATA_TIMED_TRANSCRIPT):
                            output_emitter.push_timed_transcript(texts)

                        if resampler is not None:
                            for rf in resampler.push(synthesized_audio.frame):
                                output_emitter.push(rf.data.tobytes())
                        else:
                            output_emitter.push(synthesized_audio.frame.data.tobytes())

                    if resampler is not None:
                        for rf in resampler.flush():
                            output_emitter.push(rf.data.tobytes())

                    return
                except Exception:
                    # exceptions already logged inside _try_synthesize
                    if tts_status.available:
                        tts_status.available = False
                        self._tts.emit(
                            "tts_availability_changed",
                            AvailabilityChangedEvent(tts=tts, available=False),
                        )

                    if output_emitter.pushed_duration() > 0.0:
                        logger.warning(
                            f"{tts.label} already synthesized of audio, ignoring fallback"
                        )
                        return

            self._try_recovery(tts)

        raise APIConnectionError(
            f"all TTSs failed ({[tts.label for tts in self._tts._tts_instances]}) after {time.time() - start_time} seconds"  # noqa: E501
        )
ChunkedStream implementation used by FallbackAdapter.synthesize(); it tries each TTS instance in order until one succeeds.
Ancestors
- livekit.agents.tts.tts.ChunkedStream
- abc.ABC
class FallbackSynthesizeStream (*,
tts: FallbackAdapter,
conn_options: APIConnectOptions)

class FallbackSynthesizeStream(SynthesizeStream):
    def __init__(self, *, tts: FallbackAdapter, conn_options: APIConnectOptions):
        super().__init__(tts=tts, conn_options=conn_options)
        self._fallback_adapter = tts
        self._pushed_tokens: list[str] = []

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SynthesizedAudio]) -> None:
        pass  # do nothing

    async def _try_synthesize(
        self,
        *,
        tts: TTS,
        input_ch: aio.ChanReceiver[str | SynthesizeStream._FlushSentinel],
        conn_options: APIConnectOptions,
        recovering: bool = False,
    ) -> AsyncGenerator[SynthesizedAudio, None]:
        stream = tts.stream(conn_options=conn_options)

        @utils.log_exceptions(logger=logger)
        async def _forward_input_task() -> None:
            try:
                async for data in input_ch:
                    if isinstance(data, str):
                        stream.push_text(data)
                    elif isinstance(data, self._FlushSentinel):
                        stream.flush()
            finally:
                stream.end_input()

        input_task = asyncio.create_task(_forward_input_task())

        try:
            async with stream:
                async for audio in stream:
                    yield audio
        except Exception as e:
            if recovering:
                logger.warning(
                    f"{tts.label} recovery failed",
                    extra={"streamed": True},
                    exc_info=e,
                )
                raise

            logger.exception(
                f"{tts.label} error, switching to next TTS",
                extra={"streamed": True},
            )
            raise
        finally:
            await utils.aio.cancel_and_wait(input_task)

    async def _run(self, output_emitter: AudioEmitter) -> None:
        start_time = time.time()

        all_failed = all(not tts_status.available for tts_status in self._fallback_adapter._status)
        if all_failed:
            logger.error("all TTSs are unavailable, retrying..")

        new_input_ch: aio.Chan[str | SynthesizeStream._FlushSentinel] | None = None

        output_emitter.initialize(
            request_id=utils.shortuuid(),
            sample_rate=self._fallback_adapter.sample_rate,
            num_channels=self._fallback_adapter.num_channels,
            mime_type="audio/pcm",
            stream=True,
        )
        output_emitter.start_segment(segment_id=utils.shortuuid())

        async def _forward_input_task() -> None:
            nonlocal new_input_ch
            async for data in self._input_ch:
                if new_input_ch:
                    new_input_ch.send_nowait(data)

                if isinstance(data, str) and data:
                    self._pushed_tokens.append(data)

            if new_input_ch:
                new_input_ch.close()

        input_task = asyncio.create_task(_forward_input_task())

        try:
            for i, tts in enumerate(self._fallback_adapter._tts_instances):
                tts_status = self._fallback_adapter._status[i]
                if tts_status.available or all_failed:
                    try:
                        new_input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()

                        for text in self._pushed_tokens:
                            new_input_ch.send_nowait(text)

                        if input_task.done():
                            new_input_ch.close()

                        resampler = tts_status.resampler
                        async for synthesized_audio in self._try_synthesize(
                            tts=tts,
                            input_ch=new_input_ch,
                            conn_options=dataclasses.replace(
                                self._conn_options,
                                max_retry=self._fallback_adapter._max_retry_per_tts,
                                timeout=self._conn_options.timeout,
                                retry_interval=self._conn_options.retry_interval,
                            ),
                            recovering=False,
                        ):
                            if texts := synthesized_audio.frame.userdata.get(
                                USERDATA_TIMED_TRANSCRIPT
                            ):
                                output_emitter.push_timed_transcript(texts)

                            if resampler is not None:
                                for resampled_frame in resampler.push(synthesized_audio.frame):
                                    output_emitter.push(resampled_frame.data.tobytes())

                                if synthesized_audio.is_final:
                                    for resampled_frame in resampler.flush():
                                        output_emitter.push(resampled_frame.data.tobytes())
                            else:
                                output_emitter.push(synthesized_audio.frame.data.tobytes())

                        return
                    except Exception:
                        if tts_status.available:
                            tts_status.available = False
                            self._tts.emit(
                                "tts_availability_changed",
                                AvailabilityChangedEvent(tts=tts, available=False),
                            )

                        if output_emitter.pushed_duration() > 0.0:
                            logger.warning(
                                f"{tts.label} already synthesized of audio, ignoring the current segment for the tts fallback"  # noqa: E501
                            )
                            return

                self._try_recovery(tts)

            raise APIConnectionError(
                f"all TTSs failed ({[tts.label for tts in self._fallback_adapter._tts_instances]}) after {time.time() - start_time} seconds"  # noqa: E501
            )
        finally:
            await utils.aio.cancel_and_wait(input_task)

    def _try_recovery(self, tts: TTS) -> None:
        assert isinstance(self._tts, FallbackAdapter)

        retry_text = self._pushed_tokens.copy()
        if not retry_text:
            return

        tts_status = self._tts._status[self._tts._tts_instances.index(tts)]
        if tts_status.recovering_task is None or tts_status.recovering_task.done():

            async def _recover_tts_task(tts: TTS) -> None:
                try:
                    input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
                    for t in retry_text:
                        input_ch.send_nowait(t)

                    input_ch.close()

                    async for _ in self._try_synthesize(
                        tts=tts,
                        input_ch=input_ch,
                        recovering=True,
                        conn_options=dataclasses.replace(
                            self._conn_options,
                            max_retry=0,
                            timeout=self._conn_options.timeout,
                            retry_interval=self._conn_options.retry_interval,
                        ),
                    ):
                        pass

                    tts_status.available = True
                    logger.info(f"tts.FallbackAdapter, {tts.label} recovered")
                    self._tts.emit(
                        "tts_availability_changed",
                        AvailabilityChangedEvent(tts=tts, available=True),
                    )
                except Exception:
                    return

            tts_status.recovering_task = asyncio.create_task(_recover_tts_task(tts))
Streaming counterpart of FallbackChunkedStream, used by FallbackAdapter.stream(); it forwards pushed text to the active TTS instance and falls back to the next one on failure.
Ancestors
- livekit.agents.tts.tts.SynthesizeStream
- abc.ABC
class SentenceStreamPacer (*, min_remaining_audio: float = 5.0, max_text_length: int = 300)
class SentenceStreamPacer:
    def __init__(self, *, min_remaining_audio: float = 5.0, max_text_length: int = 300) -> None:
        """
        Controls the pacing of text sent to TTS. It buffers sentences and decides
        when to flush based on remaining audio duration. This may reduce waste from
        interruptions and improve speech quality by sending larger chunks of text
        with more context.

        Args:
            min_remaining_audio: Minimum remaining audio duration (seconds)
                before sending next batch.
            max_text_length: Maximum text length sent to TTS at once.
        """
        self._options = StreamPacerOptions(
            min_remaining_audio=min_remaining_audio,
            max_text_length=max_text_length,
        )

    def wrap(self, sent_stream: SentenceStream, audio_emitter: AudioEmitter) -> StreamPacerWrapper:
        return StreamPacerWrapper(
            options=self._options, sent_stream=sent_stream, audio_emitter=audio_emitter
        )
Controls the pacing of text sent to TTS. It buffers sentences and decides when to flush based on remaining audio duration. This may reduce waste from interruptions and improve speech quality by sending larger chunks of text with more context.
Args
min_remaining_audio
- Minimum remaining audio duration (seconds) before sending the next batch.
max_text_length
- Maximum text length sent to TTS at once.
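Pacing is normally enabled through StreamAdapter rather than by calling wrap() directly. A sketch, where base_tts stands in for any non-streaming TTS instance:

from livekit.agents.tts import SentenceStreamPacer, StreamAdapter

paced_tts = StreamAdapter(
    tts=base_tts,
    text_pacing=SentenceStreamPacer(min_remaining_audio=5.0, max_text_length=300),
)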
Methods
def wrap(self,
sent_stream: SentenceStream,
audio_emitter: AudioEmitter) ‑> livekit.agents.tts.stream_pacer.StreamPacerWrapper

def wrap(self, sent_stream: SentenceStream, audio_emitter: AudioEmitter) -> StreamPacerWrapper:
    return StreamPacerWrapper(
        options=self._options, sent_stream=sent_stream, audio_emitter=audio_emitter
    )
class StreamAdapter (*,
tts: TTS,
sentence_tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
text_pacing: SentenceStreamPacer | bool = False)

class StreamAdapter(TTS):
    def __init__(
        self,
        *,
        tts: TTS,
        sentence_tokenizer: NotGivenOr[tokenize.SentenceTokenizer] = NOT_GIVEN,
        text_pacing: SentenceStreamPacer | bool = False,
    ) -> None:
        super().__init__(
            capabilities=TTSCapabilities(streaming=True, aligned_transcript=True),
            sample_rate=tts.sample_rate,
            num_channels=tts.num_channels,
        )
        self._wrapped_tts = tts
        self._sentence_tokenizer = sentence_tokenizer or tokenize.blingfire.SentenceTokenizer(
            retain_format=True
        )
        self._stream_pacer: SentenceStreamPacer | None = None
        if text_pacing is True:
            self._stream_pacer = SentenceStreamPacer()
        elif isinstance(text_pacing, SentenceStreamPacer):
            self._stream_pacer = text_pacing

        @self._wrapped_tts.on("metrics_collected")
        def _forward_metrics(*args: Any, **kwargs: Any) -> None:
            # TODO(theomonnom): The segment_id needs to be populated!
            self.emit("metrics_collected", *args, **kwargs)

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        return self._wrapped_tts.synthesize(text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> StreamAdapterWrapper:
        return StreamAdapterWrapper(tts=self, conn_options=conn_options)

    def prewarm(self) -> None:
        self._wrapped_tts.prewarm()
Adapts a non-streaming TTS to the streaming interface by tokenizing pushed text into sentences and synthesizing each sentence with the wrapped TTS.
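A usage sketch, where http_only_tts stands in for any TTS whose capabilities.streaming is False:

from livekit.agents.tts import StreamAdapter

streaming_tts = StreamAdapter(tts=http_only_tts)
stream = streaming_tts.stream()  # usable anywhere a SynthesizeStream is expected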
Ancestors
- livekit.agents.tts.tts.TTS
- abc.ABC
- EventEmitter
- typing.Generic
Methods
def prewarm(self) ‑> None
def prewarm(self) -> None:
    self._wrapped_tts.prewarm()
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.stream_adapter.StreamAdapterWrapper

def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> StreamAdapterWrapper:
    return StreamAdapterWrapper(tts=self, conn_options=conn_options)
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream

def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    return self._wrapped_tts.synthesize(text=text, conn_options=conn_options)
Inherited members
class StreamAdapterWrapper (*,
tts: StreamAdapter,
conn_options: APIConnectOptions)

class StreamAdapterWrapper(SynthesizeStream):
    def __init__(self, *, tts: StreamAdapter, conn_options: APIConnectOptions) -> None:
        super().__init__(tts=tts, conn_options=DEFAULT_STREAM_ADAPTER_API_CONNECT_OPTIONS)
        self._tts: StreamAdapter = tts
        self._wrapped_tts_conn_options = conn_options

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SynthesizedAudio]) -> None:
        pass  # do nothing

    async def _run(self, output_emitter: AudioEmitter) -> None:
        sent_stream = self._tts._sentence_tokenizer.stream()
        if self._tts._stream_pacer:
            sent_stream = self._tts._stream_pacer.wrap(
                sent_stream=sent_stream,
                audio_emitter=output_emitter,
            )

        request_id = utils.shortuuid()
        output_emitter.initialize(
            request_id=request_id,
            sample_rate=self._tts.sample_rate,
            num_channels=self._tts.num_channels,
            mime_type="audio/pcm",
            stream=True,
        )
        segment_id = utils.shortuuid()
        output_emitter.start_segment(segment_id=segment_id)

        async def _forward_input() -> None:
            async for data in self._input_ch:
                if isinstance(data, self._FlushSentinel):
                    sent_stream.flush()
                    continue
                sent_stream.push_text(data)

            sent_stream.end_input()

        async def _synthesize() -> None:
            from ..voice.io import TimedString

            duration = 0.0
            async for ev in sent_stream:
                output_emitter.push_timed_transcript(
                    TimedString(text=ev.token, start_time=duration)
                )
                if not (text := ev.token.strip()):
                    continue

                async with self._tts._wrapped_tts.synthesize(
                    text, conn_options=self._wrapped_tts_conn_options
                ) as tts_stream:
                    async for audio in tts_stream:
                        output_emitter.push(audio.frame.data.tobytes())
                        duration += audio.frame.duration
                    output_emitter.flush()

        tasks = [
            asyncio.create_task(_forward_input()),
            asyncio.create_task(_synthesize()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await sent_stream.aclose()
            await utils.aio.cancel_and_wait(*tasks)
SynthesizeStream implementation returned by StreamAdapter.stream(); it splits incoming text into sentences and synthesizes them sequentially with the wrapped TTS.
Ancestors
- livekit.agents.tts.tts.SynthesizeStream
- abc.ABC
class SynthesizeStream (*,
tts: TTS,
conn_options: APIConnectOptions)

class SynthesizeStream(ABC):
    class _FlushSentinel: ...

    def __init__(self, *, tts: TTS, conn_options: APIConnectOptions) -> None:
        super().__init__()
        self._tts = tts
        self._conn_options = conn_options
        self._input_ch = aio.Chan[Union[str, SynthesizeStream._FlushSentinel]]()
        self._event_ch = aio.Chan[SynthesizedAudio]()
        self._tee = aio.itertools.tee(self._event_ch, 2)
        self._event_aiter, self._monitor_aiter = self._tee

        self._task = asyncio.create_task(self._main_task(), name="TTS._main_task")
        self._task.add_done_callback(lambda _: self._event_ch.close())
        self._metrics_task: asyncio.Task[None] | None = None  # started on first push
        self._current_attempt_has_error = False
        self._started_time: float = 0
        self._pushed_text: str = ""

        # used to track metrics
        self._mtc_pending_texts: list[str] = []
        self._mtc_text = ""
        self._num_segments = 0
        self._tts_request_span: trace.Span | None = None

    @abstractmethod
    async def _run(self, output_emitter: AudioEmitter) -> None: ...

    @tracer.start_as_current_span("tts_request", end_on_exit=False)
    async def _main_task(self) -> None:
        self._tts_request_span = current_span = trace.get_current_span()
        current_span.set_attributes(
            {
                trace_types.ATTR_TTS_STREAMING: True,
                trace_types.ATTR_TTS_LABEL: self._tts.label,
            }
        )
        for i in range(self._conn_options.max_retry + 1):
            output_emitter = AudioEmitter(label=self._tts.label, dst_ch=self._event_ch)
            try:
                with tracer.start_as_current_span("tts_request_run") as attempt_span:
                    attempt_span.set_attribute(trace_types.ATTR_RETRY_COUNT, i)
                    try:
                        await self._run(output_emitter)
                    except Exception as e:
                        telemetry_utils.record_exception(attempt_span, e)
                        raise

                    output_emitter.end_input()
                    # wait for all audio frames to be pushed & propagate errors
                    await output_emitter.join()

                    if self._pushed_text.strip():
                        if output_emitter.pushed_duration(idx=-1) <= 0.0:
                            raise APIError(
                                f"no audio frames were pushed for text: {self._pushed_text}"
                            )

                        if self._num_segments != output_emitter.num_segments:
                            raise APIError(
                                f"number of segments mismatch: expected {self._num_segments}, "
                                f"but got {output_emitter.num_segments}"
                            )

                    current_span.set_attribute(trace_types.ATTR_TTS_INPUT_TEXT, self._pushed_text)
                    return
            except APIError as e:
                retry_interval = self._conn_options._interval_for_retry(i)
                if self._conn_options.max_retry == 0 or self._conn_options.max_retry == i:
                    self._emit_error(e, recoverable=False)
                    raise
                else:
                    self._emit_error(e, recoverable=True)
                    logger.warning(
                        f"failed to synthesize speech, retrying in {retry_interval}s",
                        exc_info=e,
                        extra={"tts": self._tts._label, "attempt": i + 1, "streamed": True},
                    )
                await asyncio.sleep(retry_interval)
                # Reset the flag when retrying
                self._current_attempt_has_error = False
            finally:
                await output_emitter.aclose()

    def _emit_error(self, api_error: Exception, recoverable: bool) -> None:
        self._current_attempt_has_error = True
        self._tts.emit(
            "error",
            TTSError(
                timestamp=time.time(),
                label=self._tts._label,
                error=api_error,
                recoverable=recoverable,
            ),
        )

    def _mark_started(self) -> None:
        # only set the started time once, it'll get reset after we emit metrics
        if self._started_time == 0:
            self._started_time = time.perf_counter()

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[SynthesizedAudio]) -> None:
        """Task used to collect metrics"""
        audio_duration = 0.0
        ttfb = -1.0
        request_id = ""
        segment_id = ""

        def _emit_metrics() -> None:
            nonlocal audio_duration, ttfb, request_id, segment_id

            if not self._started_time or self._current_attempt_has_error:
                return

            duration = time.perf_counter() - self._started_time

            if not self._mtc_pending_texts:
                return

            text = self._mtc_pending_texts.pop(0)
            if not text:
                return

            metrics = TTSMetrics(
                timestamp=time.time(),
                request_id=request_id,
                segment_id=segment_id,
                ttfb=ttfb,
                duration=duration,
                characters_count=len(text),
                audio_duration=audio_duration,
                cancelled=self._task.cancelled(),
                label=self._tts._label,
                streamed=True,
            )
            if self._tts_request_span:
                self._tts_request_span.set_attribute(
                    trace_types.ATTR_TTS_METRICS, metrics.model_dump_json()
                )
            self._tts.emit("metrics_collected", metrics)

            audio_duration = 0.0
            ttfb = -1.0
            request_id = ""
            self._started_time = 0

        async for ev in event_aiter:
            if ttfb == -1.0:
                ttfb = time.perf_counter() - self._started_time

            audio_duration += ev.frame.duration
            request_id = ev.request_id
            segment_id = ev.segment_id

            if ev.is_final:
                _emit_metrics()

    def push_text(self, token: str) -> None:
        """Push some text to be synthesized"""
        if not token or self._input_ch.closed:
            return

        self._pushed_text += token

        if self._metrics_task is None:
            self._metrics_task = asyncio.create_task(
                self._metrics_monitor_task(self._monitor_aiter), name="TTS._metrics_task"
            )

        if not self._mtc_text:
            if self._num_segments >= 1:
                logger.warning(
                    "SynthesizeStream: handling multiple segments in a single instance is "
                    "deprecated. Please create a new SynthesizeStream instance for each segment. "
                    "Most TTS plugins now use pooled WebSocket connections via ConnectionPool."
                )
                return

            self._num_segments += 1

        self._mtc_text += token
        self._input_ch.send_nowait(token)

    def flush(self) -> None:
        """Mark the end of the current segment"""
        if self._input_ch.closed:
            return

        if self._mtc_text:
            self._mtc_pending_texts.append(self._mtc_text)
            self._mtc_text = ""

        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input, no more text will be pushed"""
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately"""
        await aio.cancel_and_wait(self._task)
        self._event_ch.close()
        self._input_ch.close()

        if self._metrics_task is not None:
            await self._metrics_task

        await self._tee.aclose()

        if self._tts_request_span:
            self._tts_request_span.end()
            self._tts_request_span = None

    async def __anext__(self) -> SynthesizedAudio:
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            if not self._task.cancelled() and (exc := self._task.exception()):
                raise exc  # noqa: B904

            raise StopAsyncIteration from None

        return val

    def __aiter__(self) -> AsyncIterator[SynthesizedAudio]:
        return self

    async def __aenter__(self) -> SynthesizeStream:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Base class for streaming synthesis: text is pushed incrementally with push_text() and the synthesized audio is consumed by iterating over the stream.
Ancestors
- abc.ABC
Subclasses
- livekit.agents.tts.fallback_adapter.FallbackSynthesizeStream
- livekit.agents.tts.stream_adapter.StreamAdapterWrapper
- livekit.plugins.cartesia.tts.SynthesizeStream
- livekit.plugins.deepgram.tts.SynthesizeStream
- livekit.plugins.elevenlabs.tts.SynthesizeStream
- livekit.plugins.google.tts.SynthesizeStream
- livekit.plugins.playai.tts.SynthesizeStream
- livekit.plugins.resemble.tts.SynthesizeStream
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None:
    """Close this stream immediately"""
    await aio.cancel_and_wait(self._task)
    self._event_ch.close()
    self._input_ch.close()

    if self._metrics_task is not None:
        await self._metrics_task

    await self._tee.aclose()

    if self._tts_request_span:
        self._tts_request_span.end()
        self._tts_request_span = None
Close the stream immediately
def end_input(self) ‑> None
def end_input(self) -> None:
    """Mark the end of input, no more text will be pushed"""
    self.flush()
    self._input_ch.close()
Mark the end of input, no more text will be pushed
def flush(self) ‑> None
def flush(self) -> None:
    """Mark the end of the current segment"""
    if self._input_ch.closed:
        return

    if self._mtc_text:
        self._mtc_pending_texts.append(self._mtc_text)
        self._mtc_text = ""

    self._input_ch.send_nowait(self._FlushSentinel())
Mark the end of the current segment
def push_text(self, token: str) ‑> None
def push_text(self, token: str) -> None:
    """Push some text to be synthesized"""
    if not token or self._input_ch.closed:
        return

    self._pushed_text += token

    if self._metrics_task is None:
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(self._monitor_aiter), name="TTS._metrics_task"
        )

    if not self._mtc_text:
        if self._num_segments >= 1:
            logger.warning(
                "SynthesizeStream: handling multiple segments in a single instance is "
                "deprecated. Please create a new SynthesizeStream instance for each segment. "
                "Most TTS plugins now use pooled WebSocket connections via ConnectionPool."
            )
            return

        self._num_segments += 1

    self._mtc_text += token
    self._input_ch.send_nowait(token)
Push some text to be synthesized
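A minimal end-to-end sketch of driving a SynthesizeStream, assuming tts_instance supports streaming; play() is a hypothetical sink for rtc.AudioFrame:

import asyncio
from collections.abc import AsyncIterator

from livekit.agents.tts import TTS


async def stream_speech(tts_instance: TTS, tokens: AsyncIterator[str]) -> None:
    stream = tts_instance.stream()

    async def _push() -> None:
        async for token in tokens:
            stream.push_text(token)
        stream.end_input()

    push_task = asyncio.create_task(_push())
    async with stream:  # the context manager closes the stream when done
        async for ev in stream:
            play(ev.frame)
    await push_task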
class SynthesizedAudio (frame: rtc.AudioFrame,
request_id: str,
is_final: bool = False,
segment_id: str = '',
delta_text: str = '')

@dataclass
class SynthesizedAudio:
    frame: rtc.AudioFrame
    """Synthesized audio frame"""

    request_id: str
    """Request ID (one segment could be made up of multiple requests)"""

    is_final: bool = False
    """Whether this is the latest frame of the segment"""

    segment_id: str = ""
    """Segment ID; each segment is separated by a flush (streaming only)"""

    delta_text: str = ""
    """Current segment of the synthesized audio (streaming only)"""
Instance variables
var delta_text : str
Current segment of the synthesized audio (streaming only)
var frame : AudioFrame
Synthesized audio frame
var is_final : bool
Whether this is the latest frame of the segment
var request_id : str
Request ID (one segment could be made up of multiple requests)
var segment_id : str
Segment ID; each segment is separated by a flush (streaming only)
class TTS (*,
capabilities: TTSCapabilities,
sample_rate: int,
num_channels: int)

class TTS(
    ABC,
    rtc.EventEmitter[Union[Literal["metrics_collected", "error"], TEvent]],
    Generic[TEvent],
):
    def __init__(
        self,
        *,
        capabilities: TTSCapabilities,
        sample_rate: int,
        num_channels: int,
    ) -> None:
        super().__init__()
        self._capabilities = capabilities
        self._sample_rate = sample_rate
        self._num_channels = num_channels
        self._label = f"{type(self).__module__}.{type(self).__name__}"

    @property
    def label(self) -> str:
        return self._label

    @property
    def capabilities(self) -> TTSCapabilities:
        return self._capabilities

    @property
    def sample_rate(self) -> int:
        return self._sample_rate

    @property
    def num_channels(self) -> int:
        return self._num_channels

    @abstractmethod
    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream: ...

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        raise NotImplementedError(
            "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"  # noqa: E501
        )

    def prewarm(self) -> None:
        """Pre-warm connection to the TTS service"""
        pass

    async def aclose(self) -> None: ...

    async def __aenter__(self) -> TTS:
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await self.aclose()
Abstract base class for all TTS implementations; emits "metrics_collected" and "error" events.
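A minimal subclass sketch; the import path for APIConnectOptions and the MyChunkedStream class (see the AudioEmitter sketch above) are assumptions:

from livekit.agents import tts
from livekit.agents.types import DEFAULT_API_CONNECT_OPTIONS, APIConnectOptions


class MyTTS(tts.TTS):
    def __init__(self) -> None:
        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=False),
            sample_rate=24000,  # illustrative values
            num_channels=1,
        )

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        # MyChunkedStream is the hypothetical ChunkedStream subclass sketched earlier
        return MyChunkedStream(tts=self, input_text=text, conn_options=conn_options)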
Ancestors
- abc.ABC
- EventEmitter
- typing.Generic
Subclasses
- livekit.agents.tts.fallback_adapter.FallbackAdapter
- livekit.agents.tts.stream_adapter.StreamAdapter
- livekit.plugins.aws.tts.TTS
- livekit.plugins.azure.tts.TTS
- livekit.plugins.baseten.tts.TTS
- livekit.plugins.cartesia.tts.TTS
- livekit.plugins.deepgram.tts.TTS
- livekit.plugins.elevenlabs.tts.TTS
- livekit.plugins.google.beta.gemini_tts.TTS
- livekit.plugins.google.tts.TTS
- livekit.plugins.groq.tts.TTS
- livekit.plugins.hume.tts.TTS
- livekit.plugins.inworld.tts.TTS
- livekit.plugins.lmnt.tts.TTS
- livekit.plugins.neuphonic.tts.TTS
- livekit.plugins.openai.tts.TTS
- livekit.plugins.playai.tts.TTS
- livekit.plugins.resemble.tts.TTS
- livekit.plugins.rime.tts.TTS
- livekit.plugins.sarvam.tts.TTS
- livekit.plugins.smallestai.tts.TTS
- livekit.plugins.speechify.tts.TTS
- livekit.plugins.spitch.tts.TTS
Instance variables
prop capabilities : TTSCapabilities
@property
def capabilities(self) -> TTSCapabilities:
    return self._capabilities
prop label : str
@property
def label(self) -> str:
    return self._label
prop num_channels : int
@property
def num_channels(self) -> int:
    return self._num_channels
prop sample_rate : int
@property
def sample_rate(self) -> int:
    return self._sample_rate
Methods
async def aclose(self) ‑> None
async def aclose(self) -> None: ...
def prewarm(self) ‑> None
def prewarm(self) -> None:
    """Pre-warm connection to the TTS service"""
    pass
Pre-warm connection to the TTS service
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.SynthesizeStream

def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    raise NotImplementedError(
        "streaming is not supported by this TTS, please use a different TTS or use a StreamAdapter"  # noqa: E501
    )
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream

@abstractmethod
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream: ...
Inherited members
class TTSCapabilities (streaming: bool, aligned_transcript: bool = False)
@dataclass
class TTSCapabilities:
    streaming: bool
    """Whether this TTS supports streaming (generally using websockets)"""

    aligned_transcript: bool = False
    """Whether this TTS supports aligned transcripts with word timestamps"""
Instance variables
var aligned_transcript : bool
Whether this TTS supports aligned transcripts with word timestamps
var streaming : bool
Whether this TTS supports streaming (generally using websockets)
class TTSError (**data: Any)
class TTSError(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)
    type: Literal["tts_error"] = "tts_error"
    timestamp: float
    label: str
    error: Exception = Field(..., exclude=True)
    recoverable: bool
A Pydantic model describing a TTS failure; emitted through the TTS "error" event.
Ancestors
- pydantic.main.BaseModel
Class variables
var error : Exception
var label : str
var model_config
var recoverable : bool
var timestamp : float
var type : Literal['tts_error']
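Errors are surfaced through the TTS "error" event rather than raised to subscribers. A handler sketch, where tts_instance is any TTS implementation:

import logging

from livekit.agents.tts import TTSError

logger = logging.getLogger(__name__)


def on_tts_error(err: TTSError) -> None:
    if not err.recoverable:
        logger.error("unrecoverable TTS error: %s", err.error)


tts_instance.on("error", on_tts_error)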