LiveKit C++ Client SDK v1.1.0
Real-time audio/video/data SDK for C++
Loading...
Searching...
No Matches
data_stream.h
1/*
2 * Copyright 2025 LiveKit
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an “AS IS” BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <condition_variable>
20#include <cstdint>
21#include <deque>
22#include <functional>
23#include <map>
24#include <memory>
25#include <mutex>
26#include <optional>
27#include <string>
28#include <vector>
29
30#include "livekit/visibility.h"
31
32namespace livekit {
33
34class LocalParticipant;
35
39constexpr std::size_t kStreamChunkSize = 15'000; // 15 KB
40
44 std::string stream_id;
45
47 std::string mime_type;
48
50 std::string topic;
51
53 std::int64_t timestamp = 0;
54
56 std::optional<std::size_t> size;
57
59 std::map<std::string, std::string> attributes;
60};
61
65 std::vector<std::string> attachments;
66};
67
71 std::string name;
72};
73
74// Readers
75// - TextStreamReader: yields UTF-8 text chunks (std::string)
76// - ByteStreamReader: yields raw bytes (std::vector<uint8_t>)
77
80class LIVEKIT_API TextStreamReader {
81public:
84
85 TextStreamReader(const TextStreamReader&) = delete;
86 TextStreamReader& operator=(const TextStreamReader&) = delete;
87
90 bool readNext(std::string& out);
91
94 std::string readAll();
95
97 const TextStreamInfo& info() const noexcept { return info_; }
98
99private:
100 friend class Room;
101
103 void onChunkUpdate(const std::string& text);
104
107 void onStreamClose(const std::map<std::string, std::string>& trailer_attrs);
108
109 TextStreamInfo info_;
110
111 // Queue of text chunks; empty string with closed_==true means EOS.
112 std::deque<std::string> queue_;
113 bool closed_ = false;
114
115 std::mutex mutex_;
116 std::condition_variable cv_;
117};
118
120class LIVEKIT_API ByteStreamReader {
121public:
124
125 ByteStreamReader(const ByteStreamReader&) = delete;
126 ByteStreamReader& operator=(const ByteStreamReader&) = delete;
127
130 bool readNext(std::vector<std::uint8_t>& out);
131
133 const ByteStreamInfo& info() const noexcept { return info_; }
134
135private:
136 friend class Room;
137
139 void onChunkUpdate(const std::vector<std::uint8_t>& bytes);
140
143 void onStreamClose(const std::map<std::string, std::string>& trailer_attrs);
144
145 ByteStreamInfo info_;
146
147 std::deque<std::vector<std::uint8_t>> queue_;
148 bool closed_ = false;
149
150 std::mutex mutex_;
151 std::condition_variable cv_;
152};
153
156class LIVEKIT_API BaseStreamWriter {
157public:
158 virtual ~BaseStreamWriter() = default;
159
161 const std::string& streamId() const noexcept { return stream_id_; }
162
164 const std::string& topic() const noexcept { return topic_; }
165
167 const std::string& mimeType() const noexcept { return mime_type_; }
168
170 std::int64_t timestampMs() const noexcept { return timestamp_ms_; }
171
173 bool isClosed() const noexcept { return closed_; }
174
177 void close(const std::string& reason = "", const std::map<std::string, std::string>& attributes = {});
178
179protected:
180 BaseStreamWriter(LocalParticipant& local_participant, std::string topic = "",
181 std::map<std::string, std::string> attributes = {}, std::string stream_id = "",
182 std::optional<std::size_t> total_size = std::nullopt, std::string mime_type = "",
183 std::vector<std::string> destination_identities = {}, std::string sender_identity = "");
184
185 enum class StreamKind { kUnknown, kText, kByte };
186
187 LocalParticipant& local_participant_;
188
189 // Public-ish metadata (mirrors BaseStreamInfo, but kept simple here)
190 std::string stream_id_;
191 std::string mime_type_;
192 std::string topic_;
193 std::int64_t timestamp_ms_ = 0;
194 std::optional<std::size_t> total_size_;
195 std::map<std::string, std::string> attributes_;
196 std::vector<std::string> destination_identities_;
197 std::string sender_identity_;
198
199 bool closed_ = false;
200 bool header_sent_ = false;
201 std::uint64_t next_chunk_index_ = 0;
202 StreamKind kind_ = StreamKind::kUnknown;
203 std::string reply_to_id_;
204 std::string byte_name_; // Used by ByteStreamWriter
205
209
212 void sendChunk(const std::vector<std::uint8_t>& content);
213
216 void sendTrailer(const std::string& reason, const std::map<std::string, std::string>& attributes);
217};
218
220class LIVEKIT_API TextStreamWriter : public BaseStreamWriter {
221public:
222 TextStreamWriter(LocalParticipant& local_participant, const std::string& topic = "",
223 const std::map<std::string, std::string>& attributes = {}, const std::string& stream_id = "",
224 std::optional<std::size_t> total_size = std::nullopt, const std::string& reply_to_id = "",
225 const std::vector<std::string>& destination_identities = {},
226 const std::string& sender_identity = "");
227
231 void write(const std::string& text);
232
234 const TextStreamInfo& info() const noexcept { return info_; }
235
236private:
237 TextStreamInfo info_;
238 std::mutex write_mutex_;
239};
240
242class LIVEKIT_API ByteStreamWriter : public BaseStreamWriter {
243public:
244 ByteStreamWriter(LocalParticipant& local_participant, const std::string& name, const std::string& topic = "",
245 const std::map<std::string, std::string>& attributes = {}, const std::string& stream_id = "",
246 std::optional<std::size_t> total_size = std::nullopt,
247 const std::string& mime_type = "application/octet-stream",
248 const std::vector<std::string>& destination_identities = {},
249 const std::string& sender_identity = "");
250
254 void write(const std::vector<std::uint8_t>& data);
255
257 const ByteStreamInfo& info() const noexcept { return info_; }
258
259private:
260 ByteStreamInfo info_;
261 std::mutex write_mutex_;
262};
263
270 std::function<void(std::shared_ptr<TextStreamReader>, const std::string& participant_identity)>;
271
278 std::function<void(std::shared_ptr<ByteStreamReader>, const std::string& participant_identity)>;
279
280} // namespace livekit
Base class for sending data streams.
Definition data_stream.h:156
void close(const std::string &reason="", const std::map< std::string, std::string > &attributes={})
Close the stream with optional reason and attributes.
void ensureHeaderSent()
Ensure the header has been sent once.
void sendChunk(const std::vector< std::uint8_t > &content)
Send a raw chunk of bytes.
std::int64_t timestampMs() const noexcept
Timestamp (ms) when the stream was created.
Definition data_stream.h:170
const std::string & streamId() const noexcept
Stream id assigned to this writer.
Definition data_stream.h:161
const std::string & topic() const noexcept
Topic of this stream.
Definition data_stream.h:164
const std::string & mimeType() const noexcept
MIME type for this stream.
Definition data_stream.h:167
bool isClosed() const noexcept
Whether the stream has been closed.
Definition data_stream.h:173
void sendTrailer(const std::string &reason, const std::map< std::string, std::string > &attributes)
Send the trailer with given reason and attributes.
Reader for incoming byte streams.
Definition data_stream.h:120
ByteStreamReader(ByteStreamInfo info)
Construct a reader from initial stream metadata.
bool readNext(std::vector< std::uint8_t > &out)
Blocking read of next byte chunk.
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:133
Writer for outgoing byte streams.
Definition data_stream.h:242
void write(const std::vector< std::uint8_t > &data)
Write binary data to the stream.
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:257
Represents the local participant in a room.
Definition local_participant.h:55
Represents a LiveKit room session.
Definition room.h:98
Reader for incoming text streams.
Definition data_stream.h:80
bool readNext(std::string &out)
Blocking read of next text chunk.
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:97
TextStreamReader(TextStreamInfo info)
Construct a reader from initial stream metadata.
std::string readAll()
Convenience: read entire stream into a single string.
Writer for outgoing text streams.
Definition data_stream.h:220
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:234
void write(const std::string &text)
Write a UTF-8 string to the stream.
Public API for the LiveKit C++ Client SDK.
Definition audio_frame.h:25
constexpr std::size_t kStreamChunkSize
Chunk size for data streams (matches Python STREAM_CHUNK_SIZE).
Definition data_stream.h:39
std::function< void(std::shared_ptr< ByteStreamReader >, const std::string &participant_identity)> ByteStreamHandler
Callback invoked when a new incoming byte stream is opened.
Definition data_stream.h:278
std::function< void(std::shared_ptr< TextStreamReader >, const std::string &participant_identity)> TextStreamHandler
Callback invoked when a new incoming text stream is opened.
Definition data_stream.h:270
Base metadata for any stream (text or bytes).
Definition data_stream.h:42
std::string topic
Application-defined topic name.
Definition data_stream.h:50
std::string mime_type
MIME type of the stream (e.g. "text/plain", "application/octet-stream").
Definition data_stream.h:47
std::map< std::string, std::string > attributes
Arbitrary key–value attributes attached to the stream.
Definition data_stream.h:59
std::int64_t timestamp
Timestamp in milliseconds when the stream was created.
Definition data_stream.h:53
std::optional< std::size_t > size
Total size of the stream in bytes, if known.
Definition data_stream.h:56
std::string stream_id
Unique identifier for this stream.
Definition data_stream.h:44
Metadata for a byte stream.
Definition data_stream.h:69
std::string name
Optional name of the binary object (e.g. filename).
Definition data_stream.h:71
Metadata for a text stream.
Definition data_stream.h:63
std::vector< std::string > attachments
IDs of any attached streams (for replies / threads).
Definition data_stream.h:65