Module livekit.agents.pipeline.human_input
Classes
class HumanInput (*, room: rtc.Room, vad: voice_activity_detection.VAD, stt: speech_to_text.STT, participant: rtc.RemoteParticipant, transcription: bool)
-
Captures a remote participant's microphone audio, feeds every frame to the supplied VAD and STT streams, and re-emits the resulting events: "start_of_speech", "vad_inference_done", "end_of_speech", "final_transcript", and "interim_transcript".
When `transcription` is False, forwarded transcription segments are cleared before being published to the room.
Constructing a HumanInput registers "track_published"/"track_subscribed" listeners on the room and immediately attempts to subscribe to the participant's microphone track.
Expand source code
class HumanInput(utils.EventEmitter[EventTypes]):
    """Listens to a remote participant's microphone and detects speech.

    Audio frames from the subscribed microphone track are pushed into both a
    VAD stream and an STT stream. The resulting events are re-emitted on this
    EventEmitter under the names:

    - ``"start_of_speech"`` / ``"end_of_speech"`` / ``"vad_inference_done"``
    - ``"final_transcript"`` / ``"interim_transcript"``

    STT segments are also forwarded to the room for live transcription unless
    ``transcription`` is False, in which case the segments are cleared before
    being forwarded.
    """

    def __init__(
        self,
        *,
        room: rtc.Room,
        vad: voice_activity_detection.VAD,
        stt: speech_to_text.STT,
        participant: rtc.RemoteParticipant,
        transcription: bool,
    ) -> None:
        """Bind to *participant* in *room* and start listening immediately.

        Args:
            room: Room whose track events drive (re-)subscription.
            vad: Voice-activity detector used to produce speech events.
            stt: Speech-to-text engine used to produce transcript events.
            participant: Remote participant whose microphone is consumed.
            transcription: When False, forwarded transcription segments are
                emptied before being published to the room.
        """
        super().__init__()
        self._room, self._vad, self._stt, self._participant, self._transcription = (
            room,
            vad,
            stt,
            participant,
            transcription,
        )
        # Currently subscribed microphone track, if any.
        self._subscribed_track: rtc.RemoteAudioTrack | None = None
        # Background task running _recognize_task over the track's audio.
        self._recognize_atask: asyncio.Task[None] | None = None
        self._closed: bool = False
        self._speaking: bool = False
        self._speech_probability: float = 0.0

        # Re-check the microphone whenever a track appears or gets subscribed,
        # then attempt an initial subscription right away.
        self._room.on("track_published", self._subscribe_to_microphone)
        self._room.on("track_subscribed", self._subscribe_to_microphone)
        self._subscribe_to_microphone()

    async def aclose(self) -> None:
        """Detach room listeners and cancel the recognition task.

        Raises:
            RuntimeError: If called more than once.
        """
        if self._closed:
            raise RuntimeError("HumanInput already closed")

        self._closed = True
        self._room.off("track_published", self._subscribe_to_microphone)
        self._room.off("track_subscribed", self._subscribe_to_microphone)
        self._speaking = False

        if self._recognize_atask is not None:
            await utils.aio.gracefully_cancel(self._recognize_atask)

    @property
    def speaking(self) -> bool:
        # True between VAD START_OF_SPEECH and END_OF_SPEECH events.
        return self._speaking

    @property
    def speaking_probability(self) -> float:
        # Latest probability reported by a VAD INFERENCE_DONE event.
        return self._speech_probability

    def _subscribe_to_microphone(self, *args, **kwargs) -> None:
        """
        Subscribe to the participant microphone if found and not already subscribed.
        Do nothing if no track is found.
        """
        # *args/**kwargs absorb whatever payload the room events carry.
        for publication in self._participant.track_publications.values():
            if publication.source != rtc.TrackSource.SOURCE_MICROPHONE:
                continue

            if not publication.subscribed:
                publication.set_subscribed(True)

            # Only (re)start recognition when a new track object shows up.
            if (
                publication.track is not None
                and publication.track != self._subscribed_track
            ):
                self._subscribed_track = publication.track  # type: ignore
                # Replace any recognition task bound to the previous track.
                if self._recognize_atask is not None:
                    self._recognize_atask.cancel()

                self._recognize_atask = asyncio.create_task(
                    self._recognize_task(rtc.AudioStream(self._subscribed_track))  # type: ignore
                )
                break

    @utils.log_exceptions(logger=logger)
    async def _recognize_task(self, audio_stream: rtc.AudioStream) -> None:
        """
        Receive the frames from the user audio stream and detect voice activity.
        """
        vad_stream = self._vad.stream()
        stt_stream = self._stt.stream()

        def _before_forward(
            fwd: transcription.STTSegmentsForwarder, transcription: rtc.Transcription
        ) -> rtc.Transcription:
            # NOTE: the `transcription` parameter shadows the `transcription`
            # module inside this callback; the module is still visible in the
            # enclosing scope. Clearing segments suppresses room forwarding
            # while keeping the forwarder pipeline alive.
            if not self._transcription:
                transcription.segments = []

            return transcription

        stt_forwarder = transcription.STTSegmentsForwarder(
            room=self._room,
            participant=self._participant,
            track=self._subscribed_track,
            before_forward_cb=_before_forward,
        )

        async def _audio_stream_co() -> None:
            # forward the audio stream to the VAD and STT streams
            async for ev in audio_stream:
                stt_stream.push_frame(ev.frame)
                vad_stream.push_frame(ev.frame)

        async def _vad_stream_co() -> None:
            # Mirror VAD state into _speaking/_speech_probability and re-emit.
            async for ev in vad_stream:
                if ev.type == voice_activity_detection.VADEventType.START_OF_SPEECH:
                    self._speaking = True
                    self.emit("start_of_speech", ev)
                elif ev.type == voice_activity_detection.VADEventType.INFERENCE_DONE:
                    self._speech_probability = ev.probability
                    self.emit("vad_inference_done", ev)
                elif ev.type == voice_activity_detection.VADEventType.END_OF_SPEECH:
                    self._speaking = False
                    self.emit("end_of_speech", ev)

        async def _stt_stream_co() -> None:
            # Forward STT events to the room transcription and re-emit them.
            async for ev in stt_stream:
                stt_forwarder.update(ev)
                if ev.type == speech_to_text.SpeechEventType.FINAL_TRANSCRIPT:
                    self.emit("final_transcript", ev)
                elif ev.type == speech_to_text.SpeechEventType.INTERIM_TRANSCRIPT:
                    self.emit("interim_transcript", ev)

        tasks = [
            asyncio.create_task(_audio_stream_co()),
            asyncio.create_task(_vad_stream_co()),
            asyncio.create_task(_stt_stream_co()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            # Cancel the fan-out tasks first, then close the forwarder and
            # streams so no coroutine is still pushing into a closed stream.
            await utils.aio.gracefully_cancel(*tasks)

            await stt_forwarder.aclose()
            await stt_stream.aclose()
            await vad_stream.aclose()
Ancestors
- EventEmitter
- typing.Generic
Instance variables
prop speaking : bool
-
Expand source code
@property
def speaking(self) -> bool:
    """Whether voice activity is currently detected for the user."""
    currently_active = self._speaking
    return currently_active
prop speaking_probability : float
-
Expand source code
@property
def speaking_probability(self) -> float:
    """Most recent speech probability reported by the VAD."""
    latest_probability = self._speech_probability
    return latest_probability
Methods
async def aclose(self) ‑> None
Inherited members