Module livekit.plugins.turn_detector

Sub-modules

livekit.plugins.turn_detector.eou
livekit.plugins.turn_detector.log
livekit.plugins.turn_detector.version

Classes

class EOUModel (inference_executor: InferenceExecutor | None = None,
unlikely_threshold: float = 0.0289)
Expand source code
class EOUModel:
    def __init__(
        self,
        inference_executor: InferenceExecutor | None = None,
        unlikely_threshold: float = 0.0289,
    ) -> None:
        self._executor = (
            inference_executor or get_current_job_context().inference_executor
        )
        self._unlikely_threshold = unlikely_threshold

    def unlikely_threshold(self) -> float:
        return self._unlikely_threshold

    def supports_language(self, language: str | None) -> bool:
        if language is None:
            return False
        parts = language.lower().split("-")
        # certain models use language codes (DG, AssemblyAI), others use full names (like OAI)
        return parts[0] == "en" or parts[0] == "english"

    async def predict_eou(self, chat_ctx: llm.ChatContext) -> float:
        return await self.predict_end_of_turn(chat_ctx)

    # our EOU model inference should be fast, 3 seconds is more than enough
    async def predict_end_of_turn(
        self, chat_ctx: llm.ChatContext, *, timeout: float | None = 3
    ) -> float:
        messages = []

        for msg in chat_ctx.messages:
            if msg.role not in ("user", "assistant"):
                continue

            if isinstance(msg.content, str):
                messages.append(
                    {
                        "role": msg.role,
                        "content": msg.content,
                    }
                )
            elif isinstance(msg.content, list):
                for cnt in msg.content:
                    if isinstance(cnt, str):
                        messages.append(
                            {
                                "role": msg.role,
                                "content": cnt,
                            }
                        )
                        break

        messages = messages[-MAX_HISTORY_TURNS:]

        json_data = json.dumps({"chat_ctx": messages}).encode()

        result = await asyncio.wait_for(
            self._executor.do_inference(_EUORunner.INFERENCE_METHOD, json_data),
            timeout=timeout,
        )

        assert result is not None, (
            "end_of_utterance prediction should always returns a result"
        )

        result_json = json.loads(result.decode())
        logger.debug(
            "eou prediction",
            extra=result_json,
        )
        return result_json["eou_probability"]

Methods

async def predict_end_of_turn(self, chat_ctx: llm.ChatContext, *, timeout: float | None = 3) ‑> float
Expand source code
async def predict_end_of_turn(
    self, chat_ctx: llm.ChatContext, *, timeout: float | None = 3
) -> float:
    messages = []

    for msg in chat_ctx.messages:
        if msg.role not in ("user", "assistant"):
            continue

        if isinstance(msg.content, str):
            messages.append(
                {
                    "role": msg.role,
                    "content": msg.content,
                }
            )
        elif isinstance(msg.content, list):
            for cnt in msg.content:
                if isinstance(cnt, str):
                    messages.append(
                        {
                            "role": msg.role,
                            "content": cnt,
                        }
                    )
                    break

    messages = messages[-MAX_HISTORY_TURNS:]

    json_data = json.dumps({"chat_ctx": messages}).encode()

    result = await asyncio.wait_for(
        self._executor.do_inference(_EUORunner.INFERENCE_METHOD, json_data),
        timeout=timeout,
    )

    assert result is not None, (
        "end_of_utterance prediction should always returns a result"
    )

    result_json = json.loads(result.decode())
    logger.debug(
        "eou prediction",
        extra=result_json,
    )
    return result_json["eou_probability"]
async def predict_eou(self, chat_ctx: llm.ChatContext) ‑> float
Expand source code
async def predict_eou(self, chat_ctx: llm.ChatContext) -> float:
    return await self.predict_end_of_turn(chat_ctx)
def supports_language(self, language: str | None) ‑> bool
Expand source code
def supports_language(self, language: str | None) -> bool:
    if language is None:
        return False
    parts = language.lower().split("-")
    # certain models use language codes (DG, AssemblyAI), others use full names (like OAI)
    return parts[0] == "en" or parts[0] == "english"
def unlikely_threshold(self) ‑> float
Expand source code
def unlikely_threshold(self) -> float:
    return self._unlikely_threshold