Module livekit.rtc.synchronizer

Classes

class AVSynchronizer (*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100)
Expand source code
class AVSynchronizer:
    """Synchronize audio and video capture.

    Usage:
        av_sync = AVSynchronizer(
            audio_source=audio_source,
            video_source=video_source,
            video_fps=video_fps,
        )

        async for video_frame, audio_frame in video_generator:
            await av_sync.push(video_frame)
            await av_sync.push(audio_frame)
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300,
    ):
        self._audio_source = audio_source
        self._video_source = video_source
        self._video_fps = video_fps
        self._video_queue_size_ms = video_queue_size_ms
        self._max_delay_tolerance_ms = _max_delay_tolerance_ms

        self._stopped = False

        self._video_queue_max_size = int(
            self._video_fps * self._video_queue_size_ms / 1000
        )
        if self._video_queue_size_ms > 0:
            # ensure queue is bounded if queue size is specified
            self._video_queue_max_size = max(1, self._video_queue_max_size)

        self._video_queue = asyncio.Queue[VideoFrame](
            maxsize=self._video_queue_max_size
        )
        self._fps_controller = _FPSController(
            expected_fps=self._video_fps,
            max_delay_tolerance_ms=self._max_delay_tolerance_ms,
        )
        self._capture_video_task = asyncio.create_task(self._capture_video())

    async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
        if isinstance(frame, AudioFrame):
            await self._audio_source.capture_frame(frame)
            return

        await self._video_queue.put(frame)

    async def clear_queue(self) -> None:
        self._audio_source.clear_queue()
        while not self._video_queue.empty():
            await self._video_queue.get()

    async def wait_for_playout(self) -> None:
        """Wait until all video and audio frames are played out."""
        await self._audio_source.wait_for_playout()
        await self._video_queue.join()

    async def _capture_video(self) -> None:
        while not self._stopped:
            frame = await self._video_queue.get()
            async with self._fps_controller:
                self._video_source.capture_frame(frame)
            self._video_queue.task_done()

    async def aclose(self) -> None:
        self._stopped = True
        if self._capture_video_task:
            self._capture_video_task.cancel()

    @property
    def actual_fps(self) -> float:
        return self._fps_controller.actual_fps

Synchronize audio and video capture.

Usage

av_sync = AVSynchronizer( audio_source=audio_source, video_source=video_source, video_fps=video_fps, )

async for video_frame, audio_frame in video_generator: await av_sync.push(video_frame) await av_sync.push(audio_frame)

Instance variables

prop actual_fps : float
Expand source code
@property
def actual_fps(self) -> float:
    return self._fps_controller.actual_fps

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    self._stopped = True
    if self._capture_video_task:
        self._capture_video_task.cancel()
async def clear_queue(self) ‑> None
Expand source code
async def clear_queue(self) -> None:
    self._audio_source.clear_queue()
    while not self._video_queue.empty():
        await self._video_queue.get()
async def push(self,
frame: VideoFrame | AudioFrame) ‑> None
Expand source code
async def push(self, frame: Union[VideoFrame, AudioFrame]) -> None:
    if isinstance(frame, AudioFrame):
        await self._audio_source.capture_frame(frame)
        return

    await self._video_queue.put(frame)
async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """Wait until all video and audio frames are played out."""
    await self._audio_source.wait_for_playout()
    await self._video_queue.join()

Wait until all video and audio frames are played out.