Module livekit.plugins.openai.beta.assistant_llm

Functions

def build_oai_message(msg: llm.ChatMessage)

Classes

class AssistantCreateOptions (name: str, instructions: str, model: ChatModels, temperature: float | None = None)

AssistantCreateOptions(name: 'str', instructions: 'str', model: 'ChatModels', temperature: 'float | None' = None)

Expand source code
@dataclass
class AssistantCreateOptions:
    name: str
    instructions: str
    model: ChatModels
    temperature: float | None = None
    # TODO: when we implement code_interpreter and file_search tools
    # tool_resources: ToolResources | None = None
    # tools: list[AssistantTools] = field(default_factory=list)

Class variables

var instructions : str
var model : Literal['gpt-4o', 'gpt-4o-2024-05-13', 'gpt-4o-mini', 'gpt-4o-mini-2024-07-18', 'gpt-4-turbo', 'gpt-4-turbo-2024-04-09', 'gpt-4-turbo-preview', 'gpt-4-0125-preview', 'gpt-4-1106-preview', 'gpt-4-vision-preview', 'gpt-4-1106-vision-preview', 'gpt-4', 'gpt-4-0314', 'gpt-4-0613', 'gpt-4-32k', 'gpt-4-32k-0314', 'gpt-4-32k-0613', 'gpt-3.5-turbo', 'gpt-3.5-turbo-16k', 'gpt-3.5-turbo-0301', 'gpt-3.5-turbo-0613', 'gpt-3.5-turbo-1106', 'gpt-3.5-turbo-16k-0613']
var name : str
var temperature : float | None
class AssistantLLM (*, assistant_opts: AssistantOptions, client: AsyncClient | None = None, api_key: str | None = None, base_url: str | None = None, on_file_uploaded: OnFileUploaded | None = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class AssistantLLM(llm.LLM):
    def __init__(
        self,
        *,
        assistant_opts: AssistantOptions,
        client: AsyncClient | None = None,
        api_key: str | None = None,
        base_url: str | None = None,
        on_file_uploaded: OnFileUploaded | None = None,
    ) -> None:
        test_ctx = llm.ChatContext()
        if not hasattr(test_ctx, "_metadata"):
            raise Exception(
                "This beta feature of 'livekit-plugins-openai' requires a newer version of 'livekit-agents'"
            )
        self._client = client or AsyncClient(
            api_key=api_key,
            base_url=base_url,
            http_client=httpx.AsyncClient(
                timeout=httpx.Timeout(timeout=30, connect=10, read=5, pool=5),
                follow_redirects=True,
                limits=httpx.Limits(
                    max_connections=1000,
                    max_keepalive_connections=100,
                    keepalive_expiry=120,
                ),
            ),
        )
        self._assistant_opts = assistant_opts
        self._running_fncs: MutableSet[asyncio.Task[Any]] = set()
        self._on_file_uploaded = on_file_uploaded
        self._tool_call_run_id_lookup = dict[str, str]()
        self._submitted_tool_calls = set[str]()

        self._sync_openai_task: asyncio.Task[AssistantLoadOptions] | None = None
        try:
            self._sync_openai_task = asyncio.create_task(self._sync_openai())
        except Exception:
            logger.error(
                "failed to create sync openai task. This can happen when instantiating without a running asyncio event loop (such has when running tests)"
            )
        self._done_futures = list[asyncio.Future[None]]()

    async def _sync_openai(self) -> AssistantLoadOptions:
        if self._assistant_opts.create_options:
            kwargs: Dict[str, Any] = {
                "model": self._assistant_opts.create_options.model,
                "name": self._assistant_opts.create_options.name,
                "instructions": self._assistant_opts.create_options.instructions,
                # "tools": [
                #     {"type": t} for t in self._assistant_opts.create_options.tools
                # ],
                # "tool_resources": self._assistant_opts.create_options.tool_resources,
            }
            # TODO when we implement code_interpreter and file_search tools
            # if self._assistant_opts.create_options.tool_resources:
            #     kwargs["tool_resources"] = (
            #         self._assistant_opts.create_options.tool_resources
            #     )
            if self._assistant_opts.create_options.temperature:
                kwargs["temperature"] = self._assistant_opts.create_options.temperature
            assistant = await self._client.beta.assistants.create(**kwargs)

            thread = await self._client.beta.threads.create()
            return AssistantLoadOptions(assistant_id=assistant.id, thread_id=thread.id)
        elif self._assistant_opts.load_options:
            if not self._assistant_opts.load_options.thread_id:
                thread = await self._client.beta.threads.create()
                self._assistant_opts.load_options.thread_id = thread.id
            return self._assistant_opts.load_options

        raise Exception("One of create_options or load_options must be set")

    def chat(
        self,
        *,
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None = None,
        temperature: float | None = None,
        n: int | None = None,
        parallel_tool_calls: bool | None = None,
    ):
        if n is not None:
            logger.warning("OpenAI Assistants does not support the 'n' parameter")

        if parallel_tool_calls is not None:
            logger.warning(
                "OpenAI Assistants does not support the 'parallel_tool_calls' parameter"
            )

        if not self._sync_openai_task:
            self._sync_openai_task = asyncio.create_task(self._sync_openai())

        return AssistantLLMStream(
            temperature=temperature,
            assistant_llm=self,
            sync_openai_task=self._sync_openai_task,
            client=self._client,
            chat_ctx=chat_ctx,
            fnc_ctx=fnc_ctx,
            on_file_uploaded=self._on_file_uploaded,
        )

    async def _register_tool_call(self, tool_call_id: str, run_id: str) -> None:
        self._tool_call_run_id_lookup[tool_call_id] = run_id

    async def _submit_tool_call_result(self, tool_call_id: str, result: str) -> None:
        if tool_call_id in self._submitted_tool_calls:
            return
        logger.debug(f"submitting tool call {tool_call_id} result")
        run_id = self._tool_call_run_id_lookup.get(tool_call_id)
        if not run_id:
            logger.error(f"tool call {tool_call_id} not found")
            return

        if not self._sync_openai_task:
            logger.error("sync_openai_task not set")
            return

        thread_id = (await self._sync_openai_task).thread_id
        if not thread_id:
            logger.error("thread_id not set")
            return
        tool_output = ToolOutput(output=result, tool_call_id=tool_call_id)
        await self._client.beta.threads.runs.submit_tool_outputs_and_poll(
            tool_outputs=[tool_output], run_id=run_id, thread_id=thread_id
        )
        self._submitted_tool_calls.add(tool_call_id)
        logger.debug(f"submitted tool call {tool_call_id} result")

Ancestors

Methods

def chat(self, *, chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None = None, temperature: float | None = None, n: int | None = None, parallel_tool_calls: bool | None = None)
class AssistantLLMStream (*, assistant_llm: AssistantLLM, client: AsyncClient, sync_openai_task: asyncio.Task[AssistantLoadOptions], chat_ctx: llm.ChatContext, fnc_ctx: llm.FunctionContext | None, temperature: float | None, on_file_uploaded: OnFileUploaded | None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class AssistantLLMStream(llm.LLMStream):
    class EventHandler(AsyncAssistantEventHandler):
        def __init__(
            self,
            llm: AssistantLLM,
            llm_stream: AssistantLLMStream,
            output_queue: asyncio.Queue[llm.ChatChunk | Exception | None],
            chat_ctx: llm.ChatContext,
            fnc_ctx: llm.FunctionContext | None = None,
        ):
            super().__init__()
            self._llm = llm
            self._llm_stream = llm_stream
            self._chat_ctx = chat_ctx
            self._output_queue = output_queue
            self._fnc_ctx = fnc_ctx

        async def on_text_delta(self, delta: TextDelta, snapshot: Text):
            self._output_queue.put_nowait(
                llm.ChatChunk(
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(role="assistant", content=delta.value)
                        )
                    ]
                )
            )

        async def on_tool_call_created(self, tool_call: ToolCall):
            if not self.current_run:
                logger.error("tool call created without run")
                return
            await self._llm._register_tool_call(tool_call.id, self.current_run.id)

        async def on_tool_call_done(
            self,
            tool_call: CodeInterpreterToolCall | FileSearchToolCall | FunctionToolCall,
        ) -> None:
            if tool_call.type == "code_interpreter":
                logger.warning("code interpreter tool call not yet implemented")
            elif tool_call.type == "file_search":
                logger.warning("file_search tool call not yet implemented")
            elif tool_call.type == "function":
                if not self._fnc_ctx:
                    logger.error("function tool called without function context")
                    return

                fnc = llm.FunctionCallInfo(
                    function_info=self._fnc_ctx.ai_functions[tool_call.function.name],
                    arguments=json.loads(tool_call.function.arguments),
                    tool_call_id=tool_call.id,
                    raw_arguments=tool_call.function.arguments,
                )

                self._llm_stream._function_calls_info.append(fnc)
                chunk = llm.ChatChunk(
                    choices=[
                        llm.Choice(
                            delta=llm.ChoiceDelta(role="assistant", tool_calls=[fnc]),
                            index=0,
                        )
                    ]
                )
                self._output_queue.put_nowait(chunk)

    def __init__(
        self,
        *,
        assistant_llm: AssistantLLM,
        client: AsyncClient,
        sync_openai_task: asyncio.Task[AssistantLoadOptions],
        chat_ctx: llm.ChatContext,
        fnc_ctx: llm.FunctionContext | None,
        temperature: float | None,
        on_file_uploaded: OnFileUploaded | None,
    ) -> None:
        super().__init__(chat_ctx=chat_ctx, fnc_ctx=fnc_ctx)
        self._llm = assistant_llm
        self._client = client
        self._temperature = temperature
        self._on_file_uploaded = on_file_uploaded

        # current function call that we're waiting for full completion (args are streamed)
        self._tool_call_id: str | None = None
        self._fnc_name: str | None = None
        self._fnc_raw_arguments: str | None = None
        self._output_queue = asyncio.Queue[Union[llm.ChatChunk, Exception, None]]()
        self._create_stream_task = asyncio.create_task(self._create_stream())
        self._sync_openai_task = sync_openai_task

        # Running stream is used to ensure that we only have one stream running at a time
        self._done_future: asyncio.Future[None] = asyncio.Future()

    async def _create_stream(self) -> None:
        # This function's complexity is due to the fact that we need to sync chat_ctx messages with OpenAI.
        # OpenAI also does not allow us to modify messages while a stream is running. So we need to make sure streams run
        # sequentially. The strategy is as follows:
        #
        # 1. ensure that we have a thread_id and assistant_id from OpenAI. This comes from the _sync_openai_task
        # 2. make sure all previous streams are done before starting a new one
        # 3. delete messages that are no longer in the chat_ctx but are still in OpenAI by using the OpenAI message id
        # 4. add new messages to OpenAI that are in the chat_ctx but not in OpenAI. We don't know the OpenAI message id yet
        #    so we create a random uuid (we call it the LiveKit message id) and set that in the metdata.
        # 5. start the stream and wait for it to finish
        # 6. get the OpenAI message ids for the messages we added to OpenAI by using the metadata
        # 7. Resolve the OpenAI message id with all messages that have a LiveKit message id.
        try:
            load_options = await self._sync_openai_task

            # The assistants api does not let us modify messages while a stream is running.
            # So we have to make sure previous streams are done before starting a new one.
            await asyncio.gather(*self._llm._done_futures)
            self._llm._done_futures.clear()
            self._llm._done_futures.append(self._done_future)

            # OpenAI required submitting tool call outputs manually. We iterate
            # tool outputs in the chat_ctx (from previous runs) and submit them
            # before continuing.
            for msg in self._chat_ctx.messages:
                if msg.role == "tool":
                    if not msg.tool_call_id:
                        logger.error("tool message without tool_call_id")
                        continue
                    if not isinstance(msg.content, str):
                        logger.error("tool message content is not str")
                        continue
                    await self._llm._submit_tool_call_result(
                        msg.tool_call_id, msg.content
                    )

            # At the chat_ctx level, create a map of thread_id to message_ids
            # This is used to keep track of which messages have been added to the thread
            # and which we may need to delete from OpenAI
            if OPENAI_MESSAGES_ADDED_KEY not in self._chat_ctx._metadata:
                self._chat_ctx._metadata[OPENAI_MESSAGES_ADDED_KEY] = dict()

            if (
                load_options.thread_id
                not in self._chat_ctx._metadata[OPENAI_MESSAGES_ADDED_KEY]
            ):
                self._chat_ctx._metadata[OPENAI_MESSAGES_ADDED_KEY][
                    load_options.thread_id
                ] = set()

            # Keep this handy to make the code more readable later on
            openai_addded_messages_set: set[str] = self._chat_ctx._metadata[
                OPENAI_MESSAGES_ADDED_KEY
            ][load_options.thread_id]

            # Keep track of messages that are no longer in the chat_ctx but are still in OpenAI
            # Note: Unfortuneately, this will add latency unfortunately. Usually it's just one message so we loop it but
            # it will create an extra round trip to OpenAI before being able to run inference.
            # TODO: parallelize it?
            for msg in self._chat_ctx.messages:
                msg_id = msg._metadata.get(OPENAI_MESSAGE_ID_KEY, {}).get(
                    load_options.thread_id
                )
                assert load_options.thread_id
                if msg_id and msg_id not in openai_addded_messages_set:
                    await self._client.beta.threads.messages.delete(
                        thread_id=load_options.thread_id,
                        message_id=msg_id,
                    )
                    logger.debug(
                        f"Deleted message '{msg_id}' in thread '{load_options.thread_id}'"
                    )
                    openai_addded_messages_set.remove(msg_id)

            # Upload any images in the chat_ctx that have not been uploaded to OpenAI
            for msg in self._chat_ctx.messages:
                if msg.role != "user":
                    continue

                if not isinstance(msg.content, list):
                    continue

                for cnt in msg.content:
                    if (
                        not isinstance(cnt, llm.ChatImage)
                        or OPENAI_FILE_ID_KEY in cnt._cache
                    ):
                        continue

                    if isinstance(cnt.image, str):
                        continue

                    file_obj = await self._upload_frame(
                        cnt.image, cnt.inference_width, cnt.inference_height
                    )
                    cnt._cache[OPENAI_FILE_ID_KEY] = file_obj.id
                    if self._on_file_uploaded:
                        self._on_file_uploaded(
                            OnFileUploadedInfo(
                                type="image",
                                original_file=cnt,
                                openai_file_object=file_obj,
                            )
                        )

            # Keep track of the new messages in the chat_ctx that we need to add to OpenAI
            additional_messages: list[AdditionalMessage] = []
            for msg in self._chat_ctx.messages:
                if msg.role != "user":
                    continue

                msg_id = str(uuid.uuid4())
                if OPENAI_MESSAGE_ID_KEY not in msg._metadata:
                    msg._metadata[OPENAI_MESSAGE_ID_KEY] = dict[str, str]()

                if LIVEKIT_MESSAGE_ID_KEY not in msg._metadata:
                    msg._metadata[LIVEKIT_MESSAGE_ID_KEY] = dict[str, str]()

                oai_msg_id_dict = msg._metadata[OPENAI_MESSAGE_ID_KEY]
                lk_msg_id_dict = msg._metadata[LIVEKIT_MESSAGE_ID_KEY]

                if load_options.thread_id not in oai_msg_id_dict:
                    converted_msg = build_oai_message(msg)
                    converted_msg["private_message_id"] = msg_id
                    additional_messages.append(
                        AdditionalMessage(
                            role="user",
                            content=converted_msg["content"],
                            metadata={LIVEKIT_MESSAGE_ID_KEY: msg_id},
                        )
                    )
                    lk_msg_id_dict[load_options.thread_id] = msg_id

            eh = AssistantLLMStream.EventHandler(
                llm=self._llm,
                output_queue=self._output_queue,
                chat_ctx=self._chat_ctx,
                fnc_ctx=self._fnc_ctx,
                llm_stream=self,
            )
            assert load_options.thread_id
            kwargs: dict[str, Any] = {
                "additional_messages": additional_messages,
                "thread_id": load_options.thread_id,
                "assistant_id": load_options.assistant_id,
                "event_handler": eh,
                "temperature": self._temperature,
            }
            if self._fnc_ctx:
                kwargs["tools"] = [
                    llm._oai_api.build_oai_function_description(f)
                    for f in self._fnc_ctx.ai_functions.values()
                ]

            async with self._client.beta.threads.runs.stream(**kwargs) as stream:
                await stream.until_done()

            await self._output_queue.put(None)

            # Populate the openai_message_id for the messages we added to OpenAI. Note, we do this after
            # sending None to close the iterator so that it is done in parellel with any users of
            # the stream. However, the next stream will not start until this is done.
            lk_to_oai_lookup = dict[str, str]()
            messages = await self._client.beta.threads.messages.list(
                thread_id=load_options.thread_id,
                limit=10,  # We could be smarter and make a more exact query, but this is probably fine
            )
            for oai_msg in messages.data:
                if oai_msg.metadata.get(LIVEKIT_MESSAGE_ID_KEY):  # type: ignore
                    lk_to_oai_lookup[oai_msg.metadata[LIVEKIT_MESSAGE_ID_KEY]] = (  # type: ignore
                        oai_msg.id
                    )

            for msg in self._chat_ctx.messages:
                if msg.role != "user":
                    continue
                oai_msg_id_dict = msg._metadata.get(OPENAI_MESSAGE_ID_KEY)
                lk_msg_id_dict = msg._metadata.get(LIVEKIT_MESSAGE_ID_KEY)
                if oai_msg_id_dict is None or lk_msg_id_dict is None:
                    continue

                lk_msg_id = lk_msg_id_dict.get(load_options.thread_id)
                if lk_msg_id and lk_msg_id in lk_to_oai_lookup:
                    oai_msg_id = lk_to_oai_lookup[lk_msg_id]
                    oai_msg_id_dict[load_options.thread_id] = oai_msg_id
                    openai_addded_messages_set.add(oai_msg_id)
                    # We don't need the LiveKit message id anymore
                    lk_msg_id_dict.pop(load_options.thread_id)

        except Exception as e:
            await self._output_queue.put(e)
        finally:
            self._done_future.set_result(None)

    async def _upload_frame(
        self,
        frame: rtc.VideoFrame,
        inference_width: int | None,
        inference_height: int | None,
    ):
        # inside our internal implementation, we allow to put extra metadata to
        # each ChatImage (avoid to reencode each time we do a chatcompletion request)
        opts = utils.images.EncodeOptions()
        if inference_width and inference_height:
            opts.resize_options = utils.images.ResizeOptions(
                width=inference_width,
                height=inference_height,
                strategy="center_aspect_fit",
            )

        encoded_data = utils.images.encode(frame, opts)
        fileObj = await self._client.files.create(
            file=("image.jpg", encoded_data),
            purpose="vision",
        )

        return fileObj

    async def __anext__(self):
        item = await self._output_queue.get()
        if item is None:
            raise StopAsyncIteration

        if isinstance(item, Exception):
            raise item

        return item

