Module livekit.rtc.synchronizer

Classes

class AVSynchronizer (*,
audio_source: AudioSource,
video_source: VideoSource,
video_fps: float,
video_queue_size_ms: float = 100)
Expand source code
class AVSynchronizer:
    """Synchronize audio and video capture.

    Audio frames are passed straight to ``audio_source.capture_frame``. Video
    frames are put on a (normally bounded) queue and captured one at a time by
    a background task, each capture running inside an internal
    ``_FPSController`` context configured with ``video_fps``.

    Usage:
        av_sync = AVSynchronizer(
            audio_source=audio_source,
            video_source=video_source,
            video_fps=video_fps,
        )

        async for video_frame, audio_frame in video_generator:
            await av_sync.push(video_frame)
            await av_sync.push(audio_frame)
    """

    def __init__(
        self,
        *,
        audio_source: AudioSource,
        video_source: VideoSource,
        video_fps: float,
        video_queue_size_ms: float = 100,
        _max_delay_tolerance_ms: float = 300,
    ):
        """Create the synchronizer and start its video capture task.

        Must be called while an asyncio event loop is running, because the
        capture task is started with ``asyncio.create_task``.

        Args:
            audio_source: Receives every pushed audio frame.
            video_source: Receives video frames from the capture task.
            video_fps: Expected video frame rate; also used to size the queue.
            video_queue_size_ms: Capacity of the video queue, expressed as
                milliseconds of video at ``video_fps``. Any positive value
                leaves room for at least one frame; a non-positive value
                (with a positive ``video_fps``) leaves the queue unbounded.
            _max_delay_tolerance_ms: Passed through to the ``_FPSController``.
        """
        self._audio_source = audio_source
        self._video_source = video_source
        self._video_fps = video_fps
        self._video_queue_size_ms = video_queue_size_ms
        self._max_delay_tolerance_ms = _max_delay_tolerance_ms

        self._stopped = False
        # the time of the last video/audio frame captured
        self._last_video_time: float = 0
        self._last_audio_time: float = 0

        self._video_queue_max_size = int(self._video_fps * self._video_queue_size_ms / 1000)
        if self._video_queue_size_ms > 0:
            # ensure queue is bounded if queue size is specified
            self._video_queue_max_size = max(1, self._video_queue_max_size)

        # asyncio.Queue treats maxsize <= 0 as "unbounded"
        self._video_queue = asyncio.Queue[tuple[VideoFrame, Optional[float]]](
            maxsize=self._video_queue_max_size
        )
        self._fps_controller = _FPSController(
            expected_fps=self._video_fps,
            max_delay_tolerance_ms=self._max_delay_tolerance_ms,
        )
        self._capture_video_task = asyncio.create_task(self._capture_video())

    async def push(
        self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
    ) -> None:
        """Push a frame to the synchronizer.

        Audio frames are awaited on ``audio_source.capture_frame`` directly.
        Video frames are enqueued for the background capture task, so this call
        waits while the video queue is full.

        Args:
            frame: The video or audio frame to push.
            timestamp: (optional) The timestamp of the frame. It is only used
                to update ``last_audio_time`` / ``last_video_time`` (e.g. for
                logging); it does not affect pacing.
                For AudioFrame, it should be the end time of the frame.
        """
        if not isinstance(frame, AudioFrame):
            # video: consumed (and paced) by _capture_video
            await self._video_queue.put((frame, timestamp))
            return

        await self._audio_source.capture_frame(frame)
        if timestamp is not None:
            self._last_audio_time = timestamp

    async def clear_queue(self) -> None:
        """Clear the audio source's queue and drop all not-yet-captured video frames.

        Every dropped video frame is marked done so that a concurrent
        ``wait_for_playout`` is not left waiting on it. A frame the capture task
        has already dequeued is not affected.
        """
        self._audio_source.clear_queue()
        while not self._video_queue.empty():
            self._video_queue.get_nowait()  # cannot raise: emptiness was just checked
            self._video_queue.task_done()

    async def wait_for_playout(self) -> None:
        """Wait until all video and audio frames are played out.

        Completes once the audio source's ``wait_for_playout`` finishes and
        every enqueued video frame has been marked done.
        """
        audio_playout = self._audio_source.wait_for_playout()
        video_drained = self._video_queue.join()
        await asyncio.gather(audio_playout, video_drained)

    def reset(self) -> None:
        """Reset the internal ``_FPSController``; queued frames are untouched."""
        self._fps_controller.reset()

    async def _capture_video(self) -> None:
        """Background loop: dequeue video frames and capture them one by one."""
        while not self._stopped:
            frame, timestamp = await self._video_queue.get()
            try:
                async with self._fps_controller:
                    self._video_source.capture_frame(frame)
                    if timestamp is not None:
                        self._last_video_time = timestamp
            finally:
                # Always mark the dequeued frame as done. Otherwise an exception
                # from capture_frame, or a cancellation while the FPS controller
                # is waiting, would leave queue.join() (wait_for_playout) hanging
                # forever. Exceptions still propagate and end this task.
                self._video_queue.task_done()

    async def aclose(self) -> None:
        """Stop the video capture task and wait until it has actually finished.

        Frames still waiting in the video queue are not captured. Calling this
        more than once is harmless.
        """
        self._stopped = True
        task = self._capture_video_task
        if task is not None:
            task.cancel()  # no-op if the task already finished
            # cancel() only *requests* cancellation; wait for the task to really
            # stop so no frame is captured after aclose() returns. asyncio.wait()
            # does not re-raise the task's CancelledError or an earlier failure.
            await asyncio.wait([task])

    @property
    def actual_fps(self) -> float:
        """The ``actual_fps`` value reported by the internal ``_FPSController``."""
        return self._fps_controller.actual_fps

    @property
    def last_video_time(self) -> float:
        """The time of the last video frame captured.

        Only frames pushed with a timestamp update this value.
        """
        return self._last_video_time

    @property
    def last_audio_time(self) -> float:
        """The time of the last audio frame played out.

        Computed as the end timestamp of the last timestamped audio frame pushed,
        minus the audio source's ``queued_duration``. Assumes both are expressed
        in the same time unit.
        """
        queued = self._audio_source.queued_duration
        return self._last_audio_time - queued

