Module livekit.agents.tokenize.token_stream
Classes
class BufferedSentenceStream (*, tokenizer: TokenizeCallable, min_token_len: int, min_ctx_len: int)
class BufferedSentenceStream(BufferedTokenStream, SentenceStream):
    def __init__(
        self,
        *,
        tokenizer: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        super().__init__(
            tokenize_fnc=tokenizer,
            min_token_len=min_token_len,
            min_ctx_len=min_ctx_len,
        )
A BufferedTokenStream that splits buffered text into sentences using the provided tokenizer.
Ancestors
- BufferedTokenStream
- SentenceStream
- abc.ABC
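A minimal usage sketch. The period-splitting tokenizer below is purely illustrative; real code would pass a proper sentence tokenizer as the TokenizeCallable:

import asyncio

from livekit.agents.tokenize.token_stream import BufferedSentenceStream

def naive_sentences(text: str) -> list[str]:
    # hypothetical tokenizer: split on periods, drop empty fragments
    return [p.strip() for p in text.split(".") if p.strip()]

async def main() -> None:
    stream = BufferedSentenceStream(
        tokenizer=naive_sentences,
        min_token_len=10,  # merge sentences shorter than 10 characters
        min_ctx_len=20,    # wait for 20 characters before tokenizing
    )
    stream.push_text("Hello there. This is a second sentence. And a third")
    stream.end_input()  # flush the remainder and close the stream

    async for ev in stream:  # ev is a TokenData(token=..., segment_id=...)
        print(ev.segment_id, ev.token)

asyncio.run(main())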
class BufferedTokenStream (*, tokenize_fnc: TokenizeCallable, min_token_len: int, min_ctx_len: int)
Buffers incoming text and incrementally emits the tokens produced by tokenize_fnc. Tokenization is deferred until at least min_ctx_len characters of context are buffered, and short tokens are grouped until the output reaches min_token_len characters. Emitted tokens share a segment_id until the stream is flushed.
class BufferedTokenStream:
    def __init__(
        self,
        *,
        tokenize_fnc: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        self._event_ch = aio.Chan[TokenData]()
        self._tokenize_fnc = tokenize_fnc
        self._min_ctx_len = min_ctx_len
        self._min_token_len = min_token_len

        self._current_segment_id = shortuuid()

        self._buf_tokens: list[str] = []  # <= min_token_len
        self._in_buf = ""
        self._out_buf = ""

    @typing.no_type_check
    def push_text(self, text: str) -> None:
        self._check_not_closed()
        self._in_buf += text

        if len(self._in_buf) < self._min_ctx_len:
            return

        while True:
            tokens = self._tokenize_fnc(self._in_buf)
            if len(tokens) <= 1:
                break

            if self._out_buf:
                self._out_buf += " "

            tok = tokens.pop(0)
            tok_text = tok
            if isinstance(tok, tuple):
                tok_text = tok[0]

            self._out_buf += tok_text
            if len(self._out_buf) >= self._min_token_len:
                self._event_ch.send_nowait(
                    TokenData(token=self._out_buf, segment_id=self._current_segment_id)
                )
                self._out_buf = ""

            if isinstance(tok, tuple):
                self._in_buf = self._in_buf[tok[2] :]
            else:
                tok_i = max(self._in_buf.find(tok), 0)
                self._in_buf = self._in_buf[tok_i + len(tok) :].lstrip()

    @typing.no_type_check
    def flush(self) -> None:
        self._check_not_closed()

        if self._in_buf or self._out_buf:
            tokens = self._tokenize_fnc(self._in_buf)
            if tokens:
                if self._out_buf:
                    self._out_buf += " "

                if isinstance(tokens[0], tuple):
                    self._out_buf += " ".join([tok[0] for tok in tokens])
                else:
                    self._out_buf += " ".join(tokens)

            if self._out_buf:
                self._event_ch.send_nowait(
                    TokenData(token=self._out_buf, segment_id=self._current_segment_id)
                )

            self._current_segment_id = shortuuid()

        self._in_buf = ""
        self._out_buf = ""

    def end_input(self) -> None:
        self.flush()
        self._event_ch.close()

    async def aclose(self) -> None:
        self._event_ch.close()

    def _check_not_closed(self) -> None:
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def __aiter__(self) -> "BufferedTokenStream":
        return self

    async def __anext__(self) -> TokenData:
        return await self._event_ch.__anext__()
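Judging from the isinstance(tok, tuple) branches above, tokenize_fnc may return either plain strings or tuples that appear to carry (text, start, end) offsets into the input; with tuples, push_text advances the input buffer using the token's end offset (tok[2]). A hypothetical offset-based tokenizer:

def word_spans(text: str) -> list[tuple[str, int, int]]:
    # each token is (text, start_index, end_index) into the input string
    tokens: list[tuple[str, int, int]] = []
    pos = 0
    for word in text.split():
        start = text.index(word, pos)
        end = start + len(word)
        tokens.append((word, start, end))
        pos = end
    return tokens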
Subclasses
- BufferedSentenceStream
- BufferedWordStream
Methods
async def aclose(self) ‑> None
Close the stream immediately, without flushing buffered text.
async def aclose(self) -> None:
    self._event_ch.close()
def end_input(self) ‑> None
Flush any remaining buffered text, then close the stream.
def end_input(self) -> None:
    self.flush()
    self._event_ch.close()
def flush(self) ‑> None
Tokenize and emit whatever remains in the buffers, even below min_token_len, then start a new segment_id.
@typing.no_type_check
def flush(self) -> None:
    self._check_not_closed()

    if self._in_buf or self._out_buf:
        tokens = self._tokenize_fnc(self._in_buf)
        if tokens:
            if self._out_buf:
                self._out_buf += " "

            if isinstance(tokens[0], tuple):
                self._out_buf += " ".join([tok[0] for tok in tokens])
            else:
                self._out_buf += " ".join(tokens)

        if self._out_buf:
            self._event_ch.send_nowait(
                TokenData(token=self._out_buf, segment_id=self._current_segment_id)
            )

        self._current_segment_id = shortuuid()

    self._in_buf = ""
    self._out_buf = ""
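A sketch of the flush semantics, traced from the source above; the whitespace tokenizer and thresholds are illustrative:

stream = BufferedWordStream(
    tokenizer=lambda t: t.split(),  # illustrative whitespace tokenizer
    min_token_len=50,
    min_ctx_len=10,
)
stream.push_text("short text")  # below min_token_len: nothing emitted yet
stream.flush()                  # queues TokenData(token="short text", ...)
stream.push_text("next segment here")
stream.flush()                  # queues the text under a fresh segment_id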
def push_text(self, text: str) ‑> None
Append text to the input buffer and emit tokens as soon as enough context (min_ctx_len) and output length (min_token_len) have accumulated.
@typing.no_type_check
def push_text(self, text: str) -> None:
    self._check_not_closed()
    self._in_buf += text

    if len(self._in_buf) < self._min_ctx_len:
        return

    while True:
        tokens = self._tokenize_fnc(self._in_buf)
        if len(tokens) <= 1:
            break

        if self._out_buf:
            self._out_buf += " "

        tok = tokens.pop(0)
        tok_text = tok
        if isinstance(tok, tuple):
            tok_text = tok[0]

        self._out_buf += tok_text
        if len(self._out_buf) >= self._min_token_len:
            self._event_ch.send_nowait(
                TokenData(token=self._out_buf, segment_id=self._current_segment_id)
            )
            self._out_buf = ""

        if isinstance(tok, tuple):
            self._in_buf = self._in_buf[tok[2] :]
        else:
            tok_i = max(self._in_buf.find(tok), 0)
            self._in_buf = self._in_buf[tok_i + len(tok) :].lstrip()
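A sketch of the incremental behavior (whitespace tokenizer, hypothetical thresholds); note that the last token is always held back as context until flush() or end_input():

stream = BufferedWordStream(
    tokenizer=lambda t: t.split(),
    min_token_len=3,
    min_ctx_len=5,
)
stream.push_text("he")       # 2 chars < min_ctx_len: buffered, not tokenized
stream.push_text("llo wor")  # "hello wor" -> ["hello", "wor"]: "hello" queued
stream.push_text("ld")       # "world" is a single token: held as context
stream.end_input()           # flush queues "world", then the stream closes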
class BufferedWordStream (*, tokenizer: TokenizeCallable, min_token_len: int, min_ctx_len: int)
class BufferedWordStream(BufferedTokenStream, WordStream):
    def __init__(
        self,
        *,
        tokenizer: TokenizeCallable,
        min_token_len: int,
        min_ctx_len: int,
    ) -> None:
        super().__init__(
            tokenize_fnc=tokenizer,
            min_token_len=min_token_len,
            min_ctx_len=min_ctx_len,
        )
A BufferedTokenStream that splits buffered text into words using the provided tokenizer.
Ancestors
- BufferedTokenStream
- WordStream
- abc.ABC
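A concurrent producer/consumer sketch, pushing text in arbitrary chunks (e.g. as it streams from an LLM) while a separate task iterates the emitted tokens; the whitespace tokenizer is illustrative. Note how min_token_len groups short words into a single emitted token:

import asyncio

from livekit.agents.tokenize.token_stream import BufferedWordStream

async def main() -> None:
    stream = BufferedWordStream(
        tokenizer=lambda t: t.split(),  # illustrative whitespace tokenizer
        min_token_len=8,
        min_ctx_len=4,
    )

    async def consume() -> None:
        async for ev in stream:
            print(repr(ev.token))  # "streamed", "text arrives", "in pieces"

    consumer = asyncio.create_task(consume())

    for chunk in ("stream", "ed te", "xt arrives ", "in pieces"):
        stream.push_text(chunk)
    stream.end_input()
    await consumer

asyncio.run(main())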