Module livekit.agents.pipeline.speech_handle
Classes
class SpeechHandle (*,
id: str,
allow_interruptions: bool,
add_to_chat_ctx: bool,
is_reply: bool,
user_question: str,
fnc_nested_depth: int = 0,
extra_tools_messages: list[ChatMessage] | None = None)
-
Expand source code
class SpeechHandle:
    """State holder for a single speech: initialization, interruption and completion.

    A handle starts uninitialized. ``initialize()`` attaches the text source and
    the synthesis handle and resolves the future awaited by
    ``wait_for_initialization()``. ``cancel()`` / ``interrupt()`` cancel that
    future and interrupt the synthesis handle when one is attached.
    ``join()`` exposes the future that ``_set_done()`` resolves.
    """

    def __init__(
        self,
        *,
        id: str,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        is_reply: bool,
        user_question: str,
        fnc_nested_depth: int = 0,
        extra_tools_messages: list[ChatMessage] | None = None,
    ) -> None:
        """Create an uninitialized handle.

        Args:
            id: Identifier of this speech (the factories use ``utils.shortuuid()``).
            allow_interruptions: When False, ``interrupt()`` raises instead of cancelling.
            add_to_chat_ctx: Flag exposed unchanged through ``add_to_chat_ctx``.
            is_reply: True when the speech is answering a user question.
            user_question: The question being answered; the non-reply factories pass "".
            fnc_nested_depth: Function-call nesting depth, 0 by default.
            extra_tools_messages: Optional extra tool messages carried with the speech.
        """
        self._id = id
        self._allow_interruptions = allow_interruptions
        self._add_to_chat_ctx = add_to_chat_ctx

        # is_reply is True when the speech is answering to a user question
        self._is_reply = is_reply
        self._user_question = user_question
        self._user_committed = False

        self._init_fut = asyncio.Future[None]()
        self._done_fut = asyncio.Future[None]()
        self._initialized = False
        self._speech_committed = False  # speech committed (interrupted or not)

        # source and synthesis_handle are None until the speech is initialized
        self._source: str | LLMStream | AsyncIterable[str] | None = None
        self._synthesis_handle: SynthesisHandle | None = None

        # nested speech handle and function calls
        self._fnc_nested_depth = fnc_nested_depth
        self._fnc_extra_tools_messages: list[ChatMessage] | None = extra_tools_messages

        self._nested_speech_handles: list[SpeechHandle] = []
        self._nested_speech_changed = asyncio.Event()
        self._nested_speech_finished = False

    @staticmethod
    def create_assistant_reply(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        user_question: str,
    ) -> SpeechHandle:
        """Build a handle with ``is_reply=True`` for an answer to ``user_question``."""
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=True,
            user_question=user_question,
        )

    @staticmethod
    def create_assistant_speech(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
    ) -> SpeechHandle:
        """Build a handle with ``is_reply=False`` and an empty user question."""
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=False,
            user_question="",
        )

    @staticmethod
    def create_tool_speech(
        *,
        allow_interruptions: bool,
        add_to_chat_ctx: bool,
        fnc_nested_depth: int,
        extra_tools_messages: list[ChatMessage],
    ) -> SpeechHandle:
        """Build a non-reply handle carrying a nesting depth and extra tool messages."""
        return SpeechHandle(
            id=utils.shortuuid(),
            allow_interruptions=allow_interruptions,
            add_to_chat_ctx=add_to_chat_ctx,
            is_reply=False,
            user_question="",
            fnc_nested_depth=fnc_nested_depth,
            extra_tools_messages=extra_tools_messages,
        )

    async def wait_for_initialization(self) -> None:
        """Wait until ``initialize()`` has been called.

        The init future is awaited through ``asyncio.shield`` so that cancelling
        the waiting task does not cancel the future itself.
        """
        await asyncio.shield(self._init_fut)

    def initialize(
        self,
        *,
        source: str | LLMStream | AsyncIterable[str],
        synthesis_handle: SynthesisHandle,
    ) -> None:
        """Attach the source and synthesis handle and mark the speech initialized.

        Raises:
            RuntimeError: If the speech has already been interrupted.
        """
        if self.interrupted:
            raise RuntimeError("speech is interrupted")

        self._source = source
        self._synthesis_handle = synthesis_handle
        self._initialized = True
        self._init_fut.set_result(None)

    def mark_user_committed(self) -> None:
        """Set the ``user_committed`` flag."""
        self._user_committed = True

    def mark_speech_committed(self) -> None:
        """Set the ``speech_committed`` flag."""
        self._speech_committed = True

    @property
    def user_committed(self) -> bool:
        """True after ``mark_user_committed()`` was called."""
        return self._user_committed

    @property
    def speech_committed(self) -> bool:
        """True after ``mark_speech_committed()`` was called."""
        return self._speech_committed

    @property
    def id(self) -> str:
        """Identifier given at construction."""
        return self._id

    @property
    def allow_interruptions(self) -> bool:
        """Whether ``interrupt()`` is permitted for this speech."""
        return self._allow_interruptions

    @property
    def add_to_chat_ctx(self) -> bool:
        """The ``add_to_chat_ctx`` flag given at construction."""
        return self._add_to_chat_ctx

    @property
    def source(self) -> str | LLMStream | AsyncIterable[str]:
        """Source attached by ``initialize()``.

        Raises:
            RuntimeError: If no source has been attached yet.
        """
        if self._source is None:
            raise RuntimeError("speech not initialized")
        return self._source

    @property
    def synthesis_handle(self) -> SynthesisHandle:
        """Synthesis handle attached by ``initialize()`` (or replaced via the setter).

        Raises:
            RuntimeError: If no synthesis handle has been attached yet.
        """
        if self._synthesis_handle is None:
            raise RuntimeError("speech not initialized")
        return self._synthesis_handle

    @synthesis_handle.setter
    def synthesis_handle(self, synthesis_handle: SynthesisHandle) -> None:
        """synthesis handle can be replaced for the same speech.
        This is useful when we need to do a new generation.
        (e.g for automatic function call answers)

        Raises:
            RuntimeError: If the speech has no synthesis handle yet.
        """
        if self._synthesis_handle is None:
            raise RuntimeError("speech not initialized")
        self._synthesis_handle = synthesis_handle

    @property
    def initialized(self) -> bool:
        """True once ``initialize()`` has completed."""
        return self._initialized

    @property
    def is_reply(self) -> bool:
        """True when the speech is answering a user question."""
        return self._is_reply

    @property
    def user_question(self) -> str:
        """The ``user_question`` given at construction."""
        return self._user_question

    @property
    def interrupted(self) -> bool:
        """True when initialization was cancelled or the synthesis handle is interrupted."""
        return self._init_fut.cancelled() or (
            self._synthesis_handle is not None and self._synthesis_handle.interrupted
        )

    def join(self) -> asyncio.Future[None]:
        """Return the future that ``_set_done()`` resolves."""
        return self._done_fut

    def _set_done(self) -> None:
        """Resolve the future returned by ``join()``; a no-op if it is already done."""
        # join() hands out the raw future, so a caller may already have cancelled
        # it (asyncio.wait_for does that on timeout). Setting a result on a
        # finished future raises asyncio.InvalidStateError, hence the guard.
        if not self._done_fut.done():
            self._done_fut.set_result(None)

    def interrupt(self) -> None:
        """Cancel the speech, honouring ``allow_interruptions``.

        Raises:
            RuntimeError: If interruptions are not allowed for this speech.
        """
        if not self.allow_interruptions:
            raise RuntimeError("interruptions are not allowed")
        self.cancel()

    def cancel(self) -> None:
        """Cancel the init future and interrupt the synthesis handle if one is attached.

        Unlike ``interrupt()``, this does not check ``allow_interruptions``.
        """
        self._init_fut.cancel()

        if self._synthesis_handle is not None:
            self._synthesis_handle.interrupt()

    @property
    def fnc_nested_depth(self) -> int:
        """Function-call nesting depth given at construction."""
        return self._fnc_nested_depth

    @property
    def extra_tools_messages(self) -> list[ChatMessage] | None:
        """Extra tool messages given at construction, or None."""
        return self._fnc_extra_tools_messages

    def add_nested_speech(self, speech_handle: SpeechHandle) -> None:
        """Append a nested handle and set the ``nested_speech_changed`` event."""
        self._nested_speech_handles.append(speech_handle)
        self._nested_speech_changed.set()

    @property
    def nested_speech_handles(self) -> list[SpeechHandle]:
        """The internal list of nested handles (not a copy)."""
        return self._nested_speech_handles

    @property
    def nested_speech_changed(self) -> asyncio.Event:
        """Event set by ``add_nested_speech()``."""
        return self._nested_speech_changed

    @property
    def nested_speech_finished(self) -> bool:
        """True after ``mark_nested_speech_finished()`` was called."""
        return self._nested_speech_finished

    def mark_nested_speech_finished(self) -> None:
        """Set the ``nested_speech_finished`` flag."""
        self._nested_speech_finished = True
