Module livekit.rtc.video_stream

Classes

class VideoFrameEvent (frame: VideoFrame, timestamp_us: int, rotation: proto_video_frame.VideoRotation)
Expand source code
@dataclass
class VideoFrameEvent:
    frame: VideoFrame
    timestamp_us: int
    rotation: proto_video_frame.VideoRotation

VideoFrameEvent(frame: 'VideoFrame', timestamp_us: 'int', rotation: 'proto_video_frame.VideoRotation')

Class variables

var frameVideoFrame
var rotation
var timestamp_us : int
class VideoStream (track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
capacity: int = 0,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
**kwargs)
Expand source code
class VideoStream:
    """VideoStream is a stream of video frames received from a RemoteTrack."""

    def __init__(
        self,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        capacity: int = 0,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        **kwargs,
    ) -> None:
        self._loop = loop or asyncio.get_event_loop()
        self._ffi_queue = FfiClient.instance.queue.subscribe(self._loop)
        self._queue: RingQueue[VideoFrameEvent | None] = RingQueue(capacity)
        self._track: Track | None = track
        self._format = format
        self._capacity = capacity
        self._format = format
        stream: Any = None
        if "participant" in kwargs:
            stream = self._create_owned_stream_from_participant(
                participant=kwargs["participant"], track_source=kwargs["track_source"]
            )
        else:
            stream = self._create_owned_stream()

        self._ffi_handle = FfiHandle(stream.handle.id)
        self._info = stream.info

        self._task = self._loop.create_task(self._run())
        self._task.add_done_callback(task_done_logger)

    @classmethod
    def from_participant(
        cls,
        *,
        participant: Participant,
        track_source: TrackSource.ValueType,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        return VideoStream(
            participant=participant,
            track_source=track_source,
            loop=loop,
            capacity=capacity,
            format=format,
            track=None,  # type: ignore
        )

    @classmethod
    def from_track(
        cls,
        *,
        track: Track,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
        capacity: int = 0,
    ) -> VideoStream:
        return VideoStream(
            track=track,
            loop=loop,
            capacity=capacity,
            format=format,
        )

    def __del__(self) -> None:
        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    def _create_owned_stream(self) -> Any:
        assert self._track is not None
        req = proto_ffi.FfiRequest()
        new_video_stream = req.new_video_stream
        new_video_stream.track_handle = self._track._ffi_handle.handle
        new_video_stream.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        if self._format is not None:
            new_video_stream.format = self._format
        new_video_stream.normalize_stride = True
        resp = FfiClient.instance.request(req)
        return resp.new_video_stream.stream

    def _create_owned_stream_from_participant(
        self, participant: Participant, track_source: TrackSource.ValueType
    ) -> Any:
        req = proto_ffi.FfiRequest()
        video_stream_from_participant = req.video_stream_from_participant
        video_stream_from_participant.participant_handle = (
            participant._ffi_handle.handle
        )
        video_stream_from_participant.type = (
            proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE
        )
        video_stream_from_participant.track_source = track_source
        video_stream_from_participant.normalize_stride = True
        if self._format is not None:
            video_stream_from_participant.format = self._format
        resp = FfiClient.instance.request(req)
        return resp.video_stream_from_participant.stream

    async def _run(self) -> None:
        while True:
            event = await self._ffi_queue.wait_for(self._is_event)
            video_event = event.video_stream_event

            if video_event.HasField("frame_received"):
                owned_buffer_info = video_event.frame_received.buffer
                frame = VideoFrame._from_owned_info(owned_buffer_info)

                event = VideoFrameEvent(
                    frame=frame,
                    timestamp_us=video_event.frame_received.timestamp_us,
                    rotation=video_event.frame_received.rotation,
                )

                self._queue.put(event)
            elif video_event.HasField("eos"):
                break

        FfiClient.instance.queue.unsubscribe(self._ffi_queue)

    async def aclose(self) -> None:
        self._ffi_handle.dispose()
        await self._task

    def _is_event(self, e: proto_ffi.FfiEvent) -> bool:
        return e.video_stream_event.stream_handle == self._ffi_handle.handle

    def __aiter__(self) -> AsyncIterator[VideoFrameEvent]:
        return self

    async def __anext__(self) -> VideoFrameEvent:
        if self._task.done():
            raise StopAsyncIteration

        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item

VideoStream is a stream of video frames received from a RemoteTrack.

Static methods

def from_participant(*,
participant: Participant,
track_source: TrackSource.ValueType,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0) ‑> VideoStream
def from_track(*,
track: Track,
loop: Optional[asyncio.AbstractEventLoop] = None,
format: Optional[proto_video_frame.VideoBufferType.ValueType] = None,
capacity: int = 0) ‑> VideoStream

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    self._ffi_handle.dispose()
    await self._task