Module livekit.agents

Sub-modules

livekit.agents.cli
livekit.agents.ipc
livekit.agents.llm
livekit.agents.multimodal
livekit.agents.pipeline
livekit.agents.stt
livekit.agents.tokenize
livekit.agents.transcription
livekit.agents.tts
livekit.agents.utils
livekit.agents.vad
livekit.agents.voice_assistant

Classes

class AssignmentTimeoutError (*args, **kwargs)

Raised when accepting a job but not receiving an assignment within the specified timeout. The server may have chosen another worker to handle this job.

Expand source code
class AssignmentTimeoutError(Exception):
    """Signal that an accepted job was never assigned to this worker.

    Raised when a job was accepted but no assignment arrived within the
    specified timeout. The server may have chosen another worker to handle
    the job.
    """

Ancestors

  • builtins.Exception
  • builtins.BaseException
class AutoSubscribe (*args, **kwds)

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.__str__() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Expand source code
class AutoSubscribe(str, Enum):
    """Track auto-subscription modes accepted by ``JobContext.connect()``.

    Members are plain strings as well as enum members, so they compare equal
    to their string values.
    """

    # subscribe-everything / subscribe-nothing modes
    SUBSCRIBE_ALL = "subscribe_all"
    SUBSCRIBE_NONE = "subscribe_none"

    # single-media-kind modes
    AUDIO_ONLY = "audio_only"
    VIDEO_ONLY = "video_only"

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var AUDIO_ONLY
var SUBSCRIBE_ALL
var SUBSCRIBE_NONE
var VIDEO_ONLY
class JobContext (*, proc: JobProcess, info: RunningJobInfo, room: rtc.Room, on_connect: Callable[[], None], on_shutdown: Callable[[str], None])
Expand source code
class JobContext:
    def __init__(
        self,
        *,
        proc: JobProcess,
        info: RunningJobInfo,
        room: rtc.Room,
        on_connect: Callable[[], None],
        on_shutdown: Callable[[str], None],
    ) -> None:
        self._proc = proc
        self._info = info
        self._room = room
        self._on_connect = on_connect
        self._on_shutdown = on_shutdown
        self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = []
        self._participant_entrypoints: list[
            Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]]
        ] = []
        self._participant_tasks = dict[Tuple[str, Callable], asyncio.Task[None]]()
        self._room.on("participant_connected", self._participant_available)

    @property
    def proc(self) -> JobProcess:
        """Returns the process running the job. Useful for storing process-specific state."""
        return self._proc

    @property
    def job(self) -> agent.Job:
        """Returns the current job that the worker is executing."""
        return self._info.job

    @property
    def room(self) -> rtc.Room:
        """The Room object is the main interface that the worker should interact with.

        When the entrypoint is called, the worker has not connected to the Room yet.
        Certain properties of Room would not be available before calling JobContext.connect()
        """
        return self._room

    @property
    def agent(self) -> rtc.LocalParticipant:
        return self._room.local_participant

    def add_shutdown_callback(
        self, callback: Callable[[], Coroutine[None, None, None]]
    ) -> None:
        self._shutdown_callbacks.append(callback)

    async def wait_for_participant(
        self, *, identity: str | None = None
    ) -> rtc.RemoteParticipant:
        """
        Returns a participant that matches the given identity. If identity is None, the first
        participant that joins the room will be returned.
        If the participant has already joined, the function will return immediately.
        """
        if not self._room.isconnected():
            raise RuntimeError("room is not connected")

        fut = asyncio.Future[rtc.RemoteParticipant]()

        for p in self._room.remote_participants.values():
            if (
                identity is None or p.identity == identity
            ) and p.kind != rtc.ParticipantKind.PARTICIPANT_KIND_AGENT:
                fut.set_result(p)
                break

        def _on_participant_connected(p: rtc.RemoteParticipant):
            if (
                identity is None or p.identity == identity
            ) and p.kind != rtc.ParticipantKind.PARTICIPANT_KIND_AGENT:
                self._room.off("participant_connected", _on_participant_connected)
                if not fut.done():
                    fut.set_result(p)

        if not fut.done():
            self._room.on("participant_connected", _on_participant_connected)

        return await fut

    async def connect(
        self,
        *,
        e2ee: rtc.E2EEOptions | None = None,
        auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
        rtc_config: rtc.RtcConfiguration | None = None,
    ) -> None:
        """Connect to the room. This method should be called only once.

        Args:
            e2ee: End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
            auto_subscribe: Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
            rtc_config: Custom RTC configuration to use when connecting to the room.
        """
        room_options = rtc.RoomOptions(
            e2ee=e2ee,
            auto_subscribe=auto_subscribe == AutoSubscribe.SUBSCRIBE_ALL,
            rtc_config=rtc_config,
        )

        await self._room.connect(self._info.url, self._info.token, options=room_options)
        self._on_connect()
        for p in self._room.remote_participants.values():
            self._participant_available(p)

        _apply_auto_subscribe_opts(self._room, auto_subscribe)

    def shutdown(self, reason: str = "") -> None:
        self._on_shutdown(reason)

    def add_participant_entrypoint(
        self,
        entrypoint_fnc: Callable[
            [JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]
        ],
    ):
        """Adds an entrypoint function to be run when a participant joins the room. In cases where
        the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be
        added and they will each be run in parallel for each participant.
        """

        if entrypoint_fnc in self._participant_entrypoints:
            raise ValueError("entrypoints cannot be added more than once")

        self._participant_entrypoints.append(entrypoint_fnc)

    def _participant_available(self, p: rtc.RemoteParticipant) -> None:
        for coro in self._participant_entrypoints:
            if (p.identity, coro) in self._participant_tasks:
                logger.warning(
                    f"a participant has joined before a prior participant task matching the same identity has finished: '{p.identity}'"
                )
            task_name = f"part-entry-{p.identity}-{coro.__name__}"
            task = asyncio.create_task(coro(self, p), name=task_name)
            self._participant_tasks[(p.identity, coro)] = task
            task.add_done_callback(
                lambda _: self._participant_tasks.pop((p.identity, coro))
            )

