Module livekit.rtc.data_stream

Classes

class BaseStreamInfo (stream_id: str,
mime_type: str,
topic: str,
timestamp: int,
size: Optional[int],
attributes: Optional[Dict[str, str]])
Expand source code
@dataclass
class BaseStreamInfo:
    stream_id: str
    mime_type: str
    topic: str
    timestamp: int
    size: Optional[int]
    attributes: Optional[Dict[str, str]]  # Optional for the attributes dictionary

BaseStreamInfo(stream_id: 'str', mime_type: 'str', topic: 'str', timestamp: 'int', size: 'Optional[int]', attributes: 'Optional[Dict[str, str]]')

Subclasses

Instance variables

var attributes : Dict[str, str] | None
var mime_type : str
var size : int | None
var stream_id : str
var timestamp : int
var topic : str
class BaseStreamWriter (local_participant: LocalParticipant,
topic: str = '',
attributes: Optional[Dict[str, str]] = {},
stream_id: str | None = None,
total_size: int | None = None,
mime_type: str = '',
destination_identities: Optional[List[str]] = None,
sender_identity: str | None = None)
Expand source code
class BaseStreamWriter:
    def __init__(
        self,
        local_participant: LocalParticipant,
        topic: str = "",
        attributes: Optional[Dict[str, str]] = {},
        stream_id: str | None = None,
        total_size: int | None = None,
        mime_type: str = "",
        destination_identities: Optional[List[str]] = None,
        sender_identity: str | None = None,
    ):
        self._local_participant = local_participant
        if stream_id is None:
            stream_id = str(uuid.uuid4())
        timestamp = int(datetime.datetime.now().timestamp() * 1000)
        self._header = proto_DataStream.Header(
            stream_id=stream_id,
            timestamp=timestamp,
            mime_type=mime_type,
            topic=topic,
            attributes=attributes,
            total_length=total_size,
        )
        self._next_chunk_index: int = 0
        self._destination_identities = destination_identities
        self._sender_identity = sender_identity or self._local_participant.identity
        self._closed = False

    async def _send_header(self):
        req = proto_ffi.FfiRequest(
            send_stream_header=proto_room.SendStreamHeaderRequest(
                header=self._header,
                local_participant_handle=self._local_participant._ffi_handle.handle,
                destination_identities=self._destination_identities,
                sender_identity=self._sender_identity,
            )
        )

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.send_stream_header.async_id == resp.send_stream_header.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.send_stream_header.error:
            raise ConnectionError(cb.send_stream_header.error)

    async def _send_chunk(self, chunk: proto_DataStream.Chunk):
        if self._closed:
            raise RuntimeError(f"Cannot send chunk after stream is closed: {chunk}")
        req = proto_ffi.FfiRequest(
            send_stream_chunk=proto_room.SendStreamChunkRequest(
                chunk=chunk,
                local_participant_handle=self._local_participant._ffi_handle.handle,
                sender_identity=self._local_participant.identity,
                destination_identities=self._destination_identities,
            )
        )

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.send_stream_chunk.async_id == resp.send_stream_chunk.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.send_stream_chunk.error:
            raise ConnectionError(cb.send_stream_chunk.error)

    async def _send_trailer(self, trailer: proto_DataStream.Trailer):
        req = proto_ffi.FfiRequest(
            send_stream_trailer=proto_room.SendStreamTrailerRequest(
                trailer=trailer,
                local_participant_handle=self._local_participant._ffi_handle.handle,
                sender_identity=self._local_participant.identity,
            )
        )

        queue = FfiClient.instance.queue.subscribe()
        try:
            resp = FfiClient.instance.request(req)
            cb: proto_ffi.FfiEvent = await queue.wait_for(
                lambda e: e.send_stream_trailer.async_id == resp.send_stream_trailer.async_id
            )
        finally:
            FfiClient.instance.queue.unsubscribe(queue)

        if cb.send_stream_chunk.error:
            raise ConnectionError(cb.send_stream_trailer.error)

    async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None):
        if self._closed:
            raise RuntimeError("Stream already closed")
        self._closed = True
        await self._send_trailer(
            trailer=proto_DataStream.Trailer(
                stream_id=self._header.stream_id, reason=reason, attributes=attributes
            )
        )

Subclasses

Methods

async def aclose(self, *, reason: str = '', attributes: Optional[Dict[str, str]] = None)
Expand source code
async def aclose(self, *, reason: str = "", attributes: Optional[Dict[str, str]] = None):
    if self._closed:
        raise RuntimeError("Stream already closed")
    self._closed = True
    await self._send_trailer(
        trailer=proto_DataStream.Trailer(
            stream_id=self._header.stream_id, reason=reason, attributes=attributes
        )
    )
