Module livekit.agents.pipeline.agent_output

Classes

class AgentOutput (*,
room: rtc.Room,
agent_playout: AgentPlayout,
llm: llm.LLM,
tts: text_to_speech.TTS)
Expand source code
class AgentOutput:
    """Schedule speech synthesis and feed the results towards playout.

    Each call to :meth:`synthesize` builds a ``SynthesisHandle`` and starts a
    background task that reads audio from the TTS, pushes the frames onto the
    handle's buffer channel and forwards text/audio to the handle's
    transcription forwarder.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        agent_playout: AgentPlayout,
        llm: llm.LLM,
        tts: text_to_speech.TTS,
    ) -> None:
        """Store the collaborators used by later ``synthesize`` calls.

        Args:
            room: Room whose local participant is used for transcription forwarding.
            agent_playout: Playout object handed to every ``SynthesisHandle``.
            llm: Stored on the instance; not used by the methods of this class.
            tts: TTS engine handed to every ``SynthesisHandle``.
        """
        self._room, self._agent_playout, self._llm, self._tts = (
            room,
            agent_playout,
            llm,
            tts,
        )
        # Synthesis tasks that are still running; each task removes itself
        # through its done-callback (see ``synthesize``).
        self._tasks = set[asyncio.Task[Any]]()

    @property
    def playout(self) -> AgentPlayout:
        """The ``AgentPlayout`` given at construction."""
        return self._agent_playout

    async def aclose(self) -> None:
        """Cancel all outstanding synthesis tasks and wait until they finish."""
        for task in self._tasks:
            task.cancel()

        # return_exceptions=True so the cancellations are collected instead of
        # being raised out of aclose().
        await asyncio.gather(*self._tasks, return_exceptions=True)

    def synthesize(
        self,
        *,
        speech_id: str,
        tts_source: SpeechSource,
        transcript_source: SpeechSource,
        transcription: bool,
        transcription_speed: float,
        sentence_tokenizer: tokenize.SentenceTokenizer,
        word_tokenizer: tokenize.WordTokenizer,
        hyphenate_word: Callable[[str], list[str]],
    ) -> SynthesisHandle:
        """Start synthesizing a speech in the background.

        Args:
            speech_id: Identifier stored on the returned handle.
            tts_source: Text (or awaitable / async iterable of text) sent to the TTS.
            transcript_source: Text (or awaitable / async iterable of text)
                pushed to the transcription forwarder.
            transcription: When false, every forwarded transcription has its
                segments emptied before it is sent.
            transcription_speed: Passed to the forwarder as ``speed``.
            sentence_tokenizer: Passed through to the forwarder.
            word_tokenizer: Passed through to the forwarder.
            hyphenate_word: Passed through to the forwarder.

        Returns:
            The ``SynthesisHandle`` tracking this synthesis.
        """

        def _before_forward(
            fwd: agent_transcription.TTSSegmentsForwarder,
            rtc_transcription: rtc.Transcription,
        ) -> rtc.Transcription:
            # The parameter must NOT be called ``transcription``: that name
            # would shadow the enclosing boolean flag and the check below
            # would test the (always truthy) Transcription object instead.
            if not transcription:
                rtc_transcription.segments = []

            return rtc_transcription

        transcription_fwd = agent_transcription.TTSSegmentsForwarder(
            room=self._room,
            participant=self._room.local_participant,
            speed=transcription_speed,
            sentence_tokenizer=sentence_tokenizer,
            word_tokenizer=word_tokenizer,
            hyphenate_word=hyphenate_word,
            before_forward_cb=_before_forward,
        )

        handle = SynthesisHandle(
            tts_source=tts_source,
            transcript_source=transcript_source,
            agent_playout=self._agent_playout,
            tts=self._tts,
            transcription_fwd=transcription_fwd,
            speech_id=speech_id,
        )

        # Keep a strong reference to the task until it completes so aclose()
        # can cancel it; the done-callback drops the reference afterwards.
        task = asyncio.create_task(self._synthesize_task(handle))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.remove)
        return handle

    @utils.log_exceptions(logger=logger)
    async def _synthesize_task(self, handle: SynthesisHandle) -> None:
        """Resolve the handle's sources and run the matching synthesis coroutine.

        The synthesis runs until it completes or the handle's interrupt future
        is resolved, whichever happens first.
        """
        tts_source = handle._tts_source
        transcript_source = handle._transcript_source

        # Sources may be awaitables that resolve to a str or an async iterable.
        if isinstance(tts_source, Awaitable):
            tts_source = await tts_source
        if isinstance(transcript_source, Awaitable):
            transcript_source = await transcript_source

        if isinstance(tts_source, str):
            co = self._str_synthesis_task(tts_source, transcript_source, handle)
        else:
            co = self._stream_synthesis_task(tts_source, transcript_source, handle)

        synth = asyncio.create_task(co)
        # Close the audio buffer channel however the synthesis task ends.
        synth.add_done_callback(lambda _: handle._buf_ch.close())
        try:
            _ = await asyncio.wait(
                [synth, handle._interrupt_fut], return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # Helper is defined elsewhere; presumably cancels and awaits the task.
            await utils.aio.gracefully_cancel(synth)

    @utils.log_exceptions(logger=logger)
    async def _read_transcript_task(
        self, transcript_source: AsyncIterable[str] | str, handle: SynthesisHandle
    ) -> None:
        """Push the transcript text to the handle's transcription forwarder.

        Marks the end of the text segment afterwards (if the forwarder is still
        open) and closes ``transcript_source`` when it is an async generator.
        """
        try:
            if isinstance(transcript_source, str):
                # NOTE(review): unlike the streaming branch below, this push is
                # not guarded by ``closed`` - confirm whether that is intended.
                handle._tr_fwd.push_text(transcript_source)
            else:
                async for seg in transcript_source:
                    if not handle._tr_fwd.closed:
                        handle._tr_fwd.push_text(seg)

            if not handle.tts_forwarder.closed:
                handle.tts_forwarder.mark_text_segment_end()
        finally:
            if inspect.isasyncgen(transcript_source):
                await transcript_source.aclose()

    @utils.log_exceptions(logger=logger)
    async def _str_synthesis_task(
        self,
        tts_text: str,
        transcript_source: AsyncIterable[str] | str,
        handle: SynthesisHandle,
    ) -> None:
        """Synthesize speech from a complete string.

        Audio frames are pushed to the handle's buffer channel and to the
        transcription forwarder. The transcript reader is only started once
        the first audio frame has arrived.
        """
        read_transcript_atask: asyncio.Task | None = None

        first_frame = True
        tts_stream = handle._tts.synthesize(tts_text)
        try:
            async for audio in tts_stream:
                if first_frame:
                    first_frame = False
                    read_transcript_atask = asyncio.create_task(
                        self._read_transcript_task(transcript_source, handle)
                    )

                handle._buf_ch.send_nowait(audio.frame)
                if not handle.tts_forwarder.closed:
                    handle.tts_forwarder.push_audio(audio.frame)

            if not handle.tts_forwarder.closed:
                handle.tts_forwarder.mark_audio_segment_end()

            # None here means the TTS produced no audio at all, so the
            # transcript reader was never started.
            if read_transcript_atask is not None:
                await read_transcript_atask
        finally:
            await tts_stream.aclose()

            if read_transcript_atask is not None:
                await utils.aio.gracefully_cancel(read_transcript_atask)

    @utils.log_exceptions(logger=logger)
    async def _stream_synthesis_task(
        self,
        tts_source: AsyncIterable[str],
        transcript_source: AsyncIterable[str] | str,
        handle: SynthesisHandle,
    ) -> None:
        """Synthesize speech from streamed text.

        The TTS stream and the two reader tasks are created lazily on the
        first text segment; if ``tts_source`` yields nothing, no stream is
        opened.
        """

        @utils.log_exceptions(logger=logger)
        async def _read_generated_audio_task(
            tts_stream: text_to_speech.SynthesizeStream,
        ) -> None:
            # Forward every generated frame to the transcription forwarder
            # (while it is open) and to the handle's buffer channel.
            try:
                async for audio in tts_stream:
                    if not handle._tr_fwd.closed:
                        handle._tr_fwd.push_audio(audio.frame)

                    handle._buf_ch.send_nowait(audio.frame)
            finally:
                if handle._tr_fwd and not handle._tr_fwd.closed:
                    handle._tr_fwd.mark_audio_segment_end()

                await tts_stream.aclose()

        tts_stream: text_to_speech.SynthesizeStream | None = None
        read_tts_atask: asyncio.Task | None = None
        read_transcript_atask: asyncio.Task | None = None

        try:
            async for seg in tts_source:
                if tts_stream is None:
                    tts_stream = handle._tts.stream()
                    read_tts_atask = asyncio.create_task(
                        _read_generated_audio_task(tts_stream)
                    )
                    read_transcript_atask = asyncio.create_task(
                        self._read_transcript_task(transcript_source, handle)
                    )

                tts_stream.push_text(seg)

            if tts_stream is not None:
                # All text has been pushed; wait for both readers to finish.
                tts_stream.end_input()
                assert read_transcript_atask and read_tts_atask
                await read_tts_atask
                await read_transcript_atask

        finally:
            # Both reader tasks are created together, so one being set implies
            # the other is set as well.
            if read_tts_atask is not None:
                assert read_transcript_atask is not None
                await utils.aio.gracefully_cancel(read_tts_atask, read_transcript_atask)

            if inspect.isasyncgen(tts_source):
                await tts_source.aclose()

Instance variables

prop playout : AgentPlayout
Expand source code
@property
def playout(self) -> AgentPlayout:
    """Return the playout object held in ``_agent_playout``."""
    agent_playout = self._agent_playout
    return agent_playout

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Cancel every tracked task, then wait for all of them to settle."""
    tracked = self._tasks
    for running in tracked:
        running.cancel()

    # Exceptions (including the cancellations) are collected, not raised.
    await asyncio.gather(*tracked, return_exceptions=True)