Ancestors

Class variables

var EventHandler

Inherited members

class AssistantLoadOptions (assistant_id: str, thread_id: str | None)

AssistantLoadOptions(assistant_id: 'str', thread_id: 'str | None')

Expand source code
@dataclass
class AssistantLoadOptions:
    assistant_id: str
    thread_id: str | None

Class variables

var assistant_id : str
var thread_id : str | None
class AssistantOptions (create_options: AssistantCreateOptions | None = None, load_options: AssistantLoadOptions | None = None)

Options for creating (on-the-fly) or loading an assistant. Only one of create_options or load_options should be set.

Expand source code
@dataclass
class AssistantOptions:
    """Options for creating (on-the-fly) or loading an assistant. Only one of create_options or load_options should be set."""

    create_options: AssistantCreateOptions | None = None
    load_options: AssistantLoadOptions | None = None

Class variables

var create_optionsAssistantCreateOptions | None
var load_optionsAssistantLoadOptions | None
class LLMOptions (model: str | ChatModels)

LLMOptions(model: 'str | ChatModels')

Expand source code
@dataclass
class LLMOptions:
    model: str | ChatModels

Class variables

var model : Union[str, Literal['gpt-4o', 'gpt-4o-2024-05-13', 'gpt-4o-mini', 'gpt-4o-mini-2024-07-18', 'gpt-4-turbo', 'gpt-4-turbo-2024-04-09', 'gpt-4-turbo-preview', 'gpt-4-0125-preview', 'gpt-4-1106-preview', 'gpt-4-vision-preview', 'gpt-4-1106-vision-preview', 'gpt-4', 'gpt-4-0314', 'gpt-4-0613', 'gpt-4-32k', 'gpt-4-32k-0314', 'gpt-4-32k-0613', 'gpt-3.5-turbo', 'gpt-3.5-turbo-16k', 'gpt-3.5-turbo-0301', 'gpt-3.5-turbo-0613', 'gpt-3.5-turbo-1106', 'gpt-3.5-turbo-16k-0613']]
class OnFileUploadedInfo (type: "Literal['image']", original_file: llm.ChatImage, openai_file_object: FileObject)

OnFileUploadedInfo(type: "Literal['image']", original_file: 'llm.ChatImage', openai_file_object: 'FileObject')

Expand source code
@dataclass
class OnFileUploadedInfo:
    type: Literal["image"]
    original_file: llm.ChatImage
    openai_file_object: FileObject

Class variables

var openai_file_object : openai.types.file_object.FileObject
var original_fileChatImage
var type : Literal['image']