Module livekit.plugins.gnani

Gnani Vachana plugin for LiveKit Agents

Support for speech-to-text and text-to-speech with Gnani's Vachana platform.

Vachana provides high-accuracy STT and low-latency TTS for Indian languages, including multilingual and code-switching scenarios.

For API access, email speechstack@gnani.ai

Classes

class STT (*,
language: str = 'en-IN',
api_key: str | None = None,
sample_rate: int = 16000,
base_url: str = 'https://api.vachana.ai',
organization_id: str | None = None,
user_id: str | None = None,
http_session: None = None)
Expand source code
class STT(stt.STT):
    """Gnani Vachana Speech-to-Text implementation.

    Provides speech-to-text functionality using Gnani's Vachana platform.
    Supports batch recognition via REST API and real-time streaming via WebSocket.

    Args:
        language: BCP-47 language code (e.g. "hi-IN", "en-IN").
        api_key: Gnani API key (falls back to GNANI_API_KEY env var).
        sample_rate: Audio sample rate for streaming (8000 or 16000).
        base_url: Vachana API base URL.
        organization_id: Organization ID for REST API (falls back to GNANI_ORGANIZATION_ID).
        user_id: User ID for REST API (falls back to GNANI_USER_ID).
        http_session: Optional aiohttp session to use for REST requests. When
            omitted, a session is taken from ``utils.http_context.http_session()``
            the first time one is needed.

    Raises:
        ValueError: If no API key is available, or ``sample_rate`` is not one
            of the two supported rates.
    """

    def __init__(
        self,
        *,
        language: str = "en-IN",
        api_key: str | None = None,
        sample_rate: int = SAMPLE_RATE_16K,
        base_url: str = GNANI_STT_BASE_URL,
        organization_id: str | None = None,
        user_id: str | None = None,
        http_session: aiohttp.ClientSession | None = None,
    ) -> None:
        super().__init__(
            capabilities=stt.STTCapabilities(
                streaming=True,
                interim_results=False,
                aligned_transcript=False,
            )
        )

        self._api_key = api_key or os.environ.get("GNANI_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Gnani API key is required. "
                "Provide it directly or set GNANI_API_KEY environment variable."
            )

        if sample_rate not in (SAMPLE_RATE_8K, SAMPLE_RATE_16K):
            raise ValueError("sample_rate must be 8000 or 16000")

        self._opts = GnaniSTTOptions(
            api_key=self._api_key,
            language=language,
            sample_rate=sample_rate,
            base_url=base_url,
            # Explicit arguments win; environment variables are the fallback.
            organization_id=organization_id or os.environ.get("GNANI_ORGANIZATION_ID"),
            user_id=user_id or os.environ.get("GNANI_USER_ID"),
        )
        # Honor a caller-supplied session; otherwise one is resolved lazily
        # in _ensure_session().
        self._session: aiohttp.ClientSession | None = http_session

    @property
    def model(self) -> str:
        """Model identifier reported by this STT (a fixed string)."""
        return "vachana-stt-v3"

    @property
    def provider(self) -> str:
        """Provider name reported by this STT."""
        return "Gnani"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching it from the http context on first use."""
        if not self._session:
            self._session = utils.http_context.http_session()
        return self._session

    @staticmethod
    def _single_attempt(conn_options: APIConnectOptions) -> APIConnectOptions:
        """Return a copy of ``conn_options`` with ``max_retry`` forced to 0.

        ``retry_interval`` and ``timeout`` are carried over unchanged.
        """
        return APIConnectOptions(
            max_retry=0,
            retry_interval=conn_options.retry_interval,
            timeout=conn_options.timeout,
        )

    async def recognize(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Recognize ``buffer`` via the parent implementation with retries disabled.

        The supplied ``conn_options`` are passed through ``_single_attempt`` so
        only the interval and timeout are kept.
        """
        return await super().recognize(
            buffer,
            language=language,
            conn_options=self._single_attempt(conn_options),
        )

    async def _recognize_impl(
        self,
        buffer: AudioBuffer,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> stt.SpeechEvent:
        """Send the buffered audio as a WAV upload to ``{base_url}/stt/v3``.

        Args:
            buffer: Audio frames to transcribe; they are combined into one WAV payload.
            language: Language code for this request; defaults to the configured one.
            conn_options: Supplies the request timeout.

        Returns:
            A FINAL_TRANSCRIPT event carrying a single alternative.

        Raises:
            APIStatusError: The API answered with a non-200 status.
            APITimeoutError: The request timed out.
            APIConnectionError: Any other failure while performing the request
                or reading the response.
        """
        lang = language if is_given(language) else self._opts.language

        wav_bytes = rtc.combine_audio_frames(buffer).to_wav_bytes()

        form_data = aiohttp.FormData()
        form_data.add_field("audio_file", wav_bytes, filename="audio.wav", content_type="audio/wav")
        form_data.add_field("language_code", lang)

        headers: dict[str, str] = {
            "X-API-Key-ID": self._opts.api_key,
        }
        # Organization and user headers are only sent when configured.
        if self._opts.organization_id:
            headers["X-Organization-ID"] = self._opts.organization_id
        if self._opts.user_id:
            headers["X-API-User-ID"] = self._opts.user_id

        try:
            async with self._ensure_session().post(
                url=f"{self._opts.base_url}/stt/v3",
                data=form_data,
                headers=headers,
                timeout=aiohttp.ClientTimeout(
                    total=conn_options.timeout,
                    sock_connect=conn_options.timeout,
                ),
            ) as res:
                if res.status != 200:
                    error_text = await res.text()
                    # Lazy %-formatting: the message is only built if the record is emitted.
                    logger.error("Gnani STT API error: %s - %s", res.status, error_text)
                    raise APIStatusError(
                        message=f"Gnani STT API Error ({res.status}): {error_text}",
                        status_code=res.status,
                        body=error_text,
                    )

                response_json = await res.json()
                # `or ""` also covers keys that are present but null in the JSON.
                transcript = response_json.get("transcript") or ""
                request_id = response_json.get("request_id") or ""

                return stt.SpeechEvent(
                    type=stt.SpeechEventType.FINAL_TRANSCRIPT,
                    request_id=request_id,
                    alternatives=[
                        stt.SpeechData(
                            language=LanguageCode(lang),
                            text=transcript,
                            # No confidence value is read from the response;
                            # a fixed 1.0 is reported.
                            confidence=1.0,
                        )
                    ],
                )

        except asyncio.TimeoutError as e:
            raise APITimeoutError("Gnani STT API request timed out") from e
        except (APIStatusError, APIConnectionError, APITimeoutError):
            # Already one of the API error types: propagate unchanged.
            raise
        except Exception as e:
            raise APIConnectionError(f"Gnani STT error: {e}") from e

    def stream(
        self,
        *,
        language: NotGivenOr[str] = NOT_GIVEN,
        conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
    ) -> SpeechStream:
        """Create a ``SpeechStream`` using a copy of the current options.

        Args:
            language: Language code for this stream; defaults to the configured one.
            conn_options: Connection options; retries are disabled via ``_single_attempt``.
        """
        lang = language if is_given(language) else self._opts.language
        return SpeechStream(
            stt=self,
            opts=GnaniSTTOptions(
                api_key=self._opts.api_key,
                language=lang,
                sample_rate=self._opts.sample_rate,
                base_url=self._opts.base_url,
                organization_id=self._opts.organization_id,
                user_id=self._opts.user_id,
            ),
            conn_options=self._single_attempt(conn_options),
        )

    async def aclose(self) -> None:
        """No-op: this class does not close the HTTP session."""
        pass

Gnani Vachana Speech-to-Text implementation.

Provides speech-to-text functionality using Gnani's Vachana platform. Supports batch recognition via REST API and real-time streaming via WebSocket.

Args

language
BCP-47 language code (e.g. "hi-IN", "en-IN").
api_key
Gnani API key (falls back to GNANI_API_KEY env var).
sample_rate
Audio sample rate for streaming (8000 or 16000).
base_url
Vachana API base URL.
organization_id
Organization ID for REST API (falls back to GNANI_ORGANIZATION_ID).
user_id
User ID for REST API (falls back to GNANI_USER_ID).

Ancestors

  • livekit.agents.stt.stt.STT
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Model identifier for this STT; always the fixed string below."""
    model_name = "vachana-stt-v3"
    return model_name

Get the model name/identifier for this STT instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Provider name for this STT; always the fixed string below."""
    provider_name = "Gnani"
    return provider_name

Get the provider name/identifier for this STT instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Do nothing; the method body performs no cleanup."""
    return None

Close the STT and every stream/request associated with it.

async def recognize(self,
buffer: AudioBuffer,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.stt.stt.SpeechEvent
Expand source code
async def recognize(
    self,
    buffer: AudioBuffer,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> stt.SpeechEvent:
    """Recognize ``buffer`` through the parent class with adjusted options.

    ``conn_options`` is first passed through ``self._single_attempt`` and the
    result is what the parent ``recognize`` receives.
    """
    adjusted_options = self._single_attempt(conn_options)
    event = await super().recognize(
        buffer,
        language=language,
        conn_options=adjusted_options,
    )
    return event
def stream(self,
*,
language: NotGivenOr[str] = NOT_GIVEN,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.gnani.stt.SpeechStream
Expand source code
def stream(
    self,
    *,
    language: NotGivenOr[str] = NOT_GIVEN,
    conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS,
) -> SpeechStream:
    """Build a ``SpeechStream`` from a fresh copy of this instance's options.

    Args:
        language: Language for the stream; when not given, the configured
            language is used.
        conn_options: Passed through ``self._single_attempt`` before being
            handed to the stream.
    """
    current = self._opts
    stream_opts = GnaniSTTOptions(
        api_key=current.api_key,
        language=language if is_given(language) else current.language,
        sample_rate=current.sample_rate,
        base_url=current.base_url,
        organization_id=current.organization_id,
        user_id=current.user_id,
    )
    adjusted_options = self._single_attempt(conn_options)
    return SpeechStream(stt=self, opts=stream_opts, conn_options=adjusted_options)

Inherited members

class TTS (*,
voice: GnaniTTSVoices | str = 'Karan',
model: str = 'vachana-voice-v3',
sample_rate: int = 16000,
num_channels: int = 1,
encoding: GnaniTTSEncodings | str = 'linear_pcm',
container: GnaniTTSContainers | str = 'wav',
api_key: str | None = None,
base_url: str = 'https://api.vachana.ai',
language: str = 'hi',
synthesize_method: GnaniTTSSynthesizeMethod = 'rest')
Expand source code
class TTS(tts.TTS):
    """Text-to-speech client for Gnani's Vachana platform.

    Non-streaming synthesis runs over REST, SSE, or WebSocket, selected by
    ``synthesize_method``.

    Args:
        voice: Voice to use for synthesis (Karan, Simran, Riya, etc.).
        model: TTS model name (default: vachana-voice-v3).
        sample_rate: Audio output sample rate (8000-44100).
        num_channels: Number of audio channels, forwarded to the base class
            and stored in the options.
        encoding: Audio encoding (linear_pcm or oggopus).
        container: Audio container format (raw, mp3, wav, mulaw, ogg).
        api_key: Gnani API key (falls back to GNANI_API_KEY env var).
        base_url: Vachana API base URL.
        language: Language code for TTS (default: hi).
        synthesize_method: Synthesis mode — "rest", "sse", or "websocket".

    Raises:
        ValueError: If ``sample_rate`` or ``voice`` is unsupported, or no API
            key is available.
    """

    def __init__(
        self,
        *,
        voice: GnaniTTSVoices | str = "Karan",
        model: str = "vachana-voice-v3",
        sample_rate: int = 16000,
        num_channels: int = 1,
        encoding: GnaniTTSEncodings | str = "linear_pcm",
        container: GnaniTTSContainers | str = "wav",
        api_key: str | None = None,
        base_url: str = GNANI_TTS_BASE_URL,
        language: str = "hi",
        synthesize_method: GnaniTTSSynthesizeMethod = "rest",
    ) -> None:
        # The sample rate is checked before the base class is initialised.
        if sample_rate not in SUPPORTED_SAMPLE_RATES:
            raise ValueError(
                f"sample_rate must be one of {SUPPORTED_SAMPLE_RATES}, got {sample_rate}"
            )

        super().__init__(
            capabilities=tts.TTSCapabilities(streaming=True),
            sample_rate=sample_rate,
            num_channels=num_channels,
        )

        # An explicit key wins; the environment variable is the fallback.
        self._api_key = api_key or os.environ.get("GNANI_API_KEY")
        if not self._api_key:
            raise ValueError(
                "Gnani API key is required. "
                "Provide it directly or set GNANI_API_KEY environment variable."
            )

        self._require_supported_voice(voice)

        self._opts = GnaniTTSOptions(
            api_key=self._api_key,
            voice=voice,
            model=model,
            sample_rate=sample_rate,
            encoding=encoding,
            container=container,
            num_channels=num_channels,
            base_url=base_url,
            language=language,
            synthesize_method=synthesize_method,
        )
        self._session: aiohttp.ClientSession | None = None

    @staticmethod
    def _require_supported_voice(voice: str) -> None:
        """Raise ``ValueError`` unless ``voice`` is in ``SUPPORTED_VOICES``."""
        if voice in SUPPORTED_VOICES:
            return
        raise ValueError(
            f"Voice '{voice}' not supported. "
            f"Supported voices: {', '.join(sorted(SUPPORTED_VOICES))}"
        )

    @property
    def model(self) -> str:
        """Model name currently stored in the options."""
        return self._opts.model

    @property
    def provider(self) -> str:
        """Provider name reported by this TTS."""
        return "Gnani"

    def _ensure_session(self) -> aiohttp.ClientSession:
        """Return the HTTP session, fetching it from the http context on first use."""
        session = self._session
        if not session:
            session = utils.http_context.http_session()
            self._session = session
        return session

    def synthesize(
        self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> tts.ChunkedStream:
        """Return a chunked stream for ``text`` matching the configured method.

        "sse" and "websocket" select their dedicated stream classes; any other
        value uses the REST stream.
        """
        method = self._opts.synthesize_method
        if method == "sse":
            stream_cls = SSEChunkedStream
        elif method == "websocket":
            stream_cls = WebSocketChunkedStream
        else:
            stream_cls = RESTChunkedStream
        return stream_cls(tts=self, input_text=text, conn_options=conn_options)

    def stream(
        self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
    ) -> SynthesizeStream:
        """Create a ``SynthesizeStream`` bound to this TTS instance."""
        synth_stream = SynthesizeStream(tts=self, conn_options=conn_options)
        return synth_stream

    def update_options(
        self,
        *,
        voice: str | None = None,
        model: str | None = None,
        language: str | None = None,
    ) -> None:
        """Update voice, model and/or language; ``None`` leaves a field unchanged.

        Raises:
            ValueError: If ``voice`` is given but not in ``SUPPORTED_VOICES``;
                nothing has been modified at that point.
        """
        opts = self._opts
        if voice is not None:
            self._require_supported_voice(voice)
            opts.voice = voice
        if model is not None:
            opts.model = model
        if language is not None:
            opts.language = language

    async def aclose(self) -> None:
        """No-op: this class does not close the HTTP session."""
        pass

Gnani Vachana Text-to-Speech implementation.

Provides text-to-speech functionality using Gnani's Vachana platform. Supports REST, SSE, and WebSocket synthesis modes.

Args

voice
Voice to use for synthesis (Karan, Simran, Riya, etc.).
model
TTS model name (default: vachana-voice-v3).
sample_rate
Audio output sample rate (8000-44100).
encoding
Audio encoding (linear_pcm or oggopus).
container
Audio container format (raw, mp3, wav, mulaw, ogg).
api_key
Gnani API key (falls back to GNANI_API_KEY env var).
base_url
Vachana API base URL.
language
Language code for TTS (default: hi).
synthesize_method
Synthesis mode — "rest", "sse", or "websocket".

Ancestors

  • livekit.agents.tts.tts.TTS
  • abc.ABC
  • EventEmitter
  • typing.Generic

Instance variables

prop model : str
Expand source code
@property
def model(self) -> str:
    """Model name read from ``self._opts.model``."""
    opts = self._opts
    return opts.model

Get the model name/identifier for this TTS instance.

Returns

The model name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their model information.

prop provider : str
Expand source code
@property
def provider(self) -> str:
    """Provider name for this TTS; always the fixed string below."""
    provider_name = "Gnani"
    return provider_name

Get the provider name/identifier for this TTS instance.

Returns

The provider name if available, "unknown" otherwise.

Note

Plugins should override this property to provide their provider information.

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    """Do nothing; the method body performs no cleanup."""
    return None
def stream(self,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.plugins.gnani.tts.SynthesizeStream
Expand source code
def stream(
    self, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> SynthesizeStream:
    """Create a ``SynthesizeStream`` bound to this TTS with ``conn_options``."""
    synth_stream = SynthesizeStream(tts=self, conn_options=conn_options)
    return synth_stream
def synthesize(self,
text: str,
*,
conn_options: APIConnectOptions = APIConnectOptions(max_retry=3, retry_interval=2.0, timeout=10.0)) ‑> livekit.agents.tts.tts.ChunkedStream
Expand source code
def synthesize(
    self, text: str, *, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS
) -> tts.ChunkedStream:
    """Return a chunked stream for ``text`` matching the configured method.

    "sse" and "websocket" select their dedicated stream classes; any other
    value of ``self._opts.synthesize_method`` uses the REST stream.
    """
    method = self._opts.synthesize_method
    if method == "sse":
        stream_cls = SSEChunkedStream
    elif method == "websocket":
        stream_cls = WebSocketChunkedStream
    else:
        stream_cls = RESTChunkedStream
    return stream_cls(tts=self, input_text=text, conn_options=conn_options)
def update_options(self,
*,
voice: str | None = None,
model: str | None = None,
language: str | None = None) ‑> None
Expand source code
def update_options(
    self,
    *,
    voice: str | None = None,
    model: str | None = None,
    language: str | None = None,
) -> None:
    if voice is not None:
        if voice not in SUPPORTED_VOICES:
            raise ValueError(
                f"Voice '{voice}' not supported. "
                f"Supported voices: {', '.join(sorted(SUPPORTED_VOICES))}"
            )
        self._opts.voice = voice
    if model is not None:
        self._opts.model = model
    if language is not None:
        self._opts.language = language

Inherited members