class ByteStreamInfo (stream_id: str,
mime_type: str,
topic: str,
timestamp: int,
size: Optional[int],
attributes: Optional[Dict[str, str]],
name: str)
Expand source code
@dataclass
class ByteStreamInfo(BaseStreamInfo):
    name: str

ByteStreamInfo(stream_id: 'str', mime_type: 'str', topic: 'str', timestamp: 'int', size: 'Optional[int]', attributes: 'Optional[Dict[str, str]]', name: 'str')

Ancestors

Instance variables

var name : str
class ByteStreamReader (header: proto_DataStream.Header, capacity: int = 0)
Expand source code
class ByteStreamReader:
    def __init__(self, header: proto_DataStream.Header, capacity: int = 0) -> None:
        self._header = header
        self._info = ByteStreamInfo(
            stream_id=header.stream_id,
            mime_type=header.mime_type,
            topic=header.topic,
            timestamp=header.timestamp,
            size=header.total_length,
            attributes=dict(header.attributes),
            name=header.byte_header.name,
        )
        self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue(capacity)

    async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
        await self._queue.put(chunk)

    async def _on_stream_close(self, trailer: proto_DataStream.Trailer):
        self.info.attributes = self.info.attributes or {}
        self.info.attributes.update(trailer.attributes)
        await self._queue.put(None)

    def __aiter__(self) -> AsyncIterator[bytes]:
        return self

    async def __anext__(self) -> bytes:
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration

        return item.content

    @property
    def info(self) -> ByteStreamInfo:
        return self._info

Instance variables

prop infoByteStreamInfo
Expand source code
@property
def info(self) -> ByteStreamInfo:
    return self._info
class ByteStreamWriter (local_participant: LocalParticipant,
*,
name: str,
topic: str = '',
attributes: Optional[Dict[str, str]] = None,
stream_id: str | None = None,
total_size: int | None = None,
mime_type: str = 'application/octet-stream',
destination_identities: Optional[List[str]] = None)
Expand source code
class ByteStreamWriter(BaseStreamWriter):
    def __init__(
        self,
        local_participant: LocalParticipant,
        *,
        name: str,
        topic: str = "",
        attributes: Optional[Dict[str, str]] = None,
        stream_id: str | None = None,
        total_size: int | None = None,
        mime_type: str = "application/octet-stream",
        destination_identities: Optional[List[str]] = None,
    ) -> None:
        super().__init__(
            local_participant,
            topic,
            attributes,
            stream_id,
            total_size,
            mime_type=mime_type,
            destination_identities=destination_identities,
        )
        self._header.byte_header.name = name
        self._info = ByteStreamInfo(
            stream_id=self._header.stream_id,
            mime_type=self._header.mime_type,
            topic=self._header.topic,
            timestamp=self._header.timestamp,
            size=self._header.total_length,
            attributes=dict(self._header.attributes),
            name=self._header.byte_header.name,
        )
        self._write_lock = asyncio.Lock()

    async def write(self, data: bytes):
        async with self._write_lock:
            chunked_data = [
                data[i : i + STREAM_CHUNK_SIZE] for i in range(0, len(data), STREAM_CHUNK_SIZE)
            ]

            for chunk in chunked_data:
                self._next_chunk_index += 1
                chunk_msg = proto_DataStream.Chunk(
                    stream_id=self._header.stream_id,
                    chunk_index=self._next_chunk_index,
                    content=chunk,
                )
                await self._send_chunk(chunk_msg)

    @property
    def info(self) -> ByteStreamInfo:
        return self._info

Ancestors

Instance variables

prop infoByteStreamInfo
Expand source code
@property
def info(self) -> ByteStreamInfo:
    return self._info

Methods

async def write(self, data: bytes)
Expand source code
async def write(self, data: bytes):
    async with self._write_lock:
        chunked_data = [
            data[i : i + STREAM_CHUNK_SIZE] for i in range(0, len(data), STREAM_CHUNK_SIZE)
        ]

        for chunk in chunked_data:
            self._next_chunk_index += 1
            chunk_msg = proto_DataStream.Chunk(
                stream_id=self._header.stream_id,
                chunk_index=self._next_chunk_index,
                content=chunk,
            )
            await self._send_chunk(chunk_msg)
class proto_DataStream (*args, **kwargs)

A ProtocolMessage

Ancestors

  • google._upb._message.Message
  • google.protobuf.message.Message

Class variables

var ByteHeader

A ProtocolMessage

var Chunk

A ProtocolMessage

var DESCRIPTOR
var Header

A ProtocolMessage

var TextHeader

A ProtocolMessage

var Trailer

A ProtocolMessage

