LiveKit C++ SDK
Real-time audio/video SDK for C++
Loading...
Searching...
No Matches
subscription_thread_dispatcher.h
1/*
2 * Copyright 2025 LiveKit
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#ifndef LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H
18#define LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H
19
20#include "livekit/audio_stream.h"
21#include "livekit/video_stream.h"
22
23#include <cstdint>
24#include <functional>
25#include <memory>
26#include <mutex>
27#include <optional>
28#include <string>
29#include <thread>
30#include <unordered_map>
31#include <vector>
32
33namespace livekit {
34
35class AudioFrame;
36class DataTrackStream;
37class RemoteDataTrack;
38class Track;
39class VideoFrame;
40
43using AudioFrameCallback = std::function<void(const AudioFrame &)>;
44
47using VideoFrameCallback =
48 std::function<void(const VideoFrame &frame, std::int64_t timestamp_us)>;
49
52using VideoFrameEventCallback = std::function<void(const VideoFrameEvent &)>;
53
58using DataFrameCallback =
59 std::function<void(const std::vector<std::uint8_t> &payload,
60 std::optional<std::uint64_t> user_timestamp)>;
61
64using DataFrameCallbackId = std::uint64_t;
65
89public:
92
95
109 void setOnAudioFrameCallback(const std::string &participant_identity,
110 TrackSource source, AudioFrameCallback callback,
111 const AudioStream::Options &opts = {});
112
126 void setOnAudioFrameCallback(const std::string &participant_identity,
127 const std::string &track_name,
128 AudioFrameCallback callback,
129 const AudioStream::Options &opts = {});
130
144 void setOnVideoFrameCallback(const std::string &participant_identity,
145 TrackSource source, VideoFrameCallback callback,
146 const VideoStream::Options &opts = {});
147
161 void setOnVideoFrameCallback(const std::string &participant_identity,
162 const std::string &track_name,
163 VideoFrameCallback callback,
164 const VideoStream::Options &opts = {});
165
181 void setOnVideoFrameEventCallback(const std::string &participant_identity,
182 const std::string &track_name,
183 VideoFrameEventCallback callback,
184 const VideoStream::Options &opts = {});
185
195 void clearOnAudioFrameCallback(const std::string &participant_identity,
196 TrackSource source);
197
207 void clearOnAudioFrameCallback(const std::string &participant_identity,
208 const std::string &track_name);
209
219 void clearOnVideoFrameCallback(const std::string &participant_identity,
220 TrackSource source);
221
231 void clearOnVideoFrameCallback(const std::string &participant_identity,
232 const std::string &track_name);
233
248 void handleTrackSubscribed(const std::string &participant_identity,
249 TrackSource source, const std::string &track_name,
250 const std::shared_ptr<Track> &track);
251
264 void handleTrackUnsubscribed(const std::string &participant_identity,
265 TrackSource source,
266 const std::string &track_name);
267
268 // ---------------------------------------------------------------
269 // Data track callbacks
270 // ---------------------------------------------------------------
271
289 DataFrameCallbackId
290 addOnDataFrameCallback(const std::string &participant_identity,
291 const std::string &track_name,
292 DataFrameCallback callback);
293
302 void removeOnDataFrameCallback(DataFrameCallbackId id);
303
313 void handleDataTrackPublished(const std::shared_ptr<RemoteDataTrack> &track);
314
323 void handleDataTrackUnpublished(const std::string &sid);
324
331 void stopAll();
332
333private:
334 friend class SubscriptionThreadDispatcherTest;
335
339 struct CallbackKey {
340 std::string participant_identity;
341 TrackSource source;
342 std::string track_name;
343
344 bool operator==(const CallbackKey &o) const {
345 return participant_identity == o.participant_identity &&
346 source == o.source && track_name == o.track_name;
347 }
348 };
349
351 struct CallbackKeyHash {
352 std::size_t operator()(const CallbackKey &k) const {
353 auto h1 = std::hash<std::string>{}(k.participant_identity);
354 auto h2 = std::hash<int>{}(static_cast<int>(k.source));
355 auto h3 = std::hash<std::string>{}(k.track_name);
356 return h1 ^ (h2 << 1) ^ (h3 << 2);
357 }
358 };
359
361 struct ActiveReader {
362 std::shared_ptr<AudioStream> audio_stream;
363 std::shared_ptr<VideoStream> video_stream;
364 std::thread thread;
365 };
366
368 struct DataCallbackKey {
369 std::string participant_identity;
370 std::string track_name;
371
372 bool operator==(const DataCallbackKey &o) const {
373 return participant_identity == o.participant_identity &&
374 track_name == o.track_name;
375 }
376 };
377
379 struct DataCallbackKeyHash {
380 std::size_t operator()(const DataCallbackKey &k) const {
381 auto h1 = std::hash<std::string>{}(k.participant_identity);
382 auto h2 = std::hash<std::string>{}(k.track_name);
383 return h1 ^ (h2 << 1);
384 }
385 };
386
388 struct RegisteredDataCallback {
389 DataCallbackKey key;
390 DataFrameCallback callback;
391 };
392
394 struct ActiveDataReader {
395 std::shared_ptr<RemoteDataTrack> remote_track;
396 std::mutex sub_mutex;
397 std::shared_ptr<DataTrackStream> stream; // guarded by sub_mutex
398 std::thread thread;
399 };
400
402 struct RegisteredAudioCallback {
403 AudioFrameCallback callback;
404 AudioStream::Options options;
405 };
406
408 struct RegisteredVideoCallback {
409 VideoFrameCallback legacy_callback;
410 VideoFrameEventCallback event_callback;
411 VideoStream::Options options;
412 };
413
418 std::thread extractReaderThreadLocked(const CallbackKey &key);
419
423 std::thread startReaderLocked(const CallbackKey &key,
424 const std::shared_ptr<Track> &track);
425
430 std::thread startAudioReaderLocked(const CallbackKey &key,
431 const std::shared_ptr<Track> &track,
432 const AudioFrameCallback &cb,
433 const AudioStream::Options &opts);
434
439 std::thread startVideoReaderLocked(const CallbackKey &key,
440 const std::shared_ptr<Track> &track,
441 const RegisteredVideoCallback &callback);
442
445 std::thread extractDataReaderThreadLocked(DataFrameCallbackId id);
446
449 std::thread extractDataReaderThreadLocked(const DataCallbackKey &key);
450
453 std::thread
454 startDataReaderLocked(DataFrameCallbackId id, const DataCallbackKey &key,
455 const std::shared_ptr<RemoteDataTrack> &track,
456 const DataFrameCallback &cb);
457
459 mutable std::mutex lock_;
460
462 std::unordered_map<CallbackKey, RegisteredAudioCallback, CallbackKeyHash>
463 audio_callbacks_;
464
466 std::unordered_map<CallbackKey, RegisteredVideoCallback, CallbackKeyHash>
467 video_callbacks_;
468
470 std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash>
471 active_readers_;
472
474 DataFrameCallbackId next_data_callback_id_;
475
477 std::unordered_map<DataFrameCallbackId, RegisteredDataCallback>
478 data_callbacks_;
479
481 std::unordered_map<DataFrameCallbackId, std::shared_ptr<ActiveDataReader>>
482 active_data_readers_;
483
485 std::unordered_map<DataCallbackKey, std::shared_ptr<RemoteDataTrack>,
486 DataCallbackKeyHash>
487 remote_data_tracks_;
488
490 static constexpr int kMaxActiveReaders = 20;
491};
492
493} // namespace livekit
494
495#endif /* LIVEKIT_SUBSCRIPTION_THREAD_DISPATCHER_H */
Definition subscription_thread_dispatcher.h:88
void setOnAudioFrameCallback(const std::string &participant_identity, TrackSource source, AudioFrameCallback callback, const AudioStream::Options &opts={})
void handleDataTrackPublished(const std::shared_ptr< RemoteDataTrack > &track)
void handleTrackSubscribed(const std::string &participant_identity, TrackSource source, const std::string &track_name, const std::shared_ptr< Track > &track)
void handleDataTrackUnpublished(const std::string &sid)
void clearOnAudioFrameCallback(const std::string &participant_identity, TrackSource source)
void removeOnDataFrameCallback(DataFrameCallbackId id)
void clearOnVideoFrameCallback(const std::string &participant_identity, TrackSource source)
DataFrameCallbackId addOnDataFrameCallback(const std::string &participant_identity, const std::string &track_name, DataFrameCallback callback)
void setOnVideoFrameEventCallback(const std::string &participant_identity, const std::string &track_name, VideoFrameEventCallback callback, const VideoStream::Options &opts={})
SubscriptionThreadDispatcher()
Constructs an empty dispatcher with no registered callbacks or readers.
void setOnVideoFrameCallback(const std::string &participant_identity, TrackSource source, VideoFrameCallback callback, const VideoStream::Options &opts={})
void clearOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name)
void setOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name, AudioFrameCallback callback, const AudioStream::Options &opts={})
void handleTrackUnsubscribed(const std::string &participant_identity, TrackSource source, const std::string &track_name)
void clearOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name)
void setOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name, VideoFrameCallback callback, const VideoStream::Options &opts={})
~SubscriptionThreadDispatcher()
Stops all active readers and clears all registered callbacks.
Configuration options for AudioStream creation.
Definition audio_stream.h:67
Definition video_stream.h:68