Instance variables

prop agent : rtc.LocalParticipant
Expand source code
@property
def agent(self) -> rtc.LocalParticipant:
    """Return the local participant of the job's room."""
    local_participant = self._room.local_participant
    return local_participant
prop job : agent.Job

Returns the current job that the worker is executing.

Expand source code
@property
def job(self) -> agent.Job:
    """Return the job currently being executed, as stored in the running job info."""
    running_info = self._info
    return running_info.job
prop proc : JobProcess

Returns the process running the job. Useful for storing process-specific state.

Expand source code
@property
def proc(self) -> JobProcess:
    """Return the process that runs this job.

    Handy as a place to keep process-specific state.
    """
    process = self._proc
    return process
prop room : rtc.Room

The Room object is the main interface that the worker should interact with.

When the entrypoint is called, the worker has not connected to the Room yet. Certain properties of Room would not be available before calling JobContext.connect()

Expand source code
@property
def room(self) -> rtc.Room:
    """Return the Room, the main interface the worker should interact with.

    The worker is not yet connected to the Room when the entrypoint is called,
    so some Room properties are unavailable until JobContext.connect() has
    been called.
    """
    current_room = self._room
    return current_room

Methods

def add_participant_entrypoint(self, entrypoint_fnc: Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]])

Adds an entrypoint function to be run when a participant joins the room. In cases where the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be added and they will each be run in parallel for each participant.

def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
async def connect(self, *, e2ee: rtc.E2EEOptions | None = None, auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL, rtc_config: rtc.RtcConfiguration | None = None) ‑> None

Connect to the room. This method should be called only once.

Args

e2ee
End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
auto_subscribe
Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
rtc_config
Custom RTC configuration to use when connecting to the room.
def shutdown(self, reason: str = '') ‑> None
async def wait_for_participant(self, *, identity: str | None = None) ‑> RemoteParticipant

Returns a participant that matches the given identity. If identity is None, the first participant that joins the room will be returned. If the participant has already joined, the function will return immediately.

class JobExecutorType (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code
@unique
class JobExecutorType(Enum):
    """Kinds of job executor selectable through ``job_executor_type``.

    ``@unique`` guarantees that no two members share a value.
    """

    PROCESS = "process"
    THREAD = "thread"

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class JobProcess (*, start_arguments: Any | None = None)
Expand source code
class JobProcess:
    """Handle on the current process plus user-managed state.

    Captures ``multiprocessing.current_process()`` at construction time and
    carries a mutable ``userdata`` dict together with optional start arguments.
    """

    def __init__(self, *, start_arguments: Any | None = None) -> None:
        """Create the handle.

        Args:
            start_arguments: Arbitrary value exposed unchanged through
                ``start_arguments``.
        """
        self._start_arguments = start_arguments
        self._userdata: dict[str, Any] = dict()
        self._mp_proc = mp.current_process()

    @property
    def pid(self) -> int | None:
        """Process id reported by the captured multiprocessing process."""
        current = self._mp_proc
        return current.pid

    @property
    def userdata(self) -> dict:
        """The mutable user data dict (the same object on every access)."""
        store = self._userdata
        return store

    @property
    def start_arguments(self) -> Any | None:
        """The value passed as ``start_arguments`` at construction."""
        arguments = self._start_arguments
        return arguments

Instance variables

prop pid : int | None
Expand source code
@property
def pid(self) -> int | None:
    """Return the pid of the captured multiprocessing process."""
    current = self._mp_proc
    return current.pid
prop start_arguments : Any | None
Expand source code
@property
def start_arguments(self) -> Any | None:
    """Return the start arguments stored on this process handle."""
    arguments = self._start_arguments
    return arguments
prop userdata : dict
Expand source code
@property
def userdata(self) -> dict:
    """Return the user data dict stored on this process handle."""
    store = self._userdata
    return store
class JobRequest (*, job: agent.Job, on_reject: Callable[[], Coroutine[None, None, None]], on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]])
Expand source code
class JobRequest:
    """A job offered to this worker, to be accepted or rejected.

    Wraps the offered ``agent.Job`` and forwards the decision to the
    ``on_accept`` / ``on_reject`` coroutines supplied by the creator.
    """

    def __init__(
        self,
        *,
        job: agent.Job,
        on_reject: Callable[[], Coroutine[None, None, None]],
        on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]],
    ) -> None:
        """Remember the job and the two decision callbacks.

        Args:
            job: The offered job.
            on_reject: Awaited by ``reject()``.
            on_accept: Awaited by ``accept()`` with the accept arguments.
        """
        self._job = job
        self._on_accept = on_accept
        self._on_reject = on_reject
        self._lock = asyncio.Lock()

    @property
    def id(self) -> str:
        """Id of the offered job."""
        offered = self._job
        return offered.id

    @property
    def job(self) -> agent.Job:
        """The offered job itself."""
        offered = self._job
        return offered

    @property
    def room(self) -> models.Room:
        """Room attached to the offered job."""
        offered = self._job
        return offered.room

    @property
    def publisher(self) -> models.ParticipantInfo | None:
        """The ``participant`` field of the offered job."""
        offered = self._job
        return offered.participant

    @property
    def agent_name(self) -> str:
        """Agent name attached to the offered job."""
        offered = self._job
        return offered.agent_name

    async def reject(self) -> None:
        """Reject the job request. The job may be assigned to another worker"""
        await self._on_reject()

    async def accept(
        self,
        *,
        name: str = "",
        identity: str = "",
        metadata: str = "",
    ) -> None:
        """Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker.

        Args:
            name: Forwarded as ``name`` in the accept arguments.
            identity: Forwarded as ``identity``; an empty value is replaced by
                ``"agent-"`` followed by the job id.
            metadata: Forwarded as ``metadata`` in the accept arguments.
        """
        # fall back to an identity derived from the job id
        effective_identity = identity or "agent-" + self.id

        await self._on_accept(
            JobAcceptArguments(
                name=name,
                identity=effective_identity,
                metadata=metadata,
            )
        )

