LiveKit C++ SDK
Real-time audio/video SDK for C++
Loading...
Searching...
No Matches
data_stream.h
1/*
2 * Copyright 2025 LiveKit
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <condition_variable>
20#include <cstdint>
21#include <deque>
22#include <functional>
23#include <map>
24#include <memory>
25#include <mutex>
26#include <optional>
27#include <string>
28#include <vector>
29
30namespace livekit {
31
32class LocalParticipant;
33
34// Chunk size, in bytes, for data streams (matches Python STREAM_CHUNK_SIZE).
35// Chosen to balance throughput and latency, and to work well with WebRTC data
36// channels.
37constexpr std::size_t kStreamChunkSize = 15'000; // 15,000 bytes (15 kB, decimal)
38
42 std::string stream_id;
43
45 std::string mime_type;
46
48 std::string topic;
49
51 std::int64_t timestamp = 0;
52
54 std::optional<std::size_t> size;
55
57 std::map<std::string, std::string> attributes;
58};
59
63 std::vector<std::string> attachments;
64};
65
69 std::string name;
70};
71
72// Readers
73// - TextStreamReader: yields UTF-8 text chunks (std::string)
74// - ByteStreamReader: yields raw bytes (std::vector<uint8_t>)
75
79public:
82
83 TextStreamReader(const TextStreamReader &) = delete;
84 TextStreamReader &operator=(const TextStreamReader &) = delete;
85
88 bool readNext(std::string &out);
89
92 std::string readAll();
93
95 const TextStreamInfo &info() const noexcept { return info_; }
96
97private:
98 friend class Room;
99
101 void onChunkUpdate(const std::string &text);
102
105 void onStreamClose(const std::map<std::string, std::string> &trailer_attrs);
106
107 TextStreamInfo info_;
108
109 // Queue of text chunks; an empty string with closed_ == true means end of stream (EOS).
110 std::deque<std::string> queue_;
111 bool closed_ = false;
112
113 std::mutex mutex_;
114 std::condition_variable cv_;
115};
116
119public:
122
123 ByteStreamReader(const ByteStreamReader &) = delete;
124 ByteStreamReader &operator=(const ByteStreamReader &) = delete;
125
128 bool readNext(std::vector<std::uint8_t> &out);
129
131 const ByteStreamInfo &info() const noexcept { return info_; }
132
133private:
134 friend class Room;
135
137 void onChunkUpdate(const std::vector<std::uint8_t> &bytes);
138
141 void onStreamClose(const std::map<std::string, std::string> &trailer_attrs);
142
143 ByteStreamInfo info_;
144
145 std::deque<std::vector<std::uint8_t>> queue_;
146 bool closed_ = false;
147
148 std::mutex mutex_;
149 std::condition_variable cv_;
150};
151
155public:
156 virtual ~BaseStreamWriter() = default;
157
159 const std::string &streamId() const noexcept { return stream_id_; }
160
162 const std::string &topic() const noexcept { return topic_; }
163
165 const std::string &mimeType() const noexcept { return mime_type_; }
166
168 std::int64_t timestampMs() const noexcept { return timestamp_ms_; }
169
171 bool isClosed() const noexcept { return closed_; }
172
175 void close(const std::string &reason = "",
176 const std::map<std::string, std::string> &attributes = {});
177
178protected:
179 BaseStreamWriter(LocalParticipant &local_participant,
180 const std::string &topic = "",
181 const std::map<std::string, std::string> &attributes = {},
182 const std::string &stream_id = "",
183 std::optional<std::size_t> total_size = std::nullopt,
184 const std::string &mime_type = "",
185 const std::vector<std::string> &destination_identities = {},
186 const std::string &sender_identity = "");
187
188 enum class StreamKind { kUnknown, kText, kByte };
189
190 LocalParticipant &local_participant_;
191
192 // Public-ish metadata (mirrors BaseStreamInfo, but kept simple here)
193 std::string stream_id_;
194 std::string mime_type_;
195 std::string topic_;
196 std::int64_t timestamp_ms_ = 0;
197 std::optional<std::size_t> total_size_;
198 std::map<std::string, std::string> attributes_;
199 std::vector<std::string> destination_identities_;
200 std::string sender_identity_;
201
202 bool closed_ = false;
203 bool header_sent_ = false;
204 std::uint64_t next_chunk_index_ = 0;
205 StreamKind kind_ = StreamKind::kUnknown;
206 std::string reply_to_id_;
207 std::string byte_name_; // Used by ByteStreamWriter
208
212
215 void sendChunk(const std::vector<std::uint8_t> &content);
216
219 void sendTrailer(const std::string &reason,
220 const std::map<std::string, std::string> &attributes);
221};
222
225public:
226 TextStreamWriter(LocalParticipant &local_participant,
227 const std::string &topic = "",
228 const std::map<std::string, std::string> &attributes = {},
229 const std::string &stream_id = "",
230 std::optional<std::size_t> total_size = std::nullopt,
231 const std::string &reply_to_id = "",
232 const std::vector<std::string> &destination_identities = {},
233 const std::string &sender_identity = "");
234
238 void write(const std::string &text);
239
241 const TextStreamInfo &info() const noexcept { return info_; }
242
243private:
244 TextStreamInfo info_;
245 std::mutex write_mutex_;
246};
247
250public:
251 ByteStreamWriter(LocalParticipant &local_participant, const std::string &name,
252 const std::string &topic = "",
253 const std::map<std::string, std::string> &attributes = {},
254 const std::string &stream_id = "",
255 std::optional<std::size_t> total_size = std::nullopt,
256 const std::string &mime_type = "application/octet-stream",
257 const std::vector<std::string> &destination_identities = {},
258 const std::string &sender_identity = "");
259
263 void write(const std::vector<std::uint8_t> &data);
264
266 const ByteStreamInfo &info() const noexcept { return info_; }
267
268private:
269 ByteStreamInfo info_;
270 std::mutex write_mutex_;
271};
272
273/* Callback invoked when a new incoming text stream is opened.
274 *
275 * Receives the stream's TextStreamReader plus a participant_identity string
276 * (presumably the sender's identity; TODO confirm at the call site). The
277 * reader is a shared_ptr so it can outlive the callback, e.g. when the user
278 * hands it to a background thread that consumes the stream asynchronously. */
279using TextStreamHandler =
280 std::function<void(std::shared_ptr<TextStreamReader>,
281 const std::string &participant_identity)>;
282
283/* Callback invoked when a new incoming byte stream is opened.
284 *
285 * Receives the stream's ByteStreamReader plus a participant_identity string
286 * (presumably the sender's identity; TODO confirm at the call site). The
287 * reader is a shared_ptr so it can outlive the callback, e.g. for file
288 * writes or background processing that read the stream asynchronously. */
289using ByteStreamHandler =
290 std::function<void(std::shared_ptr<ByteStreamReader>,
291 const std::string &participant_identity)>;
292
293} // namespace livekit
Definition data_stream.h:154
void close(const std::string &reason="", const std::map< std::string, std::string > &attributes={})
void sendChunk(const std::vector< std::uint8_t > &content)
std::int64_t timestampMs() const noexcept
Timestamp (ms) when the stream was created.
Definition data_stream.h:168
const std::string & streamId() const noexcept
Stream id assigned to this writer.
Definition data_stream.h:159
const std::string & topic() const noexcept
Topic of this stream.
Definition data_stream.h:162
const std::string & mimeType() const noexcept
MIME type for this stream.
Definition data_stream.h:165
bool isClosed() const noexcept
Whether the stream has been closed.
Definition data_stream.h:171
void sendTrailer(const std::string &reason, const std::map< std::string, std::string > &attributes)
Reader for incoming byte streams.
Definition data_stream.h:118
bool readNext(std::vector< std::uint8_t > &out)
ByteStreamReader(const ByteStreamInfo &info)
Construct a reader from initial stream metadata.
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:131
Writer for outgoing byte streams.
Definition data_stream.h:249
void write(const std::vector< std::uint8_t > &data)
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:266
Definition local_participant.h:54
Definition room.h:89
Definition data_stream.h:78
bool readNext(std::string &out)
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:95
TextStreamReader(const TextStreamInfo &info)
Construct a reader from initial stream metadata.
Writer for outgoing text streams.
Definition data_stream.h:224
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:241
void write(const std::string &text)
Base metadata for any stream (text or bytes).
Definition data_stream.h:40
std::string topic
Application-defined topic name.
Definition data_stream.h:48
std::string mime_type
MIME type of the stream (e.g. "text/plain", "application/octet-stream").
Definition data_stream.h:45
std::map< std::string, std::string > attributes
Arbitrary key–value attributes attached to the stream.
Definition data_stream.h:57
std::int64_t timestamp
Timestamp in milliseconds when the stream was created.
Definition data_stream.h:51
std::optional< std::size_t > size
Total size of the stream in bytes, if known.
Definition data_stream.h:54
std::string stream_id
Unique identifier for this stream.
Definition data_stream.h:42
Metadata for a byte stream.
Definition data_stream.h:67
std::string name
Optional name of the binary object (e.g. filename).
Definition data_stream.h:69
Metadata for a text stream.
Definition data_stream.h:61
std::vector< std::string > attachments
IDs of any attached streams (for replies / threads).
Definition data_stream.h:63