Module livekit.agents.pipeline.plotter

Classes

class AssistantPlotter (loop: asyncio.events.AbstractEventLoop)
Expand source code
class AssistantPlotter:
    """Forward plot samples and events to a separate plotting process.

    ``start`` creates a socket pair, wraps one end in an async duplex and
    passes the other end to a daemon process whose target is ``_draw_plot``.
    ``plot_value`` and ``plot_event`` stamp each item with the seconds elapsed
    since ``start`` and schedule the send in the background, so callers never
    block on the channel.

    Plotting calls are no-ops before ``start`` has completed, after
    ``terminate``, and once any send has failed.
    """

    def __init__(self, loop: asyncio.AbstractEventLoop) -> None:
        self._loop = loop
        self._started = False
        # Defined here (not only in `start`) so the attribute always exists.
        self._closed = False
        # Strong references to in-flight send tasks: the event loop only keeps
        # weak references, so an unreferenced task may be garbage-collected
        # before it finishes.
        self._send_tasks: "set[asyncio.Future[None]]" = set()

    async def start(self):
        """Spawn the plotting process and open the channel to it.

        Calling ``start`` again after a successful start does nothing.
        """
        if self._started:
            return

        mp_pch, mp_cch = socket.socketpair()
        self._duplex = await utils.aio.duplex_unix._AsyncDuplex.open(mp_pch)
        self._plot_proc = mp.Process(target=_draw_plot, args=(mp_cch,), daemon=True)
        try:
            self._plot_proc.start()
        finally:
            # The child end was handed to the process; close this copy even if
            # the process failed to start so the descriptor is not leaked.
            mp_cch.close()

        self._started = True
        self._closed = False
        self._start_time = time.time()

    def plot_value(self, which: PlotType, y: float):
        """Queue a sample ``y`` for series ``which`` at the current elapsed time."""
        if not self._started:
            return

        ts = time.time() - self._start_time
        self._send_message(PlotMessage(which=which, x=ts, y=y))

    def plot_event(self, which: EventType):
        """Queue an event marker ``which`` at the current elapsed time."""
        if not self._started:
            return

        ts = time.time() - self._start_time
        self._send_message(PlotEventMessage(which=which, x=ts))

    def _send_message(self, msg: channel.Message) -> None:
        """Schedule ``msg`` to be sent over the duplex without awaiting it.

        Any exception raised by the send marks the plotter as closed, after
        which further messages are dropped silently.
        """
        if self._closed:
            return

        async def _asend_message():
            try:
                await channel.asend_message(self._duplex, msg)
            except Exception:
                # Best-effort: plotting is auxiliary, so a broken channel just
                # disables further sends instead of propagating.
                self._closed = True

        task = asyncio.ensure_future(_asend_message())
        # Keep the task alive until it completes, then drop the reference.
        self._send_tasks.add(task)
        task.add_done_callback(self._send_tasks.discard)

    async def terminate(self):
        """Terminate the plotting process and close the channel.

        Does nothing if ``start`` never completed.
        """
        if not self._started:
            return

        # Stop scheduling new sends before tearing the channel down.
        self._closed = True
        self._plot_proc.terminate()

        with contextlib.suppress(utils.aio.duplex_unix.DuplexClosed):
            await self._duplex.aclose()

Methods

def plot_event(self, which: Literal['user_started_speaking', 'user_stopped_speaking', 'agent_started_speaking', 'agent_stopped_speaking'])
def plot_value(self, which: Literal['vad_probability', 'raw_vol', 'smoothed_vol'], y: float)
async def start(self)
async def terminate(self)
class PlotEventMessage (which: Literal['user_started_speaking', 'user_stopped_speaking', 'agent_started_speaking', 'agent_stopped_speaking'] = 'user_started_speaking', x: float = 0.0)

PlotEventMessage(which: Literal['user_started_speaking', 'user_stopped_speaking', 'agent_started_speaking', 'agent_stopped_speaking'] = 'user_started_speaking', x: float = 0.0)

Expand source code
@dataclass
class PlotEventMessage:
    """Message carrying a named event and its x coordinate.

    ``which`` is the event name and ``x`` is where it falls on the x axis
    (``AssistantPlotter.plot_event`` fills it with the seconds elapsed since
    the plotter was started). On the wire the event name is written as a
    string, followed by ``x`` as a float.
    """

    # NOTE(review): presumably the identifier `channel` uses to tell message
    # types apart — confirm against the channel module.
    MSG_ID: ClassVar[int] = 2

    which: EventType = "user_started_speaking"
    x: float = 0.0

    def write(self, b: io.BytesIO) -> None:
        """Serialize this message into ``b``: event name first, then ``x``."""
        event_name, position = self.which, self.x
        channel.write_string(b, event_name)
        channel.write_float(b, position)

    def read(self, b: io.BytesIO) -> None:
        """Fill this message from ``b``, consuming fields in ``write`` order."""
        event_name = channel.read_string(b)
        self.which = event_name  # type: ignore
        position = channel.read_float(b)
        self.x = position

Class variables

var MSG_ID : ClassVar[int]
var which : Literal['user_started_speaking', 'user_stopped_speaking', 'agent_started_speaking', 'agent_stopped_speaking']
var x : float

Methods

def read(self, b: _io.BytesIO) -> None
def write(self, b: _io.BytesIO) -> None
class PlotMessage (which: Literal['vad_probability', 'raw_vol', 'smoothed_vol'] = 'vad_probability', x: float = 0.0, y: float = 0.0)

PlotMessage(which: Literal['vad_probability', 'raw_vol', 'smoothed_vol'] = 'vad_probability', x: float = 0.0, y: float = 0.0)

Expand source code
@dataclass
class PlotMessage:
    """Message carrying one ``(x, y)`` sample for a named series.

    ``which`` names the series, ``x`` is the sample's x coordinate
    (``AssistantPlotter.plot_value`` fills it with the seconds elapsed since
    the plotter was started) and ``y`` is the sampled value. On the wire the
    series name is written as a string, followed by ``x`` and then ``y`` as
    floats.
    """

    # NOTE(review): presumably the identifier `channel` uses to tell message
    # types apart — confirm against the channel module.
    MSG_ID: ClassVar[int] = 1

    which: PlotType = "vad_probability"
    x: float = 0.0
    y: float = 0.0

    def write(self, b: io.BytesIO) -> None:
        """Serialize this message into ``b``: series name, then ``x``, then ``y``."""
        channel.write_string(b, self.which)
        for coordinate in (self.x, self.y):
            channel.write_float(b, coordinate)

    def read(self, b: io.BytesIO) -> None:
        """Fill this message from ``b``, consuming fields in ``write`` order."""
        series_name = channel.read_string(b)
        self.which = series_name  # type: ignore
        # Assign each float as soon as it is read, x before y.
        for field_name in ("x", "y"):
            setattr(self, field_name, channel.read_float(b))

Class variables

var MSG_ID : ClassVar[int]
var which : Literal['vad_probability', 'raw_vol', 'smoothed_vol']
var x : float
var y : float

Methods

def read(self, b: _io.BytesIO) -> None
def write(self, b: _io.BytesIO) -> None