Synchronize audio and video capture.

Usage

av_sync = AVSynchronizer(
    audio_source=audio_source,
    video_source=video_source,
    video_fps=video_fps,
)

async for video_frame, audio_frame in video_generator:
    await av_sync.push(video_frame)
    await av_sync.push(audio_frame)

Instance variables

prop actual_fps : float
Expand source code
@property
def actual_fps(self) -> float:
    """The ``actual_fps`` value reported by the internal ``_FPSController``."""
    controller = self._fps_controller
    return controller.actual_fps
prop last_audio_time : float
Expand source code
@property
def last_audio_time(self) -> float:
    """The time of the last audio frame played out.

    Computed as the end timestamp of the last timestamped audio frame pushed,
    minus the audio source's ``queued_duration``. Assumes both are expressed
    in the same time unit.
    """
    queued = self._audio_source.queued_duration
    return self._last_audio_time - queued

The time of the last audio frame played out

prop last_video_time : float
Expand source code
@property
def last_video_time(self) -> float:
    """The time of the last video frame captured.

    Only frames pushed with a timestamp update this value.
    """
    latest = self._last_video_time
    return latest

The time of the last video frame captured

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Stop the video capture task and wait until it has actually finished.

    Frames still waiting in the video queue are not captured. Calling this
    more than once is harmless.
    """
    self._stopped = True
    task = self._capture_video_task
    if task is not None:
        task.cancel()  # no-op if the task already finished
        # cancel() only *requests* cancellation; wait for the task to really
        # stop so no frame is captured after aclose() returns. asyncio.wait()
        # does not re-raise the task's CancelledError or an earlier failure.
        await asyncio.wait([task])
async def clear_queue(self) ‑> None
Expand source code
async def clear_queue(self) -> None:
    """Clear the audio source's queue and drop all not-yet-captured video frames.

    Every dropped video frame is marked done so that a concurrent
    ``wait_for_playout`` is not left waiting on it.
    """
    self._audio_source.clear_queue()
    pending = self._video_queue
    while not pending.empty():
        pending.get_nowait()  # cannot raise: emptiness was just checked
        pending.task_done()
async def push(self,
frame: VideoFrame | AudioFrame,
timestamp: float | None = None) ‑> None
Expand source code
async def push(
    self, frame: Union[VideoFrame, AudioFrame], timestamp: Optional[float] = None
) -> None:
    """Push a frame to the synchronizer.

    Audio frames are awaited on ``audio_source.capture_frame`` directly.
    Video frames are enqueued for the background capture task, so this call
    waits while the video queue is full.

    Args:
        frame: The video or audio frame to push.
        timestamp: (optional) The timestamp of the frame. It is only used
            to update ``last_audio_time`` / ``last_video_time`` (e.g. for
            logging); it does not affect pacing.
            For AudioFrame, it should be the end time of the frame.
    """
    if not isinstance(frame, AudioFrame):
        # video: consumed (and paced) by the capture task
        await self._video_queue.put((frame, timestamp))
        return

    await self._audio_source.capture_frame(frame)
    if timestamp is not None:
        self._last_audio_time = timestamp

Push a frame to the synchronizer

Args

frame
The video or audio frame to push.
timestamp
(optional) The timestamp of the frame. It is only used to update last_audio_time / last_video_time (for example, for logging) and does not affect pacing. For an AudioFrame, pass the end time of the frame.
def reset(self) ‑> None
Expand source code
def reset(self) -> None:
    """Reset the internal ``_FPSController``; queued frames are untouched."""
    controller = self._fps_controller
    controller.reset()
async def wait_for_playout(self) ‑> None
Expand source code
async def wait_for_playout(self) -> None:
    """Wait until all video and audio frames are played out.

    Completes once the audio source's ``wait_for_playout`` finishes and
    every enqueued video frame has been marked done.
    """
    audio_playout = self._audio_source.wait_for_playout()
    video_drained = self._video_queue.join()
    await asyncio.gather(audio_playout, video_drained)

Wait until all video and audio frames are played out.