Instance variables

prop agent_name : str
Expand source code
@property
def agent_name(self) -> str:
    """Return the agent name attached to the offered job."""
    offered = self._job
    return offered.agent_name
prop id : str
Expand source code
@property
def id(self) -> str:
    """Return the id of the offered job."""
    offered = self._job
    return offered.id
prop job : agent.Job
Expand source code
@property
def job(self) -> agent.Job:
    """Return the offered job itself."""
    offered = self._job
    return offered
prop publisher : models.ParticipantInfo | None
Expand source code
@property
def publisher(self) -> models.ParticipantInfo | None:
    """Return the ``participant`` field of the offered job."""
    offered = self._job
    return offered.participant
prop room : models.Room
Expand source code
@property
def room(self) -> models.Room:
    """Return the room attached to the offered job."""
    offered = self._job
    return offered.room

Methods

async def accept(self, *, name: str = '', identity: str = '', metadata: str = '') ‑> None

Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker.

async def reject(self) ‑> None

Reject the job request. The job may be assigned to another worker

class Plugin (title: str, version: str, package: str, logger: logging.Logger | None = None)

Helper class that provides a standard way to create an ABC using inheritance.

Expand source code
class Plugin(ABC):
    """Base class for plugins, with a class-level registry.

    Registered plugins are collected in ``registered_plugins`` and announced
    through the class-level ``emitter`` with a ``"plugin_registered"`` event.
    """

    # shared by every subclass: the registry, its event emitter and a lock
    registered_plugins: List["Plugin"] = []
    emitter: utils.EventEmitter[EventTypes] = utils.EventEmitter()
    lock = threading.Lock()

    # TODO(theomonnom): make logger mandatory once all plugins have been updated
    def __init__(
        self,
        title: str,
        version: str,
        package: str,
        logger: logging.Logger | None = None,
    ) -> None:
        """Record the plugin's descriptive fields.

        Args:
            title: Plugin title.
            version: Plugin version string.
            package: Package name of the plugin.
            logger: Optional logger belonging to the plugin.
        """
        self._logger = logger
        self._package = package
        self._version = version
        self._title = title

    @classmethod
    def register_plugin(cls, plugin: "Plugin") -> None:
        """Add ``plugin`` to the registry and emit ``"plugin_registered"``.

        Raises:
            RuntimeError: when called from a thread other than the main thread.
        """
        on_main_thread = threading.current_thread() == threading.main_thread()
        if not on_main_thread:
            raise RuntimeError("Plugins must be registered on the main thread")

        cls.registered_plugins.append(plugin)
        cls.emitter.emit("plugin_registered", plugin)

    # plugin can implement an optional download_files method
    def download_files(self) -> None:
        """Optional hook for subclasses; the base implementation does nothing."""

    @property
    def package(self) -> str:
        """Package name given at construction."""
        value = self._package
        return value

    @property
    def title(self) -> str:
        """Title given at construction."""
        value = self._title
        return value

    @property
    def version(self) -> str:
        """Version string given at construction."""
        value = self._version
        return value

    @property
    def logger(self) -> logging.Logger | None:
        """Logger given at construction, or ``None``."""
        value = self._logger
        return value

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.anthropic.AnthropicPlugin
  • livekit.plugins.azure.AzurePlugin
  • livekit.plugins.cartesia.CartesiaPlugin
  • livekit.plugins.deepgram.DeepgramPlugin
  • livekit.plugins.elevenlabs.ElevenLabsPlugin
  • livekit.plugins.google.GooglePlugin
  • livekit.plugins.nltk.NltkPlugin
  • livekit.plugins.openai.OpenAIPlugin

Class variables

var emitter : EventEmitter[typing.Literal['plugin_registered']]
var lock
var registered_plugins : List[livekit.agents.plugin.Plugin]

Static methods

def register_plugin(plugin: 'Plugin') ‑> None

Instance variables

prop logger : logging.Logger | None
Expand source code
@property
def logger(self) -> logging.Logger | None:
    """Return the plugin's logger, or ``None`` when none was stored."""
    value = self._logger
    return value
prop package : str
Expand source code
@property
def package(self) -> str:
    """Return the plugin's package name."""
    value = self._package
    return value
prop title : str
Expand source code
@property
def title(self) -> str:
    """Return the plugin's title."""
    value = self._title
    return value
prop version : str
Expand source code
@property
def version(self) -> str:
    """Return the plugin's version string."""
    value = self._version
    return value

Methods

def download_files(self) ‑> None
class Worker (opts: WorkerOptions, *, devmode: bool = True, loop: asyncio.AbstractEventLoop | None = None)

Abstract base class for generic types.

On Python 3.12 and newer, generic classes implicitly inherit from Generic when they declare a parameter list after the class's name::

class Mapping[KT, VT]:
    def __getitem__(self, key: KT) -> VT:
        ...
    # Etc.

On older versions of Python, however, generic classes have to explicitly inherit from Generic.

After a class has been declared to be generic, it can then be used as follows::

