Module livekit.agents.pipeline.agent_output

Classes

class AgentOutput (*, room: rtc.Room, agent_playout: AgentPlayout, llm: llm.LLM, tts: text_to_speech.TTS)
Expand source code
class AgentOutput:
    """Turns speech sources into synthesized audio handed to the agent playout.

    Each call to :meth:`synthesize` creates a :class:`SynthesisHandle` and
    spawns a background task that drives the TTS for that speech;
    :meth:`aclose` cancels whichever of those tasks are still running.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        agent_playout: AgentPlayout,
        llm: llm.LLM,
        tts: text_to_speech.TTS,
    ) -> None:
        self._room, self._agent_playout, self._llm, self._tts = (
            room,
            agent_playout,
            llm,
            tts,
        )
        # synthesis tasks still running; each one removes itself when done
        self._tasks = set[asyncio.Task[Any]]()

    @property
    def playout(self) -> AgentPlayout:
        """The AgentPlayout passed to the constructor."""
        return self._agent_playout

    async def aclose(self) -> None:
        """Cancel all pending synthesis tasks and wait for them to finish."""
        # cancel and gather the same snapshot of tasks, independent of the
        # done callbacks that remove finished tasks from the live set
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)

    def synthesize(
        self,
        *,
        speech_id: str,
        tts_source: SpeechSource,
        transcript_source: SpeechSource,
        transcription: bool,
        transcription_speed: float,
        sentence_tokenizer: tokenize.SentenceTokenizer,
        word_tokenizer: tokenize.WordTokenizer,
        hyphenate_word: Callable[[str], list[str]],
    ) -> SynthesisHandle:
        """Start synthesizing one speech in the background.

        Args:
            speech_id: identifier attached to the returned handle.
            tts_source: text to synthesize (a str, an awaitable resolving to a
                str, or any other source handled as a stream).
            transcript_source: source used for the transcription.
            transcription: when False, transcriptions are still forwarded but
                with their segments cleared.
            transcription_speed: speed passed to the TTSSegmentsForwarder.
            sentence_tokenizer: sentence tokenizer for the forwarder.
            word_tokenizer: word tokenizer for the forwarder.
            hyphenate_word: hyphenation function for the forwarder.

        Returns:
            The SynthesisHandle; call ``play()`` on it to validate the speech.
        """

        def _before_forward(
            fwd: agent_transcription.TTSSegmentsForwarder,
            rtc_transcription: rtc.Transcription,
        ) -> rtc.Transcription:
            # `transcription` is synthesize()'s bool flag (the callback argument
            # is deliberately named differently so it does not shadow the flag)
            if not transcription:
                rtc_transcription.segments = []

            return rtc_transcription

        transcription_fwd = agent_transcription.TTSSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            speed=transcription_speed,
            sentence_tokenizer=sentence_tokenizer,
            word_tokenizer=word_tokenizer,
            hyphenate_word=hyphenate_word,
            before_forward_cb=_before_forward,
        )

        handle = SynthesisHandle(
            tts_source=tts_source,
            transcript_source=transcript_source,
            agent_playout=self._agent_playout,
            tts=self._tts,
            transcription_fwd=transcription_fwd,
            speech_id=speech_id,
        )

        task = asyncio.create_task(self._synthesize_task(handle))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.remove)
        return handle

    @utils.log_exceptions(logger=logger)
    async def _synthesize_task(self, handle: SynthesisHandle) -> None:
        """Synthesize speech from the handle's source until done or interrupted.

        The handle's frame channel is closed once synthesis ends, including
        when resolving an awaitable source fails before synthesis starts.
        """
        tts_source = handle._tts_source
        transcript_source = handle._transcript_source

        if isinstance(tts_source, Awaitable):
            try:
                tts_source = await tts_source
            except BaseException:
                # the synthesis task below (whose done callback closes the
                # channel) was never created: close it here so a playout
                # reading from it does not wait forever
                handle._buf_ch.close()
                raise
            co = _str_synthesis_task(tts_source, transcript_source, handle)
        elif isinstance(tts_source, str):
            co = _str_synthesis_task(tts_source, transcript_source, handle)
        else:
            co = _stream_synthesis_task(tts_source, transcript_source, handle)

        synth = asyncio.create_task(co)
        synth.add_done_callback(lambda _: handle._buf_ch.close())
        try:
            # stop as soon as synthesis finishes or the handle is interrupted
            _ = await asyncio.wait(
                [synth, handle._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            await utils.aio.gracefully_cancel(synth)

Instance variables

prop playout : AgentPlayout
Expand source code
@property
def playout(self) -> AgentPlayout:
    """The AgentPlayout this output was constructed with."""
    agent_playout = self._agent_playout
    return agent_playout

Methods

async def aclose(self) -> None

Cancel all pending synthesis tasks and wait for them to finish.
def synthesize(self, *, speech_id: str, tts_source: SpeechSource, transcript_source: SpeechSource, transcription: bool, transcription_speed: float, sentence_tokenizer: tokenize.SentenceTokenizer, word_tokenizer: tokenize.WordTokenizer, hyphenate_word: Callable[[str], list[str]]) -> SynthesisHandle

Create a SynthesisHandle for the given speech and start synthesizing it in a background task.
class SynthesisHandle (*, speech_id: str, tts_source: SpeechSource, transcript_source: SpeechSource, agent_playout: AgentPlayout, tts: text_to_speech.TTS, transcription_fwd: agent_transcription.TTSSegmentsForwarder)
Expand source code
class SynthesisHandle:
    """Handle over a single speech synthesis.

    The synthesis is only handed to the agent playout once :meth:`play` is
    called (the speech is then "validated"); :meth:`interrupt` stops the
    synthesis and, if already started, its playout.
    """

    def __init__(
        self,
        *,
        speech_id: str,
        tts_source: SpeechSource,
        transcript_source: SpeechSource,
        agent_playout: AgentPlayout,
        tts: text_to_speech.TTS,
        transcription_fwd: agent_transcription.TTSSegmentsForwarder,
    ) -> None:
        self._tts_source = tts_source
        self._transcript_source = transcript_source
        self._agent_playout = agent_playout
        self._tts = tts
        self._tr_fwd = transcription_fwd
        # channel of rtc.AudioFrame that play() hands to the agent playout
        self._buf_ch = utils.aio.Chan[rtc.AudioFrame]()
        # set by play(); None until the speech is validated
        self._play_handle: PlayoutHandle | None = None
        # resolved by interrupt(); `interrupted` reports whether it is done
        self._interrupt_fut = asyncio.Future[None]()
        self._speech_id = speech_id

    @property
    def speech_id(self) -> str:
        """Identifier of this speech."""
        ident = self._speech_id
        return ident

    @property
    def tts_forwarder(self) -> agent_transcription.TTSSegmentsForwarder:
        """The transcription forwarder given to the constructor."""
        forwarder = self._tr_fwd
        return forwarder

    @property
    def validated(self) -> bool:
        """True once play() has been called."""
        return not (self._play_handle is None)

    @property
    def interrupted(self) -> bool:
        """True once interrupt() has been called."""
        fut = self._interrupt_fut
        return fut.done()

    @property
    def play_handle(self) -> PlayoutHandle | None:
        """The PlayoutHandle returned by play(), or None before validation."""
        current = self._play_handle
        return current

    def play(self) -> PlayoutHandle:
        """Validate the speech for playout.

        Hands the synthesis buffer and transcription forwarder to the agent
        playout and keeps the returned handle as ``play_handle``.

        Raises:
            RuntimeError: if the synthesis was already interrupted.
        """
        if self.interrupted:
            raise RuntimeError("synthesis was interrupted")

        started = self._agent_playout.play(
            self._speech_id, self._buf_ch, transcription_fwd=self._tr_fwd
        )
        self._play_handle = started
        return started

    def interrupt(self) -> None:
        """Interrupt the speech.

        Interrupts the playout as well if play() was already called, then
        resolves the interrupt future. Calling it again is a no-op.
        """
        if not self.interrupted:
            logger.debug(
                "interrupting synthesis/playout",
                extra={"speech_id": self.speech_id},
            )

            current = self._play_handle
            if current is not None:
                current.interrupt()

            self._interrupt_fut.set_result(None)

Instance variables

prop interrupted : bool
Expand source code
@property
def interrupted(self) -> bool:
    """Whether the interrupt future has been resolved."""
    fut = self._interrupt_fut
    return fut.done()
prop play_handle : PlayoutHandle | None
Expand source code
@property
def play_handle(self) -> PlayoutHandle | None:
    """The stored PlayoutHandle, or None if there is none yet."""
    current = self._play_handle
    return current
prop speech_id : str
Expand source code
@property
def speech_id(self) -> str:
    """Identifier of this speech."""
    ident = self._speech_id
    return ident
prop tts_forwarder : agent_transcription.TTSSegmentsForwarder
Expand source code
@property
def tts_forwarder(self) -> agent_transcription.TTSSegmentsForwarder:
    """The stored transcription forwarder."""
    forwarder = self._tr_fwd
    return forwarder
prop validated : bool
Expand source code
@property
def validated(self) -> bool:
    """Whether a play handle has been stored."""
    return not (self._play_handle is None)

Methods

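def interrupt(self) -> None

Interrupt the speech: stop the synthesis and, if play() was already called, its playout. Calling it again has no effect.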

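def play(self) -> PlayoutHandle

Validate the speech for playout: hand the synthesized audio to the agent playout and return the resulting PlayoutHandle. Raises RuntimeError if the synthesis was already interrupted.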