19#include <condition_variable>
32class LocalParticipant;
/// Compile-time size constant (15'000) used by the data-stream code.
/// NOTE(review): presumably the maximum payload size, in bytes, of a single
/// chunk sent by the stream writers — the code that consumes it is not visible
/// here; confirm units and usage.
37constexpr std::size_t kStreamChunkSize = 15'000;
/// Total size of the stream in bytes, if known; empty when the size is unknown.
54 std::optional<std::size_t>
size;
/// Receives one chunk of text for this reader.
/// @param text The text carried by the chunk.
/// NOTE(review): the definition is not in this listing; presumably the chunk
/// is appended to `queue_` and a waiting reader is woken via `cv_` — confirm.
101 void onChunkUpdate(
const std::string &text);
/// Receives the end-of-stream notification for this reader.
/// @param trailer_attrs Key/value attributes delivered with the stream close.
/// NOTE(review): presumably sets `closed_` and notifies `cv_` — confirm.
105 void onStreamClose(
const std::map<std::string, std::string> &trailer_attrs);
/// Queue of text chunks (std::string elements).
/// NOTE(review): presumably chunks received but not yet read — confirm.
110 std::deque<std::string> queue_;
/// Closed flag; initialised to false.
111 bool closed_ =
false;
/// Condition variable of the reader.
/// NOTE(review): the associated mutex and the wait/notify sites are not
/// visible in this listing — confirm what it guards.
114 std::condition_variable cv_;
/// Receives one chunk of bytes for this reader.
/// @param bytes The raw bytes carried by the chunk.
/// NOTE(review): the definition is not in this listing; presumably the chunk
/// is appended to `queue_` and a waiting reader is woken via `cv_` — confirm.
137 void onChunkUpdate(
const std::vector<std::uint8_t> &bytes);
/// Receives the end-of-stream notification for this reader.
/// @param trailer_attrs Key/value attributes delivered with the stream close.
/// NOTE(review): presumably sets `closed_` and notifies `cv_` — confirm.
141 void onStreamClose(
const std::map<std::string, std::string> &trailer_attrs);
/// Queue of byte chunks (each element is a std::vector<std::uint8_t>).
/// NOTE(review): presumably chunks received but not yet read — confirm.
145 std::deque<std::vector<std::uint8_t>> queue_;
/// Closed flag; initialised to false.
146 bool closed_ =
false;
/// Condition variable of the reader.
/// NOTE(review): the associated mutex and the wait/notify sites are not
/// visible in this listing — confirm what it guards.
149 std::condition_variable cv_;
/// Stream id assigned to this writer.
/// @return Const reference to the stored `stream_id_` member (no copy is made).
159 const std::string &
streamId() const noexcept {
return stream_id_; }
/// Topic of this stream.
/// @return Const reference to the stored `topic_` member (no copy is made).
162 const std::string &
topic() const noexcept {
return topic_; }
/// MIME type for this stream.
/// @return Const reference to the stored `mime_type_` member (no copy is made).
165 const std::string &
mimeType() const noexcept {
return mime_type_; }
/// Timestamp (ms) when the stream was created.
/// @return The stored `timestamp_ms_` value, by value.
168 std::int64_t
timestampMs() const noexcept {
return timestamp_ms_; }
/// Closes the stream.
/// @param reason     Optional reason string; defaults to the empty string.
/// @param attributes Optional key/value attributes; defaults to an empty map.
/// NOTE(review): the definition is not in this listing; presumably `reason`
/// and `attributes` are sent to the receiver as the stream trailer — confirm.
175 void close(
const std::string &reason =
"",
176 const std::map<std::string, std::string> &attributes = {});
180 const std::string &
topic =
"",
181 const std::map<std::string, std::string> &attributes = {},
182 const std::string &stream_id =
"",
183 std::optional<std::size_t> total_size = std::nullopt,
184 const std::string &mime_type =
"",
185 const std::vector<std::string> &destination_identities = {},
186 const std::string &sender_identity =
"");
/// Kind of stream: kUnknown, kText, or kByte.
/// NOTE(review): kText / kByte presumably correspond to the text and byte
/// stream writers, with kUnknown as the not-yet-set state — confirm.
188 enum class StreamKind { kUnknown, kText, kByte };
// Data members of the stream writer. The enclosing class header is outside
// this listing, so the notes below describe only what the declarations show.