def lookup_name[KT, VT](mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
    try:
        return mapping[key]
    except KeyError:
        return default

Initialize a new instance of EventEmitter.

Expand source code
class Worker(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        opts: WorkerOptions,
        *,
        devmode: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        """Validate the options and build the worker's process pool and HTTP server.

        Missing ``ws_url`` / ``api_key`` / ``api_secret`` values are filled in
        from the ``LIVEKIT_URL`` / ``LIVEKIT_API_KEY`` / ``LIVEKIT_API_SECRET``
        environment variables; ``opts`` is updated in place.

        Args:
            opts: Worker options.
            devmode: Passed to ``_WorkerEnvOption.getvalue`` when resolving
                environment-dependent options.
            loop: Event loop to use; defaults to ``asyncio.get_event_loop()``.

        Raises:
            ValueError: if any of the three connection settings is still empty.
        """
        super().__init__()

        # (option attribute, fallback environment variable)
        connection_settings = (
            ("ws_url", "LIVEKIT_URL"),
            ("api_key", "LIVEKIT_API_KEY"),
            ("api_secret", "LIVEKIT_API_SECRET"),
        )

        # first resolve all three, then validate them in the same order
        for attr, env_var in connection_settings:
            setattr(opts, attr, getattr(opts, attr) or os.environ.get(env_var) or "")

        for attr, env_var in connection_settings:
            if not getattr(opts, attr):
                raise ValueError(
                    f"{attr} is required, or add {env_var} in your environment"
                )

        self._opts = opts
        self._loop = loop or asyncio.get_event_loop()

        self._id = "unregistered"
        self._closed = True
        self._draining = False
        self._connecting = False
        self._tasks = set[asyncio.Task[Any]]()
        self._pending_assignments: dict[str, asyncio.Future[agent.JobAssignment]] = {}
        self._close_future: asyncio.Future[None] | None = None
        self._msg_chan = utils.aio.Chan[agent.WorkerMessage](128, loop=self._loop)
        self._devmode = devmode

        # using spawn context for all platforms. We may have further optimizations for
        # Linux with forkserver, but for now, this is the safest option
        spawn_ctx = mp.get_context("spawn")
        idle_processes = _WorkerEnvOption.getvalue(
            opts.num_idle_processes, self._devmode
        )
        self._proc_pool = ipc.proc_pool.ProcPool(
            initialize_process_fnc=opts.prewarm_fnc,
            job_entrypoint_fnc=opts.entrypoint_fnc,
            num_idle_processes=idle_processes,
            loop=self._loop,
            job_executor_type=opts.job_executor_type,
            mp_ctx=spawn_ctx,
            initialize_timeout=opts.initialize_process_timeout,
            close_timeout=opts.shutdown_process_timeout,
        )

        self._api: api.LiveKitAPI | None = None
        self._http_session: aiohttp.ClientSession | None = None

        http_port = _WorkerEnvOption.getvalue(opts.port, self._devmode)
        self._http_server = http_server.HttpServer(
            opts.host,
            http_port,
            loop=self._loop,
        )

        self._main_task: asyncio.Task[None] | None = None

    async def run(self):
        """Start the worker and block until its main and HTTP-server tasks end.

        Starts the process pool, creates the API client, the HTTP session and
        the close future, then runs the worker task and the HTTP server
        together. On exit both tasks are cancelled gracefully and the close
        future is resolved.

        Raises:
            Exception: if the worker is already running.
        """
        if not self._closed:
            raise Exception("worker is already running")

        version_info = {"version": __version__, "rtc-version": rtc.__version__}
        logger.info("starting worker", extra=version_info)

        self._closed = False
        self._proc_pool.start()
        self._api = api.LiveKitAPI(
            self._opts.ws_url, self._opts.api_key, self._opts.api_secret
        )
        self._http_session = aiohttp.ClientSession()
        self._close_future = asyncio.Future(loop=self._loop)

        self._main_task = asyncio.create_task(self._worker_task(), name="worker_task")
        http_task = asyncio.create_task(self._http_server.run(), name="http_server")
        running = [self._main_task, http_task]
        try:
            await asyncio.gather(*running)
        finally:
            await utils.aio.gracefully_cancel(*running)
            # unblock anyone waiting in aclose()
            if not self._close_future.done():
                self._close_future.set_result(None)

    @property
    def id(self) -> str:
        """Current worker id (``"unregistered"`` until a register response sets it)."""
        worker_id = self._id
        return worker_id

    @property
    def active_jobs(self) -> list[RunningJobInfo]:
        """Running-job info of every pool process that currently has a job."""
        jobs = []
        for proc in self._proc_pool.processes:
            if proc.running_job:
                jobs.append(proc.running_job)
        return jobs

    async def drain(self, timeout: int | None = None) -> None:
        """Stop taking new jobs and wait for the running ones to finish.

        Marks the worker as draining, queues an ``update_worker`` message with
        status ``WS_FULL``, then joins every pool process that has a running
        job. Calling it again while already draining returns immediately.

        Args:
            timeout: Maximum number of seconds to wait for the running jobs.
                ``None`` waits without a limit. Any other value, including
                ``0``, is enforced as a timeout.

        Raises:
            asyncio.TimeoutError: when ``timeout`` isn't None and the processes
                didn't finish in time.
        """
        if self._draining:
            return

        logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
        self._draining = True

        # exit the queue
        update_worker = agent.WorkerMessage(
            update_worker=agent.UpdateWorkerStatus(status=agent.WorkerStatus.WS_FULL)
        )
        await self._queue_msg(update_worker)

        async def _join_jobs():
            for proc in self._proc_pool.processes:
                if proc.running_job:
                    await proc.join()

        # compare against None explicitly: a truthiness test would silently
        # turn timeout=0 into "wait forever"
        if timeout is not None:
            await asyncio.wait_for(
                _join_jobs(), timeout
            )  # raises asyncio.TimeoutError on timeout
        else:
            await _join_jobs()

    async def simulate_job(
        self, room: str, participant_identity: str | None = None
    ) -> None:
        """Queue a ``simulate_job`` message for the given room.

        The room is created through the API first. When ``participant_identity``
        is given, that participant is looked up and attached to the message.

        Args:
            room: Name of the room to create and simulate the job in.
            participant_identity: Optional identity of a participant in ``room``.
        """
        assert self._api is not None
        room_service = self._api.room

        created_room = await room_service.create_room(api.CreateRoomRequest(name=room))

        found_participant = None
        if participant_identity:
            lookup = api.RoomParticipantIdentity(
                room=room, identity=participant_identity
            )
            found_participant = await room_service.get_participant(lookup)

        request = agent.WorkerMessage()
        simulated = request.simulate_job
        simulated.room.CopyFrom(created_room)
        if found_participant:
            simulated.participant.CopyFrom(found_participant)

        await self._queue_msg(request)

    async def aclose(self) -> None:
        """Shut the worker down and wait until ``run()`` has resolved the close future.

        If the worker is already closed, this only awaits the close future
        (when one exists) and returns.
        """
        if self._closed:
            if self._close_future is not None:
                await self._close_future
            return

        logger.info("shutting down worker", extra={"id": self.id})

        # all four are assigned by run(), so they must exist on a worker that
        # is not closed
        assert self._close_future is not None
        assert self._http_session is not None
        assert self._api is not None
        assert self._main_task is not None

        # flag first: _worker_task checks _closed to leave its retry loop
        self._closed = True
        self._main_task.cancel()

        await self._proc_pool.aclose()
        await self._http_session.close()
        await self._http_server.aclose()
        await self._api.aclose()

        # wait for the tasks tracked in _tasks; their exceptions are collected
        # rather than raised
        await asyncio.gather(*self._tasks, return_exceptions=True)

        # await asyncio.sleep(0.25)  # see https://github.com/aio-libs/aiohttp/issues/1925
        self._msg_chan.close()
        # resolved in the finally block of run()
        await self._close_future

    async def _queue_msg(self, msg: agent.WorkerMessage) -> None:
        """Send ``msg`` on the outgoing message channel.

        While the worker is still connecting, ``update_worker`` and ``ping``
        messages are dropped instead of being queued.

        _queue_msg raises aio.ChanClosed when the worker is closing/closed
        """
        droppable_while_connecting = ("update_worker", "ping")
        if (
            self._connecting
            and msg.WhichOneof("message") in droppable_while_connecting
        ):
            return

        await self._msg_chan.send(msg)

    async def _worker_task(self) -> None:
        """Connect to the agent websocket, register the worker and run the connection.

        Loops until the worker is closed. Each iteration opens the websocket,
        sends the register request, waits for the register response and then
        hands the socket to ``_run_ws``. Failures are retried with a growing
        delay.

        Raises:
            RuntimeError: when the number of consecutive failures has reached
                ``opts.max_retry``.
        """
        assert self._http_session is not None

        retry_count = 0
        ws: aiohttp.ClientWebSocketResponse | None = None
        while not self._closed:
            try:
                # while connecting, _queue_msg drops update_worker/ping messages
                self._connecting = True
                join_jwt = (
                    api.AccessToken(self._opts.api_key, self._opts.api_secret)
                    .with_grants(api.VideoGrants(agent=True))
                    .to_jwt()
                )

                headers = {"Authorization": f"Bearer {join_jwt}"}

                # map an http(s) URL onto the matching ws(s) scheme
                parse = urlparse(self._opts.ws_url)
                scheme = parse.scheme
                if scheme.startswith("http"):
                    scheme = scheme.replace("http", "ws")

                path_parts = [f"{scheme}://{parse.netloc}", parse.path, "/agent"]
                agent_url = reduce(urljoin, path_parts)

                ws = await self._http_session.ws_connect(
                    agent_url, headers=headers, autoping=True
                )

                # a successful connect resets the retry budget
                retry_count = 0

                # register the worker
                req = agent.WorkerMessage()
                req.register.type = self._opts.worker_type.value
                req.register.allowed_permissions.CopyFrom(
                    models.ParticipantPermission(
                        can_publish=self._opts.permissions.can_publish,
                        can_subscribe=self._opts.permissions.can_subscribe,
                        can_publish_data=self._opts.permissions.can_publish_data,
                        can_update_metadata=self._opts.permissions.can_update_metadata,
                        can_publish_sources=self._opts.permissions.can_publish_sources,
                        hidden=self._opts.permissions.hidden,
                        agent=True,
                    )
                )
                req.register.agent_name = self._opts.agent_name
                req.register.version = __version__
                await ws.send_bytes(req.SerializeToString())

                # wait for the register response before running this connection
                first_msg_b = await ws.receive_bytes()
                msg = agent.ServerMessage()
                msg.ParseFromString(first_msg_b)

                if not msg.HasField("register"):
                    raise Exception("expected register response as first message")

                self._handle_register(msg.register)
                self._connecting = False

                await self._run_ws(ws)
            except Exception as e:
                # errors raised while shutting down are expected; just leave
                if self._closed:
                    break

                if retry_count >= self._opts.max_retry:
                    raise RuntimeError(
                        f"failed to connect to livekit after {retry_count} attempts",
                    )

                # delay grows by 2s per consecutive failure, capped at 10s
                # (the first retry therefore happens without delay)
                retry_delay = min(retry_count * 2, 10)
                retry_count += 1

                logger.warning(
                    f"failed to connect to livekit, retrying in {retry_delay}s: {e}"
                )
                await asyncio.sleep(retry_delay)
            finally:
                # NOTE(review): ``ws`` is not reset between iterations, so after
                # a failed reconnect this closes the previous socket again
                if ws is not None:
                    await ws.close()

    async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse):
        """Drive one registered websocket connection until it ends.

        Runs three tasks together: periodic load reporting, forwarding of
        queued worker messages to the socket, and dispatching of incoming
        server messages. All three are cancelled gracefully on exit.

        Args:
            ws: The connected websocket, already past the register handshake.
        """
        # set by _send_task once the message channel is closed, so that
        # _recv_task can tell an expected close from an unexpected one
        closing_ws = False

        async def _load_task():
            """periodically check load and update worker status"""
            interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
            current_status = agent.WorkerStatus.WS_AVAILABLE
            while True:
                await interval.tick()

                old_status = current_status
                # load_fnc runs in the default executor, off the event loop
                current_load = await asyncio.get_event_loop().run_in_executor(
                    None, self._opts.load_fnc
                )

                is_full = current_load >= _WorkerEnvOption.getvalue(
                    self._opts.load_threshold, self._devmode
                )
                # a draining worker is always reported as full
                currently_available = not is_full and not self._draining

                current_status = (
                    agent.WorkerStatus.WS_AVAILABLE
                    if currently_available
                    else agent.WorkerStatus.WS_FULL
                )

                update = agent.UpdateWorkerStatus(
                    load=current_load, status=current_status
                )

                # only log if status has changed
                if old_status != current_status and not self._draining:
                    extra = {
                        "load": current_load,
                        "threshold": self._opts.load_threshold,
                    }
                    if is_full:
                        logger.info(
                            "worker is at full capacity, marking as unavailable",
                            extra=extra,
                        )
                    else:
                        logger.info(
                            "worker is below capacity, marking as available",
                            extra=extra,
                        )

                # the status update is sent on every tick, changed or not;
                # a closed channel is ignored here
                msg = agent.WorkerMessage(update_worker=update)
                with contextlib.suppress(utils.aio.ChanClosed):
                    await self._queue_msg(msg)

        async def _send_task():
            """forward queued worker messages to the websocket until the channel closes"""
            nonlocal closing_ws
            while True:
                try:
                    msg = await self._msg_chan.recv()
                    await ws.send_bytes(msg.SerializeToString())
                except utils.aio.ChanClosed:
                    closing_ws = True
                    return

        async def _recv_task():
            """read server messages from the websocket and dispatch them"""
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    # a close is only expected once we are closing ourselves
                    if closing_ws:
                        return

                    raise Exception("worker connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.BINARY:
                    logger.warning("unexpected message type: %s", msg.type)
                    continue

                data = msg.data
                msg = agent.ServerMessage()
                msg.ParseFromString(data)
                # message kinds other than the three below are ignored
                which = msg.WhichOneof("message")
                if which == "availability":
                    self._handle_availability(msg.availability)
                elif which == "assignment":
                    self._handle_assignment(msg.assignment)
                elif which == "termination":
                    # tracked in _tasks so that aclose() can wait for it
                    user_task = self._loop.create_task(
                        self._handle_termination(msg.termination),
                        name="agent_job_termination",
                    )
                    self._tasks.add(user_task)
                    user_task.add_done_callback(self._tasks.discard)

        tasks = [
            asyncio.create_task(_load_task()),
            asyncio.create_task(_send_task()),
            asyncio.create_task(_recv_task()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    async def _reload_jobs(self, jobs: list[RunningJobInfo]) -> None:
        """Relaunch each given job in the process pool with a refreshed token.

        For every job the original join token is decoded, its ``exp`` claim is
        moved one hour past the current UTC time, and the re-signed token is
        used to launch the job again against ``opts.ws_url``.

        Args:
            jobs: Running-job infos to relaunch.
        """
        for running in jobs:
            job_extra = {"job_id": running.job.id, "agent_name": running.job.agent_name}
            logger.log(DEV_LEVEL, "reloading job", extra=job_extra)
            server_url = self._opts.ws_url

            # take the original jwt token and extend it while keeping all the same data that was generated
            # by the SFU for the original join token.
            claims = jwt.decode(
                running.token, self._opts.api_secret, algorithms=["HS256"]
            )
            now_utc = datetime.datetime.now(datetime.timezone.utc)
            claims["exp"] = int(now_utc.timestamp()) + 3600

            refreshed_token = jwt.encode(
                claims, self._opts.api_secret, algorithm="HS256"
            )
            relaunch_info = RunningJobInfo(
                accept_arguments=running.accept_arguments,
                job=running.job,
                url=server_url,
                token=refreshed_token,
            )
            await self._proc_pool.launch_job(relaunch_info)

    def _handle_register(self, reg: agent.RegisterWorkerResponse):
        """Store the worker id from the registration response and announce it.

        Logs the registration details and emits ``worker_registered`` with the
        worker id and the server info.
        """
        self._id = reg.worker_id

        server_info = reg.server_info
        registration_details = {
            "id": reg.worker_id,
            "region": server_info.region,
            "protocol": server_info.protocol,
            "node_id": server_info.node_id,
        }
        logger.info("registered worker", extra=registration_details)
        self.emit("worker_registered", reg.worker_id, server_info)

    def _handle_availability(self, msg: agent.AvailabilityRequest):
        """Answer an availability request in a background task.

        The task is kept in ``self._tasks`` until it is done.
        """
        answer_task = self._loop.create_task(self._answer_availability(msg))
        self._tasks.add(answer_task)
        answer_task.add_done_callback(self._tasks.discard)

    async def _answer_availability(self, msg: agent.AvailabilityRequest):
        """Ask the user if they want to accept this job and forward the answer to the server.
        If we get the job assigned, we start a new process.

        The configured ``request_fnc`` runs in a background task and receives a
        ``JobRequest`` whose ``on_accept`` / ``on_reject`` callbacks queue the
        availability response. If ``request_fnc`` finishes (or fails) without
        answering, the job is rejected automatically.
        """

        answered = False

        async def _on_reject() -> None:
            """Queue an availability response declining the job."""
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = False
            await self._queue_msg(availability_resp)

        async def _on_accept(args: JobAcceptArguments) -> None:
            """Accept the job, wait for the server assignment and launch the job.

            Raises:
                AssignmentTimeoutError: no assignment was received within
                    ``ASSIGNMENT_TIMEOUT``.
            """
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = True
            availability_resp.availability.participant_identity = args.identity
            availability_resp.availability.participant_name = args.name
            availability_resp.availability.participant_metadata = args.metadata

            # Register the future *before* the response is queued: queuing awaits,
            # so an assignment handled in the meantime must already find it.
            wait_assignment = asyncio.Future[agent.JobAssignment]()
            self._pending_assignments[job_req.id] = wait_assignment

            try:
                await self._queue_msg(availability_resp)

                # the job was accepted by the user, wait for the server assignment
                try:
                    await asyncio.wait_for(wait_assignment, ASSIGNMENT_TIMEOUT)
                except asyncio.TimeoutError:
                    logger.warning(
                        f"assignment for job {job_req.id} timed out",
                        extra={
                            "job_request": job_req,
                            "agent_name": self._opts.agent_name,
                        },
                    )
                    raise AssignmentTimeoutError()
            finally:
                # Never leave a stale entry behind (timeout, cancellation, send
                # failure). Only remove the entry if it is still our own future.
                if self._pending_assignments.get(job_req.id) is wait_assignment:
                    del self._pending_assignments[job_req.id]

            job_assign = wait_assignment.result()
            running_info = RunningJobInfo(
                accept_arguments=args,
                job=msg.job,
                url=job_assign.url or self._opts.ws_url,
                token=job_assign.token,
            )

            await self._proc_pool.launch_job(running_info)

        job_req = JobRequest(job=msg.job, on_reject=_on_reject, on_accept=_on_accept)

        logger.info(
            "received job request",
            extra={
                "job_id": msg.job.id,
                "dispatch_id": msg.job.dispatch_id,
                "room_name": msg.job.room.name,
                "agent_name": self._opts.agent_name,
                "resuming": msg.resuming,
            },
        )

        @utils.log_exceptions(logger=logger)
        async def _job_request_task():
            """Run the user's request_fnc and reject the job if it gave no answer."""
            try:
                await self._opts.request_fnc(job_req)
            except Exception:
                logger.exception(
                    "job_request_fnc failed",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )

            if not answered:
                logger.warning(
                    "no answer was given inside the job_request_fnc, automatically rejecting the job",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )
                await _on_reject()

        user_task = self._loop.create_task(_job_request_task(), name="job_request")
        self._tasks.add(user_task)
        user_task.add_done_callback(self._tasks.discard)

    def _handle_assignment(self, assignment: agent.JobAssignment):
        """Hand a server assignment to the future waiting for that job.

        Logs a warning when no pending assignment is registered for the job id.
        """
        job_id = assignment.job.id
        if job_id not in self._pending_assignments:
            logger.warning(
                "received assignment for an unknown job",
                extra={"job": assignment.job, "agent_name": self._opts.agent_name},
            )
            return

        pending = self._pending_assignments.pop(job_id)
        # A future that is already done refuses set_result with
        # InvalidStateError; that case is deliberately ignored.
        with contextlib.suppress(asyncio.InvalidStateError):
            pending.set_result(assignment)

    async def _handle_termination(self, msg: agent.JobTermination):
        """Close the process running the terminated job, if the pool has one."""
        job_proc = self._proc_pool.get_by_job_id(msg.job_id)
        if job_proc:
            await job_proc.aclose()
        # no process found for this job id: safe to ignore

Ancestors

Instance variables

prop active_jobs : list[RunningJobInfo]
Expand source code
@property
def active_jobs(self) -> list[RunningJobInfo]:
    """Return the running job of every pool process that currently has one."""
    running = []
    for proc in self._proc_pool.processes:
        if proc.running_job:
            running.append(proc.running_job)
    return running
prop id : str
Expand source code
@property
def id(self) -> str:
    """The worker id; it is set from the server's registration response."""
    return self._id

Methods

async def aclose(self) ‑> None
async def drain(self, timeout: int | None = None) ‑> None

When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time.

async def run(self)
async def simulate_job(self, room: str, participant_identity: str | None = None) ‑> None

Inherited members

class WorkerOptions (entrypoint_fnc: Callable[[JobContext], Awaitable[None]], request_fnc: Callable[[JobRequest], Awaitable[None]] = <function _default_request_fnc>, prewarm_fnc: Callable[[JobProcess], Any] = <function _default_initialize_process_fnc>, load_fnc: Callable[[], float] = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>, job_executor_type: JobExecutorType = JobExecutorType.PROCESS, load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(dev_default=inf, prod_default=0.75), num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=3), shutdown_process_timeout: float = 60.0, initialize_process_timeout: float = 10.0, permissions: WorkerPermissions = <factory>, agent_name: str = '', worker_type: WorkerType = WorkerType.ROOM, max_retry: int = 16, ws_url: str = 'ws://localhost:7880', api_key: str | None = None, api_secret: str | None = None, host: str = '', port: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=8081))

WorkerOptions(entrypoint_fnc: 'Callable[[JobContext], Awaitable[None]]', request_fnc: 'Callable[[JobRequest], Awaitable[None]]' = <function _default_request_fnc>, prewarm_fnc: 'Callable[[JobProcess], Any]' = <function _default_initialize_process_fnc>, load_fnc: 'Callable[[], float]' = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>, job_executor_type: 'JobExecutorType' = JobExecutorType.PROCESS, load_threshold: 'float | _WorkerEnvOption[float]' = _WorkerEnvOption(dev_default=inf, prod_default=0.75), num_idle_processes: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=3), shutdown_process_timeout: 'float' = 60.0, initialize_process_timeout: 'float' = 10.0, permissions: 'WorkerPermissions' = <factory>, agent_name: 'str' = '', worker_type: 'WorkerType' = WorkerType.ROOM, max_retry: 'int' = 16, ws_url: 'str' = 'ws://localhost:7880', api_key: 'str | None' = None, api_secret: 'str | None' = None, host: 'str' = '', port: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=8081))

