Module livekit.agents.vad

Classes

class VAD (*,
capabilities: VADCapabilities)
Expand source code
class VAD(ABC, rtc.EventEmitter[Literal["metrics_collected"]]):
    """Abstract base for voice activity detectors.

    Concrete detectors implement :meth:`stream`. The instance is also an event
    emitter whose only declared event is ``"metrics_collected"``.
    """

    def __init__(self, *, capabilities: VADCapabilities) -> None:
        """Remember the detector capabilities and derive its label.

        Args:
            capabilities: Capabilities later exposed through
                :attr:`capabilities`.
        """
        super().__init__()
        self._capabilities = capabilities
        # Fully qualified name of the concrete class, e.g. "pkg.module.ClassName".
        concrete = type(self)
        self._label = f"{concrete.__module__}.{concrete.__name__}"

    @property
    def capabilities(self) -> VADCapabilities:
        """Capabilities supplied when this VAD was constructed."""
        return self._capabilities

    @abstractmethod
    def stream(self) -> "VADStream":
        """Create a new stream for this detector; subclasses must implement it."""

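Abstract base class for voice activity detectors. A VAD exposes its capabilities, emits the "metrics_collected" event, and creates streams through the abstract stream() method, which subclasses must implement.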

Ancestors

Subclasses

  • livekit.plugins.silero.vad.VAD

Instance variables

prop capabilities : VADCapabilities
Expand source code
@property
def capabilities(self) -> VADCapabilities:
    """Return the capabilities object stored on this VAD instance."""
    return self._capabilities

Methods

def stream(self) ‑> VADStream
Expand source code
@abstractmethod
def stream(self) -> "VADStream":
    """Create and return a new ``VADStream``; concrete VADs must override this."""

Inherited members

class VADCapabilities (update_interval: float)
Expand source code
@dataclass
class VADCapabilities:
    """Capabilities advertised by a VAD implementation."""

    # The stream's metrics monitor emits metrics once per
    # ``1 / update_interval`` inference events; presumably this is the interval
    # (in seconds) between inference updates -- TODO confirm the unit.
    update_interval: float

VADCapabilities(update_interval: 'float')

Class variables

var update_interval : float
class VADEvent (type: VADEventType,
samples_index: int,
timestamp: float,
speech_duration: float,
silence_duration: float,
frames: List[rtc.AudioFrame] = <factory>,
probability: float = 0.0,
inference_duration: float = 0.0,
speaking: bool = False,
raw_accumulated_silence: float = 0.0,
raw_accumulated_speech: float = 0.0)
Expand source code
@dataclass
class VADEvent:
    """
    Represents an event detected by the Voice Activity Detector (VAD).

    Only the first five fields are required; the remaining fields default to
    an empty frame list, ``0.0`` or ``False``.
    """

    type: VADEventType
    """Type of the VAD event (e.g., start of speech, end of speech, inference done)."""

    samples_index: int
    """Index of the audio sample where the event occurred, relative to the inference sample rate."""

    timestamp: float
    """Timestamp (in seconds) when the event was fired."""

    speech_duration: float
    """Duration of the speech segment in seconds."""

    silence_duration: float
    """Duration of the silence segment in seconds."""

    # default_factory gives every event its own list instead of a shared one.
    frames: List[rtc.AudioFrame] = field(default_factory=list)
    """
    List of audio frames associated with the speech.

    - For `start_of_speech` events, this contains the audio chunks that triggered the detection.
    - For `inference_done` events, this contains the audio chunks that were processed.
    - For `end_of_speech` events, this contains the complete user speech.
    """

    probability: float = 0.0
    """Probability that speech is present (only for `INFERENCE_DONE` events)."""

    inference_duration: float = 0.0
    """Time taken to perform the inference, in seconds (only for `INFERENCE_DONE` events)."""

    speaking: bool = False
    """Indicates whether speech was detected in the frames."""

    raw_accumulated_silence: float = 0.0
    """
    Raw accumulated silence reported by the detector.

    NOTE(review): presumably the running silence duration (in seconds) that the
    detector compares against its silence threshold, rather than the threshold
    itself -- confirm against the concrete VAD implementation.
    """

    raw_accumulated_speech: float = 0.0
    """
    Raw accumulated speech reported by the detector.

    NOTE(review): presumably the running speech duration (in seconds) that the
    detector compares against its speech threshold, rather than the threshold
    itself -- confirm against the concrete VAD implementation.
    """

Represents an event detected by the Voice Activity Detector (VAD).

Class variables

var frames : List[AudioFrame]

List of audio frames associated with the speech.

  • For start_of_speech events, this contains the audio chunks that triggered the detection.
  • For inference_done events, this contains the audio chunks that were processed.
  • For end_of_speech events, this contains the complete user speech.
var inference_duration : float

Time taken to perform the inference, in seconds (only for INFERENCE_DONE events).

var probability : float

Probability that speech is present (only for INFERENCE_DONE events).

var raw_accumulated_silence : float

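Raw accumulated silence reported by the detector (presumably the running silence duration, in seconds, that is compared against the silence threshold).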

var raw_accumulated_speech : float

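Raw accumulated speech reported by the detector (presumably the running speech duration, in seconds, that is compared against the speech threshold).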

var samples_index : int

Index of the audio sample where the event occurred, relative to the inference sample rate.

var silence_duration : float

Duration of the silence segment in seconds.

var speaking : bool

Indicates whether speech was detected in the frames.

var speech_duration : float

Duration of the speech segment in seconds.

var timestamp : float

Timestamp (in seconds) when the event was fired.

var type : VADEventType

Type of the VAD event (e.g., start of speech, end of speech, inference done).

class VADEventType (*args, **kwds)
Expand source code
@unique
class VADEventType(str, Enum):
    """Kinds of events produced by a VAD stream.

    Members are also ``str`` instances, so each one compares equal to its
    string value (e.g. ``VADEventType.START_OF_SPEECH == "start_of_speech"``).
    ``@unique`` guarantees that no two members share a value.
    """

    # Speech has started.
    START_OF_SPEECH = "start_of_speech"
    # One inference pass has completed.
    INFERENCE_DONE = "inference_done"
    # Speech has ended.
    END_OF_SPEECH = "end_of_speech"

Kinds of events produced by a VAD stream. This is a str-based enum, so each member is also a string equal to its value (for example, VADEventType.START_OF_SPEECH == "start_of_speech"). The members are START_OF_SPEECH, INFERENCE_DONE and END_OF_SPEECH.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var END_OF_SPEECH
var INFERENCE_DONE
var START_OF_SPEECH
class VADStream (vad: VAD)
Expand source code
class VADStream(ABC):
    """Abstract base class for a stream of VAD events.

    Audio frames are fed in with :meth:`push_frame`, and :class:`VADEvent`
    objects are read back by async-iterating the stream. Subclasses implement
    :meth:`_main_task`, which is started as an asyncio task on construction;
    it is expected to read from ``_input_ch`` and publish to ``_event_ch``
    (not enforced here).
    """

    class _FlushSentinel:
        """Marker placed on the input channel by :meth:`flush`."""

    def __init__(self, vad: VAD) -> None:
        """Create the channels and start the main and metrics tasks.

        Must be called while an event loop is running, because tasks are
        created with ``asyncio.create_task``.

        Args:
            vad: The VAD that owns this stream; used for its capabilities,
                its label and for emitting ``"metrics_collected"``.
        """
        self._vad = vad
        self._last_activity_time = time.perf_counter()
        self._input_ch = aio.Chan[Union[rtc.AudioFrame, VADStream._FlushSentinel]]()
        self._event_ch = aio.Chan[VADEvent]()

        # Every event is observed twice: once by the consumer iterating the
        # stream, once by the metrics monitor.
        self._event_aiter, monitor_aiter = aio.itertools.tee(self._event_ch, 2)
        self._metrics_task = asyncio.create_task(
            self._metrics_monitor_task(monitor_aiter), name="VAD._metrics_task"
        )

        self._task = asyncio.create_task(self._main_task())
        # Closing the event channel when the main task finishes ends iteration
        # for both the consumer and the metrics monitor.
        self._task.add_done_callback(lambda _: self._event_ch.close())

    @abstractmethod
    async def _main_task(self) -> None:
        """Run the detector; implemented by subclasses."""

    async def _metrics_monitor_task(self, event_aiter: AsyncIterable[VADEvent]) -> None:
        """Collect inference metrics and emit them on the owning VAD.

        Accumulates the count and total duration of ``INFERENCE_DONE`` events.
        Once the count reaches ``1 / update_interval`` a ``VADMetrics`` object
        is emitted through the VAD's ``"metrics_collected"`` event and the
        counters are reset. Start/end-of-speech events refresh the last
        activity time used to compute ``idle_time``.

        Args:
            event_aiter: Iterator over the events produced by this stream.
        """

        inference_duration_total = 0.0
        inference_count = 0

        async for ev in event_aiter:
            if ev.type == VADEventType.INFERENCE_DONE:
                inference_duration_total += ev.inference_duration
                inference_count += 1

                if inference_count >= 1 / self._vad.capabilities.update_interval:
                    vad_metrics = VADMetrics(
                        timestamp=time.time(),
                        idle_time=time.perf_counter() - self._last_activity_time,
                        inference_duration_total=inference_duration_total,
                        inference_count=inference_count,
                        label=self._vad._label,
                    )
                    self._vad.emit("metrics_collected", vad_metrics)

                    inference_duration_total = 0.0
                    inference_count = 0
            elif ev.type in (VADEventType.START_OF_SPEECH, VADEventType.END_OF_SPEECH):
                self._last_activity_time = time.perf_counter()

    def push_frame(self, frame: rtc.AudioFrame) -> None:
        """Push an audio frame to be analyzed.

        Args:
            frame: The audio frame to queue on the input channel.

        Raises:
            RuntimeError: If the input has ended or the stream is closed.
        """
        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(frame)

    def flush(self) -> None:
        """Mark the end of the current segment.

        Raises:
            RuntimeError: If the input has ended or the stream is closed.
        """
        self._check_input_not_ended()
        self._check_not_closed()
        self._input_ch.send_nowait(self._FlushSentinel())

    def end_input(self) -> None:
        """Mark the end of input; no more audio frames will be pushed.

        Flushes the current segment and then closes the input channel.

        Raises:
            RuntimeError: If the input has already ended or the stream is closed.
        """
        self.flush()
        self._input_ch.close()

    async def aclose(self) -> None:
        """Close this stream immediately.

        Closes the input channel, cancels the main task, closes the event
        channel and waits for the metrics task to finish.
        """
        self._input_ch.close()
        await aio.gracefully_cancel(self._task)
        self._event_ch.close()
        await self._metrics_task

    async def __anext__(self) -> VADEvent:
        """Return the next event, re-raising a main-task failure at end of stream.

        Raises:
            StopAsyncIteration: When the event channel is exhausted.
            BaseException: Whatever exception terminated the main task, if any.
        """
        try:
            val = await self._event_aiter.__anext__()
        except StopAsyncIteration:
            task = self._task
            # Task.exception() raises InvalidStateError on an unfinished task,
            # so only inspect it once the task is done and was not cancelled.
            if task.done() and not task.cancelled() and (exc := task.exception()):
                raise exc from None

            raise StopAsyncIteration from None

        return val

    def __aiter__(self) -> AsyncIterator[VADEvent]:
        """Return the stream itself; it is its own async iterator."""
        return self

    def _check_not_closed(self) -> None:
        """Raise ``RuntimeError`` if the event channel has been closed."""
        if self._event_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} is closed")

    def _check_input_not_ended(self) -> None:
        """Raise ``RuntimeError`` if the input channel has been closed."""
        if self._input_ch.closed:
            cls = type(self)
            raise RuntimeError(f"{cls.__module__}.{cls.__name__} input ended")

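Abstract base class for a stream of VAD events. Audio frames are pushed in with push_frame(), and the resulting VADEvent objects are read by async-iterating the stream. Subclasses implement the _main_task() coroutine.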

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.silero.vad.VADStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Close this stream immediately.

    Closes the input channel, cancels the main task via
    ``aio.gracefully_cancel``, closes the event channel and then waits for
    the metrics task to finish.
    """
    self._input_ch.close()
    await aio.gracefully_cancel(self._task)
    # Closing the event channel ends the metrics monitor's iteration, so the
    # await below can complete.
    self._event_ch.close()
    await self._metrics_task

Close this stream immediately

def end_input(self) ‑> None
Expand source code
def end_input(self) -> None:
    """Mark the end of input; no more audio frames will be pushed.

    Flushes the current segment first and then closes the input channel.
    Any error raised by ``flush()`` propagates and leaves the channel open.
    """
    self.flush()
    self._input_ch.close()

Mark the end of input; no more audio frames will be pushed

def flush(self) ‑> None
Expand source code
def flush(self) -> None:
    """Signal that the current segment is complete.

    After both state checks pass, a fresh ``_FlushSentinel`` marker is queued
    on the input channel.
    """
    self._check_input_not_ended()
    self._check_not_closed()
    marker = self._FlushSentinel()
    self._input_ch.send_nowait(marker)

Mark the end of the current segment

def push_frame(self, frame: rtc.AudioFrame) ‑> None
Expand source code
def push_frame(self, frame: rtc.AudioFrame) -> None:
    """Push an audio frame to be analyzed.

    Args:
        frame: The audio frame to queue on the input channel.

    Both state checks run before the frame is queued; whatever they raise
    propagates and the frame is not sent.
    """
    self._check_input_not_ended()
    self._check_not_closed()
    self._input_ch.send_nowait(frame)

Push an audio frame to be analyzed