Module livekit.agents.tokenize.token_stream

Classes

class BufferedSentenceStream (*, tokenizer: TokenizeCallable, min_token_len: int, min_ctx_len: int)
Expand source code
class BufferedSentenceStream(BufferedTokenStream, SentenceStream):
    """Sentence stream backed by ``BufferedTokenStream``.

    ``tokenizer`` is handed to the base class as its ``tokenize_fnc``;
    ``min_token_len`` and ``min_ctx_len`` are forwarded unchanged.
    """

    def __init__(
        self,
        *,
        tokenizer: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        # the base class names the tokenizer parameter differently
        base_kwargs = dict(
            tokenize_fnc=tokenizer,
            min_ctx_len=min_ctx_len,
            min_token_len=min_token_len,
        )
        super().__init__(**base_kwargs)

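Sentence stream built on BufferedTokenStream: the supplied `tokenizer` is passed to the base class as `tokenize_fnc`. Pushed text is buffered and tokenized, and tokens are emitted once at least `min_token_len` characters have accumulated. A flush may emit a shorter final token.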

Ancestors

class BufferedTokenStream (*, tokenize_fnc: TokenizeCallable, min_token_len: int, min_ctx_len: int)
Expand source code
class BufferedTokenStream:
    """Incrementally tokenize pushed text and emit buffered tokens.

    Text given to ``push_text`` accumulates in an input buffer. Once that
    buffer holds at least ``min_ctx_len`` characters, it is re-tokenized with
    ``tokenize_fnc``. Every token except the last is moved into an output
    buffer, joined with single spaces. The last token is held back,
    presumably because it may still be growing. The output buffer is sent as
    one ``TokenData`` whenever it reaches ``min_token_len`` characters.

    ``flush`` sends whatever is left, even if it is shorter, and starts a new
    segment id.

    ``tokenize_fnc`` may return plain strings, or tuples whose element ``[0]``
    is the token text and whose element ``[2]`` is the offset in the input
    just past the token.

    Emitted tokens are consumed with ``async for``. ``end_input`` flushes and
    then closes the underlying channel; ``aclose`` closes it without flushing.
    """

    def __init__(
        self,
        *,
        tokenize_fnc: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        self._event_ch = aio.Chan[TokenData]()
        self._tokenize_fnc = tokenize_fnc
        self._min_ctx_len = min_ctx_len
        self._min_token_len = min_token_len
        self._current_segment_id = shortuuid()

        # NOTE(review): not referenced by any method in this class
        self._buf_tokens: list[str] = []  # <= min_token_len
        self._in_buf = ""  # raw text not yet consumed by the tokenizer
        self._out_buf = ""  # tokens accumulated but not yet sent

    @typing.no_type_check
    def push_text(self, text: str) -> None:
        """Append ``text`` to the input buffer and emit any completed tokens.

        Nothing is tokenized until the input buffer holds at least
        ``min_ctx_len`` characters.

        Raises:
            RuntimeError: if the stream has been closed.
        """
        self._check_not_closed()
        self._in_buf += text

        if len(self._in_buf) < self._min_ctx_len:
            return

        while True:
            tokens = self._tokenize_fnc(self._in_buf)
            if len(tokens) <= 1:
                # keep the trailing token buffered until more text arrives
                break

            # index instead of pop(0): the tokenizer's returned sequence is
            # never mutated (safe for cached lists and for tuples)
            tok = tokens[0]
            if isinstance(tok, tuple):
                tok_text = tok[0]
                remaining = self._in_buf[tok[2] :]
            else:
                tok_text = tok
                tok_i = max(self._in_buf.find(tok), 0)
                remaining = self._in_buf[tok_i + len(tok) :].lstrip()

            if len(remaining) >= len(self._in_buf):
                # The tokenizer consumed no input (e.g. an empty leading
                # token, or an end offset of 0). Looping again would repeat
                # forever, so leave the text for the next push_text()/flush().
                break

            if self._out_buf:
                self._out_buf += " "
            self._out_buf += tok_text
            if len(self._out_buf) >= self._min_token_len:
                self._event_ch.send_nowait(
                    TokenData(token=self._out_buf, segment_id=self._current_segment_id)
                )
                self._out_buf = ""

            self._in_buf = remaining

    @typing.no_type_check
    def flush(self) -> None:
        """Send all buffered text as one final token and start a new segment.

        The rest of the input buffer is tokenized and appended, space
        separated, to the output buffer. The result is sent regardless of
        ``min_token_len``. Both buffers are then cleared. The segment id is
        replaced only when there was pending text.

        Raises:
            RuntimeError: if the stream has been closed.
        """
        self._check_not_closed()

        if self._in_buf or self._out_buf:
            tokens = self._tokenize_fnc(self._in_buf)
            if tokens:
                if self._out_buf:
                    self._out_buf += " "

                if isinstance(tokens[0], tuple):
                    self._out_buf += " ".join(tok[0] for tok in tokens)
                else:
                    self._out_buf += " ".join(tokens)

            if self._out_buf:
                self._event_ch.send_nowait(
                    TokenData(token=self._out_buf, segment_id=self._current_segment_id)
                )

            self._current_segment_id = shortuuid()

        self._in_buf = ""
        self._out_buf = ""

    def end_input(self) -> None:
        """Flush any buffered text, then close the event channel.

        ``flush`` checks that the channel is still open, so it must run
        before ``close``.
        """
        self.flush()
        self._event_ch.close()

    async def aclose(self) -> None:
        """Close the event channel without flushing buffered text."""
        self._event_ch.close()

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` if the event channel has been closed."""
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def __aiter__(self) -> "BufferedTokenStream":
        return self

    async def __anext__(self) -> TokenData:
        """Return the next emitted token, delegating to the event channel."""
        return await self._event_ch.__anext__()

Subclasses

BufferedSentenceStream
BufferedWordStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close the event channel; buffered text is not flushed."""
    channel = self._event_ch
    channel.close()
def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Emit remaining buffered text, then close the event channel.

    ``flush()`` checks that the stream is still open, so it has to run
    before the channel is closed.
    """
    self.flush()
    channel = self._event_ch
    channel.close()
def flush(self) ‑> None
Expand source code
@typing.no_type_check
def flush(self) -> None:
    """Send all buffered text as one final token and rotate the segment id.

    Whatever remains in ``self._in_buf`` is tokenized with
    ``self._tokenize_fnc`` and appended, space separated, to
    ``self._out_buf``. The result is sent regardless of
    ``self._min_token_len``. Both buffers are cleared afterwards. The segment
    id is replaced only when there was pending text.

    Raises:
        RuntimeError: propagated from ``self._check_not_closed()`` when the
            stream is closed.
    """
    self._check_not_closed()

    pending = bool(self._in_buf) or bool(self._out_buf)
    if pending:
        leftovers = self._tokenize_fnc(self._in_buf)
        if leftovers:
            # tokens are either plain strings or (text, ..., end) tuples
            if isinstance(leftovers[0], tuple):
                words = [item[0] for item in leftovers]
            else:
                words = leftovers
            tail = " ".join(words)
            self._out_buf = self._out_buf + " " + tail if self._out_buf else tail

        if self._out_buf:
            self._event_ch.send_nowait(
                TokenData(token=self._out_buf, segment_id=self._current_segment_id)
            )
        self._current_segment_id = shortuuid()

    self._in_buf = ""
    self._out_buf = ""
def push_text(self, text: str) ‑> None
Expand source code
@typing.no_type_check
def push_text(self, text: str) -> None:
    """Append ``text`` to the input buffer and emit any completed tokens.

    Nothing is tokenized until the buffer holds at least
    ``self._min_ctx_len`` characters. Then every token except the last is
    moved, space separated, into ``self._out_buf``. The output buffer is sent
    as one ``TokenData`` once it reaches ``self._min_token_len`` characters.
    Tokens may be plain strings or tuples whose ``[0]`` is the text and whose
    ``[2]`` is the input offset just past the token.

    Raises:
        RuntimeError: propagated from ``self._check_not_closed()`` when the
            stream is closed.
    """
    self._check_not_closed()
    self._in_buf += text

    if len(self._in_buf) < self._min_ctx_len:
        return

    while True:
        tokens = self._tokenize_fnc(self._in_buf)
        if len(tokens) <= 1:
            # keep the trailing token buffered until more text arrives
            break

        # index instead of pop(0): the tokenizer's returned sequence is
        # never mutated (safe for cached lists and for tuples)
        tok = tokens[0]
        if isinstance(tok, tuple):
            tok_text = tok[0]
            remaining = self._in_buf[tok[2] :]
        else:
            tok_text = tok
            tok_i = max(self._in_buf.find(tok), 0)
            remaining = self._in_buf[tok_i + len(tok) :].lstrip()

        if len(remaining) >= len(self._in_buf):
            # The tokenizer consumed no input (e.g. an empty leading token,
            # or an end offset of 0). Looping again would repeat forever, so
            # leave the text for the next push_text()/flush().
            break

        if self._out_buf:
            self._out_buf += " "
        self._out_buf += tok_text
        if len(self._out_buf) >= self._min_token_len:
            self._event_ch.send_nowait(
                TokenData(token=self._out_buf, segment_id=self._current_segment_id)
            )
            self._out_buf = ""

        self._in_buf = remaining
class BufferedWordStream (*, tokenizer: TokenizeCallable, min_token_len: int, min_ctx_len: int)
Expand source code
class BufferedWordStream(BufferedTokenStream, WordStream):
    """Word stream backed by ``BufferedTokenStream``.

    ``tokenizer`` is handed to the base class as its ``tokenize_fnc``;
    ``min_token_len`` and ``min_ctx_len`` are forwarded unchanged.
    """

    def __init__(
        self,
        *,
        tokenizer: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        # the base class names the tokenizer parameter differently
        base_kwargs = dict(
            tokenize_fnc=tokenizer,
            min_ctx_len=min_ctx_len,
            min_token_len=min_token_len,
        )
        super().__init__(**base_kwargs)

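Word stream built on BufferedTokenStream: the supplied `tokenizer` is passed to the base class as `tokenize_fnc`. Pushed text is buffered and tokenized, and tokens are emitted once at least `min_token_len` characters have accumulated. A flush may emit a shorter final token.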

Ancestors