Module livekit.agents.tokenize.tokenizer
Classes
class SentenceStream-
Expand source code
class SentenceStream(ABC): def __init__(self) -> None: self._event_ch = aio.Chan[TokenData]() @abstractmethod def push_text(self, text: str) -> None: ... @abstractmethod def flush(self) -> None: ... @abstractmethod def end_input(self) -> None: ... @abstractmethod async def aclose(self) -> None: ... async def __anext__(self) -> TokenData: return await self._event_ch.__anext__() def __aiter__(self) -> AsyncIterator[TokenData]: return self def _do_close(self) -> None: self._event_ch.close() def _check_not_closed(self) -> None: if self._event_ch.closed: cls = type(self) raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- abc.ABC
Subclasses
Methods
async def aclose(self) ‑> None-
Expand source code
@abstractmethod async def aclose(self) -> None: ... def end_input(self) ‑> None-
Expand source code
@abstractmethod def end_input(self) -> None: ... def flush(self) ‑> None-
Expand source code
@abstractmethod def flush(self) -> None: ... def push_text(self, text: str) ‑> None-
Expand source code
@abstractmethod def push_text(self, text: str) -> None: ...
class SentenceTokenizer-
Expand source code
class SentenceTokenizer(ABC): @abstractmethod def tokenize(self, text: str, *, language: str | None = None) -> list[str]: pass @abstractmethod def stream(self, *, language: str | None = None) -> "SentenceStream": passHelper class that provides a standard way to create an ABC using inheritance.
Ancestors
- abc.ABC
Subclasses
- SentenceTokenizer
- livekit.plugins.nltk.sentence_tokenizer.SentenceTokenizer
Methods
def stream(self, *, language: str | None = None) ‑> SentenceStream-
Expand source code
@abstractmethod def stream(self, *, language: str | None = None) -> "SentenceStream": pass def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]-
Expand source code
@abstractmethod def tokenize(self, text: str, *, language: str | None = None) -> list[str]: pass
class TokenData (segment_id: str = '', token: str = '')-
Expand source code
@dataclass class TokenData: segment_id: str = "" token: str = ""TokenData(segment_id: 'str' = '', token: 'str' = '')
Instance variables
var segment_id : strvar token : str
class WordStream-
Expand source code
class WordStream(ABC): def __init__(self) -> None: self._event_ch = aio.Chan[TokenData]() @abstractmethod def push_text(self, text: str) -> None: ... @abstractmethod def flush(self) -> None: ... @abstractmethod def end_input(self) -> None: ... @abstractmethod async def aclose(self) -> None: ... async def __anext__(self) -> TokenData: return await self._event_ch.__anext__() def __aiter__(self) -> AsyncIterator[TokenData]: return self def _do_close(self) -> None: self._event_ch.close() def _check_not_closed(self) -> None: if self._event_ch.closed: cls = type(self) raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- abc.ABC
Subclasses
Methods
async def aclose(self) ‑> None-
Expand source code
@abstractmethod async def aclose(self) -> None: ... def end_input(self) ‑> None-
Expand source code
@abstractmethod def end_input(self) -> None: ... def flush(self) ‑> None-
Expand source code
@abstractmethod def flush(self) -> None: ... def push_text(self, text: str) ‑> None-
Expand source code
@abstractmethod def push_text(self, text: str) -> None: ...
class WordTokenizer-
Expand source code
class WordTokenizer(ABC): @abstractmethod def tokenize(self, text: str, *, language: str | None = None) -> list[str]: pass @abstractmethod def stream(self, *, language: str | None = None) -> "WordStream": pass def format_words(self, words: list[str]) -> str: return " ".join(words)Helper class that provides a standard way to create an ABC using inheritance.
Ancestors
- abc.ABC
Subclasses
Methods
def format_words(self, words: list[str]) ‑> str-
Expand source code
def format_words(self, words: list[str]) -> str: return " ".join(words) def stream(self, *, language: str | None = None) ‑> WordStream-
Expand source code
@abstractmethod def stream(self, *, language: str | None = None) -> "WordStream": pass def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]-
Expand source code
@abstractmethod def tokenize(self, text: str, *, language: str | None = None) -> list[str]: pass