Module livekit.rtc.synchronizer
Classes
class AVSynchronizer (*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100)-
Expand source code
class AVSynchronizer:
    """Synchronize audio and video capture.

    Audio frames are forwarded to the audio source as soon as they are
    pushed. Video frames are buffered in an ``asyncio.Queue`` and captured by
    a background task, one frame at a time, inside the FPS controller's
    async context.

    Usage:
        av_sync = AVSynchronizer(
            audio_source=audio_source,
            video_source=video_source,
            video_fps=video_fps,
        )

        async for video_frame, audio_frame in video_generator:
            await av_sync.push(video_frame)
            await av_sync.push(audio_frame)
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300,
    ):
        """Create the synchronizer and start the background video capture task.

        Must be called while an event loop is running, because the capture
        task is started with ``asyncio.create_task``.

        Args:
            audio_source: Source that pushed audio frames are captured to.
            video_source: Source that queued video frames are captured to.
            video_fps: Expected video frame rate; handed to the FPS controller
                and used to convert the queue length into a frame count.
            video_queue_size_ms: Length of the video queue in milliseconds.
                A positive value always yields a queue of at least one frame;
                a value <= 0 yields ``maxsize <= 0``, which ``asyncio.Queue``
                treats as unbounded.
            _max_delay_tolerance_ms: Forwarded to the FPS controller as
                ``max_delay_tolerance_ms``.
        """
        self._audio_source = audio_source
        self._video_source = video_source
        self._video_fps = video_fps
        self._video_queue_size_ms = video_queue_size_ms
        self._max_delay_tolerance_ms = _max_delay_tolerance_ms

        self._stopped = False

        # the time of the last video/audio frame captured
        self._last_video_time: float = 0
        self._last_audio_time: float = 0

        self._video_queue_max_size = int(self._video_fps * self._video_queue_size_ms / 1000)
        if self._video_queue_size_ms > 0:
            # ensure queue is bounded if queue size is specified
            self._video_queue_max_size = max(1, self._video_queue_max_size)

        self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
            maxsize=self._video_queue_max_size
        )
        self._fps_controller = _FPSController(
            expected_fps=self._video_fps,
            max_delay_tolerance_ms=self._max_delay_tolerance_ms,
        )
        self._capture_video_task = asyncio.create_task(self._capture_video())

    async def push(
        self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
    ) -> None:
        """Push a frame to the synchronizer

        Audio frames are captured by the audio source right away; any other
        frame is placed on the video queue (waiting for a free slot when the
        queue is bounded and full).

        Args:
            frame: The video or audio frame to push.
            timestamp: (optional) The timestamp of the frame, for logging purposes for now.
                For AudioFrame, it should be the end time of the frame.
        """
        if isinstance(frame, AudioFrame):
            await self._audio_source.capture_frame(frame)
            if timestamp is not None:
                self._last_audio_time = timestamp
            return

        await self._video_queue.put((frame, timestamp))

    async def clear_queue(self) -> None:
        """Drop all buffered audio and every video frame still in the queue."""
        self._audio_source.clear_queue()
        while not self._video_queue.empty():
            # The queue was just checked to be non-empty, so the non-blocking
            # getter cannot fail here and nothing can run in between.
            self._video_queue.get_nowait()
            # Keep the unfinished-task count balanced for join() waiters.
            self._video_queue.task_done()

    async def wait_for_playout(self) -> None:
        """Wait until all video and audio frames are played out."""
        await asyncio.gather(
            self._audio_source.wait_for_playout(),
            self._video_queue.join(),
        )

    def reset(self) -> None:
        """Reset the FPS controller."""
        self._fps_controller.reset()

    async def _capture_video(self) -> None:
        """Background loop: capture queued video frames until stopped."""
        while not self._stopped:
            frame, timestamp = await self._video_queue.get()
            try:
                async with self._fps_controller:
                    self._video_source.capture_frame(frame)
                    if timestamp is not None:
                        self._last_video_time = timestamp
            finally:
                # Every successful get() must be matched by task_done(), even
                # when capturing raises or this task is cancelled inside the
                # FPS controller; otherwise the queue's unfinished count never
                # returns to zero for this frame.
                self._video_queue.task_done()

    async def aclose(self) -> None:
        """Stop the capture loop and wait for its task to finish.

        ``asyncio.wait`` is used instead of awaiting the task directly: it
        does not re-raise the task's own ``CancelledError`` (or any other
        exception the task ended with), while a cancellation of the caller
        still propagates normally.
        """
        self._stopped = True
        task = self._capture_video_task
        if task:
            task.cancel()
            await asyncio.wait([task])

    @property
    def actual_fps(self) -> float:
        """The frame rate reported by the FPS controller."""
        return self._fps_controller.actual_fps

    @property
    def last_video_time(self) -> float:
        """The time of the last video frame captured"""
        return self._last_video_time

    @property
    def last_audio_time(self) -> float:
        """The time of the last audio frame played out"""
        return self._last_audio_time - self._audio_source.queued_duration
Synchronize audio and video capture.
Usage
av_sync = AVSynchronizer( audio_source=audio_source, video_source=video_source, video_fps=video_fps, )
async for video_frame, audio_frame in video_generator: await av_sync.push(video_frame) await av_sync.push(audio_frame)
Instance variables
prop actual_fps : float
-
Expand source code
@property
def actual_fps(self) -> float:
    """The frame rate currently reported by the FPS controller."""
    controller = self._fps_controller
    return controller.actual_fps
prop last_audio_time : float
-
Expand source code
@property
def last_audio_time(self) -> float:
    """The time of the last audio frame played out.

    Computed as the stored last audio timestamp minus the duration the audio
    source reports as still queued.
    """
    latest_pushed = self._last_audio_time
    still_queued = self._audio_source.queued_duration
    return latest_pushed - still_queued
The time of the last audio frame played out
prop last_video_time : float
-
Expand source code
@property
def last_video_time(self) -> float:
    """The time of the last video frame captured."""
    latest_captured = self._last_video_time
    return latest_captured
The time of the last video frame captured
Methods
async def aclose(self) ‑> None
-
Expand source code
async def aclose(self) -> None:
    """Stop the capture loop and wait for its task to finish.

    Sets the stop flag, cancels the background video capture task, and then
    waits until that task has actually completed, so no capture work is still
    pending once this coroutine returns.

    ``asyncio.wait`` is used instead of awaiting the task directly: it does
    not re-raise the task's own ``CancelledError`` (or any other exception
    the task ended with), while a cancellation of the caller still
    propagates normally.
    """
    self._stopped = True
    task = self._capture_video_task
    if task:
        task.cancel()
        await asyncio.wait([task])
async def clear_queue(self) ‑> None
-
Expand source code
async def clear_queue(self) -> None:
    """Drop all buffered audio and every video frame still in the queue."""
    self._audio_source.clear_queue()
    pending = self._video_queue
    while pending.qsize() > 0:
        # Non-empty was just checked, so the non-blocking getter cannot fail.
        pending.get_nowait()
        # Balance the unfinished-task count for anyone waiting on join().
        pending.task_done()
async def push(self,
frame: VideoFrame | AudioFrame,
timestamp: float | None = None) ‑> None-
Expand source code
async def push(
    self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
) -> None:
    """Push a frame to the synchronizer

    A frame that is not an AudioFrame is placed on the video queue together
    with its timestamp. An AudioFrame is captured by the audio source
    directly, and its timestamp (when given) is remembered as the last audio
    time.

    Args:
        frame: The video or audio frame to push.
        timestamp: (optional) The timestamp of the frame, for logging purposes for now.
            For AudioFrame, it should be the end time of the frame.
    """
    if not isinstance(frame, AudioFrame):
        queued_item = (frame, timestamp)
        await self._video_queue.put(queued_item)
        return

    await self._audio_source.capture_frame(frame)
    if timestamp is None:
        return
    self._last_audio_time = timestamp
Push a frame to the synchronizer
Args
frame
- The video or audio frame to push.
timestamp
- (optional) The timestamp of the frame; currently used only for logging purposes. For an AudioFrame, it should be the end time of the frame.
def reset(self) ‑> None
-
Expand source code
def reset(self) -> None:
    """Reset the FPS controller by delegating to its ``reset()`` method."""
    controller = self._fps_controller
    controller.reset()
async def wait_for_playout(self) ‑> None
-
Expand source code
async def wait_for_playout(self) -> None:
    """Wait until all video and audio frames are played out."""
    # Both waits run concurrently: the audio source's own playout wait and
    # the video queue's join(), which completes once every queued frame has
    # been marked done.
    audio_drained = self._audio_source.wait_for_playout()
    video_drained = self._video_queue.join()
    await asyncio.gather(audio_drained, video_drained)
Wait until all video and audio frames are played out.