Module livekit.agents.transcription

Sub-modules

livekit.agents.transcription.stt_forwarder
livekit.agents.transcription.tts_forwarder

Classes

class STTSegmentsForwarder (*, room: rtc.Room, participant: rtc.Participant | str, track: rtc.Track | rtc.TrackPublication | str | None = None, before_forward_cb: BeforeForwardCallback = <function _default_before_forward_cb>, will_forward_transcription: WillForwardTranscription | None = None)

Forward STT transcription to the users. (Useful for client-side rendering)

Expand source code
class STTSegmentsForwarder:
    """
    Forward STT transcription to the users. (Useful for client-side rendering)
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        participant: rtc.Participant | str,
        track: rtc.Track | rtc.TrackPublication | str | None = None,
        before_forward_cb: BeforeForwardCallback = _default_before_forward_cb,
        # backward compatibility
        will_forward_transcription: WillForwardTranscription | None = None,
    ):
        identity = participant if isinstance(participant, str) else participant.identity
        if track is None:
            track = _utils.find_micro_track_id(room, identity)
        elif isinstance(track, (rtc.TrackPublication, rtc.Track)):
            track = track.sid

        if will_forward_transcription is not None:
            logger.warning(
                "will_forward_transcription is deprecated and will be removed in 1.5.0, use before_forward_cb instead",
            )
            before_forward_cb = will_forward_transcription

        self._room, self._participant_identity, self._track_id = room, identity, track
        self._before_forward_cb = before_forward_cb
        self._queue = asyncio.Queue[Optional[rtc.TranscriptionSegment]]()
        self._main_task = asyncio.create_task(self._run())
        self._current_id = _utils.segment_uuid()

    async def _run(self):
        try:
            while True:
                seg = await self._queue.get()
                if seg is None:
                    break

                base_transcription = rtc.Transcription(
                    participant_identity=self._participant_identity,
                    track_sid=self._track_id,
                    segments=[seg],  # no history for now
                )

                transcription = self._before_forward_cb(self, base_transcription)
                if asyncio.iscoroutine(transcription):
                    transcription = await transcription

                if not isinstance(transcription, rtc.Transcription):
                    transcription = _default_before_forward_cb(self, base_transcription)

                if transcription.segments and self._room.isconnected():
                    await self._room.local_participant.publish_transcription(
                        transcription
                    )

        except Exception:
            logger.exception("error in stt transcription")

    def update(self, ev: stt.SpeechEvent):
        if ev.type == stt.SpeechEventType.INTERIM_TRANSCRIPT:
            # TODO(theomonnom): We always take the first alternative, we should mb expose opt to the
            # user?
            text = ev.alternatives[0].text
            self._queue.put_nowait(
                rtc.TranscriptionSegment(
                    id=self._current_id,
                    text=text,
                    start_time=0,
                    end_time=0,
                    final=False,
                    language="",  # TODO
                )
            )
        elif ev.type == stt.SpeechEventType.FINAL_TRANSCRIPT:
            text = ev.alternatives[0].text
            self._queue.put_nowait(
                rtc.TranscriptionSegment(
                    id=self._current_id,
                    text=text,
                    start_time=0,
                    end_time=0,
                    final=True,
                    language="",  # TODO
                )
            )

            self._current_id = _utils.segment_uuid()

    async def aclose(self, *, wait: bool = True) -> None:
        self._queue.put_nowait(None)

        if not wait:
            self._main_task.cancel()

        with contextlib.suppress(asyncio.CancelledError):
            await self._main_task

Methods

async def aclose(self, *, wait: bool = True) ‑> None
def update(self, ev: stt.SpeechEvent)
class TTSSegmentsForwarder (*, room: rtc.Room, participant: rtc.Participant | str, track: rtc.Track | rtc.TrackPublication | str | None = None, language: str = '', speed: float = 1.0, new_sentence_delay: float = 0.4, word_tokenizer: tokenize.WordTokenizer = <livekit.agents.tokenize.basic.WordTokenizer object>, sentence_tokenizer: tokenize.SentenceTokenizer = <livekit.agents.tokenize.basic.SentenceTokenizer object>, hyphenate_word: Callable[[str], list[str]] = <function hyphenate_word>, before_forward_cb: BeforeForwardCallback = <function _default_before_forward_callback>, loop: asyncio.AbstractEventLoop | None = None, will_forward_transcription: WillForwardTranscription | None = None)

Forward TTS transcription to the users. This class tries to imitate the right timing of speech with the synthesized text. The first estimation is based on the speed argument. Once we have received the full audio of a specific text segment, we recalculate the avg speech speed using the length of the text & audio and catch up/ slow down the transcription if needed.

Args

room
room where the transcription will be sent
participant
participant or identity that is pushing the TTS
track
track where the TTS audio is being sent
language
language of the text
speed
average speech speed in characters per second (used by default if the full audio is not received yet)
new_sentence_delay
delay in seconds between sentences
auto_playout
if True, the forwarder will automatically start the transcription once the first audio frame is received. If False, you need to call segment_playout_started to start the transcription.
word_tokenizer
word tokenizer used to split the text into words
sentence_tokenizer
sentence tokenizer used to split the text into sentences
hyphenate_word
function that returns a list of hyphens for a given word
Expand source code
class TTSSegmentsForwarder:
    """
    Forward TTS transcription to the users. This class tries to imitate the right timing of
    speech with the synthesized text. The first estimation is based on the speed argument. Once
    we have received the full audio of a specific text segment, we recalculate the avg speech
    speed using the length of the text & audio and catch up/ slow down the transcription if needed.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        participant: rtc.Participant | str,
        track: rtc.Track | rtc.TrackPublication | str | None = None,
        language: str = "",
        speed: float = 1.0,
        new_sentence_delay: float = 0.4,
        word_tokenizer: tokenize.WordTokenizer = tokenize.basic.WordTokenizer(),
        sentence_tokenizer: tokenize.SentenceTokenizer = tokenize.basic.SentenceTokenizer(),
        hyphenate_word: Callable[[str], list[str]] = tokenize.basic.hyphenate_word,
        before_forward_cb: BeforeForwardCallback = _default_before_forward_callback,
        loop: asyncio.AbstractEventLoop | None = None,
        # backward compatibility
        will_forward_transcription: WillForwardTranscription | None = None,
    ):
        """
        Args:
            room: room where the transcription will be sent
            participant: participant or identity that is pushing the TTS
            track: track where the TTS audio is being sent
            language: language of the text
            speed: average speech speed in characters per second (used by default if the full audio is not received yet)
            new_sentence_delay: delay in seconds between sentences
            auto_playout: if True, the forwarder will automatically start the transcription once the
                first audio frame is received. If False, you need to call segment_playout_started
                to start the transcription.
            word_tokenizer: word tokenizer used to split the text into words
            sentence_tokenizer: sentence tokenizer used to split the text into sentences
            hyphenate_word: function that returns a list of hyphens for a given word

        """
        identity = participant if isinstance(participant, str) else participant.identity

        if track is None:
            track = _utils.find_micro_track_id(room, identity)
        elif isinstance(track, (rtc.TrackPublication, rtc.Track)):
            track = track.sid

        if will_forward_transcription is not None:
            logger.warning(
                "will_forward_transcription is deprecated and will be removed in 1.5.0, use before_forward_cb instead",
            )
            before_forward_cb = will_forward_transcription

        speed = speed * STANDARD_SPEECH_RATE
        self._opts = _TTSOptions(
            room=room,
            participant_identity=identity,
            track_id=track,
            language=language,
            speed=speed,
            word_tokenizer=word_tokenizer,
            sentence_tokenizer=sentence_tokenizer,
            hyphenate_word=hyphenate_word,
            new_sentence_delay=new_sentence_delay,
            before_forward_cb=before_forward_cb,
        )
        self._closed = False
        self._loop = loop or asyncio.get_event_loop()
        self._close_future = asyncio.Future[None]()

        self._playing_seg_index = -1
        self._finshed_seg_index = -1

        self._text_q_changed = asyncio.Event()
        self._text_q = list[Union[_TextData, None]]()
        self._audio_q_changed = asyncio.Event()
        self._audio_q = list[Union[_AudioData, None]]()

        self._text_data: _TextData | None = None
        self._audio_data: _AudioData | None = None

        self._played_text = ""

        self._main_atask: asyncio.Task | None = None
        self._task_set = utils.aio.TaskSet(loop)

    def segment_playout_started(self) -> None:
        """
        Notify that the playout of the audio segment has started.
        This will start forwarding the transcription for the current segment.
        """
        self._check_not_closed()
        self._playing_seg_index += 1

        if self._main_atask is None:
            self._main_atask = asyncio.create_task(self._main_task())

    def segment_playout_finished(self) -> None:
        """
        Notify that the playout of the audio segment has finished.
        This will catchup and directly send the final transcription in case the forwarder is too
        late.
        """
        self._check_not_closed()
        self._finshed_seg_index += 1

    def push_audio(self, frame: rtc.AudioFrame) -> None:
        self._check_not_closed()

        if self._audio_data is None:
            self._audio_data = _AudioData()
            self._audio_q.append(self._audio_data)
            self._audio_q_changed.set()

        frame_duration = frame.samples_per_channel / frame.sample_rate
        self._audio_data.pushed_duration += frame_duration

    def mark_audio_segment_end(self) -> None:
        self._check_not_closed()

        if self._audio_data is None:
            self.push_audio(rtc.AudioFrame(bytes(), 24000, 1, 0))

        assert self._audio_data is not None
        self._audio_data.done = True
        self._audio_data = None

    def push_text(self, text: str) -> None:
        self._check_not_closed()

        if self._text_data is None:
            self._text_data = _TextData(
                sentence_stream=self._opts.sentence_tokenizer.stream()
            )
            self._text_q.append(self._text_data)
            self._text_q_changed.set()

        self._text_data.pushed_text += text
        self._text_data.sentence_stream.push_text(text)

    def mark_text_segment_end(self) -> None:
        self._check_not_closed()

        if self._text_data is None:
            self.push_text("")

        assert self._text_data is not None
        self._text_data.done = True
        self._text_data.sentence_stream.end_input()
        self._text_data = None

    @property
    def closed(self) -> bool:
        return self._closed

    @property
    def played_text(self) -> str:
        return self._played_text

    async def aclose(self) -> None:
        if self._closed:
            return

        self._closed = True
        self._close_future.set_result(None)

        for text_data in self._text_q:
            assert text_data is not None
            await text_data.sentence_stream.aclose()

        self._text_q.append(None)
        self._audio_q.append(None)
        self._text_q_changed.set()
        self._audio_q_changed.set()

        await self._task_set.aclose()

        if self._main_atask is not None:
            await self._main_atask

    @utils.log_exceptions(logger=logger)
    async def _main_task(self) -> None:
        """Main task that forwards the transcription to the room."""
        rtc_seg_ch = utils.aio.Chan[rtc.TranscriptionSegment]()

        @utils.log_exceptions(logger=logger)
        async def _forward_task():
            async for rtc_seg in rtc_seg_ch:
                base_transcription = rtc.Transcription(
                    participant_identity=self._opts.participant_identity,
                    track_sid=self._opts.track_id,
                    segments=[rtc_seg],  # no history for now
                )

                transcription = self._opts.before_forward_cb(self, base_transcription)
                if asyncio.iscoroutine(transcription):
                    transcription = await transcription

                # fallback to default impl if no custom/user stream is returned
                if not isinstance(transcription, rtc.Transcription):
                    transcription = _default_before_forward_callback(
                        self, base_transcription
                    )

                if transcription.segments and self._opts.room.isconnected():
                    try:
                        await self._opts.room.local_participant.publish_transcription(
                            transcription
                        )
                    except PublishTranscriptionError:
                        continue

        forward_task = asyncio.create_task(_forward_task())

        seg_index = 0
        q_done = False
        while not q_done:
            await self._text_q_changed.wait()
            await self._audio_q_changed.wait()

            while self._text_q and self._audio_q:
                text_data = self._text_q.pop(0)
                audio_data = self._audio_q.pop(0)

                if text_data is None or audio_data is None:
                    q_done = True
                    break

                # wait until the segment is validated and has started playing
                while not self._closed:
                    if self._playing_seg_index >= seg_index:
                        break

                    await self._sleep_if_not_closed(0.125)

                sentence_stream = text_data.sentence_stream
                forward_start_time = time.time()

                async for ev in sentence_stream:
                    await self._sync_sentence_co(
                        seg_index,
                        forward_start_time,
                        text_data,
                        audio_data,
                        ev.token,
                        rtc_seg_ch,
                    )

                seg_index += 1

            self._text_q_changed.clear()
            self._audio_q_changed.clear()

        rtc_seg_ch.close()
        await forward_task

    async def _sync_sentence_co(
        self,
        segment_index: int,
        segment_start_time: float,
        text_data: _TextData,
        audio_data: _AudioData,
        sentence: str,
        rtc_seg_ch: utils.aio.Chan[rtc.TranscriptionSegment],
    ):
        """Synchronize the transcription with the audio playout for a given sentence."""
        # put each sentence in a different transcription segment

        real_speed = None
        if audio_data.pushed_duration > 0 and audio_data.done:
            real_speed = (
                len(self._calc_hyphens(text_data.pushed_text))
                / audio_data.pushed_duration
            )

        seg_id = _utils.segment_uuid()
        words = self._opts.word_tokenizer.tokenize(text=sentence)
        processed_words: list[str] = []

        og_text = self._played_text
        for word in words:
            if segment_index <= self._finshed_seg_index:
                # playout of the audio segment already finished
                # break the loop and send the final transcription
                break

            if self._closed:
                # transcription closed, early
                return

            word_hyphens = len(self._opts.hyphenate_word(word))
            processed_words.append(word)

            # elapsed time since the start of the seg
            elapsed_time = time.time() - segment_start_time
            text = self._opts.word_tokenizer.format_words(processed_words)

            # remove any punctuation at the end of a non-final transcript
            text = text.rstrip("".join(PUNCTUATIONS))

            speed = self._opts.speed
            if real_speed is not None:
                speed = real_speed
                estimated_pauses_s = (
                    text_data.forwarded_sentences * self._opts.new_sentence_delay
                )
                hyph_pauses = estimated_pauses_s * speed

                target_hyphens = round(speed * elapsed_time)
                dt = target_hyphens - text_data.forwarded_hyphens - hyph_pauses
                to_wait_hyphens = max(0.0, word_hyphens - dt)
                delay = to_wait_hyphens / speed
            else:
                delay = word_hyphens / speed

            first_delay = min(delay / 2, 2 / speed)
            await self._sleep_if_not_closed(first_delay)

            rtc_seg_ch.send_nowait(
                rtc.TranscriptionSegment(
                    id=seg_id,
                    text=text,
                    start_time=0,
                    end_time=0,
                    final=False,
                    language=self._opts.language,
                )
            )
            self._played_text = f"{og_text} {text}"

            await self._sleep_if_not_closed(delay - first_delay)
            text_data.forwarded_hyphens += word_hyphens

        rtc_seg_ch.send_nowait(
            rtc.TranscriptionSegment(
                id=seg_id,
                text=sentence,
                start_time=0,
                end_time=0,
                final=True,
                language=self._opts.language,
            )
        )
        self._played_text = f"{og_text} {sentence}"

        await self._sleep_if_not_closed(self._opts.new_sentence_delay)
        text_data.forwarded_sentences += 1

    async def _sleep_if_not_closed(self, delay: float) -> None:
        with contextlib.suppress(asyncio.TimeoutError):
            await asyncio.wait([self._close_future], timeout=delay)

    def _calc_hyphens(self, text: str) -> list[str]:
        hyphens: list[str] = []
        words = self._opts.word_tokenizer.tokenize(text=text)
        for word in words:
            new = self._opts.hyphenate_word(word)
            hyphens.extend(new)

        return hyphens

    def _check_not_closed(self) -> None:
        if self._closed:
            raise RuntimeError("TTSForwarder is closed")

Instance variables

prop closed : bool
Expand source code
@property
def closed(self) -> bool:
    return self._closed
prop played_text : str
Expand source code
@property
def played_text(self) -> str:
    return self._played_text

Methods

async def aclose(self) ‑> None
def mark_audio_segment_end(self) ‑> None
def mark_text_segment_end(self) ‑> None
def push_audio(self, frame: rtc.AudioFrame) ‑> None
def push_text(self, text: str) ‑> None
def segment_playout_finished(self) ‑> None

Notify that the playout of the audio segment has finished. This will catchup and directly send the final transcription in case the forwarder is too late.

def segment_playout_started(self) ‑> None

Notify that the playout of the audio segment has started. This will start forwarding the transcription for the current segment.