Receiving
Reading WebRTC tracks via LiveKit is done through streams, which are exposed in Python as AsyncIterators. The realtime SDK provides utilities for working with both audio and video tracks:
```python
import asyncio


@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    # the event handler itself is synchronous, so stream consumption
    # is handed off to asyncio tasks
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        audio_stream = rtc.AudioStream(track)
        asyncio.create_task(process_audio(audio_stream))
    elif track.kind == rtc.TrackKind.KIND_VIDEO:
        video_stream = rtc.VideoStream(track)
        asyncio.create_task(process_video(video_stream))


async def process_audio(audio_stream: rtc.AudioStream):
    async for event in audio_stream:
        do_something(event.frame)


async def process_video(video_stream: rtc.VideoStream):
    async for event in video_stream:
        do_something(event.frame)
```
Like LiveKit's other realtime SDKs, the track_subscribed event is triggered when a track is subscribed to. The agent can be configured to automatically subscribe to tracks when you accept the job. To manage subscriptions manually, set auto_subscribe to AutoSubscribe.SUBSCRIBE_NONE:
```python
async def entrypoint_fnc(ctx: JobContext):
    await ctx.connect(
        # valid values are SUBSCRIBE_ALL, SUBSCRIBE_NONE, VIDEO_ONLY, AUDIO_ONLY
        # when omitted, it defaults to SUBSCRIBE_ALL
        auto_subscribe=AutoSubscribe.SUBSCRIBE_NONE,
    )
```
All subscribed tracks are streamed to the machine running the agent. To make efficient use of resources, subscribe only to the tracks your agent actually needs.
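With automatic subscription disabled, the agent can subscribe selectively as tracks are announced. The sketch below reacts to the track_published event and subscribes to audio only; it assumes your SDK version exposes RemoteTrackPublication.set_subscribed:

```python
@ctx.room.on("track_published")
def on_track_published(
    publication: rtc.RemoteTrackPublication,
    participant: rtc.RemoteParticipant,
):
    # subscribe to audio tracks only; video stays unsubscribed
    # (assumes set_subscribed() is available on the publication)
    if publication.kind == rtc.TrackKind.KIND_AUDIO:
        publication.set_subscribed(True)
```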
Working with video
Because different applications work with different video buffer encodings, LiveKit supports many of them and translates between them automatically. VideoFrame provides the current video buffer type and a method to convert it to any of the other encodings:
```python
import asyncio


@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    if track.kind == rtc.TrackKind.KIND_VIDEO:
        video_stream = rtc.VideoStream(track)
        # the synchronous handler hands stream consumption off to a task
        asyncio.create_task(process_video(video_stream))


async def process_video(video_stream: rtc.VideoStream):
    async for event in video_stream:
        video_frame = event.frame
        current_type = video_frame.type
        frame_as_bgra = video_frame.convert(rtc.VideoBufferType.BGRA)
        # [...]
```
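When handing frames to an image-processing library, it is often convenient to view the converted buffer as a NumPy array. A minimal sketch: frame_to_ndarray is a hypothetical helper, and it assumes VideoFrame exposes data, width, and height as in recent SDK versions:

```python
import numpy as np

from livekit import rtc


def frame_to_ndarray(video_frame: rtc.VideoFrame) -> np.ndarray:
    # convert to RGBA, then view the raw buffer as an HxWx4 uint8 array
    # (hypothetical helper; assumes VideoFrame exposes data/width/height)
    rgba = video_frame.convert(rtc.VideoBufferType.RGBA)
    return np.frombuffer(rgba.data, dtype=np.uint8).reshape(rgba.height, rgba.width, 4)
```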
Publishing
Similarly, publishing data to the agent’s track is done by transmitting a continuous live feed. Audio streams carry raw PCM data at a specified sample rate and channel count, while video streams can transmit data in any of 11 buffer encodings.
Publishing audio
Publishing audio involves splitting the stream into audio frames of a configurable length. An internal buffer holds 50ms of queued audio to be sent to the realtime stack. The capture_frame method, used to send new frames, is blocking and will not return control until the buffer has taken in the entire frame. This allows for easier interruption handling.
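Because capture_frame only returns once the frame has been accepted by the buffer, interrupting playback can be as simple as cancelling the task that pushes frames and discarding whatever is still queued. A sketch under the assumption that your SDK version provides AudioSource.clear_queue():

```python
import asyncio

from livekit import rtc


async def play(source: rtc.AudioSource, frames: list[rtc.AudioFrame]) -> None:
    for frame in frames:
        # blocks until the 50ms internal buffer has room for this frame
        await source.capture_frame(frame)


def interrupt(playback_task: asyncio.Task, source: rtc.AudioSource) -> None:
    # stop feeding new frames...
    playback_task.cancel()
    # ...and drop any audio still waiting in the buffer
    # (assumes clear_queue() exists in your SDK version)
    source.clear_queue()
```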
In order to publish an audio track, the sample rate and number of channels need to be determined beforehand, as well as the length (number of samples) of each frame. The following example transmits a constant 16-bit sine wave at 48kHz in 10ms long frames:
```python
import numpy as np

SAMPLE_RATE = 48000
NUM_CHANNELS = 1  # mono audio
AMPLITUDE = 2 ** 15 - 1  # max value for 16-bit signed samples
SAMPLES_PER_CHANNEL = 480  # 10ms at 48kHz


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)

    frequency = 440

    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)

        total_samples = 0
        while True:
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)

            # send this frame to the track
            await source.capture_frame(audio_frame)
            total_samples += SAMPLES_PER_CHANNEL

    await _sinewave()
```
When streaming finite audio (e.g. from a file), make sure the frame length isn't longer than the number of samples left to stream; otherwise, the end of the buffer will consist of noise.
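For example, when streaming a WAV file the final chunk is usually shorter than a full frame, so one option is to zero-fill the remainder of the last frame. A sketch using the standard-library wave module; stream_wav is illustrative and assumes 16-bit PCM and an AudioSource created with the file's sample rate and channel count:

```python
import wave

import numpy as np

from livekit import rtc


async def stream_wav(source: rtc.AudioSource, path: str, samples_per_channel: int = 480) -> None:
    # assumes a 16-bit PCM file whose sample rate and channel count
    # match the AudioSource it is captured into
    with wave.open(path, "rb") as wav:
        num_channels = wav.getnchannels()
        sample_rate = wav.getframerate()
        data = np.frombuffer(wav.readframes(wav.getnframes()), dtype=np.int16)

    samples_per_frame = samples_per_channel * num_channels
    for start in range(0, len(data), samples_per_frame):
        chunk = data[start : start + samples_per_frame]
        if len(chunk) < samples_per_frame:
            # zero-fill the tail so the final frame never contains leftover noise
            chunk = np.concatenate([chunk, np.zeros(samples_per_frame - len(chunk), dtype=np.int16)])

        frame = rtc.AudioFrame.create(sample_rate, num_channels, samples_per_channel)
        np.copyto(np.frombuffer(frame.data, dtype=np.int16), chunk)
        await source.capture_frame(frame)
```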
Publishing video
When publishing video tracks, the frame rate and buffer encoding of the video need to be established beforehand. In this example, the agent connects to the room and starts publishing a solid-color frame at 10 frames per second:
```python
import asyncio

WIDTH = 640
HEIGHT = 480


async def entrypoint(ctx: JobContext):
    await ctx.connect()

    source = rtc.VideoSource(WIDTH, HEIGHT)
    track = rtc.LocalVideoTrack.create_video_track("example-track", source)
    options = rtc.TrackPublishOptions(
        # since the agent is a participant, our video I/O is its "camera"
        source=rtc.TrackSource.SOURCE_CAMERA,
        video_encoding=rtc.VideoEncoding(max_framerate=10),
        video_codec=rtc.VideoCodec.H264,
    )
    publication = await ctx.agent.publish_track(track, options)

    # this color is encoded as ARGB, matching the buffer type passed to VideoFrame below
    COLOR = [255, 255, 0, 0]  # FFFF0000 RED

    async def _draw_color():
        argb_frame = bytearray(WIDTH * HEIGHT * 4)
        while True:
            await asyncio.sleep(0.1)  # 10 fps
            argb_frame[:] = COLOR * WIDTH * HEIGHT
            frame = rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.ARGB, argb_frame)
            # send this frame to the track
            source.capture_frame(frame)

    asyncio.create_task(_draw_color())
```
Although the frame being published is static, it still needs to be streamed continuously so that participants who join the room after the initial frame was sent also receive it.
Unlike audio, the video capture_frame does not keep an internal buffer.
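Because nothing is buffered, pacing is the caller's responsibility: capture each frame at the interval implied by the target frame rate. A short sketch that publishes frames built from a NumPy RGBA image (the scrolling gradient is purely illustrative):

```python
import asyncio

import numpy as np

from livekit import rtc

WIDTH, HEIGHT, FPS = 640, 480, 10


async def publish_gradient(source: rtc.VideoSource) -> None:
    shift = 0
    while True:
        # build an RGBA image whose horizontal gradient scrolls over time
        image = np.zeros((HEIGHT, WIDTH, 4), dtype=np.uint8)
        image[:, :, 0] = (np.arange(WIDTH) + shift) % 256  # red channel
        image[:, :, 3] = 255  # fully opaque
        frame = rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.RGBA, image.tobytes())
        source.capture_frame(frame)  # returns immediately; no internal buffer

        shift += 8
        await asyncio.sleep(1 / FPS)  # pace frames to the target frame rate
```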