Module livekit.agents.utils

Sub-modules

livekit.agents.utils.aio
livekit.agents.utils.audio
livekit.agents.utils.codecs
livekit.agents.utils.connection_pool
livekit.agents.utils.exp_filter
livekit.agents.utils.http_context
livekit.agents.utils.hw
livekit.agents.utils.images
livekit.agents.utils.log
livekit.agents.utils.misc
livekit.agents.utils.moving_average

Functions

def _compute_changes(old_list: list[~T], new_list: list[~T], key_fnc: Callable[[~T], str]) ‑> livekit.agents.utils._message_change.MessageChange[~T]
Expand source code
def compute_changes(
    old_list: list[T], new_list: list[T], key_fnc: Callable[[T], str]
) -> MessageChange[T]:
    """Compute minimum changes needed to transform old list into new list"""
    # Convert to lists of ids
    old_ids = [key_fnc(msg) for msg in old_list]
    new_ids = [key_fnc(msg) for msg in new_list]

    # Create lookup maps
    old_msgs = {key_fnc(msg): msg for msg in old_list}
    new_msgs = {key_fnc(msg): msg for msg in new_list}

    # Compute changes using ids
    changes = _compute_list_changes(old_ids, new_ids)

    # Convert back to items
    return MessageChange(
        to_delete=[old_msgs[id] for id in changes.to_delete],
        to_add=[
            (
                None if prev is None else old_msgs.get(prev) or new_msgs[prev],
                new_msgs[new],
            )
            for prev, new in changes.to_add
        ],
    )

Compute minimum changes needed to transform old list into new list

def combine_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
    """
    Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`.

    This function concatenates the audio data from multiple frames, ensuring that
    all frames have the same sample rate and number of channels. It efficiently
    merges the data by preallocating the necessary memory and copying the frame
    data without unnecessary reallocations.

    Args:
        buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame`
            objects to be combined.

    Returns:
        rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data.

    Raises:
        ValueError: If the buffer is empty.
        ValueError: If frames have differing sample rates.
        ValueError: If frames have differing numbers of channels.

    Example:
        >>> frame1 = rtc.AudioFrame(
        ...     data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> frame2 = rtc.AudioFrame(
        ...     data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> combined_frame = combine_audio_frames([frame1, frame2])
        >>> combined_frame.data
        b'\x01\x02\x03\x04'
        >>> combined_frame.sample_rate
        48000
        >>> combined_frame.num_channels
        2
        >>> combined_frame.samples_per_channel
        2
    """
    if not isinstance(buffer, list):
        return buffer

    if not buffer:
        raise ValueError("buffer is empty")

    sample_rate = buffer[0].sample_rate
    num_channels = buffer[0].num_channels

    total_data_length = 0
    total_samples_per_channel = 0

    for frame in buffer:
        if frame.sample_rate != sample_rate:
            raise ValueError(
                f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}"
            )

        if frame.num_channels != num_channels:
            raise ValueError(
                f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}"
            )

        total_data_length += len(frame.data)
        total_samples_per_channel += frame.samples_per_channel

    data = bytearray(total_data_length)
    offset = 0
    for frame in buffer:
        frame_data = frame.data.cast("b")
        data[offset : offset + len(frame_data)] = frame_data
        offset += len(frame_data)

    return AudioFrame(
        data=data,
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=total_samples_per_channel,
    )

Combines one or more rtc.AudioFrame objects into a single rtc.AudioFrame.

This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.

Args

buffer
A single rtc.AudioFrame or a list of rtc.AudioFrame objects to be combined.

Returns

rtc.AudioFrame
A new rtc.AudioFrame containing the combined audio data.

Raises

ValueError
If the buffer is empty.
ValueError
If frames have differing sample rates.
ValueError
If frames have differing numbers of channels.

Example

