Working with plugins

Agents includes a set of prebuilt plugins that make it easier to compose agents. These plugins cover common tasks such as converting speech to text, converting text to speech, and running inference on a generative AI model.

We've standardized the plugin APIs for common tasks so that you can switch between providers easily. A consistent interface also makes it simpler for anyone to extend the framework or build new plugins for other providers.

Streams

One of the key concepts in the framework is a stream, which is used extensively for asynchronous, non-blocking data processing. A stream lets you push a sequence of inputs and produces an async iterator of outputs.

Here's how you can use a stream in your application:

import asyncio

async def consume(stream):
    async for output in stream:
        # do something with output
        ...
    # close the stream to release resources after consuming output
    await stream.aclose()

# create a stream
stream = plugin.stream()
# create a task to consume outputs
asyncio.create_task(consume(stream))
# push inputs into the stream
stream.push(input)
stream.push(input2)
await stream.flush()
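
To make the push/iterate pattern concrete, here is a minimal, self-contained sketch of a toy stream built on asyncio.Queue. UppercaseStream and its internals are hypothetical and are not part of the framework; the sketch only mirrors the push, flush, and aclose calls used above. Note that this toy ends iteration when aclose is called, which may differ from how a real plugin stream signals completion.

```python
import asyncio

_END = object()  # sentinel marking the end of the stream


class UppercaseStream:
    """Toy stream (hypothetical): push strings in, iterate uppercased strings out."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._closed = False

    def push(self, text: str) -> None:
        # Non-blocking: the output is queued for the consumer to pick up.
        if self._closed:
            raise RuntimeError("stream is closed")
        self._queue.put_nowait(text.upper())

    async def flush(self) -> None:
        # Wait until every queued output has been taken by the consumer.
        await self._queue.join()

    async def aclose(self) -> None:
        # Tell the consumer that no more outputs will arrive.
        if not self._closed:
            self._closed = True
            self._queue.put_nowait(_END)

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        self._queue.task_done()
        if item is _END:
            raise StopAsyncIteration
        return item


async def consume(stream: UppercaseStream, results: list) -> None:
    async for output in stream:
        results.append(output)


async def main() -> list:
    stream = UppercaseStream()
    results: list = []
    consumer = asyncio.create_task(consume(stream, results))
    stream.push("hello")
    stream.push("world")
    await stream.flush()
    await stream.aclose()
    await consumer
    return results
```

Running asyncio.run(main()) collects the outputs in push order. The producer never blocks on push; it only waits at flush, which is the property that makes streams suitable for real-time audio.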

Speech-to-text (STT)

STT converts audio frames to a stream of text.

from livekit import agents, rtc
from livekit.plugins import deepgram

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    stt = deepgram.STT()
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)
    # consume transcription events in a separate task
    ctx.create_task(process_text_from_speech(stt_stream))
    # forward each incoming audio frame to the STT stream
    async for audio_frame in audio_stream:
        stt_stream.push_frame(audio_frame)
    await stt_stream.flush()

async def process_text_from_speech(stream):
    async for event in stream:
        if not event.is_final:
            # received a partial result; the STT result will be updated as confidence increases
            continue
        # do something with final speech
        pass
    await stream.aclose()
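
The interim/final distinction can be tried out without a provider account. The sketch below uses a hypothetical FakeSpeechEvent and a fake event generator (neither is part of the framework) to show how skipping non-final events leaves only the settled transcripts.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, List


@dataclass
class FakeSpeechEvent:
    # Hypothetical stand-in for a plugin's STT event.
    text: str
    is_final: bool


async def fake_stt_events() -> AsyncIterator[FakeSpeechEvent]:
    # Interim hypotheses are refined until a final one arrives.
    for text, is_final in [
        ("hel", False),
        ("hello wor", False),
        ("hello world", True),
        ("how are", False),
        ("how are you", True),
    ]:
        yield FakeSpeechEvent(text, is_final)


async def collect_final_transcripts(events) -> List[str]:
    finals: List[str] = []
    async for event in events:
        if not event.is_final:
            # Interim result: it may still change, so skip it.
            continue
        finals.append(event.text)
    return finals
```

Interim results are still useful for live captions or early interruption handling; this sketch simply ignores them for clarity.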

VAD and StreamAdapter

Some providers or models, such as Whisper, do not support streaming input. In these cases, the application must determine when a chunk of audio represents a complete segment of speech. This can be accomplished using a VAD (voice activity detector) in conjunction with a StreamAdapter class we provide.

We can modify the example above to use a VAD and StreamAdapter:

from livekit import agents, rtc
from livekit.plugins import openai, silero

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    whisper_stt = openai.STT()
    vad = silero.VAD()
    vad_stream = vad.stream(min_silence_duration=1.0)
    # StreamAdapter will buffer audio until VAD emits END_SPEAKING event
    stt = agents.stt.StreamAdapter(whisper_stt, vad_stream)
    stt_stream = stt.stream()
    ...
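
The buffering idea behind this approach can be sketched in a few lines. This is not the StreamAdapter implementation; BufferingAdapter, its push_frame signature, and the per-frame is_speech flag are all assumptions made for illustration. A real VAD would also apply a minimum silence duration before declaring end of speech, whereas this toy ends a segment on the first non-speech frame.

```python
from typing import Callable, List


class BufferingAdapter:
    """Hypothetical sketch: buffer frames while speech is active, then pass
    each completed segment to a non-streaming recognizer."""

    def __init__(self, recognize: Callable[[List[bytes]], str]) -> None:
        self._recognize = recognize
        self._buffer: List[bytes] = []
        self._speaking = False
        self.transcripts: List[str] = []

    def push_frame(self, frame: bytes, is_speech: bool) -> None:
        if is_speech:
            # Speech in progress: keep accumulating audio.
            self._speaking = True
            self._buffer.append(frame)
        elif self._speaking:
            # Speech just ended: recognize the whole segment at once.
            self._speaking = False
            self.transcripts.append(self._recognize(self._buffer))
            self._buffer = []
        # Otherwise it is silence outside a segment: drop the frame.


def demo() -> List[str]:
    # The "recognizer" here just decodes bytes; a real one would call a model.
    adapter = BufferingAdapter(lambda frames: b"".join(frames).decode())
    frames = [
        (b"he", True), (b"llo", True), (b"", False),
        (b"", False), (b"yo", True), (b"", False),
    ]
    for frame, is_speech in frames:
        adapter.push_frame(frame, is_speech)
    return adapter.transcripts
```

The trade-off is latency: nothing is transcribed until the speaker pauses, so the silence threshold directly affects how quickly results arrive.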

Text-to-speech (TTS)

TTS synthesizes text into audio frames.

from typing import AsyncIterable

from livekit import agents, rtc
# assumes SynthesisEventType is exported from livekit.agents.tts
from livekit.agents.tts import SynthesisEventType
from livekit.plugins import elevenlabs

ctx: agents.JobContext = ...
text_stream: AsyncIterable[str] = ...

async def send_audio(audio_source: rtc.AudioSource, tts_stream):
    async for e in tts_stream:
        if e.type == SynthesisEventType.STARTED:
            # marks the beginning of each segment of audio
            pass
        elif e.type == SynthesisEventType.FINISHED:
            # marks the end of each segment of audio
            pass
        elif e.type == SynthesisEventType.AUDIO:
            await audio_source.capture_frame(e.audio.data)
    await tts_stream.aclose()

audio_source = rtc.AudioSource(44100, 1)
track = rtc.LocalAudioTrack.create_audio_track("agent-audio", audio_source)
await ctx.room.local_participant.publish_track(track)

tts = elevenlabs.TTS(model_id="eleven_turbo_v2")
tts_stream = tts.stream()

# create a task to consume and publish audio frames
ctx.create_task(send_audio(audio_source, tts_stream))

# push text into the stream, TTS stream will emit audio frames along with events
# indicating sentence (or segment) boundaries.
async for text in text_stream:
    tts_stream.push_text(text)
await tts_stream.flush()

Building your own

The plugin framework is designed to be extensible, allowing anyone to build their own plugin. Your plugin can integrate with various providers or directly load models for local inference.

By adopting the standard STT or TTS interfaces, you can abstract away implementation specifics and simplify switching between different providers in your agent code.
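
As a rough illustration of why a shared interface helps, the sketch below defines a hypothetical SpeechToText base class with two toy implementations. None of these names come from the framework, and the real STT interface is stream-based and richer than this; the point is only that calling code depends on the interface, not on a specific provider.

```python
from abc import ABC, abstractmethod
from typing import List


class SpeechToText(ABC):
    """Hypothetical minimal interface that every provider plugin implements."""

    @abstractmethod
    def recognize(self, audio: bytes) -> str:
        """Return the transcript for one chunk of audio."""


class EchoSTT(SpeechToText):
    # Toy "provider": treats the audio bytes as UTF-8 text.
    def recognize(self, audio: bytes) -> str:
        return audio.decode("utf-8")


class ShoutingSTT(SpeechToText):
    # A second toy "provider" with different behavior behind the same interface.
    def recognize(self, audio: bytes) -> str:
        return audio.decode("utf-8").upper()


def transcribe_all(stt: SpeechToText, chunks: List[bytes]) -> List[str]:
    # Agent code only knows about the interface, so providers are swappable.
    return [stt.recognize(chunk) for chunk in chunks]
```

Swapping EchoSTT() for ShoutingSTT() in a call to transcribe_all changes the provider without touching the agent logic.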