p-queue

  • Version 9.0.0
  • Published
  • 68.5 kB
  • 2 dependencies
  • MIT license

Install

npm i p-queue
yarn add p-queue
pnpm add p-queue

Overview

Promise queue with concurrency control

Index

Classes

class PQueue

class PQueue<
QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue,
EnqueueOptionsType extends QueueAddOptions = QueueAddOptions
> extends EventEmitter<EventName> {}
  • Promise queue with concurrency control.

constructor

constructor(options?: Options<QueueType, EnqueueOptionsType>);

    property concurrency

    concurrency: number;

      property isPaused

      readonly isPaused: boolean;
      • Whether the queue is currently paused.

      property isRateLimited

      readonly isRateLimited: boolean;
      • Whether the queue is currently rate-limited due to intervalCap.

      property isSaturated

      readonly isSaturated: boolean;
      • Whether the queue is saturated. Returns true when: - All concurrency slots are occupied and tasks are waiting, OR - The queue is rate-limited and tasks are waiting

        Useful for detecting backpressure and potential hanging tasks.

        ```js import PQueue from 'p-queue';

        const queue = new PQueue({concurrency: 2});

        // Backpressure handling if (queue.isSaturated) { console.log('Queue is saturated, waiting for capacity...'); await queue.onSizeLessThan(queue.concurrency); }

        // Monitoring for stuck tasks setInterval(() => { if (queue.isSaturated) { console.warn(Queue saturated: ${queue.pending} running, ${queue.size} waiting); } }, 60000); ```

      property pending

      readonly pending: number;
      • Number of running items (no longer in the queue).

      property runningTasks

      readonly runningTasks: readonly {
      readonly id?: string;
      readonly priority: number;
      readonly startTime: number;
      readonly timeout?: number;
      }[];
      • The tasks currently being executed. Each task includes its id, priority, startTime, and timeout (if set).

        Returns an array of task info objects.

        ```js import PQueue from 'p-queue';

        const queue = new PQueue({concurrency: 2});

        // Add tasks with IDs for better debugging queue.add(() => fetchUser(123), {id: 'user-123'}); queue.add(() => fetchPosts(456), {id: 'posts-456', priority: 1});

        // Check what's running console.log(queue.runningTasks); // => [{ // id: 'user-123', // priority: 0, // startTime: 1759253001716, // timeout: undefined // }, { // id: 'posts-456', // priority: 1, // startTime: 1759253001916, // timeout: undefined // }] ```

      property size

      readonly size: number;
      • Size of the queue, the number of queued items waiting to run.

      property timeout

      timeout?: number;
      • Get or set the default timeout for all tasks. Can be changed at runtime.

        Operations will throw a TimeoutError if they don't complete within the specified time.

        The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue.

        Example 1

        ``` const queue = new PQueue({timeout: 5000});

        // Change timeout for all future tasks queue.timeout = 10000; ```

      method add

      add: <TaskResultType>(
      function_: Task<TaskResultType>,
      options?: Partial<EnqueueOptionsType>
      ) => Promise<TaskResultType>;
      • Adds a sync or async task to the queue. Always returns a promise.

      method addAll

      addAll: <TaskResultsType>(
      functions: ReadonlyArray<Task<TaskResultsType>>,
      options?: Partial<EnqueueOptionsType>
      ) => Promise<TaskResultsType[]>;
      • Same as .add(), but accepts an array of sync or async functions.

        Returns

        A promise that resolves when all functions are resolved.

      method clear

      clear: () => void;
      • Clear the queue.

      method onEmpty

      onEmpty: () => Promise<void>;
      • Can be called multiple times. Useful if you for example add additional items at a later time.

        Returns

        A promise that settles when the queue becomes empty.

      method onError

      onError: () => Promise<never>;
      • Returns

        A promise that rejects when any task in the queue errors.

        Use with Promise.race([queue.onError(), queue.onIdle()]) to fail fast on the first error while still resolving normally when the queue goes idle.

        Important: The promise returned by add() still rejects. You must handle each add() promise (for example, .catch(() => {})) to avoid unhandled rejections.

        Example 1

        ``` import PQueue from 'p-queue';

        const queue = new PQueue({concurrency: 2});

        queue.add(() => fetchData(1)).catch(() => {}); queue.add(() => fetchData(2)).catch(() => {}); queue.add(() => fetchData(3)).catch(() => {});

        // Stop processing on first error try { await Promise.race([ queue.onError(), queue.onIdle() ]); } catch (error) { queue.pause(); // Stop processing remaining tasks console.error('Queue failed:', error); } ```

      method onIdle

      onIdle: () => Promise<void>;
      • The difference with .onEmpty is that .onIdle guarantees that all work from the queue has finished. .onEmpty merely signals that the queue is empty, but it could mean that some promises haven't completed yet.

        Returns

        A promise that settles when the queue becomes empty, and all promises have completed; queue.size === 0 && queue.pending === 0.

      method onPendingZero

      onPendingZero: () => Promise<void>;
      • The difference with .onIdle is that .onPendingZero only waits for currently running tasks to finish, ignoring queued tasks.

        Returns

        A promise that settles when all currently running tasks have completed; queue.pending === 0.

      method onRateLimit

      onRateLimit: () => Promise<void>;
      • Returns

        A promise that settles when the queue becomes rate-limited due to intervalCap.

      method onRateLimitCleared

      onRateLimitCleared: () => Promise<void>;
      • Returns

        A promise that settles when the queue is no longer rate-limited.

      method onSizeLessThan

      onSizeLessThan: (limit: number) => Promise<void>;
      • Returns

        A promise that settles when the queue size is less than the given limit: queue.size < limit.

        If you want to avoid having the queue grow beyond a certain size you can await queue.onSizeLessThan() before adding a new item.

        Note that this only limits the number of items waiting to start. There could still be up to concurrency jobs already running that this call does not include in its calculation.

      method pause

      pause: () => void;
      • Put queue execution on hold.

      method setPriority

      setPriority: (id: string, priority: number) => void;
      • Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.

        For example, this can be used to prioritize a promise function to run earlier.

        ```js import PQueue from 'p-queue';

        const queue = new PQueue({concurrency: 1});

        queue.add(async () => '🦄', {priority: 1}); queue.add(async () => '🦀', {priority: 0, id: '🦀'}); queue.add(async () => '🦄', {priority: 1}); queue.add(async () => '🦄', {priority: 1});

        queue.setPriority('🦀', 2); ```

        In this case, the promise function with id: '🦀' runs second.

        You can also deprioritize a promise function to delay its execution:

        ```js import PQueue from 'p-queue';

        const queue = new PQueue({concurrency: 1});

        queue.add(async () => '🦄', {priority: 1}); queue.add(async () => '🦀', {priority: 1, id: '🦀'}); queue.add(async () => '🦄'); queue.add(async () => '🦄', {priority: 0});

        queue.setPriority('🦀', -1); ``` Here, the promise function with id: '🦀' executes last.

      method sizeBy

      sizeBy: (options: Readonly<Partial<EnqueueOptionsType>>) => number;
      • Size of the queue, filtered by the given options.

        For example, this can be used to find the number of items remaining in the queue with a specific priority level.

      method start

      start: () => this;
      • Start (or resume) executing enqueued tasks within concurrency limit. No need to call this if queue is not paused (via options.autoStart = false or by .pause() method.)

      Type Aliases

      type Options

      type Options<
      QueueType extends Queue<RunFunction, QueueOptions>,
      QueueOptions extends QueueAddOptions
      > = {
      /**
      Concurrency limit.
      Minimum: `1`.
      @default Infinity
      */
      readonly concurrency?: number;
      /**
      Whether queue tasks within concurrency limit, are auto-executed as soon as they're added.
      @default true
      */
      readonly autoStart?: boolean;
      /**
      Class with a `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section.
      */
      readonly queueClass?: new () => QueueType;
      /**
      The max number of runs in the given interval of time.
      Minimum: `1`.
      @default Infinity
      */
      readonly intervalCap?: number;
      /**
      The length of time in milliseconds before the interval count resets. Must be finite.
      Minimum: `0`.
      @default 0
      */
      readonly interval?: number;
      /**
      Whether the task must finish in the given interval or will be carried over into the next interval count.
      @default false
      */
      readonly carryoverIntervalCount?: boolean;
      /**
      @deprecated Renamed to `carryoverIntervalCount`.
      */
      readonly carryoverConcurrencyCount?: boolean;
      } & TimeoutOptions;

        type Queue

        type Queue<Element, Options> = {
        size: number;
        filter: (options: Readonly<Partial<Options>>) => Element[];
        dequeue: () => Element | undefined;
        enqueue: (run: Element, options?: Partial<Options>) => void;
        setPriority: (id: string, priority: number) => void;
        };

          type QueueAddOptions

          type QueueAddOptions = {
          /**
          Priority of operation. Operations with greater priority will be scheduled first.
          @default 0
          */
          readonly priority?: number;
          /**
          Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
          */
          id?: string;
          } & TaskOptions &
          TimeoutOptions;

            Package Files (3)

            Dependencies (2)

            Dev Dependencies (14)

            Peer Dependencies (0)

            No peer dependencies.

            Badge

            To add a badge like this onejsDocs.io badgeto your package's README, use the codes available below.

            You may also use Shields.io to create a custom badge linking to https://www.jsdocs.io/package/p-queue.

            • Markdown
              [![jsDocs.io](https://img.shields.io/badge/jsDocs.io-reference-blue)](https://www.jsdocs.io/package/p-queue)
            • HTML
              <a href="https://www.jsdocs.io/package/p-queue"><img src="https://img.shields.io/badge/jsDocs.io-reference-blue" alt="jsDocs.io"></a>