LiveKit C++ Client SDK v1.1.0
Real-time audio/video/data SDK for C++
Loading...
Searching...
No Matches
subscription_thread_dispatcher.h
1/*
2 * Copyright 2025 LiveKit
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#pragma once
18
19#include <cstdint>
20#include <functional>
21#include <memory>
22#include <mutex>
23#include <optional>
24#include <string>
25#include <thread>
26#include <unordered_map>
27#include <vector>
28
29#include "livekit/audio_stream.h"
30#include "livekit/video_stream.h"
31#include "livekit/visibility.h"
32
33namespace livekit {
34
35class AudioFrame;
36class DataTrackStream;
37class RemoteDataTrack;
38class Track;
39class VideoFrame;
40
43using AudioFrameCallback = std::function<void(const AudioFrame&)>;
44
47using VideoFrameCallback = std::function<void(const VideoFrame& frame, std::int64_t timestamp_us)>;
48
51using VideoFrameEventCallback = std::function<void(const VideoFrameEvent&)>;
52
58 std::function<void(const std::vector<std::uint8_t>& payload, std::optional<std::uint64_t> user_timestamp)>;
59
62using DataFrameCallbackId = std::uint64_t;
63
85public:
88
91
103 void setOnAudioFrameCallback(const std::string& participant_identity, const std::string& track_name,
104 AudioFrameCallback callback, const AudioStream::Options& opts = {});
105
117 void setOnVideoFrameCallback(const std::string& participant_identity, const std::string& track_name,
118 VideoFrameCallback callback, const VideoStream::Options& opts = {});
119
133 void setOnVideoFrameEventCallback(const std::string& participant_identity, const std::string& track_name,
134 VideoFrameEventCallback callback, const VideoStream::Options& opts = {});
135
143 void clearOnAudioFrameCallback(const std::string& participant_identity, const std::string& track_name);
144
152 void clearOnVideoFrameCallback(const std::string& participant_identity, const std::string& track_name);
153
166 void handleTrackSubscribed(const std::string& participant_identity, const std::string& track_name,
167 const std::shared_ptr<Track>& track);
168
179 void handleTrackUnsubscribed(const std::string& participant_identity, TrackSource source,
180 const std::string& track_name);
181
182 // ---------------------------------------------------------------
183 // Data track callbacks
184 // ---------------------------------------------------------------
185
201 DataFrameCallbackId addOnDataFrameCallback(const std::string& participant_identity, const std::string& track_name,
202 DataFrameCallback callback);
203
211
219 void handleDataTrackPublished(const std::shared_ptr<RemoteDataTrack>& track);
220
227 void handleDataTrackUnpublished(const std::string& sid);
228
233 void stopAll();
234
235private:
236 friend class SubscriptionThreadDispatcherTest;
237
239 struct CallbackKey {
240 std::string participant_identity;
241 std::string track_name;
242
243 bool operator==(const CallbackKey& o) const {
244 return participant_identity == o.participant_identity && track_name == o.track_name;
245 }
246 };
247
249 struct CallbackKeyHash {
250 std::size_t operator()(const CallbackKey& k) const {
251 auto h1 = std::hash<std::string>{}(k.participant_identity);
252 auto h2 = std::hash<std::string>{}(k.track_name);
253 return h1 ^ (h2 << 1);
254 }
255 };
256
258 struct ActiveReader {
259 std::shared_ptr<AudioStream> audio_stream;
260 std::shared_ptr<VideoStream> video_stream;
261 std::thread thread;
262 };
263
265 struct DataCallbackKey {
266 std::string participant_identity;
267 std::string track_name;
268
269 bool operator==(const DataCallbackKey& o) const {
270 return participant_identity == o.participant_identity && track_name == o.track_name;
271 }
272 };
273
275 struct DataCallbackKeyHash {
276 std::size_t operator()(const DataCallbackKey& k) const {
277 auto h1 = std::hash<std::string>{}(k.participant_identity);
278 auto h2 = std::hash<std::string>{}(k.track_name);
279 return h1 ^ (h2 << 1);
280 }
281 };
282
284 struct RegisteredDataCallback {
285 DataCallbackKey key;
286 DataFrameCallback callback;
287 };
288
290 struct ActiveDataReader {
291 std::shared_ptr<RemoteDataTrack> remote_track;
292 std::mutex sub_mutex;
293 std::shared_ptr<DataTrackStream> stream; // guarded by sub_mutex
294 std::thread thread;
295 };
296
298 struct RegisteredAudioCallback {
299 AudioFrameCallback callback;
300 AudioStream::Options options;
301 };
302
304 struct RegisteredVideoCallback {
305 VideoFrameCallback legacy_callback;
306 VideoFrameEventCallback event_callback;
307 VideoStream::Options options;
308 };
309
314 std::thread extractReaderThreadLocked(const CallbackKey& key);
315
319 std::thread startReaderLocked(const CallbackKey& key, const std::shared_ptr<Track>& track);
320
325 std::thread startAudioReaderLocked(const CallbackKey& key, const std::shared_ptr<Track>& track,
326 const AudioFrameCallback& cb, const AudioStream::Options& opts);
327
332 std::thread startVideoReaderLocked(const CallbackKey& key, const std::shared_ptr<Track>& track,
333 const RegisteredVideoCallback& callback);
334
337 std::thread extractDataReaderThreadLocked(DataFrameCallbackId id);
338
341 std::thread extractDataReaderThreadLocked(const DataCallbackKey& key);
342
345 std::thread startDataReaderLocked(DataFrameCallbackId id, const DataCallbackKey& key,
346 const std::shared_ptr<RemoteDataTrack>& track, const DataFrameCallback& cb);
347
349 mutable std::mutex lock_;
350
352 std::unordered_map<CallbackKey, RegisteredAudioCallback, CallbackKeyHash> audio_callbacks_;
353
355 std::unordered_map<CallbackKey, RegisteredVideoCallback, CallbackKeyHash> video_callbacks_;
356
358 std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash> active_readers_;
359
361 DataFrameCallbackId next_data_callback_id_{0};
362
364 std::unordered_map<DataFrameCallbackId, RegisteredDataCallback> data_callbacks_;
365
367 std::unordered_map<DataFrameCallbackId, std::shared_ptr<ActiveDataReader>> active_data_readers_;
368
370 std::unordered_map<DataCallbackKey, std::shared_ptr<RemoteDataTrack>, DataCallbackKeyHash> remote_data_tracks_;
371
373 static constexpr int kMaxActiveReaders = 20;
374};
375
376} // namespace livekit
Represents a raw PCM audio frame with interleaved int16 samples.
Definition audio_frame.h:37
Owns subscription callback registration and per-subscription reader threads.
Definition subscription_thread_dispatcher.h:84
void handleDataTrackPublished(const std::shared_ptr< RemoteDataTrack > &track)
Notify the dispatcher that a remote data track has been published.
void handleTrackSubscribed(const std::string &participant_identity, const std::string &track_name, const std::shared_ptr< Track > &track)
Start or restart reader dispatch for a newly subscribed remote track.
void handleDataTrackUnpublished(const std::string &sid)
Notify the dispatcher that a remote data track has been unpublished.
void stopAll()
Stop all readers and clear all callback registrations.
void removeOnDataFrameCallback(DataFrameCallbackId id)
Remove a data frame callback previously registered via addOnDataFrameCallback().
DataFrameCallbackId addOnDataFrameCallback(const std::string &participant_identity, const std::string &track_name, DataFrameCallback callback)
Add a callback for data frames from a specific remote participant's data track.
void setOnVideoFrameEventCallback(const std::string &participant_identity, const std::string &track_name, VideoFrameEventCallback callback, const VideoStream::Options &opts={})
Register or replace a rich video frame event callback for a remote subscription.
SubscriptionThreadDispatcher()
Constructs an empty dispatcher with no registered callbacks or readers.
void clearOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name)
Remove a video callback registration and stop any active reader.
void setOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name, AudioFrameCallback callback, const AudioStream::Options &opts={})
Register or replace an audio frame callback for a remote subscription.
void handleTrackUnsubscribed(const std::string &participant_identity, TrackSource source, const std::string &track_name)
Stop reader dispatch for an unsubscribed remote track.
void clearOnAudioFrameCallback(const std::string &participant_identity, const std::string &track_name)
Remove an audio callback registration and stop any active reader.
void setOnVideoFrameCallback(const std::string &participant_identity, const std::string &track_name, VideoFrameCallback callback, const VideoStream::Options &opts={})
Register or replace a video frame callback for a remote subscription.
~SubscriptionThreadDispatcher()
Stops all active readers and clears all registered callbacks.
Public SDK representation of a video frame.
Definition video_frame.h:48
Public API for the LiveKit C++ Client SDK.
Definition audio_frame.h:25
std::function< void(const VideoFrameEvent &)> VideoFrameEventCallback
Callback type for incoming video frame events.
Definition subscription_thread_dispatcher.h:51
std::uint64_t DataFrameCallbackId
Opaque identifier returned by addOnDataFrameCallback, used to remove an individual subscription via r...
Definition subscription_thread_dispatcher.h:62
std::function< void(const std::vector< std::uint8_t > &payload, std::optional< std::uint64_t > user_timestamp)> DataFrameCallback
Callback type for incoming data track frames.
Definition subscription_thread_dispatcher.h:58
TrackSource
Source category for a published track.
Definition track.h:43
std::function< void(const AudioFrame &)> AudioFrameCallback
Callback type for incoming audio frames.
Definition subscription_thread_dispatcher.h:43
std::function< void(const VideoFrame &frame, std::int64_t timestamp_us)> VideoFrameCallback
Callback type for incoming video frames.
Definition subscription_thread_dispatcher.h:47
Configuration options for AudioStream creation.
Definition audio_stream.h:66
A single video frame event delivered by VideoStream::read().
Definition video_stream.h:37
Options for creating a decoded video frame stream.
Definition video_stream.h:69