Module livekit.agents

Sub-modules

livekit.agents.cli
livekit.agents.ipc
livekit.agents.llm
livekit.agents.metrics
livekit.agents.multimodal
livekit.agents.pipeline
livekit.agents.stt
livekit.agents.tokenize
livekit.agents.transcription
livekit.agents.tts
livekit.agents.utils
livekit.agents.vad
livekit.agents.voice_assistant

Classes

class APIConnectOptions (max_retry: int = 3, retry_interval: float = 5.0, timeout: float = 10.0)
Expand source code
@dataclass(frozen=True)
class APIConnectOptions:
    max_retry: int = 3
    """
    Maximum number of retries to connect to the API.
    """

    retry_interval: float = 5.0
    """
    Interval between retries to connect to the API in seconds.
    """

    timeout: float = 10.0
    """
    Timeout for connecting to the API in seconds.
    """

    def __post_init__(self):
        if self.max_retry < 0:
            raise ValueError("max_retry must be greater than or equal to 0")

        if self.retry_interval < 0:
            raise ValueError("retry_interval must be greater than or equal to 0")

        if self.timeout < 0:
            raise ValueError("timeout must be greater than or equal to 0")

APIConnectOptions(max_retry: int = 3, retry_interval: float = 5.0, timeout: float = 10.0)

Class variables

var max_retry : int

Maximum number of retries to connect to the API.

var retry_interval : float

Interval between retries to connect to the API in seconds.

var timeout : float

Timeout for connecting to the API in seconds.
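
For illustration, a minimal sketch of constructing these options; the values are arbitrary. Plugins that talk to external APIs generally accept an object like this to control retries and timeouts, but check each plugin's signature for the exact parameter name.

from livekit.agents import APIConnectOptions

# more patient than the defaults (3 retries, 5 s apart, 10 s timeout)
conn_options = APIConnectOptions(max_retry=5, retry_interval=2.0, timeout=30.0)

# the dataclass is frozen and validates its fields in __post_init__
try:
    APIConnectOptions(max_retry=-1)
except ValueError as err:
    print(err)  # max_retry must be greater than or equal to 0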

class APIConnectionError (message: str = 'Connection error.')
Expand source code
class APIConnectionError(APIError):
    """Raised when an API request failed due to a connection error."""

    def __init__(self, message: str = "Connection error.") -> None:
        super().__init__(message, body=None)

Raised when an API request failed due to a connection error.

Ancestors

  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException

Subclasses

  • livekit.agents._exceptions.APITimeoutError
class APIError (message: str, *, body: object | None)
Expand source code
class APIError(Exception):
    """Raised when an API request failed.
    This is used on our TTS/STT/LLM plugins."""

    message: str
    """
    The error message returned by the API.
    """

    body: object | None
    """The API response body, if available.

    
    If the API returned valid JSON, the body will contain
    the decoded result.
    """

    def __init__(self, message: str, *, body: object | None) -> None:
        super().__init__(message)

        self.message = message
        self.body = body

Raised when an API request failed. This is used on our TTS/STT/LLM plugins.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • livekit.agents._exceptions.APIConnectionError
  • livekit.agents._exceptions.APIStatusError

Class variables

var body : object | None

The API response body, if available.

If the API returned valid JSON, the body will contain the decoded result.

var message : str

The error message returned by the API.

class APIStatusError (message: str, *, status_code: int, request_id: str | None, body: object | None)
Expand source code
class APIStatusError(APIError):
    """Raised when an API response has a status code of 4xx or 5xx."""

    status_code: int
    """The status code of the API response."""

    request_id: str | None
    """The request ID of the API response, if available."""

    def __init__(
        self,
        message: str,
        *,
        status_code: int,
        request_id: str | None,
        body: object | None,
    ) -> None:
        super().__init__(message, body=body)

        self.status_code = status_code
        self.request_id = request_id

Raised when an API response has a status code of 4xx or 5xx.

Ancestors

  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException

Class variables

var request_id : str | None

The request ID of the API response, if available.

var status_code : int

The status code of the API response.

class APITimeoutError (message: str = 'Request timed out.')
Expand source code
class APITimeoutError(APIConnectionError):
    """Raised when an API request timed out."""

    def __init__(self, message: str = "Request timed out.") -> None:
        super().__init__(message)

Raised when an API request timed out.

Ancestors

  • livekit.agents._exceptions.APIConnectionError
  • livekit.agents._exceptions.APIError
  • builtins.Exception
  • builtins.BaseException
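
A hedged sketch of handling this exception hierarchy around a plugin call; flaky_api_call is a placeholder for any TTS/STT/LLM plugin operation, not part of the documented API.

import asyncio

from livekit.agents import APIConnectionError, APIStatusError, APITimeoutError

async def flaky_api_call() -> None:
    # placeholder for a plugin operation that may raise APIError subclasses
    raise APITimeoutError()

async def main() -> None:
    try:
        await flaky_api_call()
    except APITimeoutError:
        # subclass of APIConnectionError: the request did not complete in time
        print("request timed out")
    except APIConnectionError:
        print("could not reach the provider")
    except APIStatusError as err:
        # the provider answered with a 4xx or 5xx status
        print(f"status={err.status_code} request_id={err.request_id} body={err.body}")

asyncio.run(main())
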
class AssignmentTimeoutError (*args, **kwargs)
Expand source code
class AssignmentTimeoutError(Exception):
    """Raised when accepting a job but not receiving an assignment within the specified timeout.
    The server may have chosen another worker to handle this job."""

    pass

Raised when accepting a job but not receiving an assignment within the specified timeout. The server may have chosen another worker to handle this job.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class AutoSubscribe (*args, **kwds)
Expand source code
class AutoSubscribe(str, Enum):
    SUBSCRIBE_ALL = "subscribe_all"
    SUBSCRIBE_NONE = "subscribe_none"
    AUDIO_ONLY = "audio_only"
    VIDEO_ONLY = "video_only"

String enum controlling which tracks the agent automatically subscribes to when connecting to a room: all tracks, no tracks, audio only, or video only.

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var AUDIO_ONLY
var SUBSCRIBE_ALL
var SUBSCRIBE_NONE
var VIDEO_ONLY
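
For example, the value is typically passed to JobContext.connect() (documented below); a minimal sketch for a voice-only agent:

from livekit.agents import AutoSubscribe, JobContext

async def entrypoint(ctx: JobContext) -> None:
    # subscribe only to remote audio tracks
    await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
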
class JobContext (*,
proc: JobProcess,
info: RunningJobInfo,
room: rtc.Room,
on_connect: Callable[[], None],
on_shutdown: Callable[[str], None],
inference_executor: InferenceExecutor)
Expand source code
class JobContext:
    def __init__(
        self,
        *,
        proc: JobProcess,
        info: RunningJobInfo,
        room: rtc.Room,
        on_connect: Callable[[], None],
        on_shutdown: Callable[[str], None],
        inference_executor: InferenceExecutor,
    ) -> None:
        self._proc = proc
        self._info = info
        self._room = room
        self._on_connect = on_connect
        self._on_shutdown = on_shutdown
        self._shutdown_callbacks: list[Callable[[], Coroutine[None, None, None]]] = []
        self._participant_entrypoints: list[
            Tuple[
                Callable[
                    [JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]
                ],
                list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType,
            ]
        ] = []
        self._participant_tasks = dict[Tuple[str, Callable], asyncio.Task[None]]()
        self._room.on("participant_connected", self._participant_available)
        self._inf_executor = inference_executor

    @property
    def inference_executor(self) -> InferenceExecutor:
        return self._inf_executor

    @functools.cached_property
    def api(self) -> api.LiveKitAPI:
        return api.LiveKitAPI()

    @property
    def proc(self) -> JobProcess:
        """Returns the process running the job. Useful for storing process-specific state."""
        return self._proc

    @property
    def job(self) -> agent.Job:
        """Returns the current job that the worker is executing."""
        return self._info.job

    @property
    def room(self) -> rtc.Room:
        """The Room object is the main interface that the worker should interact with.

        When the entrypoint is called, the worker has not connected to the Room yet.
        Certain properties of Room would not be available before calling JobContext.connect()
        """
        return self._room

    @property
    def agent(self) -> rtc.LocalParticipant:
        return self._room.local_participant

    def add_shutdown_callback(
        self, callback: Callable[[], Coroutine[None, None, None]]
    ) -> None:
        self._shutdown_callbacks.append(callback)

    async def wait_for_participant(
        self,
        *,
        identity: str | None = None,
        kind: list[rtc.ParticipantKind.ValueType]
        | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
    ) -> rtc.RemoteParticipant:
        """
        Returns a participant that matches the given identity. If identity is None, the first
        participant that joins the room will be returned.
        If the participant has already joined, the function will return immediately.
        """
        if not self._room.isconnected():
            raise RuntimeError("room is not connected")

        fut = asyncio.Future[rtc.RemoteParticipant]()

        def kind_match(p: rtc.RemoteParticipant) -> bool:
            if isinstance(kind, list):
                return p.kind in kind

            return p.kind == kind

        for p in self._room.remote_participants.values():
            if (identity is None or p.identity == identity) and kind_match(p):
                fut.set_result(p)
                break

        def _on_participant_connected(p: rtc.RemoteParticipant):
            if (identity is None or p.identity == identity) and kind_match(p):
                self._room.off("participant_connected", _on_participant_connected)
                if not fut.done():
                    fut.set_result(p)

        if not fut.done():
            self._room.on("participant_connected", _on_participant_connected)

        return await fut

    async def connect(
        self,
        *,
        e2ee: rtc.E2EEOptions | None = None,
        auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
        rtc_config: rtc.RtcConfiguration | None = None,
    ) -> None:
        """Connect to the room. This method should be called only once.

        Args:
            e2ee: End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
            auto_subscribe: Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
            rtc_config: Custom RTC configuration to use when connecting to the room.
        """
        room_options = rtc.RoomOptions(
            e2ee=e2ee,
            auto_subscribe=auto_subscribe == AutoSubscribe.SUBSCRIBE_ALL,
            rtc_config=rtc_config,
        )

        await self._room.connect(self._info.url, self._info.token, options=room_options)
        self._on_connect()
        for p in self._room.remote_participants.values():
            self._participant_available(p)

        _apply_auto_subscribe_opts(self._room, auto_subscribe)

    def shutdown(self, reason: str = "") -> None:
        self._on_shutdown(reason)

    def add_participant_entrypoint(
        self,
        entrypoint_fnc: Callable[
            [JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]
        ],
        *_,
        kind: list[rtc.ParticipantKind.ValueType]
        | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
    ):
        """Adds an entrypoint function to be run when a participant joins the room. In cases where
        the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be
        added and they will each be run in parallel for each participant.
        """

        if entrypoint_fnc in [e for (e, _) in self._participant_entrypoints]:
            raise ValueError("entrypoints cannot be added more than once")

        self._participant_entrypoints.append((entrypoint_fnc, kind))

    def _participant_available(self, p: rtc.RemoteParticipant) -> None:
        for coro, kind in self._participant_entrypoints:
            if isinstance(kind, list):
                if p.kind not in kind:
                    continue
            else:
                if p.kind != kind:
                    continue

            if (p.identity, coro) in self._participant_tasks:
                logger.warning(
                    f"a participant has joined before a prior participant task matching the same identity has finished: '{p.identity}'"
                )
            task_name = f"part-entry-{p.identity}-{coro.__name__}"
            task = asyncio.create_task(coro(self, p), name=task_name)
            self._participant_tasks[(p.identity, coro)] = task
            task.add_done_callback(
                lambda _: self._participant_tasks.pop((p.identity, coro))
            )

Instance variables

prop agent : rtc.LocalParticipant
Expand source code
@property
def agent(self) -> rtc.LocalParticipant:
    return self._room.local_participant
var api : api.LiveKitAPI
Expand source code
@functools.cached_property
def api(self) -> api.LiveKitAPI:
    return api.LiveKitAPI()
prop inference_executor : InferenceExecutor
Expand source code
@property
def inference_executor(self) -> InferenceExecutor:
    return self._inf_executor
prop job : agent.Job
Expand source code
@property
def job(self) -> agent.Job:
    """Returns the current job that the worker is executing."""
    return self._info.job

Returns the current job that the worker is executing.

prop proc : JobProcess
Expand source code
@property
def proc(self) -> JobProcess:
    """Returns the process running the job. Useful for storing process-specific state."""
    return self._proc

Returns the process running the job. Useful for storing process-specific state.

prop room : rtc.Room
Expand source code
@property
def room(self) -> rtc.Room:
    """The Room object is the main interface that the worker should interact with.

    When the entrypoint is called, the worker has not connected to the Room yet.
    Certain properties of Room would not be available before calling JobContext.connect()
    """
    return self._room

The Room object is the main interface that the worker should interact with.

When the entrypoint is called, the worker has not connected to the Room yet. Certain properties of Room would not be available before calling JobContext.connect()

Methods

def add_participant_entrypoint(self,
entrypoint_fnc: Callable[[JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]],
*_,
kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType = [3, 0])
Expand source code
def add_participant_entrypoint(
    self,
    entrypoint_fnc: Callable[
        [JobContext, rtc.RemoteParticipant], Coroutine[None, None, None]
    ],
    *_,
    kind: list[rtc.ParticipantKind.ValueType]
    | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
):
    """Adds an entrypoint function to be run when a participant joins the room. In cases where
    the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be
    added and they will each be run in parallel for each participant.
    """

    if entrypoint_fnc in [e for (e, _) in self._participant_entrypoints]:
        raise ValueError("entrypoints cannot be added more than once")

    self._participant_entrypoints.append((entrypoint_fnc, kind))

Adds an entrypoint function to be run when a participant joins the room. In cases where the participant has already joined, the entrypoint will be run immediately. Multiple unique entrypoints can be added and they will each be run in parallel for each participant.
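
A minimal sketch of registering a per-participant entrypoint; registering before connect() (documented below) ensures that participants already in the room are handled as well.

from livekit import rtc
from livekit.agents import JobContext

async def on_participant(ctx: JobContext, participant: rtc.RemoteParticipant) -> None:
    print(f"participant available: {participant.identity}")

async def entrypoint(ctx: JobContext) -> None:
    # register before connecting so existing participants are picked up too
    ctx.add_participant_entrypoint(on_participant)
    await ctx.connect()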

def add_shutdown_callback(self, callback: Callable[[], Coroutine[None, None, None]]) ‑> None
Expand source code
def add_shutdown_callback(
    self, callback: Callable[[], Coroutine[None, None, None]]
) -> None:
    self._shutdown_callbacks.append(callback)
async def connect(self,
*,
e2ee: rtc.E2EEOptions | None = None,
auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
rtc_config: rtc.RtcConfiguration | None = None) ‑> None
Expand source code
async def connect(
    self,
    *,
    e2ee: rtc.E2EEOptions | None = None,
    auto_subscribe: AutoSubscribe = AutoSubscribe.SUBSCRIBE_ALL,
    rtc_config: rtc.RtcConfiguration | None = None,
) -> None:
    """Connect to the room. This method should be called only once.

    Args:
        e2ee: End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
        auto_subscribe: Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
        rtc_config: Custom RTC configuration to use when connecting to the room.
    """
    room_options = rtc.RoomOptions(
        e2ee=e2ee,
        auto_subscribe=auto_subscribe == AutoSubscribe.SUBSCRIBE_ALL,
        rtc_config=rtc_config,
    )

    await self._room.connect(self._info.url, self._info.token, options=room_options)
    self._on_connect()
    for p in self._room.remote_participants.values():
        self._participant_available(p)

    _apply_auto_subscribe_opts(self._room, auto_subscribe)

Connect to the room. This method should be called only once.

Args

e2ee
End-to-end encryption options. If provided, the Agent will utilize end-to-end encryption. Note: clients will also need to handle E2EE.
auto_subscribe
Whether to automatically subscribe to tracks. Default is AutoSubscribe.SUBSCRIBE_ALL.
rtc_config
Custom RTC configuration to use when connecting to the room.
def shutdown(self, reason: str = '') ‑> None
Expand source code
def shutdown(self, reason: str = "") -> None:
    self._on_shutdown(reason)
async def wait_for_participant(self,
*,
identity: str | None = None,
kind: list[rtc.ParticipantKind.ValueType] | rtc.ParticipantKind.ValueType = [3, 0]) ‑> RemoteParticipant
Expand source code
async def wait_for_participant(
    self,
    *,
    identity: str | None = None,
    kind: list[rtc.ParticipantKind.ValueType]
    | rtc.ParticipantKind.ValueType = DEFAULT_PARTICIPANT_KINDS,
) -> rtc.RemoteParticipant:
    """
    Returns a participant that matches the given identity. If identity is None, the first
    participant that joins the room will be returned.
    If the participant has already joined, the function will return immediately.
    """
    if not self._room.isconnected():
        raise RuntimeError("room is not connected")

    fut = asyncio.Future[rtc.RemoteParticipant]()

    def kind_match(p: rtc.RemoteParticipant) -> bool:
        if isinstance(kind, list):
            return p.kind in kind

        return p.kind == kind

    for p in self._room.remote_participants.values():
        if (identity is None or p.identity == identity) and kind_match(p):
            fut.set_result(p)
            break

    def _on_participant_connected(p: rtc.RemoteParticipant):
        if (identity is None or p.identity == identity) and kind_match(p):
            self._room.off("participant_connected", _on_participant_connected)
            if not fut.done():
                fut.set_result(p)

    if not fut.done():
        self._room.on("participant_connected", _on_participant_connected)

    return await fut

Returns a participant that matches the given identity. If identity is None, the first participant that joins the room will be returned. If the participant has already joined, the function will return immediately.
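
Putting the pieces together, a hedged sketch of a typical entrypoint that connects, registers a shutdown callback, and waits for the first participant:

from livekit.agents import JobContext

async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()

    async def cleanup() -> None:
        print("job shutting down")

    ctx.add_shutdown_callback(cleanup)

    # returns immediately if a matching participant has already joined
    participant = await ctx.wait_for_participant()
    print(f"handling job {ctx.job.id} for participant {participant.identity}")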

class JobExecutorType (*args, **kwds)
Expand source code
@unique
class JobExecutorType(Enum):
    PROCESS = "process"
    THREAD = "thread"

Enum selecting the executor used to run jobs: a separate process (PROCESS) or a thread (THREAD).

Ancestors

  • enum.Enum

Class variables

var PROCESS
var THREAD
class JobProcess (*, user_arguments: Any | None = None)
Expand source code
class JobProcess:
    def __init__(
        self,
        *,
        user_arguments: Any | None = None,
    ) -> None:
        self._mp_proc = mp.current_process()
        self._userdata: dict[str, Any] = {}
        self._user_arguments = user_arguments

    @property
    def pid(self) -> int | None:
        return self._mp_proc.pid

    @property
    def userdata(self) -> dict:
        return self._userdata

    @property
    def user_arguments(self) -> Any | None:
        return self._user_arguments

Instance variables

prop pid : int | None
Expand source code
@property
def pid(self) -> int | None:
    return self._mp_proc.pid
prop user_arguments : Any | None
Expand source code
@property
def user_arguments(self) -> Any | None:
    return self._user_arguments
prop userdata : dict
Expand source code
@property
def userdata(self) -> dict:
    return self._userdata
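
userdata is a convenient place to cache state that should survive across jobs in the same process, for example something expensive loaded by a prewarm function (see WorkerOptions.prewarm_fnc below). A sketch, with load_heavy_model() as a hypothetical stand-in:

from livekit.agents import JobContext, JobProcess

def load_heavy_model() -> object:
    return object()  # hypothetical expensive initialization

def prewarm(proc: JobProcess) -> None:
    # runs once per process before any job; cache the result in userdata
    proc.userdata["model"] = load_heavy_model()

async def entrypoint(ctx: JobContext) -> None:
    model = ctx.proc.userdata["model"]  # reuse the prewarmed state
    print("model ready:", model)
    await ctx.connect()
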
class JobRequest (*,
job: agent.Job,
on_reject: Callable[[], Coroutine[None, None, None]],
on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]])
Expand source code
class JobRequest:
    def __init__(
        self,
        *,
        job: agent.Job,
        on_reject: Callable[[], Coroutine[None, None, None]],
        on_accept: Callable[[JobAcceptArguments], Coroutine[None, None, None]],
    ) -> None:
        self._job = job
        self._lock = asyncio.Lock()
        self._on_reject = on_reject
        self._on_accept = on_accept

    @property
    def id(self) -> str:
        return self._job.id

    @property
    def job(self) -> agent.Job:
        return self._job

    @property
    def room(self) -> models.Room:
        return self._job.room

    @property
    def publisher(self) -> models.ParticipantInfo | None:
        return self._job.participant

    @property
    def agent_name(self) -> str:
        return self._job.agent_name

    async def reject(self) -> None:
        """Reject the job request. The job may be assigned to another worker"""
        await self._on_reject()

    async def accept(
        self,
        *,
        name: str = "",
        identity: str = "",
        metadata: str = "",
        attributes: dict[str, str] | None = None,
    ) -> None:
        """Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker."""
        if not identity:
            identity = "agent-" + self.id

        accept_arguments = JobAcceptArguments(
            name=name,
            identity=identity,
            metadata=metadata,
            attributes=attributes,
        )

        await self._on_accept(accept_arguments)

Instance variables

prop agent_name : str
Expand source code
@property
def agent_name(self) -> str:
    return self._job.agent_name
prop id : str
Expand source code
@property
def id(self) -> str:
    return self._job.id
prop job : agent.Job
Expand source code
@property
def job(self) -> agent.Job:
    return self._job
prop publisher : models.ParticipantInfo | None
Expand source code
@property
def publisher(self) -> models.ParticipantInfo | None:
    return self._job.participant
prop room : models.Room
Expand source code
@property
def room(self) -> models.Room:
    return self._job.room

Methods

async def accept(self,
*,
name: str = '',
identity: str = '',
metadata: str = '',
attributes: dict[str, str] | None = None) ‑> None
Expand source code
async def accept(
    self,
    *,
    name: str = "",
    identity: str = "",
    metadata: str = "",
    attributes: dict[str, str] | None = None,
) -> None:
    """Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker."""
    if not identity:
        identity = "agent-" + self.id

    accept_arguments = JobAcceptArguments(
        name=name,
        identity=identity,
        metadata=metadata,
        attributes=attributes,
    )

    await self._on_accept(accept_arguments)

Accept the job request, and start the job if the LiveKit SFU assigns the job to our worker.

async def reject(self) ‑> None
Expand source code
async def reject(self) -> None:
    """Reject the job request. The job may be assigned to another worker"""
    await self._on_reject()

Reject the job request. The job may be assigned to another worker
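
A hedged sketch of a request handler (see WorkerOptions.request_fnc below) that accepts or rejects incoming jobs; the room-name check is purely illustrative.

from livekit.agents import JobRequest

async def request_fnc(req: JobRequest) -> None:
    # only handle rooms this worker cares about; otherwise let another worker take the job
    if req.room.name.startswith("support-"):
        await req.accept(name="support-agent", identity=f"agent-{req.id}")
    else:
        await req.reject()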

class NotGiven
Expand source code
class NotGiven:
    def __bool__(self) -> Literal[False]:
        return False

    def __repr__(self) -> str:
        return "NOT_GIVEN"
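
NotGiven is a falsy sentinel type used to distinguish "argument not provided" from an explicit None. A minimal sketch of the pattern; the NOT_GIVEN instance and update_voice function are illustrative, not part of the documented API.

from __future__ import annotations

from livekit.agents import NotGiven

NOT_GIVEN = NotGiven()  # illustrative sentinel instance

def update_voice(voice: str | None | NotGiven = NOT_GIVEN) -> None:
    if isinstance(voice, NotGiven):
        return  # caller passed nothing: keep the current setting
    print("voice explicitly set to", voice)  # None remains a valid explicit value
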
class Plugin (title: str, version: str, package: str, logger: logging.Logger | None = None)
Expand source code
class Plugin(ABC):
    registered_plugins: List["Plugin"] = []
    emitter: utils.EventEmitter[EventTypes] = utils.EventEmitter()

    # TODO(theomonnom): make logger mandatory once all plugins have been updated
    def __init__(
        self,
        title: str,
        version: str,
        package: str,
        logger: logging.Logger | None = None,
    ) -> None:
        self._title = title
        self._version = version
        self._package = package
        self._logger = logger

    @classmethod
    def register_plugin(cls, plugin: "Plugin") -> None:
        if threading.current_thread() != threading.main_thread():
            raise RuntimeError("Plugins must be registered on the main thread")

        cls.registered_plugins.append(plugin)
        cls.emitter.emit("plugin_registered", plugin)

    # plugin can implement an optional download_files method
    def download_files(self) -> None: ...

    @property
    def package(self) -> str:
        return self._package

    @property
    def title(self) -> str:
        return self._title

    @property
    def version(self) -> str:
        return self._version

    @property
    def logger(self) -> logging.Logger | None:
        return self._logger

Base class for LiveKit Agents plugins (TTS/STT/LLM and related integrations). Concrete plugins describe themselves with a title, version, and package, register themselves via Plugin.register_plugin(), and may implement download_files() to fetch required assets ahead of time.

Ancestors

  • abc.ABC

Subclasses

  • livekit.plugins.anthropic.AnthropicPlugin
  • livekit.plugins.assemblyai.AssemblyAIPlugin
  • livekit.plugins.azure.AzurePlugin
  • livekit.plugins.cartesia.CartesiaPlugin
  • livekit.plugins.deepgram.DeepgramPlugin
  • livekit.plugins.elevenlabs.ElevenLabsPlugin
  • livekit.plugins.fal.FalPlugin
  • livekit.plugins.google.GooglePlugin
  • livekit.plugins.nltk.NltkPlugin
  • livekit.plugins.openai.OpenAIPlugin
  • livekit.plugins.rag.RAGPlugin
  • livekit.plugins.silero.SileroPlugin
  • livekit.plugins.turn_detector.EOUPlugin

Class variables

var emitter : EventEmitter[typing.Literal['plugin_registered']]
var registered_plugins : List[livekit.agents.plugin.Plugin]

Static methods

def register_plugin(plugin: 'Plugin') ‑> None

Instance variables

prop logger : logging.Logger | None
Expand source code
@property
def logger(self) -> logging.Logger | None:
    return self._logger
prop package : str
Expand source code
@property
def package(self) -> str:
    return self._package
prop title : str
Expand source code
@property
def title(self) -> str:
    return self._title
prop version : str
Expand source code
@property
def version(self) -> str:
    return self._version

Methods

def download_files(self) ‑> None
Expand source code
def download_files(self) -> None: ...
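
A hedged sketch of a plugin subclass registering itself, following the constructor and register_plugin shown above; the plugin metadata here is made up.

import logging

from livekit.agents import Plugin

class MyPlugin(Plugin):
    def __init__(self) -> None:
        super().__init__(
            title="my-plugin",
            version="0.1.0",
            package="my_plugin",
            logger=logging.getLogger("my_plugin"),
        )

    def download_files(self) -> None:
        # optionally pre-download model weights or other assets
        pass

# must run on the main thread, otherwise register_plugin raises RuntimeError
Plugin.register_plugin(MyPlugin())
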
class Worker (opts: WorkerOptions,
*,
devmode: bool = True,
loop: asyncio.AbstractEventLoop | None = None)
Expand source code
class Worker(utils.EventEmitter[EventTypes]):
    def __init__(
        self,
        opts: WorkerOptions,
        *,
        devmode: bool = True,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> None:
        super().__init__()
        opts.ws_url = opts.ws_url or os.environ.get("LIVEKIT_URL") or ""
        opts.api_key = opts.api_key or os.environ.get("LIVEKIT_API_KEY") or ""
        opts.api_secret = opts.api_secret or os.environ.get("LIVEKIT_API_SECRET") or ""

        if not opts.ws_url:
            raise ValueError(
                "ws_url is required, or add LIVEKIT_URL in your environment"
            )

        if not opts.api_key:
            raise ValueError(
                "api_key is required, or add LIVEKIT_API_KEY in your environment"
            )

        if not opts.api_secret:
            raise ValueError(
                "api_secret is required, or add LIVEKIT_API_SECRET in your environment"
            )

        if (
            opts.job_memory_limit_mb > 0
            and opts.job_executor_type != JobExecutorType.PROCESS
        ):
            logger.warning(
                "max_job_memory_usage is only supported for process-based job executors, "
                "ignoring max_job_memory_usage"
            )

        self._opts = opts
        self._loop = loop or asyncio.get_event_loop()

        self._id = "unregistered"
        self._closed, self._draining, self._connecting = True, False, False
        self._tasks = set[asyncio.Task[Any]]()
        self._pending_assignments: dict[str, asyncio.Future[agent.JobAssignment]] = {}
        self._close_future: asyncio.Future[None] | None = None
        self._msg_chan = utils.aio.Chan[agent.WorkerMessage](128, loop=self._loop)
        self._devmode = devmode

        # using spawn context for all platforms. We may have further optimizations for
        # Linux with forkserver, but for now, this is the safest option
        mp_ctx = mp.get_context("spawn")

        self._inference_executor: (
            ipc.inference_proc_executor.InferenceProcExecutor | None
        ) = None
        if len(_InferenceRunner.registered_runners) > 0:
            self._inference_executor = (
                ipc.inference_proc_executor.InferenceProcExecutor(
                    runners=_InferenceRunner.registered_runners,
                    initialize_timeout=30,
                    close_timeout=5,
                    memory_warn_mb=2000,
                    memory_limit_mb=0,  # no limit
                    ping_interval=5,
                    ping_timeout=60,
                    high_ping_threshold=2.5,
                    mp_ctx=mp_ctx,
                    loop=self._loop,
                )
            )

        self._proc_pool = ipc.proc_pool.ProcPool(
            initialize_process_fnc=opts.prewarm_fnc,
            job_entrypoint_fnc=opts.entrypoint_fnc,
            num_idle_processes=_WorkerEnvOption.getvalue(
                opts.num_idle_processes, self._devmode
            ),
            loop=self._loop,
            job_executor_type=opts.job_executor_type,
            inference_executor=self._inference_executor,
            mp_ctx=mp_ctx,
            initialize_timeout=opts.initialize_process_timeout,
            close_timeout=opts.shutdown_process_timeout,
            memory_warn_mb=opts.job_memory_warn_mb,
            memory_limit_mb=opts.job_memory_limit_mb,
        )

        self._previous_status = agent.WorkerStatus.WS_AVAILABLE

        self._api: api.LiveKitAPI | None = None
        self._http_session: aiohttp.ClientSession | None = None
        self._http_server = http_server.HttpServer(
            opts.host,
            _WorkerEnvOption.getvalue(opts.port, self._devmode),
            loop=self._loop,
        )

        self._main_task: asyncio.Task[None] | None = None

    async def run(self):
        if not self._closed:
            raise Exception("worker is already running")

        logger.info(
            "starting worker",
            extra={"version": __version__, "rtc-version": rtc.__version__},
        )

        if self._inference_executor is not None:
            logger.info("starting inference executor")
            await self._inference_executor.start()
            await self._inference_executor.initialize()

        self._closed = False

        def _update_job_status(proc: ipc.job_executor.JobExecutor) -> None:
            t = self._loop.create_task(self._update_job_status(proc))
            self._tasks.add(t)
            t.add_done_callback(self._tasks.discard)

        self._proc_pool.on("process_started", _update_job_status)
        self._proc_pool.on("process_closed", _update_job_status)
        self._proc_pool.on("process_job_launched", _update_job_status)

        self._proc_pool.start()
        self._api = api.LiveKitAPI(
            self._opts.ws_url, self._opts.api_key, self._opts.api_secret
        )
        self._http_session = aiohttp.ClientSession()
        self._close_future = asyncio.Future(loop=self._loop)

        self._main_task = asyncio.create_task(self._worker_task(), name="worker_task")
        tasks = [
            self._main_task,
            asyncio.create_task(self._http_server.run(), name="http_server"),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)
            if not self._close_future.done():
                self._close_future.set_result(None)

    @property
    def id(self) -> str:
        return self._id

    @property
    def active_jobs(self) -> list[RunningJobInfo]:
        return [
            proc.running_job for proc in self._proc_pool.processes if proc.running_job
        ]

    async def drain(self, timeout: int | None = None) -> None:
        """When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time."""
        if self._draining:
            return

        logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
        self._draining = True
        await self._update_worker_status()

        async def _join_jobs():
            for proc in self._proc_pool.processes:
                if proc.running_job:
                    await proc.join()

        if timeout:
            await asyncio.wait_for(
                _join_jobs(), timeout
            )  # raises asyncio.TimeoutError on timeout
        else:
            await _join_jobs()

    async def simulate_job(
        self, room: str, participant_identity: str | None = None
    ) -> None:
        assert self._api is not None

        room_obj = await self._api.room.create_room(api.CreateRoomRequest(name=room))
        participant = None
        if participant_identity:
            participant = await self._api.room.get_participant(
                api.RoomParticipantIdentity(room=room, identity=participant_identity)
            )

        msg = agent.WorkerMessage()
        msg.simulate_job.room.CopyFrom(room_obj)
        if participant:
            msg.simulate_job.participant.CopyFrom(participant)

        await self._queue_msg(msg)

    async def aclose(self) -> None:
        if self._closed:
            if self._close_future is not None:
                await self._close_future
            return

        logger.info("shutting down worker", extra={"id": self.id})

        assert self._close_future is not None
        assert self._http_session is not None
        assert self._api is not None
        assert self._main_task is not None

        self._closed = True
        self._main_task.cancel()

        await self._proc_pool.aclose()

        if self._inference_executor is not None:
            await self._inference_executor.aclose()

        await self._http_session.close()
        await self._http_server.aclose()
        await self._api.aclose()

        await asyncio.gather(*self._tasks, return_exceptions=True)

        # await asyncio.sleep(0.25)  # see https://github.com/aio-libs/aiohttp/issues/1925
        self._msg_chan.close()
        await self._close_future

    async def _queue_msg(self, msg: agent.WorkerMessage) -> None:
        """_queue_msg raises aio.ChanClosed when the worker is closing/closed"""
        if self._connecting:
            which = msg.WhichOneof("message")
            if which == "update_worker":
                return
            elif which == "ping":
                return

        await self._msg_chan.send(msg)

    async def _worker_task(self) -> None:
        assert self._http_session is not None

        retry_count = 0
        ws: aiohttp.ClientWebSocketResponse | None = None
        while not self._closed:
            try:
                self._connecting = True
                join_jwt = (
                    api.AccessToken(self._opts.api_key, self._opts.api_secret)
                    .with_grants(api.VideoGrants(agent=True))
                    .to_jwt()
                )

                headers = {"Authorization": f"Bearer {join_jwt}"}

                parse = urlparse(self._opts.ws_url)
                scheme = parse.scheme
                if scheme.startswith("http"):
                    scheme = scheme.replace("http", "ws")

                path_parts = [f"{scheme}://{parse.netloc}", parse.path, "/agent"]
                agent_url = reduce(urljoin, path_parts)

                ws = await self._http_session.ws_connect(
                    agent_url, headers=headers, autoping=True
                )

                retry_count = 0

                # register the worker
                req = agent.WorkerMessage()
                req.register.type = self._opts.worker_type.value
                req.register.allowed_permissions.CopyFrom(
                    models.ParticipantPermission(
                        can_publish=self._opts.permissions.can_publish,
                        can_subscribe=self._opts.permissions.can_subscribe,
                        can_publish_data=self._opts.permissions.can_publish_data,
                        can_update_metadata=self._opts.permissions.can_update_metadata,
                        can_publish_sources=self._opts.permissions.can_publish_sources,
                        hidden=self._opts.permissions.hidden,
                        agent=True,
                    )
                )
                req.register.agent_name = self._opts.agent_name
                req.register.version = __version__
                await ws.send_bytes(req.SerializeToString())

                # wait for the register response before running this connection
                first_msg_b = await ws.receive_bytes()
                msg = agent.ServerMessage()
                msg.ParseFromString(first_msg_b)

                if not msg.HasField("register"):
                    raise Exception("expected register response as first message")

                self._handle_register(msg.register)
                self._connecting = False

                await self._run_ws(ws)
            except Exception as e:
                if self._closed:
                    break

                if retry_count >= self._opts.max_retry:
                    raise RuntimeError(
                        f"failed to connect to livekit after {retry_count} attempts",
                    )

                retry_delay = min(retry_count * 2, 10)
                retry_count += 1

                logger.warning(
                    f"failed to connect to livekit, retrying in {retry_delay}s: {e}"
                )
                await asyncio.sleep(retry_delay)
            finally:
                if ws is not None:
                    await ws.close()

    async def _run_ws(self, ws: aiohttp.ClientWebSocketResponse):
        closing_ws = False

        async def _load_task():
            """periodically check load and update worker status"""
            interval = utils.aio.interval(UPDATE_LOAD_INTERVAL)
            while True:
                await interval.tick()
                await self._update_worker_status()

        async def _send_task():
            nonlocal closing_ws
            while True:
                try:
                    msg = await self._msg_chan.recv()
                    await ws.send_bytes(msg.SerializeToString())
                except utils.aio.ChanClosed:
                    closing_ws = True
                    return

        async def _recv_task():
            nonlocal closing_ws
            while True:
                msg = await ws.receive()
                if msg.type in (
                    aiohttp.WSMsgType.CLOSE,
                    aiohttp.WSMsgType.CLOSED,
                    aiohttp.WSMsgType.CLOSING,
                ):
                    if closing_ws:
                        return

                    raise Exception("worker connection closed unexpectedly")

                if msg.type != aiohttp.WSMsgType.BINARY:
                    logger.warning("unexpected message type: %s", msg.type)
                    continue

                data = msg.data
                msg = agent.ServerMessage()
                msg.ParseFromString(data)
                which = msg.WhichOneof("message")
                if which == "availability":
                    self._handle_availability(msg.availability)
                elif which == "assignment":
                    self._handle_assignment(msg.assignment)
                elif which == "termination":
                    user_task = self._loop.create_task(
                        self._handle_termination(msg.termination),
                        name="agent_job_termination",
                    )
                    self._tasks.add(user_task)
                    user_task.add_done_callback(self._tasks.discard)

        tasks = [
            asyncio.create_task(_load_task()),
            asyncio.create_task(_send_task()),
            asyncio.create_task(_recv_task()),
        ]
        try:
            await asyncio.gather(*tasks)
        finally:
            await utils.aio.gracefully_cancel(*tasks)

    async def _reload_jobs(self, jobs: list[RunningJobInfo]) -> None:
        if not self._opts.api_secret:
            raise RuntimeError("api_secret is required to reload jobs")

        for aj in jobs:
            logger.log(
                DEV_LEVEL,
                "reloading job",
                extra={"job_id": aj.job.id, "agent_name": aj.job.agent_name},
            )
            url = self._opts.ws_url

            # take the original jwt token and extend it while keeping all the same data that was generated
            # by the SFU for the original join token.
            original_token = aj.token
            decoded = jwt.decode(
                original_token, self._opts.api_secret, algorithms=["HS256"]
            )
            decoded["exp"] = (
                int(datetime.datetime.now(datetime.timezone.utc).timestamp()) + 3600
            )
            running_info = RunningJobInfo(
                accept_arguments=aj.accept_arguments,
                job=aj.job,
                url=url,
                token=jwt.encode(decoded, self._opts.api_secret, algorithm="HS256"),
            )
            await self._proc_pool.launch_job(running_info)

    def _handle_register(self, reg: agent.RegisterWorkerResponse):
        self._id = reg.worker_id
        logger.info(
            "registered worker",
            extra={
                "id": reg.worker_id,
                "region": reg.server_info.region,
                "protocol": reg.server_info.protocol,
                "node_id": reg.server_info.node_id,
            },
        )
        self.emit("worker_registered", reg.worker_id, reg.server_info)

    def _handle_availability(self, msg: agent.AvailabilityRequest):
        task = self._loop.create_task(self._answer_availability(msg))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _answer_availability(self, msg: agent.AvailabilityRequest):
        """Ask the user if they want to accept this job and forward the answer to the server.
        If we get the job assigned, we start a new process."""

        answered = False

        async def _on_reject() -> None:
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = False
            await self._queue_msg(availability_resp)

        async def _on_accept(args: JobAcceptArguments) -> None:
            nonlocal answered
            answered = True

            availability_resp = agent.WorkerMessage()
            availability_resp.availability.job_id = msg.job.id
            availability_resp.availability.available = True
            availability_resp.availability.participant_identity = args.identity
            availability_resp.availability.participant_name = args.name
            availability_resp.availability.participant_metadata = args.metadata
            if args.attributes:
                availability_resp.availability.participant_attributes.update(
                    args.attributes
                )
            await self._queue_msg(availability_resp)

            wait_assignment = asyncio.Future[agent.JobAssignment]()
            self._pending_assignments[job_req.id] = wait_assignment

            # the job was accepted by the user, wait for the server assignment
            try:
                await asyncio.wait_for(wait_assignment, ASSIGNMENT_TIMEOUT)
            except asyncio.TimeoutError:
                logger.warning(
                    f"assignment for job {job_req.id} timed out",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )
                raise AssignmentTimeoutError()

            job_assign = wait_assignment.result()
            running_info = RunningJobInfo(
                accept_arguments=args,
                job=msg.job,
                url=job_assign.url or self._opts.ws_url,
                token=job_assign.token,
            )

            await self._proc_pool.launch_job(running_info)

        job_req = JobRequest(job=msg.job, on_reject=_on_reject, on_accept=_on_accept)

        logger.info(
            "received job request",
            extra={
                "job_id": msg.job.id,
                "dispatch_id": msg.job.dispatch_id,
                "room_name": msg.job.room.name,
                "agent_name": self._opts.agent_name,
                "resuming": msg.resuming,
            },
        )

        @utils.log_exceptions(logger=logger)
        async def _job_request_task():
            try:
                await self._opts.request_fnc(job_req)
            except Exception:
                logger.exception(
                    "job_request_fnc failed",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )

            if not answered:
                logger.warning(
                    "no answer was given inside the job_request_fnc, automatically rejecting the job",
                    extra={"job_request": job_req, "agent_name": self._opts.agent_name},
                )
                await _on_reject()

        user_task = self._loop.create_task(_job_request_task(), name="job_request")
        self._tasks.add(user_task)
        user_task.add_done_callback(self._tasks.discard)

    def _handle_assignment(self, assignment: agent.JobAssignment):
        if assignment.job.id in self._pending_assignments:
            with contextlib.suppress(asyncio.InvalidStateError):
                fut = self._pending_assignments.pop(assignment.job.id)
                fut.set_result(assignment)
        else:
            logger.warning(
                "received assignment for an unknown job",
                extra={"job": assignment.job, "agent_name": self._opts.agent_name},
            )

    async def _handle_termination(self, msg: agent.JobTermination):
        proc = self._proc_pool.get_by_job_id(msg.job_id)
        if not proc:
            # safe to ignore
            return
        await proc.aclose()

    async def _update_worker_status(self):
        job_cnt = len(self.active_jobs)
        if self._draining:
            update = agent.UpdateWorkerStatus(
                status=agent.WorkerStatus.WS_FULL, job_count=job_cnt
            )
            msg = agent.WorkerMessage(update_worker=update)
            await self._queue_msg(msg)
            return

        def load_fnc():
            signature = inspect.signature(self._opts.load_fnc)
            parameters = list(signature.parameters.values())
            if len(parameters) == 0:
                return self._opts.load_fnc()  # type: ignore

            return self._opts.load_fnc(self)  # type: ignore

        current_load = await asyncio.get_event_loop().run_in_executor(None, load_fnc)

        is_full = current_load >= _WorkerEnvOption.getvalue(
            self._opts.load_threshold, self._devmode
        )
        currently_available = not is_full and not self._draining

        status = (
            agent.WorkerStatus.WS_AVAILABLE
            if currently_available
            else agent.WorkerStatus.WS_FULL
        )

        update = agent.UpdateWorkerStatus(
            load=current_load, status=status, job_count=job_cnt
        )

        # only log if status has changed
        if self._previous_status != status and not self._draining:
            self._previous_status = status
            extra = {
                "load": current_load,
                "threshold": self._opts.load_threshold,
            }
            if is_full:
                logger.info(
                    "worker is at full capacity, marking as unavailable",
                    extra=extra,
                )
            else:
                logger.info(
                    "worker is below capacity, marking as available",
                    extra=extra,
                )

        msg = agent.WorkerMessage(update_worker=update)
        with contextlib.suppress(utils.aio.ChanClosed):
            await self._queue_msg(msg)

    async def _update_job_status(self, proc: ipc.job_executor.JobExecutor) -> None:
        job_info = proc.running_job
        if job_info is None:
            return

        status: agent.JobStatus = agent.JobStatus.JS_RUNNING
        if proc.status == ipc.job_executor.JobStatus.FAILED:
            status = agent.JobStatus.JS_FAILED
        elif proc.status == ipc.job_executor.JobStatus.SUCCESS:
            status = agent.JobStatus.JS_SUCCESS
        elif proc.status == ipc.job_executor.JobStatus.RUNNING:
            status = agent.JobStatus.JS_RUNNING

        update = agent.UpdateJobStatus(job_id=job_info.job.id, status=status, error="")
        msg = agent.WorkerMessage(update_job=update)
        await self._queue_msg(msg)

Worker is an EventEmitter that maintains a WebSocket connection to the LiveKit server, registers itself, reports its load and status, and launches an agent job for each assignment it accepts, using the executor configured in WorkerOptions.job_executor_type. It emits events such as "worker_registered".

Ancestors

  • livekit.agents.utils.EventEmitter
  • typing.Generic

Instance variables

prop active_jobs : list[RunningJobInfo]
Expand source code
@property
def active_jobs(self) -> list[RunningJobInfo]:
    return [
        proc.running_job for proc in self._proc_pool.processes if proc.running_job
    ]
prop id : str
Expand source code
@property
def id(self) -> str:
    return self._id

Methods

async def aclose(self) ‑> None
Expand source code
async def aclose(self) -> None:
    if self._closed:
        if self._close_future is not None:
            await self._close_future
        return

    logger.info("shutting down worker", extra={"id": self.id})

    assert self._close_future is not None
    assert self._http_session is not None
    assert self._api is not None
    assert self._main_task is not None

    self._closed = True
    self._main_task.cancel()

    await self._proc_pool.aclose()

    if self._inference_executor is not None:
        await self._inference_executor.aclose()

    await self._http_session.close()
    await self._http_server.aclose()
    await self._api.aclose()

    await asyncio.gather(*self._tasks, return_exceptions=True)

    # await asyncio.sleep(0.25)  # see https://github.com/aio-libs/aiohttp/issues/1925
    self._msg_chan.close()
    await self._close_future
async def drain(self, timeout: int | None = None) ‑> None
Expand source code
async def drain(self, timeout: int | None = None) -> None:
    """When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time."""
    if self._draining:
        return

    logger.info("draining worker", extra={"id": self.id, "timeout": timeout})
    self._draining = True
    await self._update_worker_status()

    async def _join_jobs():
        for proc in self._proc_pool.processes:
            if proc.running_job:
                await proc.join()

    if timeout:
        await asyncio.wait_for(
            _join_jobs(), timeout
        )  # raises asyncio.TimeoutError on timeout
    else:
        await _join_jobs()

When timeout isn't None, it will raise asyncio.TimeoutError if the processes didn't finish in time.
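
A sketch of draining with a timeout before closing; the 120-second budget is arbitrary.

import asyncio

from livekit.agents import Worker

async def stop_worker(worker: Worker) -> None:
    try:
        await worker.drain(timeout=120)  # let running jobs finish, up to 2 minutes
    except asyncio.TimeoutError:
        print("some jobs did not finish before the timeout")
    finally:
        await worker.aclose()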

async def run(self)
Expand source code
async def run(self):
    if not self._closed:
        raise Exception("worker is already running")

    logger.info(
        "starting worker",
        extra={"version": __version__, "rtc-version": rtc.__version__},
    )

    if self._inference_executor is not None:
        logger.info("starting inference executor")
        await self._inference_executor.start()
        await self._inference_executor.initialize()

    self._closed = False

    def _update_job_status(proc: ipc.job_executor.JobExecutor) -> None:
        t = self._loop.create_task(self._update_job_status(proc))
        self._tasks.add(t)
        t.add_done_callback(self._tasks.discard)

    self._proc_pool.on("process_started", _update_job_status)
    self._proc_pool.on("process_closed", _update_job_status)
    self._proc_pool.on("process_job_launched", _update_job_status)

    self._proc_pool.start()
    self._api = api.LiveKitAPI(
        self._opts.ws_url, self._opts.api_key, self._opts.api_secret
    )
    self._http_session = aiohttp.ClientSession()
    self._close_future = asyncio.Future(loop=self._loop)

    self._main_task = asyncio.create_task(self._worker_task(), name="worker_task")
    tasks = [
        self._main_task,
        asyncio.create_task(self._http_server.run(), name="http_server"),
    ]
    try:
        await asyncio.gather(*tasks)
    finally:
        await utils.aio.gracefully_cancel(*tasks)
        if not self._close_future.done():
            self._close_future.set_result(None)
async def simulate_job(self, room: str, participant_identity: str | None = None) ‑> None
Expand source code
async def simulate_job(
    self, room: str, participant_identity: str | None = None
) -> None:
    assert self._api is not None

    room_obj = await self._api.room.create_room(api.CreateRoomRequest(name=room))
    participant = None
    if participant_identity:
        participant = await self._api.room.get_participant(
            api.RoomParticipantIdentity(room=room, identity=participant_identity)
        )

    msg = agent.WorkerMessage()
    msg.simulate_job.room.CopyFrom(room_obj)
    if participant:
        msg.simulate_job.participant.CopyFrom(participant)

    await self._queue_msg(msg)
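
In practice the helpers in livekit.agents.cli are usually used to run a worker; constructing one directly is sketched below. Credentials are read from LIVEKIT_URL, LIVEKIT_API_KEY, and LIVEKIT_API_SECRET unless passed explicitly in WorkerOptions.

import asyncio

from livekit.agents import JobContext, Worker, WorkerOptions

async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()

async def main() -> None:
    # raises ValueError if the LiveKit credentials are not configured
    worker = Worker(WorkerOptions(entrypoint_fnc=entrypoint))
    try:
        await worker.run()  # connects, registers, and serves jobs until closed
    finally:
        await worker.aclose()

if __name__ == "__main__":
    asyncio.run(main())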

Inherited members

class WorkerOptions (entrypoint_fnc: Callable[[JobContext], Awaitable[None]],
request_fnc: Callable[[JobRequest], Awaitable[None]] = <function _default_request_fnc>,
prewarm_fnc: Callable[[JobProcess], Any] = <function _default_initialize_process_fnc>,
load_fnc: Callable[[Worker], float] | Callable[[], float] = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>,
job_executor_type: JobExecutorType = JobExecutorType.PROCESS,
load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(dev_default=inf, prod_default=0.75),
job_memory_warn_mb: float = 300,
job_memory_limit_mb: float = 0,
num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=3),
shutdown_process_timeout: float = 60.0,
initialize_process_timeout: float = 10.0,
permissions: WorkerPermissions = <factory>,
agent_name: str = '',
worker_type: WorkerType = WorkerType.ROOM,
max_retry: int = 16,
ws_url: str = 'ws://localhost:7880',
api_key: str | None = None,
api_secret: str | None = None,
host: str = '',
port: int | _WorkerEnvOption[int] = _WorkerEnvOption(dev_default=0, prod_default=8081))
Expand source code
@dataclass
class WorkerOptions:
    entrypoint_fnc: Callable[[JobContext], Awaitable[None]]
    """Entrypoint function that will be called when a job is assigned to this worker."""
    request_fnc: Callable[[JobRequest], Awaitable[None]] = _default_request_fnc
    """Inspect the request and decide if the current worker should handle it.

    When left empty, all jobs are accepted."""
    prewarm_fnc: Callable[[JobProcess], Any] = _default_initialize_process_fnc
    """A function to perform any necessary initialization before the job starts."""
    load_fnc: Callable[[Worker], float] | Callable[[], float] = (
        _DefaultLoadCalc.get_load
    )
    """Called to determine the current load of the worker. Should return a value between 0 and 1."""
    job_executor_type: JobExecutorType = _default_job_executor_type
    """Which executor to use to run jobs. (currently thread or process are supported)"""
    load_threshold: float | _WorkerEnvOption[float] = _WorkerEnvOption(
        dev_default=math.inf, prod_default=0.75
    )
    """When the load exceeds this threshold, the worker will be marked as unavailable.

    Defaults to 0.75 in "production" mode and is disabled in "development" mode.
    """

    job_memory_warn_mb: float = 300
    """Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged."""
    job_memory_limit_mb: float = 0
    """Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit.
    Defaults to 0 (disabled).
    """

    """Number of idle processes to keep warm."""
    num_idle_processes: int | _WorkerEnvOption[int] = _WorkerEnvOption(
        dev_default=0, prod_default=3
    )
    """Number of idle processes to keep warm."""
    shutdown_process_timeout: float = 60.0
    """Maximum amount of time to wait for a job to shut down gracefully"""
    initialize_process_timeout: float = 10.0
    """Maximum amount of time to wait for a process to initialize/prewarm"""
    permissions: WorkerPermissions = field(default_factory=WorkerPermissions)
    """Permissions that the agent should join the room with."""
    agent_name: str = ""
    """Set agent_name to enable explicit dispatch. When explicit dispatch is enabled, jobs will not be dispatched to rooms automatically. Instead, you can either specify the agent(s) to be dispatched in the end-user's token, or use the AgentDispatch.createDispatch API"""
    worker_type: WorkerType = WorkerType.ROOM
    """Whether to spin up an agent for each room or publisher."""
    max_retry: int = 16
    """Maximum number of times to retry connecting to LiveKit."""
    ws_url: str = "ws://localhost:7880"
    """URL to connect to the LiveKit server.

    By default it uses ``LIVEKIT_URL`` from environment"""
    api_key: str | None = None
    """API key to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_KEY`` from environment"""
    api_secret: str | None = None
    """API secret to authenticate with LiveKit.

    By default it uses ``LIVEKIT_API_SECRET`` from environment"""
    host: str = ""  # default to all interfaces
    port: int | _WorkerEnvOption[int] = _WorkerEnvOption(
        dev_default=0, prod_default=8081
    )
    """Port for local HTTP server to listen on.

    The HTTP server is used as a health check endpoint.
    """

    def validate_config(self, devmode: bool):
        load_threshold = _WorkerEnvOption.getvalue(self.load_threshold, devmode)
        if load_threshold > 1 and not devmode:
            logger.warning(
                f"load_threshold in prod env must be less than 1, current value: {load_threshold}"
            )

WorkerOptions(entrypoint_fnc: 'Callable[[JobContext], Awaitable[None]]', request_fnc: 'Callable[[JobRequest], Awaitable[None]]' = <function _default_request_fnc>, prewarm_fnc: 'Callable[[JobProcess], Any]' = <function _default_initialize_process_fnc>, load_fnc: 'Callable[[Worker], float] | Callable[[], float]' = <bound method _DefaultLoadCalc.get_load of <class 'livekit.agents.worker._DefaultLoadCalc'>>, job_executor_type: 'JobExecutorType' = JobExecutorType.PROCESS, load_threshold: 'float | _WorkerEnvOption[float]' = _WorkerEnvOption(dev_default=inf, prod_default=0.75), job_memory_warn_mb: 'float' = 300, job_memory_limit_mb: 'float' = 0, num_idle_processes: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=3), shutdown_process_timeout: 'float' = 60.0, initialize_process_timeout: 'float' = 10.0, permissions: 'WorkerPermissions' = <factory>, agent_name: 'str' = '', worker_type: 'WorkerType' = WorkerType.ROOM, max_retry: 'int' = 16, ws_url: 'str' = 'ws://localhost:7880', api_key: 'str | None' = None, api_secret: 'str | None' = None, host: 'str' = '', port: 'int | _WorkerEnvOption[int]' = _WorkerEnvOption(dev_default=0, prod_default=8081))

Class variables

var agent_name : str

Set agent_name to enable explicit dispatch. When explicit dispatch is enabled, jobs will not be dispatched to rooms automatically. Instead, you can either specify the agent(s) to be dispatched in the end-user's token, or use the AgentDispatch.createDispatch API

var api_key : str | None

API key to authenticate with LiveKit.

By default it uses LIVEKIT_API_KEY from environment

var api_secret : str | None

API secret to authenticate with LiveKit.

By default it uses LIVEKIT_API_SECRET from environment

var entrypoint_fnc : Callable[[livekit.agents.job.JobContext], Awaitable[None]]

Entrypoint function that will be called when a job is assigned to this worker.

var host : str
var initialize_process_timeout : float

Maximum amount of time to wait for a process to initialize/prewarm

var job_executor_type : livekit.agents.job.JobExecutorType

Which executor to use to run jobs. (currently thread or process are supported)

var job_memory_limit_mb : float

Maximum memory usage for a job in MB, the job process will be killed if it exceeds this limit. Defaults to 0 (disabled).

var job_memory_warn_mb : float

Memory warning threshold in MB. If the job process exceeds this limit, a warning will be logged.

var load_threshold : float | livekit.agents.worker._WorkerEnvOption[float]

When the load exceeds this threshold, the worker will be marked as unavailable.

Defaults to 0.75 in "production" mode and is disabled in "development" mode.

var max_retry : int

Maximum number of times to retry connecting to LiveKit.

var num_idle_processes : int | livekit.agents.worker._WorkerEnvOption[int]

Number of idle processes to keep warm.

var permissions : livekit.agents.worker.WorkerPermissions

Permissions that the agent should join the room with.

var port : int | livekit.agents.worker._WorkerEnvOption[int]

Port for local HTTP server to listen on.

The HTTP server is used as a health check endpoint.

var shutdown_process_timeout : float

Maximum amount of time to wait for a job to shut down gracefully

var worker_type : livekit.agents.worker.WorkerType

Whether to spin up an agent for each room or publisher.

var ws_url : str

URL to connect to the LiveKit server.

By default it uses LIVEKIT_URL from environment

Static methods

def load_fnc(worker: Worker) ‑> float

Methods

def prewarm_fnc(proc: JobProcess) ‑> Any
Expand source code
def _default_initialize_process_fnc(proc: JobProcess) -> Any:
    return
async def request_fnc(ctx: JobRequest) ‑> None
Expand source code
async def _default_request_fnc(ctx: JobRequest) -> None:
    await ctx.accept()
def validate_config(self, devmode: bool)
Expand source code
def validate_config(self, devmode: bool):
    load_threshold = _WorkerEnvOption.getvalue(self.load_threshold, devmode)
    if load_threshold > 1 and not devmode:
        logger.warning(
            f"load_threshold in prod env must be less than 1, current value: {load_threshold}"
        )
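
Putting these options together, a typical setup looks roughly like the following sketch; the prewarm/entrypoint bodies and the load calculation are illustrative placeholders, and agent_name is only needed when explicit dispatch is desired:

from livekit.agents import JobContext, JobProcess, Worker, WorkerOptions, cli


def prewarm(proc: JobProcess) -> None:
    # Illustrative: load heavyweight resources once per process and stash
    # them in proc.userdata for the jobs that run on this process.
    proc.userdata["model"] = object()


async def entrypoint(ctx: JobContext) -> None:
    # Illustrative: connect to the room, then run the agent logic.
    await ctx.connect()


def compute_load(worker: Worker) -> float:
    # Must return a value between 0 and 1; anything above load_threshold
    # marks the worker as unavailable. 0.0 keeps it always available.
    return 0.0


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=entrypoint,
            prewarm_fnc=prewarm,
            load_fnc=compute_load,
            num_idle_processes=2,
            agent_name="my-agent",  # enables explicit dispatch
        )
    )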
class WorkerPermissions (can_publish: bool = True,
can_subscribe: bool = True,
can_publish_data: bool = True,
can_update_metadata: bool = True,
can_publish_sources: list[models.TrackSource] = <factory>,
hidden: bool = False)
Expand source code
@dataclass
class WorkerPermissions:
    can_publish: bool = True
    can_subscribe: bool = True
    can_publish_data: bool = True
    can_update_metadata: bool = True
    can_publish_sources: list[models.TrackSource] = field(default_factory=list)
    hidden: bool = False

WorkerPermissions(can_publish: 'bool' = True, can_subscribe: 'bool' = True, can_publish_data: 'bool' = True, can_update_metadata: 'bool' = True, can_publish_sources: 'list[models.TrackSource]' = <factory>, hidden: 'bool' = False)

Class variables

var can_publish : bool
var can_publish_data : bool
var can_publish_sources : list[models.TrackSource]
var can_subscribe : bool
var can_update_metadata : bool
var hidden : bool
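
For example, a subscribe-only agent that stays hidden from other participants could be configured roughly like this (the entrypoint stub is a placeholder):

from livekit.agents import JobContext, WorkerOptions, WorkerPermissions


async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()  # placeholder agent logic


opts = WorkerOptions(
    entrypoint_fnc=entrypoint,
    permissions=WorkerPermissions(
        can_publish=False,
        can_publish_data=False,
        can_update_metadata=False,
        hidden=True,  # not listed as a participant in the room
    ),
)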
class WorkerType (*args, **kwds)
Expand source code
class WorkerType(Enum):
    ROOM = agent.JobType.JT_ROOM
    PUBLISHER = agent.JobType.JT_PUBLISHER

Create a collection of name/value pairs.

Example enumeration:

>>> class Color(Enum):
...     RED = 1
...     BLUE = 2
...     GREEN = 3

Access them by:

  • attribute access:

Color.RED

  • value lookup:

Color(1)

  • name lookup:

Color['RED']

Enumerations can be iterated over, and know how many members they have:

>>> len(Color)
3
>>> list(Color)
[<Color.RED: 1>, <Color.BLUE: 2>, <Color.GREEN: 3>]

Methods can be added to enumerations, and members can have their own attributes – see the documentation for details.

Ancestors

  • enum.Enum

Class variables

var PUBLISHER
var ROOM
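
To dispatch one agent per publishing participant instead of one per room, worker_type can be set on the options above (sketch; the entrypoint stub is a placeholder):

from livekit.agents import JobContext, WorkerOptions, WorkerType


async def entrypoint(ctx: JobContext) -> None:
    await ctx.connect()  # placeholder agent logic


# WorkerType.ROOM (the default) starts one job per room;
# WorkerType.PUBLISHER starts one job per publishing participant.
opts = WorkerOptions(entrypoint_fnc=entrypoint, worker_type=WorkerType.PUBLISHER)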