Module livekit.agents.utils
Sub-modules
livekit.agents.utils.aio
livekit.agents.utils.audio
livekit.agents.utils.codecs
livekit.agents.utils.connection_pool
livekit.agents.utils.exp_filter
livekit.agents.utils.http_context
livekit.agents.utils.hw
livekit.agents.utils.images
livekit.agents.utils.log
livekit.agents.utils.misc
livekit.agents.utils.moving_average
Functions
def _compute_changes(old_list: list[~T], new_list: list[~T], key_fnc: Callable[[~T], str]) ‑> livekit.agents.utils._message_change.MessageChange[~T]
-
Expand source code
def compute_changes( old_list: list[T], new_list: list[T], key_fnc: Callable[[T], str] ) -> MessageChange[T]: """Compute minimum changes needed to transform old list into new list""" # Convert to lists of ids old_ids = [key_fnc(msg) for msg in old_list] new_ids = [key_fnc(msg) for msg in new_list] # Create lookup maps old_msgs = {key_fnc(msg): msg for msg in old_list} new_msgs = {key_fnc(msg): msg for msg in new_list} # Compute changes using ids changes = _compute_list_changes(old_ids, new_ids) # Convert back to items return MessageChange( to_delete=[old_msgs[id] for id in changes.to_delete], to_add=[ ( None if prev is None else old_msgs.get(prev) or new_msgs[prev], new_msgs[new], ) for prev, new in changes.to_add ], )
Compute minimum changes needed to transform old list into new list
def combine_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame
-
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame: """ Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations. Args: buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` objects to be combined. Returns: rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data. Raises: ValueError: If the buffer is empty. ValueError: If frames have differing sample rates. ValueError: If frames have differing numbers of channels. Example: >>> frame1 = rtc.AudioFrame( ... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'\x01\x02\x03\x04' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2 """ if not isinstance(buffer, list): return buffer if not buffer: raise ValueError("buffer is empty") sample_rate = buffer[0].sample_rate num_channels = buffer[0].num_channels total_data_length = 0 total_samples_per_channel = 0 for frame in buffer: if frame.sample_rate != sample_rate: raise ValueError( f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}" ) if frame.num_channels != num_channels: raise ValueError( f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}" ) total_data_length += len(frame.data) total_samples_per_channel += frame.samples_per_channel data = bytearray(total_data_length) offset = 0 for frame in buffer: frame_data = frame.data.cast("b") data[offset : offset + len(frame_data)] = frame_data offset += len(frame_data) return AudioFrame( data=data, sample_rate=sample_rate, num_channels=num_channels, samples_per_channel=total_samples_per_channel, )
Combines one or more
rtc.AudioFrame
objects into a singlertc.AudioFrame
.This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.
Args
buffer
- A single
rtc.AudioFrame
or a list ofrtc.AudioFrame
objects to be combined.
Returns
rtc.AudioFrame
- A new
rtc.AudioFrame
containing the combined audio data.
Raises
ValueError
- If the buffer is empty.
ValueError
- If frames have differing sample rates.
ValueError
- If frames have differing numbers of channels.
Example
>>> frame1 = rtc.AudioFrame( ... data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2
def is_given(obj: NotGivenOr[_T]) ‑> TypeGuard[~_T]
-
Expand source code
def is_given(obj: NotGivenOr[_T]) -> TypeGuard[_T]: return not isinstance(obj, NotGiven)
def log_exceptions(msg: str = '', logger: logging.Logger = <RootLogger root (WARNING)>) ‑> Callable[[Any], Any]
-
Expand source code
def log_exceptions( msg: str = "", logger: logging.Logger = logging.getLogger() ) -> Callable[[Any], Any]: def deco(fn: Callable[[Any], Any]): if asyncio.iscoroutinefunction(fn): @functools.wraps(fn) async def async_fn_logs(*args: Any, **kwargs: Any): try: return await fn(*args, **kwargs) except Exception: err = f"Error in {fn.__name__}" if msg: err += f" – {msg}" logger.exception(err) raise return async_fn_logs else: @functools.wraps(fn) def fn_logs(*args: Any, **kwargs: Any): try: return fn(*args, **kwargs) except Exception: err = f"Error in {fn.__name__}" if msg: err += f" – {msg}" logger.exception(err) raise return fn_logs return deco
def merge_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame
-
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame: """ Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations. Args: buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` objects to be combined. Returns: rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data. Raises: ValueError: If the buffer is empty. ValueError: If frames have differing sample rates. ValueError: If frames have differing numbers of channels. Example: >>> frame1 = rtc.AudioFrame( ... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'\x01\x02\x03\x04' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2 """ if not isinstance(buffer, list): return buffer if not buffer: raise ValueError("buffer is empty") sample_rate = buffer[0].sample_rate num_channels = buffer[0].num_channels total_data_length = 0 total_samples_per_channel = 0 for frame in buffer: if frame.sample_rate != sample_rate: raise ValueError( f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}" ) if frame.num_channels != num_channels: raise ValueError( f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}" ) total_data_length += len(frame.data) total_samples_per_channel += frame.samples_per_channel data = bytearray(total_data_length) offset = 0 for frame in buffer: frame_data = frame.data.cast("b") data[offset : offset + len(frame_data)] = frame_data offset += len(frame_data) return AudioFrame( data=data, sample_rate=sample_rate, num_channels=num_channels, samples_per_channel=total_samples_per_channel, )
Combines one or more
rtc.AudioFrame
objects into a singlertc.AudioFrame
.This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.
Args
buffer
- A single
rtc.AudioFrame
or a list ofrtc.AudioFrame
objects to be combined.
Returns
rtc.AudioFrame
- A new
rtc.AudioFrame
containing the combined audio data.
Raises
ValueError
- If the buffer is empty.
ValueError
- If frames have differing sample rates.
ValueError
- If frames have differing numbers of channels.
Example
>>> frame1 = rtc.AudioFrame( ... data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2
def shortuuid(prefix: str = '') ‑> str
-
Expand source code
def shortuuid(prefix: str = "") -> str: return prefix + str(uuid.uuid4().hex)[:12]
def time_ms() ‑> int
-
Expand source code
def time_ms() -> int: return int(time.time() * 1000 + 0.5)
Classes
class ConnectionPool (*,
max_session_duration: float | None = None,
mark_refreshed_on_get: bool = False,
connect_cb: Callable[[], Awaitable[~T]] | None = None,
close_cb: Callable[[~T], Awaitable[None]] | None = None)-
Expand source code
class ConnectionPool(Generic[T]): """Helper class to manage persistent connections like websockets. Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool. """ def __init__( self, *, max_session_duration: Optional[float] = None, mark_refreshed_on_get: bool = False, connect_cb: Optional[Callable[[], Awaitable[T]]] = None, close_cb: Optional[Callable[[T], Awaitable[None]]] = None, ) -> None: """Initialize the connection wrapper. Args: max_session_duration: Maximum duration in seconds before forcing reconnection mark_refreshed_on_get: If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set. connect_cb: Optional async callback to create new connections close_cb: Optional async callback to close connections """ self._max_session_duration = max_session_duration self._mark_refreshed_on_get = mark_refreshed_on_get self._connect_cb = connect_cb self._close_cb = close_cb self._connections: dict[T, float] = {} # conn -> connected_at timestamp self._available: Set[T] = set() # store connections to be reaped (closed) later. self._to_close: Set[T] = set() self._prewarm_task: Optional[weakref.ref[asyncio.Task]] = None async def _connect(self) -> T: """Create a new connection. Returns: The new connection object Raises: NotImplementedError: If no connect callback was provided """ if self._connect_cb is None: raise NotImplementedError("Must provide connect_cb or implement connect()") connection = await self._connect_cb() self._connections[connection] = time.time() return connection async def _drain_to_close(self) -> None: """Drain and close all the connections queued for closing.""" for conn in list(self._to_close): await self._maybe_close_connection(conn) self._to_close.clear() @asynccontextmanager async def connection(self) -> AsyncGenerator[T, None]: """Get a connection from the pool and automatically return it when done. Yields: An active connection object """ conn = await self.get() try: yield conn except Exception: self.remove(conn) raise else: self.put(conn) async def get(self) -> T: """Get an available connection or create a new one if needed. Returns: An active connection object """ await self._drain_to_close() now = time.time() # try to reuse an available connection that hasn't expired while self._available: conn = self._available.pop() if ( self._max_session_duration is None or now - self._connections[conn] <= self._max_session_duration ): if self._mark_refreshed_on_get: self._connections[conn] = now return conn # connection expired; mark it for resetting. self.remove(conn) return await self._connect() def put(self, conn: T) -> None: """Mark a connection as available for reuse. If connection has been reset, it will not be added to the pool. Args: conn: The connection to make available """ if conn in self._connections: self._available.add(conn) async def _maybe_close_connection(self, conn: T) -> None: """Close a connection if close_cb is provided. Args: conn: The connection to close """ if self._close_cb is not None: await self._close_cb(conn) def remove(self, conn: T) -> None: """Remove a specific connection from the pool. Marks the connection to be closed during the next drain cycle. Args: conn: The connection to reset """ self._available.discard(conn) if conn in self._connections: self._to_close.add(conn) self._connections.pop(conn, None) def invalidate(self) -> None: """Clear all existing connections. Marks all current connections to be closed during the next drain cycle. """ for conn in list(self._connections.keys()): self._to_close.add(conn) self._connections.clear() self._available.clear() def prewarm(self) -> None: """Initiate prewarming of the connection pool without blocking. This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed. """ if self._prewarm_task is not None or self._connections: return async def _prewarm_impl(): if not self._connections: conn = await self._connect() self._available.add(conn) task = asyncio.create_task(_prewarm_impl()) self._prewarm_task = weakref.ref(task) async def aclose(self): """Close all connections, draining any pending connection closures.""" if self._prewarm_task is not None: task = self._prewarm_task() if task: aio.gracefully_cancel(task) self.invalidate() await self._drain_to_close()
Helper class to manage persistent connections like websockets.
Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool.
Initialize the connection wrapper.
Args
max_session_duration
- Maximum duration in seconds before forcing reconnection
mark_refreshed_on_get
- If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
connect_cb
- Optional async callback to create new connections
close_cb
- Optional async callback to close connections
Ancestors
- typing.Generic
Methods
async def aclose(self)
-
Expand source code
async def aclose(self): """Close all connections, draining any pending connection closures.""" if self._prewarm_task is not None: task = self._prewarm_task() if task: aio.gracefully_cancel(task) self.invalidate() await self._drain_to_close()
Close all connections, draining any pending connection closures.
async def connection(self) ‑> AsyncGenerator[~T, None]
-
Expand source code
@asynccontextmanager async def connection(self) -> AsyncGenerator[T, None]: """Get a connection from the pool and automatically return it when done. Yields: An active connection object """ conn = await self.get() try: yield conn except Exception: self.remove(conn) raise else: self.put(conn)
Get a connection from the pool and automatically return it when done.
Yields
An active connection object
async def get(self) ‑> ~T
-
Expand source code
async def get(self) -> T: """Get an available connection or create a new one if needed. Returns: An active connection object """ await self._drain_to_close() now = time.time() # try to reuse an available connection that hasn't expired while self._available: conn = self._available.pop() if ( self._max_session_duration is None or now - self._connections[conn] <= self._max_session_duration ): if self._mark_refreshed_on_get: self._connections[conn] = now return conn # connection expired; mark it for resetting. self.remove(conn) return await self._connect()
Get an available connection or create a new one if needed.
Returns
An active connection object
def invalidate(self) ‑> None
-
Expand source code
def invalidate(self) -> None: """Clear all existing connections. Marks all current connections to be closed during the next drain cycle. """ for conn in list(self._connections.keys()): self._to_close.add(conn) self._connections.clear() self._available.clear()
Clear all existing connections.
Marks all current connections to be closed during the next drain cycle.
def prewarm(self) ‑> None
-
Expand source code
def prewarm(self) -> None: """Initiate prewarming of the connection pool without blocking. This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed. """ if self._prewarm_task is not None or self._connections: return async def _prewarm_impl(): if not self._connections: conn = await self._connect() self._available.add(conn) task = asyncio.create_task(_prewarm_impl()) self._prewarm_task = weakref.ref(task)
Initiate prewarming of the connection pool without blocking.
This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed.
def put(self, conn: ~T) ‑> None
-
Expand source code
def put(self, conn: T) -> None: """Mark a connection as available for reuse. If connection has been reset, it will not be added to the pool. Args: conn: The connection to make available """ if conn in self._connections: self._available.add(conn)
Mark a connection as available for reuse.
If connection has been reset, it will not be added to the pool.
Args
conn
- The connection to make available
def remove(self, conn: ~T) ‑> None
-
Expand source code
def remove(self, conn: T) -> None: """Remove a specific connection from the pool. Marks the connection to be closed during the next drain cycle. Args: conn: The connection to reset """ self._available.discard(conn) if conn in self._connections: self._to_close.add(conn) self._connections.pop(conn, None)
Remove a specific connection from the pool.
Marks the connection to be closed during the next drain cycle.
Args
conn
- The connection to reset
class EventEmitter
-
Expand source code
class EventEmitter(Generic[T_contra]): def __init__(self) -> None: """ Initialize a new instance of EventEmitter. """ self._events: Dict[T_contra, Set[Callable]] = dict() def emit(self, event: T_contra, *args) -> None: """ Trigger all callbacks associated with the given event. Args: event (T): The event to emit. *args: Positional arguments to pass to the callbacks. Example: Basic usage of emit: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Alice') # Output: Hello, Alice! ``` """ if event in self._events: callables = self._events[event].copy() for callback in callables: try: sig = inspect.signature(callback) params = sig.parameters.values() has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params) if has_varargs: callback(*args) else: positional_params = [ p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) ] num_params = len(positional_params) num_args = min(len(args), num_params) callback_args = args[:num_args] callback(*callback_args) except TypeError: raise except Exception: logger.exception(f"failed to emit event {event}") def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable: """ Register a callback to be called only once when the event is emitted. If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions. Args: event (T): The event to listen for. callback (Callable, optional): The callback to register. Defaults to None. Returns: Callable: The registered callback or a decorator if callback is None. Example: Using once with a direct callback: ```python emitter = EventEmitter[str]() def greet_once(name): print(f"Hello once, {name}!") emitter.once('greet', greet_once) emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output, callback was removed after first call ``` Using once as a decorator: ```python emitter = EventEmitter[str]() @emitter.once('greet') def greet_once(name): print(f"Hello once, {name}!") emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output ``` """ if callback is not None: def once_callback(*args, **kwargs): self.off(event, once_callback) callback(*args, **kwargs) return self.on(event, once_callback) else: def decorator(callback: Callable) -> Callable: self.once(event, callback) return callback return decorator def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable: """ Register a callback to be called whenever the event is emitted. If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions. Args: event (T): The event to listen for. callback (Callable, optional): The callback to register. Defaults to None. Returns: Callable: The registered callback or a decorator if callback is None. Example: Using on with a direct callback: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! ``` Using on as a decorator: ```python emitter = EventEmitter[str]() @emitter.on('greet') def greet(name): print(f"Hello, {name}!") emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! ``` """ if callback is not None: if asyncio.iscoroutinefunction(callback): raise ValueError( "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead." ) if event not in self._events: self._events[event] = set() self._events[event].add(callback) return callback else: def decorator(callback: Callable) -> Callable: self.on(event, callback) return callback return decorator def off(self, event: T_contra, callback: Callable) -> None: """ Unregister a callback from an event. Args: event (T): The event to stop listening to. callback (Callable): The callback to remove. Example: Removing a callback: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.off('greet', greet) emitter.emit('greet', 'Dave') # No output, callback was removed ``` """ if event in self._events: self._events[event].discard(callback)
Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Ancestors
- typing.Generic
Subclasses
- ProcPool
- LLM
- AgentPlayout
- MultimodalAgent
- AgentPlayout
- HumanInput
- VoicePipelineAgent
- STT
- TTS
- VAD
- livekit.agents.worker.Worker
- GeminiRealtimeSession
- ModelTranscriber
- TranscriberSession
- RealtimeSession
- Room
Methods
def emit(self, event: -T_contra, *args) ‑> None
-
Expand source code
def emit(self, event: T_contra, *args) -> None: """ Trigger all callbacks associated with the given event. Args: event (T): The event to emit. *args: Positional arguments to pass to the callbacks. Example: Basic usage of emit: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Alice') # Output: Hello, Alice! ``` """ if event in self._events: callables = self._events[event].copy() for callback in callables: try: sig = inspect.signature(callback) params = sig.parameters.values() has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params) if has_varargs: callback(*args) else: positional_params = [ p for p in params if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) ] num_params = len(positional_params) num_args = min(len(args), num_params) callback_args = args[:num_args] callback(*callback_args) except TypeError: raise except Exception: logger.exception(f"failed to emit event {event}")
Trigger all callbacks associated with the given event.
Args
event
:T
- The event to emit.
*args
- Positional arguments to pass to the callbacks.
Example
Basic usage of emit:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Alice') # Output: Hello, Alice!
def off(self, event: -T_contra, callback: Callable) ‑> None
-
Expand source code
def off(self, event: T_contra, callback: Callable) -> None: """ Unregister a callback from an event. Args: event (T): The event to stop listening to. callback (Callable): The callback to remove. Example: Removing a callback: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.off('greet', greet) emitter.emit('greet', 'Dave') # No output, callback was removed ``` """ if event in self._events: self._events[event].discard(callback)
Unregister a callback from an event.
Args
event
:T
- The event to stop listening to.
callback
:Callable
- The callback to remove.
Example
Removing a callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.off('greet', greet) emitter.emit('greet', 'Dave') # No output, callback was removed
def on(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
-
Expand source code
def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable: """ Register a callback to be called whenever the event is emitted. If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions. Args: event (T): The event to listen for. callback (Callable, optional): The callback to register. Defaults to None. Returns: Callable: The registered callback or a decorator if callback is None. Example: Using on with a direct callback: ```python emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! ``` Using on as a decorator: ```python emitter = EventEmitter[str]() @emitter.on('greet') def greet(name): print(f"Hello, {name}!") emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! ``` """ if callback is not None: if asyncio.iscoroutinefunction(callback): raise ValueError( "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead." ) if event not in self._events: self._events[event] = set() self._events[event].add(callback) return callback else: def decorator(callback: Callable) -> Callable: self.on(event, callback) return callback return decorator
Register a callback to be called whenever the event is emitted.
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event
:T
- The event to listen for.
callback
:Callable
, optional- The callback to register. Defaults to None.
Returns
Callable
- The registered callback or a decorator if callback is None.
Example
Using on with a direct callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Charlie') # Output: Hello, Charlie!
Using on as a decorator:
emitter = EventEmitter[str]() @emitter.on('greet') def greet(name): print(f"Hello, {name}!") emitter.emit('greet', 'Charlie') # Output: Hello, Charlie!
def once(self, event: -T_contra, callback: Callable | None = None) ‑> Callable
-
Expand source code
def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable: """ Register a callback to be called only once when the event is emitted. If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions. Args: event (T): The event to listen for. callback (Callable, optional): The callback to register. Defaults to None. Returns: Callable: The registered callback or a decorator if callback is None. Example: Using once with a direct callback: ```python emitter = EventEmitter[str]() def greet_once(name): print(f"Hello once, {name}!") emitter.once('greet', greet_once) emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output, callback was removed after first call ``` Using once as a decorator: ```python emitter = EventEmitter[str]() @emitter.once('greet') def greet_once(name): print(f"Hello once, {name}!") emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output ``` """ if callback is not None: def once_callback(*args, **kwargs): self.off(event, once_callback) callback(*args, **kwargs) return self.on(event, once_callback) else: def decorator(callback: Callable) -> Callable: self.once(event, callback) return callback return decorator
Register a callback to be called only once when the event is emitted.
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event
:T
- The event to listen for.
callback
:Callable
, optional- The callback to register. Defaults to None.
Returns
Callable
- The registered callback or a decorator if callback is None.
Example
Using once with a direct callback:
emitter = EventEmitter[str]() def greet_once(name): print(f"Hello once, {name}!") emitter.once('greet', greet_once) emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output, callback was removed after first call
Using once as a decorator:
emitter = EventEmitter[str]() @emitter.once('greet') def greet_once(name): print(f"Hello once, {name}!") emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output
class ExpFilter (alpha: float, max_val: float = -1.0)
-
Expand source code
class ExpFilter: def __init__(self, alpha: float, max_val: float = -1.0) -> None: self._alpha = alpha self._filtered = -1.0 self._max_val = max_val def reset(self, alpha: float = -1.0) -> None: if alpha != -1.0: self._alpha = alpha self._filtered = -1.0 def apply(self, exp: float, sample: float) -> float: if self._filtered == -1.0: self._filtered = sample else: a = self._alpha**exp self._filtered = a * self._filtered + (1 - a) * sample if self._max_val != -1.0 and self._filtered > self._max_val: self._filtered = self._max_val return self._filtered def filtered(self) -> float: return self._filtered def update_base(self, alpha: float) -> None: self._alpha = alpha
Methods
def apply(self, exp: float, sample: float) ‑> float
-
Expand source code
def apply(self, exp: float, sample: float) -> float: if self._filtered == -1.0: self._filtered = sample else: a = self._alpha**exp self._filtered = a * self._filtered + (1 - a) * sample if self._max_val != -1.0 and self._filtered > self._max_val: self._filtered = self._max_val return self._filtered
def filtered(self) ‑> float
-
Expand source code
def filtered(self) -> float: return self._filtered
def reset(self, alpha: float = -1.0) ‑> None
-
Expand source code
def reset(self, alpha: float = -1.0) -> None: if alpha != -1.0: self._alpha = alpha self._filtered = -1.0
def update_base(self, alpha: float) ‑> None
-
Expand source code
def update_base(self, alpha: float) -> None: self._alpha = alpha
class MovingAverage (window_size: int)
-
Expand source code
class MovingAverage: def __init__(self, window_size: int) -> None: self._hist: list[float] = [0] * window_size self._sum: float = 0 self._count: int = 0 def add_sample(self, sample: float) -> None: self._count += 1 index = self._count % len(self._hist) if self._count > len(self._hist): self._sum -= self._hist[index] self._sum += sample self._hist[index] = sample def get_avg(self) -> float: if self._count == 0: return 0 return self._sum / self.size() def reset(self): self._count = 0 self._sum = 0 def size(self) -> int: return min(self._count, len(self._hist))
Methods
def add_sample(self, sample: float) ‑> None
-
Expand source code
def add_sample(self, sample: float) -> None: self._count += 1 index = self._count % len(self._hist) if self._count > len(self._hist): self._sum -= self._hist[index] self._sum += sample self._hist[index] = sample
def get_avg(self) ‑> float
-
Expand source code
def get_avg(self) -> float: if self._count == 0: return 0 return self._sum / self.size()
def reset(self)
-
Expand source code
def reset(self): self._count = 0 self._sum = 0
def size(self) ‑> int
-
Expand source code
def size(self) -> int: return min(self._count, len(self._hist))