>>> frame1 = rtc.AudioFrame(
...     data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data
b''
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
def is_given(obj: NotGivenOr[_T]) ‑> TypeGuard[~_T]
Expand source code
def is_given(obj: NotGivenOr[_T]) -> TypeGuard[_T]:
    return not isinstance(obj, NotGiven)
def log_exceptions(msg: str = '', logger: logging.Logger = <RootLogger root (WARNING)>) ‑> Callable[[Any], Any]
Expand source code
def log_exceptions(
    msg: str = "", logger: logging.Logger = logging.getLogger()
) -> Callable[[Any], Any]:
    def deco(fn: Callable[[Any], Any]):
        if asyncio.iscoroutinefunction(fn):

            @functools.wraps(fn)
            async def async_fn_logs(*args: Any, **kwargs: Any):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    err = f"Error in {fn.__name__}"
                    if msg:
                        err += f" – {msg}"
                    logger.exception(err)
                    raise

            return async_fn_logs
        else:

            @functools.wraps(fn)
            def fn_logs(*args: Any, **kwargs: Any):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    err = f"Error in {fn.__name__}"
                    if msg:
                        err += f" – {msg}"
                    logger.exception(err)
                    raise

            return fn_logs

    return deco
def merge_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame:
    """
    Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`.

    This function concatenates the audio data from multiple frames, ensuring that
    all frames have the same sample rate and number of channels. It efficiently
    merges the data by preallocating the necessary memory and copying the frame
    data without unnecessary reallocations.

    Args:
        buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame`
            objects to be combined.

    Returns:
        rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data.

    Raises:
        ValueError: If the buffer is empty.
        ValueError: If frames have differing sample rates.
        ValueError: If frames have differing numbers of channels.

    Example:
        >>> frame1 = rtc.AudioFrame(
        ...     data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> frame2 = rtc.AudioFrame(
        ...     data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
        ... )
        >>> combined_frame = combine_audio_frames([frame1, frame2])
        >>> combined_frame.data
        b'\x01\x02\x03\x04'
        >>> combined_frame.sample_rate
        48000
        >>> combined_frame.num_channels
        2
        >>> combined_frame.samples_per_channel
        2
    """
    if not isinstance(buffer, list):
        return buffer

    if not buffer:
        raise ValueError("buffer is empty")

    sample_rate = buffer[0].sample_rate
    num_channels = buffer[0].num_channels

    total_data_length = 0
    total_samples_per_channel = 0

    for frame in buffer:
        if frame.sample_rate != sample_rate:
            raise ValueError(
                f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}"
            )

        if frame.num_channels != num_channels:
            raise ValueError(
                f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}"
            )

        total_data_length += len(frame.data)
        total_samples_per_channel += frame.samples_per_channel

    data = bytearray(total_data_length)
    offset = 0
    for frame in buffer:
        frame_data = frame.data.cast("b")
        data[offset : offset + len(frame_data)] = frame_data
        offset += len(frame_data)

    return AudioFrame(
        data=data,
        sample_rate=sample_rate,
        num_channels=num_channels,
        samples_per_channel=total_samples_per_channel,
    )

Combines one or more rtc.AudioFrame objects into a single rtc.AudioFrame.

This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.

Args

buffer
A single rtc.AudioFrame or a list of rtc.AudioFrame objects to be combined.

Returns

rtc.AudioFrame
A new rtc.AudioFrame containing the combined audio data.

Raises

ValueError
If the buffer is empty.
ValueError
If frames have differing sample rates.
ValueError
If frames have differing numbers of channels.

Example

