Module livekit.plugins.turn_detector.multilingual
Classes
class MultilingualModel (*, unlikely_threshold: float | None = None)
-
Expand source code
class MultilingualModel(EOUModelBase):
    """End-of-turn model registered with ``model_type="multilingual"``.

    When ``_remote_inference_url()`` returns a URL, end-of-turn predictions
    and missing per-language thresholds are requested from that URL over
    HTTP. Otherwise every call is delegated to ``EOUModelBase``.
    """

    def __init__(self, *, unlikely_threshold: float | None = None):
        """Initialise the base model for the multilingual model type.

        Args:
            unlikely_threshold: Forwarded unchanged to ``EOUModelBase``.
        """
        super().__init__(
            model_type="multilingual",
            unlikely_threshold=unlikely_threshold,
            # Languages are only loaded up front when no remote inference
            # URL is configured.
            load_languages=_remote_inference_url() is None,
        )

    def _inference_method(self) -> str:
        """Return the inference method name of the multilingual runner."""
        return _EUORunnerMultilingual.INFERENCE_METHOD

    async def unlikely_threshold(self, language: str | None) -> float | None:
        """Return the threshold for ``language``, asking the remote if needed.

        The base-class lookup is tried first. If it yields ``None`` and a
        remote inference URL is configured, the threshold is requested from
        that URL and, when truthy, stored in ``self._languages``.

        Args:
            language: Language to look up. A falsy value returns ``None``.

        Returns:
            The threshold, or ``None`` if none was found. Errors during the
            remote lookup are logged as warnings rather than raised.
        """
        if not language:
            return None

        threshold = await super().unlikely_threshold(language)
        if threshold is None:
            try:
                if url := _remote_inference_url():
                    async with utils.http_context.http_session().post(
                        url=url,
                        json={
                            "language": language,
                        },
                        timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
                    ) as resp:
                        resp.raise_for_status()
                        data = await resp.json()
                        threshold = data.get("threshold")
                        if threshold:
                            # Remember the value so later calls are answered
                            # by the base-class lookup.
                            self._languages[language] = {"threshold": threshold}
            except Exception as e:
                # Best effort: a failed lookup must not break the caller.
                logger.warning("Error fetching threshold for language %s", language, exc_info=e)
        return threshold

    async def predict_end_of_turn(
        self,
        chat_ctx: llm.ChatContext,
        *,
        timeout: float | None = 3,
    ) -> float:
        """Return the end-of-turn probability for ``chat_ctx``.

        Without a remote inference URL the call is delegated to the base
        class. With one, a filtered and truncated copy of the chat context
        is POSTed to the URL together with job, worker and (if set) agent
        identifiers.

        Args:
            chat_ctx: Conversation to evaluate.
            timeout: Forwarded to the base-class implementation only. The
                remote request uses ``REMOTE_INFERENCE_TIMEOUT`` instead.

        Returns:
            The non-negative ``"probability"`` from the response as a float,
            or ``1.0`` when the response carries no usable probability.

        Raises:
            Whatever ``resp.raise_for_status()`` raises for an HTTP error
            status; request errors are not caught here.
        """
        url = _remote_inference_url()
        if not url:
            return await super().predict_end_of_turn(chat_ctx, timeout=timeout)

        messages = chat_ctx.copy(
            exclude_function_call=True, exclude_instructions=True, exclude_empty_message=True
        ).truncate(max_items=MAX_HISTORY_TURNS)

        ctx = get_job_context()
        request = messages.to_dict(exclude_image=True, exclude_audio=True, exclude_timestamp=True)
        request["jobId"] = ctx.job.id
        request["workerId"] = ctx.worker_id
        agent_id = os.getenv("LIVEKIT_AGENT_ID")
        if agent_id:
            request["agentId"] = agent_id

        started_at = perf_counter()
        async with utils.http_context.http_session().post(
            url=url,
            json=request,
            timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
            probability = data.get("probability")
            # JSON numbers without a fractional part (e.g. 0 or 1) decode to
            # int, so accept both numeric types. bool subclasses int and is
            # rejected explicitly.
            if (
                isinstance(probability, (int, float))
                and not isinstance(probability, bool)
                and probability >= 0
            ):
                probability = float(probability)
                logger.debug(
                    "eou prediction",
                    extra={
                        "eou_probability": probability,
                        "duration": perf_counter() - started_at,
                    },
                )
                return probability

            # default to indicate no prediction
            return 1.0
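End-of-turn model for the "multilingual" model type. When a remote inference URL is configured, end-of-turn predictions and missing per-language thresholds are requested from that URL over HTTP; otherwise the EOUModelBase implementation is used.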
Ancestors
- EOUModelBase
- abc.ABC
Methods
async def predict_end_of_turn(self, chat_ctx: llm.ChatContext, *, timeout: float | None = 3) ‑> float
-
Expand source code
async def predict_end_of_turn(
    self,
    chat_ctx: llm.ChatContext,
    *,
    timeout: float | None = 3,
) -> float:
    """Return the end-of-turn probability for ``chat_ctx``.

    Without a remote inference URL the call is delegated to the base class.
    With one, a filtered and truncated copy of the chat context is POSTed to
    the URL together with job, worker and (if set) agent identifiers.

    Args:
        chat_ctx: Conversation to evaluate.
        timeout: Forwarded to the base-class implementation only. The remote
            request uses ``REMOTE_INFERENCE_TIMEOUT`` instead.

    Returns:
        The non-negative ``"probability"`` from the response as a float, or
        ``1.0`` when the response carries no usable probability.

    Raises:
        Whatever ``resp.raise_for_status()`` raises for an HTTP error status;
        request errors are not caught here.
    """
    url = _remote_inference_url()
    if not url:
        return await super().predict_end_of_turn(chat_ctx, timeout=timeout)

    messages = chat_ctx.copy(
        exclude_function_call=True, exclude_instructions=True, exclude_empty_message=True
    ).truncate(max_items=MAX_HISTORY_TURNS)

    ctx = get_job_context()
    request = messages.to_dict(exclude_image=True, exclude_audio=True, exclude_timestamp=True)
    request["jobId"] = ctx.job.id
    request["workerId"] = ctx.worker_id
    agent_id = os.getenv("LIVEKIT_AGENT_ID")
    if agent_id:
        request["agentId"] = agent_id

    started_at = perf_counter()
    async with utils.http_context.http_session().post(
        url=url,
        json=request,
        timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
    ) as resp:
        resp.raise_for_status()
        data = await resp.json()
        probability = data.get("probability")
        # JSON numbers without a fractional part (e.g. 0 or 1) decode to int,
        # so accept both numeric types. bool subclasses int and is rejected
        # explicitly.
        if (
            isinstance(probability, (int, float))
            and not isinstance(probability, bool)
            and probability >= 0
        ):
            probability = float(probability)
            logger.debug(
                "eou prediction",
                extra={
                    "eou_probability": probability,
                    "duration": perf_counter() - started_at,
                },
            )
            return probability

        # default to indicate no prediction
        return 1.0
async def unlikely_threshold(self, language: str | None) ‑> float | None
-
Expand source code
async def unlikely_threshold(self, language: str | None) -> float | None:
    """Look up the threshold for ``language``, asking the remote if needed.

    The base-class lookup runs first. If it yields ``None`` and a remote
    inference URL is configured, the threshold is requested from that URL
    and, when truthy, stored in ``self._languages``.

    Args:
        language: Language to look up. A falsy value returns ``None``.

    Returns:
        The threshold, or ``None`` if none was found. Errors during the
        remote lookup are logged as warnings rather than raised.
    """
    if not language:
        return None

    threshold = await super().unlikely_threshold(language)
    if threshold is not None:
        # The base class already had an answer; no remote lookup needed.
        return threshold

    try:
        remote_url = _remote_inference_url()
        if remote_url:
            pending_request = utils.http_context.http_session().post(
                url=remote_url,
                json={"language": language},
                timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
            )
            async with pending_request as resp:
                resp.raise_for_status()
                payload = await resp.json()
                threshold = payload.get("threshold")
                if threshold:
                    # Remember the value so later calls are answered by the
                    # base-class lookup.
                    self._languages[language] = {"threshold": threshold}
    except Exception as e:
        # Best effort: a failed lookup must not break the caller.
        logger.warning("Error fetching threshold for language %s", language, exc_info=e)

    return threshold