Working with plugins

Agents includes a set of prebuilt plugins that make it easier to compose agents. These plugins cover common tasks such as converting speech to text, converting text to speech, and running inference on a generative AI model.

We've standardized the plugin APIs for common tasks to facilitate easy switching between different providers. Having a consistent interface also makes it simpler for anyone to extend the framework and build new plugins for other providers.

Streams

One of the key concepts in the framework is a stream, which is used extensively for asynchronous, non-blocking data processing. A stream lets you push a sequence of inputs and produces an async iterator of outputs.

Here's how you can use a stream in your application:

# create a stream
stream = plugin.stream()
# create a task to consume outputs
asyncio.create_task(consume(stream))
# push inputs into the stream
stream.push(some_input)
stream.push(more_input)
# mark end of segment
stream.flush()
# signal input is done, once all of the output is consumed, the iterator will end
stream.end_input()
# Consume in another task
async def consume(stream):
    async for output in stream:
        # do something with output
        ...
    # close the stream for cleanup
    await stream.aclose()
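
To make the push, flush, and end_input lifecycle concrete, here is a minimal self-contained sketch built on asyncio.Queue. ToyStream is a hypothetical stand-in, not the framework's implementation: it assumes that a flush turns the buffered inputs into one output and that end_input lets the iterator finish once everything has been consumed.

```python
import asyncio

_END = object()  # sentinel marking the end of input


class ToyStream:
    """Hypothetical stand-in for a plugin stream; it uppercases pushed text."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._pending: list = []

    def push(self, item: str) -> None:
        # Buffer input until the segment is flushed.
        self._pending.append(item)

    def flush(self) -> None:
        # Mark the end of a segment: emit one output for the buffered inputs.
        if self._pending:
            self._queue.put_nowait(" ".join(self._pending).upper())
            self._pending = []

    def end_input(self) -> None:
        # Flush what is left, then signal that no more output will follow.
        self.flush()
        self._queue.put_nowait(_END)

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is _END:
            raise StopAsyncIteration
        return item


async def consume(stream: ToyStream) -> list:
    # Collect every output until the stream ends.
    return [output async for output in stream]


async def demo() -> list:
    stream = ToyStream()
    consumer = asyncio.create_task(consume(stream))
    stream.push("hello")
    stream.push("world")
    stream.flush()
    stream.push("bye")
    stream.end_input()
    return await consumer
```

Running asyncio.run(demo()) collects one output per flushed segment. Real plugin streams emit richer objects (speech events, audio frames), but the control flow is the same.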

Interrupting a stream

Streams are designed to be interruptible. To stop processing at any point, call stream.aclose(), which closes the stream.
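
The following sketch illustrates the interruption pattern with a toy stream. ToyStream and its aclose behaviour are assumptions made for illustration: closing sets a flag that ends iteration, so a consumer task blocked in its async for loop finishes promptly instead of running forever.

```python
import asyncio


class ToyStream:
    """Hypothetical stream that emits increasing integers until it is closed."""

    def __init__(self) -> None:
        self._closed = False
        self._next = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        await asyncio.sleep(0)  # yield control, as real I/O would
        if self._closed:
            raise StopAsyncIteration
        value = self._next
        self._next += 1
        return value

    async def aclose(self) -> None:
        # Assumed behaviour: closing stops iteration.
        self._closed = True


async def demo() -> list:
    stream = ToyStream()
    seen: list = []

    async def consume() -> None:
        async for value in stream:
            seen.append(value)

    consumer = asyncio.create_task(consume())
    # Let the consumer receive a few outputs first.
    while len(seen) < 3:
        await asyncio.sleep(0)
    await stream.aclose()  # interrupt the stream
    # The consumer's loop now ends on its own; the timeout guards against hangs.
    await asyncio.wait_for(consumer, timeout=1)
    return seen
```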

Speech-to-text (STT)

STT converts audio frames to a stream of text.

from typing import AsyncIterable

from livekit import agents, rtc
from livekit.agents.stt import SpeechEvent, SpeechEventType
from livekit.plugins import deepgram


async def process_track(ctx: agents.JobContext, track: rtc.Track):
    stt = deepgram.STT()
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)
    # consume transcription events in a separate task
    ctx.create_task(process_text_from_speech(stt_stream))
    # forward every audio frame from the track to the STT stream
    async for audio_event in audio_stream:
        stt_stream.push_frame(audio_event.frame)
    stt_stream.end_input()


async def process_text_from_speech(stream: AsyncIterable[SpeechEvent]):
    async for event in stream:
        if event.type == SpeechEventType.FINAL_TRANSCRIPT:
            text = event.alternatives[0].text
            # Do something with text
        elif event.type == SpeechEventType.INTERIM_TRANSCRIPT:
            pass
        elif event.type == SpeechEventType.START_OF_SPEECH:
            pass
        elif event.type == SpeechEventType.END_OF_SPEECH:
            pass
    # close the stream for cleanup
    await stream.aclose()
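
As a small illustration of how the event types are typically treated, the sketch below keeps only final transcripts and treats interim ones as provisional previews. EventType and Event are hypothetical mirrors of the event kinds used above, defined here so the example runs without the framework; they are not the library's classes.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Iterable, List


class EventType(Enum):  # hypothetical mirror of the four event kinds above
    START_OF_SPEECH = auto()
    INTERIM_TRANSCRIPT = auto()
    FINAL_TRANSCRIPT = auto()
    END_OF_SPEECH = auto()


@dataclass
class Event:
    type: EventType
    text: str = ""


def build_transcript(events: Iterable[Event]) -> List[str]:
    """Keep only final transcripts; interim ones are provisional previews."""
    finals: List[str] = []
    preview = ""
    for event in events:
        if event.type is EventType.INTERIM_TRANSCRIPT:
            preview = event.text  # could be shown live, then overwritten
        elif event.type is EventType.FINAL_TRANSCRIPT:
            finals.append(event.text)
            preview = ""  # the final text supersedes the preview
    return finals
```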

VAD and StreamAdapter

Some providers or models, such as Whisper, do not support streaming input. In these cases, the application must determine when a chunk of audio represents a complete segment of speech. This can be accomplished using a VAD (voice activity detector) in conjunction with a StreamAdapter class we provide.

We can modify the example above to use a VAD and StreamAdapter:

from livekit import agents, rtc
from livekit.plugins import openai, silero


async def process_track(ctx: agents.JobContext, track: rtc.Track):
    whisper_stt = openai.STT()
    vad = silero.VAD.load(
        min_speech_duration=0.1,
        min_silence_duration=0.5,
    )
    vad_stream = vad.stream()
    # StreamAdapter will buffer audio until VAD emits END_SPEAKING event
    stt = agents.stt.StreamAdapter(whisper_stt, vad_stream)
    stt_stream = stt.stream()
    ...
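
The buffering idea behind this pattern can be sketched in plain Python. This is not how StreamAdapter is implemented: is_speech is a toy energy threshold standing in for a real VAD model, and Frame, segment_and_recognize, and fake_recognize are hypothetical names. The sketch buffers frames while speech is active and hands each completed segment to a non-streaming recognizer.

```python
from typing import Callable, Iterable, Iterator, List

Frame = List[float]  # hypothetical frame type: a list of audio samples


def is_speech(frame: Frame, threshold: float = 0.1) -> bool:
    """Toy energy-based detector; real VADs such as Silero use a model."""
    return sum(abs(s) for s in frame) / max(len(frame), 1) > threshold


def segment_and_recognize(
    frames: Iterable[Frame],
    recognize: Callable[[List[Frame]], str],
    min_silence_frames: int = 2,
) -> Iterator[str]:
    """Buffer frames during speech and recognize each finished segment."""
    buffer: List[Frame] = []
    silence = 0
    for frame in frames:
        if is_speech(frame):
            buffer.append(frame)
            silence = 0
        elif buffer:
            silence += 1
            if silence >= min_silence_frames:
                # Enough trailing silence: treat the buffer as one utterance.
                yield recognize(buffer)
                buffer, silence = [], 0
    if buffer:
        # Input ended mid-utterance: recognize what was collected.
        yield recognize(buffer)


def fake_recognize(segment: List[Frame]) -> str:
    # Stand-in for a non-streaming recognizer such as a Whisper API call.
    return f"segment of {len(segment)} frames"
```

The min_silence_frames parameter plays a role similar to min_silence_duration above: a larger value tolerates longer pauses inside an utterance at the cost of higher latency before recognition starts.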

Text-to-speech (TTS)

TTS synthesizes text into audio frames.

from typing import AsyncIterable

from livekit import agents, rtc
from livekit.agents.tts import SynthesizedAudio
from livekit.plugins import elevenlabs

# the awaits below assume this code runs inside an async function
ctx: agents.JobContext = ...
text_stream: AsyncIterable[str] = ...

audio_source = rtc.AudioSource(44100, 1)
track = rtc.LocalAudioTrack.create_audio_track("agent-audio", audio_source)
await ctx.room.local_participant.publish_track(track)

tts = elevenlabs.TTS(model_id="eleven_turbo_v2")
tts_stream = tts.stream()

# create a task to consume and publish audio frames
ctx.create_task(send_audio(tts_stream))

# push text into the stream, TTS stream will emit audio frames along with events
# indicating sentence (or segment) boundaries.
async for text in text_stream:
    tts_stream.push_text(text)
tts_stream.end_input()


async def send_audio(audio_stream: AsyncIterable[SynthesizedAudio]):
    # forward each synthesized frame to the published audio source
    async for e in audio_stream:
        await audio_source.capture_frame(e.audio.frame)

Building your own

The plugin framework is designed to be extensible, allowing anyone to build their own plugin. Your plugin can integrate with various providers or directly load models for local inference.

By adopting the standard STT or TTS interfaces, you can abstract away implementation specifics and simplify switching between different providers in your agent code.
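
The benefit of a shared interface can be sketched with typing.Protocol. Everything below is hypothetical and far simpler than the framework's real STT and TTS base classes: TextToSpeech, BeepTTS, and SilentTTS are illustrative names, and the "audio" is just placeholder bytes. The point is that speak depends only on the interface, so providers can be swapped without touching agent code.

```python
import asyncio
from typing import AsyncIterator, Protocol


class TextToSpeech(Protocol):
    """Hypothetical minimal interface that every provider implements."""

    def synthesize(self, text: str) -> AsyncIterator[bytes]: ...


class BeepTTS:
    """Toy provider: one chunk of 0x01 bytes per word."""

    async def synthesize(self, text: str) -> AsyncIterator[bytes]:
        for word in text.split():
            yield b"\x01" * len(word)


class SilentTTS:
    """Toy provider: one chunk of 0x00 bytes per word."""

    async def synthesize(self, text: str) -> AsyncIterator[bytes]:
        for word in text.split():
            yield b"\x00" * len(word)


async def speak(tts: TextToSpeech, text: str) -> bytes:
    # Agent code sees only the interface, never the concrete provider.
    chunks = [chunk async for chunk in tts.synthesize(text)]
    return b"".join(chunks)
```

Switching providers is then a one-line change at the call site, for example asyncio.run(speak(SilentTTS(), "hi you")) instead of asyncio.run(speak(BeepTTS(), "hi you")).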

Available LiveKit plugins

LiveKit provides the following plugins. The documentation for each plugin lists the environment variables it requires, such as an API key.