Module livekit.agents.tokenize.tokenizer

Classes

class SentenceStream
Expand source code
class SentenceStream(ABC):
    """Abstract asynchronous stream of ``TokenData`` items.

    The stream is its own async iterator: ``async for`` pulls items from an
    internal ``aio.Chan[TokenData]`` channel. Concrete subclasses implement
    ``push_text``, ``flush``, ``end_input`` and ``aclose``.
    """

    def __init__(self) -> None:
        # Channel that ``__anext__`` reads from and ``_do_close`` closes.
        self._event_ch = aio.Chan[TokenData]()

    def __aiter__(self) -> AsyncIterator[TokenData]:
        """Return the stream itself as the async iterator."""
        return self

    async def __anext__(self) -> TokenData:
        """Await the next item from the internal channel and return it."""
        item = await self._event_ch.__anext__()
        return item

    @abstractmethod
    def push_text(self, text: str) -> None:
        """Abstract: accept a piece of input ``text``."""

    @abstractmethod
    def flush(self) -> None:
        """Abstract: flush the stream; the exact meaning is up to the subclass."""

    @abstractmethod
    def end_input(self) -> None:
        """Abstract: signal the end of input; details are up to the subclass."""

    @abstractmethod
    async def aclose(self) -> None:
        """Abstract coroutine: close the stream."""

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` when the internal channel is already closed."""
        if not self._event_ch.closed:
            return
        klass = type(self)
        raise RuntimeError(f"{klass.__module__}.{klass.__name__} is closed")

    def _do_close(self) -> None:
        """Close the internal channel."""
        self._event_ch.close()

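Abstract base class (inherits from `abc.ABC`) for an asynchronous stream of `TokenData` items. Subclasses implement `push_text`, `flush`, `end_input` and `aclose`; consumers read results with `async for`.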

Ancestors

  • abc.ABC

Subclasses

Methods

async def aclose(self) ‑> None
Expand source code
@abstractmethod
async def aclose(self) -> None:
    """Abstract coroutine: close the stream (implemented by subclasses)."""
def end_input(self) ‑> None
Expand source code
@abstractmethod
def end_input(self) -> None:
    """Abstract: signal the end of input (implemented by subclasses)."""
def flush(self) ‑> None
Expand source code
@abstractmethod
def flush(self) -> None:
    """Abstract: flush the stream; the exact meaning is up to the subclass."""
def push_text(self, text: str) ‑> None
Expand source code
@abstractmethod
def push_text(self, text: str) -> None:
    """Abstract: accept a piece of input ``text`` (implemented by subclasses)."""
class SentenceTokenizer
Expand source code
class SentenceTokenizer(ABC):
    @abstractmethod
    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        pass

    @abstractmethod
    def stream(self, *, language: str | None = None) -> "SentenceStream":
        pass

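Abstract base class (inherits from `abc.ABC`) for sentence tokenizers. Subclasses implement `tokenize`, which returns a list of strings for a given text, and `stream`, which returns a `SentenceStream`.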

Ancestors

  • abc.ABC

Subclasses

Methods

def stream(self, *, language: str | None = None) ‑> SentenceStream
Expand source code
@abstractmethod
def stream(self, *, language: str | None = None) -> "SentenceStream":
    pass
def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]
Expand source code
@abstractmethod
def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
    pass
class TokenData (segment_id: str = '', token: str = '')
Expand source code
@dataclass
class TokenData:
    """Item type carried by the tokenizer streams in this module.

    Both fields are strings and default to the empty string.
    """

    # Presumably identifies the segment the token belongs to; confirm
    # against the concrete stream implementations.
    segment_id: str = ""
    # The token text itself.
    token: str = ""

TokenData(segment_id: 'str' = '', token: 'str' = '')

Class variables

var segment_id : str
var token : str
class WordStream
Expand source code
class WordStream(ABC):
    """Abstract asynchronous stream of ``TokenData`` items.

    The stream is its own async iterator: ``async for`` pulls items from an
    internal ``aio.Chan[TokenData]`` channel. Concrete subclasses implement
    ``push_text``, ``flush``, ``end_input`` and ``aclose``.
    """

    def __init__(self) -> None:
        # Channel that ``__anext__`` reads from and ``_do_close`` closes.
        self._event_ch = aio.Chan[TokenData]()

    def __aiter__(self) -> AsyncIterator[TokenData]:
        """Return the stream itself as the async iterator."""
        return self

    async def __anext__(self) -> TokenData:
        """Await the next item from the internal channel and return it."""
        item = await self._event_ch.__anext__()
        return item

    @abstractmethod
    def push_text(self, text: str) -> None:
        """Abstract: accept a piece of input ``text``."""

    @abstractmethod
    def flush(self) -> None:
        """Abstract: flush the stream; the exact meaning is up to the subclass."""

    @abstractmethod
    def end_input(self) -> None:
        """Abstract: signal the end of input; details are up to the subclass."""

    @abstractmethod
    async def aclose(self) -> None:
        """Abstract coroutine: close the stream."""

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` when the internal channel is already closed."""
        if not self._event_ch.closed:
            return
        klass = type(self)
        raise RuntimeError(f"{klass.__module__}.{klass.__name__} is closed")

    def _do_close(self) -> None:
        """Close the internal channel."""
        self._event_ch.close()

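Abstract base class (inherits from `abc.ABC`) for an asynchronous stream of `TokenData` items. Subclasses implement `push_text`, `flush`, `end_input` and `aclose`; consumers read results with `async for`.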

Ancestors

  • abc.ABC

Subclasses

Methods

async def aclose(self) ‑> None
Expand source code
@abstractmethod
async def aclose(self) -> None:
    """Abstract coroutine: close the stream (implemented by subclasses)."""
def end_input(self) ‑> None
Expand source code
@abstractmethod
def end_input(self) -> None:
    """Abstract: signal the end of input (implemented by subclasses)."""
def flush(self) ‑> None
Expand source code
@abstractmethod
def flush(self) -> None:
    """Abstract: flush the stream; the exact meaning is up to the subclass."""
def push_text(self, text: str) ‑> None
Expand source code
@abstractmethod
def push_text(self, text: str) -> None:
    """Abstract: accept a piece of input ``text`` (implemented by subclasses)."""
class WordTokenizer
Expand source code
class WordTokenizer(ABC):
    @abstractmethod
    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        pass

    @abstractmethod
    def stream(self, *, language: str | None = None) -> "WordStream":
        pass

    def format_words(self, words: list[str]) -> str:
        return " ".join(words)

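Abstract base class (inherits from `abc.ABC`) for word tokenizers. Subclasses implement `tokenize`, which returns a list of strings for a given text, and `stream`, which returns a `WordStream`; `format_words` joins a list of words with single spaces.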

Ancestors

  • abc.ABC

Subclasses

Methods

def format_words(self, words: list[str]) ‑> str
Expand source code
def format_words(self, words: list[str]) -> str:
    """Join ``words`` into one string, separated by single spaces."""
    separator = " "
    return separator.join(words)
def stream(self, *, language: str | None = None) ‑> WordStream
Expand source code
@abstractmethod
def stream(self, *, language: str | None = None) -> "WordStream":
    pass
def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]
Expand source code
@abstractmethod
def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
    pass