Module livekit.agents.pipeline.speech_handle

Classes

class SpeechHandle (*,
id: str,
allow_interruptions: bool,
add_to_chat_ctx: bool,
is_reply: bool,
user_question: str,
fnc_nested_depth: int = 0,
extra_tools_messages: list[ChatMessage] | None = None)
Expand source code
class SpeechHandle:
    def __init__(
        self,
        *,
        id: str,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        is_reply: bool,
        user_question: str,
        fnc_nested_depth: int = 0,
        extra_tools_messages: list[ChatMessage] | None = None,
    ) -> None:
        self._id = id
        self._allow_interruptions = allow_interruptions
        self._add_to_chat_ctx = add_to_chat_ctx

        # is_reply is True when the speech is answering to a user question
        self._is_reply = is_reply
        self._user_question = user_question
        self._user_committed = False

        self._init_fut = asyncio.Future[None]()
        self._done_fut = asyncio.Future[None]()
        self._initialized = False
        self._speech_committed = False  # speech committed (interrupted or not)

        # source and synthesis_handle are None until the speech is initialized
        self._source: str | LLMStream | AsyncIterable[str] | None = None
        self._synthesis_handle: SynthesisHandle | None = None

        # nested speech handle and function calls
        self._fnc_nested_depth = fnc_nested_depth
        self._fnc_extra_tools_messages: list[ChatMessage] | None = extra_tools_messages

        self._nested_speech_handles: list[SpeechHandle] = []
        self._nested_speech_changed = asyncio.Event()
        self._nested_speech_finished = False

    @staticmethod
    def create_assistant_reply(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        user_question: str,
    ) -> SpeechHandle:
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=True,
            user_question=user_question,
        )

    @staticmethod
    def create_assistant_speech(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
    ) -> SpeechHandle:
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=False,
            user_question="",
        )

    @staticmethod
    def create_tool_speech(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        fnc_nested_depth: int,
        extra_tools_messages: list[ChatMessage],
    ) -> SpeechHandle:
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=False,
            user_question="",
            fnc_nested_depth=fnc_nested_depth,
            extra_tools_messages=extra_tools_messages,
        )

    async def wait_for_initialization(self) -> None:
        await asyncio.shield(self._init_fut)

    def initialize(
        self,
        *,
        source: str | LLMStream | AsyncIterable[str],
        synthesis_handle: SynthesisHandle,
    ) -> None:
        if self.interrupted:
            raise RuntimeError("speech is interrupted")

        self._source = source
        self._synthesis_handle = synthesis_handle
        self._initialized = True
        self._init_fut.set_result(None)

    def mark_user_committed(self) -> None:
        self._user_committed = True

    def mark_speech_committed(self) -> None:
        self._speech_committed = True

    @property
    def user_committed(self) -> bool:
        return self._user_committed

    @property
    def speech_committed(self) -> bool:
        return self._speech_committed

    @property
    def id(self) -> str:
        return self._id

    @property
    def allow_interruptions(self) -> bool:
        return self._allow_interruptions

    @property
    def add_to_chat_ctx(self) -> bool:
        return self._add_to_chat_ctx

    @property
    def source(self) -> str | LLMStream | AsyncIterable[str]:
        if self._source is None:
            raise RuntimeError("speech not initialized")
        return self._source

    @property
    def synthesis_handle(self) -> SynthesisHandle:
        if self._synthesis_handle is None:
            raise RuntimeError("speech not initialized")
        return self._synthesis_handle

    @synthesis_handle.setter
    def synthesis_handle(self, synthesis_handle: SynthesisHandle) -> None:
        """synthesis handle can be replaced for the same speech.
        This is useful when we need to do a new generation. (e.g for automatic function call answers)"""
        if self._synthesis_handle is None:
            raise RuntimeError("speech not initialized")

        self._synthesis_handle = synthesis_handle

    @property
    def initialized(self) -> bool:
        return self._initialized

    @property
    def is_reply(self) -> bool:
        return self._is_reply

    @property
    def user_question(self) -> str:
        return self._user_question

    @property
    def interrupted(self) -> bool:
        return self._init_fut.cancelled() or (
            self._synthesis_handle is not None and self._synthesis_handle.interrupted
        )

    def join(self) -> asyncio.Future:
        return self._done_fut

    def _set_done(self) -> None:
        self._done_fut.set_result(None)

    def interrupt(self) -> None:
        if not self.allow_interruptions:
            raise RuntimeError("interruptions are not allowed")
        self.cancel()

    def cancel(self) -> None:
        self._init_fut.cancel()

        if self._synthesis_handle is not None:
            self._synthesis_handle.interrupt()

    @property
    def fnc_nested_depth(self) -> int:
        return self._fnc_nested_depth

    @property
    def extra_tools_messages(self) -> list[ChatMessage] | None:
        return self._fnc_extra_tools_messages

    def add_nested_speech(self, speech_handle: SpeechHandle) -> None:
        self._nested_speech_handles.append(speech_handle)
        self._nested_speech_changed.set()

    @property
    def nested_speech_handles(self) -> list[SpeechHandle]:
        return self._nested_speech_handles

    @property
    def nested_speech_changed(self) -> asyncio.Event:
        return self._nested_speech_changed

    @property
    def nested_speech_finished(self) -> bool:
        return self._nested_speech_finished

    def mark_nested_speech_finished(self) -> None:
        self._nested_speech_finished = True

Static methods