def synthesize(self,
*,
speech_id: str,
tts_source: SpeechSource,
transcript_source: SpeechSource,
transcription: bool,
transcription_speed: float,
sentence_tokenizer: tokenize.SentenceTokenizer,
word_tokenizer: tokenize.WordTokenizer,
hyphenate_word: Callable[[str], list[str]]) ‑> SynthesisHandle
Expand source code
def synthesize(
    self,
    *,
    speech_id: str,
    tts_source: SpeechSource,
    transcript_source: SpeechSource,
    transcription: bool,
    transcription_speed: float,
    sentence_tokenizer: tokenize.SentenceTokenizer,
    word_tokenizer: tokenize.WordTokenizer,
    hyphenate_word: Callable[[str], list[str]],
) -> SynthesisHandle:
    """Start synthesizing a speech in the background.

    Args:
        speech_id: Identifier stored on the returned handle.
        tts_source: Text (or awaitable / async iterable of text) sent to the TTS.
        transcript_source: Text (or awaitable / async iterable of text)
            pushed to the transcription forwarder.
        transcription: When false, every forwarded transcription has its
            segments emptied before it is sent.
        transcription_speed: Passed to the forwarder as ``speed``.
        sentence_tokenizer: Passed through to the forwarder.
        word_tokenizer: Passed through to the forwarder.
        hyphenate_word: Passed through to the forwarder.

    Returns:
        The ``SynthesisHandle`` tracking this synthesis.
    """

    def _before_forward(
        fwd: agent_transcription.TTSSegmentsForwarder,
        rtc_transcription: rtc.Transcription,
    ) -> rtc.Transcription:
        # The parameter must NOT be called ``transcription``: that name would
        # shadow the enclosing boolean flag and the check below would test
        # the (always truthy) Transcription object instead of the flag.
        if not transcription:
            rtc_transcription.segments = []

        return rtc_transcription

    transcription_fwd = agent_transcription.TTSSegmentsForwarder(
        room=self._room,
        participant=self._room.local_participant,
        speed=transcription_speed,
        sentence_tokenizer=sentence_tokenizer,
        word_tokenizer=word_tokenizer,
        hyphenate_word=hyphenate_word,
        before_forward_cb=_before_forward,
    )

    handle = SynthesisHandle(
        tts_source=tts_source,
        transcript_source=transcript_source,
        agent_playout=self._agent_playout,
        tts=self._tts,
        transcription_fwd=transcription_fwd,
        speech_id=speech_id,
    )

    # Track the task until it completes; the done-callback drops it from the
    # set again.
    task = asyncio.create_task(self._synthesize_task(handle))
    self._tasks.add(task)
    task.add_done_callback(self._tasks.remove)
    return handle