// Reference to a LocalParticipant (non-owning: it is a reference member).
// NOTE(review): presumably the participant used to send the stream — confirm.
190 LocalParticipant &local_participant_;
// Stream id; returned by streamId().
193 std::string stream_id_;
// MIME type; returned by mimeType().
194 std::string mime_type_;
// Timestamp in milliseconds; returned by timestampMs(). Initialised to 0.
196 std::int64_t timestamp_ms_ = 0;
// Optional total size. NOTE(review): presumably in bytes — confirm.
197 std::optional<std::size_t> total_size_;
// String key/value attributes.
198 std::map<std::string, std::string> attributes_;
// Identity strings of the destinations.
// NOTE(review): an empty list presumably means "all participants" — confirm.
199 std::vector<std::string> destination_identities_;
// Identity string of the sender.
200 std::string sender_identity_;
// Closed flag; initialised to false.
202 bool closed_ =
false;
// Header-sent flag; initialised to false.
// NOTE(review): presumably set once the stream header has gone out — confirm.
203 bool header_sent_ =
false;
// Index counter for chunks; initialised to 0.
204 std::uint64_t next_chunk_index_ = 0;
// Stream kind; initialised to StreamKind::kUnknown.
205 StreamKind kind_ = StreamKind::kUnknown;
// NOTE(review): presumably only meaningful for text streams (reply target) — confirm.
206 std::string reply_to_id_;
// NOTE(review): presumably only meaningful for byte streams (object name) — confirm.
207 std::string byte_name_;
/// Sends one chunk of raw bytes on this stream.
/// @param content The bytes of the chunk.
/// NOTE(review): the definition is not in this listing; whether the caller
/// must keep `content` within a size limit is not visible here — confirm.
215 void sendChunk(
const std::vector<std::uint8_t> &content);
220 const std::map<std::string, std::string> &attributes);
227 const std::string &
topic =
"",
228 const std::map<std::string, std::string> &attributes = {},
229 const std::string &stream_id =
"",
230 std::optional<std::size_t> total_size = std::nullopt,
231 const std::string &reply_to_id =
"",
232 const std::vector<std::string> &destination_identities = {},
233 const std::string &sender_identity =
"");
/// Writes text to the stream.
/// @param text The text to write.
/// NOTE(review): the definition is not in this listing; presumably the text
/// is split into chunks before sending — confirm.
238 void write(
const std::string &text);
/// Mutex of the text writer.
/// NOTE(review): presumably serialises concurrent write() calls — the lock
/// sites are not visible here; confirm.
245 std::mutex write_mutex_;
252 const std::string &
topic =
"",
253 const std::map<std::string, std::string> &attributes = {},
254 const std::string &stream_id =
"",
255 std::optional<std::size_t> total_size = std::nullopt,
256 const std::string &mime_type =
"application/octet-stream",
257 const std::vector<std::string> &destination_identities = {},
258 const std::string &sender_identity =
"");
/// Writes raw bytes to the stream.
/// @param data The bytes to write.
/// NOTE(review): the definition is not in this listing; presumably the data
/// is split into chunks before sending — confirm.
263 void write(
const std::vector<std::uint8_t> &data);
/// Mutex of the byte writer.
/// NOTE(review): presumably serialises concurrent write() calls — the lock
/// sites are not visible here; confirm.
270 std::mutex write_mutex_;
/// Callback type for text streams: a std::function returning void that takes
/// a shared pointer to a TextStreamReader and a participant identity string.
/// NOTE(review): presumably invoked once per incoming text stream with the
/// sending participant's identity — the call site is not visible; confirm.
279using TextStreamHandler =
280 std::function<void(std::shared_ptr<TextStreamReader>,
281 const std::string &participant_identity)>;
/// Callback type for byte streams: a std::function returning void that takes
/// a shared pointer to a ByteStreamReader and a participant identity string.
/// NOTE(review): presumably invoked once per incoming byte stream with the
/// sending participant's identity — the call site is not visible; confirm.
289using ByteStreamHandler =
290 std::function<void(std::shared_ptr<ByteStreamReader>,
291 const std::string &participant_identity)>;
Definition data_stream.h:154
void close(const std::string &reason="", const std::map< std::string, std::string > &attributes={})
void sendChunk(const std::vector< std::uint8_t > &content)
std::int64_t timestampMs() const noexcept
Timestamp (ms) when the stream was created.
Definition data_stream.h:168
const std::string & streamId() const noexcept
Stream id assigned to this writer.
Definition data_stream.h:159
const std::string & topic() const noexcept
Topic of this stream.
Definition data_stream.h:162
const std::string & mimeType() const noexcept
MIME type for this stream.
Definition data_stream.h:165
bool isClosed() const noexcept
Whether the stream has been closed.
Definition data_stream.h:171
void sendTrailer(const std::string &reason, const std::map< std::string, std::string > &attributes)
Reader for incoming byte streams.
Definition data_stream.h:118
bool readNext(std::vector< std::uint8_t > &out)
ByteStreamReader(const ByteStreamInfo &info)
Construct a reader from initial stream metadata.
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:131
Writer for outgoing byte streams.
Definition data_stream.h:249
void write(const std::vector< std::uint8_t > &data)
const ByteStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:266
Definition local_participant.h:54
Definition data_stream.h:78
bool readNext(std::string &out)
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:95
TextStreamReader(const TextStreamInfo &info)
Construct a reader from initial stream metadata.
Writer for outgoing text streams.
Definition data_stream.h:224
const TextStreamInfo & info() const noexcept
Metadata associated with this stream.
Definition data_stream.h:241
void write(const std::string &text)
Base metadata for any stream (text or bytes).
Definition data_stream.h:40
std::string topic
Application-defined topic name.
Definition data_stream.h:48
std::string mime_type
MIME type of the stream (e.g. "text/plain", "application/octet-stream").
Definition data_stream.h:45
std::map< std::string, std::string > attributes
Arbitrary key–value attributes attached to the stream.
Definition data_stream.h:57
std::int64_t timestamp
Timestamp in milliseconds when the stream was created.
Definition data_stream.h:51
std::optional< std::size_t > size
Total size of the stream in bytes, if known.
Definition data_stream.h:54
std::string stream_id
Unique identifier for this stream.
Definition data_stream.h:42
Metadata for a byte stream.
Definition data_stream.h:67
std::string name
Optional name of the binary object (e.g. filename).
Definition data_stream.h:69
Metadata for a text stream.
Definition data_stream.h:61
std::vector< std::string > attachments
IDs of any attached streams (for replies / threads).
Definition data_stream.h:63