We've standardized the plugin APIs for common tasks to facilitate easy switching between different providers. Having a consistent interface also makes it simpler for anyone to extend the framework and/or build new plugins for other providers.
Streams
One of the key concepts in the framework is a stream, which is used extensively for asynchronous, non-blocking data processing. A stream lets you push a sequence of inputs and produces an async iterator of outputs.
Here's how you can use a stream in your application:
import asyncio

# create a stream
stream = plugin.stream()

# create a task to consume outputs
asyncio.create_task(consume(stream))

# push inputs into the stream
stream.push(input)
stream.push(input2)

# Close the stream. 'wait=True' means the stream will finish consuming all queued input
# and produce output for it. If 'wait=False', queued input would be skipped and
# iteration in the consumer would end as soon as possible.
await stream.aclose(wait=True)

# Consume in another task
async def consume(stream):
    async for output in stream:
        # do something with output
        pass
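To make the push/iterate/close contract concrete, here is a minimal, self-contained sketch of a stream built on two asyncio queues. ToyStream, its uppercase "processing" step, and the demo helper are hypothetical stand-ins for illustration only; they are not the framework's implementation, and a real plugin stream may behave differently in its details.

```python
import asyncio
from typing import List


class ToyStream:
    """Hypothetical stand-in for a plugin stream: uppercases each pushed string."""

    _CLOSE = object()  # sentinel marking the end of input or output

    def __init__(self) -> None:
        # Must be constructed inside a running event loop (create_task needs one).
        self._inputs: asyncio.Queue = asyncio.Queue()
        self._outputs: asyncio.Queue = asyncio.Queue()
        self._skip_pending = False
        self._worker = asyncio.create_task(self._run())

    def push(self, item: str) -> None:
        # Non-blocking: the input is queued and processed by the worker task.
        self._inputs.put_nowait(item)

    async def _run(self) -> None:
        while True:
            item = await self._inputs.get()
            if item is self._CLOSE or self._skip_pending:
                break
            await asyncio.sleep(0)  # stands in for real asynchronous work
            self._outputs.put_nowait(item.upper())
        # Tell the consumer that no more output will arrive.
        self._outputs.put_nowait(self._CLOSE)

    async def aclose(self, wait: bool = True) -> None:
        # wait=True: drain everything queued so far; wait=False: skip queued input.
        if not wait:
            self._skip_pending = True
        self._inputs.put_nowait(self._CLOSE)
        await self._worker

    def __aiter__(self) -> "ToyStream":
        return self

    async def __anext__(self) -> str:
        item = await self._outputs.get()
        if item is self._CLOSE:
            raise StopAsyncIteration
        return item


async def demo(wait: bool) -> List[str]:
    stream = ToyStream()
    results: List[str] = []

    async def consume() -> None:
        async for output in stream:
            results.append(output)

    consumer = asyncio.create_task(consume())
    stream.push("hello")
    stream.push("world")
    await stream.aclose(wait=wait)
    await consumer
    return results


if __name__ == "__main__":
    print(asyncio.run(demo(wait=True)))
    print(asyncio.run(demo(wait=False)))
```

In this sketch, closing with wait=True yields output for both queued inputs, while wait=False ends iteration without processing them, which mirrors the behavior described in the comments above.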
Speech-to-text (STT)
STT converts audio frames to a stream of text.
from livekit import agents, rtc
from livekit.plugins import deepgram

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    stt = deepgram.STT()
    stt_stream = stt.stream()
    audio_stream = rtc.AudioStream(track)
    ctx.create_task(process_text_from_speech(stt_stream))

    async for audio_event in audio_stream:
        stt_stream.push_frame(audio_event.frame)
    await stt_stream.aclose(wait=True)

async def process_text_from_speech(stream):
    async for event in stream:
        if not event.is_final:
            # received a partial result; the STT result will be updated as confidence increases
            continue
        # do something with final speech
        pass
VAD and StreamAdapter
Some providers or models, such as Whisper, do not support streaming input. In these cases, the application must determine when a chunk of audio represents a complete segment of speech. This can be accomplished using a VAD (voice activity detector) in conjunction with the StreamAdapter class we provide.
We can modify the example above to use a VAD and StreamAdapter:
from livekit import agents, rtc
from livekit.plugins import openai, silero

async def process_track(ctx: agents.JobContext, track: rtc.Track):
    whisper_stt = openai.STT()
    vad = silero.VAD()
    vad_stream = vad.stream(min_silence_duration=1.0)
    # StreamAdapter will buffer audio until VAD emits END_SPEAKING event
    stt = agents.stt.StreamAdapter(whisper_stt, vad_stream)
    stt_stream = stt.stream()
    ...
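The buffering idea behind this pairing can be illustrated without any audio libraries. The sketch below is a simplified assumption, not the StreamAdapter or Silero implementation: each "frame" is reduced to a single loudness value, a fixed threshold plays the role of the VAD, and segment_speech (a hypothetical helper) emits a buffered segment once enough consecutive silent frames have been seen. Each emitted segment is what would then be handed to a non-streaming recognizer.

```python
from typing import Iterable, Iterator, List


def segment_speech(
    frames: Iterable[float],
    threshold: float = 0.5,
    min_silence_frames: int = 2,
) -> Iterator[List[float]]:
    """Yield runs of speech frames, each closed by enough consecutive silence.

    Hypothetical helper: a loudness threshold stands in for a real VAD.
    """
    buffer: List[float] = []
    silence_run = 0
    for level in frames:
        if level >= threshold:
            # Speech frame: buffer it and reset the silence counter.
            buffer.append(level)
            silence_run = 0
        elif buffer:
            # Silent frame after some speech: count toward the end of the segment.
            silence_run += 1
            if silence_run >= min_silence_frames:
                yield buffer
                buffer = []
                silence_run = 0
    if buffer:
        # Input ended while a segment was still open: flush it.
        yield buffer


if __name__ == "__main__":
    levels = [0.0, 0.9, 0.8, 0.1, 0.7, 0.0, 0.0, 0.6, 0.0]
    for segment in segment_speech(levels):
        print(segment)
```

Note that the single quiet frame between 0.8 and 0.7 does not split the first segment, because it is shorter than min_silence_frames. This plays a similar role to the min_silence_duration setting in the example above.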
Text-to-speech (TTS)
TTS synthesizes text into audio frames.
from livekit import agents, rtc
from livekit.agents.tts import SynthesisEvent, SynthesisEventType
from livekit.plugins import elevenlabs
from typing import AsyncIterable

ctx: agents.JobContext = ...
text_stream: AsyncIterable[str] = ...

audio_source = rtc.AudioSource(44100, 1)
track = rtc.LocalAudioTrack.create_audio_track("agent-audio", audio_source)
await ctx.room.local_participant.publish_track(track)

tts = elevenlabs.TTS(model_id="eleven_turbo_v2")
tts_stream = tts.stream()

# create a task to consume and publish audio frames
ctx.create_task(send_audio(tts_stream))

# push text into the stream, TTS stream will emit audio frames along with events
# indicating sentence (or segment) boundaries.
async for text in text_stream:
    tts_stream.push_text(text)
await tts_stream.aclose(wait=True)

async def send_audio(audio_stream: AsyncIterable[SynthesisEvent]):
    async for e in audio_stream:
        if e.type == SynthesisEventType.STARTED:
            # marks the beginning of each segment of audio
            pass
        elif e.type == SynthesisEventType.FINISHED:
            # marks the end of each segment of audio, as determined by VAD
            pass
        elif e.type == SynthesisEventType.AUDIO:
            await audio_source.capture_frame(e.audio.data)
Building your own
The plugin framework is designed to be extensible, allowing anyone to build their own plugin. Your plugin can integrate with various providers or directly load models for local inference.
By adopting the standard STT or TTS interfaces, you can abstract away implementation specifics and simplify switching between different providers in your agent code.
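As a rough illustration of why a shared interface simplifies switching providers, the sketch below defines a tiny abstract base class and two interchangeable implementations. All names here (SpeechToText, recognize, EchoSTT, ShoutingSTT, transcribe_all) are hypothetical and are not the framework's actual base classes or method signatures; the "audio" is just UTF-8 bytes so the example can run on its own.

```python
from abc import ABC, abstractmethod
from typing import Iterable, List


class SpeechToText(ABC):
    """Hypothetical minimal interface that every provider plugin would implement."""

    @abstractmethod
    def recognize(self, audio: bytes) -> str:
        """Return the transcript for one chunk of audio."""


class EchoSTT(SpeechToText):
    # Toy provider: treats the audio bytes as UTF-8 text.
    def recognize(self, audio: bytes) -> str:
        return audio.decode("utf-8")


class ShoutingSTT(SpeechToText):
    # A second toy provider with different behavior behind the same interface.
    def recognize(self, audio: bytes) -> str:
        return audio.decode("utf-8").upper()


def transcribe_all(stt: SpeechToText, chunks: Iterable[bytes]) -> List[str]:
    # Agent code depends only on the interface, never on a concrete provider.
    return [stt.recognize(chunk) for chunk in chunks]


if __name__ == "__main__":
    chunks = [b"hi", b"there"]
    print(transcribe_all(EchoSTT(), chunks))
    print(transcribe_all(ShoutingSTT(), chunks))
```

Swapping providers here means changing only the object passed to transcribe_all; the calling code stays the same.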