Expand source code
@dataclass
class WorkerOptions:
    """Options used to configure a worker.

    Groups the job callbacks (entrypoint, request, prewarm, load), the job
    executor and process-pool tuning, the LiveKit connection settings and the
    address of the local HTTP server.

    Fields typed ``T | _WorkerEnvOption[T]`` default to a ``_WorkerEnvOption``
    that carries separate ``dev_default`` and ``prod_default`` values.
    """

    entrypoint_fnc: Callable[[JobContext], Awaitable[None]]
    """Entrypoint function that will be called when a job is assigned to this worker."""
    request_fnc: Callable[[JobRequest], Awaitable[None]] = _default_request_fnc
    """Inspect the request and decide if the current worker should handle it.

    When left empty, all jobs are accepted."""
    prewarm_fnc: Callable[[JobProcess], Any] = _default_initialize_process_fnc
    """A function to perform any necessary initialization before the job starts."""
    load_fnc: Callable[[], float] = _DefaultLoadCalc.get_load
    """Called to determine the current load of the worker. Should return a value between 0 and 1."""
    job_executor_type: JobExecutorType = _default_job_executor_type
    """Which executor to use to run jobs. (currently thread or process are supported)"""
    load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(
        dev_default=math.inf, prod_default=0.75
    )
    """When the load exceeds this threshold, the worker will be marked as unavailable.
    
    Defaults to 0.75 on "production" mode, and is disabled in "development" mode.
    """
    num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(
        dev_default=0, prod_default=3
    )
    """Number of idle processes to keep warm."""
    shutdown_process_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""
    initialize_process_timeout: float = 10.0
    """Maximum amount of time to wait for a process to initialize/prewarm"""
    permissions: WorkerPermissions = field(default_factory=WorkerPermissions)
    """Permissions that the agent should join the room with."""
    agent_name: str = ""
    """Agent name can be used when multiple agents are required to join the same room. The LiveKit SFU will dispatch jobs to unique agent_name workers independently."""
    worker_type: WorkerType = WorkerType.ROOM
    """Whether to spin up an agent for each room or publisher."""
    max_retry: int = 16
    """Maximum number of times to retry connecting to LiveKit."""
    ws_url: str = "ws://localhost:7880"
    """URL to connect to the LiveKit server.

    By default it uses ``LIVEKIT_URL`` from environment"""
    api_key: str | None = None
    """API key to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_KEY`` from environment"""
    api_secret: str | None = None
    """API secret to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_SECRET`` from environment"""
    # NOTE(review): presumably the bind address of the local HTTP server
    # described under ``port`` — confirm against the server setup code.
    host: str = ""  # default to all interfaces
    port: int | _WorkerEnvOption[int] = _WorkerEnvOption(
        dev_default=0, prod_default=8081
    )
    """Port for local HTTP server to listen on.

    The HTTP server is used as a health check endpoint.
    """

