Module livekit.plugins.google.beta.realtime.transcriber
Functions
def clean_transcription(text: str) -> str
-
Expand source code
def clean_transcription(text: str) -> str:
    """Normalize whitespace in *text*.

    Collapses every run of whitespace (spaces, tabs, newlines, ...) into a
    single space and strips leading/trailing whitespace.

    Args:
        text: Raw transcription text from the model.

    Returns:
        The whitespace-normalized string (possibly empty).
    """
    # str.split() with no separator splits on any whitespace run, which
    # subsumes the previous explicit "\n" replacement plus the r"\s+" regex
    # pass, and .join() re-assembles with single spaces — same result, one step.
    return " ".join(text.split())
Classes
class ModelTranscriber (*, client: genai.Client, model: LiveAPIModels | str)
-
Expand source code
class ModelTranscriber(utils.EventEmitter[EventTypes]):
    """Transcribes agent audio by sending WAV buffers to a generation model.

    Frames pushed through ``_push_audio`` are merged (and resampled to 16 kHz
    when needed), queued on an internal channel, and transcribed one buffer at
    a time by a background task.  Each result is emitted as an
    ``input_speech_done`` event carrying a ``TranscriptionContent``.
    """

    def __init__(self, *, client: genai.Client, model: LiveAPIModels | str):
        super().__init__()
        self._client = client
        self._model = model
        # Sample rate the model expects; inbound audio is resampled to this.
        self._needed_sr = 16000
        self._system_instructions = types.Content(
            parts=[types.Part(text=SYSTEM_INSTRUCTIONS)]
        )
        self._config = types.GenerateContentConfig(
            temperature=0.0,
            system_instruction=self._system_instructions,
            # TODO: add response_schema
        )
        self._resampler: rtc.AudioResampler | None = None
        self._buffer: rtc.AudioFrame | None = None
        self._audio_ch = utils.aio.Chan[rtc.AudioFrame]()
        self._main_atask = asyncio.create_task(
            self._main_task(), name="gemini-model-transcriber"
        )

    async def aclose(self) -> None:
        """Close the audio channel and wait for the worker task to finish."""
        if not self._audio_ch.closed:
            self._audio_ch.close()
            await self._main_atask

    def _push_audio(self, frames: list[rtc.AudioFrame]) -> None:
        """Merge *frames* into one buffer at 16 kHz and enqueue it for transcription."""
        if not frames:
            return
        merged = utils.merge_frames(frames)
        if merged.sample_rate != self._needed_sr:
            # Lazily build the resampler on first mismatched-rate buffer.
            if self._resampler is None:
                self._resampler = rtc.AudioResampler(
                    input_rate=merged.sample_rate,
                    output_rate=self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )
            merged = utils.merge_frames(self._resampler.push(merged))
        self._audio_ch.send_nowait(merged)

    @utils.log_exceptions(logger=logger)
    async def _main_task(self):
        """Consume queued buffers and transcribe each via a model generation call."""
        # NOTE(review): one request_id is shared by every transcription emitted
        # from this task — confirm downstream does not expect per-call ids.
        request_id = utils.shortuuid()
        try:
            async for merged in self._audio_ch:
                # TODO: stream content for better latency
                # NOTE(review): SYSTEM_INSTRUCTIONS is supplied both via the
                # config's system_instruction and inline here — looks redundant;
                # confirm before removing either.
                audio_parts = [
                    types.Part(text=SYSTEM_INSTRUCTIONS),
                    types.Part.from_bytes(
                        data=merged.to_wav_bytes(),
                        mime_type="audio/wav",
                    ),
                ]
                response = await self._client.aio.models.generate_content(
                    model=self._model,
                    contents=[types.Content(parts=audio_parts, role="user")],
                    config=self._config,
                )
                self.emit(
                    "input_speech_done",
                    TranscriptionContent(
                        response_id=request_id,
                        text=clean_transcription(response.text),
                    ),
                )
        except (ClientError, ServerError, APIError) as e:
            raise APIStatusError(
                f"model transcriber error: {e}",
                status_code=e.code,
                body=e.message,
                request_id=request_id,
            ) from e
        except Exception as e:
            raise APIConnectionError("Error generating transcription") from e
Transcribes agent audio using model generation.
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
-
Expand source code
async def aclose(self) -> None:
    """Shut down the transcriber: close the inbound audio channel and wait
    for the background transcription task to drain and exit."""
    if self._audio_ch.closed:
        # Already closed — nothing to do (idempotent on repeated calls).
        return
    self._audio_ch.close()
    # Closing the channel ends the `async for` in the worker; await its exit.
    await self._main_atask
