Module livekit.plugins.elevenlabs

ElevenLabs plugin for LiveKit Agents

See https://docs.livekit.io/agents/integrations/tts/elevenlabs/ for more information.
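A minimal usage sketch (assuming the livekit-agents AgentSession API and an ELEVEN_API_KEY in the environment; adapt to your setup):

from livekit.agents import AgentSession
from livekit.plugins import elevenlabs

# Sketch only: wire the plugin's STT and TTS into an agent session.
session = AgentSession(
    stt=elevenlabs.STT(use_realtime=True),
    tts=elevenlabs.TTS(model="eleven_turbo_v2_5"),
)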

Classes

class PronunciationDictionaryLocator (pronunciation_dictionary_id: str, version_id: str)
@dataclass
class PronunciationDictionaryLocator:
    pronunciation_dictionary_id: str
    version_id: str

Instance variables

var pronunciation_dictionary_id : str
var version_id : str
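
For illustration, a locator can be constructed directly; the IDs below are placeholders for values returned by the ElevenLabs pronunciation-dictionary API:

from livekit.plugins.elevenlabs import PronunciationDictionaryLocator

# Placeholder IDs, for illustration only.
locator = PronunciationDictionaryLocator(
    pronunciation_dictionary_id="pd_abc123",
    version_id="v1_def456",
)
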
class STT (*,
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
language_code: NotGivenOr[str] = NOT_GIVEN,
tag_audio_events: bool = True,
use_realtime: bool = False,
sample_rate: STTRealtimeSampleRates = 16000,
server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None)
class STT(stt.STT):
    def __init__(
        self,
        *,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        language_code: NotGivenOr[str] = NOT_GIVEN,
        tag_audio_events: bool = True,
        use_realtime: bool = False,
        sample_rate: STTRealtimeSampleRates = 16000,
        server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        """
        Create a new instance of ElevenLabs STT.

        Args:
            api_key (NotGivenOr[str]): ElevenLabs API key. Can be set via argument or `ELEVEN_API_KEY` environment variable.
            base_url (NotGivenOr[str]): Custom base URL for the API. Optional.
            language_code (NotGivenOr[str]): Language code for the STT model. Optional.
            tag_audio_events (bool): Whether to tag audio events like (laughter), (footsteps), etc. in the transcription.
                Only supported for Scribe v1 model. Default is True.
            use_realtime (bool): Whether to use the "scribe_v2_realtime" model for streaming mode. Default is False.
            sample_rate (STTRealtimeSampleRates): Audio sample rate in Hz. Default is 16000.
            server_vad (NotGivenOr[VADOptions]): Server-side VAD options, only supported for Scribe v2 realtime model.
            http_session (aiohttp.ClientSession | None): Custom HTTP session for API requests. Optional.
        """  # noqa: E501

        super().__init__(capabilities=STTCapabilities(streaming=use_realtime, interim_results=True))

        if not use_realtime and is_given(server_vad):
            logger.warning("Server-side VAD is only supported for Scribe v2 realtime model")

        elevenlabs_api_key = api_key if is_given(api_key) else os.environ.get("ELEVEN_API_KEY")
        if not elevenlabs_api_key:
            raise ValueError(
                "ElevenLabs API key is required, either as argument or "
                "set ELEVEN_API_KEY environmental variable"
            )
        self._opts = STTOptions(
            api_key=elevenlabs_api_key,
            base_url=base_url if is_given(base_url) else API_BASE_URL_V1,
            language_code=language_code or None,
            tag_audio_events=tag_audio_events,
            sample_rate=sample_rate,
            server_vad=server_vad,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SpeechStream]()

    @property
    def model(self) -> str:
        return "Scribe"

    @property
    def provider(self) -> str:
        return "ElevenLabs"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = http_context.http_session()

        return self._session

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        if is_given(language):
            self._opts.language_code = language

        wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()
        form = aiohttp.FormData()
        form.add_field("file", wav_bytes, filename="audio.wav", content_type="audio/x-wav")
        form.add_field("model_id", "scribe_v1")
        form.add_field("tag_audio_events", str(self._opts.tag_audio_events).lower())
        if self._opts.language_code:
            form.add_field("language_code", self._opts.language_code)

        try:
            async with self._ensure_session().post(
                f"{self._opts.base_url}/speech-to-text",
                data=form,
                headers={AUTHORIZATION_HEADER: self._opts.api_key},
            ) as response:
                response_json = await response.json()
                if response.status != 200:
                    raise APIStatusError(
                        message=response_json.get("detail", "Unknown ElevenLabs error"),
                        status_code=response.status,
                        request_id=None,
                        body=response_json,
                    )
                extracted_text = response_json.get("text")
                language_code = response_json.get("language_code")
                speaker_id = None
                start_time, end_time = 0, 0
                words = response_json.get("words")
                if words:
                    speaker_id = words[0].get("speaker_id", None)
                    start_time = min(w.get("start", 0) for w in words)
                    end_time = max(w.get("end", 0) for w in words)

        except asyncio.TimeoutError as e:
            raise APITimeoutError() from e
        except aiohttp.ClientResponseError as e:
            raise APIStatusError(
                message=e.message,
                status_code=e.status,
                request_id=None,
                body=None,
            ) from e
        except Exception as e:
            raise APIConnectionError() from e

        return self._transcription_to_speech_event(
            language_code=language_code,
            text=extracted_text,
            start_time=start_time,
            end_time=end_time,
            speaker_id=speaker_id,
        )

    def _transcription_to_speech_event(
        self,
        language_code: str,
        text: str,
        start_time: float,
        end_time: float,
        speaker_id: str | None,
    ) -> stt.SpeechEvent:
        return stt.SpeechEvent(
            type=SpeechEventType.FINAL_TRANSCRIPT,
            alternatives=[
                stt.SpeechData(
                    text=text,
                    language=language_code,
                    speaker_id=speaker_id,
                    start_time=start_time,
                    end_time=end_time,
                )
            ],
        )

    def update_options(
        self,
        *,
        tag_audio_events: NotGivenOr[bool] = NOT_GIVEN,
        server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
    ) -> None:
        if is_given(tag_audio_events):
            self._opts.tag_audio_events = tag_audio_events

        if is_given(server_vad):
            self._opts.server_vad = server_vad

        for stream in self._streams:
            stream.update_options(server_vad=server_vad)

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        stream = SpeechStream(
            stt=self,
            opts=self._opts,
            conn_options=conn_options,
            language=language if is_given(language) else self._opts.language_code,
            http_session=self._ensure_session(),
        )
        self._streams.add(stream)
        return stream

Create a new instance of ElevenLabs STT.

Args

api_key : NotGivenOr[str]
ElevenLabs API key. Can be set via argument or ELEVEN_API_KEY environment variable.
base_url : NotGivenOr[str]
Custom base URL for the API. Optional.
language_code : NotGivenOr[str]
Language code for the STT model. Optional.
tag_audio_events : bool
Whether to tag audio events like (laughter), (footsteps), etc. in the transcription. Only supported for Scribe v1 model. Default is True.
use_realtime : bool
Whether to use the "scribe_v2_realtime" model for streaming mode. Default is False.
sample_rate : STTRealtimeSampleRates
Audio sample rate in Hz. Default is 16000.
server_vad : NotGivenOr[VADOptions]
Server-side VAD options, only supported for Scribe v2 realtime model.
http_session : aiohttp.ClientSession | None
Custom HTTP session for API requests. Optional.
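
A construction sketch follows; it assumes VADOptions is a dict-like mapping of the keys read in SpeechStream._connect_ws below (all values illustrative):

from livekit.plugins.elevenlabs import STT

# Illustrative server-side VAD options; keys mirror SpeechStream._connect_ws.
vad_opts = {
    "vad_silence_threshold_secs": 0.5,
    "vad_threshold": 0.6,
    "min_speech_duration_ms": 100,
    "min_silence_duration_ms": 300,
}

stt = STT(
    use_realtime=True,    # stream with the scribe_v2_realtime model
    sample_rate=16000,
    server_vad=vad_opts,  # only used by the realtime model; warns otherwise
)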

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return "Scribe"

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "ElevenLabs"

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.elevenlabs.stt.SpeechStream
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    stream = SpeechStream(
        stt=self,
        opts=self._opts,
        conn_options=conn_options,
        language=language if is_given(language) else self._opts.language_code,
        http_session=self._ensure_session(),
    )
    self._streams.add(stream)
    return stream
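
A consumption sketch, assuming the base RecognizeStream interface from livekit-agents (push_frame, end_input) and an async source of rtc.AudioFrame (names are placeholders):

from livekit.agents.stt import SpeechEventType

stream = stt.stream(language="en")  # stt: the STT instance created earlier

async def feed(frames):  # frames: any async iterable of rtc.AudioFrame
    async for frame in frames:
        stream.push_frame(frame)
    stream.end_input()

# Run feed(...) concurrently (e.g. asyncio.create_task), then consume events:
async for event in stream:
    if event.type == SpeechEventType.FINAL_TRANSCRIPT:
        print(event.alternatives[0].text)
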
def update_options(self,
*,
tag_audio_events: NotGivenOr[bool] = NOT_GIVEN,
server_vad: NotGivenOr[VADOptions] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    tag_audio_events: NotGivenOr[bool] = NOT_GIVEN,
    server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
) -> None:
    if is_given(tag_audio_events):
        self._opts.tag_audio_events = tag_audio_events

    if is_given(server_vad):
        self._opts.server_vad = server_vad

    for stream in self._streams:
        stream.update_options(server_vad=server_vad)
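
For example, updated VAD options propagate to open streams, which then reconnect (illustrative value):

stt.update_options(server_vad={"vad_threshold": 0.7})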

class SpeechStream (*,
stt: STT,
opts: STTOptions,
conn_options: APIConnectOptions,
language: str | None,
http_session: aiohttp.ClientSession)
class SpeechStream(stt.SpeechStream):
    """Streaming speech recognition using ElevenLabs Scribe v2 realtime API"""

    def __init__(
        self,
        *,
        stt: STT,
        opts: STTOptions,
        conn_options: APIConnectOptions,
        language: str | None,
        http_session: aiohttp.ClientSession,
    ) -> None:
        super().__init__(stt=stt, conn_options=conn_options, sample_rate=opts.sample_rate)
        self._opts = opts
        self._language = language
        self._session = http_session
        self._reconnect_event = asyncio.Event()
        self._speaking = False  # Track if we're currently in a speech segment

    def update_options(
        self,
        *,
        server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
    ) -> None:
        if is_given(server_vad):
            self._opts.server_vad = server_vad
            self._reconnect_event.set()

    async def _run(self) -> None:
        """Run the streaming transcription session"""
        closing_ws = False

        async def keepalive_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            try:
                while True:
                    await ws.ping()
                    await asyncio.sleep(30)
            except Exception:
                return

        @utils.log_exceptions(logger=logger)
        async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            # Buffer audio into chunks (50ms chunks)
            samples_50ms = self._opts.sample_rate // 20
            audio_bstream = utils.audio.AudioByteStream(
                sample_rate=self._opts.sample_rate,
                num_channels=1,
                samples_per_channel=samples_50ms,
            )

            async for data in self._input_ch:
                # Write audio bytes to buffer and get 50ms frames
                frames: list[rtc.AudioFrame] = []
                if isinstance(data, rtc.AudioFrame):
                    frames.extend(audio_bstream.write(data.data.tobytes()))
                elif isinstance(data, self._FlushSentinel):
                    frames.extend(audio_bstream.flush())

                for frame in frames:
                    audio_b64 = base64.b64encode(frame.data.tobytes()).decode("utf-8")
                    await ws.send_str(
                        json.dumps(
                            {
                                "message_type": "input_audio_chunk",
                                "audio_base_64": audio_b64,
                                "commit": False,
                                "sample_rate": self._opts.sample_rate,
                            }
                        )
                    )

            closing_ws = True

        @utils.log_exceptions(logger=logger)
        async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
            nonlocal closing_ws

            while True:
                msg = await ws.receive()

                if msg.type in (
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws or self._session.closed:
                        return
                    raise APIStatusError(message="ElevenLabs STT connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.TEXT:
                    logger.warning("unexpected ElevenLabs STT message type %s", msg.type)
                    continue

                try:
                    parsed = json.loads(msg.data)
                    self._process_stream_event(parsed)
                except Exception:
                    logger.exception("failed to process ElevenLabs STT message")

        ws: aiohttp.ClientWebSocketResponse | None = None

        while True:
            try:
                ws = await self._connect_ws()
                tasks = [
                    asyncio.create_task(send_task(ws)),
                    asyncio.create_task(recv_task(ws)),
                    asyncio.create_task(keepalive_task(ws)),
                ]
                tasks_group = asyncio.gather(*tasks)
                wait_reconnect_task = asyncio.create_task(self._reconnect_event.wait())

                try:
                    done, _ = await asyncio.wait(
                        (tasks_group, wait_reconnect_task),
                        return_when=asyncio.FIRST_COMPLETED,
                    )

                    for task in done:
                        if task != wait_reconnect_task:
                            task.result()

                    if wait_reconnect_task not in done:
                        break

                    self._reconnect_event.clear()
                finally:
                    await utils.aio.gracefully_cancel(*tasks, wait_reconnect_task)
                    tasks_group.cancel()
                    tasks_group.exception()  # Retrieve exception to prevent it from being logged
            finally:
                if ws is not None:
                    await ws.close()

    async def _connect_ws(self) -> aiohttp.ClientWebSocketResponse:
        """Establish WebSocket connection to ElevenLabs Scribe v2 API"""
        commit_strategy = "manual" if self._opts.server_vad is None else "vad"
        params = [
            "model_id=scribe_v2_realtime",
            f"encoding=pcm_{self._opts.sample_rate}",
            f"commit_strategy={commit_strategy}",
        ]

        if server_vad := self._opts.server_vad:
            if (
                vad_silence_threshold_secs := server_vad.get("vad_silence_threshold_secs")
            ) is not None:
                params.append(f"vad_silence_threshold_secs={vad_silence_threshold_secs}")
            if (vad_threshold := server_vad.get("vad_threshold")) is not None:
                params.append(f"vad_threshold={vad_threshold}")
            if (min_speech_duration_ms := server_vad.get("min_speech_duration_ms")) is not None:
                params.append(f"min_speech_duration_ms={min_speech_duration_ms}")
            if (min_silence_duration_ms := server_vad.get("min_silence_duration_ms")) is not None:
                params.append(f"min_silence_duration_ms={min_silence_duration_ms}")

        if self._language:
            params.append(f"language_code={self._language}")

        query_string = "&".join(params)

        # Convert HTTPS URL to WSS
        base_url = self._opts.base_url.replace("https://", "wss://").replace("http://", "ws://")
        ws_url = f"{base_url}/speech-to-text/realtime?{query_string}"

        try:
            ws = await asyncio.wait_for(
                self._session.ws_connect(
                    ws_url,
                    headers={AUTHORIZATION_HEADER: self._opts.api_key},
                ),
                self._conn_options.timeout,
            )
        except (aiohttp.ClientConnectorError, asyncio.TimeoutError) as e:
            raise APIConnectionError("Failed to connect to ElevenLabs") from e

        return ws

    def _process_stream_event(self, data: dict) -> None:
        """Process incoming WebSocket messages from ElevenLabs"""
        message_type = data.get("message_type")
        text = data.get("text", "")

        speech_data = stt.SpeechData(
            language=self._language or "en",
            text=text,
        )

        if message_type == "partial_transcript":
            logger.debug("Received message type partial_transcript: %s", data)

            if text:
                # Send START_OF_SPEECH if we're not already speaking
                if not self._speaking:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=SpeechEventType.START_OF_SPEECH)
                    )
                    self._speaking = True

                # Send INTERIM_TRANSCRIPT
                interim_event = stt.SpeechEvent(
                    type=SpeechEventType.INTERIM_TRANSCRIPT,
                    alternatives=[speech_data],
                )
                self._event_ch.send_nowait(interim_event)

        elif message_type == "committed_transcript":
            logger.debug("Received message type committed_transcript: %s", data)

            # Final committed transcripts - these are sent to the LLM/TTS layer in LiveKit agents
            # and trigger agent responses (unlike partial transcripts which are UI-only)

            if text:
                # Send START_OF_SPEECH if we're not already speaking
                if not self._speaking:
                    self._event_ch.send_nowait(
                        stt.SpeechEvent(type=SpeechEventType.START_OF_SPEECH)
                    )
                    self._speaking = True

                # Send FINAL_TRANSCRIPT but keep speaking=True
                # Multiple commits can occur within the same speech segment
                final_event = stt.SpeechEvent(
                    type=SpeechEventType.FINAL_TRANSCRIPT,
                    alternatives=[speech_data],
                )
                self._event_ch.send_nowait(final_event)
            else:
                # Empty commit signals end of speech segment (similar to Cartesia's is_final flag)
                # This groups multiple committed transcripts into one speech segment
                if self._speaking:
                    self._event_ch.send_nowait(stt.SpeechEvent(type=SpeechEventType.END_OF_SPEECH))
                    self._speaking = False

        elif message_type == "session_started":
            # Session initialization message - informational only
            session_id = data.get("session_id", "unknown")
            logger.debug("Session started with ID: %s", session_id)

        elif message_type == "committed_transcript_with_timestamps":
            logger.debug("Received message type committed_transcript_with_timestamps: %s", data)

        # Error handling for known ElevenLabs error types
        elif message_type in (
            "auth_error",
            "quota_exceeded",
            "transcriber_error",
            "input_error",
            "error",
        ):
            error_msg = data.get("message", "Unknown error")
            error_details = data.get("details", "")
            details_suffix = " - " + error_details if error_details else ""
            logger.error(
                "ElevenLabs STT error [%s]: %s%s",
                message_type,
                error_msg,
                details_suffix,
            )
            raise APIConnectionError(f"{message_type}: {error_msg}{details_suffix}")
        else:
            logger.warning("ElevenLabs STT unknown message type: %s, data: %s", message_type, data)

Streaming speech recognition using ElevenLabs Scribe v2 realtime API

Args

sample_rate : int | None, optional
The desired sample rate for the audio input. If specified, the audio input will be automatically resampled to match the given sample rate before being processed for speech-to-text. If not provided (None), the input retains its original sample rate.
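
On the wire, send_task buffers audio into 50 ms chunks (sample_rate // 20 samples per chunk, i.e. 800 samples at 16000 Hz) and sends each as an input_audio_chunk message; the payload shape, taken from the source above, is:

{
    "message_type": "input_audio_chunk",
    "audio_base_64": "<base64-encoded PCM>",
    "commit": False,
    "sample_rate": 16000,
}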

Ancestors

  • livekit.agents.stt.stt.RecognizeStream
  • abc.ABC

Methods

def update_options(self, *, server_vad: NotGivenOr[VADOptions] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    server_vad: NotGivenOr[VADOptions] = NOT_GIVEN,
) -> None:
    if is_given(server_vad):
        self._opts.server_vad = server_vad
        self._reconnect_event.set()
class TTS (*,
voice_id: str = 'bIHbv24MWmeRgasZH58o',
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
model: TTSModels | str = 'eleven_turbo_v2_5',
encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
api_key: NotGivenOr[str] = NOT_GIVEN,
base_url: NotGivenOr[str] = NOT_GIVEN,
streaming_latency: NotGivenOr[int] = NOT_GIVEN,
inactivity_timeout: int = 180,
auto_mode: NotGivenOr[bool] = NOT_GIVEN,
apply_text_normalization: "Literal['auto', 'off', 'on']" = 'auto',
word_tokenizer: NotGivenOr[tokenize.WordTokenizer | tokenize.SentenceTokenizer] = NOT_GIVEN,
enable_ssml_parsing: bool = False,
enable_logging: bool = True,
chunk_length_schedule: NotGivenOr[list[int]] = NOT_GIVEN,
http_session: aiohttp.ClientSession | None = None,
language: NotGivenOr[str] = NOT_GIVEN,
sync_alignment: bool = True,
preferred_alignment: "Literal['normalized', 'original']" = 'normalized',
pronunciation_dictionary_locators: NotGivenOr[list[PronunciationDictionaryLocator]] = NOT_GIVEN)
class TTS(tts.TTS):
    def __init__(
        self,
        *,
        voice_id: str = DEFAULT_VOICE_ID,
        voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
        model: TTSModels | str = "eleven_turbo_v2_5",
        encoding: NotGivenOr[TTSEncoding] = NOT_GIVEN,
        api_key: NotGivenOr[str] = NOT_GIVEN,
        base_url: NotGivenOr[str] = NOT_GIVEN,
        streaming_latency: NotGivenOr[int] = NOT_GIVEN,
        inactivity_timeout: int = WS_INACTIVITY_TIMEOUT,
        auto_mode: NotGivenOr[bool] = NOT_GIVEN,
        apply_text_normalization: Literal["auto", "off", "on"] = "auto",
        word_tokenizer: NotGivenOr[tokenize.WordTokenizer | tokenize.SentenceTokenizer] = NOT_GIVEN,
        enable_ssml_parsing: bool = False,
        enable_logging: bool = True,
        chunk_length_schedule: NotGivenOr[list[int]] = NOT_GIVEN,  # range is [50, 500]
        http_session: aiohttp.ClientSession | None = None,
        language: NotGivenOr[str] = NOT_GIVEN,
        sync_alignment: bool = True,
        preferred_alignment: Literal["normalized", "original"] = "normalized",
        pronunciation_dictionary_locators: NotGivenOr[
            list[PronunciationDictionaryLocator]
        ] = NOT_GIVEN,
    ) -> None:
        """
        Create a new instance of ElevenLabs TTS.

        Args:
            voice_id (str): Voice ID. Defaults to `DEFAULT_VOICE_ID`.
            voice_settings (NotGivenOr[VoiceSettings]): Voice settings.
            model (TTSModels | str): TTS model to use. Defaults to "eleven_turbo_v2_5".
            api_key (NotGivenOr[str]): ElevenLabs API key. Can be set via argument or `ELEVEN_API_KEY` environment variable.
            base_url (NotGivenOr[str]): Custom base URL for the API. Optional.
            streaming_latency (NotGivenOr[int]): Deprecated. Optimize for streaming latency; defaults to 0 (disabled), 4 for maximum latency optimizations.
            inactivity_timeout (int): Inactivity timeout in seconds for the websocket connection. Defaults to 180.
            auto_mode (bool): Reduces latency by disabling chunk schedule and buffers. Sentence tokenizer will be used to synthesize one sentence at a time. Defaults to True.
            word_tokenizer (NotGivenOr[tokenize.WordTokenizer | tokenize.SentenceTokenizer]): Tokenizer for processing text. Defaults to basic WordTokenizer when auto_mode=False, `livekit.agents.tokenize.blingfire.SentenceTokenizer` otherwise.
            enable_ssml_parsing (bool): Enable SSML parsing for input text. Defaults to False.
            enable_logging (bool): Enable logging of the request. When set to false, zero retention mode will be used. Defaults to True.
            chunk_length_schedule (NotGivenOr[list[int]]): Schedule for chunk lengths, ranging from 50 to 500. Defaults are [120, 160, 250, 290].
            http_session (aiohttp.ClientSession | None): Custom HTTP session for API requests. Optional.
            language (NotGivenOr[str]): Language code for the TTS model; as of 2024-10-24 only valid for "eleven_turbo_v2_5".
            sync_alignment (bool): Enable sync alignment for the TTS model. Defaults to True.
            preferred_alignment (Literal["normalized", "original"]): Use normalized or original alignment. Defaults to "normalized".
            pronunciation_dictionary_locators (NotGivenOr[list[PronunciationDictionaryLocator]]): List of pronunciation dictionary locators to use for pronunciation control.
        """  # noqa: E501

        if not is_given(encoding):
            encoding = _DefaultEncoding

        super().__init__(
            capabilities=tts.TTSCapabilities(
                streaming=True,
                aligned_transcript=sync_alignment,
            ),
            sample_rate=_sample_rate_from_format(encoding),
            num_channels=1,
        )

        elevenlabs_api_key = api_key if is_given(api_key) else os.environ.get("ELEVEN_API_KEY")
        if not elevenlabs_api_key:
            raise ValueError(
                "ElevenLabs API key is required, either as argument or set ELEVEN_API_KEY environmental variable"  # noqa: E501
            )

        if not is_given(auto_mode):
            auto_mode = True

        if not is_given(word_tokenizer):
            word_tokenizer = (
                tokenize.basic.WordTokenizer(ignore_punctuation=False)
                if not auto_mode
                else tokenize.blingfire.SentenceTokenizer()
            )
        elif auto_mode and not isinstance(word_tokenizer, tokenize.SentenceTokenizer):
            logger.warning(
                "auto_mode is enabled, it expects full sentences or phrases, "
                "please provide a SentenceTokenizer instead of a WordTokenizer."
            )
        self._opts = _TTSOptions(
            voice_id=voice_id,
            voice_settings=voice_settings,
            model=model,
            api_key=elevenlabs_api_key,
            base_url=base_url if is_given(base_url) else API_BASE_URL_V1,
            encoding=encoding,
            sample_rate=self.sample_rate,
            streaming_latency=streaming_latency,
            word_tokenizer=word_tokenizer,
            chunk_length_schedule=chunk_length_schedule,
            enable_ssml_parsing=enable_ssml_parsing,
            enable_logging=enable_logging,
            language=language,
            inactivity_timeout=inactivity_timeout,
            sync_alignment=sync_alignment,
            auto_mode=auto_mode,
            apply_text_normalization=apply_text_normalization,
            preferred_alignment=preferred_alignment,
            pronunciation_dictionary_locators=pronunciation_dictionary_locators,
        )
        self._session = http_session
        self._streams = weakref.WeakSet[SynthesizeStream]()

        self._current_connection: _Connection | None = None
        self._connection_lock = asyncio.Lock()

    @property
    def model(self) -> str:
        return self._opts.model

    @property
    def provider(self) -> str:
        return "ElevenLabs"

    def _ensure_session(self) -> aiohttp.ClientSession:
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    async def list_voices(self) -> list[Voice]:
        async with self._ensure_session().get(
            f"{self._opts.base_url}/voices",
            headers={AUTHORIZATION_HEADER: self._opts.api_key},
        ) as resp:
            return _dict_to_voices_list(await resp.json())

    def update_options(
        self,
        *,
        voice_id: NotGivenOr[str] = NOT_GIVEN,
        voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
        model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
        language: NotGivenOr[str] = NOT_GIVEN,
        pronunciation_dictionary_locators: NotGivenOr[
            list[PronunciationDictionaryLocator]
        ] = NOT_GIVEN,
    ) -> None:
        """
        Args:
            voice_id (NotGivenOr[str]): Voice ID.
            voice_settings (NotGivenOr[VoiceSettings]): Voice settings.
            model (NotGivenOr[TTSModels | str]): TTS model to use.
            language (NotGivenOr[str]): Language code for the TTS model.
            pronunciation_dictionary_locators (NotGivenOr[list[PronunciationDictionaryLocator]]): List of pronunciation dictionary locators.
        """
        changed = False

        if is_given(model) and model != self._opts.model:
            self._opts.model = model
            changed = True

        if is_given(voice_id) and voice_id != self._opts.voice_id:
            self._opts.voice_id = voice_id
            changed = True

        if is_given(voice_settings):
            self._opts.voice_settings = voice_settings
            changed = True

        if is_given(language) and language != self._opts.language:
            self._opts.language = language
            changed = True

        if is_given(pronunciation_dictionary_locators):
            self._opts.pronunciation_dictionary_locators = pronunciation_dictionary_locators
            changed = True

        if changed and self._current_connection:
            self._current_connection.mark_non_current()
            self._current_connection = None

    async def current_connection(self) -> _Connection:
        """Get the current connection, creating one if needed"""
        async with self._connection_lock:
            if (
                self._current_connection
                and self._current_connection.is_current
                and not self._current_connection._closed
            ):
                return self._current_connection

            session = self._ensure_session()
            conn = _Connection(self._opts, session)
            await conn.connect()
            self._current_connection = conn
            return conn

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> ChunkedStream:
        return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        stream = SynthesizeStream(tts=self, conn_options=conn_options)
        self._streams.add(stream)
        return stream

    async def aclose(self) -> None:
        for stream in list(self._streams):
            await stream.aclose()
        self._streams.clear()

        if self._current_connection:
            await self._current_connection.aclose()
            self._current_connection = None

Create a new instance of ElevenLabs TTS.

Args

voice_id : str
Voice ID. Defaults to DEFAULT_VOICE_ID.
voice_settings : NotGivenOr[VoiceSettings]
Voice settings.
model : TTSModels | str
TTS model to use. Defaults to "eleven_turbo_v2_5".
api_key : NotGivenOr[str]
ElevenLabs API key. Can be set via argument or ELEVEN_API_KEY environment variable.
base_url : NotGivenOr[str]
Custom base URL for the API. Optional.
streaming_latency : NotGivenOr[int]
Deprecated. Optimize for streaming latency; defaults to 0 (disabled), 4 for maximum latency optimizations.
inactivity_timeout : int
Inactivity timeout in seconds for the websocket connection. Defaults to 180.
auto_mode : bool
Reduces latency by disabling chunk schedule and buffers. Sentence tokenizer will be used to synthesize one sentence at a time. Defaults to True.
word_tokenizer : NotGivenOr[tokenize.WordTokenizer | tokenize.SentenceTokenizer]
Tokenizer for processing text. Defaults to basic WordTokenizer when auto_mode=False, SentenceTokenizer otherwise.
enable_ssml_parsing : bool
Enable SSML parsing for input text. Defaults to False.
enable_logging : bool
Enable logging of the request. When set to false, zero retention mode will be used. Defaults to True.
chunk_length_schedule : NotGivenOr[list[int]]
Schedule for chunk lengths, ranging from 50 to 500. Defaults are [120, 160, 250, 290].
http_session : aiohttp.ClientSession | None
Custom HTTP session for API requests. Optional.
language : NotGivenOr[str]
Language code for the TTS model; as of 2024-10-24 only valid for "eleven_turbo_v2_5".
sync_alignment : bool
Enable sync alignment for the TTS model. Defaults to True.
preferred_alignment : Literal["normalized", "original"]
Use normalized or original alignment. Defaults to "normalized".
pronunciation_dictionary_locators : NotGivenOr[list[PronunciationDictionaryLocator]]
List of pronunciation dictionary locators to use for pronunciation control.
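
A construction sketch; the voice ID is the documented default and the VoiceSettings values are illustrative:

from livekit.plugins.elevenlabs import TTS, VoiceSettings

tts = TTS(
    voice_id="bIHbv24MWmeRgasZH58o",  # default voice from the signature above
    model="eleven_turbo_v2_5",
    voice_settings=VoiceSettings(stability=0.5, similarity_boost=0.75),
    language="en",  # per the note above, only valid for eleven_turbo_v2_5
)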

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
@property
def model(self) -> str:
    return self._opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
@property
def provider(self) -> str:
    return "ElevenLabs"

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
async def aclose(self) -> None:
    for stream in list(self._streams):
        await stream.aclose()
    self._streams.clear()

    if self._current_connection:
        await self._current_connection.aclose()
        self._current_connection = None
async def current_connection(self) ‑> livekit.plugins.elevenlabs.tts._Connection
async def current_connection(self) -> _Connection:
    """Get the current connection, creating one if needed"""
    async with self._connection_lock:
        if (
            self._current_connection
            and self._current_connection.is_current
            and not self._current_connection._closed
        ):
            return self._current_connection

        session = self._ensure_session()
        conn = _Connection(self._opts, session)
        await conn.connect()
        self._current_connection = conn
        return conn

Get the current connection, creating one if needed

async def list_voices(self) ‑> list[livekit.plugins.elevenlabs.tts.Voice]
async def list_voices(self) -> list[Voice]:
    async with self._ensure_session().get(
        f"{self._opts.base_url}/voices",
        headers={AUTHORIZATION_HEADER: self._opts.api_key},
    ) as resp:
        return _dict_to_voices_list(await resp.json())
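
Usage sketch:

voices = await tts.list_voices()
for voice in voices:
    print(voice.id, voice.name, voice.category)
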
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.elevenlabs.tts.SynthesizeStream
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    stream = SynthesizeStream(tts=self, conn_options=conn_options)
    self._streams.add(stream)
    return stream
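
A streaming sketch, assuming the base SynthesizeStream interface from livekit-agents (push_text, flush, end_input) and a placeholder sink:

stream = tts.stream()
stream.push_text("Hello from ")
stream.push_text("ElevenLabs.")
stream.flush()      # mark the current segment complete
stream.end_input()  # signal that no more text will be pushed

async for audio in stream:
    process(audio.frame)  # process() is a placeholder for your audio sink
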
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.elevenlabs.tts.ChunkedStream
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> ChunkedStream:
    return ChunkedStream(tts=self, input_text=text, conn_options=conn_options)
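
One-shot usage sketch, assuming the base ChunkedStream yields SynthesizedAudio events with a .frame attribute (play() is a placeholder sink):

async for audio in tts.synthesize("Hello from ElevenLabs."):
    play(audio.frame)
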
def update_options(self,
*,
voice_id: NotGivenOr[str] = NOT_GIVEN,
voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
language: NotGivenOr[str] = NOT_GIVEN,
pronunciation_dictionary_locators: NotGivenOr[list[PronunciationDictionaryLocator]] = NOT_GIVEN) ‑> None
def update_options(
    self,
    *,
    voice_id: NotGivenOr[str] = NOT_GIVEN,
    voice_settings: NotGivenOr[VoiceSettings] = NOT_GIVEN,
    model: NotGivenOr[TTSModels | str] = NOT_GIVEN,
    language: NotGivenOr[str] = NOT_GIVEN,
    pronunciation_dictionary_locators: NotGivenOr[
        list[PronunciationDictionaryLocator]
    ] = NOT_GIVEN,
) -> None:
    """
    Args:
        voice_id (NotGivenOr[str]): Voice ID.
        voice_settings (NotGivenOr[VoiceSettings]): Voice settings.
        model (NotGivenOr[TTSModels | str]): TTS model to use.
        language (NotGivenOr[str]): Language code for the TTS model.
        pronunciation_dictionary_locators (NotGivenOr[list[PronunciationDictionaryLocator]]): List of pronunciation dictionary locators.
    """
    changed = False

    if is_given(model) and model != self._opts.model:
        self._opts.model = model
        changed = True

    if is_given(voice_id) and voice_id != self._opts.voice_id:
        self._opts.voice_id = voice_id
        changed = True

    if is_given(voice_settings):
        self._opts.voice_settings = voice_settings
        changed = True

    if is_given(language) and language != self._opts.language:
        self._opts.language = language
        changed = True

    if is_given(pronunciation_dictionary_locators):
        self._opts.pronunciation_dictionary_locators = pronunciation_dictionary_locators
        changed = True

    if changed and self._current_connection:
        self._current_connection.mark_non_current()
        self._current_connection = None

Args

voice_id : NotGivenOr[str]
Voice ID.
voice_settings : NotGivenOr[VoiceSettings]
Voice settings.
model : NotGivenOr[TTSModels | str]
TTS model to use.
language : NotGivenOr[str]
Language code for the TTS model.
pronunciation_dictionary_locators : NotGivenOr[list[PronunciationDictionaryLocator]]
List of pronunciation dictionary locators.
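
Illustrative runtime switch; any change marks the pooled connection non-current, so the next synthesis reconnects with the new options:

tts.update_options(
    voice_id="someOtherVoiceId",  # placeholder ID
    model="eleven_multilingual_v2",
)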

class Voice (id: str, name: str, category: str)
@dataclass
class Voice:
    id: str
    name: str
    category: str

Instance variables

var category : str
var id : str
var name : str
class VoiceSettings (stability: float,
similarity_boost: float,
style: NotGivenOr[float] = NOT_GIVEN,
speed: NotGivenOr[float] = NOT_GIVEN,
use_speaker_boost: NotGivenOr[bool] = NOT_GIVEN)
@dataclass
class VoiceSettings:
    stability: float  # [0.0 - 1.0]
    similarity_boost: float  # [0.0 - 1.0]
    style: NotGivenOr[float] = NOT_GIVEN  # [0.0 - 1.0]
    speed: NotGivenOr[float] = NOT_GIVEN  # [0.8 - 1.2]
    use_speaker_boost: NotGivenOr[bool] = NOT_GIVEN

Instance variables

var similarity_boost : float
var speed : float | livekit.agents.types.NotGiven
var stability : float
var style : float | livekit.agents.types.NotGiven
var use_speaker_boost : bool | livekit.agents.types.NotGiven
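
A construction sketch; values respect the ranges noted in the source comments above:

from livekit.plugins.elevenlabs import VoiceSettings

settings = VoiceSettings(
    stability=0.4,         # [0.0 - 1.0]
    similarity_boost=0.8,  # [0.0 - 1.0]
    style=0.2,             # [0.0 - 1.0]
    speed=1.05,            # [0.8 - 1.2]
    use_speaker_boost=True,
)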