Class variables

var agent_name : str

Agent name can be used when multiple agents are required to join the same room. The LiveKit SFU will dispatch jobs to unique agent_name workers independently.

var api_key : str | None

API key to authenticate with LiveKit.

By default it uses LIVEKIT_API_KEY from environment

var api_secret : str | None

API secret to authenticate with LiveKit.

By default it uses LIVEKIT_API_SECRET from environment

var entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Awaitable[None]]

Entrypoint function that will be called when a job is assigned to this worker.

var host : str
var initialize_process_timeout : float

Maximum amount of time to wait for a process to initialize/prewarm

var job_executor_type : livekit.agents.job.JobExecutorType

Which executor to use to run jobs. (currently thread or process are supported)

var load_threshold : Union[float, livekit.agents.worker._WorkerEnvOption[float]]

When the load exceeds this threshold, the worker will be marked as unavailable.

Defaults to 0.75 on "production" mode, and is disabled in "development" mode.

var max_retry : int

Maximum number of times to retry connecting to LiveKit.

var num_idle_processes : Union[int, livekit.agents.worker._WorkerEnvOption[int]]

Number of idle processes to keep warm.

var permissions : livekit.agents.worker.WorkerPermissions

Permissions that the agent should join the room with.

var port : Union[int, livekit.agents.worker._WorkerEnvOption[int]]

