p-queue
- Version 9.0.0
- Published
- 68.5 kB
- 2 dependencies
- MIT license
Install
npm i p-queue
yarn add p-queue
pnpm add p-queue
Overview
Promise queue with concurrency control
Index
Classes
class PQueue
class PQueue<
  QueueType extends Queue<RunFunction, EnqueueOptionsType> = PriorityQueue,
  EnqueueOptionsType extends QueueAddOptions = QueueAddOptions
> extends EventEmitter<EventName> {}
Promise queue with concurrency control.
constructor
constructor(options?: Options<QueueType, EnqueueOptionsType>);
property concurrency
concurrency: number;
property isPaused
readonly isPaused: boolean;
Whether the queue is currently paused.
property isRateLimited
readonly isRateLimited: boolean;
Whether the queue is currently rate-limited due to intervalCap.
property isSaturated
readonly isSaturated: boolean;
Whether the queue is saturated. Returns `true` when:
- All concurrency slots are occupied and tasks are waiting, OR
- The queue is rate-limited and tasks are waiting
Useful for detecting backpressure and potential hanging tasks.
```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

// Backpressure handling
if (queue.isSaturated) {
  console.log('Queue is saturated, waiting for capacity...');
  await queue.onSizeLessThan(queue.concurrency);
}

// Monitoring for stuck tasks
setInterval(() => {
  if (queue.isSaturated) {
    console.warn(`Queue saturated: ${queue.pending} running, ${queue.size} waiting`);
  }
}, 60000);
```
property pending
readonly pending: number;
Number of running items (no longer in the queue).
property runningTasks
readonly runningTasks: readonly {
  readonly id?: string;
  readonly priority: number;
  readonly startTime: number;
  readonly timeout?: number;
}[];
The tasks currently being executed. Each task includes its `id`, `priority`, `startTime`, and `timeout` (if set).
Returns an array of task info objects.
```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

// Add tasks with IDs for better debugging
queue.add(() => fetchUser(123), {id: 'user-123'});
queue.add(() => fetchPosts(456), {id: 'posts-456', priority: 1});

// Check what's running
console.log(queue.runningTasks);
// => [{
//   id: 'user-123',
//   priority: 0,
//   startTime: 1759253001716,
//   timeout: undefined
// }, {
//   id: 'posts-456',
//   priority: 1,
//   startTime: 1759253001916,
//   timeout: undefined
// }]
```
property size
readonly size: number;
Size of the queue, the number of queued items waiting to run.
property timeout
timeout?: number;
Get or set the default timeout for all tasks. Can be changed at runtime.
Operations will throw a `TimeoutError` if they don't complete within the specified time.
The timeout begins when the operation is dequeued and starts execution, not while it's waiting in the queue.
Example 1
```
import PQueue from 'p-queue';

const queue = new PQueue({timeout: 5000});

// Change timeout for all future tasks
queue.timeout = 10000;
```
method add
add: <TaskResultType>(
  function_: Task<TaskResultType>,
  options?: Partial<EnqueueOptionsType>
) => Promise<TaskResultType>;
Adds a sync or async task to the queue. Always returns a promise.
method addAll
addAll: <TaskResultsType>(
  functions: ReadonlyArray<Task<TaskResultsType>>,
  options?: Partial<EnqueueOptionsType>
) => Promise<TaskResultsType[]>;
Same as `.add()`, but accepts an array of sync or async functions.
Returns
A promise that resolves when all functions are resolved.
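Since `addAll()` returns one promise for the whole batch, a single failing task would presumably reject that promise, much as `Promise.all` does. That is an assumption, as the description above does not spell out the failure behavior. The following is a minimal sketch of a hypothetical helper, `addAllSettled`, which is not part of p-queue. It enqueues each function through `.add()` and reports every outcome, relying only on `.add()` always returning a promise.

```js
// Hypothetical helper (not part of p-queue): enqueue every function through
// queue.add() and report each outcome instead of failing on the first error.
// `queue` is any object with an add(function_, options) method that returns
// a promise, such as a PQueue instance.
async function addAllSettled(queue, functions, options) {
  const promises = functions.map(function_ => queue.add(function_, options));
  // Promise.allSettled never rejects; each entry is either
  // {status: 'fulfilled', value} or {status: 'rejected', reason}.
  return Promise.allSettled(promises);
}
```

With a real queue this might be called as `await addAllSettled(new PQueue({concurrency: 2}), tasks)`, where `tasks` is an array of functions.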
method clear
clear: () => void;
Clear the queue.
method onEmpty
onEmpty: () => Promise<void>;
Can be called multiple times. Useful if, for example, you add more items at a later time.
Returns
A promise that settles when the queue becomes empty.
method onError
onError: () => Promise<never>;
Returns
A promise that rejects when any task in the queue errors.
Use with `Promise.race([queue.onError(), queue.onIdle()])` to fail fast on the first error while still resolving normally when the queue goes idle.
Important: The promise returned by `add()` still rejects. You must handle each `add()` promise (for example, `.catch(() => {})`) to avoid unhandled rejections.
Example 1
```
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 2});

queue.add(() => fetchData(1)).catch(() => {});
queue.add(() => fetchData(2)).catch(() => {});
queue.add(() => fetchData(3)).catch(() => {});

// Stop processing on first error
try {
  await Promise.race([
    queue.onError(),
    queue.onIdle()
  ]);
} catch (error) {
  queue.pause(); // Stop processing remaining tasks
  console.error('Queue failed:', error);
}
```
method onIdle
onIdle: () => Promise<void>;
The difference with `.onEmpty` is that `.onIdle` guarantees that all work from the queue has finished. `.onEmpty` merely signals that the queue is empty, but it could mean that some promises haven't completed yet.
Returns
A promise that settles when the queue becomes empty, and all promises have completed; `queue.size === 0 && queue.pending === 0`.
method onPendingZero
onPendingZero: () => Promise<void>;
The difference with `.onIdle` is that `.onPendingZero` only waits for currently running tasks to finish, ignoring queued tasks.
Returns
A promise that settles when all currently running tasks have completed; `queue.pending === 0`.
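One place this distinction matters is a graceful stop: pause the queue so nothing new starts, then wait only for the tasks already in flight. The sketch below shows that pattern using `.pause()`, `.onPendingZero()` and `.size` as documented on this page. The helper name `pauseAndWaitForRunning` is hypothetical and not part of p-queue.

```js
// Hypothetical helper (not part of p-queue): stop starting new tasks, let the
// tasks that are already running finish, and report how many remain queued.
// `queue` is expected to be a PQueue instance or an object with the same
// pause(), onPendingZero() and size members.
async function pauseAndWaitForRunning(queue) {
  queue.pause();               // queued tasks are no longer started
  await queue.onPendingZero(); // settles once the running tasks have completed
  return queue.size;           // tasks still waiting; queue.start() would resume them
}
```

Using `.onIdle()` here instead would not be a good fit, since queued tasks never start while the queue is paused.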
method onRateLimit
onRateLimit: () => Promise<void>;
Returns
A promise that settles when the queue becomes rate-limited due to intervalCap.
method onRateLimitCleared
onRateLimitCleared: () => Promise<void>;
Returns
A promise that settles when the queue is no longer rate-limited.
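The rate-limit promises pair naturally with the `isRateLimited` property and the `intervalCap` and `interval` options. The sketch below shows an illustrative options object (the numbers are arbitrary) and a hypothetical helper, `waitWhileRateLimited`, that waits only when the queue reports it is rate-limited. Neither name comes from p-queue.

```js
// Illustrative option values: at most 10 runs per 1000 ms interval, with no
// more than 4 tasks running at once. These would be passed to new PQueue().
const rateLimitedOptions = {concurrency: 4, intervalCap: 10, interval: 1000};

// Hypothetical helper (not part of p-queue): if the queue is currently
// rate-limited, wait until the limit clears. Resolves to true if it waited.
async function waitWhileRateLimited(queue) {
  if (!queue.isRateLimited) {
    return false;
  }

  await queue.onRateLimitCleared();
  return true;
}
```

A caller might use this before reporting progress or scheduling dependent work, so that it does not treat a rate-limit pause as a stall.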
method onSizeLessThan
onSizeLessThan: (limit: number) => Promise<void>;
Returns
A promise that settles when the queue size is less than the given limit: `queue.size < limit`.
If you want to avoid having the queue grow beyond a certain size you can `await queue.onSizeLessThan()` before adding a new item.
Note that this only limits the number of items waiting to start. There could still be up to `concurrency` jobs already running that this call does not include in its calculation.
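The await-before-adding pattern described above can be sketched as a producer loop. The helper name `enqueueWithBackpressure` and its parameters are hypothetical. The loop waits for room in the waiting list but deliberately does not await each task, so tasks still run concurrently.

```js
// Hypothetical helper (not part of p-queue): add tasks one by one, waiting
// until fewer than `maxWaiting` tasks are queued before each add.
// `queue` is expected to provide onSizeLessThan(limit) and add(task).
async function enqueueWithBackpressure(queue, tasks, maxWaiting) {
  const outcomes = [];
  for (const task of tasks) {
    // Wait for room in the waiting list only; running tasks are not counted.
    await queue.onSizeLessThan(maxWaiting);
    const promise = queue.add(task);
    // Mark the promise as handled so that a failure occurring while the loop
    // is still waiting for room does not surface as an unhandled rejection.
    promise.catch(() => {});
    outcomes.push(promise);
  }

  // Resolves once every task has settled, with one outcome object per task.
  return Promise.allSettled(outcomes);
}
```

With a real queue this might be called as `await enqueueWithBackpressure(queue, tasks, 100)`, which keeps at most roughly 100 tasks waiting at any time, in addition to those already running.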
method pause
pause: () => void;
Put queue execution on hold.
method setPriority
setPriority: (id: string, priority: number) => void;
Updates the priority of a promise function by its id, affecting its execution order. Requires a defined concurrency limit to take effect.
For example, this can be used to prioritize a promise function to run earlier.
```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 0, id: '🦀'});
queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦄', {priority: 1});

queue.setPriority('🦀', 2);
```
In this case, the promise function with `id: '🦀'` runs second.
You can also deprioritize a promise function to delay its execution:
```js
import PQueue from 'p-queue';

const queue = new PQueue({concurrency: 1});

queue.add(async () => '🦄', {priority: 1});
queue.add(async () => '🦀', {priority: 1, id: '🦀'});
queue.add(async () => '🦄');
queue.add(async () => '🦄', {priority: 0});

queue.setPriority('🦀', -1);
```
Here, the promise function with `id: '🦀'` executes last.
method sizeBy
sizeBy: (options: Readonly<Partial<EnqueueOptionsType>>) => number;
Size of the queue, filtered by the given options.
For example, this can be used to find the number of items remaining in the queue with a specific priority level.
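As a minimal sketch of the priority-level use case, the hypothetical helper below (`waitingByPriority`, not part of p-queue) calls `sizeBy({priority})` once per priority level and collects the counts.

```js
// Hypothetical helper (not part of p-queue): count the waiting tasks for each
// of the given priority levels by calling queue.sizeBy() once per level.
function waitingByPriority(queue, priorities) {
  const counts = {};
  for (const priority of priorities) {
    counts[priority] = queue.sizeBy({priority});
  }

  return counts;
}
```

With a real queue, `waitingByPriority(queue, [0, 1])` would return an object keyed by priority. Only tasks still waiting are counted, since running tasks are no longer in the queue.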
method start
start: () => this;
Start (or resume) executing enqueued tasks within the concurrency limit. There is no need to call this unless the queue is paused (via `options.autoStart = false` or the `.pause()` method).
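One reason to hold a queue before starting it is to enqueue a whole batch first, so that priorities are compared across the entire batch rather than the first task starting as soon as it is added. The sketch below illustrates this with a hypothetical helper, `addBatchThenStart`. It assumes the caller is happy for the queue to be running afterwards, even if it was paused beforehand.

```js
// Hypothetical helper (not part of p-queue): enqueue a whole batch while the
// queue is paused, then start it. Each entry is {task, options}, where
// `options` may carry a priority or id as accepted by queue.add().
function addBatchThenStart(queue, entries) {
  queue.pause();
  const promises = entries.map(({task, options}) => queue.add(task, options));
  queue.start();
  // Rejects as soon as any task in the batch rejects.
  return Promise.all(promises);
}
```

A queue created with `autoStart: false` achieves the same effect for the first batch without the explicit `pause()` call.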
Type Aliases
type Options
type Options<
  QueueType extends Queue<RunFunction, QueueOptions>,
  QueueOptions extends QueueAddOptions
> = {
  /**
  Concurrency limit.

  Minimum: `1`.

  @default Infinity
  */
  readonly concurrency?: number;
  /**
  Whether queued tasks within the concurrency limit are auto-executed as soon as they're added.

  @default true
  */
  readonly autoStart?: boolean;
  /**
  Class with an `enqueue` and `dequeue` method, and a `size` getter. See the [Custom QueueClass](https://github.com/sindresorhus/p-queue#custom-queueclass) section.
  */
  readonly queueClass?: new () => QueueType;
  /**
  The max number of runs in the given interval of time.

  Minimum: `1`.

  @default Infinity
  */
  readonly intervalCap?: number;
  /**
  The length of time in milliseconds before the interval count resets. Must be finite.

  Minimum: `0`.

  @default 0
  */
  readonly interval?: number;
  /**
  Whether the task must finish in the given interval or will be carried over into the next interval count.

  @default false
  */
  readonly carryoverIntervalCount?: boolean;
  /**
  @deprecated Renamed to `carryoverIntervalCount`.
  */
  readonly carryoverConcurrencyCount?: boolean;
} & TimeoutOptions;
type Queue
type Queue<Element, Options> = {
  size: number;
  filter: (options: Readonly<Partial<Options>>) => Element[];
  dequeue: () => Element | undefined;
  enqueue: (run: Element, options?: Partial<Options>) => void;
  setPriority: (id: string, priority: number) => void;
};
type QueueAddOptions
type QueueAddOptions = {
  /**
  Priority of operation. Operations with greater priority will be scheduled first.

  @default 0
  */
  readonly priority?: number;
  /**
  Unique identifier for the promise function, used to update its priority before execution. If not specified, it is auto-assigned an incrementing BigInt starting from `1n`.
  */
  id?: string;
} & TaskOptions & TimeoutOptions;
Package Files (3)
Dependencies (2)
Dev Dependencies (14)
Peer Dependencies (0)
No peer dependencies.