Static methods
def create_assistant_reply(*, allow_interruptions: bool, add_to_chat_ctx: bool, user_question: str) ‑> SpeechHandle
-
Expand source code
@staticmethod
def create_assistant_reply(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
    user_question: str,
) -> SpeechHandle:
    """Build a reply handle for ``user_question``.

    The handle gets a fresh id from ``utils.shortuuid()`` and ``is_reply=True``;
    the remaining arguments are forwarded unchanged.
    """
    speech_id = utils.shortuuid()
    return SpeechHandle(
        id=speech_id,
        is_reply=True,
        user_question=user_question,
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
    )
def create_assistant_speech(*, allow_interruptions: bool, add_to_chat_ctx: bool) ‑> SpeechHandle
-
Expand source code
@staticmethod
def create_assistant_speech(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
) -> SpeechHandle:
    """Build a non-reply handle.

    The handle gets a fresh id from ``utils.shortuuid()``, ``is_reply=False``
    and an empty ``user_question``.
    """
    speech_id = utils.shortuuid()
    return SpeechHandle(
        id=speech_id,
        is_reply=False,
        user_question="",
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
    )
def create_tool_speech(*,
allow_interruptions: bool,
add_to_chat_ctx: bool,
fnc_nested_depth: int,
extra_tools_messages: list[ChatMessage]) ‑> SpeechHandle
-
Expand source code
@staticmethod
def create_tool_speech(
    *,
    allow_interruptions: bool,
    add_to_chat_ctx: bool,
    fnc_nested_depth: int,
    extra_tools_messages: list[ChatMessage],
) -> SpeechHandle:
    """Build a non-reply handle that carries function-call context.

    The handle gets a fresh id from ``utils.shortuuid()``, ``is_reply=False``
    and an empty ``user_question``; the nesting depth and the extra tool
    messages are forwarded unchanged.
    """
    speech_id = utils.shortuuid()
    return SpeechHandle(
        id=speech_id,
        is_reply=False,
        user_question="",
        allow_interruptions=allow_interruptions,
        add_to_chat_ctx=add_to_chat_ctx,
        fnc_nested_depth=fnc_nested_depth,
        extra_tools_messages=extra_tools_messages,
    )
