We've standardized the plugin APIs for common tasks to facilitate easy switching between different providers. Having a consistent interface also makes it simpler for anyone to extend the framework and build new plugins for other providers.
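For example, because STT plugins share the same interface, switching providers is usually a one-line change. A minimal sketch (it assumes the Deepgram and OpenAI plugins are installed and their credentials are configured via environment variables):

```python
from livekit.plugins import deepgram, openai

# both plugins implement the same STT interface, so the surrounding
# agent code does not change when you switch providers
stt = deepgram.STT()
# stt = openai.STT()  # swap providers by changing only this line

stt_stream = stt.stream()
```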
Streams
One of the key concepts in the framework is a stream, which is used extensively for asynchronous, non-blocking data processing. A stream allows you to push a sequence of inputs and produces an async iterator of outputs.
Here's how you can use a stream in your application:
```python
# create a stream
stream = plugin.stream()

# create a task to consume outputs
asyncio.create_task(consume(stream))

# push inputs into the stream
stream.push(some_input)
stream.push(more_input)

# mark end of segment
stream.flush()

# signal input is done; once all of the output is consumed, the iterator will end
stream.end_input()

# consume in another task
async def consume(stream):
    async for output in stream:
        # do something with output
        ...
    # close the stream for cleanup
    await stream.aclose()
```
Interrupting a stream
Streams are designed to be interruptible. To stop processing, call stream.aclose() to close the stream.
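For example, a consumer task stops cleanly when another task closes the stream. A minimal sketch (the `consume` and `interrupt_later` helpers are illustrative, not part of the framework):

```python
import asyncio

async def consume(stream):
    # iteration ends once the stream is closed
    async for output in stream:
        ...  # handle output

async def interrupt_later(stream, delay: float):
    # close the stream after `delay` seconds, interrupting any
    # in-flight processing and ending the consumer's iteration
    await asyncio.sleep(delay)
    await stream.aclose()
```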
Speech-to-text (STT)
STT converts audio frames to a stream of text.
```python
from livekit import agents, rtc
from livekit.plugins import deepgram
from livekit.agents.stt import SpeechEventType, SpeechEvent
from typing import AsyncIterable

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    stt = deepgram.STT()
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)

    ctx.create_task(process_text_from_speech(stt_stream))

    async for audio_event in audio_stream:
        stt_stream.push_frame(audio_event.frame)

    stt_stream.end_input()

async def process_text_from_speech(stream: AsyncIterable[SpeechEvent]):
    async for event in stream:
        if event.type == SpeechEventType.FINAL_TRANSCRIPT:
            text = event.alternatives[0].text
            # Do something with text
        elif event.type == SpeechEventType.INTERIM_TRANSCRIPT:
            pass
        elif event.type == SpeechEventType.START_OF_SPEECH:
            pass
        elif event.type == SpeechEventType.END_OF_SPEECH:
            pass

    await stream.aclose()
```
VAD and StreamAdapter
Some providers or models, such as Whisper, do not support streaming input. In these cases, the application must determine when a chunk of audio represents a complete segment of speech. This can be accomplished using a VAD (voice activity detector) in conjunction with the StreamAdapter class we provide.
We can modify the example above to use a VAD and StreamAdapter:
```python
from livekit import agents, rtc
from livekit.plugins import openai, silero

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    whisper_stt = openai.STT()
    vad = silero.VAD.load(
        min_speech_duration=0.1,
        min_silence_duration=0.5,
    )
    vad_stream = vad.stream()
    # StreamAdapter will buffer audio until VAD emits END_SPEAKING event
    stt = agents.stt.StreamAdapter(whisper_stt, vad_stream)
    stt_stream = stt.stream()
    ...
```
Text-to-speech (TTS)
TTS synthesizes text into audio frames.
```python
from livekit import agents, rtc
from livekit.agents.tts import SynthesizedAudio
from livekit.plugins import elevenlabs
from typing import AsyncIterable

ctx: agents.JobContext = ...
text_stream: AsyncIterable[str] = ...

audio_source = rtc.AudioSource(44100, 1)
track = rtc.LocalAudioTrack.create_audio_track("agent-audio", audio_source)
await ctx.room.local_participant.publish_track(track)

tts = elevenlabs.TTS(model_id="eleven_turbo_v2")
tts_stream = tts.stream()

# create a task to consume and publish audio frames
ctx.create_task(send_audio(tts_stream))

# push text into the stream; the TTS stream will emit audio frames along with
# events indicating sentence (or segment) boundaries
async for text in text_stream:
    tts_stream.push_text(text)

tts_stream.end_input()

async def send_audio(audio_stream: AsyncIterable[SynthesizedAudio]):
    async for a in audio_stream:
        await audio_source.capture_frame(a.frame)
```
Building your own
The plugin framework is designed to be extensible, allowing anyone to build their own plugin. Your plugin can integrate with various providers or directly load models for local inference.
By adopting the standard STT or TTS interfaces, you can abstract away implementation specifics and simplify switching between different providers in your agent code.
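For example, if your agent code accepts the abstract interface instead of a concrete provider class, a custom plugin that subclasses it can be dropped in without further changes. A minimal sketch (the `transcribe_track` helper is illustrative; it reuses only calls shown in the STT example above):

```python
from livekit import agents, rtc

async def transcribe_track(stt: agents.stt.STT, track: rtc.Track):
    # `stt` can be any plugin that adopts the standard STT interface,
    # whether provider-backed or your own implementation
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)
    async for audio_event in audio_stream:
        stt_stream.push_frame(audio_event.frame)
    stt_stream.end_input()
```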
Available LiveKit plugins
LiveKit provides the following plugins. Each plugin's documentation lists the environment variables you need to set to use it, such as an API key.
Plugin | Feature |
---|---|
livekit-plugins-anthropic | LLM |
livekit-plugins-azure | STT, TTS |
livekit-plugins-browser | Chrome browser |
livekit-plugins-cartesia | TTS |
livekit-plugins-deepgram | STT |
livekit-plugins-elevenlabs | TTS |
livekit-plugins-google | STT, TTS |
livekit-plugins-nltk | Utilities for working with text |
livekit-plugins-openai | LLM, STT, TTS, multimodal. The OpenAI plugin also includes methods that allow you to use any OpenAI API compatible LLM. To learn more, see OpenAI compatible LLMs. |
livekit-plugins-rag | Vector retrieval with Annoy |
livekit-plugins-silero | VAD |