class SynthesisHandle (*,
speech_id: str,
tts_source: SpeechSource,
transcript_source: SpeechSource,
agent_playout: AgentPlayout,
tts: text_to_speech.TTS,
transcription_fwd: agent_transcription.TTSSegmentsForwarder)
Expand source code
class SynthesisHandle:
    """Handle on a single synthesis: its sources, audio buffer and interrupt state."""

    def __init__(
        self,
        *,
        speech_id: str,
        tts_source: SpeechSource,
        transcript_source: SpeechSource,
        agent_playout: AgentPlayout,
        tts: text_to_speech.TTS,
        transcription_fwd: agent_transcription.TTSSegmentsForwarder,
    ) -> None:
        """Record the synthesis inputs and create the per-speech state."""
        self._speech_id = speech_id
        self._tts_source = tts_source
        self._transcript_source = transcript_source
        self._agent_playout = agent_playout
        self._tts = tts
        self._tr_fwd = transcription_fwd

        # Channel carrying the synthesized audio frames towards playout.
        self._buf_ch = utils.aio.Chan[rtc.AudioFrame]()
        # Stays None until play() has been called.
        self._play_handle: PlayoutHandle | None = None
        # Resolved exactly once, by interrupt().
        self._interrupt_fut = asyncio.Future[None]()

    @property
    def speech_id(self) -> str:
        """Identifier given at construction."""
        current_id = self._speech_id
        return current_id

    @property
    def tts_forwarder(self) -> agent_transcription.TTSSegmentsForwarder:
        """Transcription forwarder given at construction."""
        forwarder = self._tr_fwd
        return forwarder

    @property
    def validated(self) -> bool:
        """True once play() has produced a playout handle."""
        playout_handle = self._play_handle
        return playout_handle is not None

    @property
    def interrupted(self) -> bool:
        """True once the interrupt future is done."""
        interrupt_fut = self._interrupt_fut
        return interrupt_fut.done()

    @property
    def play_handle(self) -> PlayoutHandle | None:
        """Playout handle created by play(), or None before that."""
        playout_handle = self._play_handle
        return playout_handle

    def play(self) -> PlayoutHandle:
        """Validate the speech for playout"""
        if self.interrupted:
            raise RuntimeError("synthesis was interrupted")

        playout_handle = self._agent_playout.play(
            self._speech_id, self._buf_ch, transcription_fwd=self._tr_fwd
        )
        self._play_handle = playout_handle
        return playout_handle

    def interrupt(self) -> None:
        """Interrupt the speech"""
        if not self.interrupted:
            logger.debug(
                "agent interrupted",
                extra={"speech_id": self.speech_id},
            )

            # Stop an already-started playout before flagging the interruption.
            playout_handle = self._play_handle
            if playout_handle is not None:
                playout_handle.interrupt()

            self._interrupt_fut.set_result(None)