Instance variables
prop add_to_chat_ctx : bool
-
Expand source code
@property
def add_to_chat_ctx(self) -> bool:
    """Return the ``add_to_chat_ctx`` flag stored on the handle."""
    flag = self._add_to_chat_ctx
    return flag
prop allow_interruptions : bool
-
Expand source code
@property
def allow_interruptions(self) -> bool:
    """Return the ``allow_interruptions`` flag stored on the handle."""
    flag = self._allow_interruptions
    return flag
prop extra_tools_messages : list[ChatMessage] | None
-
Expand source code
@property
def extra_tools_messages(self) -> list[ChatMessage] | None:
    """Return the extra tool messages stored on the handle, or None when there are none."""
    messages = self._fnc_extra_tools_messages
    return messages
prop fnc_nested_depth : int
-
Expand source code
@property
def fnc_nested_depth(self) -> int:
    """Return the function-call nesting depth stored on the handle."""
    depth = self._fnc_nested_depth
    return depth
prop id : str
-
Expand source code
@property
def id(self) -> str:
    """Return the identifier stored on the handle."""
    handle_id = self._id
    return handle_id
prop initialized : bool
-
Expand source code
@property
def initialized(self) -> bool:
    """Return True once ``initialize()`` has marked the handle as initialized."""
    state = self._initialized
    return state
prop interrupted : bool
-
Expand source code
@property
def interrupted(self) -> bool:
    """Report whether the speech was interrupted.

    True when the init future has been cancelled; otherwise the
    ``interrupted`` state of the attached synthesis handle, or False when no
    synthesis handle is attached.
    """
    if self._init_fut.cancelled():
        return True

    handle = self._synthesis_handle
    return handle is not None and handle.interrupted
prop is_reply : bool
-
Expand source code
@property
def is_reply(self) -> bool:
    """Return the ``is_reply`` flag (True when the speech answers a user question)."""
    flag = self._is_reply
    return flag
prop nested_speech_changed : asyncio.Event
-
Expand source code
@property
def nested_speech_changed(self) -> asyncio.Event:
    """Return the event that ``add_nested_speech()`` sets."""
    changed_event = self._nested_speech_changed
    return changed_event
prop nested_speech_finished : bool
-
Expand source code
@property
def nested_speech_finished(self) -> bool:
    """Return True after ``mark_nested_speech_finished()`` has been called."""
    finished = self._nested_speech_finished
    return finished
