Module livekit.rtc.synchronizer
Classes
class AVSynchronizer (*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100)
Expand source code
class AVSynchronizer:
    """Synchronize audio and video capture.

    Audio frames are handed directly to the audio source. Video frames are
    placed on an internal queue and forwarded to the video source by a
    background task, one frame per ``_FPSController`` context.

    The constructor calls ``asyncio.create_task``, so an instance must be
    created while an event loop is running.

    Usage:
        av_sync = AVSynchronizer(
            audio_source=audio_source,
            video_source=video_source,
            video_fps=video_fps,
        )

        async for video_frame, audio_frame in video_generator:
            await av_sync.push(video_frame)
            await av_sync.push(audio_frame)
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300,
    ):
        """Set up the video queue and start the background capture task.

        Args:
            audio_source: Receives every pushed audio frame.
            video_source: Receives video frames taken from the queue.
            video_fps: Expected video frame rate; also used to size the queue.
            video_queue_size_ms: Queue length expressed in milliseconds of
                video. A positive value always yields a bounded queue of at
                least one frame; otherwise the computed size is <= 0, which
                ``asyncio.Queue`` treats as unbounded.
            _max_delay_tolerance_ms: Forwarded unchanged to ``_FPSController``
                (private tuning knob; semantics live in that class).
        """
        self._audio_source = audio_source
        self._video_source = video_source
        self._video_fps = video_fps
        self._video_queue_size_ms = video_queue_size_ms
        self._max_delay_tolerance_ms = _max_delay_tolerance_ms

        self._stopped = False

        # Convert the queue length from milliseconds to a frame count.
        self._video_queue_max_size = int(
            self._video_fps * self._video_queue_size_ms / 1000
        )
        if self._video_queue_size_ms > 0:
            # ensure queue is bounded if queue size is specified
            self._video_queue_max_size = max(1, self._video_queue_max_size)

        self._video_queue = asyncio.Queue[VideoFrame](
            maxsize=self._video_queue_max_size
        )
        self._fps_controller = _FPSController(
            expected_fps=self._video_fps,
            max_delay_tolerance_ms=self._max_delay_tolerance_ms,
        )
        self._capture_video_task = asyncio.create_task(self._capture_video())

    async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
        """Submit one frame.

        Audio frames are captured by the audio source immediately; anything
        else is queued as video and may wait for room in a bounded queue.
        """
        if isinstance(frame, AudioFrame):
            await self._audio_source.capture_frame(frame)
            return

        await self._video_queue.put(frame)

    async def clear_queue(self) -> None:
        """Drop all pending audio and every video frame still in the queue."""
        self._audio_source.clear_queue()
        while not self._video_queue.empty():
            await self._video_queue.get()
            # Each get() must be balanced by task_done(); otherwise the
            # queue's unfinished-task count never reaches zero and
            # wait_for_playout() would block forever on join().
            self._video_queue.task_done()

    async def wait_for_playout(self) -> None:
        """Wait until all video and audio frames are played out."""
        await self._audio_source.wait_for_playout()
        await self._video_queue.join()

    async def _capture_video(self) -> None:
        """Background loop: move queued frames to the video source."""
        while not self._stopped:
            frame = await self._video_queue.get()
            try:
                async with self._fps_controller:
                    self._video_source.capture_frame(frame)
            finally:
                # Account for the frame even if capture raises or the task is
                # cancelled mid-frame, so join() waiters are not left hanging
                # on a frame that has already left the queue.
                self._video_queue.task_done()

    async def aclose(self) -> None:
        """Stop the capture loop and cancel the background task."""
        self._stopped = True
        if self._capture_video_task:
            self._capture_video_task.cancel()

    @property
    def actual_fps(self) -> float:
        """Frame rate reported by the internal FPS controller."""
        return self._fps_controller.actual_fps
Synchronize audio and video capture.
Usage
av_sync = AVSynchronizer(
    audio_source=audio_source,
    video_source=video_source,
    video_fps=video_fps,
)

async for video_frame, audio_frame in video_generator:
    await av_sync.push(video_frame)
    await av_sync.push(audio_frame)
Instance variables
prop actual_fps : float
-
Expand source code
@property
def actual_fps(self) -> float:
    """Frame rate reported by the internal FPS controller."""
    controller = self._fps_controller
    return controller.actual_fps
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Flag the synchronizer as stopped and cancel the capture task, if any."""
    self._stopped = True
    capture_task = self._capture_video_task
    if not capture_task:
        return
    capture_task.cancel()
async def clear_queue(self) ‑> None
-
Expand source code
async def clear_queue(self) -> None:
    """Drop all pending audio and every video frame still in the queue."""
    self._audio_source.clear_queue()
    while not self._video_queue.empty():
        await self._video_queue.get()
        # Each get() must be balanced by task_done(); otherwise the queue's
        # unfinished-task count never reaches zero and a later join() on the
        # video queue would block forever.
        self._video_queue.task_done()
async def push(self,
frame: VideoFrame | AudioFrame) ‑> None
Expand source code
async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
    """Submit one frame.

    Audio frames go straight to the audio source; any other frame is put on
    the video queue.
    """
    if not isinstance(frame, AudioFrame):
        await self._video_queue.put(frame)
        return

    await self._audio_source.capture_frame(frame)
async def wait_for_playout(self) ‑> None
-
Expand source code
async def wait_for_playout(self) -> None:
    """Wait for audio playout to finish, then for the video queue to be joined."""
    audio = self._audio_source
    await audio.wait_for_playout()

    pending_video = self._video_queue
    await pending_video.join()
Wait until all video and audio frames are played out.