>>> frame1 = rtc.AudioFrame(
...     data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data
b''
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
def shortuuid(prefix: str = '') ‑> str
Expand source code
def shortuuid(prefix: str = "") -> str:
    return prefix + str(uuid.uuid4().hex)[:12]
def time_ms() ‑> int
Expand source code
def time_ms() -> int:
    return int(time.time() * 1000 + 0.5)

Classes

class ConnectionPool (*,
max_session_duration: float | None = None,
mark_refreshed_on_get: bool = False,
connect_cb: Callable[[], Awaitable[~T]] | None = None,
close_cb: Callable[[~T], Awaitable[None]] | None = None)
Expand source code
class ConnectionPool(Generic[T]):
    """Helper class to manage persistent connections like websockets.

    Handles connection pooling and reconnection after max duration.
    Can be used as an async context manager to automatically return connections to the pool.
    """

    def __init__(
        self,
        *,
        max_session_duration: Optional[float] = None,
        mark_refreshed_on_get: bool = False,
        connect_cb: Optional[Callable[[], Awaitable[T]]] = None,
        close_cb: Optional[Callable[[T], Awaitable[None]]] = None,
    ) -> None:
        """Initialize the connection wrapper.

        Args:
            max_session_duration: Maximum duration in seconds before forcing reconnection
            mark_refreshed_on_get: If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
            connect_cb: Optional async callback to create new connections
            close_cb: Optional async callback to close connections
        """
        self._max_session_duration = max_session_duration
        self._mark_refreshed_on_get = mark_refreshed_on_get
        self._connect_cb = connect_cb
        self._close_cb = close_cb
        self._connections: dict[T, float] = {}  # conn -> connected_at timestamp
        self._available: Set[T] = set()

        # store connections to be reaped (closed) later.
        self._to_close: Set[T] = set()

        self._prewarm_task: Optional[weakref.ref[asyncio.Task]] = None

    async def _connect(self) -> T:
        """Create a new connection.

        Returns:
            The new connection object

        Raises:
            NotImplementedError: If no connect callback was provided
        """
        if self._connect_cb is None:
            raise NotImplementedError("Must provide connect_cb or implement connect()")
        connection = await self._connect_cb()
        self._connections[connection] = time.time()
        return connection

    async def _drain_to_close(self) -> None:
        """Drain and close all the connections queued for closing."""
        for conn in list(self._to_close):
            await self._maybe_close_connection(conn)
        self._to_close.clear()

    @asynccontextmanager
    async def connection(self) -> AsyncGenerator[T, None]:
        """Get a connection from the pool and automatically return it when done.

        Yields:
            An active connection object
        """
        conn = await self.get()
        try:
            yield conn
        except Exception:
            self.remove(conn)
            raise
        else:
            self.put(conn)

    async def get(self) -> T:
        """Get an available connection or create a new one if needed.

        Returns:
            An active connection object
        """
        await self._drain_to_close()

        now = time.time()

        # try to reuse an available connection that hasn't expired
        while self._available:
            conn = self._available.pop()
            if (
                self._max_session_duration is None
                or now - self._connections[conn] <= self._max_session_duration
            ):
                if self._mark_refreshed_on_get:
                    self._connections[conn] = now
                return conn
            # connection expired; mark it for resetting.
            self.remove(conn)

        return await self._connect()

    def put(self, conn: T) -> None:
        """Mark a connection as available for reuse.

        If connection has been reset, it will not be added to the pool.

        Args:
            conn: The connection to make available
        """
        if conn in self._connections:
            self._available.add(conn)

    async def _maybe_close_connection(self, conn: T) -> None:
        """Close a connection if close_cb is provided.

        Args:
            conn: The connection to close
        """
        if self._close_cb is not None:
            await self._close_cb(conn)

    def remove(self, conn: T) -> None:
        """Remove a specific connection from the pool.

        Marks the connection to be closed during the next drain cycle.

        Args:
            conn: The connection to reset
        """
        self._available.discard(conn)
        if conn in self._connections:
            self._to_close.add(conn)
            self._connections.pop(conn, None)

    def invalidate(self) -> None:
        """Clear all existing connections.

        Marks all current connections to be closed during the next drain cycle.
        """
        for conn in list(self._connections.keys()):
            self._to_close.add(conn)
        self._connections.clear()
        self._available.clear()

    def prewarm(self) -> None:
        """Initiate prewarming of the connection pool without blocking.

        This method starts a background task that creates a new connection if none exist.
        The task automatically cleans itself up when the connection pool is closed.
        """
        if self._prewarm_task is not None or self._connections:
            return

        async def _prewarm_impl():
            if not self._connections:
                conn = await self._connect()
                self._available.add(conn)

        task = asyncio.create_task(_prewarm_impl())
        self._prewarm_task = weakref.ref(task)

    async def aclose(self):
        """Close all connections, draining any pending connection closures."""
        if self._prewarm_task is not None:
            task = self._prewarm_task()
            if task:
                aio.gracefully_cancel(task)

        self.invalidate()
        await self._drain_to_close()

Helper class to manage persistent connections like websockets.

Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool.

Initialize the connection wrapper.

Args

max_session_duration
Maximum duration in seconds before forcing reconnection
mark_refreshed_on_get
If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
connect_cb
Optional async callback to create new connections
close_cb
Optional async callback to close connections

Ancestors

  • typing.Generic

Methods

async def aclose(self)
Expand source code
async def aclose(self):
    """Close all connections, draining any pending connection closures."""
    if self._prewarm_task is not None:
        task = self._prewarm_task()
        if task:
            aio.gracefully_cancel(task)

    self.invalidate()
    await self._drain_to_close()

Close all connections, draining any pending connection closures.

async def connection(self) ‑> AsyncGenerator[~T, None]
Expand source code
@asynccontextmanager
async def connection(self) -> AsyncGenerator[T, None]:
    """Get a connection from the pool and automatically return it when done.

    Yields:
        An active connection object
    """
    conn = await self.get()
    try:
        yield conn
    except Exception:
        self.remove(conn)
        raise
    else:
        self.put(conn)

Get a connection from the pool and automatically return it when done.

Yields

An active connection object

async def get(self) ‑> ~T
Expand source code
async def get(self) -> T:
    """Get an available connection or create a new one if needed.

    Returns:
        An active connection object
    """
    await self._drain_to_close()

    now = time.time()

    # try to reuse an available connection that hasn't expired
    while self._available:
        conn = self._available.pop()
        if (
            self._max_session_duration is None
            or now - self._connections[conn] <= self._max_session_duration
        ):
            if self._mark_refreshed_on_get:
                self._connections[conn] = now
            return conn
        # connection expired; mark it for resetting.
        self.remove(conn)

    return await self._connect()

Get an available connection or create a new one if needed.

Returns

An active connection object

def invalidate(self) ‑> None
Expand source code
def invalidate(self) -> None:
    """Clear all existing connections.

    Marks all current connections to be closed during the next drain cycle.
    """
    for conn in list(self._connections.keys()):
        self._to_close.add(conn)
    self._connections.clear()
    self._available.clear()

Clear all existing connections.

Marks all current connections to be closed during the next drain cycle.

def prewarm(self) ‑> None
Expand source code
def prewarm(self) -> None:
    """Initiate prewarming of the connection pool without blocking.

    This method starts a background task that creates a new connection if none exist.
    The task automatically cleans itself up when the connection pool is closed.
    """
    if self._prewarm_task is not None or self._connections:
        return

    async def _prewarm_impl():
        if not self._connections:
            conn = await self._connect()
            self._available.add(conn)

    task = asyncio.create_task(_prewarm_impl())
    self._prewarm_task = weakref.ref(task)

Initiate prewarming of the connection pool without blocking.

This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed.

def put(self, conn: ~T) ‑> None
Expand source code
def put(self, conn: T) -> None:
    """Mark a connection as available for reuse.

    If connection has been reset, it will not be added to the pool.

    Args:
        conn: The connection to make available
    """
    if conn in self._connections:
        self._available.add(conn)

Mark a connection as available for reuse.

If connection has been reset, it will not be added to the pool.

Args

conn
The connection to make available
def remove(self, conn: ~T) ‑> None
Expand source code
def remove(self, conn: T) -> None:
    """Remove a specific connection from the pool.

    Marks the connection to be closed during the next drain cycle.

    Args:
        conn: The connection to reset
    """
    self._available.discard(conn)
    if conn in self._connections:
        self._to_close.add(conn)
        self._connections.pop(conn, None)

Remove a specific connection from the pool.

Marks the connection to be closed during the next drain cycle.

Args

conn
The connection to reset
class EventEmitter
Expand source code
class EventEmitter(Generic[T_contra]):
    def __init__(self) -> None:
        """
        Initialize a new instance of EventEmitter.
        """
        self._events: Dict[T_contra, Set[Callable]] = dict()

    def emit(self, event: T_contra, *args) -> None:
        """
        Trigger all callbacks associated with the given event.

        Args:
            event (T): The event to emit.
            *args: Positional arguments to pass to the callbacks.

        Example:
            Basic usage of emit:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
            ```
        """
        if event in self._events:
            callables = self._events[event].copy()
            for callback in callables:
                try:
                    sig = inspect.signature(callback)
                    params = sig.parameters.values()

                    has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params)
                    if has_varargs:
                        callback(*args)
                    else:
                        positional_params = [
                            p
                            for p in params
                            if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
                        ]
                        num_params = len(positional_params)
                        num_args = min(len(args), num_params)
                        callback_args = args[:num_args]

                        callback(*callback_args)
                except TypeError:
                    raise
                except Exception:
                    logger.exception(f"failed to emit event {event}")

    def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called only once when the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered callback or a decorator if callback is None.

        Example:
            Using once with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.once('greet', greet_once)
            emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')    # No output, callback was removed after first call
            ```

            Using once as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.once('greet')
            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')    # No output
            ```
        """
        if callback is not None:

            def once_callback(*args, **kwargs):
                self.off(event, once_callback)
                callback(*args, **kwargs)

            return self.on(event, once_callback)
        else:

            def decorator(callback: Callable) -> Callable:
                self.once(event, callback)
                return callback

            return decorator

    def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called whenever the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered callback or a decorator if callback is None.

        Example:
            Using on with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```

            Using on as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.on('greet')
            def greet(name):
                print(f"Hello, {name}!")

            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```
        """
        if callback is not None:
            if asyncio.iscoroutinefunction(callback):
                raise ValueError(
                    "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
                )

            if event not in self._events:
                self._events[event] = set()
            self._events[event].add(callback)
            return callback
        else:

            def decorator(callback: Callable) -> Callable:
                self.on(event, callback)
                return callback

            return decorator

    def off(self, event: T_contra, callback: Callable) -> None:
        """
        Unregister a callback from an event.

        Args:
            event (T): The event to stop listening to.
            callback (Callable): The callback to remove.

        Example:
            Removing a callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.off('greet', greet)
            emitter.emit('greet', 'Dave')  # No output, callback was removed
            ```
        """
        if event in self._events:
            self._events[event].discard(callback)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Ancestors

  • typing.Generic

