Module livekit.rtc.media_devices
Classes
class InputCapture (source: AudioSource,
input_stream: "'sd.InputStream'",
task: asyncio.Task,
apm: Optional[AudioProcessingModule],
delay_estimator: Optional[_APMDelayEstimator])-
Expand source code
@dataclass class InputCapture: """Holds resources for an active audio input capture. Attributes: source: `rtc.AudioSource` that receives captured frames. This can be published as a `LocalAudioTrack`. input_stream: Underlying `sounddevice.InputStream`. task: Async task that drains a queue and calls `source.capture_frame`. apm: Optional `rtc.AudioProcessingModule` used to process 10 ms frames (AEC, NS, HPF, AGC). When performing echo cancellation, pass this instance to `open_output_player` so reverse frames are provided. delay_estimator: Internal helper used to combine capture and render delays. """ source: AudioSource input_stream: "sd.InputStream" task: asyncio.Task apm: Optional[AudioProcessingModule] delay_estimator: Optional[_APMDelayEstimator] async def aclose(self) -> None: """Stop capture and close underlying resources.""" if self.task and not self.task.done(): self.task.cancel() try: await self.task except asyncio.CancelledError: pass try: self.input_stream.stop() self.input_stream.close() except Exception: passHolds resources for an active audio input capture.
Attributes
sourcertc.AudioSourcethat receives captured frames. This can be published as aLocalAudioTrack.input_stream- Underlying
sounddevice.InputStream. task- Async task that drains a queue and calls
source.capture_frame. apm- Optional
rtc.AudioProcessingModuleused to process 10 ms frames (AEC, NS, HPF, AGC). When performing echo cancellation, pass this instance toopen_output_playerso reverse frames are provided. delay_estimator- Internal helper used to combine capture and render delays.
Instance variables
var apm : Optional[AudioProcessingModule]var delay_estimator : Optional[_APMDelayEstimator]var input_stream : 'sd.InputStream'var source : AudioSourcevar task : asyncio.Task
Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Stop capture and close underlying resources.""" if self.task and not self.task.done(): self.task.cancel() try: await self.task except asyncio.CancelledError: pass try: self.input_stream.stop() self.input_stream.close() except Exception: passStop capture and close underlying resources.
class MediaDevices (*,
loop: Optional[asyncio.AbstractEventLoop] = None,
input_sample_rate: int = 48000,
output_sample_rate: int = 48000,
num_channels: int = 1,
blocksize: int = 4800)-
Expand source code
class MediaDevices: """High-level interface to native audio devices. - Device enumeration helpers. - Audio input capture into `rtc.AudioSource` with optional APM processing. - Audio output player that can feed APM reverse stream for AEC. Design notes: - APM operates on 10 ms frames; this module slices input/output audio into `FRAME_SAMPLES` for processing calls. - For AEC to be effective, render audio that could leak back into the mic should be played through `OutputPlayer` with the same `apm` instance. - Timing alignment: this helper does not attempt to set device latency on APM; for most setups the default behavior is acceptable. """ def __init__( self, *, loop: Optional[asyncio.AbstractEventLoop] = None, input_sample_rate: int = DEFAULT_SAMPLE_RATE, output_sample_rate: int = DEFAULT_SAMPLE_RATE, num_channels: int = DEFAULT_CHANNELS, blocksize: int = BLOCKSIZE, ) -> None: self._loop = _ensure_loop(loop) self._in_sr = input_sample_rate self._out_sr = output_sample_rate self._channels = num_channels self._blocksize = blocksize self._delay_estimator: Optional[_APMDelayEstimator] = None self._apm: Optional[AudioProcessingModule] = None # Device enumeration def list_input_devices(self) -> list[dict[str, Any]]: """List available input devices. Returns a list of dictionaries with the `sounddevice` metadata and an added `index` key corresponding to the device index. """ import sounddevice as sd # type: ignore[import-not-found, import-untyped] devices = sd.query_devices() result: list[dict[str, Any]] = [] for idx, dev in enumerate(devices): if dev.get("max_input_channels", 0) > 0: result.append({"index": idx, **dev}) return result def list_output_devices(self) -> list[dict[str, Any]]: """List available output devices with indices.""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] devices = sd.query_devices() result: list[dict[str, Any]] = [] for idx, dev in enumerate(devices): if dev.get("max_output_channels", 0) > 0: result.append({"index": idx, **dev}) return result def default_input_device(self) -> Optional[int]: """Return the default input device index (or None).""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] dev = sd.default.device return dev[0] if isinstance(dev, (list, tuple)) else None def default_output_device(self) -> Optional[int]: """Return the default output device index (or None).""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] dev = sd.default.device return dev[1] if isinstance(dev, (list, tuple)) else None # Capture / Playback def open_input( self, *, enable_aec: bool = True, noise_suppression: bool = True, high_pass_filter: bool = True, auto_gain_control: bool = True, input_device: Optional[int] = None, queue_capacity: int = 50, input_channel_index: Optional[int] = None, ) -> InputCapture: """Open the default (or chosen) audio input device and start capture. Frames are sliced into 10 ms chunks. If any processing option is enabled, an `AudioProcessingModule` is created and applied to each frame before it is queued for `AudioSource.capture_frame`. To enable AEC end-to-end, call `open_output()` after opening the input device. The output player will automatically use the input's APM for reverse stream processing, enabling echo cancellation. Args: enable_aec: Enable acoustic echo cancellation. noise_suppression: Enable noise suppression. high_pass_filter: Enable high-pass filtering. auto_gain_control: Enable automatic gain control. input_device: Optional input device index (default system device if None). queue_capacity: Max queued frames between callback and async pump. input_channel_index: Optional zero-based device channel to capture. If provided, only that channel is opened (via sounddevice mapping) and used as mono input. Returns: InputCapture: Holder with `source`, `apm`, and `aclose()`. """ import sounddevice as sd # type: ignore[import-not-found, import-untyped] loop = self._loop source = AudioSource(self._in_sr, self._channels, loop=loop) apm: Optional[AudioProcessingModule] = None if enable_aec or noise_suppression or high_pass_filter or auto_gain_control: apm = AudioProcessingModule( echo_cancellation=enable_aec, noise_suppression=noise_suppression, high_pass_filter=high_pass_filter, auto_gain_control=auto_gain_control, ) delay_estimator: Optional[_APMDelayEstimator] = ( _APMDelayEstimator() if apm is not None else None ) # Store the shared estimator and APM on the device helper so the output player can reuse them self._delay_estimator = delay_estimator self._apm = apm # Queue from callback to async task q: asyncio.Queue[AudioFrame] = asyncio.Queue(maxsize=queue_capacity) def _input_callback( indata: np.ndarray, frame_count: int, time_info: Any, status: Any ) -> None: # Slice into 10 ms frames, optionally APM, enqueue for async capture # Compute input (capture) delay using PortAudio timing; combine with last # measured output delay to provide APM stream delay in milliseconds. if apm is not None: try: input_delay_sec = float(time_info.currentTime - time_info.inputBufferAdcTime) output_delay_sec = ( float(delay_estimator.get_output_delay()) if delay_estimator else 0.0 ) total_delay_ms = int(max((input_delay_sec + output_delay_sec) * 1000.0, 0.0)) try: apm.set_stream_delay_ms(total_delay_ms) except Exception: pass except Exception: pass num_frames = frame_count // FRAME_SAMPLES for i in range(num_frames): start = i * FRAME_SAMPLES end = start + FRAME_SAMPLES if end > frame_count: break chunk = indata[start:end, 0] frame = AudioFrame( data=chunk.tobytes(), samples_per_channel=FRAME_SAMPLES, sample_rate=self._in_sr, num_channels=self._channels, ) if apm is not None: try: apm.process_stream(frame) except Exception: # Continue even if APM processing fails pass try: # Non-blocking: drop if full if not q.full(): loop.call_soon_threadsafe(q.put_nowait, frame) except Exception: pass # Note: input_channel_index is currently not used as sounddevice mapping # parameter is not supported in all versions. input_stream = sd.InputStream( callback=_input_callback, dtype="int16", channels=self._channels, device=input_device, samplerate=self._in_sr, blocksize=self._blocksize, ) input_stream.start() async def _pump() -> None: # Drain queue into AudioSource while True: try: frame = await q.get() except asyncio.CancelledError: break try: await source.capture_frame(frame) except Exception: # Ignore capture errors to keep the pump alive pass task = asyncio.create_task(_pump()) return InputCapture( source=source, input_stream=input_stream, task=task, apm=apm, delay_estimator=delay_estimator, ) def open_output( self, *, output_device: Optional[int] = None, ) -> OutputPlayer: """Create an `OutputPlayer` for rendering and (optionally) AEC reverse. If an input device was opened with AEC enabled, the output player will automatically feed the APM's reverse stream for echo cancellation. Args: output_device: Optional output device index (default system device if None). """ return OutputPlayer( sample_rate=self._out_sr, num_channels=self._channels, blocksize=self._blocksize, apm_for_reverse=self._apm, output_device=output_device, delay_estimator=self._delay_estimator, )High-level interface to native audio devices.
- Device enumeration helpers.
- Audio input capture into
rtc.AudioSourcewith optional APM processing. - Audio output player that can feed APM reverse stream for AEC.
Design notes: - APM operates on 10 ms frames; this module slices input/output audio into
FRAME_SAMPLESfor processing calls. - For AEC to be effective, render audio that could leak back into the mic should be played throughOutputPlayerwith the sameapminstance. - Timing alignment: this helper does not attempt to set device latency on APM; for most setups the default behavior is acceptable.Methods
def default_input_device(self) ‑> int | None-
Expand source code
def default_input_device(self) -> Optional[int]: """Return the default input device index (or None).""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] dev = sd.default.device return dev[0] if isinstance(dev, (list, tuple)) else NoneReturn the default input device index (or None).
def default_output_device(self) ‑> int | None-
Expand source code
def default_output_device(self) -> Optional[int]: """Return the default output device index (or None).""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] dev = sd.default.device return dev[1] if isinstance(dev, (list, tuple)) else NoneReturn the default output device index (or None).
def list_input_devices(self) ‑> list[dict[str, typing.Any]]-
Expand source code
def list_input_devices(self) -> list[dict[str, Any]]: """List available input devices. Returns a list of dictionaries with the `sounddevice` metadata and an added `index` key corresponding to the device index. """ import sounddevice as sd # type: ignore[import-not-found, import-untyped] devices = sd.query_devices() result: list[dict[str, Any]] = [] for idx, dev in enumerate(devices): if dev.get("max_input_channels", 0) > 0: result.append({"index": idx, **dev}) return resultList available input devices.
Returns a list of dictionaries with the
sounddevicemetadata and an addedindexkey corresponding to the device index. def list_output_devices(self) ‑> list[dict[str, typing.Any]]-
Expand source code
def list_output_devices(self) -> list[dict[str, Any]]: """List available output devices with indices.""" import sounddevice as sd # type: ignore[import-not-found, import-untyped] devices = sd.query_devices() result: list[dict[str, Any]] = [] for idx, dev in enumerate(devices): if dev.get("max_output_channels", 0) > 0: result.append({"index": idx, **dev}) return resultList available output devices with indices.
def open_input(self,
*,
enable_aec: bool = True,
noise_suppression: bool = True,
high_pass_filter: bool = True,
auto_gain_control: bool = True,
input_device: Optional[int] = None,
queue_capacity: int = 50,
input_channel_index: Optional[int] = None) ‑> InputCapture-
Expand source code
def open_input( self, *, enable_aec: bool = True, noise_suppression: bool = True, high_pass_filter: bool = True, auto_gain_control: bool = True, input_device: Optional[int] = None, queue_capacity: int = 50, input_channel_index: Optional[int] = None, ) -> InputCapture: """Open the default (or chosen) audio input device and start capture. Frames are sliced into 10 ms chunks. If any processing option is enabled, an `AudioProcessingModule` is created and applied to each frame before it is queued for `AudioSource.capture_frame`. To enable AEC end-to-end, call `open_output()` after opening the input device. The output player will automatically use the input's APM for reverse stream processing, enabling echo cancellation. Args: enable_aec: Enable acoustic echo cancellation. noise_suppression: Enable noise suppression. high_pass_filter: Enable high-pass filtering. auto_gain_control: Enable automatic gain control. input_device: Optional input device index (default system device if None). queue_capacity: Max queued frames between callback and async pump. input_channel_index: Optional zero-based device channel to capture. If provided, only that channel is opened (via sounddevice mapping) and used as mono input. Returns: InputCapture: Holder with `source`, `apm`, and `aclose()`. """ import sounddevice as sd # type: ignore[import-not-found, import-untyped] loop = self._loop source = AudioSource(self._in_sr, self._channels, loop=loop) apm: Optional[AudioProcessingModule] = None if enable_aec or noise_suppression or high_pass_filter or auto_gain_control: apm = AudioProcessingModule( echo_cancellation=enable_aec, noise_suppression=noise_suppression, high_pass_filter=high_pass_filter, auto_gain_control=auto_gain_control, ) delay_estimator: Optional[_APMDelayEstimator] = ( _APMDelayEstimator() if apm is not None else None ) # Store the shared estimator and APM on the device helper so the output player can reuse them self._delay_estimator = delay_estimator self._apm = apm # Queue from callback to async task q: asyncio.Queue[AudioFrame] = asyncio.Queue(maxsize=queue_capacity) def _input_callback( indata: np.ndarray, frame_count: int, time_info: Any, status: Any ) -> None: # Slice into 10 ms frames, optionally APM, enqueue for async capture # Compute input (capture) delay using PortAudio timing; combine with last # measured output delay to provide APM stream delay in milliseconds. if apm is not None: try: input_delay_sec = float(time_info.currentTime - time_info.inputBufferAdcTime) output_delay_sec = ( float(delay_estimator.get_output_delay()) if delay_estimator else 0.0 ) total_delay_ms = int(max((input_delay_sec + output_delay_sec) * 1000.0, 0.0)) try: apm.set_stream_delay_ms(total_delay_ms) except Exception: pass except Exception: pass num_frames = frame_count // FRAME_SAMPLES for i in range(num_frames): start = i * FRAME_SAMPLES end = start + FRAME_SAMPLES if end > frame_count: break chunk = indata[start:end, 0] frame = AudioFrame( data=chunk.tobytes(), samples_per_channel=FRAME_SAMPLES, sample_rate=self._in_sr, num_channels=self._channels, ) if apm is not None: try: apm.process_stream(frame) except Exception: # Continue even if APM processing fails pass try: # Non-blocking: drop if full if not q.full(): loop.call_soon_threadsafe(q.put_nowait, frame) except Exception: pass # Note: input_channel_index is currently not used as sounddevice mapping # parameter is not supported in all versions. input_stream = sd.InputStream( callback=_input_callback, dtype="int16", channels=self._channels, device=input_device, samplerate=self._in_sr, blocksize=self._blocksize, ) input_stream.start() async def _pump() -> None: # Drain queue into AudioSource while True: try: frame = await q.get() except asyncio.CancelledError: break try: await source.capture_frame(frame) except Exception: # Ignore capture errors to keep the pump alive pass task = asyncio.create_task(_pump()) return InputCapture( source=source, input_stream=input_stream, task=task, apm=apm, delay_estimator=delay_estimator, )Open the default (or chosen) audio input device and start capture.
Frames are sliced into 10 ms chunks. If any processing option is enabled, an
AudioProcessingModuleis created and applied to each frame before it is queued forAudioSource.capture_frame.To enable AEC end-to-end, call
open_output()after opening the input device. The output player will automatically use the input's APM for reverse stream processing, enabling echo cancellation.Args
enable_aec- Enable acoustic echo cancellation.
noise_suppression- Enable noise suppression.
high_pass_filter- Enable high-pass filtering.
auto_gain_control- Enable automatic gain control.
input_device- Optional input device index (default system device if None).
queue_capacity- Max queued frames between callback and async pump.
input_channel_index- Optional zero-based device channel to capture. If provided, only that channel is opened (via sounddevice mapping) and used as mono input.
Returns
InputCapture- Holder with
source,apm, andaclose().
def open_output(self, *, output_device: Optional[int] = None) ‑> OutputPlayer-
Expand source code
def open_output( self, *, output_device: Optional[int] = None, ) -> OutputPlayer: """Create an `OutputPlayer` for rendering and (optionally) AEC reverse. If an input device was opened with AEC enabled, the output player will automatically feed the APM's reverse stream for echo cancellation. Args: output_device: Optional output device index (default system device if None). """ return OutputPlayer( sample_rate=self._out_sr, num_channels=self._channels, blocksize=self._blocksize, apm_for_reverse=self._apm, output_device=output_device, delay_estimator=self._delay_estimator, )Create an
OutputPlayerfor rendering and (optionally) AEC reverse.If an input device was opened with AEC enabled, the output player will automatically feed the APM's reverse stream for echo cancellation.
Args
output_device- Optional output device index (default system device if None).
class OutputPlayer (*,
sample_rate: int = 48000,
num_channels: int = 1,
blocksize: int = 4800,
apm_for_reverse: Optional[AudioProcessingModule] = None,
output_device: Optional[int] = None,
delay_estimator: Optional[_APMDelayEstimator] = None)-
Expand source code
class OutputPlayer: """Simple audio output helper using `sounddevice.OutputStream`. When `apm_for_reverse` is provided, this player will feed the same PCM it renders (in 10 ms frames) into the APM reverse path so that echo cancellation can correlate mic input with speaker output. The OutputPlayer includes an internal `AudioMixer` for convenient multi-track playback. Use `add_track()` and `remove_track()` to dynamically manage tracks, then call `start()` to begin playback. """ def __init__( self, *, sample_rate: int = DEFAULT_SAMPLE_RATE, num_channels: int = DEFAULT_CHANNELS, blocksize: int = BLOCKSIZE, apm_for_reverse: Optional[AudioProcessingModule] = None, output_device: Optional[int] = None, delay_estimator: Optional[_APMDelayEstimator] = None, ) -> None: import sounddevice as sd # type: ignore[import-not-found, import-untyped] self._sample_rate = sample_rate self._num_channels = num_channels self._blocksize = blocksize self._apm = apm_for_reverse self._buffer = bytearray() self._buffer_lock = asyncio.Lock() self._play_task: Optional[asyncio.Task] = None self._running = False self._delay_estimator = delay_estimator # Internal mixer for add_track/remove_track API self._mixer: Optional[AudioMixer] = None self._track_streams: dict[ str, tuple[AudioStream, _AudioStreamIterator] ] = {} # track.sid -> (AudioStream, adapter) def _callback(outdata: np.ndarray, frame_count: int, time_info: Any, status: Any) -> None: # Pull PCM int16 from buffer; zero if not enough bytes_needed = frame_count * 2 # Important: Do not take asyncio locks in realtime callbacks. We keep the # critical section minimal and tolerate occasional underruns. available = len(self._buffer) if available >= bytes_needed: chunk = self._buffer[:bytes_needed] outdata[:, 0] = np.frombuffer(chunk, dtype=np.int16, count=frame_count) del self._buffer[:bytes_needed] elif available > 0: outdata[: available // 2, 0] = np.frombuffer( self._buffer[:available], dtype=np.int16, count=available // 2 ) outdata[available // 2 :, 0] = 0 del self._buffer[:available] else: outdata.fill(0) # Measure render (output) delay: time until DAC from current callback time try: output_delay_sec = float(time_info.outputBufferDacTime - time_info.currentTime) if self._delay_estimator is not None: self._delay_estimator.set_output_delay(output_delay_sec) except Exception: pass if self._apm is not None: # Feed reverse stream in 10 ms frames for AEC num_chunks = frame_count // FRAME_SAMPLES for i in range(num_chunks): start = i * FRAME_SAMPLES end = start + FRAME_SAMPLES if end > frame_count: break render_chunk = outdata[start:end, 0] render_frame = AudioFrame( render_chunk.tobytes(), self._sample_rate, 1, FRAME_SAMPLES ) try: self._apm.process_reverse_stream(render_frame) except Exception: # Ignore reverse stream errors in callback pass self._stream = sd.OutputStream( callback=_callback, dtype="int16", channels=num_channels, device=output_device, samplerate=sample_rate, blocksize=blocksize, ) async def add_track(self, track: Track) -> None: """Add an audio track to the internal mixer for playback. This creates an `AudioStream` from the track and adds it to the internal mixer. The mixer is created lazily on first track addition. Call `start()` to begin playback of all added tracks. Args: track: The audio track to add (typically from a remote participant). Raises: ValueError: If the track is not an audio track or has already been added. """ if track.sid in self._track_streams: raise ValueError(f"Track {track.sid} already added to player") # Create mixer on first track addition if self._mixer is None: self._mixer = AudioMixer(sample_rate=self._sample_rate, num_channels=self._num_channels) # Create audio stream for this track stream = AudioStream(track, sample_rate=self._sample_rate, num_channels=self._num_channels) # Wrap the stream with an adapter to convert AudioFrameEvent to AudioFrame stream_iterator = _AudioStreamIterator(stream) self._track_streams[track.sid] = (stream, stream_iterator) self._mixer.add_stream(stream_iterator) async def remove_track(self, track: Track) -> None: """Remove an audio track from the internal mixer. This removes the track's stream from the mixer and closes it. Args: track: The audio track to remove. """ entry = self._track_streams.pop(track.sid, None) if entry is None: return stream, stream_iterator = entry if self._mixer is not None: try: self._mixer.remove_stream(stream_iterator) except Exception: pass try: await stream.aclose() except Exception: pass async def start(self) -> None: """Start playback of all tracks in the internal mixer. This begins a background task that consumes frames from the internal mixer and sends them to the output device. Tracks can be added or removed dynamically using `add_track()` and `remove_track()`. Raises: RuntimeError: If playback is already started or no mixer is available. """ if self._play_task is not None and not self._play_task.done(): raise RuntimeError("Playback already started") if self._mixer is None: self._mixer = AudioMixer(sample_rate=self._sample_rate, num_channels=self._num_channels) async def _playback_loop(): """Internal playback loop that consumes frames from the mixer.""" self._running = True self._stream.start() try: async for frame in self._mixer: if not self._running: break # Append raw PCM bytes for callback consumption self._buffer.extend(frame.data.tobytes()) finally: self._running = False try: self._stream.stop() self._stream.close() except Exception: pass self._play_task = asyncio.create_task(_playback_loop()) async def aclose(self) -> None: """Stop playback and close the output stream. This also cleans up all added tracks and the internal mixer. """ self._running = False # Cancel playback task if running if self._play_task is not None and not self._play_task.done(): self._play_task.cancel() try: await self._play_task except asyncio.CancelledError: pass # Clean up all track streams for stream, _ in list(self._track_streams.values()): try: await stream.aclose() except Exception: pass self._track_streams.clear() # Close mixer if self._mixer is not None: try: await self._mixer.aclose() except Exception: pass self._mixer = None # Close output stream try: self._stream.stop() self._stream.close() except Exception: passSimple audio output helper using
sounddevice.OutputStream.When
apm_for_reverseis provided, this player will feed the same PCM it renders (in 10 ms frames) into the APM reverse path so that echo cancellation can correlate mic input with speaker output.The OutputPlayer includes an internal
AudioMixerfor convenient multi-track playback. Useadd_track()andremove_track()to dynamically manage tracks, then callstart()to begin playback.Methods
async def aclose(self) ‑> None-
Expand source code
async def aclose(self) -> None: """Stop playback and close the output stream. This also cleans up all added tracks and the internal mixer. """ self._running = False # Cancel playback task if running if self._play_task is not None and not self._play_task.done(): self._play_task.cancel() try: await self._play_task except asyncio.CancelledError: pass # Clean up all track streams for stream, _ in list(self._track_streams.values()): try: await stream.aclose() except Exception: pass self._track_streams.clear() # Close mixer if self._mixer is not None: try: await self._mixer.aclose() except Exception: pass self._mixer = None # Close output stream try: self._stream.stop() self._stream.close() except Exception: passStop playback and close the output stream.
This also cleans up all added tracks and the internal mixer.
async def add_track(self, track: Track) ‑> None-
Expand source code
async def add_track(self, track: Track) -> None: """Add an audio track to the internal mixer for playback. This creates an `AudioStream` from the track and adds it to the internal mixer. The mixer is created lazily on first track addition. Call `start()` to begin playback of all added tracks. Args: track: The audio track to add (typically from a remote participant). Raises: ValueError: If the track is not an audio track or has already been added. """ if track.sid in self._track_streams: raise ValueError(f"Track {track.sid} already added to player") # Create mixer on first track addition if self._mixer is None: self._mixer = AudioMixer(sample_rate=self._sample_rate, num_channels=self._num_channels) # Create audio stream for this track stream = AudioStream(track, sample_rate=self._sample_rate, num_channels=self._num_channels) # Wrap the stream with an adapter to convert AudioFrameEvent to AudioFrame stream_iterator = _AudioStreamIterator(stream) self._track_streams[track.sid] = (stream, stream_iterator) self._mixer.add_stream(stream_iterator)Add an audio track to the internal mixer for playback.
This creates an
AudioStreamfrom the track and adds it to the internal mixer. The mixer is created lazily on first track addition. Callstart()to begin playback of all added tracks.Args
track- The audio track to add (typically from a remote participant).
Raises
ValueError- If the track is not an audio track or has already been added.
async def remove_track(self, track: Track) ‑> None-
Expand source code
async def remove_track(self, track: Track) -> None: """Remove an audio track from the internal mixer. This removes the track's stream from the mixer and closes it. Args: track: The audio track to remove. """ entry = self._track_streams.pop(track.sid, None) if entry is None: return stream, stream_iterator = entry if self._mixer is not None: try: self._mixer.remove_stream(stream_iterator) except Exception: pass try: await stream.aclose() except Exception: passRemove an audio track from the internal mixer.
This removes the track's stream from the mixer and closes it.
Args
track- The audio track to remove.
async def start(self) ‑> None-
Expand source code
async def start(self) -> None: """Start playback of all tracks in the internal mixer. This begins a background task that consumes frames from the internal mixer and sends them to the output device. Tracks can be added or removed dynamically using `add_track()` and `remove_track()`. Raises: RuntimeError: If playback is already started or no mixer is available. """ if self._play_task is not None and not self._play_task.done(): raise RuntimeError("Playback already started") if self._mixer is None: self._mixer = AudioMixer(sample_rate=self._sample_rate, num_channels=self._num_channels) async def _playback_loop(): """Internal playback loop that consumes frames from the mixer.""" self._running = True self._stream.start() try: async for frame in self._mixer: if not self._running: break # Append raw PCM bytes for callback consumption self._buffer.extend(frame.data.tobytes()) finally: self._running = False try: self._stream.stop() self._stream.close() except Exception: pass self._play_task = asyncio.create_task(_playback_loop())Start playback of all tracks in the internal mixer.
This begins a background task that consumes frames from the internal mixer and sends them to the output device. Tracks can be added or removed dynamically using
add_track()andremove_track().Raises
RuntimeError- If playback is already started or no mixer is available.