LiveKit C++ SDK
Real-time audio/video SDK for C++
Loading...
Searching...
No Matches
subscription_thread_dispatcher.h
/*
 * Copyright 2025 LiveKit
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifndef LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H
#define LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H

#include "livekit/audio_stream.h"
#include "livekit/video_stream.h"

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

namespace livekit {

// Forward declarations: these types are only used by reference or through
// std::shared_ptr in this header, so their full definitions are not needed.
class AudioFrame;
class DataTrackStream;
class RemoteDataTrack;
class Track;
class VideoFrame;

/// Callback type that receives one audio frame by const reference.
using AudioFrameCallback = std::function<void(const AudioFrame &)>;

/// Callback type that receives one video frame together with a timestamp.
/// The `_us` suffix of @p timestamp_us suggests microseconds.
using VideoFrameCallback =
    std::function<void(const VideoFrame &frame, std::int64_t timestamp_us)>;

/// Callback type that receives one data-track frame: the raw payload bytes
/// and an optional user-supplied timestamp (std::nullopt when absent).
using DataFrameCallback =
    std::function<void(const std::vector<std::uint8_t> &payload,
                       std::optional<std::uint64_t> user_timestamp)>;

/// Identifier type for a registered data-frame callback; it is the return
/// type of addOnDataFrameCallback() and the argument type of
/// removeOnDataFrameCallback().
using DataFrameCallbackId = std::uint64_t;

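// NOTE(review): the class header, constructor and destructor lines were lost
// in extraction and are reconstructed from the member index of this page;
// confirm any export macro or extra specifiers against the real header.
class SubscriptionThreadDispatcher {
public:
/// Constructs an empty dispatcher with no registered callbacks or readers.
SubscriptionThreadDispatcher();

/// Stops all active readers and clears all registered callbacks.
~SubscriptionThreadDispatcher();
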
105 void setOnAudioFrameCallback(const std::string &participant_identity,
106 TrackSource source, AudioFrameCallback callback,
107 AudioStream::Options opts = {});
108
122 void setOnAudioFrameCallback(const std::string &participant_identity,
123 const std::string &track_name,
124 AudioFrameCallback callback,
125 AudioStream::Options opts = {});
126
140 void setOnVideoFrameCallback(const std::string &participant_identity,
141 TrackSource source, VideoFrameCallback callback,
142 VideoStream::Options opts = {});
143
157 void setOnVideoFrameCallback(const std::string &participant_identity,
158 const std::string &track_name,
159 VideoFrameCallback callback,
160 VideoStream::Options opts = {});
161
171 void clearOnAudioFrameCallback(const std::string &participant_identity,
172 TrackSource source);
173
183 void clearOnAudioFrameCallback(const std::string &participant_identity,
184 const std::string &track_name);
185
195 void clearOnVideoFrameCallback(const std::string &participant_identity,
196 TrackSource source);
197
207 void clearOnVideoFrameCallback(const std::string &participant_identity,
208 const std::string &track_name);
209
224 void handleTrackSubscribed(const std::string &participant_identity,
225 TrackSource source, const std::string &track_name,
226 const std::shared_ptr<Track> &track);
227
240 void handleTrackUnsubscribed(const std::string &participant_identity,
241 TrackSource source,
242 const std::string &track_name);
243
244 // ---------------------------------------------------------------
245 // Data track callbacks
246 // ---------------------------------------------------------------
247
265 DataFrameCallbackId
266 addOnDataFrameCallback(const std::string &participant_identity,
267 const std::string &track_name,
268 DataFrameCallback callback);
269
278 void removeOnDataFrameCallback(DataFrameCallbackId id);
279
289 void handleDataTrackPublished(const std::shared_ptr<RemoteDataTrack> &track);
290
/// Notifies the dispatcher that a remote data track has been unpublished.
///
/// @param sid Identifier of the unpublished track (presumably the track SID;
///            confirm against the caller).
void handleDataTrackUnpublished(const std::string &sid);

/// Stops the dispatcher's activity.
///
/// NOTE(review): presumably performs the same teardown the destructor is
/// documented to do (stop all active readers, clear registered callbacks);
/// confirm against the implementation.
void stopAll();

private:
// Grants the unit-test fixture access to private members.
friend class SubscriptionThreadDispatcherTest;

315 struct CallbackKey {
316 std::string participant_identity;
317 TrackSource source;
318 std::string track_name;
319
320 bool operator==(const CallbackKey &o) const {
321 return participant_identity == o.participant_identity &&
322 source == o.source && track_name == o.track_name;
323 }
324 };
325
327 struct CallbackKeyHash {
328 std::size_t operator()(const CallbackKey &k) const {
329 auto h1 = std::hash<std::string>{}(k.participant_identity);
330 auto h2 = std::hash<int>{}(static_cast<int>(k.source));
331 auto h3 = std::hash<std::string>{}(k.track_name);
332 return h1 ^ (h2 << 1) ^ (h3 << 2);
333 }
334 };
335
337 struct ActiveReader {
338 std::shared_ptr<AudioStream> audio_stream;
339 std::shared_ptr<VideoStream> video_stream;
340 std::thread thread;
341 };
342
/// Lookup key for data tracks: participant identity and track name.
struct DataCallbackKey {
  std::string participant_identity;
  std::string track_name;

  /// Two keys are equal only when both fields are equal.
  bool operator==(const DataCallbackKey &o) const {
    return participant_identity == o.participant_identity &&
           track_name == o.track_name;
  }
};

/// Hash functor for DataCallbackKey, for use with std::unordered_map.
///
/// Field hashes are folded with a hash_combine-style mixing step instead of a
/// plain shifted XOR, which drops the high bit of the shifted hash and leaves
/// the fields poorly mixed.
struct DataCallbackKeyHash {
  std::size_t operator()(const DataCallbackKey &k) const {
    std::size_t seed = std::hash<std::string>{}(k.participant_identity);
    mix(seed, std::hash<std::string>{}(k.track_name));
    return seed;
  }

private:
  /// Folds @p value into @p seed (golden-ratio constant plus shifts).
  static void mix(std::size_t &seed, std::size_t value) {
    seed ^= value + static_cast<std::size_t>(0x9e3779b97f4a7c15ULL) +
            (seed << 6) + (seed >> 2);
  }
};

364 struct RegisteredDataCallback {
365 DataCallbackKey key;
366 DataFrameCallback callback;
367 };
368
370 struct ActiveDataReader {
371 std::shared_ptr<RemoteDataTrack> remote_track;
372 std::mutex sub_mutex;
373 std::shared_ptr<DataTrackStream> stream; // guarded by sub_mutex
374 std::thread thread;
375 };
376
378 struct RegisteredAudioCallback {
379 AudioFrameCallback callback;
380 AudioStream::Options options;
381 };
382
384 struct RegisteredVideoCallback {
385 VideoFrameCallback callback;
386 VideoStream::Options options;
387 };
388
393 std::thread extractReaderThreadLocked(const CallbackKey &key);
394
398 std::thread startReaderLocked(const CallbackKey &key,
399 const std::shared_ptr<Track> &track);
400
405 std::thread startAudioReaderLocked(const CallbackKey &key,
406 const std::shared_ptr<Track> &track,
407 AudioFrameCallback cb,
408 const AudioStream::Options &opts);
409
414 std::thread startVideoReaderLocked(const CallbackKey &key,
415 const std::shared_ptr<Track> &track,
416 VideoFrameCallback cb,
417 const VideoStream::Options &opts);
418
421 std::thread extractDataReaderThreadLocked(DataFrameCallbackId id);
422
425 std::thread extractDataReaderThreadLocked(const DataCallbackKey &key);
426
429 std::thread
430 startDataReaderLocked(DataFrameCallbackId id, const DataCallbackKey &key,
431 const std::shared_ptr<RemoteDataTrack> &track,
432 DataFrameCallback cb);
433
435 mutable std::mutex lock_;
436
438 std::unordered_map<CallbackKey, RegisteredAudioCallback, CallbackKeyHash>
439 audio_callbacks_;
440
442 std::unordered_map<CallbackKey, RegisteredVideoCallback, CallbackKeyHash>
443 video_callbacks_;
444
446 std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash>
447 active_readers_;
448
450 DataFrameCallbackId next_data_callback_id_;
451
453 std::unordered_map<DataFrameCallbackId, RegisteredDataCallback>
454 data_callbacks_;
455
457 std::unordered_map<DataFrameCallbackId, std::shared_ptr<ActiveDataReader>>
458 active_data_readers_;
459
461 std::unordered_map<DataCallbackKey, std::shared_ptr<RemoteDataTrack>,
462 DataCallbackKeyHash>
463 remote_data_tracks_;
464
466 static constexpr int kMaxActiveReaders = 20;
};

} // namespace livekit

#endif /* LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H */
Definition subscription_thread_dispatcher.h:84
void handleDataTrackPublished(const std::shared_ptr< RemoteDataTrack > &track)
void handleTrackSubscribed(const std::string &participant_identity, TrackSource source, const std::string &track_name, const std::shared_ptr< Track > &track)
void handleDataTrackUnpublished(const std::string &sid)
void clearOnAudioFrameCallback(const std::string &participant_identity, TrackSource source)
void setOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name, VideoFrameCallback callback, VideoStream::Options opts={})
void setOnAudioFrameCallback(const std::string &participant_identity, TrackSource source, AudioFrameCallback callback, AudioStream::Options opts={})
void removeOnDataFrameCallback(DataFrameCallbackId id)
void setOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name, AudioFrameCallback callback, AudioStream::Options opts={})
void clearOnVideoFrameCallback(const std::string &participant_identity, TrackSource source)
DataFrameCallbackId addOnDataFrameCallback(const std::string &participant_identity, const std::string &track_name, DataFrameCallback callback)
SubscriptionThreadDispatcher()
Constructs an empty dispatcher with no registered callbacks or readers.
void setOnVideoFrameCallback(const std::string &participant_identity, TrackSource source, VideoFrameCallback callback, VideoStream::Options opts={})
void clearOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name)
void handleTrackUnsubscribed(const std::string &participant_identity, TrackSource source, const std::string &track_name)
void clearOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name)
~SubscriptionThreadDispatcher()
Stops all active readers and clears all registered callbacks.
Configuration options for AudioStream creation.
Definition audio_stream.h:67
Definition video_stream.h:62