Subclasses

Methods

def emit(self, event: -T_contra, *args) ‑> None
Expand source code
def emit(self, event: T_contra, *args) -> None:
    """
    Trigger all callbacks associated with the given event.

    Args:
        event (T): The event to emit.
        *args: Positional arguments to pass to the callbacks.

    Example:
        Basic usage of emit:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
        ```
    """
    if event in self._events:
        callables = self._events[event].copy()
        for callback in callables:
            try:
                sig = inspect.signature(callback)
                params = sig.parameters.values()

                has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params)
                if has_varargs:
                    callback(*args)
                else:
                    positional_params = [
                        p
                        for p in params
                        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
                    ]
                    num_params = len(positional_params)
                    num_args = min(len(args), num_params)
                    callback_args = args[:num_args]

                    callback(*callback_args)
            except TypeError:
                raise
            except Exception:
                logger.exception(f"failed to emit event {event}")

Trigger all callbacks associated with the given event.

Args

event : T
The event to emit.
*args
Positional arguments to pass to the callbacks.

Example

Basic usage of emit:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
def off(self, event: -T_contra, callback: Callable) ‑> None
Expand source code
def off(self, event: T_contra, callback: Callable) -> None:
    """
    Unregister a callback from an event.

    Args:
        event (T): The event to stop listening to.
        callback (Callable): The callback to remove.

    Example:
        Removing a callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.off('greet', greet)
        emitter.emit('greet', 'Dave')  # No output, callback was removed
        ```
    """
    if event in self._events:
        self._events[event].discard(callback)