class TextStreamInfo (stream_id: str,
mime_type: str,
topic: str,
timestamp: int,
size: Optional[int],
attributes: Optional[Dict[str, str]],
attachments: List[str])
Expand source code
@dataclass
class TextStreamInfo(BaseStreamInfo):
    attachments: List[str]

TextStreamInfo(stream_id: 'str', mime_type: 'str', topic: 'str', timestamp: 'int', size: 'Optional[int]', attributes: 'Optional[Dict[str, str]]', attachments: 'List[str]')

Ancestors

Instance variables

var attachments : List[str]
class TextStreamReader (header: proto_DataStream.Header)
Expand source code
class TextStreamReader:
    def __init__(
        self,
        header: proto_DataStream.Header,
    ) -> None:
        self._header = header
        self._info = TextStreamInfo(
            stream_id=header.stream_id,
            mime_type=header.mime_type,
            topic=header.topic,
            timestamp=header.timestamp,
            size=header.total_length,
            attributes=dict(header.attributes),
            attachments=list(header.text_header.attached_stream_ids),
        )
        self._queue: asyncio.Queue[proto_DataStream.Chunk | None] = asyncio.Queue()

    async def _on_chunk_update(self, chunk: proto_DataStream.Chunk):
        await self._queue.put(chunk)

    async def _on_stream_close(self, trailer: proto_DataStream.Trailer):
        self.info.attributes = self.info.attributes or {}
        self.info.attributes.update(trailer.attributes)
        await self._queue.put(None)

    def __aiter__(self) -> AsyncIterator[str]:
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is None:
            raise StopAsyncIteration
        decodedStr = item.content.decode()
        return decodedStr

    @property
    def info(self) -> TextStreamInfo:
        return self._info

    async def read_all(self) -> str:
        final_string = ""
        async for chunk in self:
            final_string += chunk
        return final_string

Instance variables

prop infoTextStreamInfo
Expand source code
@property
def info(self) -> TextStreamInfo:
    return self._info

Methods

async def read_all(self) ‑> str
Expand source code
async def read_all(self) -> str:
    final_string = ""
    async for chunk in self:
        final_string += chunk
    return final_string
class TextStreamWriter (local_participant: LocalParticipant,
*,
topic: str = '',
attributes: Optional[Dict[str, str]] = {},
stream_id: str | None = None,
total_size: int | None = None,
reply_to_id: str | None = None,
destination_identities: Optional[List[str]] = None,
sender_identity: str | None = None)
Expand source code
class TextStreamWriter(BaseStreamWriter):
    def __init__(
        self,
        local_participant: LocalParticipant,
        *,
        topic: str = "",
        attributes: Optional[Dict[str, str]] = {},
        stream_id: str | None = None,
        total_size: int | None = None,
        reply_to_id: str | None = None,
        destination_identities: Optional[List[str]] = None,
        sender_identity: str | None = None,
    ) -> None:
        super().__init__(
            local_participant,
            topic,
            attributes,
            stream_id,
            total_size,
            mime_type="text/plain",
            destination_identities=destination_identities,
            sender_identity=sender_identity,
        )
        self._header.text_header.operation_type = proto_DataStream.OperationType.CREATE
        if reply_to_id:
            self._header.text_header.reply_to_stream_id = reply_to_id
        self._info = TextStreamInfo(
            stream_id=self._header.stream_id,
            mime_type=self._header.mime_type,
            topic=self._header.topic,
            timestamp=self._header.timestamp,
            size=self._header.total_length,
            attributes=dict(self._header.attributes),
            attachments=list(self._header.text_header.attached_stream_ids),
        )
        self._write_lock = asyncio.Lock()

    async def write(self, text: str):
        async with self._write_lock:
            for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
                content = chunk
                chunk_index = self._next_chunk_index
                self._next_chunk_index += 1
                chunk_msg = proto_DataStream.Chunk(
                    stream_id=self._header.stream_id,
                    chunk_index=chunk_index,
                    content=content,
                )
                await self._send_chunk(chunk_msg)

    @property
    def info(self) -> TextStreamInfo:
        return self._info

Ancestors

Instance variables

prop infoTextStreamInfo
Expand source code
@property
def info(self) -> TextStreamInfo:
    return self._info

Methods

async def write(self, text: str)
Expand source code
async def write(self, text: str):
    async with self._write_lock:
        for chunk in split_utf8(text, STREAM_CHUNK_SIZE):
            content = chunk
            chunk_index = self._next_chunk_index
            self._next_chunk_index += 1
            chunk_msg = proto_DataStream.Chunk(
                stream_id=self._header.stream_id,
                chunk_index=chunk_index,
                content=content,
            )
            await self._send_chunk(chunk_msg)