Module livekit.plugins.turn_detector.multilingual

Classes

class MultilingualModel (*, unlikely_threshold: float | None = None)
Expand source code
class MultilingualModel(EOUModelBase):
    def __init__(self, *, unlikely_threshold: float | None = None):
        super().__init__(
            model_type="multilingual",
            unlikely_threshold=unlikely_threshold,
            load_languages=_remote_inference_url() is None,
        )

    def _inference_method(self) -> str:
        return _EUORunnerMultilingual.INFERENCE_METHOD

    async def unlikely_threshold(self, language: str | None) -> float | None:
        if not language:
            return None

        threshold = await super().unlikely_threshold(language)
        if threshold is None:
            try:
                if url := _remote_inference_url():
                    async with utils.http_context.http_session().post(
                        url=url,
                        json={
                            "language": language,
                        },
                        timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
                    ) as resp:
                        resp.raise_for_status()
                        data = await resp.json()
                        threshold = data.get("threshold")
                        if threshold:
                            self._languages[language] = {"threshold": threshold}
            except Exception as e:
                logger.warning("Error fetching threshold for language %s", language, exc_info=e)

        return threshold

    async def predict_end_of_turn(
        self,
        chat_ctx: llm.ChatContext,
        *,
        timeout: float | None = 3,
    ) -> float:
        url = _remote_inference_url()
        if not url:
            return await super().predict_end_of_turn(chat_ctx, timeout=timeout)

        messages = chat_ctx.copy(
            exclude_function_call=True, exclude_instructions=True, exclude_empty_message=True
        ).truncate(max_items=MAX_HISTORY_TURNS)

        ctx = get_job_context()
        request = messages.to_dict(exclude_image=True, exclude_audio=True, exclude_timestamp=True)
        request["jobId"] = ctx.job.id
        request["workerId"] = ctx.worker_id
        agent_id = os.getenv("LIVEKIT_AGENT_ID")
        if agent_id:
            request["agentId"] = agent_id

        started_at = perf_counter()
        async with utils.http_context.http_session().post(
            url=url,
            json=request,
            timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
        ) as resp:
            resp.raise_for_status()
            data = await resp.json()
            probability = data.get("probability")
            if isinstance(probability, float) and probability >= 0:
                logger.debug(
                    "eou prediction",
                    extra={
                        "eou_probability": probability,
                        "duration": perf_counter() - started_at,
                    },
                )
                return probability
            else:
                # default to indicate no prediction
                return 1

Helper class that provides a standard way to create an ABC using inheritance.

Ancestors

Methods

async def predict_end_of_turn(self, chat_ctx: llm.ChatContext, *, timeout: float | None = 3) ‑> float
Expand source code
async def predict_end_of_turn(
    self,
    chat_ctx: llm.ChatContext,
    *,
    timeout: float | None = 3,
) -> float:
    url = _remote_inference_url()
    if not url:
        return await super().predict_end_of_turn(chat_ctx, timeout=timeout)

    messages = chat_ctx.copy(
        exclude_function_call=True, exclude_instructions=True, exclude_empty_message=True
    ).truncate(max_items=MAX_HISTORY_TURNS)

    ctx = get_job_context()
    request = messages.to_dict(exclude_image=True, exclude_audio=True, exclude_timestamp=True)
    request["jobId"] = ctx.job.id
    request["workerId"] = ctx.worker_id
    agent_id = os.getenv("LIVEKIT_AGENT_ID")
    if agent_id:
        request["agentId"] = agent_id

    started_at = perf_counter()
    async with utils.http_context.http_session().post(
        url=url,
        json=request,
        timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
    ) as resp:
        resp.raise_for_status()
        data = await resp.json()
        probability = data.get("probability")
        if isinstance(probability, float) and probability >= 0:
            logger.debug(
                "eou prediction",
                extra={
                    "eou_probability": probability,
                    "duration": perf_counter() - started_at,
                },
            )
            return probability
        else:
            # default to indicate no prediction
            return 1
async def unlikely_threshold(self, language: str | None) ‑> float | None
Expand source code
async def unlikely_threshold(self, language: str | None) -> float | None:
    if not language:
        return None

    threshold = await super().unlikely_threshold(language)
    if threshold is None:
        try:
            if url := _remote_inference_url():
                async with utils.http_context.http_session().post(
                    url=url,
                    json={
                        "language": language,
                    },
                    timeout=aiohttp.ClientTimeout(total=REMOTE_INFERENCE_TIMEOUT),
                ) as resp:
                    resp.raise_for_status()
                    data = await resp.json()
                    threshold = data.get("threshold")
                    if threshold:
                        self._languages[language] = {"threshold": threshold}
        except Exception as e:
            logger.warning("Error fetching threshold for language %s", language, exc_info=e)

    return threshold