Unregister a callback from an event.

Args

event : T
The event to stop listening to.
callback : Callable
The callback to remove.

Example

Removing a callback:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.off('greet', greet)
emitter.emit('greet', 'Dave')  # No output, callback was removed
def on(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
Expand source code
def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Register a callback to be called whenever the event is emitted.

    If a callback is provided, it registers the callback directly.
    If no callback is provided, it returns a decorator for use with function definitions.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered callback or a decorator if callback is None.

    Example:
        Using on with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```

        Using on as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.on('greet')
        def greet(name):
            print(f"Hello, {name}!")

        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```
    """
    if callback is not None:
        if asyncio.iscoroutinefunction(callback):
            raise ValueError(
                "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
            )

        if event not in self._events:
            self._events[event] = set()
        self._events[event].add(callback)
        return callback
    else:

        def decorator(callback: Callable) -> Callable:
            self.on(event, callback)
            return callback

        return decorator

Register a callback to be called whenever the event is emitted.

If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.

Args

event : T
The event to listen for.
callback : Callable, optional
The callback to register. Defaults to None.

Returns

Callable
The registered callback or a decorator if callback is None.

Example

Using on with a direct callback:

emitter = EventEmitter[str]()

def greet(name):
    print(f"Hello, {name}!")

emitter.on('greet', greet)
emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!

Using on as a decorator:

emitter = EventEmitter[str]()

@emitter.on('greet')
def greet(name):
    print(f"Hello, {name}!")

emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
def once(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
Expand source code
def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Register a callback to be called only once when the event is emitted.

    If a callback is provided, it registers the callback directly.
    If no callback is provided, it returns a decorator for use with function definitions.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered callback or a decorator if callback is None.

    Example:
        Using once with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.once('greet', greet_once)
        emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')    # No output, callback was removed after first call
        ```

        Using once as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.once('greet')
        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')    # No output
        ```
    """
    if callback is not None:

        def once_callback(*args, **kwargs):
            self.off(event, once_callback)
            callback(*args, **kwargs)

        return self.on(event, once_callback)
    else:

        def decorator(callback: Callable) -> Callable:
            self.once(event, callback)
            return callback

        return decorator

Register a callback to be called only once when the event is emitted.

If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.

Args

event : T
The event to listen for.
callback : Callable, optional
The callback to register. Defaults to None.

Returns

Callable
The registered callback or a decorator if callback is None.

Example

Using once with a direct callback:

emitter = EventEmitter[str]()

def greet_once(name):
    print(f"Hello once, {name}!")

emitter.once('greet', greet_once)
emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
emitter.emit('greet', 'Bob')    # No output, callback was removed after first call

Using once as a decorator:

emitter = EventEmitter[str]()

@emitter.once('greet')
def greet_once(name):
    print(f"Hello once, {name}!")

emitter.emit('greet', 'Bob')    # Output: Hello once, Bob!
emitter.emit('greet', 'Bob')    # No output
class ExpFilter (alpha: float, max_val: float = -1.0)
Expand source code
class ExpFilter:
    def __init__(self, alpha: float, max_val: float = -1.0) -> None:
        self._alpha = alpha
        self._filtered = -1.0
        self._max_val = max_val

    def reset(self, alpha: float = -1.0) -> None:
        if alpha != -1.0:
            self._alpha = alpha
        self._filtered = -1.0

    def apply(self, exp: float, sample: float) -> float:
        if self._filtered == -1.0:
            self._filtered = sample
        else:
            a = self._alpha**exp
            self._filtered = a * self._filtered + (1 - a) * sample

        if self._max_val != -1.0 and self._filtered > self._max_val:
            self._filtered = self._max_val

        return self._filtered

    def filtered(self) -> float:
        return self._filtered

    def update_base(self, alpha: float) -> None:
        self._alpha = alpha

Methods

def apply(self, exp: float, sample: float) ‑> float
Expand source code
def apply(self, exp: float, sample: float) -> float:
    if self._filtered == -1.0:
        self._filtered = sample
    else:
        a = self._alpha**exp
        self._filtered = a * self._filtered + (1 - a) * sample

    if self._max_val != -1.0 and self._filtered > self._max_val:
        self._filtered = self._max_val

    return self._filtered
def filtered(self) ‑> float
Expand source code
def filtered(self) -> float:
    return self._filtered
def reset(self, alpha: float = -1.0) ‑> None
Expand source code
def reset(self, alpha: float = -1.0) -> None:
    if alpha != -1.0:
        self._alpha = alpha
    self._filtered = -1.0
def update_base(self, alpha: float) ‑> None
Expand source code
def update_base(self, alpha: float) -> None:
    self._alpha = alpha
class MovingAverage (window_size: int)
Expand source code
class MovingAverage:
    def __init__(self, window_size: int) -> None:
        self._hist: list[float] = [0] * window_size
        self._sum: float = 0
        self._count: int = 0

    def add_sample(self, sample: float) -> None:
        self._count += 1
        index = self._count % len(self._hist)
        if self._count > len(self._hist):
            self._sum -= self._hist[index]
        self._sum += sample
        self._hist[index] = sample

    def get_avg(self) -> float:
        if self._count == 0:
            return 0
        return self._sum / self.size()

    def reset(self):
        self._count = 0
        self._sum = 0

    def size(self) -> int:
        return min(self._count, len(self._hist))

Methods

def add_sample(self, sample: float) ‑> None
Expand source code
def add_sample(self, sample: float) -> None:
    self._count += 1
    index = self._count % len(self._hist)
    if self._count > len(self._hist):
        self._sum -= self._hist[index]
    self._sum += sample
    self._hist[index] = sample
def get_avg(self) ‑> float
Expand source code
def get_avg(self) -> float:
    if self._count == 0:
        return 0
    return self._sum / self.size()
def reset(self)
Expand source code
def reset(self):
    self._count = 0
    self._sum = 0
def size(self) ‑> int
Expand source code
def size(self) -> int:
    return min(self._count, len(self._hist))