Instance variables

prop interrupted : bool
Expand source code
@property
def interrupted(self) -> bool:
    """Report whether the interrupt future is done."""
    interrupt_fut = self._interrupt_fut
    return interrupt_fut.done()
prop play_handle : PlayoutHandle | None
Expand source code
@property
def play_handle(self) -> PlayoutHandle | None:
    """Return the stored playout handle, which may be None."""
    playout_handle = self._play_handle
    return playout_handle
prop speech_id : str
Expand source code
@property
def speech_id(self) -> str:
    """Return the identifier stored in ``_speech_id``."""
    current_id = self._speech_id
    return current_id
prop tts_forwarder : agent_transcription.TTSSegmentsForwarder
Expand source code
@property
def tts_forwarder(self) -> agent_transcription.TTSSegmentsForwarder:
    """Return the transcription forwarder stored in ``_tr_fwd``."""
    forwarder = self._tr_fwd
    return forwarder
prop validated : bool
Expand source code
@property
def validated(self) -> bool:
    """Report whether a playout handle has been stored."""
    playout_handle = self._play_handle
    return playout_handle is not None

Methods

def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    """Interrupt the speech"""
    if not self.interrupted:
        logger.debug(
            "agent interrupted",
            extra={"speech_id": self.speech_id},
        )

        # Stop an already-started playout before flagging the interruption.
        playout_handle = self._play_handle
        if playout_handle is not None:
            playout_handle.interrupt()

        self._interrupt_fut.set_result(None)

Interrupt the speech

def play(self) ‑> PlayoutHandle
Expand source code
def play(self) -> PlayoutHandle:
    """Validate the speech for playout"""
    if self.interrupted:
        raise RuntimeError("synthesis was interrupted")

    # Hand the buffered audio channel and the forwarder to the playout, and
    # remember the handle it returns.
    playout_handle = self._agent_playout.play(
        self._speech_id, self._buf_ch, transcription_fwd=self._tr_fwd
    )
    self._play_handle = playout_handle
    return playout_handle

Validate the speech for playout