def create_assistant_reply(*, allow_interruptions: bool, add_to_chat_ctx: bool, user_question: str) ‑> SpeechHandle
Expand source code
@staticmethod
def create_assistant_reply(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
    user_question: str,
) -> SpeechHandle:
    return SpeechHandle(
        id=utils.shortuuid(),
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
        is_reply=True,
        user_question=user_question,
    )
def create_assistant_speech(*, allow_interruptions: bool, add_to_chat_ctx: bool) ‑> SpeechHandle
Expand source code
@staticmethod
def create_assistant_speech(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
) -> SpeechHandle:
    return SpeechHandle(
        id=utils.shortuuid(),
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
        is_reply=False,
        user_question="",
    )
def create_tool_speech(*,
allow_interruptions: bool,
add_to_chat_ctx: bool,
fnc_nested_depth: int,
extra_tools_messages: list[ChatMessage]) ‑> SpeechHandle
Expand source code
@staticmethod
def create_tool_speech(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
    fnc_nested_depth: int,
    extra_tools_messages: list[ChatMessage],
) -> SpeechHandle:
    return SpeechHandle(
        id=utils.shortuuid(),
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
        is_reply=False,
        user_question="",
        fnc_nested_depth=fnc_nested_depth,
        extra_tools_messages=extra_tools_messages,
    )

Instance variables

prop add_to_chat_ctx : bool
Expand source code
@property
def add_to_chat_ctx(self) -> bool:
    return self._add_to_chat_ctx
prop allow_interruptions : bool
Expand source code
@property
def allow_interruptions(self) -> bool:
    return self._allow_interruptions
prop extra_tools_messages : list[ChatMessage] | None
Expand source code
@property
def extra_tools_messages(self) -> list[ChatMessage] | None:
    return self._fnc_extra_tools_messages
prop fnc_nested_depth : int
Expand source code
@property
def fnc_nested_depth(self) -> int:
    return self._fnc_nested_depth
prop id : str
Expand source code
@property
def id(self) -> str:
    return self._id
prop initialized : bool
Expand source code
@property
def initialized(self) -> bool:
    return self._initialized
prop interrupted : bool
Expand source code
@property
def interrupted(self) -> bool:
    return self._init_fut.cancelled() or (
        self._synthesis_handle is not None and self._synthesis_handle.interrupted
    )
prop is_reply : bool
Expand source code
@property
def is_reply(self) -> bool:
    return self._is_reply
prop nested_speech_changed : asyncio.Event
Expand source code
@property
def nested_speech_changed(self) -> asyncio.Event:
    return self._nested_speech_changed
prop nested_speech_finished : bool
Expand source code
@property
def nested_speech_finished(self) -> bool:
    return self._nested_speech_finished
prop nested_speech_handles : list[SpeechHandle]
Expand source code
@property
def nested_speech_handles(self) -> list[SpeechHandle]:
    return self._nested_speech_handles
prop source : str | LLMStream | AsyncIterable[str]
Expand source code
@property
def source(self) -> str | LLMStream | AsyncIterable[str]:
    if self._source is None:
        raise RuntimeError("speech not initialized")
    return self._source
prop speech_committed : bool
Expand source code
@property
def speech_committed(self) -> bool:
    return self._speech_committed
prop synthesis_handle : SynthesisHandle
Expand source code
@property
def synthesis_handle(self) -> SynthesisHandle:
    if self._synthesis_handle is None:
        raise RuntimeError("speech not initialized")
    return self._synthesis_handle
prop user_committed : bool
Expand source code
@property
def user_committed(self) -> bool:
    return self._user_committed
prop user_question : str
Expand source code
@property
def user_question(self) -> str:
    return self._user_question

Methods

def add_nested_speech(self,
speech_handle: SpeechHandle) ‑> None
Expand source code
def add_nested_speech(self, speech_handle: SpeechHandle) -> None:
    self._nested_speech_handles.append(speech_handle)
    self._nested_speech_changed.set()
def cancel(self) ‑> None
Expand source code
def cancel(self) -> None:
    self._init_fut.cancel()

    if self._synthesis_handle is not None:
        self._synthesis_handle.interrupt()
def initialize(self,
*,
source: str | LLMStream | AsyncIterable[str],
synthesis_handle: SynthesisHandle) ‑> None
Expand source code
def initialize(
    self,
    *,
    source: str | LLMStream | AsyncIterable[str],
    synthesis_handle: SynthesisHandle,
) -> None:
    if self.interrupted:
        raise RuntimeError("speech is interrupted")

    self._source = source
    self._synthesis_handle = synthesis_handle
    self._initialized = True
    self._init_fut.set_result(None)
def interrupt(self) ‑> None
Expand source code
def interrupt(self) -> None:
    if not self.allow_interruptions:
        raise RuntimeError("interruptions are not allowed")
    self.cancel()
def join(self) ‑> _asyncio.Future
Expand source code
def join(self) -> asyncio.Future:
    return self._done_fut
def mark_nested_speech_finished(self) ‑> None
Expand source code
def mark_nested_speech_finished(self) -> None:
    self._nested_speech_finished = True
def mark_speech_committed(self) ‑> None
Expand source code
def mark_speech_committed(self) -> None:
    self._speech_committed = True
def mark_user_committed(self) ‑> None
Expand source code
def mark_user_committed(self) -> None:
    self._user_committed = True
async def wait_for_initialization(self) ‑> None
Expand source code
async def wait_for_initialization(self) -> None:
    await asyncio.shield(self._init_fut)