Receiving and publishing tracks

Receiving

Reading WebRTC tracks via LiveKit is done through streams, which are exposed in Python as AsyncIterators. The realtime SDK provides utilities for working with both audio and video tracks:

import asyncio

from livekit import rtc

async def do_something(track: rtc.Track):
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        audio_stream = rtc.AudioStream(track)
        async for event in audio_stream:
            # Do something here to process event.frame
            pass
    elif track.kind == rtc.TrackKind.KIND_VIDEO:
        video_stream = rtc.VideoStream(track)
        async for event in video_stream:
            # Do something here to process event.frame
            pass

# ctx is the JobContext passed to your entrypoint. Stream processing runs
# in a background task so that the handler returns immediately.
@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    if track.kind == rtc.TrackKind.KIND_AUDIO:
        asyncio.create_task(do_something(track))
    elif track.kind == rtc.TrackKind.KIND_VIDEO:
        asyncio.create_task(do_something(track))
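
The shape of this pattern, a plain event handler that schedules a task which then iterates a stream with async for, can be tried without a LiveKit connection. The following is only a minimal sketch: fake_stream, consume and the simplified on_track_subscribed are hypothetical stand-ins for rtc.AudioStream or rtc.VideoStream and the room event handler, not SDK code.

```python
import asyncio
from typing import AsyncIterator, List

async def fake_stream(num_frames: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for rtc.AudioStream / rtc.VideoStream."""
    for frame in range(num_frames):
        await asyncio.sleep(0)  # yield to the event loop, like waiting for a frame
        yield frame

async def consume(stream: AsyncIterator[int], received: List[int]) -> None:
    # Same shape as do_something(): iterate until the stream ends
    async for frame in stream:
        received.append(frame)

def on_track_subscribed(received: List[int]) -> "asyncio.Task[None]":
    # The handler is a plain function, so it cannot await; it schedules a task
    return asyncio.create_task(consume(fake_stream(3), received))

async def main() -> List[int]:
    received: List[int] = []
    task = on_track_subscribed(received)  # returns immediately
    await task                            # awaited here only so the demo can finish
    return received

frames = asyncio.run(main())
```

In a real agent the task is usually left running for as long as the track is subscribed, rather than awaited as in this demo.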

As in LiveKit's other realtime SDKs, the track_subscribed event fires when a track is subscribed to. By default, the agent subscribes to all tracks automatically when it connects to the room. To manage subscriptions manually, set auto_subscribe to AutoSubscribe.SUBSCRIBE_NONE:

async def entrypoint_fnc(ctx: JobContext):
    await ctx.connect(
        # valid values are SUBSCRIBE_ALL, SUBSCRIBE_NONE, VIDEO_ONLY, AUDIO_ONLY
        # when omitted, it defaults to SUBSCRIBE_ALL
        auto_subscribe=AutoSubscribe.SUBSCRIBE_NONE,
    )

note:

All subscribed tracks will be streamed to the machine. To ensure efficient use of resources, subscribe only to the tracks that your agent needs.
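
With automatic subscription turned off, the agent has to opt in to each track it needs. The sketch below illustrates only the selection logic. FakePublication and subscribe_selectively are hypothetical, and the sketch assumes that remote track publications expose a set_subscribed method as in LiveKit's other SDKs; check the SDK reference before relying on it.

```python
from dataclasses import dataclass
from typing import Iterable, List, Set

@dataclass
class FakePublication:
    """Hypothetical stand-in for a remote track publication."""
    sid: str
    kind: str                 # "audio" or "video" here; the SDK uses rtc.TrackKind values
    subscribed: bool = False

    def set_subscribed(self, subscribed: bool) -> None:
        # Assumed to mirror the SDK call of the same name
        self.subscribed = subscribed

def subscribe_selectively(
    publications: Iterable[FakePublication], wanted_kinds: Set[str]
) -> List[str]:
    """Subscribe only to the kinds the agent needs and return their sids."""
    chosen = []
    for pub in publications:
        if pub.kind in wanted_kinds:
            pub.set_subscribed(True)
            chosen.append(pub.sid)
    return chosen

pubs = [
    FakePublication("TR_mic", "audio"),
    FakePublication("TR_cam", "video"),
    FakePublication("TR_screen", "video"),
]
chosen = subscribe_selectively(pubs, {"audio"})
```

A speech-only agent, for example, would leave both video tracks unsubscribed, so they are never streamed to the machine.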

Working with video

Because different applications work with different video buffer encodings, LiveKit supports many and translates between them automatically. VideoFrame provides the current video buffer type and a method to convert it to any of the other encodings:

async def handle_video(track: rtc.Track):
    video_stream = rtc.VideoStream(track)
    async for event in video_stream:
        video_frame = event.frame
        current_type = video_frame.type
        frame_as_bgra = video_frame.convert(rtc.VideoBufferType.BGRA)
        # [...]

@ctx.room.on("track_subscribed")
def on_track_subscribed(
    track: rtc.Track,
    publication: rtc.TrackPublication,
    participant: rtc.RemoteParticipant,
):
    if track.kind == rtc.TrackKind.KIND_VIDEO:
        asyncio.create_task(handle_video(track))
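
To make concrete what a conversion between two packed 32-bit encodings involves, here is a conceptual sketch in plain Python. It assumes that the bytes in memory follow the order spelled out in the type name (A, R, G, B for ARGB), and argb_to_bgra is a hypothetical helper written for illustration. In an agent, VideoFrame.convert does this work and should be preferred.

```python
def argb_to_bgra(argb: bytes) -> bytes:
    """Reorder each 4-byte pixel from A,R,G,B to B,G,R,A."""
    if len(argb) % 4 != 0:
        raise ValueError("buffer length must be a multiple of 4")
    out = bytearray(len(argb))
    out[0::4] = argb[3::4]  # B
    out[1::4] = argb[2::4]  # G
    out[2::4] = argb[1::4]  # R
    out[3::4] = argb[0::4]  # A
    return bytes(out)

red_argb = bytes([255, 255, 0, 0]) * 2  # two opaque red pixels
red_bgra = argb_to_bgra(red_argb)
```

The pixel count and buffer size stay the same; only the channel order within each pixel changes. Conversions to planar formats such as I420 also change the buffer layout and size, which this sketch does not cover.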

Publishing

Similarly, publishing data to the agent’s track is done by transmitting a continuous live feed. Audio streams carry raw PCM data at a specified sample rate and channel count, while video streams can transmit data in any of 11 buffer encodings.

Publishing audio

Publishing audio involves splitting the stream into audio frames of a configurable length. An internal buffer holds 50ms of queued audio to be sent to the realtime stack. The capture_frame method, used to send new frames, is awaited and does not return control until the buffer has taken in the entire frame. Because the sender can never run far ahead of playback, this makes interruptions easier to handle.
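
The effect of a small buffer combined with a capture call that waits can be sketched with a bounded asyncio.Queue. This is a hypothetical model, not the SDK's implementation: the queue stands in for the 50ms buffer, speak stands in for code that calls capture_frame in a loop, and nothing drains the queue so that the stall is easy to observe.

```python
import asyncio
from typing import Tuple

FRAME_MS = 10
BUFFER_MS = 50

async def demo() -> Tuple[int, int]:
    # Stand-in for the internal buffer: room for 50 ms of 10 ms frames
    buffer: "asyncio.Queue[int]" = asyncio.Queue(maxsize=BUFFER_MS // FRAME_MS)
    accepted = 0

    async def speak() -> None:
        nonlocal accepted
        for frame in range(100):     # one second of audio
            await buffer.put(frame)  # suspends while the buffer is full
            accepted += 1

    task = asyncio.create_task(speak())
    await asyncio.sleep(0.01)        # nothing drains the buffer, so speak() stalls
    task.cancel()                    # "interrupt" the speaker
    await asyncio.gather(task, return_exceptions=True)
    return accepted, buffer.qsize()

accepted, queued = asyncio.run(demo())
```

In this model the producer is stopped after five frames have been accepted, so cancelling it leaves at most 50ms of audio queued, however long the full utterance would have been.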

To publish an audio track, the sample rate and number of channels need to be determined beforehand, as well as the length (number of samples) of each frame. The following example transmits a continuous 16-bit sine wave at 48kHz in 10ms frames:

import asyncio

import numpy as np
from livekit import rtc
from livekit.agents import JobContext

SAMPLE_RATE = 48000
NUM_CHANNELS = 1 # mono audio
AMPLITUDE = 2 ** 8 - 1 # peak sample value; int16 allows up to 2 ** 15 - 1
SAMPLES_PER_CHANNEL = 480 # 10ms at 48kHz

async def entrypoint(ctx: JobContext):
    await ctx.connect()
    source = rtc.AudioSource(SAMPLE_RATE, NUM_CHANNELS)
    track = rtc.LocalAudioTrack.create_audio_track("example-track", source)
    # since the agent is a participant, our audio I/O is its "microphone"
    options = rtc.TrackPublishOptions(source=rtc.TrackSource.SOURCE_MICROPHONE)
    # ctx.agent is an alias for ctx.room.local_participant
    publication = await ctx.agent.publish_track(track, options)
    frequency = 440

    async def _sinewave():
        audio_frame = rtc.AudioFrame.create(SAMPLE_RATE, NUM_CHANNELS, SAMPLES_PER_CHANNEL)
        # int16 view over the frame's sample buffer
        audio_data = np.frombuffer(audio_frame.data, dtype=np.int16)
        total_samples = 0
        while True:
            # advance the time axis every frame so the wave stays continuous
            time = (total_samples + np.arange(SAMPLES_PER_CHANNEL)) / SAMPLE_RATE
            sinewave = (AMPLITUDE * np.sin(2 * np.pi * frequency * time)).astype(np.int16)
            np.copyto(audio_data, sinewave)
            # send this frame to the track
            await source.capture_frame(audio_frame)
            total_samples += SAMPLES_PER_CHANNEL

    await _sinewave()

warning:

When streaming finite audio (e.g. from a file), make sure the frame length isn't longer than the number of samples left to stream. Otherwise, the end of the buffer will consist of noise.
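
One way to avoid the problem is to pad a short final chunk with silence; sending a shorter last frame is another. The sketch below shows the padding approach on raw 16-bit PCM bytes. The helpers samples_per_frame and split_pcm are hypothetical names introduced here, and each resulting chunk would still need to be copied into an AudioFrame before capture.

```python
from typing import Iterator

BYTES_PER_SAMPLE = 2  # 16-bit PCM

def samples_per_frame(sample_rate: int, frame_ms: int) -> int:
    """Number of samples per channel in a frame of the given duration."""
    return sample_rate * frame_ms // 1000

def split_pcm(pcm: bytes, frame_samples: int, num_channels: int = 1) -> Iterator[bytes]:
    """Yield fixed-size frames, zero-padding the last one if it is short."""
    frame_bytes = frame_samples * num_channels * BYTES_PER_SAMPLE
    for start in range(0, len(pcm), frame_bytes):
        chunk = pcm[start:start + frame_bytes]
        # zeros are silence in signed PCM, so the tail plays as quiet, not noise
        yield chunk.ljust(frame_bytes, b"\x00")

frame_samples = samples_per_frame(48000, 10)  # 10ms at 48kHz
pcm = b"\x01\x00" * 1000                      # 1000 mono samples
frames = list(split_pcm(pcm, frame_samples))
```

Here the 1000 samples become three frames of 480 samples each, with the last frame holding 40 real samples followed by silence.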

Publishing video

When publishing video tracks, the frame rate and buffer encoding of the video need to be established beforehand. In this example, the agent connects to the room and starts publishing a solid color frame at 10 frames per second:

import asyncio

from livekit import rtc
from livekit.agents import JobContext

WIDTH = 640
HEIGHT = 480

async def entrypoint(ctx: JobContext):
    await ctx.connect()
    source = rtc.VideoSource(WIDTH, HEIGHT)
    track = rtc.LocalVideoTrack.create_video_track("example-track", source)
    options = rtc.TrackPublishOptions(
        # since the agent is a participant, our video I/O is its "camera"
        source=rtc.TrackSource.SOURCE_CAMERA,
        video_encoding=rtc.VideoEncoding(max_framerate=10),
        audio_encoding=rtc.AudioEncoding(max_bitrate=48000),
        video_codec=rtc.VideoCodec.H264,
    )
    publication = await ctx.agent.publish_track(track, options)
    # this color is encoded as ARGB. when passed to VideoFrame it gets re-encoded.
    COLOR = [255, 255, 0, 0] # FFFF0000 RED

    async def _draw_color():
        argb_frame = bytearray(WIDTH * HEIGHT * 4)
        while True:
            await asyncio.sleep(0.1) # 10 fps
            argb_frame[:] = COLOR * WIDTH * HEIGHT
            # the buffer type must match the byte order used in COLOR
            frame = rtc.VideoFrame(WIDTH, HEIGHT, rtc.VideoBufferType.ARGB, argb_frame)
            # send this frame to the track
            source.capture_frame(frame)

    asyncio.create_task(_draw_color())

note:

Although the frame being published is static, it is still necessary to stream it continuously, for the benefit of participants who join the room after the initial frame has been sent.

note:

Unlike audio, video capture_frame does not keep an internal buffer.
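
Since nothing in the capture call paces the sender, the publishing loop sets the frame rate itself. A fixed asyncio.sleep(0.1) per iteration adds the time spent preparing each frame on top of the sleep, so the rate drifts slightly below the target. One possible alternative, sketched here with a hypothetical frame_ticks helper, is to sleep until absolute deadlines measured from a single start time.

```python
import asyncio
from typing import AsyncIterator, List

async def frame_ticks(fps: float, count: int) -> AsyncIterator[int]:
    """Yield frame indices, each no earlier than start + index / fps."""
    loop = asyncio.get_running_loop()
    start = loop.time()
    for index in range(count):
        deadline = start + index / fps
        delay = deadline - loop.time()
        if delay > 0:
            await asyncio.sleep(delay)
        yield index

async def main() -> List[int]:
    sent: List[int] = []
    async for index in frame_ticks(fps=100, count=3):
        # in an agent, build the frame and call source.capture_frame(frame) here
        sent.append(index)
    return sent

sent = asyncio.run(main())
```

If preparing a frame takes longer than one frame interval, this sketch sends the next frame immediately rather than skipping it; whether to skip or catch up is a design choice left to the application.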