Port for local HTTP server to listen on.

The HTTP server is used as a health check endpoint.

var shutdown_process_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var worker_type : livekit.agents.worker.WorkerType

Whether to spin up an agent for each room or publisher.

var ws_url : str

URL to connect to the LiveKit server.

By default it uses LIVEKIT_URL from environment

Static methods

def load_fnc() ‑> float

Methods

def prewarm_fnc(proc: JobProcess) ‑> Any
async def request_fnc(ctx: JobRequest) ‑> None
class WorkerPermissions (can_publish: bool = True, can_subscribe: bool = True, can_publish_data: bool = True, can_update_metadata: bool = True, can_publish_sources: list[models.TrackSource] = <factory>, hidden: bool = False)

WorkerPermissions(can_publish: 'bool' = True, can_subscribe: 'bool' = True, can_publish_data: 'bool' = True, can_update_metadata: 'bool' = True, can_publish_sources: 'list[models.TrackSource]' = <factory>, hidden: 'bool' = False)

Expand source code
@dataclass
class WorkerPermissions:
    """Permissions that the agent should join the room with.

    Every ``can_*`` flag defaults to ``True``, ``can_publish_sources`` defaults
    to an empty list and ``hidden`` defaults to ``False``.
    """

    can_publish: bool = True
    can_subscribe: bool = True
    can_publish_data: bool = True
    can_update_metadata: bool = True
    # NOTE(review): presumably an empty list means "no restriction on sources"
    # rather than "no sources allowed" — confirm against the server semantics.
    can_publish_sources: list[models.TrackSource] = field(default_factory=list)
    # NOTE(review): presumably hides the agent participant from other
    # participants — confirm.
    hidden: bool = False

Class variables

var can_publish : bool
var can_publish_data : bool
var can_publish_sources : list[models.TrackSource]
var can_subscribe : bool
var can_update_metadata : bool
var hidden : bool
class WorkerType (*args, **kwds)

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Expand source code
class WorkerType(Enum):
    """Whether a worker spins up an agent for each room or for each publisher.

    Member values are the corresponding ``agent.JobType`` constants.
    """

    ROOM = agent.JobType.JT_ROOM
    PUBLISHER = agent.JobType.JT_PUBLISHER

Ancestors

  • enum.Enum

Class variables

var PUBLISHER
var ROOM