Inherited members
class TranscriberSession (*, client: genai.Client, model: LiveAPIModels | str)
-
Expand source code
class TranscriberSession(utils.EventEmitter[EventTypes]):
    """
    Handles live audio transcription using the realtime API.

    Audio frames pushed via ``_push_audio`` are forwarded (resampled to
    16 kHz when needed) over a live session as PCM chunks; model text
    responses are accumulated and emitted as ``input_speech_started`` /
    ``input_speech_done`` events carrying a ``TranscriptionContent``.
    """

    def __init__(self, *, client: genai.Client, model: LiveAPIModels | str):
        super().__init__()
        self._client = client
        self._model = model
        # Sample rate the realtime API expects for inbound audio.
        self._needed_sr = 16000
        # Set on shutdown or on a send/recv failure; gates all further I/O.
        self._closed = False
        system_instructions = types.Content(
            parts=[types.Part(text=SYSTEM_INSTRUCTIONS)]
        )
        self._config = types.LiveConnectConfig(
            response_modalities=[types.Modality.TEXT],
            system_instruction=system_instructions,
            generation_config=types.GenerationConfig(temperature=0.0),
        )
        self._main_atask = asyncio.create_task(
            self._main_task(), name="gemini-realtime-transcriber"
        )
        self._send_ch = utils.aio.Chan[ClientEvents]()
        self._resampler: rtc.AudioResampler | None = None
        # Non-None while a model response turn is being accumulated.
        self._active_response_id = None

    def _push_audio(self, frame: rtc.AudioFrame) -> None:
        """Queue *frame* for the session, resampling to 16 kHz if required."""
        if self._closed:
            return
        if frame.sample_rate != self._needed_sr:
            # Lazily create the resampler on the first mismatched-rate frame.
            if not self._resampler:
                self._resampler = rtc.AudioResampler(
                    frame.sample_rate,
                    self._needed_sr,
                    quality=rtc.AudioResamplerQuality.HIGH,
                )
        if self._resampler:
            # Once a resampler exists, every frame goes through it.
            for f in self._resampler.push(frame):
                self._queue_msg(
                    types.LiveClientRealtimeInput(
                        media_chunks=[
                            types.Blob(data=f.data.tobytes(), mime_type="audio/pcm")
                        ]
                    )
                )
        else:
            # Frame already at the needed sample rate — send as-is.
            self._queue_msg(
                types.LiveClientRealtimeInput(
                    media_chunks=[
                        types.Blob(data=frame.data.tobytes(), mime_type="audio/pcm")
                    ]
                )
            )

    def _queue_msg(self, msg: ClientEvents) -> None:
        # Drop messages silently once the session is closing.
        if not self._closed:
            self._send_ch.send_nowait(msg)

    async def aclose(self) -> None:
        """Close the send channel and wait for the main task to finish."""
        if self._send_ch.closed:
            return
        self._closed = True
        self._send_ch.close()
        await self._main_atask

    @utils.log_exceptions(logger=logger)
    async def _main_task(self):
        """Open the live session and run the send/recv loops until closed."""

        @utils.log_exceptions(logger=logger)
        async def _send_task():
            # Forward queued client events to the live session.
            try:
                async for msg in self._send_ch:
                    if self._closed:
                        break
                    await self._session.send(input=msg)
            except websockets.exceptions.ConnectionClosedError as e:
                logger.exception(f"Transcriber session closed in _send_task: {e}")
                self._closed = True
            except Exception as e:
                logger.exception(f"Uncaught error in transcriber _send_task: {e}")
                self._closed = True

        @utils.log_exceptions(logger=logger)
        async def _recv_task():
            # Accumulate streamed text parts into one TranscriptionContent per
            # model turn; emit start on the first chunk, done on turn_complete.
            try:
                while not self._closed:
                    async for response in self._session.receive():
                        if self._closed:
                            break
                        if self._active_response_id is None:
                            self._active_response_id = utils.shortuuid()
                            content = TranscriptionContent(
                                response_id=self._active_response_id,
                                text="",
                            )
                            self.emit("input_speech_started", content)
                        server_content = response.server_content
                        if server_content:
                            model_turn = server_content.model_turn
                            if model_turn:
                                for part in model_turn.parts:
                                    if part.text:
                                        content.text += part.text
                            if server_content.turn_complete:
                                # Turn finished: normalize whitespace and emit.
                                content.text = clean_transcription(content.text)
                                self.emit("input_speech_done", content)
                                self._active_response_id = None
            except websockets.exceptions.ConnectionClosedError as e:
                logger.exception(f"Transcriber session closed in _recv_task: {e}")
                self._closed = True
            except Exception as e:
                logger.exception(f"Uncaught error in transcriber _recv_task: {e}")
                self._closed = True

        async with self._client.aio.live.connect(
            model=self._model, config=self._config
        ) as session:
            self._session = session
            tasks = [
                asyncio.create_task(
                    _send_task(), name="gemini-realtime-transcriber-send"
                ),
                asyncio.create_task(
                    _recv_task(), name="gemini-realtime-transcriber-recv"
                ),
            ]
            try:
                await asyncio.gather(*tasks)
            finally:
                # Ensure both loops are stopped and the session is closed even
                # if one of them failed.
                await utils.aio.gracefully_cancel(*tasks)
                await self._session.close()
Handles live audio transcription using the realtime API.
Initialize a new instance of EventEmitter.
Ancestors
- EventEmitter
- typing.Generic
Methods
async def aclose(self) -> None
-
Expand source code
async def aclose(self) -> None:
    """Shut down the session: stop accepting audio, close the send channel,
    and wait for the main task (send/recv loops) to exit."""
    if self._send_ch.closed:
        # Already closed — idempotent on repeated calls.
        return
    self._closed = True
    self._send_ch.close()
    # Closing the channel ends the send loop; await the whole main task.
    await self._main_atask
Inherited members
class TranscriptionContent (response_id: str, text: str)
-
Expand source code
@dataclass
class TranscriptionContent:
    """Transcription result payload emitted with speech events.

    Emitted with ``input_speech_started`` (empty text) and, once complete,
    ``input_speech_done`` (whitespace-normalized final text).
    """

    # Identifier correlating the start/done events of one transcription.
    response_id: str
    # Transcribed text; built incrementally, then cleaned on completion.
    text: str
TranscriptionContent(response_id: 'str', text: 'str')
Instance variables
var response_id : str
var text : str