Module livekit.agents.utils
Sub-modules
livekit.agents.utils.aio
livekit.agents.utils.audio
livekit.agents.utils.codecs
livekit.agents.utils.deprecation
livekit.agents.utils.http_context
livekit.agents.utils.http_server
livekit.agents.utils.hw
livekit.agents.utils.images
Functions
def combine_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame-
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame: """ Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations. Args: buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` objects to be combined. Returns: rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data. Raises: ValueError: If the buffer is empty. ValueError: If frames have differing sample rates. ValueError: If frames have differing numbers of channels. Example: >>> frame1 = rtc.AudioFrame( ... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'\x01\x02\x03\x04' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2 """ if not isinstance(buffer, list): return buffer if not buffer: raise ValueError("buffer is empty") sample_rate = buffer[0].sample_rate num_channels = buffer[0].num_channels total_data_length = 0 total_samples_per_channel = 0 for frame in buffer: if frame.sample_rate != sample_rate: raise ValueError( f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}" ) if frame.num_channels != num_channels: raise ValueError( f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}" ) total_data_length += len(frame.data) total_samples_per_channel += frame.samples_per_channel data = bytearray(total_data_length) offset = 0 for frame in buffer: frame_data = frame.data.cast("b") data[offset : offset + len(frame_data)] = frame_data offset += 
len(frame_data) return AudioFrame( data=data, sample_rate=sample_rate, num_channels=num_channels, samples_per_channel=total_samples_per_channel, )Combines one or more
rtc.AudioFrame objects into a single rtc.AudioFrame. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.
Args
buffer- A single
rtc.AudioFrame or a list of rtc.AudioFrame objects to be combined.
Returns
rtc.AudioFrame- A new
rtc.AudioFrame containing the combined audio data.
Raises
ValueError- If the buffer is empty.
ValueError- If frames have differing sample rates.
ValueError- If frames have differing numbers of channels.
Example
>>> frame1 = rtc.AudioFrame(
...     data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data
b'\x01\x02\x03\x04'
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
def is_dev_mode() ‑> bool-
Expand source code
def is_dev_mode() -> bool: """Return whether the agent is running in development mode. True when launched via ``console``, ``dev``. Reads the ``LIVEKIT_DEV_MODE`` environment variable. """ return os.getenv("LIVEKIT_DEV_MODE") == "1"Return whether the agent is running in development mode.
True when launched via
console, dev. Reads the LIVEKIT_DEV_MODE environment variable.
def is_given(obj: NotGivenOr[_T]) ‑> typing_extensions.TypeIs[~_T]-
Expand source code
def is_given(obj: NotGivenOr[_T]) -> TypeIs[_T]: return not isinstance(obj, NotGiven) def is_hosted() ‑> bool-
Expand source code
def is_hosted() -> bool: """Return whether the agent is hosted on LiveKit Cloud.""" return os.getenv("LIVEKIT_REMOTE_EOT_URL") is not NoneReturn whether the agent is hosted on LiveKit Cloud.
def log_exceptions(msg: str = '', logger: logging.Logger = <RootLogger root (WARNING)>) ‑> Callable[[~F], ~F]-
Expand source code
def log_exceptions(msg: str = "", logger: logging.Logger = logging.getLogger()) -> Callable[[F], F]: # noqa: B008 def deco(fn: F) -> F: if inspect.iscoroutinefunction(fn): @functools.wraps(fn) async def async_fn_logs(*args: Any, **kwargs: Any) -> Any: try: return await fn(*args, **kwargs) except Exception: err = f"Error in {fn.__name__}" if msg: err += f" – {msg}" logger.exception(err) raise return cast(F, async_fn_logs) else: @functools.wraps(fn) def fn_logs(*args: Any, **kwargs: Any) -> Any: try: return fn(*args, **kwargs) except Exception: err = f"Error in {fn.__name__}" if msg: err += f" – {msg}" logger.exception(err) raise return cast(F, fn_logs) return deco def merge_frames(buffer: AudioFrame | list[AudioFrame]) ‑> AudioFrame-
Expand source code
def combine_audio_frames(buffer: AudioFrame | list[AudioFrame]) -> AudioFrame: """ Combines one or more `rtc.AudioFrame` objects into a single `rtc.AudioFrame`. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations. Args: buffer: A single `rtc.AudioFrame` or a list of `rtc.AudioFrame` objects to be combined. Returns: rtc.AudioFrame: A new `rtc.AudioFrame` containing the combined audio data. Raises: ValueError: If the buffer is empty. ValueError: If frames have differing sample rates. ValueError: If frames have differing numbers of channels. Example: >>> frame1 = rtc.AudioFrame( ... data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> frame2 = rtc.AudioFrame( ... data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1 ... ) >>> combined_frame = combine_audio_frames([frame1, frame2]) >>> combined_frame.data b'\x01\x02\x03\x04' >>> combined_frame.sample_rate 48000 >>> combined_frame.num_channels 2 >>> combined_frame.samples_per_channel 2 """ if not isinstance(buffer, list): return buffer if not buffer: raise ValueError("buffer is empty") sample_rate = buffer[0].sample_rate num_channels = buffer[0].num_channels total_data_length = 0 total_samples_per_channel = 0 for frame in buffer: if frame.sample_rate != sample_rate: raise ValueError( f"Sample rate mismatch: expected {sample_rate}, got {frame.sample_rate}" ) if frame.num_channels != num_channels: raise ValueError( f"Channel count mismatch: expected {num_channels}, got {frame.num_channels}" ) total_data_length += len(frame.data) total_samples_per_channel += frame.samples_per_channel data = bytearray(total_data_length) offset = 0 for frame in buffer: frame_data = frame.data.cast("b") data[offset : offset + len(frame_data)] = frame_data offset += 
len(frame_data) return AudioFrame( data=data, sample_rate=sample_rate, num_channels=num_channels, samples_per_channel=total_samples_per_channel, )Combines one or more
rtc.AudioFrame objects into a single rtc.AudioFrame. This function concatenates the audio data from multiple frames, ensuring that all frames have the same sample rate and number of channels. It efficiently merges the data by preallocating the necessary memory and copying the frame data without unnecessary reallocations.
Args
buffer- A single
rtc.AudioFrame or a list of rtc.AudioFrame objects to be combined.
Returns
rtc.AudioFrame- A new
rtc.AudioFrame containing the combined audio data.
Raises
ValueError- If the buffer is empty.
ValueError- If frames have differing sample rates.
ValueError- If frames have differing numbers of channels.
Example
>>> frame1 = rtc.AudioFrame(
...     data=b"\x01\x02", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> frame2 = rtc.AudioFrame(
...     data=b"\x03\x04", sample_rate=48000, num_channels=2, samples_per_channel=1
... )
>>> combined_frame = combine_audio_frames([frame1, frame2])
>>> combined_frame.data
b'\x01\x02\x03\x04'
>>> combined_frame.sample_rate
48000
>>> combined_frame.num_channels
2
>>> combined_frame.samples_per_channel
2
def nodename() ‑> str-
Expand source code
def nodename() -> str: return platform.node() def shortuuid(prefix: str = '') ‑> str-
Expand source code
def shortuuid(prefix: str = "") -> str: return prefix + str(uuid.uuid4().hex)[:12] def time_ms() ‑> int-
Expand source code
def time_ms() -> int: return int(time.time() * 1000 + 0.5) async def wait_for_agent(room: rtc.Room, *, agent_name: str | None = None) ‑> RemoteParticipant-
Expand source code
async def wait_for_agent( room: rtc.Room, *, agent_name: str | None = None, ) -> rtc.RemoteParticipant: """ Wait for an agent participant to join the room. Args: room: The room to wait for the agent in. agent_name: If provided, waits for an agent with matching lk.agent.name attribute. If None, returns the first agent participant found. Returns: The agent participant. Raises: RuntimeError: If the room is not connected. """ if not room.isconnected(): raise RuntimeError("room is not connected") fut: asyncio.Future[rtc.RemoteParticipant] = asyncio.Future() def matches_agent(p: rtc.RemoteParticipant) -> bool: if p.kind != rtc.ParticipantKind.PARTICIPANT_KIND_AGENT: return False if agent_name is None: return True return p.attributes.get(ATTRIBUTE_AGENT_NAME) == agent_name def on_participant_connected(p: rtc.RemoteParticipant) -> None: if matches_agent(p) and not fut.done(): fut.set_result(p) def on_attributes_changed(changed: list[str], p: rtc.Participant) -> None: if isinstance(p, rtc.RemoteParticipant) and matches_agent(p) and not fut.done(): fut.set_result(p) def on_connection_state_changed(state: int) -> None: if state == rtc.ConnectionState.CONN_DISCONNECTED and not fut.done(): fut.set_exception(RuntimeError("room disconnected while waiting for agent participant")) room.on("participant_connected", on_participant_connected) room.on("participant_attributes_changed", on_attributes_changed) room.on("connection_state_changed", on_connection_state_changed) try: # Check existing participants for p in room.remote_participants.values(): if matches_agent(p): return p return await fut finally: room.off("participant_connected", on_participant_connected) room.off("participant_attributes_changed", on_attributes_changed) room.off("connection_state_changed", on_connection_state_changed)Wait for an agent participant to join the room.
Args
room- The room to wait for the agent in.
agent_name- If provided, waits for an agent with matching lk.agent.name attribute. If None, returns the first agent participant found.
Returns
The agent participant.
Raises
RuntimeError- If the room is not connected.
async def wait_for_participant(room: rtc.Room,
*,
identity: str | None = None,
kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType | None = None) ‑> RemoteParticipant-
Expand source code
async def wait_for_participant( room: rtc.Room, *, identity: str | None = None, kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType | None = None, ) -> rtc.RemoteParticipant: """ Returns a participant that matches the given identity. If identity is None, the first participant that joins the room will be returned. If the participant has already joined, the function will return immediately. """ if not room.isconnected(): raise RuntimeError("room is not connected") fut = asyncio.Future[rtc.RemoteParticipant]() def kind_match(p: rtc.RemoteParticipant) -> bool: if kind is None: return True if isinstance(kind, list): return p.kind in kind return p.kind == kind def _on_participant_active(p: rtc.RemoteParticipant) -> None: if (identity is None or p.identity == identity) and kind_match(p): if not fut.done(): fut.set_result(p) def _on_connection_state_changed(state: int) -> None: if state == rtc.ConnectionState.CONN_DISCONNECTED and not fut.done(): fut.set_exception(RuntimeError("room disconnected while waiting for participant")) room.on("participant_active", _on_participant_active) room.on("connection_state_changed", _on_connection_state_changed) try: for p in room.remote_participants.values(): if p.state == rtc.ParticipantState.PARTICIPANT_STATE_ACTIVE: _on_participant_active(p) if fut.done(): break return await fut finally: room.off("participant_active", _on_participant_active) room.off("connection_state_changed", _on_connection_state_changed)Returns a participant that matches the given identity. If identity is None, the first participant that joins the room will be returned. If the participant has already joined, the function will return immediately.
async def wait_for_track_publication(room: rtc.Room,
*,
identity: str | None = None,
kind: list[rtc.TrackKind.ValueType] | rtc.TrackKind.ValueType | None = None) ‑> RemoteTrackPublication-
Expand source code
async def wait_for_track_publication( room: rtc.Room, *, identity: str | None = None, kind: list[rtc.TrackKind.ValueType] | rtc.TrackKind.ValueType | None = None, ) -> rtc.RemoteTrackPublication: """Returns a remote track matching the given identity and kind. If identity is None, the first track matching the kind will be returned. If the track has already been published, the function will return immediately. """ if not room.isconnected(): raise RuntimeError("room is not connected") fut = asyncio.Future[rtc.RemoteTrackPublication]() def kind_match(k: rtc.TrackKind.ValueType) -> bool: if kind is None: return True if isinstance(kind, list): return k in kind return k == kind def _on_track_published( publication: rtc.RemoteTrackPublication, participant: rtc.RemoteParticipant ) -> None: if fut.done(): return if (identity is None or participant.identity == identity) and kind_match(publication.kind): fut.set_result(publication) def _on_connection_state_changed(state: int) -> None: if state == rtc.ConnectionState.CONN_DISCONNECTED and not fut.done(): fut.set_exception(RuntimeError("room disconnected while waiting for track publication")) # room.on("track_subscribed", _on_track_subscribed) room.on("track_published", _on_track_published) room.on("connection_state_changed", _on_connection_state_changed) try: for p in room.remote_participants.values(): for publication in p.track_publications.values(): _on_track_published(publication, p) if fut.done(): break return await fut finally: room.off("track_published", _on_track_published) room.off("connection_state_changed", _on_connection_state_changed)Returns a remote track matching the given identity and kind. If identity is None, the first track matching the kind will be returned. If the track has already been published, the function will return immediately.
Classes
class AudioArrayBuffer (*, buffer_size: int, dtype: DTypeLike = numpy.int16, sample_rate: int = 16000)-
Expand source code
class AudioArrayBuffer: def __init__(self, *, buffer_size: int, dtype: DTypeLike = np.int16, sample_rate: int = 16000): """Create a fixed-size buffer for audio array data. Args: buffer_size: The size of the buffer in samples. dtype: The dtype of the buffer. sample_rate: The sample rate of the buffer. """ self._buffer_size = buffer_size self._dtype = dtype self._buffer = np.zeros(buffer_size, dtype=dtype) self._start_idx = 0 self._resampler: rtc.AudioResampler | None = None self._sample_rate = sample_rate def push_frame(self, frame: rtc.AudioFrame) -> int: """Push an audio frame to the buffer. Args: frame: The audio frame to push. Returns: The number of samples written to the buffer. Raises: ValueError: If the frame samples are greater than the buffer size. """ if frame.samples_per_channel > self._buffer_size: raise ValueError("frame samples are greater than the buffer size") frames: list[rtc.AudioFrame] = [] if self._resampler is None and frame.sample_rate != self._sample_rate: self._resampler = rtc.AudioResampler( input_rate=frame.sample_rate, output_rate=self._sample_rate, num_channels=frame.num_channels, quality=rtc.AudioResamplerQuality.QUICK, ) if self._resampler: if frame.sample_rate != self._resampler._input_rate: raise ValueError("frame sample rates are inconsistent") frames.extend(self._resampler.push(frame)) else: frames.append(frame) frame = merge_frames(frames) if (shift_size := self._start_idx + frame.samples_per_channel - self._buffer_size) > 0: self.shift(shift_size) ptr = self._buffer[self._start_idx : self._start_idx + frame.samples_per_channel] if frame.num_channels > 1: arr_i16 = np.frombuffer( frame.data, dtype=np.int16, count=frame.samples_per_channel * frame.num_channels ).reshape(-1, frame.num_channels) ptr[:] = (np.sum(arr_i16, axis=1, dtype=np.int32) // frame.num_channels).astype( np.int16 ) else: ptr[:] = np.frombuffer(frame.data, dtype=np.int16, count=frame.samples_per_channel) self._start_idx += frame.samples_per_channel return 
frame.samples_per_channel def shift(self, size: int) -> None: """Shift the buffer to the left by the given size. Args: size: The size to shift the buffer by. """ size = min(size, self._start_idx) self._buffer[: self._start_idx - size] = self._buffer[size : self._start_idx].copy() self._start_idx -= size def read(self) -> np.ndarray: return self._buffer[: self._start_idx].copy() def reset(self) -> None: self._start_idx = 0 self._buffer.fill(0) def __len__(self) -> int: return self._start_idxCreate a fixed-size buffer for audio array data.
Args
buffer_size- The size of the buffer in samples.
dtype- The dtype of the buffer.
sample_rate- The sample rate of the buffer.
Methods
def push_frame(self, frame: rtc.AudioFrame) ‑> int-
Expand source code
def push_frame(self, frame: rtc.AudioFrame) -> int: """Push an audio frame to the buffer. Args: frame: The audio frame to push. Returns: The number of samples written to the buffer. Raises: ValueError: If the frame samples are greater than the buffer size. """ if frame.samples_per_channel > self._buffer_size: raise ValueError("frame samples are greater than the buffer size") frames: list[rtc.AudioFrame] = [] if self._resampler is None and frame.sample_rate != self._sample_rate: self._resampler = rtc.AudioResampler( input_rate=frame.sample_rate, output_rate=self._sample_rate, num_channels=frame.num_channels, quality=rtc.AudioResamplerQuality.QUICK, ) if self._resampler: if frame.sample_rate != self._resampler._input_rate: raise ValueError("frame sample rates are inconsistent") frames.extend(self._resampler.push(frame)) else: frames.append(frame) frame = merge_frames(frames) if (shift_size := self._start_idx + frame.samples_per_channel - self._buffer_size) > 0: self.shift(shift_size) ptr = self._buffer[self._start_idx : self._start_idx + frame.samples_per_channel] if frame.num_channels > 1: arr_i16 = np.frombuffer( frame.data, dtype=np.int16, count=frame.samples_per_channel * frame.num_channels ).reshape(-1, frame.num_channels) ptr[:] = (np.sum(arr_i16, axis=1, dtype=np.int32) // frame.num_channels).astype( np.int16 ) else: ptr[:] = np.frombuffer(frame.data, dtype=np.int16, count=frame.samples_per_channel) self._start_idx += frame.samples_per_channel return frame.samples_per_channelPush an audio frame to the buffer.
Args
frame- The audio frame to push.
Returns
The number of samples written to the buffer.
Raises
ValueError- If the frame samples are greater than the buffer size.
def read(self) ‑> numpy.ndarray-
Expand source code
def read(self) -> np.ndarray: return self._buffer[: self._start_idx].copy() def reset(self) ‑> None-
Expand source code
def reset(self) -> None: self._start_idx = 0 self._buffer.fill(0) def shift(self, size: int) ‑> None-
Expand source code
def shift(self, size: int) -> None: """Shift the buffer to the left by the given size. Args: size: The size to shift the buffer by. """ size = min(size, self._start_idx) self._buffer[: self._start_idx - size] = self._buffer[size : self._start_idx].copy() self._start_idx -= sizeShift the buffer to the left by the given size.
Args
size- The size to shift the buffer by.
class BoundedDict (maxsize: int | None = None)-
Expand source code
class BoundedDict(OrderedDict[K, V]): def __init__(self, maxsize: int | None = None): super().__init__() self.maxsize = maxsize if self.maxsize is not None and self.maxsize <= 0: raise ValueError("maxsize must be greater than 0") def __setitem__(self, key: K, value: V) -> None: super().__setitem__(key, value) while self.maxsize is not None and len(self) > self.maxsize: self.popitem(last=False) def update_value(self, key: K, **kwargs: Any) -> V | None: """Update the value of a key with the given keyword arguments. Only update the value if the field value is not None and the field exists on the value. Args: key: The key to update. kwargs: The keyword arguments to update the value. Returns: The value of the key. """ value = self.get(key, None) if value is None: return value for field_name, field_value in kwargs.items(): if field_value is None: continue if hasattr(value, field_name): setattr(value, field_name, field_value) else: logger.warning( "field %s is not set on value of type %s, skipping", field_name, type(value).__name__, ) return value def set_or_update(self, key: K, factory: Callable[[], V], **kwargs: Any) -> V: """Set a value for a key if it doesn't exist, or update it if it does. Args: key: The key to set or update. factory: The factory function to create a new value if the key doesn't exist. kwargs: The keyword arguments to update the value. Returns: The value of the key. """ if self.get(key, None) is None: self[key] = factory() result = self.update_value(key, **kwargs) assert result is not None return result def pop_if( self, predicate: Callable[[V], bool] | None = None, ) -> tuple[K | None, V | None]: """Pop an item from the dictionary if it satisfies the predicate. Args: predicate: The predicate to check if the value satisfies. Returns: A tuple of the key and value of the popped item. 
""" if predicate is None: if len(self) > 0: return self.popitem(last=False) return None, None for key, value in reversed(list(self.items())): if predicate(value): return key, self.pop(key) return None, NoneDictionary that remembers insertion order
Ancestors
- collections.OrderedDict
- builtins.dict
Methods
def pop_if(self, predicate: Callable[[V], bool] | None = None) ‑> tuple[~K | None, ~V | None]-
Expand source code
def pop_if( self, predicate: Callable[[V], bool] | None = None, ) -> tuple[K | None, V | None]: """Pop an item from the dictionary if it satisfies the predicate. Args: predicate: The predicate to check if the value satisfies. Returns: A tuple of the key and value of the popped item. """ if predicate is None: if len(self) > 0: return self.popitem(last=False) return None, None for key, value in reversed(list(self.items())): if predicate(value): return key, self.pop(key) return None, NonePop an item from the dictionary if it satisfies the predicate.
Args
predicate- The predicate to check if the value satisfies.
Returns
A tuple of the key and value of the popped item.
def set_or_update(self, key: K, factory: Callable[[], V], **kwargs: Any) ‑> ~V-
Expand source code
def set_or_update(self, key: K, factory: Callable[[], V], **kwargs: Any) -> V: """Set a value for a key if it doesn't exist, or update it if it does. Args: key: The key to set or update. factory: The factory function to create a new value if the key doesn't exist. kwargs: The keyword arguments to update the value. Returns: The value of the key. """ if self.get(key, None) is None: self[key] = factory() result = self.update_value(key, **kwargs) assert result is not None return resultSet a value for a key if it doesn't exist, or update it if it does.
Args
key- The key to set or update.
factory- The factory function to create a new value if the key doesn't exist.
kwargs- The keyword arguments to update the value.
Returns
The value of the key.
def update_value(self, key: K, **kwargs: Any) ‑> ~V | None-
Expand source code
def update_value(self, key: K, **kwargs: Any) -> V | None: """Update the value of a key with the given keyword arguments. Only update the value if the field value is not None and the field exists on the value. Args: key: The key to update. kwargs: The keyword arguments to update the value. Returns: The value of the key. """ value = self.get(key, None) if value is None: return value for field_name, field_value in kwargs.items(): if field_value is None: continue if hasattr(value, field_name): setattr(value, field_name, field_value) else: logger.warning( "field %s is not set on value of type %s, skipping", field_name, type(value).__name__, ) return valueUpdate the value of a key with the given keyword arguments. Only update the value if the field value is not None and the field exists on the value.
Args
key- The key to update.
kwargs- The keyword arguments to update the value.
Returns
The value of the key.
class ConnectionPool (*,
max_session_duration: float | None = None,
mark_refreshed_on_get: bool = False,
connect_cb: collections.abc.Callable[[float], collections.abc.Awaitable[~T]] | None = None,
close_cb: collections.abc.Callable[[~T], collections.abc.Awaitable[None]] | None = None,
connect_timeout: float = 10.0)-
Expand source code
class ConnectionPool(Generic[T]): """Helper class to manage persistent connections like websockets. Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool. """ def __init__( self, *, max_session_duration: float | None = None, mark_refreshed_on_get: bool = False, connect_cb: Callable[[float], Awaitable[T]] | None = None, close_cb: Callable[[T], Awaitable[None]] | None = None, connect_timeout: float = 10.0, ) -> None: """Initialize the connection wrapper. Args: max_session_duration: Maximum duration in seconds before forcing reconnection mark_refreshed_on_get: If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set. connect_cb: Optional async callback to create new connections close_cb: Optional async callback to close connections """ # noqa: E501 self._max_session_duration = max_session_duration self._mark_refreshed_on_get = mark_refreshed_on_get self._connect_cb = connect_cb self._close_cb = close_cb self._connections: dict[T, float] = {} # conn -> connected_at timestamp self._available: set[T] = set() self._connect_timeout = connect_timeout self._connect_lock = asyncio.Lock() # store connections to be reaped (closed) later. self._to_close: set[T] = set() self._prewarm_task: weakref.ref[asyncio.Task[None]] | None = None # Timing info from the last get() call self.last_acquire_time: float = 0.0 self.last_connection_reused: bool = False async def _connect(self, timeout: float) -> T: """Create a new connection. 
Returns: The new connection object Raises: NotImplementedError: If no connect callback was provided """ if self._connect_cb is None: raise NotImplementedError("Must provide connect_cb or implement connect()") connection = await self._connect_cb(timeout) self._connections[connection] = time.time() return connection async def _drain_to_close(self) -> None: """Drain and close all the connections queued for closing.""" while self._to_close: conn = self._to_close.pop() try: await self._maybe_close_connection(conn) except Exception as e: logger.warning(f"error closing connection: {conn}", exc_info=e) @asynccontextmanager async def connection(self, *, timeout: float) -> AsyncGenerator[T, None]: """Get a connection from the pool and automatically return it when done. Yields: An active connection object """ conn = await self.get(timeout=timeout) try: yield conn except BaseException: self.remove(conn) raise else: self.put(conn) async def get(self, *, timeout: float) -> T: """Get an available connection or create a new one if needed. Returns: An active connection object """ async with self._connect_lock: await self._drain_to_close() now = time.time() # try to reuse an available connection that hasn't expired while self._available: conn = self._available.pop() if ( self._max_session_duration is None or now - self._connections[conn] <= self._max_session_duration ): if self._mark_refreshed_on_get: self._connections[conn] = now self.last_acquire_time = 0.0 self.last_connection_reused = True return conn # connection expired; mark it for resetting. self.remove(conn) t0 = time.perf_counter() conn = await self._connect(timeout) self.last_acquire_time = time.perf_counter() - t0 self.last_connection_reused = False return conn def put(self, conn: T) -> None: """Mark a connection as available for reuse. If connection has been reset, it will not be added to the pool. 
Args: conn: The connection to make available """ if conn in self._connections: self._available.add(conn) async def _maybe_close_connection(self, conn: T) -> None: """Close a connection if close_cb is provided. Args: conn: The connection to close """ if self._close_cb is not None: await self._close_cb(conn) def remove(self, conn: T) -> None: """Remove a specific connection from the pool. Marks the connection to be closed during the next drain cycle. Args: conn: The connection to reset """ self._available.discard(conn) if conn in self._connections: self._to_close.add(conn) self._connections.pop(conn, None) def invalidate(self) -> None: """Clear all existing connections. Marks all current connections to be closed during the next drain cycle. """ for conn in list(self._connections.keys()): self._to_close.add(conn) self._connections.clear() self._available.clear() def prewarm(self) -> None: """Initiate prewarming of the connection pool without blocking. This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed. """ if self._prewarm_task is not None or self._connections: return async def _prewarm_impl() -> None: async with self._connect_lock: if not self._connections: conn = await self._connect(timeout=self._connect_timeout) self._available.add(conn) task = asyncio.create_task(_prewarm_impl()) self._prewarm_task = weakref.ref(task) async def aclose(self) -> None: """Close all connections, draining any pending connection closures.""" if self._prewarm_task is not None: task = self._prewarm_task() if task: await aio.gracefully_cancel(task) self.invalidate() await self._drain_to_close()Helper class to manage persistent connections like websockets.
Handles connection pooling and reconnection after max duration. Can be used as an async context manager to automatically return connections to the pool.
Initialize the connection wrapper.
Args
max_session_duration- Maximum duration in seconds before forcing reconnection
mark_refreshed_on_get- If True, the session will be marked as fresh when get() is called. only used when max_session_duration is set.
connect_cb- Optional async callback to create new connections
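close_cb- Optional async callback to close connections
connect_timeout- Timeout passed to the connect callback when prewarm() opens a connection (default 10.0)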
Ancestors
- typing.Generic
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Close all connections, draining any pending connection closures.""" if self._prewarm_task is not None: task = self._prewarm_task() if task: await aio.gracefully_cancel(task) self.invalidate() await self._drain_to_close()Close all connections, draining any pending connection closures.
async def connection(self, *, timeout: float) ‑> AsyncGenerator[~T, None]-
Expand source code
@asynccontextmanager
async def connection(self, *, timeout: float) -> AsyncGenerator[T, None]:
    """Get a connection from the pool and automatically return it when done.

    The connection is handed back with ``put()`` when the managed block
    finishes normally and dropped with ``remove()`` when it raises.

    Yields:
        An active connection object
    """
    acquired = await self.get(timeout=timeout)
    try:
        yield acquired
    except BaseException:
        # BaseException is caught on purpose so that every kind of exit by
        # exception takes the remove() path before being re-raised.
        self.remove(acquired)
        raise
    # Only reached when the managed block completed without raising.
    self.put(acquired)
Yields
An active connection object
async def get(self, *, timeout: float) ‑> ~T-
Expand source code
async def get(self, *, timeout: float) -> T:
    """Get an available connection or create a new one if needed.

    Pending closures are drained first. Idle connections are then tried one
    by one; expired ones are removed. When none can be reused a new
    connection is created and the time it took is stored in
    ``last_acquire_time``.

    Args:
        timeout: Passed to ``_connect`` when a new connection is created.

    Returns:
        An active connection object
    """
    async with self._connect_lock:
        await self._drain_to_close()

        now = time.time()
        # try to reuse an available connection that hasn't expired
        while self._available:
            candidate = self._available.pop()
            still_valid = (
                self._max_session_duration is None
                or now - self._connections[candidate] <= self._max_session_duration
            )
            if not still_valid:
                # connection expired; mark it for resetting.
                self.remove(candidate)
                continue

            if self._mark_refreshed_on_get:
                self._connections[candidate] = now
            self.last_acquire_time = 0.0
            self.last_connection_reused = True
            return candidate

        # nothing reusable was found: create a connection and time it
        started = time.perf_counter()
        fresh = await self._connect(timeout)
        self.last_acquire_time = time.perf_counter() - started
        self.last_connection_reused = False
        return fresh
Returns
An active connection object
def invalidate(self) ‑> None-
Expand source code
def invalidate(self) -> None:
    """Clear all existing connections.

    Marks all current connections to be closed during the next drain cycle.
    """
    # Queue every tracked connection for closing, then forget all of them.
    self._to_close.update(self._connections)
    self._connections.clear()
    self._available.clear()
Marks all current connections to be closed during the next drain cycle.
def prewarm(self) ‑> None-
Expand source code
def prewarm(self) -> None:
    """Initiate prewarming of the connection pool without blocking.

    This method starts a background task that creates a new connection if none exist.
    The task automatically cleans itself up when the connection pool is closed.
    """
    already_started = self._prewarm_task is not None
    if already_started or self._connections:
        return

    async def _open_first_connection() -> None:
        async with self._connect_lock:
            # Checked again under the lock; skip when a connection exists by now.
            if self._connections:
                return
            warmed = await self._connect(timeout=self._connect_timeout)
            self._available.add(warmed)

    background = asyncio.create_task(_open_first_connection())
    # Only a weak reference to the task is stored on the pool.
    self._prewarm_task = weakref.ref(background)
This method starts a background task that creates a new connection if none exist. The task automatically cleans itself up when the connection pool is closed.
def put(self, conn: ~T) ‑> None-
Expand source code
def put(self, conn: T) -> None:
    """Mark a connection as available for reuse.

    If connection has been reset, it will not be added to the pool.

    Args:
        conn: The connection to make available
    """
    if conn not in self._connections:
        # The pool no longer tracks this connection, so it is not offered again.
        return
    self._available.add(conn)
If the connection has been reset, it will not be added to the pool.
Args
conn- The connection to make available
def remove(self, conn: ~T) ‑> None-
Expand source code
def remove(self, conn: T) -> None:
    """Remove a specific connection from the pool.

    Marks the connection to be closed during the next drain cycle.

    Args:
        conn: The connection to remove
    """
    self._available.discard(conn)
    if conn not in self._connections:
        # Untracked connections are not queued for closing.
        return
    self._to_close.add(conn)
    self._connections.pop(conn, None)
Marks the connection to be closed during the next drain cycle.
Args
conn- The connection to remove
class EventEmitter-
Expand source code
class EventEmitter(Generic[T_contra]):
    """Synchronous event emitter that maps event keys to sets of callbacks."""

    def __init__(self) -> None:
        """
        Initialize a new instance of EventEmitter.
        """
        self._events: Dict[T_contra, Set[Callable]] = dict()

    def emit(self, event: T_contra, *args) -> None:
        """
        Trigger all callbacks associated with the given event.

        Each callback receives only as many positional arguments as its
        signature accepts; callbacks declaring ``*args`` receive all of them.
        A ``TypeError`` raised while dispatching is re-raised; any other
        exception is logged and dispatching continues.

        Args:
            event (T): The event to emit.
            *args: Positional arguments to pass to the callbacks.

        Example:
            Basic usage of emit:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
            ```
        """
        if event in self._events:
            # Iterate over a copy: callbacks may call on()/off() during dispatch.
            callables = self._events[event].copy()
            for callback in callables:
                try:
                    sig = inspect.signature(callback)
                    params = sig.parameters.values()

                    has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params)
                    if has_varargs:
                        callback(*args)
                    else:
                        positional_params = [
                            p
                            for p in params
                            if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
                        ]
                        num_params = len(positional_params)
                        num_args = min(len(args), num_params)
                        callback_args = args[:num_args]

                        callback(*callback_args)
                except TypeError:
                    raise
                except Exception:
                    logger.exception(f"failed to emit event {event}")

    def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called only once when the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered one-shot wrapper, or a decorator if callback is None.

        Example:
            Using once with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.once('greet', greet_once)
            emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')  # No output, callback was removed after first call
            ```

            Using once as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.once('greet')
            def greet_once(name):
                print(f"Hello once, {name}!")

            emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
            emitter.emit('greet', 'Bob')  # No output
            ```
        """
        if callback is not None:

            def once_callback(*args, **kwargs):
                self.off(event, once_callback)
                callback(*args, **kwargs)

            # emit() decides how many arguments to pass from the registered
            # callable's signature. The wrapper's own (*args, **kwargs)
            # signature would make emit() forward every argument, so a
            # callback declaring fewer parameters than were emitted would
            # raise TypeError although it works with on(). Advertise the
            # wrapped callback's signature instead.
            try:
                once_callback.__signature__ = inspect.signature(callback)
            except (TypeError, ValueError):
                # Signature not introspectable: keep forwarding everything.
                pass

            return self.on(event, once_callback)
        else:

            def decorator(callback: Callable) -> Callable:
                self.once(event, callback)
                return callback

            return decorator

    def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
        """
        Register a callback to be called whenever the event is emitted.

        If a callback is provided, it registers the callback directly.
        If no callback is provided, it returns a decorator for use with function definitions.

        Args:
            event (T): The event to listen for.
            callback (Callable, optional): The callback to register. Defaults to None.

        Returns:
            Callable: The registered callback or a decorator if callback is None.

        Raises:
            ValueError: If callback is a coroutine function.

        Example:
            Using on with a direct callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```

            Using on as a decorator:

            ```python
            emitter = EventEmitter[str]()

            @emitter.on('greet')
            def greet(name):
                print(f"Hello, {name}!")

            emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
            ```
        """
        if callback is not None:
            if asyncio.iscoroutinefunction(callback):
                raise ValueError(
                    "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
                )

            if event not in self._events:
                self._events[event] = set()
            self._events[event].add(callback)
            return callback
        else:

            def decorator(callback: Callable) -> Callable:
                self.on(event, callback)
                return callback

            return decorator

    def off(self, event: T_contra, callback: Callable) -> None:
        """
        Unregister a callback from an event.

        Args:
            event (T): The event to stop listening to.
            callback (Callable): The callback to remove.

        Example:
            Removing a callback:

            ```python
            emitter = EventEmitter[str]()

            def greet(name):
                print(f"Hello, {name}!")

            emitter.on('greet', greet)
            emitter.off('greet', greet)
            emitter.emit('greet', 'Dave')  # No output, callback was removed
            ```
        """
        if event in self._events:
            self._events[event].discard(callback)
# Abstract base class for generic types.
On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::
class Mapping[KT, VT]: def __getitem__(self, key: KT) -> VT: ... # Etc.
On older versions of Python, however, generic classes have to explicitly inherit from Generic.
After a class has been declared to be generic, it can then be used as follows::
def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
Initialize a new instance of EventEmitter.
Ancestors
- typing.Generic
Subclasses
- AdaptiveInterruptionDetector
- ProcPool
- livekit.agents.llm.llm.LLM
- livekit.agents.llm.realtime.RealtimeSession
- livekit.agents.stt.stt.STT
- livekit.agents.tts.tts.TTS
- VAD
- livekit.agents.voice.agent_session.AgentSession
- livekit.agents.voice.amd.classifier._AMDClassifier
- livekit.agents.voice.avatar._queue_io.QueueAudioOutput
- livekit.agents.voice.avatar._types.AudioReceiver
- AudioOutput
- livekit.agents.worker.AgentServer
- BrowserPage
- Room
Methods
def emit(self, event: -T_contra, *args) ‑> None-
Expand source code
def emit(self, event: T_contra, *args) -> None:
    """
    Trigger all callbacks associated with the given event.

    Each callback receives only as many positional arguments as its
    signature accepts; callbacks declaring ``*args`` receive all of them.
    A ``TypeError`` raised while dispatching is re-raised; any other
    exception is logged and dispatching continues with the next callback.

    Args:
        event (T): The event to emit.
        *args: Positional arguments to pass to the callbacks.

    Example:
        Basic usage of emit:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Alice')  # Output: Hello, Alice!
        ```
    """
    if event not in self._events:
        return

    # Work on a snapshot so callbacks may register/unregister during dispatch.
    for listener in self._events[event].copy():
        try:
            parameters = inspect.signature(listener).parameters.values()
            if any(p.kind == p.VAR_POSITIONAL for p in parameters):
                listener(*args)
            else:
                accepted = len(
                    [
                        p
                        for p in parameters
                        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD)
                    ]
                )
                # Drop surplus arguments the listener does not declare.
                listener(*args[: min(len(args), accepted)])
        except TypeError:
            raise
        except Exception:
            logger.exception(f"failed to emit event {event}")
Args
event:T- The event to emit.
*args- Positional arguments to pass to the callbacks.
Example
Basic usage of emit:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Alice') # Output: Hello, Alice! def off(self, event: -T_contra, callback: Callable) ‑> None-
Expand source code
def off(self, event: T_contra, callback: Callable) -> None:
    """
    Unregister a callback from an event.

    Nothing happens when the event has no registered callbacks or the
    callback is not registered for it.

    Args:
        event (T): The event to stop listening to.
        callback (Callable): The callback to remove.

    Example:
        Removing a callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.off('greet', greet)
        emitter.emit('greet', 'Dave')  # No output, callback was removed
        ```
    """
    if event not in self._events:
        return
    self._events[event].discard(callback)
Args
event:T- The event to stop listening to.
callback:Callable- The callback to remove.
Example
Removing a callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.off('greet', greet) emitter.emit('greet', 'Dave') # No output, callback was removed def on(self, event: -T_contra, callback: Callable | None = None) ‑> Callable-
Expand source code
def on(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Register a callback to be called whenever the event is emitted.

    If a callback is provided, it registers the callback directly.
    If no callback is provided, it returns a decorator for use with function definitions.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered callback or a decorator if callback is None.

    Raises:
        ValueError: If callback is a coroutine function.

    Example:
        Using on with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet(name):
            print(f"Hello, {name}!")

        emitter.on('greet', greet)
        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```

        Using on as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.on('greet')
        def greet(name):
            print(f"Hello, {name}!")

        emitter.emit('greet', 'Charlie')  # Output: Hello, Charlie!
        ```
    """
    if callback is None:
        # Decorator form: registration happens once the function is supplied.
        def decorator(callback: Callable) -> Callable:
            self.on(event, callback)
            return callback

        return decorator

    if asyncio.iscoroutinefunction(callback):
        raise ValueError(
            "Cannot register an async callback with `.on()`. Use `asyncio.create_task` within your synchronous callback instead."
        )

    self._events.setdefault(event, set()).add(callback)
    return callback
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event:T- The event to listen for.
callback:Callable, optional- The callback to register. Defaults to None.
Returns
Callable- The registered callback or a decorator if callback is None.
Example
Using on with a direct callback:
emitter = EventEmitter[str]() def greet(name): print(f"Hello, {name}!") emitter.on('greet', greet) emitter.emit('greet', 'Charlie') # Output: Hello, Charlie!
Using on as a decorator:
emitter = EventEmitter[str]() @emitter.on('greet') def greet(name): print(f"Hello, {name}!") emitter.emit('greet', 'Charlie') # Output: Hello, Charlie! def once(self, event: -T_contra, callback: Callable | None = None) ‑> Callable-
Expand source code
def once(self, event: T_contra, callback: Optional[Callable] = None) -> Callable:
    """
    Register a callback to be called only once when the event is emitted.

    If a callback is provided, it registers the callback directly.
    If no callback is provided, it returns a decorator for use with function definitions.

    Args:
        event (T): The event to listen for.
        callback (Callable, optional): The callback to register. Defaults to None.

    Returns:
        Callable: The registered one-shot wrapper, or a decorator if callback is None.

    Example:
        Using once with a direct callback:

        ```python
        emitter = EventEmitter[str]()

        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.once('greet', greet_once)
        emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')  # No output, callback was removed after first call
        ```

        Using once as a decorator:

        ```python
        emitter = EventEmitter[str]()

        @emitter.once('greet')
        def greet_once(name):
            print(f"Hello once, {name}!")

        emitter.emit('greet', 'Bob')  # Output: Hello once, Bob!
        emitter.emit('greet', 'Bob')  # No output
        ```
    """
    if callback is not None:

        def once_callback(*args, **kwargs):
            # Unregister first so the callback cannot be triggered again.
            self.off(event, once_callback)
            callback(*args, **kwargs)

        # The wrapper's own (*args, **kwargs) signature hides how many
        # parameters the wrapped callback declares. Code that sizes the
        # argument list from inspect.signature() would then forward every
        # argument and a callback with fewer parameters would raise
        # TypeError. Advertise the wrapped callback's signature instead.
        try:
            once_callback.__signature__ = inspect.signature(callback)
        except (TypeError, ValueError):
            # Signature not introspectable: keep forwarding everything.
            pass

        return self.on(event, once_callback)
    else:

        def decorator(callback: Callable) -> Callable:
            self.once(event, callback)
            return callback

        return decorator
If a callback is provided, it registers the callback directly. If no callback is provided, it returns a decorator for use with function definitions.
Args
event:T- The event to listen for.
callback:Callable, optional- The callback to register. Defaults to None.
Returns
Callable- The registered callback or a decorator if callback is None.
Example
Using once with a direct callback:
emitter = EventEmitter[str]() def greet_once(name): print(f"Hello once, {name}!") emitter.once('greet', greet_once) emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output, callback was removed after first call
Using once as a decorator:
emitter = EventEmitter[str]() @emitter.once('greet') def greet_once(name): print(f"Hello once, {name}!") emitter.emit('greet', 'Bob') # Output: Hello once, Bob! emitter.emit('greet', 'Bob') # No output
class ExpFilter (alpha: float,
max_val: float | livekit.agents.types.NotGiven = NOT_GIVEN,
min_val: float | livekit.agents.types.NotGiven = NOT_GIVEN,
initial: float | livekit.agents.types.NotGiven = NOT_GIVEN)-
Expand source code
class ExpFilter:
    """Exponential smoothing filter with optional clamping of the filtered value."""

    def __init__(
        self,
        alpha: float,
        max_val: NotGivenOr[float] = NOT_GIVEN,
        min_val: NotGivenOr[float] = NOT_GIVEN,
        initial: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            alpha: Smoothing base; must be in (0, 1].
            max_val: Optional upper bound applied to the filtered value.
            min_val: Optional lower bound applied to the filtered value.
            initial: Optional starting filtered value.

        Raises:
            ValueError: If alpha is outside (0, 1].
        """
        if not 0 < alpha <= 1:
            raise ValueError("alpha must be in (0, 1].")

        self._alpha = alpha
        self._filtered = initial
        self._max_val = max_val
        self._min_val = min_val

    def reset(
        self,
        alpha: NotGivenOr[float] = NOT_GIVEN,
        initial: NotGivenOr[float] = NOT_GIVEN,
        min_val: NotGivenOr[float] = NOT_GIVEN,
        max_val: NotGivenOr[float] = NOT_GIVEN,
    ) -> None:
        """Overwrite the given settings; arguments that are not given stay unchanged.

        Raises:
            ValueError: If alpha is given and is outside (0, 1].
        """
        if is_given(alpha):
            # Raise instead of assert: asserts are stripped under -O, and
            # __init__ reports the same condition with ValueError.
            if not 0 < alpha <= 1:
                raise ValueError("alpha must be in (0, 1].")
            self._alpha = alpha
        if is_given(initial):
            self._filtered = initial
        if is_given(min_val):
            self._min_val = min_val
        if is_given(max_val):
            self._max_val = max_val

    def apply(self, exp: float, sample: NotGivenOr[float] = NOT_GIVEN) -> float:
        """Blend a sample into the filtered value and return the clamped result.

        Args:
            exp: Exponent applied to alpha to obtain the weight of the previous value.
            sample: New sample; when not given, the current filtered value is used.

        Returns:
            The updated filtered value after clamping to max_val / min_val.

        Raises:
            ValueError: If neither a sample nor a filtered value is available.
        """
        if not is_given(sample):
            sample = self._filtered

        if is_given(sample) and not is_given(self._filtered):
            # First sample seeds the filter.
            self._filtered = sample
        elif is_given(sample) and is_given(self._filtered):
            a = self._alpha**exp
            self._filtered = a * self._filtered + (1 - a) * sample

        if not is_given(self._filtered):
            raise ValueError("sample or initial value must be given.")

        if is_given(self._max_val) and self._filtered > self._max_val:
            self._filtered = self._max_val

        if is_given(self._min_val) and self._filtered < self._min_val:
            self._filtered = self._min_val

        return self._filtered

    @property
    def value(self) -> float | None:
        """Current filtered value, or None when it is not given."""
        return self._filtered if is_given(self._filtered) else None

    def update_base(self, alpha: float) -> None:
        """Replace alpha with the given value; no validation is performed here."""
        self._alpha = alpha
# Instance variables
prop value : float | None-
Expand source code
@property
def value(self) -> float | None:
    """Current filtered value, or ``None`` when the filtered value is not given."""
    if is_given(self._filtered):
        return self._filtered
    return None
Methods
def apply(self, exp: float, sample: float | livekit.agents.types.NotGiven = NOT_GIVEN) ‑> float-
Expand source code
def apply(self, exp: float, sample: NotGivenOr[float] = NOT_GIVEN) -> float:
    """Blend a sample into the filtered value and return the clamped result.

    Args:
        exp: Exponent applied to alpha to obtain the weight of the previous value.
        sample: New sample; when not given, the current filtered value is used.

    Returns:
        The updated filtered value after clamping to max_val / min_val.

    Raises:
        ValueError: If neither a sample nor a filtered value is available.
    """
    if not is_given(sample):
        sample = self._filtered

    if is_given(sample):
        if is_given(self._filtered):
            decay = self._alpha**exp
            self._filtered = decay * self._filtered + (1 - decay) * sample
        else:
            # First sample seeds the filter.
            self._filtered = sample

    if not is_given(self._filtered):
        raise ValueError("sample or initial value must be given.")

    # Clamp to the upper bound first, then to the lower bound.
    if is_given(self._max_val) and self._filtered > self._max_val:
        self._filtered = self._max_val
    if is_given(self._min_val) and self._filtered < self._min_val:
        self._filtered = self._min_val

    return self._filtered
# def reset(self,
alpha: float | livekit.agents.types.NotGiven = NOT_GIVEN,
initial: float | livekit.agents.types.NotGiven = NOT_GIVEN,
min_val: float | livekit.agents.types.NotGiven = NOT_GIVEN,
max_val: float | livekit.agents.types.NotGiven = NOT_GIVEN) ‑> None-
Expand source code
def reset(
    self,
    alpha: NotGivenOr[float] = NOT_GIVEN,
    initial: NotGivenOr[float] = NOT_GIVEN,
    min_val: NotGivenOr[float] = NOT_GIVEN,
    max_val: NotGivenOr[float] = NOT_GIVEN,
) -> None:
    """Overwrite the given settings; arguments that are not given stay unchanged.

    Args:
        alpha: New smoothing base; must be in (0, 1] when given.
        initial: New filtered value.
        min_val: New lower bound.
        max_val: New upper bound.

    Raises:
        ValueError: If alpha is given and is outside (0, 1].
    """
    if is_given(alpha):
        # Raise instead of assert: asserts are stripped when Python runs
        # with -O, which would let an invalid alpha through silently.
        if not 0 < alpha <= 1:
            raise ValueError("alpha must be in (0, 1].")
        self._alpha = alpha
    if is_given(initial):
        self._filtered = initial
    if is_given(min_val):
        self._min_val = min_val
    if is_given(max_val):
        self._max_val = max_val
# def update_base(self, alpha: float) ‑> None-
Expand source code
def update_base(self, alpha: float) -> None:
    """Replace the stored alpha.

    Args:
        alpha: Value stored as the filter's alpha; it is not validated here.
    """
    self._alpha = alpha
class MovingAverage (window_size: int)-
Expand source code
class MovingAverage:
    """Running mean over the most recent ``window_size`` samples."""

    def __init__(self, window_size: int) -> None:
        """
        Args:
            window_size: Number of most recent samples the average covers.

        Raises:
            ValueError: If window_size is not greater than 0.
        """
        # An empty history would make add_sample() fail with a modulo by
        # zero on first use; reject it at construction time instead.
        if window_size <= 0:
            raise ValueError("window_size must be greater than 0")
        self._hist: list[float] = [0] * window_size
        self._sum: float = 0
        self._count: int = 0

    def add_sample(self, sample: float) -> None:
        """Add a sample, replacing the oldest one once the window is full."""
        self._count += 1
        index = self._count % len(self._hist)
        if self._count > len(self._hist):
            # The slot being overwritten leaves the window.
            self._sum -= self._hist[index]
        self._sum += sample
        self._hist[index] = sample

    def get_avg(self) -> float:
        """Return the mean of the samples in the window, or 0 when there are none."""
        if self._count == 0:
            return 0
        return self._sum / self.size()

    def reset(self) -> None:
        """Forget all samples by zeroing the counter and the running sum."""
        self._count = 0
        self._sum = 0

    def size(self) -> int:
        """Return the number of samples currently in the window."""
        return min(self._count, len(self._hist))
# Methods
def add_sample(self, sample: float) ‑> None-
Expand source code
def add_sample(self, sample: float) -> None:
    """Add a sample, replacing the oldest one once the window is full.

    Args:
        sample: The value to add to the running sum and history.
    """
    self._count += 1
    capacity = len(self._hist)
    slot = self._count % capacity
    if self._count > capacity:
        # The value stored in this slot leaves the window.
        self._sum -= self._hist[slot]
    self._sum += sample
    self._hist[slot] = sample
# def get_avg(self) ‑> float-
Expand source code
def get_avg(self) -> float:
    """Return the running sum divided by ``size()``, or 0 when no sample was added."""
    return self._sum / self.size() if self._count else 0
# def reset(self) ‑> None-
Expand source code
def reset(self) -> None:
    """Forget all samples by zeroing the sample counter and the running sum."""
    self._count = self._sum = 0
# def size(self) ‑> int-
Expand source code
def size(self) -> int:
    """Return the number of samples in the window.

    This is the sample counter until it reaches the history length, and the
    history length from then on.
    """
    capacity = len(self._hist)
    return self._count if self._count < capacity else capacity