prop nested_speech_handles : list[SpeechHandle]
-
Expand source code
@property
def nested_speech_handles(self) -> list[SpeechHandle]:
    """Return the handle's own list of nested speech handles (the list itself, not a copy)."""
    nested = self._nested_speech_handles
    return nested
prop source : str | LLMStream | AsyncIterable[str]
-
Expand source code
@property
def source(self) -> str | LLMStream | AsyncIterable[str]:
    """Return the source attached to the speech.

    Raises:
        RuntimeError: If the stored source is still None.
    """
    current = self._source
    if current is not None:
        return current
    raise RuntimeError("speech not initialized")
prop speech_committed : bool
-
Expand source code
@property
def speech_committed(self) -> bool:
    """Return True after ``mark_speech_committed()`` has been called."""
    committed = self._speech_committed
    return committed
prop synthesis_handle : SynthesisHandle
-
Expand source code
@property
def synthesis_handle(self) -> SynthesisHandle:
    """Return the synthesis handle attached to the speech.

    Raises:
        RuntimeError: If the stored synthesis handle is still None.
    """
    current = self._synthesis_handle
    if current is not None:
        return current
    raise RuntimeError("speech not initialized")
prop user_committed : bool
-
Expand source code
@property
def user_committed(self) -> bool:
    """Return True after ``mark_user_committed()`` has been called."""
    committed = self._user_committed
    return committed
prop user_question : str
-
Expand source code
@property
def user_question(self) -> str:
    """Return the user question stored on the handle (the non-reply factories store "")."""
    question = self._user_question
    return question
Methods
def add_nested_speech(self,
speech_handle: SpeechHandle) ‑> None
-
Expand source code
def add_nested_speech(self, speech_handle: SpeechHandle) -> None:
    """Register ``speech_handle`` as a nested speech.

    The handle is appended to the nested list, then the
    ``nested_speech_changed`` event is set.
    """
    nested = self._nested_speech_handles
    nested.append(speech_handle)

    changed_event = self._nested_speech_changed
    changed_event.set()
def cancel(self) ‑> None
-
Expand source code
def cancel(self) -> None:
    """Cancel the init future, then interrupt the synthesis handle if one is attached.

    No ``allow_interruptions`` check is made here; ``interrupt()`` performs it.
    """
    self._init_fut.cancel()

    handle = self._synthesis_handle
    if handle is None:
        return
    handle.interrupt()
def initialize(self,
*,
source: str | LLMStream | AsyncIterable[str],
synthesis_handle: SynthesisHandle) ‑> None
-
Expand source code
def initialize(
    self,
    *,
    source: str | LLMStream | AsyncIterable[str],
    synthesis_handle: SynthesisHandle,
) -> None:
    """Attach the source and synthesis handle and resolve the init future.

    Raises:
        RuntimeError: If the speech is already interrupted.
    """
    already_interrupted = self.interrupted
    if already_interrupted:
        raise RuntimeError("speech is interrupted")

    self._source = source
    self._synthesis_handle = synthesis_handle
    self._initialized = True

    # resolved last, after the state above is in place
    init_future = self._init_fut
    init_future.set_result(None)
def interrupt(self) ‑> None
-
Expand source code
def interrupt(self) -> None:
    """Cancel the speech when interruptions are allowed.

    Raises:
        RuntimeError: If ``allow_interruptions`` is false.
    """
    if self.allow_interruptions:
        self.cancel()
        return

    raise RuntimeError("interruptions are not allowed")
def join(self) ‑> _asyncio.Future
-
Expand source code
def join(self) -> asyncio.Future:
    """Return the handle's done future (the future object itself, not a wrapper)."""
    done_future = self._done_fut
    return done_future
def mark_nested_speech_finished(self) ‑> None
-
Expand source code
def mark_nested_speech_finished(self) -> None:
    """Set the flag reported by ``nested_speech_finished`` to True."""
    finished = True
    self._nested_speech_finished = finished
def mark_speech_committed(self) ‑> None
-
Expand source code
def mark_speech_committed(self) -> None:
    """Set the flag reported by ``speech_committed`` to True."""
    committed = True
    self._speech_committed = committed
def mark_user_committed(self) ‑> None
-
Expand source code
def mark_user_committed(self) -> None:
    """Set the flag reported by ``user_committed`` to True."""
    committed = True
    self._user_committed = committed
async def wait_for_initialization(self) ‑> None
-
Expand source code
async def wait_for_initialization(self) -> None:
    """Wait for the init future to complete.

    The future is awaited through ``asyncio.shield`` so that cancelling the
    waiting task does not cancel the init future itself.
    """
    shielded = asyncio.shield(self._init_fut)
    await shielded