rxjs
- Version 7.8.2
- Published
- 4.5 MB
- 1 dependency
- Apache-2.0 license
Install
npm i rxjs
yarn add rxjs
pnpm add rxjs
Overview
Reactive Extensions for modern JavaScript
Index
Variables
Functions
- animationFrames()
- audit()
- auditTime()
- bindCallback()
- bindNodeCallback()
- buffer()
- bufferCount()
- bufferTime()
- bufferToggle()
- bufferWhen()
- catchError()
- combineLatest()
- combineLatestAll()
- combineLatestWith()
- concat()
- concatAll()
- concatMap()
- concatMapTo()
- concatWith()
- connect()
- connectable()
- count()
- debounce()
- debounceTime()
- defaultIfEmpty()
- defer()
- delay()
- delayWhen()
- dematerialize()
- distinct()
- distinctUntilChanged()
- distinctUntilKeyChanged()
- elementAt()
- empty()
- endWith()
- every()
- exhaustAll()
- exhaustMap()
- expand()
- filter()
- finalize()
- find()
- findIndex()
- first()
- firstValueFrom()
- forkJoin()
- from()
- fromEvent()
- fromEventPattern()
- generate()
- groupBy()
- identity()
- ignoreElements()
- iif()
- interval()
- isEmpty()
- isObservable()
- last()
- lastValueFrom()
- map()
- mapTo()
- materialize()
- max()
- merge()
- mergeAll()
- mergeMap()
- mergeMapTo()
- mergeScan()
- mergeWith()
- min()
- multicast()
- never()
- noop()
- observeOn()
- of()
- onErrorResumeNext()
- onErrorResumeNextWith()
- pairs()
- pairwise()
- partition()
- pipe()
- pluck()
- publish()
- publishBehavior()
- publishLast()
- publishReplay()
- race()
- raceWith()
- range()
- reduce()
- refCount()
- repeat()
- repeatWhen()
- retry()
- retryWhen()
- sample()
- sampleTime()
- scan()
- scheduled()
- sequenceEqual()
- share()
- shareReplay()
- single()
- skip()
- skipLast()
- skipUntil()
- skipWhile()
- startWith()
- subscribeOn()
- switchAll()
- switchMap()
- switchMapTo()
- switchScan()
- take()
- takeLast()
- takeUntil()
- takeWhile()
- tap()
- throttle()
- throttleTime()
- throwError()
- throwIfEmpty()
- timeInterval()
- timeout()
- timeoutWith()
- timer()
- timestamp()
- toArray()
- using()
- window()
- windowCount()
- windowTime()
- windowToggle()
- windowWhen()
- withLatestFrom()
- zip()
- zipAll()
- zipWith()
Classes
Interfaces
Enums
Type Aliases
Variables
variable animationFrame
const animationFrame: AnimationFrameScheduler;
Deprecated
Renamed to animationFrameScheduler. Will be removed in v8.
variable animationFrameScheduler
const animationFrameScheduler: AnimationFrameScheduler;
Animation Frame Scheduler
Perform task when window.requestAnimationFrame would fire
When animationFrame scheduler is used with delay, it will fall back to asyncScheduler scheduler behaviour.
Without delay, animationFrame scheduler can be used to create smooth browser animations. It makes sure scheduled task will happen just before next browser content repaint, thus performing animations as efficiently as possible.
## Example
Schedule div height animation
    // html: <div style="background: #0ff;"></div>
    import { animationFrameScheduler } from 'rxjs';

    const div = document.querySelector('div');

    animationFrameScheduler.schedule(function(height) {
      div.style.height = height + "px";

      this.schedule(height + 1); // `this` references currently executing Action,
                                 // which we reschedule with new state
    }, 0, 0);

    // You will see a div element growing in height
variable ArgumentOutOfRangeError
const ArgumentOutOfRangeError: ArgumentOutOfRangeErrorCtor;
variable asap
const asap: AsapScheduler;
Deprecated
Renamed to asapScheduler. Will be removed in v8.
variable asapScheduler
const asapScheduler: AsapScheduler;
Asap Scheduler
Perform task as fast as it can be performed asynchronously
asap scheduler behaves the same as asyncScheduler scheduler when you use it to delay task in time. If however you set delay to 0, asap will wait for current synchronously executing code to end and then it will try to execute given task as fast as possible.
asap scheduler will do its best to minimize time between end of currently executing code and start of scheduled task. This makes it best candidate for performing so called "deferring". Traditionally this was achieved by calling setTimeout(deferredTask, 0), but that technique involves some (although minimal) unwanted delay.
Note that using asap scheduler does not necessarily mean that your task will be first to process after currently executing code. In particular, if some task was also scheduled with asap before, that task will execute first. That being said, if you need to schedule task asynchronously, but as soon as possible, asap scheduler is your best bet.
## Example
Compare async and asap scheduler
    import { asapScheduler, asyncScheduler } from 'rxjs';

    asyncScheduler.schedule(() => console.log('async')); // scheduling 'async' first...
    asapScheduler.schedule(() => console.log('asap'));

    // Logs:
    // "asap"
    // "async"
    // ... but 'asap' goes first!
variable async
const async: AsyncScheduler;
Deprecated
Renamed to asyncScheduler. Will be removed in v8.
variable asyncScheduler
const asyncScheduler: AsyncScheduler;
Async Scheduler
Schedule task as if you used setTimeout(task, duration)
async scheduler schedules tasks asynchronously, by putting them on the JavaScript event loop queue. It is best used to delay tasks in time or to schedule tasks repeating in intervals.
If you just want to "defer" task, that is to perform it right after currently executing synchronous code ends (commonly achieved by setTimeout(deferredTask, 0)), better choice will be the asapScheduler scheduler.
## Examples
Use async scheduler to delay task
    import { asyncScheduler } from 'rxjs';

    const task = () => console.log('it works!');

    asyncScheduler.schedule(task, 2000);

    // After 2 seconds logs:
    // "it works!"
Use async scheduler to repeat task in intervals
    import { asyncScheduler } from 'rxjs';

    function task(state) {
      console.log(state);
      this.schedule(state + 1, 1000); // `this` references currently executing Action,
                                      // which we reschedule with new state and delay
    }

    asyncScheduler.schedule(task, 3000, 0);

    // Logs:
    // 0 after 3s
    // 1 after 4s
    // 2 after 5s
    // 3 after 6s
variable combineAll
const combineAll: { <T>(): OperatorFunction<ObservableInput<T>, T[]>; <T>(): OperatorFunction<any, T[]>; <T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>; <R>(project: (...values: any[]) => R): OperatorFunction<any, R>;};
Deprecated
Renamed to combineLatestAll. Will be removed in v8.
variable config
const config: GlobalConfig;
The GlobalConfig object for RxJS. It is used to configure things like how to react to unhandled errors.
variable EMPTY
const EMPTY: Observable<never>;
A simple Observable that emits no items to the Observer and immediately emits a complete notification.
Just emits 'complete', and nothing else.

A simple Observable that only emits the complete notification. It can be used for composing with other Observables, such as in a mergeMap.
## Examples
Log complete notification
    import { EMPTY } from 'rxjs';

    EMPTY.subscribe({
      next: () => console.log('Next'),
      complete: () => console.log('Complete!')
    });

    // Outputs
    // Complete!
Emit the number 7, then complete
    import { EMPTY, startWith } from 'rxjs';

    const result = EMPTY.pipe(startWith(7));
    result.subscribe(x => console.log(x));

    // Outputs
    // 7
Map and flatten only odd numbers to the sequence 'a', 'b', 'c'
    import { interval, mergeMap, of, EMPTY } from 'rxjs';

    const interval$ = interval(1000);
    const result = interval$.pipe(
      mergeMap(x => x % 2 === 1 ? of('a', 'b', 'c') : EMPTY),
    );
    result.subscribe(x => console.log(x));

    // Results in the following to the console:
    // x is equal to the count on the interval, e.g. (0, 1, 2, 3, ...)
    // x will occur every 1000ms
    // if x % 2 is equal to 1, print a, b, c (each on its own)
    // if x % 2 is not equal to 1, nothing will be output
variable EmptyError
const EmptyError: EmptyErrorCtor;
An error thrown when an Observable or a sequence was queried but has no elements.
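As a rough illustration of when this error shows up, the sketch below applies first() to EMPTY, which completes without emitting anything. The outcome array is a hypothetical helper used only to record what happened.

```typescript
import { EMPTY, EmptyError, first } from 'rxjs';

// Hypothetical log of what the subscriber observed.
const outcome: string[] = [];

// first() without a default value errors when the source completes empty.
EMPTY.pipe(first()).subscribe({
  next: () => outcome.push('value'),
  error: (err: unknown) =>
    outcome.push(err instanceof EmptyError ? 'EmptyError' : 'other error'),
});

console.log(outcome); // ['EmptyError']
```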
variable exhaust
const exhaust: <O extends ObservableInput<any>>() => OperatorFunction< O, ObservedValueOf<O>>;
Deprecated
Renamed to exhaustAll. Will be removed in v8.
variable flatMap
const flatMap: { <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, concurrent?: number ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: undefined, concurrent?: number ): OperatorFunction<T, ObservedValueOf<O>>; <T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R, concurrent?: number ): OperatorFunction<T, R>;};
Deprecated
Renamed to mergeMap. Will be removed in v8.
variable NEVER
const NEVER: Observable<never>;
An Observable that emits no items to the Observer and never completes.

A simple Observable that emits neither values nor errors nor the completion notification. It can be used for testing purposes or for composing with other Observables. Please note that by never emitting a complete notification, this Observable keeps the subscription from being disposed automatically. Subscriptions need to be manually disposed.
## Example
Emit the number 7, then never emit anything else (not even complete)
    import { NEVER, startWith } from 'rxjs';

    const info = () => console.log('Will not be called');

    const result = NEVER.pipe(startWith(7));
    result.subscribe({
      next: x => console.log(x),
      error: info,
      complete: info
    });
variable NotFoundError
const NotFoundError: NotFoundErrorCtor;
An error thrown when a value or values are missing from an observable sequence.
variable ObjectUnsubscribedError
const ObjectUnsubscribedError: ObjectUnsubscribedErrorCtor;
An error thrown when an action is invalid because the object has been unsubscribed.
See Also
ObjectUnsubscribedError
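A minimal sketch of one way to trigger this error in RxJS 7: calling unsubscribe() on a Subject itself closes it, and a later next() call then throws synchronously. The thrown array is a hypothetical helper for recording the result.

```typescript
import { Subject, ObjectUnsubscribedError } from 'rxjs';

const subject = new Subject<number>();

// Unsubscribing the subject itself (not one of its subscriptions) closes it.
subject.unsubscribe();

// Hypothetical log of what was thrown.
const thrown: string[] = [];
try {
  subject.next(1);
} catch (err) {
  thrown.push(err instanceof ObjectUnsubscribedError ? 'ObjectUnsubscribedError' : 'other error');
}

console.log(thrown); // ['ObjectUnsubscribedError']
```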
variable observable
const observable: string | symbol;
Symbol.observable or a string "@@observable". Used for interop
Deprecated
We will no longer be exporting this symbol in upcoming versions of RxJS. Instead polyfill and use Symbol.observable directly *or* use https://www.npmjs.com/package/symbol-observable
variable queue
const queue: QueueScheduler;
Deprecated
Renamed to queueScheduler. Will be removed in v8.
variable queueScheduler
const queueScheduler: QueueScheduler;
Queue Scheduler
Put every next task on a queue, instead of executing it immediately
queue scheduler, when used with delay, behaves the same as asyncScheduler scheduler.
When used without delay, it schedules given task synchronously - executes it right when it is scheduled. However when called recursively, that is when inside the scheduled task, another task is scheduled with queue scheduler, instead of executing immediately as well, that task will be put on a queue and wait for current one to finish.
This means that when you execute task with queue scheduler, you are sure it will end before any other task scheduled with that scheduler will start.
## Examples
Schedule recursively first, then do something
    import { queueScheduler } from 'rxjs';

    queueScheduler.schedule(() => {
      queueScheduler.schedule(() => console.log('second')); // will not happen now, but will be put on a queue

      console.log('first');
    });

    // Logs:
    // "first"
    // "second"
Reschedule itself recursively
    import { queueScheduler } from 'rxjs';

    queueScheduler.schedule(function(state) {
      if (state !== 0) {
        console.log('before', state);
        this.schedule(state - 1); // `this` references currently executing Action,
                                  // which we reschedule with new state
        console.log('after', state);
      }
    }, 0, 3);

    // In scheduler that runs recursively, you would expect:
    // "before", 3
    // "before", 2
    // "before", 1
    // "after", 1
    // "after", 2
    // "after", 3

    // But with queue it logs:
    // "before", 3
    // "after", 3
    // "before", 2
    // "after", 2
    // "before", 1
    // "after", 1
variable SequenceError
const SequenceError: SequenceErrorCtor;
An error thrown when something is wrong with the sequence of values arriving on the observable.
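One operator that reports this error is single(), which expects exactly one matching value. The sketch below is only illustrative: errorNameFor is a hypothetical helper, and NotFoundError is included to contrast the "too many matches" case with the "no match" case.

```typescript
import { of, single, NotFoundError, SequenceError } from 'rxjs';

// Hypothetical helper: reports which error single() raises for a predicate.
function errorNameFor(predicate: (n: number) => boolean): string {
  let name = 'none';
  of(1, 2, 3).pipe(single(predicate)).subscribe({
    error: (err: unknown) => {
      if (err instanceof SequenceError) name = 'SequenceError';
      else if (err instanceof NotFoundError) name = 'NotFoundError';
      else name = 'other';
    },
  });
  return name;
}

const tooMany = errorNameFor((n) => n > 1);      // two values match
const noMatch = errorNameFor((n) => n > 5);      // values arrived, none match
const exactlyOne = errorNameFor((n) => n === 2); // one match, no error

console.log(tooMany, noMatch, exactlyOne); // SequenceError NotFoundError none
```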
variable TimeoutError
const TimeoutError: TimeoutErrorCtor;
An error thrown by the timeout operator.
Provided so users can use it as a type and do equality comparisons. We recommend you do not subclass this or create instances of this class directly. If you need an error representing a timeout, you should create your own error class and use that.
variable UnsubscriptionError
const UnsubscriptionError: UnsubscriptionErrorCtor;
An error thrown when one or more errors have occurred during the unsubscribe of a Subscription.
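A small sketch of how the collected errors can be inspected, assuming RxJS 7 behaviour where unsubscribe() runs every teardown and then throws once. The teardown functions and the messages array are made up for the example.

```typescript
import { Subscription, UnsubscriptionError } from 'rxjs';

// Two hypothetical teardowns that both fail.
const subscription = new Subscription(() => {
  throw new Error('first teardown failed');
});
subscription.add(() => {
  throw new Error('second teardown failed');
});

const messages: string[] = [];
try {
  subscription.unsubscribe();
} catch (err) {
  if (err instanceof UnsubscriptionError) {
    // `errors` holds every error thrown while tearing down.
    messages.push(...err.errors.map((e) => String(e.message)));
  }
}

console.log(messages); // ['first teardown failed', 'second teardown failed']
```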
Functions
function animationFrames
animationFrames: ( timestampProvider?: TimestampProvider) => Observable<{ timestamp: number; elapsed: number }>;
An observable of animation frames
Emits the amount of time elapsed since subscription and the timestamp on each animation frame. Defaults to milliseconds provided to the requestAnimationFrame's callback. Does not end on its own.
Every subscription will start a separate animation loop. Since animation frames are always scheduled by the browser to occur directly before a repaint, scheduling more than one animation frame synchronously should not be much different or have more overhead than looping over an array of events during a single animation frame. However, if for some reason the developer would like to ensure the execution of animation-related handlers are all executed during the same task by the engine, the share operator can be used.
This is useful for setting up animations with RxJS.
## Examples
Tweening a div to move it on the screen
    import { animationFrames, map, takeWhile, endWith } from 'rxjs';

    function tween(start: number, end: number, duration: number) {
      const diff = end - start;
      return animationFrames().pipe(
        // Figure out what percentage of time has passed
        map(({ elapsed }) => elapsed / duration),
        // Take the vector while less than 100%
        takeWhile(v => v < 1),
        // Finish with 100%
        endWith(1),
        // Calculate the distance traveled between start and end
        map(v => v * diff + start)
      );
    }

    // Setup a div for us to move around
    const div = document.createElement('div');
    document.body.appendChild(div);
    div.style.position = 'absolute';
    div.style.width = '40px';
    div.style.height = '40px';
    div.style.backgroundColor = 'lime';
    div.style.transform = 'translate3d(10px, 0, 0)';

    tween(10, 200, 4000).subscribe(x => {
      div.style.transform = `translate3d(${ x }px, 0, 0)`;
    });
Providing a custom timestamp provider
    import { animationFrames, TimestampProvider } from 'rxjs';

    // A custom timestamp provider
    let now = 0;
    const customTSProvider: TimestampProvider = {
      now() { return now++; }
    };

    const source$ = animationFrames(customTSProvider);

    // Log increasing numbers 0...1...2... on every animation frame.
    source$.subscribe(({ elapsed }) => console.log(elapsed));
Parameter timestampProvider
An object with a now method that provides a numeric timestamp
function audit
audit: <T>( durationSelector: (value: T) => ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Ignores source values for a duration determined by another Observable, then emits the most recent value from the source Observable, then repeats this process.
It's like auditTime, but the silencing duration is determined by a second Observable.

audit is similar to throttle, but emits the last value from the silenced time window, instead of the first value. audit emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled by calling the durationSelector function with the source value, which returns the "duration" Observable. When the duration Observable emits a value, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value.
## Example
Emit clicks at a rate of at most one click per second
    import { fromEvent, audit, interval } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(audit(ev => interval(1000)));
    result.subscribe(x => console.log(x));
Parameter durationSelector
A function that receives a value from the source Observable, for computing the silencing duration, returned as an Observable or a Promise.
A function that returns an Observable that performs rate-limiting of emissions from the source Observable.
function auditTime
auditTime: <T>( duration: number, scheduler?: SchedulerLike) => MonoTypeOperatorFunction<T>;
Ignores source values for duration milliseconds, then emits the most recent value from the source Observable, then repeats this process.
When it sees a source value, it ignores that plus the next ones for duration milliseconds, and then it emits the most recent value from the source.

auditTime is similar to throttleTime, but emits the last value from the silenced time window, instead of the first value. auditTime emits the most recent value from the source Observable on the output Observable as soon as its internal timer becomes disabled, and ignores source values while the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, the timer is enabled. After duration milliseconds (or the time unit determined internally by the optional scheduler) has passed, the timer is disabled, then the most recent source value is emitted on the output Observable, and this process repeats for the next source value. Optionally takes a SchedulerLike for managing timers.
## Example
Emit clicks at a rate of at most one click per second
    import { fromEvent, auditTime } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(auditTime(1000));
    result.subscribe(x => console.log(x));
Parameter duration
Time to wait before emitting the most recent source value, measured in milliseconds or the time unit determined internally by the optional scheduler.
Parameter scheduler
The SchedulerLike to use for managing the timers that handle the rate-limiting behavior.
A function that returns an Observable that performs rate-limiting of emissions from the source Observable.
function bindCallback
bindCallback: { ( callbackFunc: (...args: any[]) => void, resultSelector: (...args: any[]) => any, scheduler?: SchedulerLike ): (...args: any[]) => Observable<any>; <A extends readonly unknown[], R extends readonly unknown[]>( callbackFunc: (...args: [...A, (...res: R) => void]) => void, schedulerLike?: SchedulerLike ): (...arg: A) => Observable<R extends [] ? void : R extends [any] ? R[0] : R>;};
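The signature above carries no example, so here is a minimal sketch of the second overload (no resultSelector). addLater is a hypothetical callback-style function invented for illustration. Because it invokes its callback synchronously and no scheduler is passed, the value arrives synchronously on subscribe.

```typescript
import { bindCallback } from 'rxjs';

// Hypothetical callback-style API: reports the sum through its last argument.
function addLater(a: number, b: number, done: (sum: number) => void): void {
  done(a + b);
}

// Wrap it: the returned function takes the same arguments minus the callback
// and returns an Observable of the callback's argument.
const add$ = bindCallback(addLater);

const sums: number[] = [];
add$(2, 3).subscribe((sum) => sums.push(sum));

console.log(sums); // [5]
```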
function bindNodeCallback
bindNodeCallback: { ( callbackFunc: (...args: any[]) => void, resultSelector: (...args: any[]) => any, scheduler?: SchedulerLike ): (...args: any[]) => Observable<any>; <A extends readonly unknown[], R extends readonly unknown[]>( callbackFunc: (...args: [...A, (err: any, ...res: R) => void]) => void, schedulerLike?: SchedulerLike ): (...arg: A) => Observable<R extends [] ? void : R extends [any] ? R[0] : R>;};
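A short sketch of the Node-style variant, where the callback's first argument is treated as an error. parsePort is a hypothetical function written for this example and is not part of RxJS.

```typescript
import { bindNodeCallback } from 'rxjs';

// Hypothetical Node-style API: the callback receives (err, result).
function parsePort(text: string, done: (err: Error | null, port: number) => void): void {
  const port = Number(text);
  if (Number.isInteger(port)) {
    done(null, port);
  } else {
    done(new Error(`not a port: ${text}`), 0);
  }
}

const parsePort$ = bindNodeCallback(parsePort);

const results: string[] = [];
// A null error becomes a next notification carrying the result.
parsePort$('8080').subscribe((port) => results.push(`port ${port}`));
// A non-null error becomes an error notification.
parsePort$('abc').subscribe({
  error: (err: Error) => results.push(`error: ${err.message}`),
});

console.log(results); // ['port 8080', 'error: not a port: abc']
```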
function buffer
buffer: <T>(closingNotifier: ObservableInput<any>) => OperatorFunction<T, T[]>;
Buffers the source Observable values until closingNotifier emits.
Collects values from the past as an array, and emits that array only when another Observable emits.

Buffers the incoming Observable values until the given closingNotifier ObservableInput (that internally gets converted to an Observable) emits a value, at which point it emits the buffer on the output Observable and starts a new buffer internally, awaiting the next time closingNotifier emits.
## Example
On every click, emit array of most recent interval events
    import { fromEvent, interval, buffer } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const intervalEvents = interval(1000);
    const buffered = intervalEvents.pipe(buffer(clicks));
    buffered.subscribe(x => console.log(x));
Parameter closingNotifier
An ObservableInput that signals the buffer to be emitted on the output Observable.
A function that returns an Observable of buffers, which are arrays of values.
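The same behaviour can be sketched without the DOM by driving both streams by hand. The two Subjects below (source$ and flush$) are hypothetical stand-ins for the source and the closing notifier.

```typescript
import { Subject, buffer } from 'rxjs';

const source$ = new Subject<number>();
const flush$ = new Subject<void>(); // hypothetical closing notifier

const batches: number[][] = [];
source$.pipe(buffer(flush$)).subscribe((batch) => batches.push(batch));

source$.next(1);
source$.next(2);
flush$.next(); // emits [1, 2] and starts a new buffer
source$.next(3);
flush$.next(); // emits [3]

console.log(batches); // [[1, 2], [3]]
```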
function bufferCount
bufferCount: <T>( bufferSize: number, startBufferEvery?: number | null) => OperatorFunction<T, T[]>;
Buffers the source Observable values until the size hits the maximum bufferSize given.
Collects values from the past as an array, and emits that array only when its size reaches bufferSize.

Buffers a number of values from the source Observable by bufferSize then emits the buffer and clears it, and starts a new buffer each startBufferEvery values. If startBufferEvery is not provided or is null, then new buffers are started immediately at the start of the source and when each buffer closes and is emitted.
## Examples
Emit the last two click events as an array
    import { fromEvent, bufferCount } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(bufferCount(2));
    buffered.subscribe(x => console.log(x));
On every click, emit the last two click events as an array
    import { fromEvent, bufferCount } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(bufferCount(2, 1));
    buffered.subscribe(x => console.log(x));
Parameter bufferSize
The maximum size of the buffer emitted.
Parameter startBufferEvery
Interval at which to start a new buffer. For example if startBufferEvery is 2, then a new buffer will be started on every other value from the source. A new buffer is started at the beginning of the source by default.
A function that returns an Observable of arrays of buffered values.
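To make the effect of startBufferEvery concrete, the sketch below runs both forms over a small synchronous source. The array names are arbitrary. Note that buffers still open when the source completes are emitted as well.

```typescript
import { of, bufferCount } from 'rxjs';

const source$ = of(1, 2, 3, 4, 5);

// Non-overlapping buffers of two values.
const chunks: number[][] = [];
source$.pipe(bufferCount(2)).subscribe((chunk) => chunks.push(chunk));

// A new buffer starts on every value, so consecutive buffers overlap.
const sliding: number[][] = [];
source$.pipe(bufferCount(2, 1)).subscribe((pair) => sliding.push(pair));

console.log(chunks);  // [[1, 2], [3, 4], [5]]
console.log(sliding); // [[1, 2], [2, 3], [3, 4], [4, 5], [5]]
```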
function bufferTime
bufferTime: { <T>(bufferTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>; <T>( bufferTimeSpan: number, bufferCreationInterval: number, scheduler?: SchedulerLike ): OperatorFunction<T, T[]>; <T>( bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: SchedulerLike ): OperatorFunction<T, T[]>;};
function bufferToggle
bufferToggle: <T, O>( openings: ObservableInput<O>, closingSelector: (value: O) => ObservableInput<any>) => OperatorFunction<T, T[]>;
Buffers the source Observable values starting from an emission from openings and ending when the output of closingSelector emits.
Collects values from the past as an array. Starts collecting only when openings emits, and calls the closingSelector function to get an Observable that tells when to close the buffer.

Buffers values from the source by opening the buffer via signals from an Observable provided to openings, and closing and sending the buffers when a Subscribable or Promise returned by the closingSelector function emits.
## Example
Every other second, emit the click events from the next 500ms
    import { fromEvent, interval, bufferToggle, EMPTY } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const openings = interval(1000);
    const buffered = clicks.pipe(bufferToggle(openings, i =>
      i % 2 ? interval(500) : EMPTY
    ));
    buffered.subscribe(x => console.log(x));
Parameter openings
A Subscribable or Promise of notifications to start new buffers.
Parameter closingSelector
A function that takes the value emitted by the openings observable and returns a Subscribable or Promise, which, when it emits, signals that the associated buffer should be emitted and cleared.
A function that returns an Observable of arrays of buffered values.
function bufferWhen
bufferWhen: <T>( closingSelector: () => ObservableInput<any>) => OperatorFunction<T, T[]>;
Buffers the source Observable values, using a factory function of closing Observables to determine when to close, emit, and reset the buffer.
Collects values from the past as an array. When it starts collecting values, it calls a function that returns an Observable that tells when to close the buffer and restart collecting.

Opens a buffer immediately, then closes the buffer when the observable returned by calling closingSelector function emits a value. When it closes the buffer, it immediately opens a new buffer and repeats the process.
## Example
Emit an array of the last clicks every [1-5] random seconds
    import { fromEvent, bufferWhen, interval } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const buffered = clicks.pipe(
      bufferWhen(() => interval(1000 + Math.random() * 4000))
    );
    buffered.subscribe(x => console.log(x));
Parameter closingSelector
A function that takes no arguments and returns an Observable that signals buffer closure.
A function that returns an Observable of arrays of buffered values.
function catchError
catchError: <T, O extends ObservableInput<any>>( selector: (err: any, caught: Observable<T>) => O) => OperatorFunction<T, T | ObservedValueOf<O>>;
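Since only the signature is listed here, a minimal sketch may help: the selector receives the error and returns a replacement ObservableInput that the output switches to. The error message and variable names are invented for the example.

```typescript
import { of, map, catchError } from 'rxjs';

const seen: string[] = [];

of(1, 2, 3).pipe(
  map((n) => {
    if (n === 2) throw new Error('two is not allowed');
    return `value ${n}`;
  }),
  // The source has already terminated with an error, so 3 is never seen;
  // the output continues with the replacement Observable instead.
  catchError((err: Error) => of(`recovered: ${err.message}`)),
).subscribe((x) => seen.push(x));

console.log(seen); // ['value 1', 'recovered: two is not allowed']
```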
function combineLatest
combineLatest: { <T extends unique symbol>(arg: T): Observable<unknown>; (sources: []): Observable<never>; <A extends readonly unknown[]>( sources: readonly [...ObservableInputTuple<A>] ): Observable<A>; <A extends readonly unknown[], R>( sources: readonly [...ObservableInputTuple<A>], resultSelector: (...values: A) => R, scheduler: SchedulerLike ): Observable<R>; <A extends readonly unknown[], R>( sources: readonly [...ObservableInputTuple<A>], resultSelector: (...values: A) => R ): Observable<R>; <A extends readonly unknown[]>( sources: readonly [...ObservableInputTuple<A>], scheduler: SchedulerLike ): Observable<A>; <A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): Observable<A>; <A extends readonly unknown[], R>( ...sourcesAndResultSelectorAndScheduler: [ ...ObservableInputTuple<A>, (...values: A) => R, SchedulerLike ] ): Observable<R>; <A extends readonly unknown[], R>( ...sourcesAndResultSelector: [ ...ObservableInputTuple<A>, (...values: A) => R ] ): Observable<R>; <A extends readonly unknown[]>( ...sourcesAndScheduler: [...ObservableInputTuple<A>, SchedulerLike] ): Observable<A>; (sourcesObject: { [x: string]: never; [x: number]: never; [x: symbol]: never; }): Observable<never>; <T extends Record<string, ObservableInput<any>>>(sourcesObject: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]>; }>;};
You have passed any here, we can't figure out if it is an array or an object, so you're getting unknown. Use better types.
Parameter arg
Something typed as any
Deprecated
The scheduler parameter will be removed in v8. Use scheduled and combineLatestAll. Details: https://rxjs.dev/deprecations/scheduler-argument
Deprecated
Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument
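A brief sketch of the two non-deprecated call shapes from the signature list, an array of sources and an object of sources. The sources are synchronous, so the first one has already finished (leaving 2 as its latest value) before the second is subscribed. Variable names are illustrative.

```typescript
import { combineLatest, of } from 'rxjs';

// Array form: emits tuples of the latest values.
const pairs: Array<[number, string]> = [];
combineLatest([of(1, 2), of('a', 'b')]).subscribe((pair) => pairs.push(pair));

// Object form: emits objects with the same keys as the input.
const snapshots: Array<{ id: number; name: string }> = [];
combineLatest({ id: of(7), name: of('rx') }).subscribe((s) => snapshots.push(s));

console.log(pairs);     // [[2, 'a'], [2, 'b']]
console.log(snapshots); // [{ id: 7, name: 'rx' }]
```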
function combineLatestAll
combineLatestAll: { <T>(): OperatorFunction<ObservableInput<T>, T[]>; <T>(): OperatorFunction<any, T[]>; <T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>; <R>(project: (...values: any[]) => R): OperatorFunction<any, R>;};
function combineLatestWith
combineLatestWith: <T, A extends readonly unknown[]>( ...otherSources_0: ObservableInputTuple<A>) => OperatorFunction<T, Cons<T, A>>;
Create an observable that combines the latest values from all passed observables and the source into arrays and emits them.
Returns an observable, that when subscribed to, will subscribe to the source observable and all sources provided as arguments. Once all sources emit at least one value, all of the latest values will be emitted as an array. After that, every time any source emits a value, all of the latest values will be emitted as an array.
This is a useful operator for eagerly calculating values based off of changed inputs.
## Example
Simple concatenation of values from two inputs
    import { fromEvent, combineLatestWith, map } from 'rxjs';

    // Setup: Add two inputs to the page
    const input1 = document.createElement('input');
    document.body.appendChild(input1);
    const input2 = document.createElement('input');
    document.body.appendChild(input2);

    // Get streams of changes
    const input1Changes$ = fromEvent(input1, 'change');
    const input2Changes$ = fromEvent(input2, 'change');

    // Combine the changes by adding them together
    input1Changes$.pipe(
      combineLatestWith(input2Changes$),
      map(([e1, e2]) => (<HTMLInputElement>e1.target).value + ' - ' + (<HTMLInputElement>e2.target).value)
    )
    .subscribe(x => console.log(x));
Parameter otherSources
the other sources to subscribe to.
A function that returns an Observable that emits the latest emissions from both source and provided Observables.
function concat
concat: { <T extends readonly unknown[]>(...inputs_0: ObservableInputTuple<T>): Observable< T[number] >; <T extends readonly unknown[]>( ...inputsAndScheduler: [...ObservableInputTuple<T>, SchedulerLike] ): Observable<T[number]>;};
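A minimal sketch of the first overload: each input is subscribed only after the previous one completes, and the element type is the union of the input types. The inputs are chosen arbitrarily.

```typescript
import { concat, of } from 'rxjs';

const out: Array<number | string> = [];

// Inputs are drained strictly in order, one after another.
concat(of(1, 2), of('a'), of(3)).subscribe((x) => out.push(x));

console.log(out); // [1, 2, 'a', 3]
```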
function concatAll
concatAll: <O extends ObservableInput<any>>() => OperatorFunction< O, ObservedValueOf<O>>;
Converts a higher-order Observable into a first-order Observable by concatenating the inner Observables in order.
Flattens an Observable-of-Observables by putting one inner Observable after the other.

Joins every Observable emitted by the source (a higher-order Observable), in a serial fashion. It subscribes to each inner Observable only after the previous inner Observable has completed, and merges all of their values into the returned observable.
__Warning:__ If the source Observable emits Observables quickly and endlessly, and the inner Observables it emits generally complete slower than the source emits, you can run into memory issues as the incoming Observables collect in an unbounded buffer.
Note: concatAll is equivalent to mergeAll with concurrency parameter set to 1.
## Example
For each click event, tick every second from 0 to 3, with no concurrency
    import { fromEvent, map, interval, take, concatAll } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
      map(() => interval(1000).pipe(take(4)))
    );
    const firstOrder = higherOrder.pipe(concatAll());
    firstOrder.subscribe(x => console.log(x));

    // Results in the following:
    // (results are not concurrent)
    // For every click on the "document" it will emit values 0 to 3 spaced
    // on a 1000ms interval
    // one click = 1000ms-> 0 -1000ms-> 1 -1000ms-> 2 -1000ms-> 3
A function that returns an Observable emitting values from all the inner Observables concatenated.
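The stated equivalence with mergeAll at concurrency 1 can be checked on a small synchronous higher-order source. This is only a sketch with made-up variable names.

```typescript
import { of, concatAll, mergeAll } from 'rxjs';

// An Observable whose values are themselves Observables.
const higherOrder$ = of(of(1, 2), of(3), of(4, 5));

const viaConcatAll: number[] = [];
higherOrder$.pipe(concatAll()).subscribe((x) => viaConcatAll.push(x));

// mergeAll(1) allows one active inner subscription at a time.
const viaMergeAll: number[] = [];
higherOrder$.pipe(mergeAll(1)).subscribe((x) => viaMergeAll.push(x));

console.log(viaConcatAll); // [1, 2, 3, 4, 5]
console.log(viaMergeAll);  // [1, 2, 3, 4, 5]
```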
function concatMap
concatMap: { <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: undefined ): OperatorFunction<T, ObservedValueOf<O>>; <T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>;};
Deprecated
The resultSelector parameter will be removed in v8. Use an inner map instead. Details: https://rxjs.dev/deprecations/resultSelector
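As a sketch of the suggested migration, the outer value can be combined with each inner value by piping the inner Observable through map, which removes the need for a resultSelector. The values used are arbitrary.

```typescript
import { of, concatMap, map } from 'rxjs';

const labels: string[] = [];

of('a', 'b').pipe(
  concatMap((outer) =>
    // The inner map closes over `outer`, doing the job of a resultSelector.
    of(1, 2).pipe(map((inner) => `${outer}${inner}`)),
  ),
).subscribe((label) => labels.push(label));

console.log(labels); // ['a1', 'a2', 'b1', 'b2']
```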
function concatMapTo
concatMapTo: { <O extends ObservableInput<unknown>>(observable: O): OperatorFunction< unknown, ObservedValueOf<O> >; <O extends ObservableInput<unknown>>( observable: O, resultSelector: undefined ): OperatorFunction<unknown, ObservedValueOf<O>>; <T, R, O extends ObservableInput<unknown>>( observable: O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>;};
Deprecated
Will be removed in v9. Use concatMap instead: concatMap(() => result)
Deprecated
The resultSelector parameter will be removed in v8. Use an inner map instead. Details: https://rxjs.dev/deprecations/resultSelector
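The replacement named in the deprecation note might look like the sketch below, where result$ is a hypothetical fixed inner Observable that every source value is mapped to.

```typescript
import { of, concatMap } from 'rxjs';

// Hypothetical inner Observable used for every source value.
const result$ = of('x', 'y');

const emitted: string[] = [];

// Instead of concatMapTo(result$), ignore the source value explicitly.
of(1, 2).pipe(concatMap(() => result$)).subscribe((v) => emitted.push(v));

console.log(emitted); // ['x', 'y', 'x', 'y']
```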
function concatWith
concatWith: <T, A extends readonly unknown[]>( ...otherSources_0: ObservableInputTuple<A>) => OperatorFunction<T, T | A[number]>;
Emits all of the values from the source observable, then, once it completes, subscribes to each observable source provided, one at a time, emitting all of their values, and not subscribing to the next one until it completes.
`concat(a$, b$, c$)` is the same as `a$.pipe(concatWith(b$, c$))`.
## Example
Listen for one mouse click, then listen for all mouse moves.

    import { fromEvent, map, take, concatWith } from 'rxjs';

    const clicks$ = fromEvent(document, 'click');
    const moves$ = fromEvent(document, 'mousemove');

    clicks$.pipe(
      map(() => 'click'),
      take(1),
      concatWith(moves$.pipe(map(() => 'move')))
    )
    .subscribe(x => console.log(x));

    // 'click'
    // 'move'
    // 'move'
    // 'move'
    // ...

Parameter otherSources
Other observable sources to subscribe to, in sequence, after the original source is complete.
Returns
A function that returns an Observable that concatenates subscriptions to the source and provided Observables subscribing to the next only once the current subscription completes.
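The equivalence between `concat` and `concatWith` stated above can be checked with synchronous sources. This is a small sketch; the names `a$`, `b$` and `c$` mirror the prose, and the values are arbitrary.

```typescript
import { of, concat, concatWith } from 'rxjs';

const a$ = of(1, 2);
const b$ = of(3);
const c$ = of(4, 5);

const viaConcat: number[] = [];
const viaConcatWith: number[] = [];

// Creation-function form.
concat(a$, b$, c$).subscribe((x) => viaConcat.push(x));

// Pipeable form: a$ runs first, then b$, then c$.
a$.pipe(concatWith(b$, c$)).subscribe((x) => viaConcatWith.push(x));

console.log(viaConcat);     // [ 1, 2, 3, 4, 5 ]
console.log(viaConcatWith); // [ 1, 2, 3, 4, 5 ]
```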
function connect
connect: <T, O extends ObservableInput<unknown>>( selector: (shared: Observable<T>) => O, config?: ConnectConfig<T>) => OperatorFunction<T, ObservedValueOf<O>>;
Creates an observable by multicasting the source within a function that allows the developer to define the usage of the multicast prior to connection.
This is particularly useful if the observable source you wish to multicast could be synchronous or asynchronous. This sets it apart from share, which, in the case of totally synchronous sources will fail to share a single subscription with multiple consumers, as by the time the subscription to the result of share has returned, if the source is synchronous its internal reference count will jump from 0 to 1 back to 0 and reset.
To use `connect`, you provide a `selector` function that will give you a multicast observable that is not yet connected. You then use that multicast observable to create a resulting observable that, when subscribed, will set up your multicast. This is generally, but not always, accomplished with merge.
Note that using a takeUntil inside of `connect`'s `selector` _might_ mean you were looking to use the takeWhile operator instead.
When you subscribe to the result of `connect`, the `selector` function will be called. After the `selector` function returns, the observable it returns will be subscribed to, _then_ the multicast will be connected to the source.
## Example
Sharing a totally synchronous observable

    import { of, tap, connect, merge, map, filter } from 'rxjs';

    const source$ = of(1, 2, 3, 4, 5).pipe(
      tap({
        subscribe: () => console.log('subscription started'),
        next: n => console.log(`source emitted ${ n }`)
      })
    );

    source$.pipe(
      // Notice in here we're merging 3 subscriptions to `shared$`.
      connect(shared$ => merge(
        shared$.pipe(map(n => `all ${ n }`)),
        shared$.pipe(filter(n => n % 2 === 0), map(n => `even ${ n }`)),
        shared$.pipe(filter(n => n % 2 === 1), map(n => `odd ${ n }`))
      ))
    )
    .subscribe(console.log);

    // Expected output: (notice only one subscription)
    // 'subscription started'
    // 'source emitted 1'
    // 'all 1'
    // 'odd 1'
    // 'source emitted 2'
    // 'all 2'
    // 'even 2'
    // 'source emitted 3'
    // 'all 3'
    // 'odd 3'
    // 'source emitted 4'
    // 'all 4'
    // 'even 4'
    // 'source emitted 5'
    // 'all 5'
    // 'odd 5'

Parameter selector
A function used to set up the multicast. Gives you a multicast observable that is not yet connected. With that, you're expected to create and return an Observable that, when subscribed to, will utilize the multicast observable. After this function is executed -- and its return value subscribed to -- the operator will subscribe to the source, and the connection will be made.
Parameter config
The configuration object for `connect`.
function connectable
connectable: <T>( source: ObservableInput<T>, config?: ConnectableConfig<T>) => Connectable<T>;
Creates an observable that multicasts once `connect()` is called on it.
Parameter source
The observable source to make connectable.
Parameter config
The configuration object for `connectable`.
Returns
A "connectable" observable, that has a `connect()` method, that you must call to connect the source to all consumers through the subject provided as the connector.
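A short sketch of the behavior described above, assuming the default configuration (no `config` argument) and a synchronous source. The consumer labels `A` and `B` are made up for illustration. Nothing reaches the consumers until `connect()` is called, and the source is then subscribed to once for both of them.

```typescript
import { of, connectable } from 'rxjs';

const log: string[] = [];

// The source is not subscribed to until connect() is called.
const shared$ = connectable(of(1, 2, 3));

shared$.subscribe((n) => log.push(`A${n}`));
shared$.subscribe((n) => log.push(`B${n}`));

const emissionsBeforeConnect = log.length; // 0: the source has not run yet

// A single subscription to the source feeds both consumers.
const connection = shared$.connect();
connection.unsubscribe();

console.log(emissionsBeforeConnect); // 0
console.log(log); // [ 'A1', 'B1', 'A2', 'B2', 'A3', 'B3' ]
```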
function count
count: <T>( predicate?: (value: T, index: number) => boolean) => OperatorFunction<T, number>;
Counts the number of emissions on the source and emits that number when the source completes.
Tells how many values were emitted, when the source completes.

`count` transforms an Observable that emits values into an Observable that emits a single value that represents the number of values emitted by the source Observable. If the source Observable terminates with an error, `count` will pass this error notification along without emitting a value first. If the source Observable does not terminate at all, `count` will neither emit a value nor terminate. This operator takes an optional `predicate` function as argument, in which case the output emission will represent the number of source values that matched `true` with the `predicate`.
## Examples
Counts how many seconds have passed before the first click happened

    import { interval, fromEvent, takeUntil, count } from 'rxjs';

    const seconds = interval(1000);
    const clicks = fromEvent(document, 'click');
    const secondsBeforeClick = seconds.pipe(takeUntil(clicks));
    const result = secondsBeforeClick.pipe(count());
    result.subscribe(x => console.log(x));

Counts how many odd numbers are there between 1 and 7

    import { range, count } from 'rxjs';

    const numbers = range(1, 7);
    const result = numbers.pipe(count(i => i % 2 === 1));
    result.subscribe(x => console.log(x));

    // Results in:
    // 4

Parameter predicate
A function that is used to analyze the value and the index and determine whether or not to increment the count. Return `true` to increment the count, and return `false` to keep the count the same. If the predicate is not provided, every value will be counted.
Returns
A function that returns an Observable that emits one number that represents the count of emissions.
function debounce
debounce: <T>( durationSelector: (value: T) => ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Emits a notification from the source Observable only after a particular time span determined by another Observable has passed without another source emission.
It's like debounceTime, but the time span of emission silence is determined by a second Observable.

`debounce` delays notifications emitted by the source Observable, but drops previous pending delayed emissions if a new notification arrives on the source Observable. This operator keeps track of the most recent notification from the source Observable, and spawns a duration Observable by calling the `durationSelector` function. The notification is emitted only when the duration Observable emits a next notification, and if no other notification was emitted on the source Observable since the duration Observable was spawned. If a new notification appears before the duration Observable emits, the previous notification will not be emitted and a new duration is scheduled from `durationSelector`. If the completing event happens during the scheduled duration, the last cached notification is emitted before the completion event is forwarded to the output observable. If the error event happens during the scheduled duration or after it, only the error event is forwarded to the output observable. The cached notification is not emitted in this case.
Like debounceTime, this is a rate-limiting operator, and also a delay-like operator since output emissions do not necessarily occur at the same time as they did on the source Observable.
## Example
Emit the most recent click after a burst of clicks

    import { fromEvent, scan, debounce, interval } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(
      scan(i => ++i, 1),
      debounce(i => interval(200 * i))
    );
    result.subscribe(x => console.log(x));

Parameter durationSelector
A function that receives a value from the source Observable, for computing the timeout duration for each source value, returned as an Observable or a Promise.
Returns
A function that returns an Observable that delays the emissions of the source Observable by the specified duration Observable returned by `durationSelector`, and may drop some values if they occur too frequently.
function debounceTime
debounceTime: <T>( dueTime: number, scheduler?: SchedulerLike) => MonoTypeOperatorFunction<T>;
Emits a notification from the source Observable only after a particular time span has passed without another source emission.
It's like delay, but passes only the most recent notification from each burst of emissions.

`debounceTime` delays notifications emitted by the source Observable, but drops previous pending delayed emissions if a new notification arrives on the source Observable. This operator keeps track of the most recent notification from the source Observable, and emits that only when `dueTime` has passed without any other notification appearing on the source Observable. If a new value appears before `dueTime` silence occurs, the previous notification will be dropped and will not be emitted and a new `dueTime` is scheduled. If the completing event happens during `dueTime`, the last cached notification is emitted before the completion event is forwarded to the output observable. If the error event happens during `dueTime` or after it, only the error event is forwarded to the output observable. The cached notification is not emitted in this case.
This is a rate-limiting operator, because it is impossible for more than one notification to be emitted in any time window of duration `dueTime`, but it is also a delay-like operator since output emissions do not occur at the same time as they did on the source Observable. Optionally takes a SchedulerLike for managing timers.
## Example
Emit the most recent click after a burst of clicks

    import { fromEvent, debounceTime } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(debounceTime(1000));
    result.subscribe(x => console.log(x));

Parameter dueTime
The timeout duration in milliseconds (or the time unit determined internally by the optional `scheduler`) for the window of time required to wait for emission silence before emitting the most recent source value.
Parameter scheduler
The SchedulerLike to use for managing the timers that handle the timeout for each value.
Returns
A function that returns an Observable that delays the emissions of the source Observable by the specified `dueTime`, and may drop some values if they occur too frequently.
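The completion rule described above (the last cached notification is flushed when the source completes during `dueTime`) can be observed with a synchronous source. This is a sketch under that assumption: `of(1, 2, 3)` completes immediately, so only the most recent value should survive, and it should arrive without waiting for the timer.

```typescript
import { of, debounceTime } from 'rxjs';

const seen: Array<number | string> = [];

// of(1, 2, 3) emits all three values and completes synchronously, so the
// completion arrives while the 1000 ms window for the value 3 is still open.
of(1, 2, 3)
  .pipe(debounceTime(1000))
  .subscribe({
    next: (x) => seen.push(x),
    complete: () => seen.push('complete'),
  });

console.log(seen); // [ 3, 'complete' ]
```

Values 1 and 2 are dropped because a newer value arrived before their window elapsed.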
function defaultIfEmpty
defaultIfEmpty: <T, R>(defaultValue: R) => OperatorFunction<T, T | R>;
Emits a given value if the source Observable completes without emitting any `next` value, otherwise mirrors the source Observable.
If the source Observable turns out to be empty, then this operator will emit a default value.

`defaultIfEmpty` emits the values emitted by the source Observable or a specified default value if the source Observable is empty (completes without having emitted any `next` value).
## Example
If no clicks happen in 5 seconds, then emit 'no clicks'

    import { fromEvent, takeUntil, interval, defaultIfEmpty } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const clicksBeforeFive = clicks.pipe(takeUntil(interval(5000)));
    const result = clicksBeforeFive.pipe(defaultIfEmpty('no clicks'));
    result.subscribe(x => console.log(x));

Parameter defaultValue
The default value used if the source Observable is empty.
Returns
A function that returns an Observable that emits either the specified `defaultValue` if the source Observable emits no items, or the values emitted by the source Observable.
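Both branches of `defaultIfEmpty` can be seen side by side without any timers. This is a minimal sketch; the default value `'nothing'` is arbitrary.

```typescript
import { EMPTY, of, defaultIfEmpty } from 'rxjs';

const fromEmpty: Array<number | string> = [];
const fromValues: Array<number | string> = [];

// Empty source: the default value is emitted on completion.
EMPTY.pipe(defaultIfEmpty('nothing')).subscribe((x) => fromEmpty.push(x));

// Non-empty source: values are mirrored and the default is never used.
of(1, 2).pipe(defaultIfEmpty('nothing')).subscribe((x) => fromValues.push(x));

console.log(fromEmpty);  // [ 'nothing' ]
console.log(fromValues); // [ 1, 2 ]
```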
function defer
defer: <R extends ObservableInput<any>>( observableFactory: () => R) => Observable<ObservedValueOf<R>>;
Creates an Observable that, on subscribe, calls an Observable factory to make an Observable for each new Observer.
Creates the Observable lazily, that is, only when it is subscribed.

`defer` allows you to create an Observable only when the Observer subscribes. It waits until an Observer subscribes to it, calls the given factory function to get an Observable -- where a factory function typically generates a new Observable -- and subscribes the Observer to this Observable. In case the factory function returns a falsy value, then EMPTY is used as Observable instead. Last but not least, an exception during the factory function call is transferred to the Observer by calling `error`.
## Example
Subscribe to either an Observable of clicks or an Observable of interval, at random

    import { defer, fromEvent, interval } from 'rxjs';

    const clicksOrInterval = defer(() => {
      return Math.random() > 0.5
        ? fromEvent(document, 'click')
        : interval(1000);
    });
    clicksOrInterval.subscribe(x => console.log(x));

    // Results in the following behavior:
    // If the result of Math.random() is greater than 0.5 it will listen
    // for clicks anywhere on the "document"; when document is clicked it
    // will log a MouseEvent object to the console. If the result is less
    // than 0.5 it will emit ascending numbers, one every second(1000ms).

Parameter observableFactory
The Observable factory function to invoke for each Observer that subscribes to the output Observable. May also return any `ObservableInput`, which will be converted on the fly to an Observable.
Returns
An Observable whose Observers' subscriptions trigger an invocation of the given Observable factory function.
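The "factory per Observer" behavior can be made visible with a counter. In this sketch the counter `factoryCalls` and the message format are invented for illustration; the factory should not run at creation time, and should run once for each subscription.

```typescript
import { defer, of } from 'rxjs';

let factoryCalls = 0;

const lazy$ = defer(() => {
  // Runs once per subscription, not when defer() itself is called.
  factoryCalls += 1;
  return of(`subscription #${factoryCalls}`);
});

const callsBeforeSubscribe = factoryCalls; // 0: nothing has subscribed yet

const received: string[] = [];
lazy$.subscribe((x) => received.push(x));
lazy$.subscribe((x) => received.push(x));

console.log(callsBeforeSubscribe); // 0
console.log(received); // [ 'subscription #1', 'subscription #2' ]
```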
function delay
delay: <T>( due: number | Date, scheduler?: SchedulerLike) => MonoTypeOperatorFunction<T>;
Delays the emission of items from the source Observable by a given timeout or until a given Date.
Time shifts each item by some specified amount of milliseconds.

If the delay argument is a Number, this operator time shifts the source Observable by that amount of time expressed in milliseconds. The relative time intervals between the values are preserved.
If the delay argument is a Date, this operator time shifts the start of the Observable execution until the given date occurs.
## Examples
Delay each click by one second

    import { fromEvent, delay } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const delayedClicks = clicks.pipe(delay(1000)); // each click emitted after 1 second
    delayedClicks.subscribe(x => console.log(x));

Delay all clicks until a future date happens

    import { fromEvent, delay } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const date = new Date('March 15, 2050 12:00:00'); // in the future
    const delayedClicks = clicks.pipe(delay(date)); // click emitted only after that date
    delayedClicks.subscribe(x => console.log(x));

Parameter due
The delay duration in milliseconds (a `number`) or a `Date` until which the emission of the source items is delayed.
Parameter scheduler
The SchedulerLike to use for managing the timers that handle the time-shift for each item.
Returns
A function that returns an Observable that delays the emissions of the source Observable by the specified timeout or Date.
function delayWhen
delayWhen: { <T>( delayDurationSelector: (value: T, index: number) => ObservableInput<any>, subscriptionDelay: Observable<any> ): MonoTypeOperatorFunction<T>; <T>( delayDurationSelector: (value: T, index: number) => ObservableInput<any> ): MonoTypeOperatorFunction<T>;};
Deprecated
The `subscriptionDelay` parameter will be removed in v8.
function dematerialize
dematerialize: <N extends ObservableNotification<any>>() => OperatorFunction< N, ValueFromNotification<N>>;
Converts an Observable of ObservableNotification objects into the emissions that they represent.
Unwraps ObservableNotification objects as actual `next`, `error` and `complete` emissions. The opposite of materialize.
`dematerialize` is assumed to operate on an Observable that only emits ObservableNotification objects as `next` emissions, and does not emit any `error`. Such an Observable is the output of a `materialize` operation. Those notifications are then unwrapped using the metadata they contain, and emitted as `next`, `error`, and `complete` on the output Observable.
Use this operator in conjunction with materialize.
## Example
Convert an Observable of Notifications to an actual Observable

    import { NextNotification, ErrorNotification, of, dematerialize } from 'rxjs';

    const notifA: NextNotification<string> = { kind: 'N', value: 'A' };
    const notifB: NextNotification<string> = { kind: 'N', value: 'B' };
    const notifE: ErrorNotification = { kind: 'E', error: new TypeError('x.toUpperCase is not a function') };

    const materialized = of(notifA, notifB, notifE);
    const upperCase = materialized.pipe(dematerialize());
    upperCase.subscribe({
      next: x => console.log(x),
      error: e => console.error(e)
    });

    // Results in:
    // A
    // B
    // TypeError: x.toUpperCase is not a function

Returns
A function that returns an Observable that emits items and notifications embedded in Notification objects emitted by the source Observable.
function distinct
distinct: <T, K>( keySelector?: (value: T) => K, flushes?: ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Returns an Observable that emits all items emitted by the source Observable that are distinct by comparison from previous items.
If a `keySelector` function is provided, then it will project each value from the source observable into a new value that it will check for equality with previously projected values. If the `keySelector` function is not provided, it will use each value from the source observable directly with an equality check against previous values.
In JavaScript runtimes that support `Set`, this operator will use a `Set` to improve performance of the distinct value checking.
In other runtimes, this operator will use a minimal implementation of `Set` that relies on an `Array` and `indexOf` under the hood, so performance will degrade as more values are checked for distinction. Even in newer browsers, a long-running `distinct` use might result in memory leaks. To help alleviate this in some scenarios, an optional `flushes` parameter is also provided so that the internal `Set` can be "flushed", basically clearing it of values.
## Examples
A simple example with numbers

    import { of, distinct } from 'rxjs';

    of(1, 1, 2, 2, 2, 1, 2, 3, 4, 3, 2, 1)
      .pipe(distinct())
      .subscribe(x => console.log(x));

    // Outputs
    // 1
    // 2
    // 3
    // 4

An example using the `keySelector` function

    import { of, distinct } from 'rxjs';

    of(
      { age: 4, name: 'Foo'},
      { age: 7, name: 'Bar'},
      { age: 5, name: 'Foo'}
    )
    .pipe(distinct(({ name }) => name))
    .subscribe(x => console.log(x));

    // Outputs
    // { age: 4, name: 'Foo' }
    // { age: 7, name: 'Bar' }

Parameter keySelector
Optional `function` to select which value you want to check as distinct.
Parameter flushes
Optional `ObservableInput` for flushing the internal HashSet of the operator.
Returns
A function that returns an Observable that emits items from the source Observable with distinct values.
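Neither example above exercises the `flushes` parameter. The following sketch, which uses two `Subject`s purely so the timing can be controlled by hand, suggests how a flush lets a previously seen value through again. The names `source$` and `flush$` are illustrative.

```typescript
import { Subject, distinct } from 'rxjs';

const source$ = new Subject<number>();
const flush$ = new Subject<void>();
const out: number[] = [];

// No keySelector (undefined); flush$ clears the set of seen values when it emits.
source$.pipe(distinct(undefined, flush$)).subscribe((x) => out.push(x));

source$.next(1);
source$.next(1); // dropped: 1 has already been seen
source$.next(2);
flush$.next();   // seen values are cleared
source$.next(1); // emitted again after the flush

console.log(out); // [ 1, 2, 1 ]
```

In a long-running stream, a periodic flush source is one way to bound the memory held by the operator, at the cost of letting repeats through after each flush.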
function distinctUntilChanged
distinctUntilChanged: { <T>( comparator?: (previous: T, current: T) => boolean ): MonoTypeOperatorFunction<T>; <T, K>( comparator: (previous: K, current: K) => boolean, keySelector: (value: T) => K ): MonoTypeOperatorFunction<T>;};
function distinctUntilKeyChanged
distinctUntilKeyChanged: { <T>(key: keyof T): MonoTypeOperatorFunction<T>; <T, K extends keyof T>( key: K, compare: (x: T[K], y: T[K]) => boolean ): MonoTypeOperatorFunction<T>;};
function elementAt
elementAt: <T, D = T>( index: number, defaultValue?: D) => OperatorFunction<T, T | D>;
Emits the single value at the specified `index` in a sequence of emissions from the source Observable.
Emits only the i-th value, then completes.

`elementAt` returns an Observable that emits the item at the specified `index` in the source Observable, or a default value if that `index` is out of range and the `defaultValue` argument is provided. If the `defaultValue` argument is not given and the `index` is out of range, the output Observable will emit an `ArgumentOutOfRangeError` error.
## Example
Emit only the third click event

    import { fromEvent, elementAt } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(elementAt(2));
    result.subscribe(x => console.log(x));

    // Results in:
    // click 1 = nothing
    // click 2 = nothing
    // click 3 = MouseEvent object logged to console

Parameter index
Is the number `i` for the i-th source emission that has happened since the subscription, starting from the number `0`.
Parameter defaultValue
The default value returned for missing indices.
Returns
A function that returns an Observable that emits a single item, if it is found. Otherwise, it will emit the default value if given. If not, it emits an error.
Throws
{ArgumentOutOfRangeError} When using `elementAt(i)`, it delivers an `ArgumentOutOfRangeError` to the Observer's `error` callback if `i < 0` or the Observable has completed before emitting the i-th `next` notification.
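The three outcomes described above (value found, default used, error delivered) can be compared on one short synchronous source. This is a sketch; the strings pushed into `results` are arbitrary labels.

```typescript
import { of, elementAt, ArgumentOutOfRangeError } from 'rxjs';

const letters$ = of('a', 'b', 'c');
const results: string[] = [];

// Index in range: emits the value at that index.
letters$.pipe(elementAt(1)).subscribe((x) => results.push(x));

// Index out of range with a default: emits the default instead.
letters$.pipe(elementAt(10, 'fallback')).subscribe((x) => results.push(x));

// Index out of range without a default: the error callback is invoked.
letters$.pipe(elementAt(10)).subscribe({
  next: (x) => results.push(x),
  error: (e) => results.push(e instanceof ArgumentOutOfRangeError ? 'out of range' : 'other error'),
});

console.log(results); // [ 'b', 'fallback', 'out of range' ]
```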
function empty
empty: (scheduler?: SchedulerLike) => Observable<never>;
Parameter scheduler
A SchedulerLike to use for scheduling the emission of the complete notification.
Deprecated
Replaced with the EMPTY constant or scheduled (e.g. `scheduled([], scheduler)`). Will be removed in v8.
function endWith
endWith: { <T>(scheduler: SchedulerLike): MonoTypeOperatorFunction<T>; <T, A extends unknown[] = T[]>( ...valuesAndScheduler: [...A, SchedulerLike] ): OperatorFunction<T, T | ValueFromArray<A>>; <T, A extends unknown[] = T[]>(...values: A): OperatorFunction< T, T | ValueFromArray<A> >;};
Deprecated
The `scheduler` parameter will be removed in v8. Use `scheduled` and `concatAll`. Details: https://rxjs.dev/deprecations/scheduler-argument
function every
every: { <T>(predicate: BooleanConstructor): OperatorFunction< T, Exclude<T, Falsy> extends never ? false : boolean >; <T>(predicate: BooleanConstructor, thisArg: any): OperatorFunction< T, Exclude<T, Falsy> extends never ? false : boolean >; <T, A>( predicate: ( this: A, value: T, index: number, source: Observable<T> ) => boolean, thisArg: A ): OperatorFunction<T, boolean>; <T>( predicate: (value: T, index: number, source: Observable<T>) => boolean ): OperatorFunction<T, boolean>;};
Deprecated
Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8.
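A sketch of the closure approach recommended by the deprecation note. The `limits` object is hypothetical; it stands in for whatever would previously have been passed as `thisArg`. An arrow function captures it from the enclosing scope, so no `this` binding is needed.

```typescript
import { of, every } from 'rxjs';

// Hypothetical context object (an assumption for illustration).
const limits = { max: 10 };

const verdicts: boolean[] = [];

of(3, 7, 9)
  // The arrow function closes over `limits` rather than reading it from `this`.
  .pipe(every((n) => n <= limits.max))
  .subscribe((ok) => verdicts.push(ok));

console.log(verdicts); // [ true ]
```

The same rewrite should carry over to the other operators in this listing whose `thisArg` signatures are deprecated.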
function exhaustAll
exhaustAll: <O extends ObservableInput<any>>() => OperatorFunction< O, ObservedValueOf<O>>;
Converts a higher-order Observable into a first-order Observable by dropping inner Observables while the previous inner Observable has not yet completed.
Flattens an Observable-of-Observables by dropping the next inner Observables while the current inner is still executing.

`exhaustAll` subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, the output Observable begins emitting the items emitted by that inner Observable. So far, it behaves like mergeAll. However, `exhaustAll` ignores every new inner Observable if the previous Observable has not yet completed. Once that one completes, it will accept and flatten the next inner Observable and repeat this process.
## Example
Run a finite timer for each click, only if there is no currently active timer

    import { fromEvent, map, interval, take, exhaustAll } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
      map(() => interval(1000).pipe(take(5)))
    );
    const result = higherOrder.pipe(exhaustAll());
    result.subscribe(x => console.log(x));

Returns
A function that returns an Observable that takes a source of Observables and propagates the first Observable exclusively until it completes before subscribing to the next.
function exhaustMap
exhaustMap: { <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: undefined ): OperatorFunction<T, ObservedValueOf<O>>; <T, I, R>( project: (value: T, index: number) => ObservableInput<I>, resultSelector: ( outerValue: T, innerValue: I, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>;};
Deprecated
The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector
function expand
expand: { <T, O extends ObservableInput<unknown>>( project: (value: T, index: number) => O, concurrent?: number, scheduler?: SchedulerLike ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<unknown>>( project: (value: T, index: number) => O, concurrent: number, scheduler: SchedulerLike ): OperatorFunction<T, ObservedValueOf<O>>;};
Deprecated
The `scheduler` parameter will be removed in v8. If you need to schedule the inner subscription, use `subscribeOn` within the projection function: `expand((value) => fn(value).pipe(subscribeOn(scheduler)))`. Details: https://rxjs.dev/deprecations/scheduler-argument
function filter
filter: { <T, S extends T, A>( predicate: (this: A, value: T, index: number) => value is S, thisArg: A ): OperatorFunction<T, S>; <T, S extends T>( predicate: (value: T, index: number) => value is S ): OperatorFunction<T, S>; <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T, A>( predicate: (this: A, value: T, index: number) => boolean, thisArg: A ): MonoTypeOperatorFunction<T>; <T>( predicate: (value: T, index: number) => boolean ): MonoTypeOperatorFunction<T>;};
Deprecated
Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8.
function finalize
finalize: <T>(callback: () => void) => MonoTypeOperatorFunction<T>;
Returns an Observable that mirrors the source Observable, but will call a specified function when the source terminates on complete or error. The specified function will also be called when the subscriber explicitly unsubscribes.
## Examples
Execute callback function when the observable completes

    import { interval, take, finalize } from 'rxjs';

    // emit value in sequence every 1 second
    const source = interval(1000);
    const example = source.pipe(
      take(5), // take only the first 5 values
      finalize(() => console.log('Sequence complete')) // Execute when the observable completes
    );
    const subscribe = example.subscribe(val => console.log(val));

    // results:
    // 0
    // 1
    // 2
    // 3
    // 4
    // 'Sequence complete'

Execute callback function when the subscriber explicitly unsubscribes

    import { interval, finalize, tap, noop, timer } from 'rxjs';

    const source = interval(100).pipe(
      finalize(() => console.log('[finalize] Called')),
      tap({
        next: () => console.log('[next] Called'),
        error: () => console.log('[error] Not called'),
        complete: () => console.log('[tap complete] Not called')
      })
    );

    const sub = source.subscribe({
      next: x => console.log(x),
      error: noop,
      complete: () => console.log('[complete] Not called')
    });

    timer(150).subscribe(() => sub.unsubscribe());

    // results:
    // '[next] Called'
    // 0
    // '[finalize] Called'

Parameter callback
Function to be called when source terminates.
Returns
A function that returns an Observable that mirrors the source, but will call the specified function on termination.
function find
find: { <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T, S extends T, A>( predicate: ( this: A, value: T, index: number, source: Observable<T> ) => value is S, thisArg: A ): OperatorFunction<T, S>; <T, S extends T>( predicate: (value: T, index: number, source: Observable<T>) => value is S ): OperatorFunction<T, S>; <T, A>( predicate: ( this: A, value: T, index: number, source: Observable<T> ) => boolean, thisArg: A ): OperatorFunction<T, T>; <T>( predicate: (value: T, index: number, source: Observable<T>) => boolean ): OperatorFunction<T, T>;};
Deprecated
Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8.
function findIndex
findIndex: { <T>(predicate: BooleanConstructor): OperatorFunction< T, T extends Falsy ? -1 : number >; <T>(predicate: BooleanConstructor, thisArg: any): OperatorFunction< T, T extends Falsy ? -1 : number >; <T, A>( predicate: ( this: A, value: T, index: number, source: Observable<T> ) => boolean, thisArg: A ): OperatorFunction<T, number>; <T>( predicate: (value: T, index: number, source: Observable<T>) => boolean ): OperatorFunction<T, number>;};
Deprecated
Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8.
function first
first: { <T, D = T>(predicate?: null, defaultValue?: D): OperatorFunction<T, T | D>; <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T, D>(predicate: BooleanConstructor, defaultValue: D): OperatorFunction< T, D | TruthyTypesOf<T> >; <T, S extends T>( predicate: (value: T, index: number, source: Observable<T>) => value is S, defaultValue?: S ): OperatorFunction<T, S>; <T, S extends T, D>( predicate: (value: T, index: number, source: Observable<T>) => value is S, defaultValue: D ): OperatorFunction<T, S | D>; <T, D = T>( predicate: (value: T, index: number, source: Observable<T>) => boolean, defaultValue?: D ): OperatorFunction<T, T | D>;};
function firstValueFrom
firstValueFrom: { <T, D>(source: Observable<T>, config: FirstValueFromConfig<D>): Promise<T | D>; <T>(source: Observable<T>): Promise<T>;};
function forkJoin
forkJoin: { <T extends unique symbol>(arg: T): Observable<unknown>; (scheduler: null): Observable<never>; (sources: readonly []): Observable<never>; <A extends readonly unknown[]>( sources: readonly [...ObservableInputTuple<A>] ): Observable<A>; <A extends readonly unknown[], R>( sources: readonly [...ObservableInputTuple<A>], resultSelector: (...values: A) => R ): Observable<R>; <A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): Observable<A>; <A extends readonly unknown[], R>( ...sourcesAndResultSelector: [ ...ObservableInputTuple<A>, (...values: A) => R ] ): Observable<R>; (sourcesObject: { [x: string]: never; [x: number]: never; [x: symbol]: never; }): Observable<never>; <T extends Record<string, ObservableInput<any>>>(sourcesObject: T): Observable<{ [K in keyof T]: ObservedValueOf<T[K]>; }>;};
You have passed `any` here, we can't figure out if it is an array or an object, so you're getting `unknown`. Use better types.
Parameter arg
Something typed as `any`
Deprecated
Pass an array of sources instead. The rest-parameters signature will be removed in v8. Details: https://rxjs.dev/deprecations/array-argument
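The array form recommended above, together with the dictionary form that also appears in the signature list, can be sketched with synchronous sources. `forkJoin` emits once, with the last value from each source. The keys `id` and `name` are made up for illustration.

```typescript
import { forkJoin, of } from 'rxjs';

const arrayResults: Array<[number, string]> = [];
const objectResults: Array<{ id: number; name: string }> = [];

// Array form, replacing the deprecated rest-parameters call forkJoin(a$, b$).
forkJoin([of(1, 2, 3), of('a', 'b')]).subscribe((pair) => arrayResults.push(pair));

// Dictionary form: the result object has the same keys as the input object.
forkJoin({ id: of(42), name: of('Ada') }).subscribe((obj) => objectResults.push(obj));

console.log(arrayResults);  // [ [ 3, 'b' ] ]
console.log(objectResults); // [ { id: 42, name: 'Ada' } ]
```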
function from
from: { <O extends ObservableInput<any>>(input: O): Observable<ObservedValueOf<O>>; <O extends ObservableInput<any>>(input: O, scheduler: SchedulerLike): Observable< ObservedValueOf<O> >;};
Deprecated
The `scheduler` parameter will be removed in v8. Use `scheduled`. Details: https://rxjs.dev/deprecations/scheduler-argument
function fromEvent
fromEvent: { <T>( target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string ): Observable<T>; <T, R>( target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string, resultSelector: (event: T) => R ): Observable<R>; <T>( target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string, options: EventListenerOptions ): Observable<T>; <T, R>( target: HasEventTargetAddRemove<T> | ArrayLike<HasEventTargetAddRemove<T>>, eventName: string, options: EventListenerOptions, resultSelector: (event: T) => R ): Observable<R>; ( target: NodeStyleEventEmitter | ArrayLike<NodeStyleEventEmitter>, eventName: string ): Observable<unknown>; <T>( target: NodeStyleEventEmitter | ArrayLike<NodeStyleEventEmitter>, eventName: string ): Observable<T>; <R>( target: NodeStyleEventEmitter | ArrayLike<NodeStyleEventEmitter>, eventName: string, resultSelector: (...args: any[]) => R ): Observable<R>; ( target: NodeCompatibleEventEmitter | ArrayLike<NodeCompatibleEventEmitter>, eventName: string ): Observable<unknown>; <T>( target: NodeCompatibleEventEmitter | ArrayLike<NodeCompatibleEventEmitter>, eventName: string ): Observable<T>; <R>( target: NodeCompatibleEventEmitter | ArrayLike<NodeCompatibleEventEmitter>, eventName: string, resultSelector: (...args: any[]) => R ): Observable<R>; <T>( target: | JQueryStyleEventEmitter<any, T> | ArrayLike<JQueryStyleEventEmitter<any, T>>, eventName: string ): Observable<T>; <T, R>( target: | JQueryStyleEventEmitter<any, T> | ArrayLike<JQueryStyleEventEmitter<any, T>>, eventName: string, resultSelector: (value: T, ...args: any[]) => R ): Observable<R>;};
Deprecated
Do not specify explicit type parameters. Signatures with type parameters that cannot be inferred will be removed in v8.
function fromEventPattern
fromEventPattern: { <T>( addHandler: (handler: NodeEventHandler) => any, removeHandler?: (handler: NodeEventHandler, signal?: any) => void ): Observable<T>; <T>( addHandler: (handler: NodeEventHandler) => any, removeHandler?: (handler: NodeEventHandler, signal?: any) => void, resultSelector?: (...args: any[]) => T ): Observable<T>;};
function generate
generate: { <T, S>( initialState: S, condition: ConditionFunc<S>, iterate: IterateFunc<S>, resultSelector: ResultFunc<S, T>, scheduler?: SchedulerLike ): Observable<T>; <S>( initialState: S, condition: ConditionFunc<S>, iterate: IterateFunc<S>, scheduler?: SchedulerLike ): Observable<S>; <S>(options: GenerateBaseOptions<S>): Observable<S>; <T, S>(options: GenerateOptions<T, S>): Observable<T>;};
Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages.

## Examples
Produces sequence of numbers

    import { generate } from 'rxjs';

    const result = generate(0, x => x < 3, x => x + 1, x => x);

    result.subscribe(x => console.log(x));

    // Logs:
    // 0
    // 1
    // 2

Use `asapScheduler`

    import { generate, asapScheduler } from 'rxjs';

    const result = generate(1, x => x < 5, x => x * 2, x => x + 1, asapScheduler);

    result.subscribe(x => console.log(x));

    // Logs:
    // 2
    // 3
    // 5

Parameter initialState
Initial state.
Parameter condition
Condition to terminate generation (upon returning false).
Parameter iterate
Iteration step function.
Parameter resultSelector
Selector function for results produced in the sequence.
Parameter scheduler
A SchedulerLike on which to run the generator loop. If not provided, values are emitted immediately.
Returns
The generated sequence.
See Also
Deprecated
Instead of passing separate arguments, use the options argument. Signatures taking separate arguments will be removed in v8.
Generates an Observable by running a state-driven loop that emits an element on each iteration.
Use it instead of nexting values in a for loop.

`generate` allows you to create a stream of values generated with a loop very similar to a traditional for loop. The first argument of `generate` is a beginning value. The second argument is a function that accepts this value and tests if some condition still holds. If it does, then the loop continues; if not, it stops. The third argument is a function which takes the previously defined value and modifies it in some way on each iteration. Note how these three parameters are direct equivalents of the three expressions in a traditional for loop: the first expression initializes some state (for example, a numeric index), the second tests if the loop can perform the next iteration (for example, if the index is lower than 10) and the third states how the defined value will be modified on every step (for example, the index will be incremented by one).

The return value of `generate` is an Observable that emits a value on each loop iteration. First, the condition function is run. If it returns true, the Observable emits the currently stored value (the initial value on the first iteration) and then updates that value with the iterate function. If at some point the condition returns false, the Observable completes at that moment.

Optionally you can pass a fourth parameter to `generate`: a result selector function which allows you to immediately map the value that would normally be emitted by the Observable.

If you find three anonymous functions in a `generate` call hard to read, you can provide a single object to the operator instead, with the properties `initialState`, `condition`, `iterate` and `resultSelector`, which should have the respective values that you would normally pass to `generate`. `resultSelector` is still optional, but that form of calling `generate` allows you to omit `condition` as well. If you omit it, the condition always holds; in other words, the resulting Observable will never complete.

Both forms of `generate` can optionally accept a scheduler. In a multi-parameter call, the scheduler simply comes as the last argument (whether or not there is a `resultSelector` function). In a single-parameter call, you can provide it as a `scheduler` property on the object passed to the operator. In both cases, the scheduler decides when the next iteration of the loop will happen and therefore when the next value will be emitted by the Observable. For example, to ensure that each value is pushed to the Observer on a separate task in the event loop, you could use the `async` scheduler. Note that by default (when no scheduler is passed) values are simply emitted synchronously.

## Examples
Use with condition and iterate functions

    import { generate } from 'rxjs';

    const result = generate(0, x => x < 3, x => x + 1);

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!')
    });

    // Logs:
    // 0
    // 1
    // 2
    // 'Complete!'

Use with condition, iterate and resultSelector functions

    import { generate } from 'rxjs';

    const result = generate(0, x => x < 3, x => x + 1, x => x * 1000);

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!')
    });

    // Logs:
    // 0
    // 1000
    // 2000
    // 'Complete!'

Use with options object

    import { generate } from 'rxjs';

    const result = generate({
      initialState: 0,
      condition(value) { return value < 3; },
      iterate(value) { return value + 1; },
      resultSelector(value) { return value * 1000; }
    });

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!')
    });

    // Logs:
    // 0
    // 1000
    // 2000
    // 'Complete!'

Use options object without condition function

    import { generate } from 'rxjs';

    const result = generate({
      initialState: 0,
      iterate(value) { return value + 1; },
      resultSelector(value) { return value * 1000; }
    });

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!') // This will never run
    });

    // Logs:
    // 0
    // 1000
    // 2000
    // 3000
    // ...and never stops.

Parameter initialState
Initial state.
Parameter condition
Condition to terminate generation (upon returning false).
Parameter iterate
Iteration step function.
Parameter scheduler
A Scheduler on which to run the generator loop. If not provided, defaults to emitting immediately.
Returns
The generated sequence.
See Also
Deprecated
Instead of passing separate arguments, use the options argument. Signatures taking separate arguments will be removed in v8.
Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages. This overload accepts an options object that might contain the initial state, iterate, condition and scheduler.

## Examples
Use options object with condition function

    import { generate } from 'rxjs';

    const result = generate({
      initialState: 0,
      condition: x => x < 3,
      iterate: x => x + 1
    });

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!')
    });

    // Logs:
    // 0
    // 1
    // 2
    // 'Complete!'

Parameter options
Object that must contain initialState, iterate and might contain condition and scheduler.
Returns
The generated sequence.
See Also
Generates an observable sequence by running a state-driven loop producing the sequence's elements, using the specified scheduler to send out observer messages. This overload accepts an options object that might contain the initial state, iterate, condition, result selector and scheduler.

## Examples
Use options object with condition and iterate function

    import { generate } from 'rxjs';

    const result = generate({
      initialState: 0,
      condition: x => x < 3,
      iterate: x => x + 1,
      resultSelector: x => x
    });

    result.subscribe({
      next: value => console.log(value),
      complete: () => console.log('Complete!')
    });

    // Logs:
    // 0
    // 1
    // 2
    // 'Complete!'

Parameter options
Object that must contain initialState, iterate, resultSelector and might contain condition and scheduler.
Returns
The generated sequence.
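The options form also accepts the `scheduler` property described in the text. The sketch below is a minimal illustration, assuming `asapScheduler` from rxjs; the `log` array is a hypothetical helper added only to make the ordering visible.

```typescript
import { generate, asapScheduler } from 'rxjs';

// Hypothetical helper: records the order in which things happen.
const log: string[] = [];

generate({
  initialState: 1,
  condition: x => x <= 3,
  iterate: x => x + 1,
  scheduler: asapScheduler,
}).subscribe({
  next: x => log.push(`value ${x}`),
  complete: () => log.push('complete'),
});

// With a scheduler, emissions are deferred, so this entry is recorded
// before any value arrives.
log.push('subscribed');
```

Without the `scheduler` property the three values and the completion would be recorded before `'subscribed'`, because the loop then runs synchronously during `subscribe`.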
See Also
function groupBy
groupBy: { <T, K>( key: (value: T) => K, options: BasicGroupByOptions<K, T> ): OperatorFunction<T, GroupedObservable<K, T>>; <T, K, E>( key: (value: T) => K, options: GroupByOptionsWithElement<K, E, T> ): OperatorFunction<T, GroupedObservable<K, E>>; <T, K extends T>(key: (value: T) => value is K): OperatorFunction< T, GroupedObservable<true, K> | GroupedObservable<false, Exclude<T, K>> >; <T, K>(key: (value: T) => K): OperatorFunction<T, GroupedObservable<K, T>>; <T, K>( key: (value: T) => K, element: void, duration: (grouped: GroupedObservable<K, T>) => Observable<any> ): OperatorFunction<T, GroupedObservable<K, T>>; <T, K, R>( key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable<K, R>) => Observable<any> ): OperatorFunction<T, GroupedObservable<K, R>>; <T, K, R>( key: (value: T) => K, element?: (value: T) => R, duration?: (grouped: GroupedObservable<K, R>) => Observable<any>, connector?: () => Subject<R> ): OperatorFunction<T, GroupedObservable<K, R>>;};
Deprecated
use the options parameter instead.
Groups the items emitted by an Observable according to a specified criterion, and emits these grouped items as `GroupedObservables`, one GroupedObservable per group.

When the Observable emits an item, a key is computed for this item with the key function.

If a GroupedObservable for this key exists, this GroupedObservable emits. Otherwise, a new GroupedObservable for this key is created and emits.

A GroupedObservable represents values belonging to the same group represented by a common key. The common key is available as the `key` field of a GroupedObservable instance.

The elements emitted by GroupedObservables are by default the items emitted by the Observable, or elements returned by the element function.
## Examples
Group objects by `id` and return as array

    import { of, groupBy, mergeMap, reduce } from 'rxjs';

    of(
      { id: 1, name: 'JavaScript' },
      { id: 2, name: 'Parcel' },
      { id: 2, name: 'webpack' },
      { id: 1, name: 'TypeScript' },
      { id: 3, name: 'TSLint' }
    ).pipe(
      groupBy(p => p.id),
      mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [])))
    )
    .subscribe(p => console.log(p));

    // displays:
    // [{ id: 1, name: 'JavaScript' }, { id: 1, name: 'TypeScript'}]
    // [{ id: 2, name: 'Parcel' }, { id: 2, name: 'webpack'}]
    // [{ id: 3, name: 'TSLint' }]

Pivot data on the `id` field

    import { of, groupBy, mergeMap, reduce, map } from 'rxjs';

    of(
      { id: 1, name: 'JavaScript' },
      { id: 2, name: 'Parcel' },
      { id: 2, name: 'webpack' },
      { id: 1, name: 'TypeScript' },
      { id: 3, name: 'TSLint' }
    ).pipe(
      groupBy(p => p.id, { element: p => p.name }),
      mergeMap(group$ => group$.pipe(reduce((acc, cur) => [...acc, cur], [`${ group$.key }`]))),
      map(arr => ({ id: parseInt(arr[0], 10), values: arr.slice(1) }))
    )
    .subscribe(p => console.log(p));

    // displays:
    // { id: 1, values: [ 'JavaScript', 'TypeScript' ] }
    // { id: 2, values: [ 'Parcel', 'webpack' ] }
    // { id: 3, values: [ 'TSLint' ] }

Parameter key
A function that extracts the key for each item.
Parameter element
A function that extracts the return element for each item.
Parameter duration
A function that returns an Observable to determine how long each group should exist.
Parameter connector
Factory function to create an intermediate Subject through which grouped elements are emitted.
Returns
A function that returns an Observable that emits GroupedObservables, each of which corresponds to a unique key value and each of which emits those items from the source Observable that share that key value.
Deprecated
Use the options parameter instead.
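As a small illustration of the `key` field described above, the following sketch groups numbers by parity and pairs each collected group with its key. The `groups` array and the `'even'`/`'odd'` labels are illustrative choices, not part of the library.

```typescript
import { of, groupBy, mergeMap, toArray, map } from 'rxjs';

// Illustrative collector for the emitted groups.
const groups: Array<{ key: string; values: number[] }> = [];

of(1, 2, 3, 4, 5).pipe(
  // One GroupedObservable is created per distinct key.
  groupBy(n => (n % 2 === 0 ? 'even' : 'odd')),
  mergeMap(group$ =>
    group$.pipe(
      toArray(),
      // group$.key exposes the key shared by every value in the group.
      map(values => ({ key: group$.key, values }))
    )
  )
).subscribe(group => groups.push(group));
```

Each group only completes when the source completes, so `toArray()` emits once per group at the end of the source sequence.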
function identity
identity: <T>(x: T) => T;
This function takes one parameter and just returns it. Simply put, this is like `<T>(x: T): T => x`.

## Examples
This is useful in some cases when using things like `mergeMap`

    import { interval, take, map, range, mergeMap, identity } from 'rxjs';

    const source$ = interval(1000).pipe(take(5));

    const result$ = source$.pipe(
      map(i => range(i)),
      mergeMap(identity) // same as mergeMap(x => x)
    );

    result$.subscribe({
      next: console.log
    });

Or when you want to selectively apply an operator

    import { interval, take, identity } from 'rxjs';

    const shouldLimit = () => Math.random() < 0.5;

    const source$ = interval(1000);

    const result$ = source$.pipe(shouldLimit() ? take(5) : identity);

    result$.subscribe({
      next: console.log
    });

Parameter x
Any value that is returned by this function
Returns
The value passed as the first parameter to this function
function ignoreElements
ignoreElements: () => OperatorFunction<unknown, never>;
Ignores all items emitted by the source Observable and only passes calls of `complete` or `error`.

The `ignoreElements` operator suppresses all items emitted by the source Observable, but allows its termination notification (either `error` or `complete`) to pass through unchanged.

If you do not care about the items being emitted by an Observable, but you do want to be notified when it completes or when it terminates with an error, you can apply the `ignoreElements` operator to the Observable, which will ensure that it will never call its observers' `next` handlers.

## Example
Ignore all `next` emissions from the source

    import { of, ignoreElements } from 'rxjs';

    of('you', 'talking', 'to', 'me')
      .pipe(ignoreElements())
      .subscribe({
        next: word => console.log(word),
        error: err => console.log('error:', err),
        complete: () => console.log('the end'),
      });

    // result:
    // 'the end'

Returns
A function that returns an empty Observable that only calls `complete` or `error`, based on which one is called by the source Observable.
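The example above shows the `complete` path. As a complementary sketch of the `error` path, the snippet below builds a source that emits three values and then fails; the `events` array and the `'boom'` message are made up for illustration.

```typescript
import { concat, of, throwError, ignoreElements } from 'rxjs';

// Illustrative log of the notifications that reach the observer.
const events: string[] = [];

concat(of(1, 2, 3), throwError(() => new Error('boom')))
  .pipe(ignoreElements())
  .subscribe({
    next: value => events.push(`next: ${value}`), // never called
    error: (err: Error) => events.push(`error: ${err.message}`),
    complete: () => events.push('complete'),
  });
```

Only the error notification reaches the observer; the three values are dropped and `complete` is never called because the source terminated with an error.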
function iif
iif: <T, F>( condition: () => boolean, trueResult: ObservableInput<T>, falseResult: ObservableInput<F>) => Observable<T | F>;
Checks a boolean at subscription time, and chooses between one of two observable sources.

`iif` expects a function that returns a boolean (the `condition` function), and two sources, the `trueResult` and the `falseResult`, and returns an Observable.

At the moment of subscription, the `condition` function is called. If the result is `true`, the subscription will be to the source passed as the `trueResult`; otherwise, the subscription will be to the source passed as the `falseResult`.

If you need to choose between more than two observables, have a look at the defer creation method.
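For a choice among more than two sources, a minimal sketch with `defer` might look like the following. The `attempt` counter and the three string values are hypothetical; the point is only that the factory passed to `defer` runs again on every subscription.

```typescript
import { defer, of } from 'rxjs';

// Hypothetical state consulted at subscription time.
let attempt = 1;

const source$ = defer(() => {
  // The factory is re-evaluated for each subscriber, so any number of
  // branches can be handled here.
  if (attempt === 1) return of('first');
  if (attempt === 2) return of('second');
  return of('third');
});

const seen: string[] = [];

source$.subscribe(value => seen.push(value));
attempt = 2;
source$.subscribe(value => seen.push(value));
attempt = 3;
source$.subscribe(value => seen.push(value));
```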
## Examples
Change at runtime which Observable will be subscribed

    import { iif, of } from 'rxjs';

    let subscribeToFirst;
    const firstOrSecond = iif(
      () => subscribeToFirst,
      of('first'),
      of('second')
    );

    subscribeToFirst = true;
    firstOrSecond.subscribe(value => console.log(value));

    // Logs:
    // 'first'

    subscribeToFirst = false;
    firstOrSecond.subscribe(value => console.log(value));

    // Logs:
    // 'second'

Control access to an Observable

    import { iif, of, EMPTY } from 'rxjs';

    let accessGranted;
    const observableIfYouHaveAccess = iif(
      () => accessGranted,
      of('It seems you have an access...'),
      EMPTY
    );

    accessGranted = true;
    observableIfYouHaveAccess.subscribe({
      next: value => console.log(value),
      complete: () => console.log('The end')
    });

    // Logs:
    // 'It seems you have an access...'
    // 'The end'

    accessGranted = false;
    observableIfYouHaveAccess.subscribe({
      next: value => console.log(value),
      complete: () => console.log('The end')
    });

    // Logs:
    // 'The end'

Parameter condition
Condition which Observable should be chosen.
Parameter trueResult
An Observable that will be subscribed if condition is true.
Parameter falseResult
An Observable that will be subscribed if condition is false.
Returns
An observable that proxies to `trueResult` or `falseResult`, depending on the result of the `condition` function.
See Also
function interval
interval: (period?: number, scheduler?: SchedulerLike) => Observable<number>;
Creates an Observable that emits sequential numbers every specified interval of time, on a specified SchedulerLike.
Emits incremental numbers periodically in time.

`interval` returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between those emissions. The first emission is not sent immediately, but only after the first period has passed. By default, this operator uses the `async` SchedulerLike to provide a notion of time, but you may pass any SchedulerLike to it.

## Example
Emits ascending numbers, one every second (1000ms) up to the number 3

    import { interval, take } from 'rxjs';

    const numbers = interval(1000);

    const takeFourNumbers = numbers.pipe(take(4));

    takeFourNumbers.subscribe(x => console.log('Next: ', x));

    // Logs:
    // Next: 0
    // Next: 1
    // Next: 2
    // Next: 3

Parameter period
The interval size in milliseconds (by default) or the time unit determined by the scheduler's clock.
Parameter scheduler
The SchedulerLike to use for scheduling the emission of values, and providing a notion of "time".
Returns
An Observable that emits a sequential number each time interval.
See Also
function isEmpty
isEmpty: <T>() => OperatorFunction<T, boolean>;
Emits `false` if the input Observable emits any values, or emits `true` if the input Observable completes without emitting any values.

Tells whether any values are emitted by an Observable.

`isEmpty` transforms an Observable that emits values into an Observable that emits a single boolean value representing whether or not any values were emitted by the source Observable. As soon as the source Observable emits a value, `isEmpty` will emit a `false` and complete. If the source Observable completes having not emitted anything, `isEmpty` will emit a `true` and complete.

A similar effect could be achieved with count, but `isEmpty` can emit a `false` value sooner.

## Examples
Emit `false` for a non-empty Observable

    import { Subject, isEmpty } from 'rxjs';

    const source = new Subject<string>();
    const result = source.pipe(isEmpty());

    source.subscribe(x => console.log(x));
    result.subscribe(x => console.log(x));

    source.next('a');
    source.next('b');
    source.next('c');
    source.complete();

    // Outputs
    // 'a'
    // false
    // 'b'
    // 'c'

Emit `true` for an empty Observable

    import { EMPTY, isEmpty } from 'rxjs';

    const result = EMPTY.pipe(isEmpty());
    result.subscribe(x => console.log(x));

    // Outputs
    // true

See Also
function isObservable
isObservable: (obj: any) => obj is Observable<unknown>;
Tests to see if the object is an RxJS Observable
Parameter obj
the object to test
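A brief sketch of how the check behaves for a few kinds of input. The `checks` object and the `describeInput` helper are hypothetical names used only for this illustration.

```typescript
import { isObservable, of } from 'rxjs';

// Results of testing three different kinds of values.
const checks = {
  observable: isObservable(of(1)),
  promise: isObservable(Promise.resolve(1)),
  plainObject: isObservable({ value: 1 }),
};

// Hypothetical helper: isObservable is a type guard, so inside the true
// branch `input` is narrowed to Observable<unknown>.
function describeInput(input: unknown): string {
  return isObservable(input) ? 'observable' : 'not observable';
}

const description = describeInput(of('a'));
```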
function last
last: { <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T, D>(predicate: BooleanConstructor, defaultValue: D): OperatorFunction< T, D | TruthyTypesOf<T> >; <T, D = T>(predicate?: null, defaultValue?: D): OperatorFunction<T, T | D>; <T, S extends T>( predicate: (value: T, index: number, source: Observable<T>) => value is S, defaultValue?: S ): OperatorFunction<T, S>; <T, D = T>( predicate: (value: T, index: number, source: Observable<T>) => boolean, defaultValue?: D ): OperatorFunction<T, T | D>;};
function lastValueFrom
lastValueFrom: { <T, D>(source: Observable<T>, config: LastValueFromConfig<D>): Promise<T | D>; <T>(source: Observable<T>): Promise<T>;};
function map
map: { <T, R>(project: (value: T, index: number) => R): OperatorFunction<T, R>; <T, R, A>( project: (this: A, value: T, index: number) => R, thisArg: A ): OperatorFunction<T, R>;};
Deprecated
Use a closure instead of a `thisArg`. Signatures accepting a `thisArg` will be removed in v8.
function mapTo
mapTo: { <R>(value: R): OperatorFunction<unknown, R>; <T, R>(value: R): OperatorFunction<T, R>;};
function materialize
materialize: <T>() => OperatorFunction< T, Notification<T> & ObservableNotification<T>>;
Represents all of the notifications from the source Observable as `next` emissions marked with their original types within Notification objects.

Wraps `next`, `error` and `complete` emissions in Notification objects, emitted as `next` on the output Observable.

`materialize` returns an Observable that emits a `next` notification for each `next`, `error`, or `complete` emission of the source Observable. When the source Observable emits `complete`, the output Observable will emit `next` as a Notification of type "complete", and then it will emit `complete` as well. When the source Observable emits `error`, the output will emit `next` as a Notification of type "error", and then `complete`.

This operator is useful for producing metadata of the source Observable, to be consumed as `next` emissions. Use it in conjunction with dematerialize.

## Example
Convert a faulty Observable to an Observable of Notifications

    import { of, materialize, map } from 'rxjs';

    const letters = of('a', 'b', 13, 'd');
    const upperCase = letters.pipe(map((x: any) => x.toUpperCase()));
    const materialized = upperCase.pipe(materialize());

    materialized.subscribe(x => console.log(x));

    // Results in the following:
    // - Notification { kind: 'N', value: 'A', error: undefined, hasValue: true }
    // - Notification { kind: 'N', value: 'B', error: undefined, hasValue: true }
    // - Notification { kind: 'E', value: undefined, error: TypeError { message: x.toUpperCase is not a function }, hasValue: false }

See Also
A function that returns an Observable that emits Notification objects that wrap the original emissions from the source Observable with metadata.
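To illustrate the pairing with dematerialize mentioned above, here is a minimal sketch: the first pipeline records the `kind` of every notification, the second wraps and immediately unwraps the values. The `kinds` and `roundTrip` arrays are illustrative, and `roundTrip` is typed loosely for brevity.

```typescript
import { of, materialize, dematerialize } from 'rxjs';

// Kinds of the notifications produced for a two-value source:
// 'N' for each next, then 'C' for the completion.
const kinds: string[] = [];
of(1, 2)
  .pipe(materialize())
  .subscribe(notification => kinds.push(notification.kind));

// Wrapping and then unwrapping restores the original emissions.
const roundTrip: unknown[] = [];
of(1, 2)
  .pipe(materialize(), dematerialize())
  .subscribe(value => roundTrip.push(value));
```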
function max
max: <T>(comparer?: (x: T, y: T) => number) => MonoTypeOperatorFunction<T>;
The `max` operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the largest value.
## Examples
Get the maximal value of a series of numbers

    import { of, max } from 'rxjs';

    of(5, 4, 7, 2, 8)
      .pipe(max())
      .subscribe(x => console.log(x));

    // Outputs
    // 8

Use a comparer function to get the maximal item

    import { of, max } from 'rxjs';

    of(
      { age: 7, name: 'Foo' },
      { age: 5, name: 'Bar' },
      { age: 9, name: 'Beer' }
    ).pipe(
      max((a, b) => a.age < b.age ? -1 : 1)
    )
    .subscribe(x => console.log(x.name));

    // Outputs
    // 'Beer'

Parameter comparer
Optional comparer function that it will use instead of its default to compare the value of two items.
Returns
A function that returns an Observable that emits item with the largest value.
See Also
function merge
merge: { <A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): Observable<A[number]>; <A extends readonly unknown[]>( ...sourcesAndConcurrency: [...ObservableInputTuple<A>, number?] ): Observable<A[number]>; <A extends readonly unknown[]>( ...sourcesAndScheduler: [...ObservableInputTuple<A>, SchedulerLike?] ): Observable<A[number]>; <A extends readonly unknown[]>( ...sourcesAndConcurrencyAndScheduler: [ ...ObservableInputTuple<A>, number?, SchedulerLike? ] ): Observable<A[number]>;};
Deprecated
The `scheduler` parameter will be removed in v8. Use `scheduled` and `mergeAll`. Details: https://rxjs.dev/deprecations/scheduler-argument
function mergeAll
mergeAll: <O extends ObservableInput<any>>( concurrent?: number) => OperatorFunction<O, ObservedValueOf<O>>;
Converts a higher-order Observable into a first-order Observable which concurrently delivers all values that are emitted on the inner Observables.
Flattens an Observable-of-Observables.

`mergeAll` subscribes to an Observable that emits Observables, also known as a higher-order Observable. Each time it observes one of these emitted inner Observables, it subscribes to that and delivers all the values from the inner Observable on the output Observable. The output Observable only completes once all inner Observables have completed. Any error delivered by an inner Observable will be immediately emitted on the output Observable.

## Examples
Spawn a new interval Observable for each click event, and blend their outputs as one Observable

    import { fromEvent, map, interval, mergeAll } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(map(() => interval(1000)));
    const firstOrder = higherOrder.pipe(mergeAll());

    firstOrder.subscribe(x => console.log(x));

Count from 0 to 9 every second for each click, but only allow 2 concurrent timers

    import { fromEvent, map, interval, take, mergeAll } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const higherOrder = clicks.pipe(
      map(() => interval(1000).pipe(take(10)))
    );
    const firstOrder = higherOrder.pipe(mergeAll(2));

    firstOrder.subscribe(x => console.log(x));

Parameter concurrent
Maximum number of inner Observables being subscribed to concurrently.
Returns
A function that returns an Observable that emits values coming from all the inner Observables emitted by the source Observable.
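The examples for this operator need a DOM. As a sketch that also runs outside the browser, the following flattens a higher-order Observable built from two synchronous inner Observables; the `flattened` array is an illustrative collector.

```typescript
import { of, mergeAll } from 'rxjs';

// Illustrative collector for the flattened values.
const flattened: number[] = [];

// A higher-order Observable: it emits two inner Observables.
const higherOrder$ = of(of(1, 2), of(3, 4));

higherOrder$
  .pipe(mergeAll())
  .subscribe(value => flattened.push(value));
```

Because both inner Observables here emit synchronously, each one finishes before the next is subscribed, so the values arrive in order. With asynchronous inner Observables the values would interleave as they arrive.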
See Also
function mergeMap
mergeMap: { <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, concurrent?: number ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: undefined, concurrent?: number ): OperatorFunction<T, ObservedValueOf<O>>; <T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R, concurrent?: number ): OperatorFunction<T, R>;};
Deprecated
The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector
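A minimal sketch of the suggested replacement: rather than passing a `resultSelector`, the outer and inner values are combined by a `map` applied to the inner Observable. The letter and number inputs and the `combined` array are illustrative.

```typescript
import { of, mergeMap, map } from 'rxjs';

// Illustrative collector for the combined values.
const combined: string[] = [];

of('a', 'b').pipe(
  mergeMap(letter =>
    // The inner map closes over `letter`, which is what a
    // resultSelector would have received as its outer value.
    of(1, 2).pipe(map(n => `${letter}${n}`))
  )
).subscribe(value => combined.push(value));
```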
function mergeMapTo
mergeMapTo: { <O extends ObservableInput<unknown>>( innerObservable: O, concurrent?: number ): OperatorFunction<unknown, ObservedValueOf<O>>; <T, R, O extends ObservableInput<unknown>>( innerObservable: O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R, concurrent?: number ): OperatorFunction<T, R>;};
Deprecated
Will be removed in v9. Use mergeMap instead: `mergeMap(() => result)`
Deprecated
The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector
function mergeScan
mergeScan: <T, R>( accumulator: (acc: R, value: T, index: number) => ObservableInput<R>, seed: R, concurrent?: number) => OperatorFunction<T, R>;
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, then each intermediate Observable returned is merged into the output Observable.
It's like scan, but the Observables returned by the accumulator are merged into the outer Observable.
The first parameter of the `mergeScan` is an `accumulator` function which is called every time the source Observable emits a value. `mergeScan` will subscribe to the value returned by the `accumulator` function and will emit the values emitted by that inner Observable to the subscriber.

The `accumulator` function is called with three parameters: `acc`, `value` and `index`. The `acc` parameter is used as the state parameter whose value is initially set to the `seed` parameter (the second parameter passed to the `mergeScan` operator).

`mergeScan` internally keeps the value of the `acc` parameter: as long as the source Observable emits without the inner Observable emitting, `acc` will be set to `seed`. The next time the inner Observable emits a value, `mergeScan` will internally remember it and it will be passed to the `accumulator` function as the `acc` parameter the next time the source emits.

The `value` parameter of the `accumulator` function is the value emitted by the source Observable, while the `index` is a number which represents the order of the current emission by the source Observable. It starts with 0.

The last parameter to the `mergeScan` is the `concurrent` value which defaults to Infinity. It represents the maximum number of inner Observable subscriptions at a time.

## Example
Count the number of click events

    import { fromEvent, map, mergeScan, of } from 'rxjs';

    const click$ = fromEvent(document, 'click');
    const one$ = click$.pipe(map(() => 1));
    const seed = 0;
    const count$ = one$.pipe(
      mergeScan((acc, one) => of(acc + one), seed)
    );

    count$.subscribe(x => console.log(x));

    // Results:
    // 1
    // 2
    // 3
    // 4
    // ...and so on for each click

Parameter accumulator
The accumulator function called on each source value.
Parameter seed
The initial accumulation value.
Parameter concurrent
Maximum number of input Observables being subscribed to concurrently.
Returns
A function that returns an Observable of the accumulated values.
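The click-counting example needs a DOM. A sketch of the same accumulation that runs anywhere is a running total over a fixed source; the `totals` array is an illustrative collector.

```typescript
import { of, mergeScan } from 'rxjs';

// Illustrative collector for the accumulated values.
const totals: number[] = [];

of(1, 2, 3).pipe(
  // The accumulator returns an Observable; the value it emits becomes
  // `acc` for the next source value. The seed here is 0.
  mergeScan((acc, value) => of(acc + value), 0)
).subscribe(total => totals.push(total));
```

Each inner Observable here emits synchronously, so the state is updated before the next source value arrives and the result is a plain running sum.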
See Also
function mergeWith
mergeWith: <T, A extends readonly unknown[]>( ...otherSources_0: ObservableInputTuple<A>) => OperatorFunction<T, T | A[number]>;
Merge the values from all observables to a single observable result.
Creates an observable, that when subscribed to, subscribes to the source observable, and all other sources provided as arguments. All values from every source are emitted from the resulting subscription.
When all sources complete, the resulting observable will complete.
When any source errors, the resulting observable will error.
## Example
Joining all outputs from multiple user input event streams

    import { fromEvent, map, mergeWith } from 'rxjs';

    const clicks$ = fromEvent(document, 'click').pipe(map(() => 'click'));
    const mousemoves$ = fromEvent(document, 'mousemove').pipe(map(() => 'mousemove'));
    const dblclicks$ = fromEvent(document, 'dblclick').pipe(map(() => 'dblclick'));

    mousemoves$
      .pipe(mergeWith(clicks$, dblclicks$))
      .subscribe(x => console.log(x));

    // result (assuming user interactions)
    // 'mousemove'
    // 'mousemove'
    // 'mousemove'
    // 'click'
    // 'click'
    // 'dblclick'

Parameter otherSources
The sources to combine the current source with.
Returns
A function that returns an Observable that merges the values from all given Observables.
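As a sketch that does not depend on user input, the following merges a source of numbers with two sources of strings; the `merged` array is an illustrative collector.

```typescript
import { of, mergeWith } from 'rxjs';

// Illustrative collector; the output type is the union of all sources.
const merged: Array<number | string> = [];

of(1, 2)
  .pipe(mergeWith(of('a', 'b'), of('c')))
  .subscribe(value => merged.push(value));
```

All three sources here are synchronous, so each one completes before the next is subscribed and the values arrive source by source. Asynchronous sources would interleave in arrival order instead.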
See Also
function min
min: <T>(comparer?: (x: T, y: T) => number) => MonoTypeOperatorFunction<T>;
The `min` operator operates on an Observable that emits numbers (or items that can be compared with a provided function), and when source Observable completes it emits a single item: the item with the smallest value.
## Examples
Get the minimal value of a series of numbers

    import { of, min } from 'rxjs';

    of(5, 4, 7, 2, 8)
      .pipe(min())
      .subscribe(x => console.log(x));

    // Outputs
    // 2

Use a comparer function to get the minimal item

    import { of, min } from 'rxjs';

    of(
      { age: 7, name: 'Foo' },
      { age: 5, name: 'Bar' },
      { age: 9, name: 'Beer' }
    ).pipe(
      min((a, b) => a.age < b.age ? -1 : 1)
    )
    .subscribe(x => console.log(x.name));

    // Outputs
    // 'Bar'

Parameter comparer
Optional comparer function that it will use instead of its default to compare the value of two items.
Returns
A function that returns an Observable that emits item with the smallest value.
See Also
function multicast
multicast: { <T>(subject: Subject<T>): UnaryFunction<Observable<T>, ConnectableObservable<T>>; <T, O extends ObservableInput<any>>( subject: Subject<T>, selector: (shared: Observable<T>) => O ): OperatorFunction<T, ObservedValueOf<O>>; <T>(subjectFactory: () => Subject<T>): UnaryFunction< Observable<T>, ConnectableObservable<T> >; <T, O extends ObservableInput<any>>( subjectFactory: () => Subject<T>, selector: (shared: Observable<T>) => O ): OperatorFunction<T, ObservedValueOf<O>>;};
An operator that creates a ConnectableObservable, that when connected, with the
connect
method, will use the provided subject to multicast the values from the source to all consumers.Parameter subject
The subject to multicast through. A function that returns a ConnectableObservable
Deprecated
Will be removed in v8. To create a connectable observable, use connectable. If you're using refCount after
multicast
, use the share operator instead.multicast(subject), refCount()
is equivalent toshare({ connector: () => subject, resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })
. Details: https://rxjs.dev/deprecations/multicastingBecause this is deprecated in favor of the connect operator, and was otherwise poorly documented, rather than duplicate the effort of documenting the same behavior, please see documentation for the connect operator.
Parameter subject
The subject used to multicast.
Parameter selector
A setup function to setup the multicast A function that returns an observable that mirrors the observable returned by the selector.
Deprecated
Will be removed in v8. Use the connect operator instead.
multicast(subject, selector)
is equivalent toconnect(selector, { connector: () => subject })
. Details: https://rxjs.dev/deprecations/multicastingAn operator that creates a ConnectableObservable, that when connected, with the
connect
method, will use the provided subject to multicast the values from the source to all consumers.Parameter subjectFactory
A factory that will be called to create the subject. Passing a function here will cause the underlying subject to be "reset" on error, completion, or refCounted unsubscription of the source. A function that returns a ConnectableObservable
Deprecated
Will be removed in v8. To create a connectable observable, use connectable. If you're using refCount after
multicast
, use the share operator instead.multicast(() => new BehaviorSubject('test')), refCount()
is equivalent toshare({ connector: () => new BehaviorSubject('test') })
. Details: https://rxjs.dev/deprecations/multicastingBecause this is deprecated in favor of the connect operator, and was otherwise poorly documented, rather than duplicate the effort of documenting the same behavior, please see documentation for the connect operator.
Parameter subjectFactory
A factory that creates the subject used to multicast.
Parameter selector
A function to set up the multicast and select the output.
A function that returns an observable that mirrors the observable returned by the selector.
Deprecated
Will be removed in v8. Use the connect operator instead.
multicast(subjectFactory, selector) is equivalent to connect(selector, { connector: subjectFactory }). Details: https://rxjs.dev/deprecations/multicasting
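The selector form of the replacement can be sketched as follows. This is a minimal illustration, not the library's own example: it assumes a synchronous source built with of, and the counter sourceSubscriptions is a hypothetical name used only to show that the source is subscribed to once.

```typescript
import { connect, defer, map, merge, of } from 'rxjs';

// Hypothetical counter: how many times the source is actually subscribed to.
let sourceSubscriptions = 0;
const source = defer(() => {
  sourceSubscriptions++;
  return of(1, 2, 3);
});

const received: number[] = [];

source
  .pipe(
    // The selector receives a multicast view of the source and may
    // subscribe to it several times without re-running the source.
    connect((shared) =>
      merge(
        shared.pipe(map((n) => n * 2)),
        shared.pipe(map((n) => n * 10))
      )
    )
  )
  .subscribe((value) => received.push(value));

console.log(received); // [2, 10, 4, 20, 6, 30]
console.log(sourceSubscriptions); // 1
```

Passing { connector: () => subject } as the second argument to connect, as in the equivalence above, swaps the default Subject for one of your choosing.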
function never
never: () => Observable<never>;
Deprecated
Replaced with the NEVER constant. Will be removed in v8.
function noop
noop: () => void;
function observeOn
observeOn: <T>( scheduler: SchedulerLike, delay?: number) => MonoTypeOperatorFunction<T>;
Re-emits all notifications from source Observable with specified scheduler.
Ensure a specific scheduler is used, from outside of an Observable.
observeOn is an operator that accepts a scheduler as its first parameter, which will be used to reschedule notifications emitted by the source Observable. It can be useful if you do not control the internal scheduler of a given Observable but still want to control when its values are emitted.
The returned Observable emits the same notifications (nexted values, complete and error events) as the source Observable, but rescheduled with the provided scheduler. Note that this doesn't mean that the source Observable's internal scheduler will be replaced in any way. The original scheduler will still be used, but when the source Observable emits a notification, it will be immediately scheduled again, this time with the scheduler passed to observeOn. An anti-pattern would be calling observeOn on an Observable that emits lots of values synchronously, in order to split those emissions into asynchronous chunks. For that to happen, the scheduler would have to be passed into the source Observable directly (usually into the operator that creates it). observeOn simply delays notifications a little bit more, to ensure that they are emitted at expected moments.
observeOn also accepts a second parameter, which specifies in milliseconds the delay with which notifications will be emitted. The main difference between the delay operator and observeOn is that observeOn will delay all notifications, including error notifications, while delay will pass through an error from the source Observable immediately when it is emitted. In general, it is highly recommended to use the delay operator for any kind of delaying of values in the stream, while using observeOn to specify which scheduler should be used for notification emissions in general.
## Example
Ensure values in subscribe are called just before browser repaint
import { interval, observeOn, animationFrameScheduler } from 'rxjs';

const someDiv = document.createElement('div');
someDiv.style.cssText = 'width: 200px;background: #09c';
document.body.appendChild(someDiv);
const intervals = interval(10);                // Intervals are scheduled
                                               // with async scheduler by default...
intervals.pipe(
  observeOn(animationFrameScheduler)           // ...but we will observe on animationFrame
)                                              // scheduler to ensure smooth animation.
.subscribe(val => {
  someDiv.style.height = val + 'px';
});
Parameter scheduler
Scheduler that will be used to reschedule notifications from source Observable.
Parameter delay
The delay, in milliseconds, with which every notification is rescheduled.
A function that returns an Observable that emits the same notifications as the source Observable, but with the provided scheduler.
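As a small sketch of the rescheduling described for observeOn, the snippet below applies asyncScheduler to a source that would otherwise emit synchronously. The names log and deliveredSynchronously are illustrative only.

```typescript
import { asyncScheduler, observeOn, of } from 'rxjs';

const log: string[] = [];

of(1, 2, 3)
  .pipe(observeOn(asyncScheduler))
  .subscribe({
    next: (value) => log.push(`next ${value}`),
    complete: () => log.push('complete'),
  });

// of() would normally deliver everything during subscribe(); with observeOn
// every notification, including complete, has been rescheduled instead.
const deliveredSynchronously = log.length;
console.log(deliveredSynchronously); // 0
```

The values and the complete notification arrive later, once the scheduler runs its queued work.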
function of
of: { (value: null): Observable<null>; (value: undefined): Observable<undefined>; (scheduler: SchedulerLike): Observable<never>; <A extends readonly unknown[]>( ...valuesAndScheduler: [...A, SchedulerLike] ): Observable<ValueFromArray<A>>; (): Observable<never>; <T>(): Observable<T>; <T>(value: T): Observable<T>; <A extends readonly unknown[]>(...values: A): Observable<ValueFromArray<A>>;};
Deprecated
The scheduler parameter will be removed in v8. Use scheduled. Details: https://rxjs.dev/deprecations/scheduler-argument
Deprecated
Do not specify explicit type parameters. Signatures with type parameters that cannot be inferred will be removed in v8.
function onErrorResumeNext
onErrorResumeNext: { <A extends readonly unknown[]>( sources: [...ObservableInputTuple<A>] ): Observable<A[number]>; <A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): Observable<A[number]>;};
function onErrorResumeNextWith
onErrorResumeNextWith: { <T, A extends readonly unknown[]>( sources: [...ObservableInputTuple<A>] ): OperatorFunction<T, T | A[number]>; <T, A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): OperatorFunction<T, T | A[number]>;};
function pairs
pairs: { <T>(arr: readonly T[], scheduler?: SchedulerLike): Observable<[string, T]>; <O extends Record<string, unknown>>( obj: O, scheduler?: SchedulerLike ): Observable<[keyof O, O[keyof O]]>; <T>(iterable: Iterable<T>, scheduler?: SchedulerLike): Observable<[string, T]>; ( n: number | bigint | boolean | symbol | ((...args: any[]) => any), scheduler?: SchedulerLike ): Observable<[never, never]>;};
Deprecated
Use from(Object.entries(obj)) instead. Will be removed in v8.
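The suggested replacement can be sketched like this; the settings object is a made-up example value.

```typescript
import { from } from 'rxjs';

// Hypothetical object whose key/value pairs we want as a stream.
const settings = { theme: 'dark', fontSize: 14 };

const entries: Array<[string, string | number]> = [];
from(Object.entries(settings)).subscribe((entry) => entries.push(entry));

console.log(entries); // [['theme', 'dark'], ['fontSize', 14]]
```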
function pairwise
pairwise: <T>() => OperatorFunction<T, [T, T]>;
Groups pairs of consecutive emissions together and emits them as an array of two values.
Puts the current value and previous value together as an array, and emits that.

The Nth emission from the source Observable will cause the output Observable to emit an array [(N-1)th, Nth] of the previous and the current value, as a pair. For this reason, pairwise emits on the second and subsequent emissions from the source Observable, but not on the first emission, because there is no previous value in that case.
## Example
On every click (starting from the second), emit the relative distance to the previous click
import { fromEvent, pairwise, map } from 'rxjs';

const clicks = fromEvent<PointerEvent>(document, 'click');
const pairs = clicks.pipe(pairwise());
const distance = pairs.pipe(
  map(([first, second]) => {
    const x0 = first.clientX;
    const y0 = first.clientY;
    const x1 = second.clientX;
    const y1 = second.clientY;
    return Math.sqrt(Math.pow(x0 - x1, 2) + Math.pow(y0 - y1, 2));
  })
);

distance.subscribe(x => console.log(x));
A function that returns an Observable of pairs (as arrays) of consecutive values from the source Observable.
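For a version of the pairing rule that runs without a browser, here is a minimal sketch over a fixed list of numbers. The variable names are illustrative.

```typescript
import { map, of, pairwise } from 'rxjs';

const pairs$ = of(1, 4, 9, 16).pipe(pairwise());

const pairsSeen: Array<[number, number]> = [];
pairs$.subscribe((pair) => pairsSeen.push(pair));

const differences: number[] = [];
pairs$
  .pipe(map(([previous, current]) => current - previous))
  .subscribe((d) => differences.push(d));

// Four source values produce three pairs: nothing is emitted for the first value.
console.log(pairsSeen); // [[1, 4], [4, 9], [9, 16]]
console.log(differences); // [3, 5, 7]
```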
function partition
partition: { <T, U extends T, A>( source: ObservableInput<T>, predicate: (this: A, value: T, index: number) => value is U, thisArg: A ): [Observable<U>, Observable<Exclude<T, U>>]; <T, U extends T>( source: ObservableInput<T>, predicate: (value: T, index: number) => value is U ): [Observable<U>, Observable<Exclude<T, U>>]; <T, A>( source: ObservableInput<T>, predicate: (this: A, value: T, index: number) => boolean, thisArg: A ): [Observable<T>, Observable<T>]; <T>( source: ObservableInput<T>, predicate: (value: T, index: number) => boolean ): [Observable<T>, Observable<T>];};
Deprecated
Use a closure instead of a thisArg. Signatures accepting a thisArg will be removed in v8.
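A sketch of the closure approach: the predicate reads limits from the enclosing scope instead of receiving it as thisArg. The limits object is a hypothetical stand-in for whatever context the predicate needs.

```typescript
import { of, partition } from 'rxjs';

// Hypothetical context object that might otherwise have been passed as thisArg.
const limits = { threshold: 10 };

const [large$, small$] = partition(
  of(4, 12, 7, 30),
  (n) => n >= limits.threshold // captured by closure, no thisArg needed
);

const large: number[] = [];
const small: number[] = [];
large$.subscribe((n) => large.push(n));
small$.subscribe((n) => small.push(n));

console.log(large); // [12, 30]
console.log(small); // [4, 7]
```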
function pipe
pipe: { (): typeof identity; <T, A>(fn1: UnaryFunction<T, A>): UnaryFunction<T, A>; <T, A, B>(fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>): UnaryFunction< T, B >; <T, A, B, C>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C> ): UnaryFunction<T, C>; <T, A, B, C, D>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D> ): UnaryFunction<T, D>; <T, A, B, C, D, E>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E> ): UnaryFunction<T, E>; <T, A, B, C, D, E, F>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E>, fn6: UnaryFunction<E, F> ): UnaryFunction<T, F>; <T, A, B, C, D, E, F, G>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E>, fn6: UnaryFunction<E, F>, fn7: UnaryFunction<F, G> ): UnaryFunction<T, G>; <T, A, B, C, D, E, F, G, H>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E>, fn6: UnaryFunction<E, F>, fn7: UnaryFunction<F, G>, fn8: UnaryFunction<G, H> ): UnaryFunction<T, H>; <T, A, B, C, D, E, F, G, H, I>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E>, fn6: UnaryFunction<E, F>, fn7: UnaryFunction<F, G>, fn8: UnaryFunction<G, H>, fn9: UnaryFunction<H, I> ): UnaryFunction<T, I>; <T, A, B, C, D, E, F, G, H, I>( fn1: UnaryFunction<T, A>, fn2: UnaryFunction<A, B>, fn3: UnaryFunction<B, C>, fn4: UnaryFunction<C, D>, fn5: UnaryFunction<D, E>, fn6: UnaryFunction<E, F>, fn7: UnaryFunction<F, G>, fn8: UnaryFunction<G, H>, fn9: UnaryFunction<H, I>, ...fns: UnaryFunction<any, any>[] ): UnaryFunction<T, unknown>;};
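The signature above only requires unary functions, so the standalone pipe can compose plain functions as well as operators. The sketch below shows both uses; addOneThenDouble and evenSquares are illustrative names, and the parameter annotations are there so the types are inferred without relying on context.

```typescript
import { filter, map, of, pipe } from 'rxjs';

// Composing two plain unary functions, applied left to right.
const addOneThenDouble = pipe(
  (n: number) => n + 1,
  (n: number) => n * 2
);
console.log(addOneThenDouble(3)); // 8

// Composing operators into a reusable custom operator.
const evenSquares = pipe(
  filter((n: number) => n % 2 === 0),
  map((n: number) => n * n)
);

const squares: number[] = [];
of(1, 2, 3, 4).pipe(evenSquares).subscribe((n) => squares.push(n));
console.log(squares); // [4, 16]
```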
function pluck
pluck: { <T, K1 extends keyof T>(k1: K1): OperatorFunction<T, T[K1]>; <T, K1 extends keyof T, K2 extends keyof T[K1]>( k1: K1, k2: K2 ): OperatorFunction<T, T[K1][K2]>; <T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2]>( k1: K1, k2: K2, k3: K3 ): OperatorFunction<T, T[K1][K2][K3]>; < T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2], K4 extends keyof T[K1][K2][K3] >( k1: K1, k2: K2, k3: K3, k4: K4 ): OperatorFunction<T, T[K1][K2][K3][K4]>; < T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2], K4 extends keyof T[K1][K2][K3], K5 extends keyof T[K1][K2][K3][K4] >( k1: K1, k2: K2, k3: K3, k4: K4, k5: K5 ): OperatorFunction<T, T[K1][K2][K3][K4][K5]>; < T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2], K4 extends keyof T[K1][K2][K3], K5 extends keyof T[K1][K2][K3][K4], K6 extends keyof T[K1][K2][K3][K4][K5] >( k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6 ): OperatorFunction<T, T[K1][K2][K3][K4][K5][K6]>; < T, K1 extends keyof T, K2 extends keyof T[K1], K3 extends keyof T[K1][K2], K4 extends keyof T[K1][K2][K3], K5 extends keyof T[K1][K2][K3][K4], K6 extends keyof T[K1][K2][K3][K4][K5] >( k1: K1, k2: K2, k3: K3, k4: K4, k5: K5, k6: K6, ...rest: string[] ): OperatorFunction<T, unknown>; <T>(...properties: string[]): OperatorFunction<T, unknown>;};
Deprecated
Use map and optional chaining: pluck('foo', 'bar') is map(x => x?.foo?.bar). Will be removed in v8.
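A minimal sketch of the map-based replacement, using a hypothetical Order shape in which intermediate properties may be missing:

```typescript
import { from, map } from 'rxjs';

// Hypothetical data shape for illustration only.
interface Order {
  customer?: { address?: { city: string } };
}

const orders: Order[] = [
  { customer: { address: { city: 'Oslo' } } },
  { customer: {} },
  {},
];

const cities: Array<string | undefined> = [];
from(orders)
  .pipe(map((order) => order?.customer?.address?.city))
  .subscribe((city) => cities.push(city));

console.log(cities); // ['Oslo', undefined, undefined]
```

Unlike pluck with string keys, the property path here is checked by the compiler.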
function publish
publish: { <T>(): UnaryFunction<Observable<T>, ConnectableObservable<T>>; <T, O extends ObservableInput<any>>( selector: (shared: Observable<T>) => O ): OperatorFunction<T, ObservedValueOf<O>>;};
Returns a connectable observable that, when connected, will multicast all values through a single underlying Subject instance.
Deprecated
Will be removed in v8. To create a connectable observable, use connectable.
source.pipe(publish()) is equivalent to connectable(source, { connector: () => new Subject(), resetOnDisconnect: false }). If you're using refCount after publish, use the share operator instead. source.pipe(publish(), refCount()) is equivalent to source.pipe(share({ resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })). Details: https://rxjs.dev/deprecations/multicasting
Returns an observable that, when subscribed to, creates an underlying Subject, provides an observable view of it to a selector function, takes the observable result of that selector function and subscribes to it, sending its values to the consumer, _then_ connects the subject to the original source.
Parameter selector
A function used to setup multicasting prior to automatic connection.
Deprecated
Will be removed in v8. Use the connect operator instead.
publish(selector) is equivalent to connect(selector). Details: https://rxjs.dev/deprecations/multicasting
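The connectable replacement for publish() can be sketched as below, assuming a synchronous source. The sourceSubscriptions counter is a hypothetical helper that shows the source runs once for both subscribers.

```typescript
import { connectable, defer, of, Subject } from 'rxjs';

// Hypothetical counter: how many times the source is subscribed to.
let sourceSubscriptions = 0;
const source = defer(() => {
  sourceSubscriptions++;
  return of(1, 2, 3);
});

const published = connectable(source, {
  connector: () => new Subject<number>(),
  resetOnDisconnect: false,
});

const a: number[] = [];
const b: number[] = [];
published.subscribe((v) => a.push(v));
published.subscribe((v) => b.push(v));

// Nothing flows until connect() is called.
const beforeConnect = a.length + b.length;
published.connect();

console.log(beforeConnect); // 0
console.log(a, b); // [1, 2, 3] [1, 2, 3]
console.log(sourceSubscriptions); // 1
```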
function publishBehavior
publishBehavior: <T>( initialValue: T) => UnaryFunction<Observable<T>, ConnectableObservable<T>>;
Creates a ConnectableObservable that utilizes a BehaviorSubject.
Parameter initialValue
The initial value passed to the BehaviorSubject.
A function that returns a ConnectableObservable.
Deprecated
Will be removed in v8. To create a connectable observable that uses a BehaviorSubject under the hood, use connectable.
source.pipe(publishBehavior(initValue)) is equivalent to connectable(source, { connector: () => new BehaviorSubject(initValue), resetOnDisconnect: false }). If you're using refCount after publishBehavior, use the share operator instead. source.pipe(publishBehavior(initValue), refCount()) is equivalent to source.pipe(share({ connector: () => new BehaviorSubject(initValue), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })). Details: https://rxjs.dev/deprecations/multicasting
function publishLast
publishLast: <T>() => UnaryFunction<Observable<T>, ConnectableObservable<T>>;
Returns a connectable observable sequence that shares a single subscription to the underlying sequence containing only the last notification.

Similar to publish, but it waits until the source observable completes and stores the last emitted value. Similarly to publishReplay and publishBehavior, this keeps storing the last value even if it has no more subscribers. If subsequent subscriptions happen, they will immediately get that last stored value and complete.
## Example
import { ConnectableObservable, interval, publishLast, tap, take } from 'rxjs';

const connectable = <ConnectableObservable<number>>interval(1000)
  .pipe(
    tap(x => console.log('side effect', x)),
    take(3),
    publishLast()
  );

connectable.subscribe({
  next: x => console.log('Sub. A', x),
  error: err => console.log('Sub. A Error', err),
  complete: () => console.log('Sub. A Complete')
});

connectable.subscribe({
  next: x => console.log('Sub. B', x),
  error: err => console.log('Sub. B Error', err),
  complete: () => console.log('Sub. B Complete')
});

connectable.connect();

// Results:
// 'side effect 0' - after one second
// 'side effect 1' - after two seconds
// 'side effect 2' - after three seconds
// 'Sub. A 2' - immediately after 'side effect 2'
// 'Sub. B 2'
// 'Sub. A Complete'
// 'Sub. B Complete'
A function that returns an Observable that emits elements of a sequence produced by multicasting the source sequence.
Deprecated
Will be removed in v8. To create a connectable observable with an AsyncSubject under the hood, use connectable.
source.pipe(publishLast()) is equivalent to connectable(source, { connector: () => new AsyncSubject(), resetOnDisconnect: false }). If you're using refCount after publishLast, use the share operator instead. source.pipe(publishLast(), refCount()) is equivalent to source.pipe(share({ connector: () => new AsyncSubject(), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false })). Details: https://rxjs.dev/deprecations/multicasting
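The AsyncSubject-based replacement can be sketched with a synchronous source, which makes the "only the last value, even for late subscribers" behavior easy to observe. The names early and late are illustrative.

```typescript
import { AsyncSubject, connectable, of } from 'rxjs';

const lastOnly = connectable(of(1, 2, 3), {
  connector: () => new AsyncSubject<number>(),
  resetOnDisconnect: false,
});

const early: number[] = [];
lastOnly.subscribe((v) => early.push(v));

// The source runs and completes here; the AsyncSubject keeps only the last value.
lastOnly.connect();

// A subscriber arriving after completion still receives the stored value.
const late: number[] = [];
lastOnly.subscribe((v) => late.push(v));

console.log(early); // [3]
console.log(late); // [3]
```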
function publishReplay
publishReplay: { <T>( bufferSize?: number, windowTime?: number, timestampProvider?: TimestampProvider ): MonoTypeOperatorFunction<T>; <T, O extends ObservableInput<any>>( bufferSize: number, windowTime: number, selector: (shared: Observable<T>) => O, timestampProvider?: TimestampProvider ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( bufferSize: number, windowTime: number, selector: undefined, timestampProvider: TimestampProvider ): OperatorFunction<T, ObservedValueOf<O>>;};
Creates a ConnectableObservable that uses a ReplaySubject internally.
Parameter bufferSize
The buffer size for the underlying ReplaySubject.
Parameter windowTime
The window time for the underlying ReplaySubject.
Parameter timestampProvider
The timestamp provider for the underlying ReplaySubject.
Deprecated
Will be removed in v8. To create a connectable observable that uses a ReplaySubject under the hood, use connectable.
source.pipe(publishReplay(size, time, scheduler)) is equivalent to connectable(source, { connector: () => new ReplaySubject(size, time, scheduler), resetOnDisconnect: false }). If you're using refCount after publishReplay, use the share operator instead. publishReplay(size, time, scheduler), refCount() is equivalent to share({ connector: () => new ReplaySubject(size, time, scheduler), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }). Details: https://rxjs.dev/deprecations/multicasting
Creates an observable that, when subscribed to, will create a ReplaySubject and pass an observable from it (using asObservable) to the selector function, which then returns an observable that is subscribed to before "connecting" the source to the internal ReplaySubject.
Since this is deprecated, for additional details see the documentation for connect.
Parameter bufferSize
The buffer size for the underlying ReplaySubject.
Parameter windowTime
The window time for the underlying ReplaySubject.
Parameter selector
A function used to setup the multicast.
Parameter timestampProvider
The timestamp provider for the underlying ReplaySubject.
Deprecated
Will be removed in v8. Use the connect operator instead.
source.pipe(publishReplay(size, window, selector, scheduler)) is equivalent to source.pipe(connect(selector, { connector: () => new ReplaySubject(size, window, scheduler) })). Details: https://rxjs.dev/deprecations/multicasting
Creates a ConnectableObservable that uses a ReplaySubject internally.
Parameter bufferSize
The buffer size for the underlying ReplaySubject.
Parameter windowTime
The window time for the underlying ReplaySubject.
Parameter selector
Passing undefined here determines that this operator will return a ConnectableObservable.
Parameter timestampProvider
The timestamp provider for the underlying ReplaySubject.
Deprecated
Will be removed in v8. To create a connectable observable that uses a ReplaySubject under the hood, use connectable.
source.pipe(publishReplay(size, time, scheduler)) is equivalent to connectable(source, { connector: () => new ReplaySubject(size, time, scheduler), resetOnDisconnect: false }). If you're using refCount after publishReplay, use the share operator instead. publishReplay(size, time, scheduler), refCount() is equivalent to share({ connector: () => new ReplaySubject(size, time, scheduler), resetOnError: false, resetOnComplete: false, resetOnRefCountZero: false }). Details: https://rxjs.dev/deprecations/multicasting
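The share-based replacement for publishReplay followed by refCount can be sketched as follows, assuming a buffer size of 2 and a synchronous source. The sourceSubscriptions counter is hypothetical and only shows that the late subscriber is served from the replay buffer.

```typescript
import { defer, of, ReplaySubject, share } from 'rxjs';

// Hypothetical counter: how many times the source is subscribed to.
let sourceSubscriptions = 0;
const source = defer(() => {
  sourceSubscriptions++;
  return of(1, 2, 3);
});

const replayed = source.pipe(
  share({
    connector: () => new ReplaySubject<number>(2),
    resetOnError: false,
    resetOnComplete: false,
    resetOnRefCountZero: false,
  })
);

const first: number[] = [];
replayed.subscribe((v) => first.push(v));

// The source has already completed; with the resets disabled, the same
// ReplaySubject is reused and replays its last two values.
const late: number[] = [];
replayed.subscribe((v) => late.push(v));

console.log(first); // [1, 2, 3]
console.log(late); // [2, 3]
console.log(sourceSubscriptions); // 1
```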
function race
race: { <T extends readonly unknown[]>(inputs: [...ObservableInputTuple<T>]): Observable< T[number] >; <T extends readonly unknown[]>(...inputs_0: ObservableInputTuple<T>): Observable< T[number] >;};
function raceWith
raceWith: <T, A extends readonly unknown[]>( ...otherSources_0: ObservableInputTuple<A>) => OperatorFunction<T, T | A[number]>;
Creates an Observable that mirrors the first source Observable to emit a next, error or complete notification from the combination of the Observable to which the operator is applied and supplied Observables.
## Example
import { interval, map, raceWith } from 'rxjs';

const obs1 = interval(7000).pipe(map(() => 'slow one'));
const obs2 = interval(3000).pipe(map(() => 'fast one'));
const obs3 = interval(5000).pipe(map(() => 'medium one'));

obs1
  .pipe(raceWith(obs2, obs3))
  .subscribe(winner => console.log(winner));

// Outputs
// a series of 'fast one'
Parameter otherSources
Sources used to race for which Observable emits first.
A function that returns an Observable that mirrors the output of the first Observable to emit an item.
function range
range: { (start: number, count?: number): Observable<number>; (start: number, count: number, scheduler: SchedulerLike): Observable<number>;};
Deprecated
The scheduler parameter will be removed in v8. Use range(start, count).pipe(observeOn(scheduler)) instead. Details: https://rxjs.dev/deprecations/scheduler-argument
function reduce
reduce: { <V, A = V>( accumulator: (acc: A | V, value: V, index: number) => A ): OperatorFunction<V, V | A>; <V, A>( accumulator: (acc: A, value: V, index: number) => A, seed: A ): OperatorFunction<V, A>; <V, A, S = A>( accumulator: (acc: A | S, value: V, index: number) => A, seed: S ): OperatorFunction<V, A>;};
function refCount
refCount: <T>() => MonoTypeOperatorFunction<T>;
Makes a ConnectableObservable behave like an ordinary observable and automates the way you can connect to it.
Internally it counts the subscriptions to the observable and subscribes (only once) to the source if the number of subscriptions is larger than 0. If the number of subscriptions is smaller than 1, it unsubscribes from the source. This way you can make sure that everything before the *published* refCount has only a single subscription independently of the number of subscribers to the target observable.
Note that using the share operator is exactly the same as using the multicast(() => new Subject()) operator (making the observable hot) and the *refCount* operator in a sequence.
## Example
In the following example there are two intervals turned into connectable observables by using the *publish* operator. The first one uses the *refCount* operator, the second one does not use it. You will notice that a connectable observable does nothing until you call its connect function.
import { interval, tap, publish, refCount } from 'rxjs';

// Turn the interval observable into a ConnectableObservable (hot)
const refCountInterval = interval(400).pipe(
  tap(num => console.log(`refCount ${ num }`)),
  publish(),
  refCount()
);

const publishedInterval = interval(400).pipe(
  tap(num => console.log(`publish ${ num }`)),
  publish()
);

refCountInterval.subscribe();
refCountInterval.subscribe();
// 'refCount 0' -----> 'refCount 1' -----> etc
// All subscriptions will receive the same value and the tap (and
// every other operator) before the `publish` operator will be executed
// only once per event independently of the number of subscriptions.

publishedInterval.subscribe();
// Nothing happens until you call .connect() on the observable.
A function that returns an Observable that automates the connection to ConnectableObservable.
Deprecated
Replaced with the share operator. How share is used will depend on the connectable observable you created just prior to the refCount operator. Details: https://rxjs.dev/deprecations/multicasting
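The reference counting described for refCount carries over to share with its default options. The sketch below uses a Subject as a hand-driven source so the behavior can be observed synchronously; ticks and sourceSubscriptions are hypothetical names.

```typescript
import { defer, share, Subject } from 'rxjs';

const ticks = new Subject<number>();

// Hypothetical counter: how many times the underlying source is subscribed to.
let sourceSubscriptions = 0;
const shared = defer(() => {
  sourceSubscriptions++;
  return ticks;
}).pipe(share());

const a: number[] = [];
const b: number[] = [];
const subA = shared.subscribe((v) => a.push(v));
const subB = shared.subscribe((v) => b.push(v));

ticks.next(1);
const whileShared = sourceSubscriptions; // two subscribers, one source subscription

// When the subscriber count drops to zero, share disconnects from the source...
subA.unsubscribe();
subB.unsubscribe();

// ...and the next subscriber triggers a fresh subscription.
const c: number[] = [];
shared.subscribe((v) => c.push(v));
ticks.next(2);

console.log(a, b, c); // [1] [1] [2]
console.log(whileShared, sourceSubscriptions); // 1 2
```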
function repeat
repeat: <T>( countOrConfig?: number | RepeatConfig) => MonoTypeOperatorFunction<T>;
Returns an Observable that will resubscribe to the source stream when the source stream completes.
Repeats all values emitted on the source. It's like retry, but for non error cases.

Repeat will output values from a source until the source completes, then it will resubscribe to the source a specified number of times, with a specified delay. Repeat can be particularly useful in combination with closing operators like take, takeUntil, first, or takeWhile, as it can be used to restart a source again from scratch.
Repeat is very similar to retry, where retry will resubscribe to the source in the error case, but repeat will resubscribe if the source completes.
Note that repeat will _not_ catch errors. Use retry for that.
- repeat(0) returns an empty observable
- repeat() will repeat forever
- repeat({ delay: 200 }) will repeat forever, with a delay of 200ms between repetitions.
- repeat({ count: 2, delay: 400 }) will repeat twice, with a delay of 400ms between repetitions.
- repeat({ delay: (count) => timer(count * 1000) }) will repeat forever, but will have a delay that grows by one second for each repetition.
## Example
Repeat a message stream
import { of, repeat } from 'rxjs';

const source = of('Repeat message');
const result = source.pipe(repeat(3));

result.subscribe(x => console.log(x));

// Results
// 'Repeat message'
// 'Repeat message'
// 'Repeat message'
Repeat 3 values, 2 times
import { interval, take, repeat } from 'rxjs';

const source = interval(1000);
const result = source.pipe(take(3), repeat(2));

result.subscribe(x => console.log(x));

// Results every second
// 0
// 1
// 2
// 0
// 1
// 2
Defining two complex repeats with delays on the same source. Note that the second repeat cannot be called until the first repeat has exhausted its count.
import { defer, of, repeat, timer } from 'rxjs';

const source = defer(() => {
  return of(`Hello, it is ${new Date()}`)
});

source.pipe(
  // Repeat 3 times with a delay of 1 second between repetitions
  repeat({
    count: 3,
    delay: 1000,
  }),

  // *Then* repeat forever, but with an exponential step-back
  // maxing out at 1 minute. Note: ** is exponentiation; ^ would be bitwise XOR.
  repeat({
    delay: (count) => timer(Math.min(60000, 2 ** count * 1000))
  })
)
Parameter countOrConfig
Either the number of times the source Observable items are repeated (a count of 0 will yield an empty Observable) or a RepeatConfig object.
function repeatWhen
repeatWhen: <T>( notifier: (notifications: Observable<void>) => ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Returns an Observable that mirrors the source Observable with the exception of a complete. If the source Observable calls complete, this method will emit on the notifications Observable passed to notifier. If the Observable returned from notifier calls complete or error, then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.
## Example
Repeat a message stream on click
import { of, fromEvent, repeatWhen } from 'rxjs';

const source = of('Repeat message');
const documentClick$ = fromEvent(document, 'click');
const result = source.pipe(repeatWhen(() => documentClick$));

result.subscribe(data => console.log(data))
Parameter notifier
Function that receives an Observable of notifications with which a user can complete or error, aborting the repetition.
A function that returns an Observable that mirrors the source Observable with the exception of a complete.
Deprecated
Will be removed in v9 or v10. Use repeat's delay option instead. Instead of repeatWhen(() => notify$), use: repeat({ delay: () => notify$ }).
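A sketch of the repeat({ delay }) replacement, with a Subject standing in for the click stream so it runs outside a browser. The names again and events are illustrative.

```typescript
import { of, repeat, Subject } from 'rxjs';

// Hypothetical notifier: each emission triggers one more repetition.
const again = new Subject<void>();
const events: string[] = [];

of('Repeat message')
  .pipe(repeat({ delay: () => again }))
  .subscribe({
    next: (message) => events.push(message),
    complete: () => events.push('complete'),
  });

again.next(); // resubscribes to the source once more
again.complete(); // a notifier that completes ends the repetition

console.log(events); // ['Repeat message', 'Repeat message', 'complete']
```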
function retry
retry: { <T>(count?: number): MonoTypeOperatorFunction<T>; <T>(config: RetryConfig): MonoTypeOperatorFunction<T>;};
function retryWhen
retryWhen: <T>( notifier: (errors: Observable<any>) => ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Returns an Observable that mirrors the source Observable with the exception of an error. If the source Observable calls error, this method will emit the value that caused the error on the errors Observable passed to notifier. If the ObservableInput returned from notifier calls complete or error, then this method will call complete or error on the child subscription. Otherwise this method will resubscribe to the source Observable.
Retry an observable sequence on error based on custom criteria.
## Example
import { interval, map, retryWhen, tap, delayWhen, timer } from 'rxjs';

const source = interval(1000);
const result = source.pipe(
  map(value => {
    if (value > 5) {
      // error will be picked up by retryWhen
      throw value;
    }
    return value;
  }),
  retryWhen(errors =>
    errors.pipe(
      // log error message
      tap(value => console.log(`Value ${ value } was too high!`)),
      // restart after `value` seconds (6 seconds here, since the thrown value is 6)
      delayWhen(value => timer(value * 1000))
    )
  )
);

result.subscribe(value => console.log(value));

// results:
// 0
// 1
// 2
// 3
// 4
// 5
// 'Value 6 was too high!'
// - Wait 6 seconds then repeat
Parameter notifier
Function that receives an Observable of notifications with which a user can complete or error, aborting the retry.
A function that returns an Observable that mirrors the source Observable with the exception of an error.
Deprecated
Will be removed in v9 or v10. Use retry's delay option instead. Instead of retryWhen(() => notify$), use: retry({ delay: () => notify$ }).
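A sketch of the retry({ delay }) replacement. The flaky source, which fails on its first two subscriptions, and the retryNow notifier are hypothetical helpers used to drive the retries by hand.

```typescript
import { defer, of, retry, Subject, throwError } from 'rxjs';

// Hypothetical source: fails on the first two subscriptions, then succeeds.
let attempts = 0;
const flaky = defer(() => {
  const attempt = ++attempts;
  return attempt < 3
    ? throwError(() => new Error(`attempt ${attempt} failed`))
    : of('ok');
});

const retryNow = new Subject<void>();
const seenErrors: string[] = [];
const received: string[] = [];

flaky
  .pipe(
    retry({
      // Called with each error; a retry happens when the returned notifier emits.
      delay: (error: Error) => {
        seenErrors.push(error.message);
        return retryNow;
      },
    })
  )
  .subscribe((value) => received.push(value));

retryNow.next(); // second attempt, fails again
retryNow.next(); // third attempt, succeeds

console.log(seenErrors); // ['attempt 1 failed', 'attempt 2 failed']
console.log(received); // ['ok']
```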
function sample
sample: <T>(notifier: ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Emits the most recently emitted value from the source Observable whenever another Observable, the notifier, emits.
It's like sampleTime, but samples whenever the notifier ObservableInput emits something.
Whenever the notifier ObservableInput emits a value, sample looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The notifier is subscribed to as soon as the output Observable is subscribed.
## Example
On every click, sample the most recent seconds timer
import { fromEvent, interval, sample } from 'rxjs';

const seconds = interval(1000);
const clicks = fromEvent(document, 'click');
const result = seconds.pipe(sample(clicks));

result.subscribe(x => console.log(x));
Parameter notifier
The ObservableInput to use for sampling the source Observable.
A function that returns an Observable that emits the results of sampling the values emitted by the source Observable whenever the notifier Observable emits a value or completes.
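The "unless the source has not emitted anything since the previous sampling" rule can be seen in a small sketch that drives both streams by hand with Subjects. The names values, sampler and sampled are illustrative.

```typescript
import { sample, Subject } from 'rxjs';

const values = new Subject<number>();
const sampler = new Subject<void>();

const sampled: number[] = [];
values.pipe(sample(sampler)).subscribe((v) => sampled.push(v));

values.next(1);
values.next(2);
sampler.next(); // emits 2, the most recent value
sampler.next(); // emits nothing: no new value since the last sampling
values.next(3);
sampler.next(); // emits 3

console.log(sampled); // [2, 3]
```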
function sampleTime
sampleTime: <T>( period: number, scheduler?: SchedulerLike) => MonoTypeOperatorFunction<T>;
Emits the most recently emitted value from the source Observable within periodic time intervals.
Samples the source Observable at periodic time intervals, emitting what it samples.

sampleTime periodically looks at the source Observable and emits whichever value it has most recently emitted since the previous sampling, unless the source has not emitted anything since the previous sampling. The sampling happens periodically in time every period milliseconds (or the time unit defined by the optional scheduler argument). The sampling starts as soon as the output Observable is subscribed.
## Example
Every second, emit the most recent click at most once
import { fromEvent, sampleTime } from 'rxjs';

const clicks = fromEvent(document, 'click');
const result = clicks.pipe(sampleTime(1000));

result.subscribe(x => console.log(x));
Parameter period
The sampling period expressed in milliseconds or the time unit determined internally by the optional scheduler.
Parameter scheduler
The SchedulerLike to use for managing the timers that handle the sampling.
A function that returns an Observable that emits the results of sampling the values emitted by the source Observable at the specified time interval.
function scan
scan: { <V, A = V>( accumulator: (acc: A | V, value: V, index: number) => A ): OperatorFunction<V, V | A>; <V, A>( accumulator: (acc: A, value: V, index: number) => A, seed: A ): OperatorFunction<V, A>; <V, A, S>( accumulator: (acc: A | S, value: V, index: number) => A, seed: S ): OperatorFunction<V, A>;};
function scheduled
scheduled: <T>( input: ObservableInput<T>, scheduler: SchedulerLike) => Observable<T>;
Converts from a common ObservableInput type to an observable where subscription and emissions are scheduled on the provided scheduler.
Parameter input
The observable, array, promise, iterable, etc you would like to schedule
Parameter scheduler
The scheduler to use to schedule the subscription and emissions from the returned observable.
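A minimal sketch of scheduled, assuming an array input and asyncScheduler. It is also the shape suggested as the replacement for passing a scheduler argument to creation functions such as of.

```typescript
import { asyncScheduler, scheduled } from 'rxjs';

const received: number[] = [];

scheduled([1, 2, 3], asyncScheduler).subscribe((value) => received.push(value));

// Subscription and emissions are scheduled, so nothing has arrived yet
// at the point where subscribe() returns.
const deliveredSynchronously = received.length;
console.log(deliveredSynchronously); // 0
```

The three values are delivered afterwards, when the scheduler executes its queued work.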
function sequenceEqual
sequenceEqual: <T>( compareTo: ObservableInput<T>, comparator?: (a: T, b: T) => boolean) => OperatorFunction<T, boolean>;
Compares all values of two observables in sequence using an optional comparator function and returns an observable of a single boolean value representing whether or not the two sequences are equal.
Checks to see if all values emitted by both observables are equal, in order.

sequenceEqual subscribes to the source observable and the compareTo ObservableInput (which is internally converted to an observable) and buffers incoming values from each observable. Whenever either observable emits a value, the value is buffered and the buffers are shifted and compared from the bottom up. If any value pair doesn't match, the returned observable will emit false and complete. If one of the observables completes, the operator will wait for the other observable to complete. If the other observable emits before completing, the returned observable will emit false and complete. If one observable never completes or emits after the other completes, the returned observable will never complete.
## Example
Figure out if the Konami code matches

    import { from, fromEvent, map, bufferCount, mergeMap, sequenceEqual } from 'rxjs';

    const codes = from([
      'ArrowUp',
      'ArrowUp',
      'ArrowDown',
      'ArrowDown',
      'ArrowLeft',
      'ArrowRight',
      'ArrowLeft',
      'ArrowRight',
      'KeyB',
      'KeyA',
      'Enter', // no start key, clearly.
    ]);

    const keys = fromEvent<KeyboardEvent>(document, 'keyup').pipe(map(e => e.code));
    const matches = keys.pipe(
      bufferCount(11, 1),
      mergeMap(last11 => from(last11).pipe(sequenceEqual(codes)))
    );
    matches.subscribe(matched => console.log('Successful cheat at Contra? ', matched));

Parameter compareTo
The `ObservableInput` sequence to compare the source sequence to.
Parameter comparator
An optional function to compare each value pair.
A function that returns an Observable that emits a single boolean value representing whether or not the values emitted by the source Observable and provided `ObservableInput` were equal in sequence.
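The comparison rules above can also be checked without a browser. The following is a minimal sketch using synchronous `of` sources; the `results` array and the case-insensitive comparator are illustrative choices, not part of the library.

```typescript
import { of, sequenceEqual } from 'rxjs';

// Collects the single boolean emitted by each comparison.
const results: boolean[] = [];

// Same values in the same order.
of(1, 2, 3)
  .pipe(sequenceEqual(of(1, 2, 3)))
  .subscribe(equal => results.push(equal));

// Same values in a different order.
of(1, 2, 3)
  .pipe(sequenceEqual(of(3, 2, 1)))
  .subscribe(equal => results.push(equal));

// Optional comparator: treat strings as equal regardless of case.
of('a', 'b')
  .pipe(sequenceEqual(of('A', 'B'), (x: string, y: string) => x.toLowerCase() === y.toLowerCase()))
  .subscribe(equal => results.push(equal));

console.log(results); // [true, false, true]
```

Each comparison emits exactly one boolean and then completes, so the array ends up with one entry per subscription.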
function share
share: { <T>(): MonoTypeOperatorFunction<T>; <T>(options: ShareConfig<T>): MonoTypeOperatorFunction<T>;};
function shareReplay
shareReplay: { <T>(config: ShareReplayConfig): MonoTypeOperatorFunction<T>; <T>( bufferSize?: number, windowTime?: number, scheduler?: SchedulerLike ): MonoTypeOperatorFunction<T>;};
function single
single: { <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T>( predicate?: (value: T, index: number, source: Observable<T>) => boolean ): MonoTypeOperatorFunction<T>;};
function skip
skip: <T>(count: number) => MonoTypeOperatorFunction<T>;
Returns an Observable that skips the first `count` items emitted by the source Observable.
Values are dropped until `count` of them have been received; every later value is passed through. If the source emits `count` values or fewer, the returned Observable emits no values. An error raised by the source is forwarded either way.
## Example
Skip the values before the emission

    import { interval, skip } from 'rxjs';

    // emit every half second
    const source = interval(500);
    // skip the first 10 emitted values
    const result = source.pipe(skip(10));

    result.subscribe(value => console.log(value));
    // output: 10...11...12...13...

Parameter count
The number of items emitted by the source Observable that should be skipped.
A function that returns an Observable that skips the first `count` values emitted by the source Observable.
function skipLast
skipLast: <T>(skipCount: number) => MonoTypeOperatorFunction<T>;
Skip a specified number of values before the completion of an observable.

Returns an observable that will emit values as soon as it can, given a number of skipped values. For example, if you `skipLast(3)` on a source, when the source emits its fourth value, the first value the source emitted will finally be emitted from the returned observable, as it is no longer part of what needs to be skipped.
All values emitted by the result of `skipLast(N)` will be delayed by `N` emissions, as each value is held in a buffer until enough values have been emitted that the buffered value may finally be sent to the consumer.
After subscribing, unsubscribing will not result in the emission of the buffered skipped values.
## Example
Skip the last 2 values of an observable with many values

    import { of, skipLast } from 'rxjs';

    const numbers = of(1, 2, 3, 4, 5);
    const skipLastTwo = numbers.pipe(skipLast(2));
    skipLastTwo.subscribe(x => console.log(x));

    // Results in:
    // 1 2 3
    // (4 and 5 are skipped)

Parameter skipCount
Number of elements to skip from the end of the source Observable.
A function that returns an Observable that skips the last `skipCount` values emitted by the source Observable.
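To make the "delayed by `N` emissions" behavior visible, here is a small sketch that drives a `Subject` by hand and records when each value enters and leaves the operator. The `log` array and its `in`/`out` labels are illustrative only.

```typescript
import { Subject, skipLast } from 'rxjs';

const source = new Subject<number>();
const log: string[] = [];

source.pipe(skipLast(2)).subscribe({
  next: value => log.push(`out ${value}`),
  complete: () => log.push('complete'),
});

// Push values one at a time and record the order of events.
for (const value of [1, 2, 3, 4]) {
  log.push(`in ${value}`);
  source.next(value);
}
source.complete();

console.log(log);
// ['in 1', 'in 2', 'in 3', 'out 1', 'in 4', 'out 2', 'complete']
```

The first value only leaves the buffer once the third value arrives, and the last two values (3 and 4) are never emitted.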
function skipUntil
skipUntil: <T>(notifier: ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item.
The `skipUntil` operator causes the observable stream to skip the emission of values until the passed in observable emits the first value. This can be particularly useful in combination with user interactions, responses of HTTP requests or waiting for specific times to pass by.
Internally, the `skipUntil` operator subscribes to the passed in `notifier` `ObservableInput` (which gets converted to an Observable) in order to recognize the emission of its first value. When `notifier` emits next, the operator unsubscribes from it and starts emitting the values of the *source* observable until it completes or errors. It will never let the *source* observable emit any values if the `notifier` completes or throws an error without emitting a value before.
## Example
In the following example, all emitted values of the interval observable are skipped until the user clicks anywhere within the page

    import { interval, fromEvent, skipUntil } from 'rxjs';

    const intervalObservable = interval(1000);
    const click = fromEvent(document, 'click');

    const emitAfterClick = intervalObservable.pipe(
      skipUntil(click)
    );
    // interval(1000) emits 0 at 1s, 1 at 2s, and so on, so:
    // clicked at 4.6s. output: 4...5...6...7........ or
    // clicked at 7.3s. output: 7...8...9...10.......
    emitAfterClick.subscribe(value => console.log(value));

Parameter notifier
An `ObservableInput` that has to emit an item before the source Observable elements begin to be mirrored by the resulting Observable.
A function that returns an Observable that skips items from the source Observable until the `notifier` Observable emits an item, then emits the remaining items.
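The two notifier outcomes described above (emitting a value versus completing silently) can be sketched with hand-driven subjects. The names `opened`, `silent`, `seen`, and `neverSeen` are hypothetical and exist only for this illustration.

```typescript
import { Subject, skipUntil } from 'rxjs';

const source = new Subject<number>();

// Case 1: the notifier emits, so later source values pass through.
const opened = new Subject<void>();
const seen: number[] = [];
source.pipe(skipUntil(opened)).subscribe(value => seen.push(value));

// Case 2: the notifier completes without emitting, so nothing ever passes.
const silent = new Subject<void>();
const neverSeen: number[] = [];
source.pipe(skipUntil(silent)).subscribe(value => neverSeen.push(value));

source.next(1); // skipped by both
opened.next(); // case 1 starts mirroring the source
silent.complete(); // case 2 stays closed
source.next(2);
source.next(3);

console.log(seen); // [2, 3]
console.log(neverSeen); // []
```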
function skipWhile
skipWhile: { <T>(predicate: BooleanConstructor): OperatorFunction< T, Extract<T, Falsy> extends never ? never : T >; <T>(predicate: (value: T, index: number) => true): OperatorFunction<T, never>; <T>( predicate: (value: T, index: number) => boolean ): MonoTypeOperatorFunction<T>;};
function startWith
startWith: { <T>(value: null): OperatorFunction<T, T | null>; <T>(value: undefined): OperatorFunction<T, T>; <T, A extends readonly unknown[] = T[]>( ...valuesAndScheduler: [...A, SchedulerLike] ): OperatorFunction<T, T | ValueFromArray<A>>; <T, A extends readonly unknown[] = T[]>(...values: A): OperatorFunction< T, T | ValueFromArray<A> >;};
Deprecated
The `scheduler` parameter will be removed in v8. Use `scheduled` and `concatAll`. Details: https://rxjs.dev/deprecations/scheduler-argument
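One possible shape for that migration is sketched below. It assumes the deprecated call was `startWith(0, queueScheduler)`; `queueScheduler` is used here only so the output is produced synchronously, and any other `SchedulerLike` could be substituted.

```typescript
import { concatAll, of, queueScheduler, scheduled } from 'rxjs';

const source$ = of(1, 2, 3);
const out: number[] = [];

// Deprecated form (for comparison): source$.pipe(startWith(0, queueScheduler))
// Replacement: schedule a list of observables, then concatenate them in order.
scheduled([of(0), source$], queueScheduler)
  .pipe(concatAll())
  .subscribe(value => out.push(value));

console.log(out); // [0, 1, 2, 3]
```

When no scheduler is needed, plain `startWith(0)` is not deprecated and remains the simpler choice.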
function subscribeOn
subscribeOn: <T>( scheduler: SchedulerLike, delay?: number) => MonoTypeOperatorFunction<T>;
Asynchronously subscribes Observers to this Observable on the specified SchedulerLike.
With `subscribeOn` you can decide what type of scheduler a specific Observable will be using when it is subscribed to.
Schedulers control the speed and order of emissions to observers from an Observable stream.

## Example
Given the following code:

    import { of, merge } from 'rxjs';

    const a = of(1, 2, 3);
    const b = of(4, 5, 6);

    merge(a, b).subscribe(console.log);

    // Outputs
    // 1
    // 2
    // 3
    // 4
    // 5
    // 6

Both Observable `a` and `b` will emit their values directly and synchronously once they are subscribed to.
If we instead use the `subscribeOn` operator declaring that we want to use the asyncScheduler for values emitted by Observable `a`:

    import { of, subscribeOn, asyncScheduler, merge } from 'rxjs';

    const a = of(1, 2, 3).pipe(subscribeOn(asyncScheduler));
    const b = of(4, 5, 6);

    merge(a, b).subscribe(console.log);

    // Outputs
    // 4
    // 5
    // 6
    // 1
    // 2
    // 3

The reason for this is that Observable `b` emits its values directly and synchronously like before but the emissions from `a` are scheduled on the event loop because we are now using the asyncScheduler for that specific Observable.
Parameter scheduler
The SchedulerLike to perform subscription actions on.
Parameter delay
A delay to pass to the scheduler to delay subscriptions.
A function that returns an Observable modified so that its subscriptions happen on the specified SchedulerLike.
function switchAll
switchAll: <O extends ObservableInput<any>>() => OperatorFunction< O, ObservedValueOf<O>>;
Converts a higher-order Observable into a first-order Observable producing values only from the most recent observable sequence.
Flattens an Observable-of-Observables.

`switchAll` subscribes to a source that is an observable of observables, also known as a "higher-order observable" (or `Observable<Observable<T>>`). It subscribes to the most recently provided "inner observable" emitted by the source, unsubscribing from any previously subscribed to inner observable, such that only the most recent inner observable may be subscribed to at any point in time. The resulting observable returned by `switchAll` will only complete if the source observable completes, *and* any currently subscribed to inner observable also has completed, if there are any.
## Examples
Spawn a new interval observable for each click event, but for every new click, cancel the previous interval and subscribe to the new one

    import { fromEvent, tap, map, interval, switchAll } from 'rxjs';

    const clicks = fromEvent(document, 'click').pipe(tap(() => console.log('click')));
    const source = clicks.pipe(map(() => interval(1000)));

    source
      .pipe(switchAll())
      .subscribe(x => console.log(x));

    // Output
    // click
    // 0
    // 1
    // 2
    // 3
    // ...
    // click
    // 0
    // 1
    // 2
    // ...
    // click
    // ...

A function that returns an Observable that converts a higher-order Observable into a first-order Observable producing values only from the most recent Observable sequence.
function switchMap
switchMap: { <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O ): OperatorFunction<T, ObservedValueOf<O>>; <T, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: undefined ): OperatorFunction<T, ObservedValueOf<O>>; <T, R, O extends ObservableInput<any>>( project: (value: T, index: number) => O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>;};
Deprecated
The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector
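A sketch of the "inner `map`" replacement follows. The deprecated call shown in the comment and the sample values are illustrative assumptions.

```typescript
import { map, of, switchMap } from 'rxjs';

const out: string[] = [];

// Deprecated form (for comparison):
//   switchMap(n => of('a', 'b'), (n, letter) => `${n}${letter}`)
// Replacement: combine outer and inner values with `map` inside the projection.
of(1, 2)
  .pipe(switchMap(n => of('a', 'b').pipe(map(letter => `${n}${letter}`))))
  .subscribe(value => out.push(value));

console.log(out); // ['1a', '1b', '2a', '2b']
```

Because the inner `map` closes over `n`, the outer value stays available without the extra selector argument.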
function switchMapTo
switchMapTo: { <O extends ObservableInput<unknown>>(observable: O): OperatorFunction< unknown, ObservedValueOf<O> >; <O extends ObservableInput<unknown>>( observable: O, resultSelector: undefined ): OperatorFunction<unknown, ObservedValueOf<O>>; <T, R, O extends ObservableInput<unknown>>( observable: O, resultSelector: ( outerValue: T, innerValue: ObservedValueOf<O>, outerIndex: number, innerIndex: number ) => R ): OperatorFunction<T, R>;};
Deprecated
Will be removed in v9. Use switchMap instead: `switchMap(() => result)`
Deprecated
The `resultSelector` parameter will be removed in v8. Use an inner `map` instead. Details: https://rxjs.dev/deprecations/resultSelector
function switchScan
switchScan: <T, R, O extends ObservableInput<any>>( accumulator: (acc: R, value: T, index: number) => O, seed: R) => OperatorFunction<T, ObservedValueOf<O>>;
Applies an accumulator function over the source Observable where the accumulator function itself returns an Observable, emitting values only from the most recently returned Observable.
It's like mergeScan, but only the most recent Observable returned by the accumulator is merged into the outer Observable.
Parameter accumulator
The accumulator function called on each source value.
Parameter seed
The initial accumulation value.
A function that returns an observable of the accumulated values.
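As a minimal sketch of the accumulator contract, the example below keeps a running total where each step returns an inner Observable. The synchronous `of` sources and the `totals` array are illustrative; with asynchronous inner Observables, a newer source value would cancel any inner Observable still in flight.

```typescript
import { of, switchScan } from 'rxjs';

const totals: number[] = [];

of(1, 2, 3)
  .pipe(
    // The accumulator receives the latest accumulated value and returns an Observable.
    switchScan((acc: number, value: number) => of(acc + value), 0)
  )
  .subscribe(total => totals.push(total));

console.log(totals); // [1, 3, 6]
```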
function take
take: <T>(count: number) => MonoTypeOperatorFunction<T>;
Emits only the first `count` values emitted by the source Observable.
Takes the first `count` values from the source, then completes.
`take` returns an Observable that emits only the first `count` values emitted by the source Observable. If the source emits fewer than `count` values then all of its values are emitted. After that, it completes, regardless of whether the source completes.
## Example
Take the first 5 seconds of an infinite 1-second interval Observable

    import { interval, take } from 'rxjs';

    const intervalCount = interval(1000);
    const takeFive = intervalCount.pipe(take(5));
    takeFive.subscribe(x => console.log(x));

    // Logs:
    // 0
    // 1
    // 2
    // 3
    // 4

Parameter count
The maximum number of `next` values to emit.
A function that returns an Observable that emits only the first `count` values emitted by the source Observable, or all of the values from the source if the source emits fewer than `count` values.
function takeLast
takeLast: <T>(count: number) => MonoTypeOperatorFunction<T>;
Waits for the source to complete, then emits the last N values from the source, as specified by the `count` argument.
`takeLast` results in an observable that will hold up to `count` values in memory until the source completes. It then pushes all values in memory to the consumer, in the order they were received from the source, then notifies the consumer that it is complete.
If for some reason the source completes before the `count` supplied to `takeLast` is reached, all values received until that point are emitted, and then completion is notified.
**Warning**: Using `takeLast` with an observable that never completes will result in an observable that never emits a value.
## Example
Take the last 3 values of an Observable with many values

    import { range, takeLast } from 'rxjs';

    const many = range(1, 100);
    const lastThree = many.pipe(takeLast(3));
    lastThree.subscribe(x => console.log(x));

Parameter count
The maximum number of values to emit from the end of the sequence of values emitted by the source Observable.
A function that returns an Observable that emits at most the last `count` values emitted by the source Observable.
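The case where the source completes before `count` values have arrived can be sketched like this, next to the ordinary case. The array names are illustrative.

```typescript
import { of, takeLast } from 'rxjs';

// Ordinary case: more values than `count`.
const lastThree: number[] = [];
of(1, 2, 3, 4, 5)
  .pipe(takeLast(3))
  .subscribe(value => lastThree.push(value));

// Short source: fewer values than `count`, so everything received is emitted.
const shortSource: number[] = [];
of(1, 2)
  .pipe(takeLast(3))
  .subscribe(value => shortSource.push(value));

console.log(lastThree); // [3, 4, 5]
console.log(shortSource); // [1, 2]
```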
function takeUntil
takeUntil: <T>(notifier: ObservableInput<any>) => MonoTypeOperatorFunction<T>;
Emits the values emitted by the source Observable until a `notifier` Observable emits a value.
Lets values pass until a second Observable, `notifier`, emits a value. Then, it completes.
`takeUntil` subscribes and begins mirroring the source Observable. It also monitors a second Observable, `notifier`, that you provide. If the `notifier` emits a value, the output Observable stops mirroring the source Observable and completes. If the `notifier` doesn't emit any value and completes then `takeUntil` will pass all values.
## Example
Tick every second until the first click happens

    import { interval, fromEvent, takeUntil } from 'rxjs';

    const source = interval(1000);
    const clicks = fromEvent(document, 'click');
    const result = source.pipe(takeUntil(clicks));
    result.subscribe(x => console.log(x));

Parameter notifier
The `ObservableInput` whose first emitted value will cause the output Observable of `takeUntil` to stop emitting values from the source Observable.
A function that returns an Observable that emits the values from the source Observable until `notifier` emits its first value.
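Both notifier outcomes can be sketched without DOM events. In this illustration, `stop` is a hand-driven notifier and `EMPTY` stands in for a notifier that completes without emitting; the variable names are assumptions made for the example.

```typescript
import { EMPTY, Subject, of, takeUntil } from 'rxjs';

// Case 1: the notifier emits, so the output completes at that point.
const source = new Subject<number>();
const stop = new Subject<void>();
const log: Array<number | string> = [];

source.pipe(takeUntil(stop)).subscribe({
  next: value => log.push(value),
  complete: () => log.push('complete'),
});

source.next(1);
source.next(2);
stop.next();
source.next(3); // ignored, the output has already completed

// Case 2: the notifier completes without emitting, so all values pass.
const all: number[] = [];
of(1, 2, 3)
  .pipe(takeUntil(EMPTY))
  .subscribe(value => all.push(value));

console.log(log); // [1, 2, 'complete']
console.log(all); // [1, 2, 3]
```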
function takeWhile
takeWhile: { <T>(predicate: BooleanConstructor, inclusive: true): MonoTypeOperatorFunction<T>; <T>(predicate: BooleanConstructor, inclusive: false): OperatorFunction< T, TruthyTypesOf<T> >; <T>(predicate: BooleanConstructor): OperatorFunction<T, TruthyTypesOf<T>>; <T, S extends T>( predicate: (value: T, index: number) => value is S ): OperatorFunction<T, S>; <T, S extends T>( predicate: (value: T, index: number) => value is S, inclusive: false ): OperatorFunction<T, S>; <T>( predicate: (value: T, index: number) => boolean, inclusive?: boolean ): MonoTypeOperatorFunction<T>;};
function tap
tap: { <T>( observerOrNext?: Partial<TapObserver<T>> | ((value: T) => void) ): MonoTypeOperatorFunction<T>; <T>( next?: (value: T) => void, error?: (error: any) => void, complete?: () => void ): MonoTypeOperatorFunction<T>;};
Deprecated
Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments
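A short sketch of the observer-argument form recommended by the deprecation note follows. The deprecated call in the comment and the `log` array are illustrative.

```typescript
import { of, tap } from 'rxjs';

const log: string[] = [];

of(1, 2)
  .pipe(
    // Deprecated form (for comparison): tap(v => ..., undefined, () => ...)
    // Replacement: a partial observer naming only the callbacks that are needed.
    tap({
      next: value => log.push(`next ${value}`),
      complete: () => log.push('complete'),
    })
  )
  .subscribe();

console.log(log); // ['next 1', 'next 2', 'complete']
```

Passing a single `next` function, as in `tap(value => ...)`, is not affected by the deprecation.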
function throttle
throttle: <T>( durationSelector: (value: T) => ObservableInput<any>, config?: ThrottleConfig) => MonoTypeOperatorFunction<T>;
Emits a value from the source Observable, then ignores subsequent source values for a duration determined by another Observable, then repeats this process.
It's like throttleTime, but the silencing duration is determined by a second Observable.

`throttle` emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled by calling the `durationSelector` function with the source value, which returns the "duration" Observable. When the duration Observable emits a value, the timer is disabled, and this process repeats for the next source value.
## Example
Emit clicks at a rate of at most one click per second

    import { fromEvent, throttle, interval } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(throttle(() => interval(1000)));

    result.subscribe(x => console.log(x));

Parameter durationSelector
A function that receives a value from the source Observable, for computing the silencing duration for each source value, returned as an `ObservableInput`.
Parameter config
A configuration object to define `leading` and `trailing` behavior. Defaults to `{ leading: true, trailing: false }`.
A function that returns an Observable that performs the throttle operation to limit the rate of emissions from the source.
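The enable/disable cycle of the internal timer can be traced deterministically by using a `Subject` as the duration Observable. This sketch uses the default configuration (`leading: true`, `trailing: false`); the `release` subject is a hypothetical stand-in for a real timer.

```typescript
import { Subject, throttle } from 'rxjs';

const source = new Subject<number>();
const release = new Subject<void>(); // emitting here ends the current silence window
const out: number[] = [];

source.pipe(throttle(() => release)).subscribe(value => out.push(value));

source.next(1); // forwarded, timer enabled
source.next(2); // ignored while the timer is enabled
release.next(); // duration Observable emits, timer disabled
source.next(3); // forwarded, timer enabled again
source.next(4); // ignored

console.log(out); // [1, 3]
```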
function throttleTime
throttleTime: <T>( duration: number, scheduler?: SchedulerLike, config?: ThrottleConfig) => MonoTypeOperatorFunction<T>;
Emits a value from the source Observable, then ignores subsequent source values for `duration` milliseconds, then repeats this process.
Lets a value pass, then ignores source values for the next `duration` milliseconds.
`throttleTime` emits the source Observable values on the output Observable when its internal timer is disabled, and ignores source values when the timer is enabled. Initially, the timer is disabled. As soon as the first source value arrives, it is forwarded to the output Observable, and then the timer is enabled. After `duration` milliseconds (or the time unit determined internally by the optional `scheduler`) has passed, the timer is disabled, and this process repeats for the next source value. Optionally takes a SchedulerLike for managing timers.
## Examples
### Limit click rate
Emit clicks at a rate of at most one click per second

    import { fromEvent, throttleTime } from 'rxjs';

    const clicks = fromEvent(document, 'click');
    const result = clicks.pipe(throttleTime(1000));

    result.subscribe(x => console.log(x));

Parameter duration
Time to wait before emitting another value after emitting the last value, measured in milliseconds or the time unit determined internally by the optional `scheduler`.
Parameter scheduler
The SchedulerLike to use for managing the timers that handle the throttling. Defaults to asyncScheduler.
Parameter config
A configuration object to define `leading` and `trailing` behavior. Defaults to `{ leading: true, trailing: false }`.
A function that returns an Observable that performs the throttle operation to limit the rate of emissions from the source.
function throwError
throwError: { (errorFactory: () => any): Observable<never>; (error: any): Observable<never>; (errorOrErrorFactory: any, scheduler: SchedulerLike): Observable<never>;};
Creates an observable that will create an error instance and push it to the consumer as an error immediately upon subscription.
Just errors and does nothing else

This creation function is useful for creating an observable that will create an error and error every time it is subscribed to. Generally, inside of most operators when you might want to return an errored observable, this is unnecessary. In most cases, such as in the inner return of concatMap, mergeMap, defer, and many others, you can simply throw the error, and RxJS will pick that up and notify the consumer of the error.
## Example
Create a simple observable that will create a new error with a timestamp and log it and the message every time you subscribe to it

    import { throwError } from 'rxjs';

    let errorCount = 0;

    const errorWithTimestamp$ = throwError(() => {
      const error: any = new Error(`This is error number ${ ++errorCount }`);
      error.timestamp = Date.now();
      return error;
    });

    errorWithTimestamp$.subscribe({
      error: err => console.log(err.timestamp, err.message)
    });

    errorWithTimestamp$.subscribe({
      error: err => console.log(err.timestamp, err.message)
    });

    // Logs the timestamp and a new error message for each subscription

### Unnecessary usage
Using `throwError` inside of an operator or creation function with a callback is usually not necessary:

    import { of, concatMap, timer, throwError } from 'rxjs';

    const delays$ = of(1000, 2000, Infinity, 3000);

    delays$.pipe(
      concatMap(ms => {
        if (ms < 10000) {
          return timer(ms);
        } else {
          // This is probably overkill.
          return throwError(() => new Error(`Invalid time ${ ms }`));
        }
      })
    )
    .subscribe({
      next: console.log,
      error: console.error
    });

You can just throw the error instead:

    import { of, concatMap, timer } from 'rxjs';

    const delays$ = of(1000, 2000, Infinity, 3000);

    delays$.pipe(
      concatMap(ms => {
        if (ms < 10000) {
          return timer(ms);
        } else {
          // Cleaner and easier to read for most folks.
          throw new Error(`Invalid time ${ ms }`);
        }
      })
    )
    .subscribe({
      next: console.log,
      error: console.error
    });

Parameter errorFactory
A factory function that will create the error instance that is pushed.
Returns an observable that will error with the specified error immediately upon subscription.
Parameter error
The error instance to emit
Deprecated
Support for passing an error value will be removed in v8. Instead, pass a factory function to `throwError(() => new Error('test'))`. This is because it will create the error at the moment it should be created and capture a more appropriate stack trace. If for some reason you need to create the error ahead of time, you can still do that: `const err = new Error('test'); throwError(() => err);`.
Notifies the consumer of an error using a given scheduler by scheduling it at delay `0` upon subscription.
Parameter errorOrErrorFactory
An error instance or error factory
Parameter scheduler
A scheduler to use to schedule the error notification
Deprecated
The `scheduler` parameter will be removed in v8. Use `throwError` in combination with observeOn: `throwError(() => new Error('test')).pipe(observeOn(scheduler));`. Details: https://rxjs.dev/deprecations/scheduler-argument
function throwIfEmpty
throwIfEmpty: <T>(errorFactory?: () => any) => MonoTypeOperatorFunction<T>;
If the source observable completes without emitting a value, it will emit an error. The error will be created at that time by the optional `errorFactory` argument, otherwise, the error will be EmptyError.
## Example
Throw an error if the document wasn't clicked within 1 second

    import { fromEvent, takeUntil, timer, throwIfEmpty } from 'rxjs';

    const click$ = fromEvent(document, 'click');

    click$.pipe(
      takeUntil(timer(1000)),
      throwIfEmpty(() => new Error('The document was not clicked within 1 second'))
    )
    .subscribe({
      next() {
        console.log('The document was clicked');
      },
      error(err) {
        console.error(err.message);
      }
    });

Parameter errorFactory
A factory function called to produce the error to be thrown when the source observable completes without emitting a value.
A function that returns an Observable that throws an error if the source Observable completed without emitting.
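The default `EmptyError` and the custom `errorFactory` path can be compared side by side in a synchronous sketch. The `log` array and the error message are illustrative.

```typescript
import { EMPTY, EmptyError, of, throwIfEmpty } from 'rxjs';

const log: string[] = [];

// No factory: the error is an EmptyError.
EMPTY.pipe(throwIfEmpty()).subscribe({
  error: (err: unknown) => log.push(err instanceof EmptyError ? 'EmptyError' : 'other'),
});

// With a factory: the error is whatever the factory returns.
EMPTY.pipe(throwIfEmpty(() => new Error('nothing arrived'))).subscribe({
  error: (err: Error) => log.push(err.message),
});

// A source that emits at least one value is passed through untouched.
of(1)
  .pipe(throwIfEmpty())
  .subscribe({
    next: value => log.push(`value ${value}`),
    error: () => log.push('unexpected error'),
  });

console.log(log); // ['EmptyError', 'nothing arrived', 'value 1']
```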
function timeInterval
timeInterval: <T>( scheduler?: SchedulerLike) => OperatorFunction<T, TimeInterval<T>>;
Emits an object containing the current value, and the time that has passed between emitting the current value and the previous value, which is calculated by using the provided `scheduler`'s `now()` method to retrieve the current time at each emission, then calculating the difference. The `scheduler` defaults to asyncScheduler, so by default, the `interval` will be in milliseconds.
Convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissions.

## Example
Emit interval between current value with the last value

    import { interval, timeInterval } from 'rxjs';

    const seconds = interval(1000);

    seconds
      .pipe(timeInterval())
      .subscribe(value => console.log(value));

    // NOTE: The values will never be this precise,
    // intervals created with `interval` or `setInterval`
    // are non-deterministic.

    // { value: 0, interval: 1000 }
    // { value: 1, interval: 1000 }
    // { value: 2, interval: 1000 }

Parameter scheduler
Scheduler used to get the current time.
A function that returns an Observable that emits information about value and interval.
function timeout
timeout: { <T, O extends ObservableInput<unknown>, M = unknown>( config: TimeoutConfig<T, O, M> & { with: (info: TimeoutInfo<T, M>) => O } ): OperatorFunction<T, T | ObservedValueOf<O>>; <T, M = unknown>( config: Omit<TimeoutConfig<T, any, M>, 'with'> ): OperatorFunction<T, T>; <T>(first: Date, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>; <T>(each: number, scheduler?: SchedulerLike): MonoTypeOperatorFunction<T>;};
If `with` is provided, this will return an observable that will switch to a different observable if the source does not push values within the specified time parameters.
The most flexible option for creating a timeout behavior.
The first thing to know about the configuration is if you do not provide a `with` property to the configuration, when timeout conditions are met, this operator will emit a TimeoutError. Otherwise, it will use the factory function provided by `with`, and switch your subscription to the result of that. Timeout conditions are provided by the settings in `first` and `each`.
The `first` property can be either a `Date` for a specific time, a `number` for a time period relative to the point of subscription, or it can be skipped. This property is to check timeout conditions for the arrival of the first value from the source _only_. The timings of all subsequent values from the source will be checked against the time period provided by `each`, if it was provided.
The `each` property can be either a `number` or skipped. If a value for `each` is provided, it represents the amount of time the resulting observable will wait between the arrival of values from the source before timing out. Note that if `first` is _not_ provided, the value from `each` will be used to check timeout conditions for the arrival of the first value and all subsequent values. If `first` _is_ provided, `each` will only be used to check all values after the first.
## Examples
Emit a custom error if there is too much time between values

    import { interval, timeout, throwError } from 'rxjs';

    class CustomTimeoutError extends Error {
      constructor() {
        super('It was too slow');
        this.name = 'CustomTimeoutError';
      }
    }

    const slow$ = interval(900);

    slow$.pipe(
      timeout({
        each: 1000,
        with: () => throwError(() => new CustomTimeoutError())
      })
    )
    .subscribe({
      error: console.error
    });

Switch to a faster observable if your source is slow.

    import { interval, timeout } from 'rxjs';

    const slow$ = interval(900);
    const fast$ = interval(500);

    slow$.pipe(
      timeout({
        each: 1000,
        with: () => fast$,
      })
    )
    .subscribe(console.log);

Parameter config
The configuration for the timeout.
Returns an observable that will error or switch to a different observable if the source does not push values within the specified time parameters.
### Handling TimeoutErrors
If no `with` property was provided, subscriptions to the resulting observable may emit an error of TimeoutError. The timeout error provides useful information you can examine when you're handling the error. The most common way to handle the error would be with catchError, although you could use tap or just the error handler in your `subscribe` call directly, if your error handling is only a side effect (such as notifying the user, or logging).
In this case, you would check the error for `instanceof TimeoutError` to validate that the error was indeed from `timeout`, and not from some other source. If it's not from `timeout`, you should probably rethrow it if you're in a `catchError`.
## Examples
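Handle a TimeoutError with `catchError` and rethrow anything else. This is a sketch of the handling advice in "Handling TimeoutErrors", not an official example: it assumes a `VirtualTimeScheduler` is passed through the `scheduler` config property so that the 5 second wait runs instantly, and the `'timed out'` fallback value is illustrative.

```typescript
import { NEVER, TimeoutError, VirtualTimeScheduler, catchError, of, timeout } from 'rxjs';

// Virtual time keeps the example synchronous; real code would normally omit `scheduler`.
const scheduler = new VirtualTimeScheduler();
const log: string[] = [];

NEVER.pipe(
  timeout({ first: 5000, scheduler }),
  catchError(err => {
    if (err instanceof TimeoutError) {
      // The error came from `timeout`, so recover with a fallback value.
      return of('timed out');
    }
    // Anything else is not ours to handle: rethrow it.
    throw err;
  })
).subscribe(value => log.push(value));

// Run all scheduled virtual-time work, including the 5 second timeout.
scheduler.flush();

console.log(log); // ['timed out']
```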
Emit a TimeoutError if the first value, and _only_ the first value, does not arrive within 5 seconds

    import { interval, timeout } from 'rxjs';

    // A random interval that lasts between 0 and 10 seconds per tick
    const source$ = interval(Math.round(Math.random() * 10_000));

    source$.pipe(
      timeout({ first: 5_000 })
    )
    .subscribe({
      next: console.log,
      error: console.error
    });

Emit a TimeoutError if the source waits longer than 5 seconds between any two values or the first value and subscription.

    import { timer, timeout, expand } from 'rxjs';

    const getRandomTime = () => Math.round(Math.random() * 10_000);

    // An observable that waits a random amount of time between each delivered value
    const source$ = timer(getRandomTime())
      .pipe(expand(() => timer(getRandomTime())));

    source$
      .pipe(timeout({ each: 5_000 }))
      .subscribe({
        next: console.log,
        error: console.error
      });

Emit a TimeoutError if the source does not emit before 7 seconds, _or_ if the source waits longer than 5 seconds between any two values after the first.

    import { timer, timeout, expand } from 'rxjs';

    const getRandomTime = () => Math.round(Math.random() * 10_000);

    // An observable that waits a random amount of time between each delivered value
    const source$ = timer(getRandomTime())
      .pipe(expand(() => timer(getRandomTime())));

    source$
      .pipe(timeout({ first: 7_000, each: 5_000 }))
      .subscribe({
        next: console.log,
        error: console.error
      });

Returns an observable that will error if the source does not push its first value before the specified time passed as a `Date`. This is functionally the same as `timeout({ first: someDate })`.
Errors if the first value doesn't show up before the given date and time

Parameter first
The date at which the resulting observable will timeout if the source observable does not emit at least one value.
Parameter scheduler
The scheduler to use. Defaults to asyncScheduler.
Returns an observable that will error if the source does not push a value within the specified time in milliseconds. This is functionally the same as `timeout({ each: milliseconds })`.
Errors if it waits too long between any value

Parameter each
The time allowed between each pushed value from the source before the resulting observable will timeout.
Parameter scheduler
The scheduler to use. Defaults to asyncScheduler.
function timeoutWith
timeoutWith: { <T, R>( dueBy: Date, switchTo: ObservableInput<R>, scheduler?: SchedulerLike ): OperatorFunction<T, T | R>; <T, R>( waitFor: number, switchTo: ObservableInput<R>, scheduler?: SchedulerLike ): OperatorFunction<T, T | R>;};
Deprecated
Replaced with timeout. Instead of `timeoutWith(someDate, a$, scheduler)`, use the configuration object `timeout({ first: someDate, with: () => a$, scheduler })`. Will be removed in v8.
Deprecated
Replaced with timeout. Instead of `timeoutWith(100, a$, scheduler)`, use the configuration object `timeout({ each: 100, with: () => a$, scheduler })`. Will be removed in v8.
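The second migration can be sketched as follows. To keep the example synchronous it assumes a `VirtualTimeScheduler` as the scheduler, and `fallback$` is a hypothetical stand-in for `a$`.

```typescript
import { NEVER, VirtualTimeScheduler, of, timeout } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const fallback$ = of('fallback');
const out: string[] = [];

// Deprecated form (for comparison): NEVER.pipe(timeoutWith(100, fallback$, scheduler))
NEVER.pipe(
  timeout({ each: 100, with: () => fallback$, scheduler })
).subscribe(value => out.push(value));

// Advance virtual time so the 100 ms timeout fires and the switch happens.
scheduler.flush();

console.log(out); // ['fallback']
```

Note that `with` takes a factory function rather than the observable itself, so the fallback is only created when the timeout actually occurs.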
function timer
timer: { (due: number | Date, scheduler?: SchedulerLike): Observable<0>; ( startDue: number | Date, intervalDuration: number, scheduler?: SchedulerLike ): Observable<number>; ( dueTime: number | Date, unused: undefined, scheduler?: SchedulerLike ): Observable<0>;};
Creates an observable that will wait for a specified time period, or exact date, before emitting the number 0.
Used to emit a notification after a delay.
This observable is useful for creating delays in code, or racing against other values for ad-hoc timeouts.
The `delay` is specified by default in milliseconds, however providing a custom scheduler could create a different behavior.
## Examples
Wait 3 seconds and start another observable
You might want to use
timer
to delay subscription to an observable by a set amount of time. Here we use a timer with concatMapTo or concatMap in order to wait a few seconds and start a subscription to a source.import { of, timer, concatMap } from 'rxjs';// This could be any observableconst source = of(1, 2, 3);timer(3000).pipe(concatMap(() => source)).subscribe(console.log);Take all values until the start of the next minute
Using a
Date
as the trigger for the first emission, you can do things like wait until midnight to fire an event, or in this case, wait until a new minute starts (chosen so the example wouldn't take too long to run) in order to stop watching a stream. Leveraging takeUntil.import { interval, takeUntil, timer } from 'rxjs';// Build a Date object that marks the// next minute.const currentDate = new Date();const startOfNextMinute = new Date(currentDate.getFullYear(),currentDate.getMonth(),currentDate.getDate(),currentDate.getHours(),currentDate.getMinutes() + 1);// This could be any observable streamconst source = interval(1000);const result = source.pipe(takeUntil(timer(startOfNextMinute)));result.subscribe(console.log);### Known Limitations
- The asyncScheduler uses setTimeout, which has limitations for how far in the future it can be scheduled.
- If a scheduler is provided that returns a timestamp other than an epoch from now(), and a Date object is passed to the dueTime argument, the calculation for when the first emission should occur will be incorrect. In this case, it would be best to do your own calculations ahead of time, and pass a number in as the dueTime.
Parameter due
If a number, the amount of time in milliseconds to wait before emitting. If a Date, the exact time at which to emit.
Parameter scheduler
The scheduler to use to schedule the delay. Defaults to asyncScheduler.
Creates an observable that starts an interval after a specified delay, emitting incrementing numbers -- starting at 0 -- on each interval afterwards.
The delay and intervalDuration are specified by default in milliseconds, however providing a custom scheduler could create a different behavior.
## Example
### Start an interval that starts right away
Since interval waits for the passed delay before starting, sometimes that's not ideal. You may want to start an interval immediately. timer works well for this. Here we have both side-by-side so you can see them in comparison.
Note that this observable will never complete.
import { timer, interval } from 'rxjs';
timer(0, 1000).subscribe(n => console.log('timer', n));
interval(1000).subscribe(n => console.log('interval', n));
### Known Limitations
- The asyncScheduler uses setTimeout, which has limitations for how far in the future it can be scheduled.
- If a scheduler is provided that returns a timestamp other than an epoch from now(), and a Date object is passed to the dueTime argument, the calculation for when the first emission should occur will be incorrect. In this case, it would be best to do your own calculations ahead of time, and pass a number in as the startDue.
Parameter startDue
If a number, is the time to wait before starting the interval. If a Date, is the exact time at which to start the interval.
Parameter intervalDuration
The delay between each value emitted in the interval. Passing a negative number here will result in immediate completion after the first value is emitted, as though no intervalDuration was passed at all.
Parameter scheduler
The scheduler to use to schedule the delay. Defaults to asyncScheduler.
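To see the difference between timer(0, 1000) and interval(1000) without waiting on a real clock, one option is to pass a scheduler explicitly. The sketch below assumes a `VirtualTimeScheduler` and adds take(3) so that both streams complete; the array names are illustrative only.

```typescript
import { VirtualTimeScheduler, interval, take, timer } from 'rxjs';

const timerScheduler = new VirtualTimeScheduler();
const timerFrames: number[] = [];
const intervalFrames: number[] = [];

// Record the virtual time (in ms) at which each value arrives.
timer(0, 1000, timerScheduler)
  .pipe(take(3))
  .subscribe(() => timerFrames.push(timerScheduler.now()));

interval(1000, timerScheduler)
  .pipe(take(3))
  .subscribe(() => intervalFrames.push(timerScheduler.now()));

// Execute every scheduled action in virtual-time order.
timerScheduler.flush();

console.log(timerFrames);    // [0, 1000, 2000]
console.log(intervalFrames); // [1000, 2000, 3000]
```

The timer starts emitting at time 0, while the interval waits one full period before its first value.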
Deprecated
The signature allowing undefined to be passed for intervalDuration will be removed in v8. Use the timer(dueTime, scheduler?) signature instead.
function timestamp
timestamp: <T>( timestampProvider?: TimestampProvider) => OperatorFunction<T, Timestamp<T>>;
Attaches a timestamp to each item emitted by an observable indicating when it was emitted
The timestamp operator maps the *source* observable stream to an object of type {value: T, timestamp: R}. The properties are generically typed. The value property contains the value and type of the *source* observable. The timestamp is generated by the scheduler's now function. By default, it uses the asyncScheduler, which simply returns Date.now() (milliseconds since 1970/01/01 00:00:00:000) and therefore is of type number.
## Example
In this example there is a timestamp attached to the document's click events
import { fromEvent, timestamp } from 'rxjs';
const clickWithTimestamp = fromEvent(document, 'click').pipe(
  timestamp()
);
// Emits data of type { value: PointerEvent, timestamp: number }
clickWithTimestamp.subscribe(data => {
  console.log(data);
});
Parameter timestampProvider
An object with a now() method used to get the current timestamp.
Returns a function that returns an Observable that attaches a timestamp to each item emitted by the source Observable indicating when it was emitted.
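The timestampProvider parameter accepts any object with a now() method. As a small sketch, the hypothetical `fixedClock` below always reports the same instant, which makes the attached timestamps predictable:

```typescript
import { of, timestamp } from 'rxjs';

// Hypothetical provider that returns a fixed instant instead of Date.now().
const fixedClock = { now: () => 1000 };

const stamped: Array<{ value: string; timestamp: number }> = [];

of('a', 'b')
  .pipe(timestamp(fixedClock))
  .subscribe((entry) => stamped.push(entry));

console.log(stamped);
```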
function toArray
toArray: <T>() => OperatorFunction<T, T[]>;
Collects all source emissions and emits them as an array when the source completes.
Get all values inside an array when the source completes

toArray will wait until the source Observable completes before emitting the array containing all emissions. When the source Observable errors, no array will be emitted.
## Example
import { interval, take, toArray } from 'rxjs';
const source = interval(1000);
const example = source.pipe(
  take(10),
  toArray()
);
example.subscribe(value => console.log(value));
// output: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Returns a function that returns an Observable that emits an array of items emitted by the source Observable when source completes.
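Both behaviours of toArray, emitting on completion and emitting nothing on error, can be checked with synchronous sources. This is a minimal sketch; the variable names and the 'boom' error are made up for illustration.

```typescript
import { concat, of, throwError, toArray } from 'rxjs';

// Completing source: a single array is emitted on completion.
const collected: number[][] = [];
of(1, 2, 3)
  .pipe(toArray())
  .subscribe((values) => collected.push(values));

// Failing source: the error is forwarded and no array is emitted.
const failedLog: string[] = [];
concat(of(1, 2), throwError(() => new Error('boom')))
  .pipe(toArray())
  .subscribe({
    next: (values) => failedLog.push(`array of ${values.length}`),
    error: (err: Error) => failedLog.push(`error: ${err.message}`),
  });

console.log(collected, failedLog);
```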
function using
using: <T extends ObservableInput<any>>( resourceFactory: () => Unsubscribable | void, observableFactory: (resource: Unsubscribable | void) => T | void) => Observable<ObservedValueOf<T>>;
Creates an Observable that uses a resource which will be disposed at the same time as the Observable.
Use it when you catch yourself cleaning up after an Observable.
using is a factory operator, which accepts two functions. The first function returns a disposable resource. It can be an arbitrary object that implements the unsubscribe method. The second function will be injected with that object and should return an Observable. That Observable can use the resource object during its execution. Both functions passed to using will be called every time someone subscribes - neither an Observable nor the resource object will be shared in any way between subscriptions.
When the Observable returned by using is subscribed, the Observable returned from the second function will be subscribed as well. All its notifications (nexted values, completion and error events) will be emitted unchanged by the output Observable. If however someone unsubscribes from the Observable, or the source Observable completes or errors by itself, the unsubscribe method on the resource object will be called. This can be used to do any necessary clean up, which otherwise would have to be handled by hand. Note that complete or error notifications are not emitted when someone cancels a subscription to an Observable via unsubscribe, so using can be used as a hook, allowing you to make sure that all resources which need to exist during an Observable execution will be disposed at the appropriate time.
Parameter resourceFactory
A function which creates any resource object that implements the unsubscribe method.
Parameter observableFactory
A function which creates an Observable that can use the injected resource object.
Returns an Observable that behaves the same as the Observable returned by observableFactory, but which - when completed, errored or unsubscribed - will also call unsubscribe on the created resource object.
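The per-subscription lifecycle described for using can be illustrated with a resource that merely logs when it is created and released. This is a sketch only; the resource object and log format are hypothetical.

```typescript
import { of, using } from 'rxjs';

const usingLog: string[] = [];
let acquired = 0;

const managed$ = using(
  // Resource factory: called once per subscription.
  () => {
    const id = ++acquired;
    usingLog.push(`acquire #${id}`);
    return { unsubscribe: () => usingLog.push(`release #${id}`) };
  },
  // Observable factory: receives the resource created above.
  () => of('x', 'y')
);

managed$.subscribe((v) => usingLog.push(`first: ${v}`));
managed$.subscribe((v) => usingLog.push(`second: ${v}`));

console.log(usingLog);
```

Each subscription gets its own resource, and each resource is released once its inner Observable has completed.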
function window
window: <T>( windowBoundaries: ObservableInput<any>) => OperatorFunction<T, Observable<T>>;
Branch out the source Observable values as a nested Observable whenever windowBoundaries emits.
It's like buffer, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever windowBoundaries emits an item. windowBoundaries can be any type that ObservableInput accepts. It internally gets converted to an Observable. Because each window is an Observable, the output is a higher-order Observable.
## Example
In every window of 1 second each, emit at most 2 click events
import { fromEvent, interval, window, map, take, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const sec = interval(1000);
const result = clicks.pipe(
  window(sec),
  map(win => win.pipe(take(2))), // take at most 2 emissions from each window
  mergeAll() // flatten the Observable-of-Observables
);
result.subscribe(x => console.log(x));
Parameter windowBoundaries
An ObservableInput that completes the previous window and starts a new window.
Returns a function that returns an Observable of windows, which are Observables emitting values of the source Observable.
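Outside a browser, the same windowing behaviour can be observed with two Subjects driven by hand. In this sketch, `source$` and `boundary$` are illustrative names, and each window is collected into an array with toArray so that the window contents are easy to inspect.

```typescript
import { Subject, mergeMap, toArray, window } from 'rxjs';

const source$ = new Subject<number>();
const boundary$ = new Subject<void>();
const windows: number[][] = [];

source$
  .pipe(
    window(boundary$),
    // Turn each window into an array, emitted when that window closes.
    mergeMap((win) => win.pipe(toArray()))
  )
  .subscribe((values) => windows.push(values));

source$.next(1);
source$.next(2);
boundary$.next(); // closes the first window and opens a new one
source$.next(3);
source$.complete(); // closes the last window

console.log(windows);
```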
function windowCount
windowCount: <T>( windowSize: number, startWindowEvery?: number) => OperatorFunction<T, Observable<T>>;
Branch out the source Observable values as a nested Observable with each nested Observable emitting at most windowSize values.
It's like bufferCount, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows every startWindowEvery items, each containing no more than windowSize items. When the source Observable completes or encounters an error, the output Observable emits the current window and propagates the notification from the source Observable. If startWindowEvery is not provided, then new windows are started immediately at the start of the source and when each window completes with size windowSize.
## Examples
Ignore every 3rd click event, starting from the first one
import { fromEvent, windowCount, map, skip, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  windowCount(3),
  map(win => win.pipe(skip(1))), // skip first of every 3 clicks
  mergeAll() // flatten the Observable-of-Observables
);
result.subscribe(x => console.log(x));
Ignore every 3rd click event, starting from the third one
import { fromEvent, windowCount, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  windowCount(2, 3),
  mergeAll() // flatten the Observable-of-Observables
);
result.subscribe(x => console.log(x));
Parameter windowSize
The maximum number of values emitted by each window.
Parameter startWindowEvery
Interval at which to start a new window. For example if startWindowEvery is 2, then a new window will be started on every other value from the source. A new window is started at the beginning of the source by default.
Returns a function that returns an Observable of windows, which in turn are Observable of values.
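How windowSize and startWindowEvery interact is easier to see on a short finite source. The following sketch uses a made-up stream of seven numbers and collects every window into an array:

```typescript
import { mergeMap, of, toArray, windowCount } from 'rxjs';

const sevenNumbers$ = of(1, 2, 3, 4, 5, 6, 7);

// Back-to-back windows of up to 3 values each.
const adjacent: number[][] = [];
sevenNumbers$
  .pipe(windowCount(3), mergeMap((win) => win.pipe(toArray())))
  .subscribe((values) => adjacent.push(values));

// Windows of up to 2 values, with a new window starting every 3 values,
// so every third value falls between windows.
const gapped: number[][] = [];
sevenNumbers$
  .pipe(windowCount(2, 3), mergeMap((win) => win.pipe(toArray())))
  .subscribe((values) => gapped.push(values));

console.log(adjacent); // [[1, 2, 3], [4, 5, 6], [7]]
console.log(gapped);   // [[1, 2], [4, 5], [7]]
```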
function windowTime
windowTime: { <T>(windowTimeSpan: number, scheduler?: SchedulerLike): OperatorFunction< T, Observable<T> >; <T>( windowTimeSpan: number, windowCreationInterval: number, scheduler?: SchedulerLike ): OperatorFunction<T, Observable<T>>; <T>( windowTimeSpan: number, windowCreationInterval: number | void, maxWindowSize: number, scheduler?: SchedulerLike ): OperatorFunction<T, Observable<T>>;};
function windowToggle
windowToggle: <T, O>( openings: ObservableInput<O>, closingSelector: (openValue: O) => ObservableInput<any>) => OperatorFunction<T, Observable<T>>;
Branch out the source Observable values as a nested Observable starting from an emission from openings and ending when the output of closingSelector emits.
It's like bufferToggle, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits windows that contain those items emitted by the source Observable between the time when the openings Observable emits an item and when the Observable returned by closingSelector emits an item.
## Example
Every other second, emit the click events from the next 500ms
import { fromEvent, interval, windowToggle, EMPTY, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const openings = interval(1000);
const result = clicks.pipe(
  windowToggle(openings, i => i % 2 ? interval(500) : EMPTY),
  mergeAll()
);
result.subscribe(x => console.log(x));
Parameter openings
An observable of notifications to start new windows.
Parameter closingSelector
A function that takes the value emitted by the openings observable and returns an Observable, which, when it emits a next notification, signals that the associated window should complete.
Returns a function that returns an Observable of windows, which in turn are Observables.
function windowWhen
windowWhen: <T>( closingSelector: () => ObservableInput<any>) => OperatorFunction<T, Observable<T>>;
Branch out the source Observable values as a nested Observable using a factory function of closing Observables to determine when to start a new window.
It's like bufferWhen, but emits a nested Observable instead of an array.

Returns an Observable that emits windows of items it collects from the source Observable. The output Observable emits connected, non-overlapping windows. It emits the current window and opens a new one whenever the Observable produced by the specified closingSelector function emits an item. The first window is opened immediately when subscribing to the output Observable.
## Example
Emit only the first two click events in every window of [1-5] random seconds
import { fromEvent, windowWhen, interval, map, take, mergeAll } from 'rxjs';
const clicks = fromEvent(document, 'click');
const result = clicks.pipe(
  windowWhen(() => interval(1000 + Math.random() * 4000)),
  map(win => win.pipe(take(2))), // take at most 2 emissions from each window
  mergeAll() // flatten the Observable-of-Observables
);
result.subscribe(x => console.log(x));
Parameter closingSelector
A function that takes no arguments and returns an ObservableInput (that gets converted to Observable) that signals (on either next or complete) when to close the previous window and start a new one.
Returns a function that returns an Observable of windows, which in turn are Observables.
function withLatestFrom
withLatestFrom: { <T, O extends unknown[]>(...inputs_0: ObservableInputTuple<O>): OperatorFunction< T, [T, ...O] >; <T, O extends unknown[], R>( ...inputs: [...ObservableInputTuple<O>, (value_0: T, ...value_1: O) => R] ): OperatorFunction<T, R>;};
function zip
zip: { <A extends readonly unknown[]>( sources: [...ObservableInputTuple<A>] ): Observable<A>; <A extends readonly unknown[], R>( sources: [...ObservableInputTuple<A>], resultSelector: (...values: A) => R ): Observable<R>; <A extends readonly unknown[]>( ...sources_0: ObservableInputTuple<A> ): Observable<A>; <A extends readonly unknown[], R>( ...sourcesAndResultSelector: [ ...ObservableInputTuple<A>, (...values: A) => R ] ): Observable<R>;};
function zipAll
zipAll: { <T>(): OperatorFunction<ObservableInput<T>, T[]>; <T>(): OperatorFunction<any, T[]>; <T, R>(project: (...values: T[]) => R): OperatorFunction<ObservableInput<T>, R>; <R>(project: (...values: any[]) => R): OperatorFunction<any, R>;};
function zipWith
zipWith: <T, A extends readonly unknown[]>( ...otherInputs_0: ObservableInputTuple<A>) => OperatorFunction<T, Cons<T, A>>;
Subscribes to the source, and the observable inputs provided as arguments, and combines their values, by index, into arrays.
What is meant by "combine by index": The first value from each will be made into a single array, then emitted, then the second value from each will be combined into a single array and emitted, then the third value from each will be combined into a single array and emitted, and so on.
This will continue until it is no longer able to combine values of the same index into an array.
After the last value from any one completed source is emitted in an array, the resulting observable will complete, as there is no way to continue "zipping" values together by index.
Use-cases for this operator are limited. There are memory concerns if one of the streams is emitting values at a much faster rate than the others. Usage should likely be limited to streams that emit at a similar pace, or finite streams of known length.
In many cases, authors want combineLatestWith and not zipWith.
Parameter otherInputs
Other observable inputs to collate values from.
Returns a function that returns an Observable that emits items by index combined from the source Observable and provided Observables, in form of an array.
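The "combine by index" behaviour, including completion once one input runs out, can be sketched with two short synchronous streams. The values and variable names here are arbitrary.

```typescript
import { of, zipWith } from 'rxjs';

const pairs: Array<[number, string]> = [];
const zipEvents: string[] = [];

of(1, 2, 3)
  .pipe(zipWith(of('a', 'b')))
  .subscribe({
    next: (pair) => pairs.push(pair),
    complete: () => zipEvents.push('complete'),
  });

// The third number has no partner, so it is never emitted.
console.log(pairs, zipEvents);
```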
Classes
class AsyncSubject
class AsyncSubject<T> extends Subject<T> {}
A variant of Subject that only emits a value when it completes. It will emit its latest value to all its observers on completion.
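As a brief sketch of that behaviour, the subject below receives two values but delivers only the last one, and only once complete() is called. The log array is illustrative.

```typescript
import { AsyncSubject } from 'rxjs';

const asyncSubject = new AsyncSubject<number>();
const asyncLog: string[] = [];

asyncSubject.subscribe({
  next: (v) => asyncLog.push(`next ${v}`),
  complete: () => asyncLog.push('complete'),
});

asyncSubject.next(1);
asyncSubject.next(2);
// Nothing has been delivered to the observer at this point.
const deliveredBeforeComplete = asyncLog.length;

asyncSubject.complete();

console.log(deliveredBeforeComplete, asyncLog);
```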
class BehaviorSubject
class BehaviorSubject<T> extends Subject<T> {}
A variant of Subject that requires an initial value and emits its current value whenever it is subscribed to.
constructor
constructor(_value: {});
property value
readonly value: {};
method getValue
getValue: () => T;
method next
next: (value: T) => void;
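A short sketch of how BehaviorSubject hands its current value to each new subscriber; `state$` and the two arrays are hypothetical names chosen for the example.

```typescript
import { BehaviorSubject } from 'rxjs';

const state$ = new BehaviorSubject<number>(0);
const early: number[] = [];
const late: number[] = [];

state$.subscribe((v) => early.push(v)); // receives the initial value 0 right away
state$.next(1);
state$.subscribe((v) => late.push(v)); // receives the current value 1 right away
state$.next(2);

// getValue() reads the current value synchronously.
const current = state$.getValue();

console.log(early, late, current);
```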
class ConnectableObservable
class ConnectableObservable<T> extends Observable<T> {}
ConnectableObservable
Deprecated
Will be removed in v8. Use connectable to create a connectable observable. If you are using the refCount method of ConnectableObservable, use the share operator instead. Details: https://rxjs.dev/deprecations/multicasting
constructor
constructor(source: Observable<T>, subjectFactory: () => Subject<T>);
Parameter source
The source observable
Parameter subjectFactory
The factory that creates the subject used internally.
Deprecated
Will be removed in v8. Use connectable to create a connectable observable. new ConnectableObservable(source, factory) is equivalent to connectable(source, { connector: factory }). When the refCount() method is needed, the share operator should be used instead: new ConnectableObservable(source, factory).refCount() is equivalent to source.pipe(share({ connector: factory })). Details: https://rxjs.dev/deprecations/multicasting
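A minimal sketch of the recommended connectable replacement, assuming a plain Subject as the connector. Two observers subscribe first, and the source is only subscribed once connect() is called.

```typescript
import { Subject, connectable, of } from 'rxjs';

const shared$ = connectable(of(1, 2, 3), {
  connector: () => new Subject<number>(),
});

const observerA: number[] = [];
const observerB: number[] = [];
shared$.subscribe((v) => observerA.push(v));
shared$.subscribe((v) => observerB.push(v));

// Nothing flows until connect() is called.
const receivedBeforeConnect = observerA.length + observerB.length;

// Subscribes to the source once and multicasts to both observers.
shared$.connect();

console.log(receivedBeforeConnect, observerA, observerB);
```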
property source
source: Observable<T>;
property subjectFactory
protected subjectFactory: () => Subject<T>;
method connect
connect: () => Subscription;
Deprecated
ConnectableObservable will be removed in v8. Use connectable instead. Details: https://rxjs.dev/deprecations/multicasting
method getSubject
protected getSubject: () => Subject<T>;
method refCount
refCount: () => Observable<T>;
Deprecated
ConnectableObservable will be removed in v8. Use the share operator instead. Details: https://rxjs.dev/deprecations/multicasting
class Notification
class Notification<T> {}
Represents a push-based event or value that an Observable can emit. This class is particularly useful for operators that manage notifications, like materialize, dematerialize, observeOn, and others. Besides wrapping the actual delivered value, it also annotates it with metadata of, for instance, what type of push message it is (next, error, or complete).
Deprecated
It is NOT recommended to create instances of Notification directly. Rather, try to create POJOs matching the signature outlined in ObservableNotification. For example: { kind: 'N', value: 1 }, { kind: 'E', error: new Error('bad') }, or { kind: 'C' }. Will be removed in v8.
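The POJO approach suggested in the deprecation note might look like the sketch below: plain objects typed as ObservableNotification are replayed with dematerialize, and materialize goes in the other direction. The sample values are arbitrary.

```typescript
import type { ObservableNotification } from 'rxjs';
import { dematerialize, from, map, materialize, of } from 'rxjs';

// Plain objects instead of Notification instances.
const notifications: ObservableNotification<number>[] = [
  { kind: 'N', value: 1 },
  { kind: 'N', value: 2 },
  { kind: 'C' },
];

// dematerialize turns notification objects back into real emissions.
const replayed: number[] = [];
from(notifications)
  .pipe(dematerialize())
  .subscribe((v) => replayed.push(v));

// materialize goes the other way; here only the kind is kept.
const kinds: string[] = [];
of(1)
  .pipe(materialize(), map((n) => n.kind))
  .subscribe((kind) => kinds.push(kind));

console.log(replayed, kinds);
```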
constructor
constructor(kind: string, value?: {});
Creates a "Next" notification object.
Parameter kind
Always
'N'
Parameter value
The value to notify with if observed.
Deprecated
Internal implementation detail. Use instead.
constructor
constructor(kind: string, value: undefined, error: any);
Creates an "Error" notification object.
Parameter kind
Always
'E'
Parameter value
Always
undefined
Parameter error
The error to notify with if observed.
Deprecated
Internal implementation detail. Use instead.
constructor
constructor(kind: string);
Creates a "completion" notification object.
Parameter kind
Always
'C'
Deprecated
Internal implementation detail. Use instead.
property error
readonly error?: any;
property hasValue
readonly hasValue: boolean;
A value signifying that the notification will "next" if observed. In truth, this is really synonymous with just checking kind === "N".
Deprecated
Will be removed in v8. Instead, just check to see if the value of kind is "N".
property kind
readonly kind: 'N' | 'E' | 'C';
property value
readonly value?: {};
method accept
accept: { ( next: (value: T) => void, error: (err: any) => void, complete: () => void ): void; (next: (value: T) => void, error: (err: any) => void): void; (next: (value: T) => void): void; (observer: PartialObserver<T>): void;};
Executes a notification on the appropriate handler from a list provided. If a handler is missing for the kind of notification, nothing is called and no error is thrown, it will be a noop.
Parameter next
A next handler
Parameter error
An error handler
Parameter complete
A complete handler
Deprecated
Replaced with . Will be removed in v8.
Executes a notification on the appropriate handler from a list provided. If a handler is missing for the kind of notification, nothing is called and no error is thrown, it will be a noop.
Parameter next
A next handler
Parameter error
An error handler
Deprecated
Replaced with . Will be removed in v8.
Executes the next handler if the Notification is of kind "N". Otherwise this will not error, and it will be a noop.
Parameter next
The next handler
Deprecated
Replaced with . Will be removed in v8.
Executes the appropriate handler on a passed observer given the kind of notification. If the handler is missing it will do nothing. Even if the notification is an error, if there is no error handler on the observer, an error will not be thrown, it will noop.
Parameter observer
The observer to notify.
Deprecated
Replaced with . Will be removed in v8.
method createComplete
static createComplete: () => Notification<never> & CompleteNotification;
A shortcut to create a Notification instance of the type complete.
Returns the valueless "complete" Notification.
Deprecated
It is NOT recommended to create instances of Notification directly. Rather, try to create POJOs matching the signature outlined in ObservableNotification. For example: { kind: 'N', value: 1 }, { kind: 'E', error: new Error('bad') }, or { kind: 'C' }. Will be removed in v8.
method createError
static createError: (err?: any) => Notification<never> & ErrorNotification;
A shortcut to create a Notification instance of the type error from a given error.
Parameter err
The error error.
Returns the "error" Notification representing the argument.
Deprecated
It is NOT recommended to create instances of Notification directly. Rather, try to create POJOs matching the signature outlined in ObservableNotification. For example: { kind: 'N', value: 1 }, { kind: 'E', error: new Error('bad') }, or { kind: 'C' }. Will be removed in v8.
method createNext
static createNext: <T>(value: T) => Notification<T> & NextNotification<T>;
A shortcut to create a Notification instance of the type next from a given value.
Parameter value
The next value.
Returns the "next" Notification representing the argument.
Deprecated
It is NOT recommended to create instances of Notification directly. Rather, try to create POJOs matching the signature outlined in ObservableNotification. For example: { kind: 'N', value: 1 }, { kind: 'E', error: new Error('bad') }, or { kind: 'C' }. Will be removed in v8.
method do
do: { ( next: (value: T) => void, error: (err: any) => void, complete: () => void ): void; (next: (value: T) => void, error: (err: any) => void): void; (next: (value: T) => void): void;};
Executes a notification on the appropriate handler from a list provided. If a handler is missing for the kind of notification, nothing is called and no error is thrown, it will be a noop.
Parameter next
A next handler
Parameter error
An error handler
Parameter complete
A complete handler
Deprecated
Replaced with . Will be removed in v8.
Executes a notification on the appropriate handler from a list provided. If a handler is missing for the kind of notification, nothing is called and no error is thrown, it will be a noop.
Parameter next
A next handler
Parameter error
An error handler
Deprecated
Replaced with . Will be removed in v8.
Executes the next handler if the Notification is of kind "N". Otherwise this will not error, and it will be a noop.
Parameter next
The next handler
Deprecated
Replaced with . Will be removed in v8.
method observe
observe: (observer: PartialObserver<T>) => void;
Executes the appropriate handler on a passed observer given the kind of notification. If the handler is missing it will do nothing. Even if the notification is an error, if there is no error handler on the observer, an error will not be thrown, it will noop.
Parameter observer
The observer to notify.
method toObservable
toObservable: () => Observable<T>;
Returns a simple Observable that just delivers the notification represented by this Notification instance.
Deprecated
Will be removed in v8. To convert a Notification to an Observable, use of and dematerialize: of(notification).pipe(dematerialize()).
class Observable
class Observable<T> implements Subscribable<T> {}
A representation of any set of values over any amount of time. This is the most basic building block of RxJS.
constructor
constructor( subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic);
Parameter subscribe
The function that is called when the Observable is initially subscribed to. This function is given a Subscriber, to which new values can be nexted, or an error method can be called to raise an error, or complete can be called to notify of a successful completion.
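A small sketch of a subscribe function passed to the constructor. It pushes two values, completes, and returns a teardown function; the names and log arrays are illustrative.

```typescript
import { Observable } from 'rxjs';

const teardownLog: string[] = [];

const counted$ = new Observable<number>((subscriber) => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.complete();
  // The returned teardown runs on completion, error, or unsubscribe.
  return () => teardownLog.push('teardown');
});

const seen: string[] = [];
counted$.subscribe({
  next: (v) => seen.push(`next ${v}`),
  complete: () => seen.push('complete'),
});

console.log(seen, teardownLog);
```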
property create
static create: (...args: any[]) => any;
Creates a new Observable by calling the Observable constructor
Parameter subscribe
The subscriber function to be passed to the Observable constructor.
Returns a new observable.
Deprecated
Use new Observable() instead. Will be removed in v8.
property operator
operator: Operator<any, T>;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property source
source: Observable<any>;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
method forEach
forEach: { (next: (value: T) => void): Promise<void>; ( next: (value: T) => void, promiseCtor: PromiseConstructorLike ): Promise<void>;};
Used as a NON-CANCELLABLE means of subscribing to an observable, for use with APIs that expect promises, like async/await. You cannot unsubscribe from this.
**WARNING**: Only use this with observables you *know* will complete. If the source observable does not complete, you will end up with a promise that is hung up, and potentially all of the state of an async function hanging out in memory. To avoid this situation, look into adding something like timeout, take, takeWhile, or takeUntil amongst others.
#### Example
import { interval, take } from 'rxjs';
const source$ = interval(1000).pipe(take(4));
async function getTotal() {
  let total = 0;
  await source$.forEach(value => {
    total += value;
    console.log('observable -> ' + value);
  });
  return total;
}
getTotal().then(
  total => console.log('Total: ' + total)
);
// Expected:
// 'observable -> 0'
// 'observable -> 1'
// 'observable -> 2'
// 'observable -> 3'
// 'Total: 6'
Parameter next
A handler for each value emitted by the observable.
Returns a promise that either resolves on observable completion or rejects with the handled error.
Parameter next
A handler for each value emitted by the observable.
Parameter promiseCtor
A constructor function used to instantiate the Promise.
Returns a promise that either resolves on observable completion or rejects with the handled error.
Deprecated
Passing a Promise constructor will no longer be available in upcoming versions of RxJS. This is because it adds weight to the library, for very little benefit. If you need this functionality, it is recommended that you either polyfill Promise, or you create an adapter to convert the returned native promise to whatever promise implementation you wanted. Will be removed in v8.
method lift
lift: <R>(operator?: Operator<T, R>) => Observable<R>;
Creates a new Observable, with this Observable instance as the source, and the passed operator defined as the new observable's operator.
Parameter operator
The operator defining the operation to take on the observable.
Returns a new observable with the Operator applied.
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8. If you have implemented an operator using lift, it is recommended that you create an operator by simply returning new Observable() directly. See "Creating new operators from scratch" section here: https://rxjs.dev/guide/operators
method pipe
pipe: { (): Observable<T>; <A>(op1: OperatorFunction<T, A>): Observable<A>; <A, B>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B> ): Observable<B>; <A, B, C>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C> ): Observable<C>; <A, B, C, D>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D> ): Observable<D>; <A, B, C, D, E>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E> ): Observable<E>; <A, B, C, D, E, F>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F> ): Observable<F>; <A, B, C, D, E, F, G>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G> ): Observable<G>; <A, B, C, D, E, F, G, H>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H> ): Observable<H>; <A, B, C, D, E, F, G, H, I>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I> ): Observable<I>; <A, B, C, D, E, F, G, H, I>( op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>, ...operations: OperatorFunction<any, any>[] ): Observable<unknown>;};
method subscribe
subscribe: { (observerOrNext?: Partial<Observer<T>> | ((value: T) => void)): Subscription; ( next?: (value: T) => void, error?: (error: any) => void, complete?: () => void ): Subscription;};
Deprecated
Instead of passing separate callback arguments, use an observer argument. Signatures taking separate callback arguments will be removed in v8. Details: https://rxjs.dev/deprecations/subscribe-arguments
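For reference, the observer-argument form looks like the sketch below. The handlers simply record what they receive in an illustrative `events` array.

```typescript
import { of } from 'rxjs';

const events: string[] = [];

// Deprecated form: of(1, 2).subscribe(onNext, onError, onComplete)
// Preferred form: a single observer object with named handlers.
of(1, 2).subscribe({
  next: (v) => events.push(`next ${v}`),
  error: (err) => events.push(`error ${err}`),
  complete: () => events.push('complete'),
});

console.log(events);
```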
method toPromise
toPromise: { (): Promise<T | undefined>; (PromiseCtor: PromiseConstructor): Promise<T>; (PromiseCtor: PromiseConstructorLike): Promise<T>;};
Deprecated
Replaced with firstValueFrom and lastValueFrom. Will be removed in v8. Details: https://rxjs.dev/deprecations/to-promise
class ReplaySubject
class ReplaySubject<T> extends Subject<T> {}
A variant of Subject that "replays" old values to new subscribers by emitting them when they first subscribe.
ReplaySubject has an internal buffer that will store a specified number of values that it has observed. Like Subject, ReplaySubject "observes" values by having them passed to its next method. When it observes a value, it will store that value for a time determined by the configuration of the ReplaySubject, as passed to its constructor.
When a new subscriber subscribes to the ReplaySubject instance, it will synchronously emit all values in its buffer in a First-In-First-Out (FIFO) manner. The ReplaySubject will also complete, if it has observed completion; and it will error if it has observed an error.
There are two main configuration items to be concerned with:
1. bufferSize - This will determine how many items are stored in the buffer, defaults to infinite.
2. windowTime - The amount of time to hold a value in the buffer before removing it from the buffer.
Both configurations may exist simultaneously. So if you would like to buffer a maximum of 3 values, as long as the values are less than 2 seconds old, you could do so with a new ReplaySubject(3, 2000).
### Differences with BehaviorSubject
BehaviorSubject is similar to new ReplaySubject(1), with a couple of exceptions:
1. BehaviorSubject comes "primed" with a single value upon construction.
2. ReplaySubject will replay values, even after observing an error, where BehaviorSubject will not.
constructor
constructor( _bufferSize?: number, _windowTime?: number, _timestampProvider?: TimestampProvider);
Parameter _bufferSize
The size of the buffer to replay on subscription
Parameter _windowTime
The amount of time the buffered items will stay buffered
Parameter _timestampProvider
An object with a `now()` method that provides the current timestamp. This is used to calculate the amount of time something has been buffered.
method next
next: (value: T) => void;
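The replay behavior described for `ReplaySubject` can be illustrated with a small synchronous sketch. It assumes a buffer size of 2 with no `windowTime`; the variable names (`received`, `failed`, `late`) are illustrative only.

```typescript
import { ReplaySubject } from 'rxjs';

// Keep at most the 2 most recent values (no windowTime, so they never expire).
const subject = new ReplaySubject<number>(2);
subject.next(1);
subject.next(2);
subject.next(3);

// A late subscriber synchronously receives the buffered values in FIFO order.
const received: number[] = [];
subject.subscribe((value) => received.push(value));
// received is now [2, 3]

subject.next(4);
// received is now [2, 3, 4]

// Buffered values are still replayed after an error, followed by the error itself.
const failed = new ReplaySubject<string>(1);
failed.next('a');
failed.error(new Error('boom'));

const late: string[] = [];
failed.subscribe({
  next: (value) => late.push(value),
  error: (err: Error) => late.push('error: ' + err.message),
});
// late is now ['a', 'error: boom']
```

Passing a second constructor argument, for example `new ReplaySubject<number>(2, 2000)`, would additionally drop values older than 2 seconds.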
class Scheduler
class Scheduler implements SchedulerLike {}
An execution context and a data structure to order tasks and schedule their execution. Provides a notion of (potentially virtual) time, through the `now()` getter method.
Each unit of work in a Scheduler is called an `Action`.
class Scheduler {
  now(): number;
  schedule(work, delay?, state?): Subscription;
}
Deprecated
Scheduler is an internal implementation detail of RxJS, and should not be used directly. Rather, create your own class and implement SchedulerLike. Will be made internal in v8.
constructor
constructor(schedulerActionCtor: typeof Action, now?: () => number);
property now
static now: () => number;
property now
now: () => number;
A getter method that returns a number representing the current time (at the time this function was called) according to the scheduler's own internal clock.
Returns
A number that represents the current time. May or may not have a relation to wall-clock time. May or may not refer to a time unit (e.g. milliseconds).
method schedule
schedule: <T>( work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T) => Subscription;
Schedules a function, `work`, for execution. May happen at some point in the future, according to the `delay` parameter, if specified. May be passed some context object, `state`, which will be passed to the `work` function.
The given arguments will be processed and stored as an Action object in a queue of actions.
Parameter work
A function representing a task, or some unit of work to be executed by the Scheduler.
Parameter delay
Time to wait before executing the work, where the time unit is implicit and defined by the Scheduler itself.
Parameter state
Some contextual data that the `work` function uses when called by the Scheduler.
Returns
A subscription in order to be able to unsubscribe the scheduled work.
class Subject
class Subject<T> extends Observable<T> implements SubscriptionLike {}
A Subject is a special type of Observable that allows values to be multicasted to many Observers. Subjects are like EventEmitters.
Every Subject is an Observable and an Observer. You can subscribe to a Subject, and you can call next to feed values as well as error and complete.
constructor
constructor();
property closed
closed: boolean;
property create
static create: (...args: any[]) => any;
Creates a "subject" by basically gluing an observer to an observable.
Deprecated
Recommended you do not use. Will be removed at some point in the future. Plans for replacement still under discussion.
property hasError
hasError: boolean;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property isStopped
isStopped: boolean;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property observed
readonly observed: boolean;
property observers
observers: Observer<T>[];
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property thrownError
thrownError: any;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
method asObservable
asObservable: () => Observable<T>;
Creates a new Observable with this Subject as the source. You can do this to create custom Observer-side logic of the Subject and conceal it from code that uses the Observable.
Returns
Observable that this Subject casts to.
method complete
complete: () => void;
method error
error: (err: any) => void;
method lift
lift: <R>(operator: Operator<T, R>) => Observable<R>;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
method next
next: (value: T) => void;
method unsubscribe
unsubscribe: () => void;
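A minimal sketch of the multicasting behavior described for `Subject`: values passed to `next` reach every Observer subscribed at that moment, and `asObservable()` hides the Observer side. The array names are illustrative.

```typescript
import { Subject } from 'rxjs';

const subject = new Subject<number>();
const first: number[] = [];
const second: number[] = [];

subject.subscribe((value) => first.push(value));
subject.next(1); // only `first` is subscribed at this point

subject.subscribe((value) => second.push(value));
subject.next(2); // multicast to both observers
subject.complete();
// first is [1, 2], second is [2]

// asObservable() returns a plain Observable, so consumers cannot call next().
const readOnly$ = subject.asObservable();
const exposesNext = 'next' in readOnly$; // false
```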
class Subscriber
class Subscriber<T> extends Subscription implements Observer<T> {}
Implements the Observer interface and extends the Subscription class. While the Observer is the public API for consuming the values of an Observable, all Observers get converted to a Subscriber, in order to provide Subscription-like capabilities such as `unsubscribe`. Subscriber is a common type in RxJS, and crucial for implementing operators, but it is rarely used as a public API.
constructor
constructor(destination?: Subscriber<any> | Observer<any>);
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8. There is no reason to directly create an instance of Subscriber. This type is exported for typings reasons.
property destination
protected destination: Subscriber<any> | Observer<any>;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property isStopped
protected isStopped: boolean;
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
method complete
complete: () => void;
The Observer callback to receive a valueless notification of type `complete` from the Observable. Notifies the Observer that the Observable has finished sending push-based notifications.
method create
static create: <T>( next?: (x?: T) => void, error?: (e?: any) => void, complete?: () => void) => Subscriber<T>;
A static factory for a Subscriber, given a (potentially partial) definition of an Observer.
Parameter next
The `next` callback of an Observer.
Parameter error
The `error` callback of an Observer.
Parameter complete
The `complete` callback of an Observer.
Returns
A Subscriber wrapping the (partially defined) Observer represented by the given arguments.
Deprecated
Do not use. Will be removed in v8. There is no replacement for this method, and there is no reason to be creating instances of `Subscriber` directly. If you have a specific use case, please file an issue.
method error
error: (err?: any) => void;
The Observer callback to receive notifications of type `error` from the Observable, with an attached `Error`. Notifies the Observer that the Observable has experienced an error condition.
Parameter err
The `error` exception.
method next
next: (value: T) => void;
The Observer callback to receive notifications of type `next` from the Observable, with a value. The Observable may call this method 0 or more times.
Parameter value
The `next` value.
method unsubscribe
unsubscribe: () => void;
class Subscription
class Subscription implements SubscriptionLike {}
Represents a disposable resource, such as the execution of an Observable. A Subscription has one important method, `unsubscribe`, that takes no argument and just disposes the resource held by the subscription.
Additionally, subscriptions may be grouped together through the `add()` method, which will attach a child Subscription to the current Subscription. When a Subscription is unsubscribed, all its children (and its grandchildren) will be unsubscribed as well.
constructor
constructor(initialTeardown?: () => void);
Parameter initialTeardown
A function executed first as part of the finalization process that is kicked off when `unsubscribe` is called.
property closed
closed: boolean;
A flag to indicate whether this Subscription has already been unsubscribed.
property EMPTY
static EMPTY: Subscription;
method add
add: (teardown: TeardownLogic) => void;
Adds a finalizer to this subscription, so that finalization will be unsubscribed/called when this subscription is unsubscribed. If this subscription is already closed, because it has already been unsubscribed, then whatever finalizer is passed to it will automatically be executed (unless the finalizer itself is also a closed subscription).
Closed Subscriptions cannot be added as finalizers to any subscription. Adding a closed subscription to any subscription will result in no operation. (A noop).
Adding a subscription to itself, or adding `null` or `undefined` will not perform any operation at all. (A noop).
`Subscription` instances that are added to this instance will automatically remove themselves if they are unsubscribed. Functions and Unsubscribable objects that you wish to remove will need to be removed manually with `remove`.
Parameter teardown
The finalization logic to add to this subscription.
method remove
remove: (teardown: Exclude<TeardownLogic, void>) => void;
Removes a finalizer from this subscription that was previously added with the `add` method.
Note that `Subscription` instances, when unsubscribed, will automatically remove themselves from every other `Subscription` they have been added to. This means that using the `remove` method is not a common thing and should be used thoughtfully.
If you add the same finalizer instance of a function or an unsubscribable object to a `Subscription` instance more than once, you will need to call `remove` the same number of times to remove all instances.
All finalizer instances are removed to free up memory upon unsubscription.
Parameter teardown
The finalizer to remove from this subscription
method unsubscribe
unsubscribe: () => void;
Disposes the resources held by the subscription. May, for instance, cancel an ongoing Observable execution or cancel any other type of work that started when the Subscription was created.
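The grouping and finalization rules for `Subscription` can be sketched as follows. This is an illustrative example rather than a recommended pattern; the `log` array and the finalizer labels are made up for the demonstration.

```typescript
import { Subscription } from 'rxjs';

const log: string[] = [];

// The initial teardown runs first when the subscription is unsubscribed.
const parent = new Subscription(() => log.push('parent teardown'));
const child = new Subscription(() => log.push('child teardown'));

// Finalizers added with add() can be child subscriptions or plain functions.
parent.add(child);
parent.add(() => log.push('function finalizer'));

parent.unsubscribe();
const childClosed = child.closed; // true: children are unsubscribed with the parent

// Adding a finalizer to an already closed subscription executes it immediately.
parent.add(() => log.push('late finalizer'));
```

After this runs, `log` contains the parent teardown, then the child teardown, then the function finalizer, and finally the late finalizer.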
class VirtualAction
class VirtualAction<T> extends AsyncAction<T> {}
constructor
constructor( scheduler: VirtualTimeScheduler, work: (this: SchedulerAction<T>, state?: T) => void, index?: number);
property active
protected active: boolean;
property index
protected index: number;
property scheduler
protected scheduler: VirtualTimeScheduler;
property work
protected work: (this: SchedulerAction<T>, state?: T) => void;
method recycleAsyncId
protected recycleAsyncId: ( scheduler: VirtualTimeScheduler, id?: any, delay?: number) => TimerHandle | undefined;
method requestAsyncId
protected requestAsyncId: ( scheduler: VirtualTimeScheduler, id?: any, delay?: number) => TimerHandle;
method schedule
schedule: (state?: T, delay?: number) => Subscription;
class VirtualTimeScheduler
class VirtualTimeScheduler extends AsyncScheduler {}
constructor
constructor(schedulerActionCtor?: typeof AsyncAction, maxFrames?: number);
This creates an instance of a `VirtualTimeScheduler`. Experts only. The signature of this constructor is likely to change in the long run.
Parameter schedulerActionCtor
The type of Action to initialize when initializing actions during scheduling.
Parameter maxFrames
The maximum number of frames to process before stopping. Used to prevent endless flush cycles.
property frame
frame: number;
The current frame for the state of the virtual scheduler instance. The difference between two "frames" is synonymous with the passage of "virtual time units". So if you record `scheduler.frame` to be `1`, then later, observe `scheduler.frame` to be at `11`, that means `10` virtual time units have passed.
property frameTimeFactor
static frameTimeFactor: number;
Deprecated
Not used in VirtualTimeScheduler directly. Will be removed in v8.
property index
index: number;
Used internally to examine the current virtual action index being processed.
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
property maxFrames
maxFrames: number;
method flush
flush: () => void;
Prompt the Scheduler to execute all of its queued actions, therefore clearing its queue.
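As a rough illustration of `schedule`, `frame` and `flush`, the sketch below queues two units of work on a `VirtualTimeScheduler` created with its default arguments and then flushes the queue. The `state` strings and the `log` array are illustrative.

```typescript
import { VirtualTimeScheduler } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const log: string[] = [];

// Work is queued with a delay in virtual time units; `state` is handed to `work`.
scheduler.schedule((state) => {
  log.push(`${state} at frame ${scheduler.frame}`);
}, 10, 'late');
scheduler.schedule((state) => {
  log.push(`${state} at frame ${scheduler.frame}`);
}, 5, 'early');

// Nothing runs until the queue is flushed.
const ranBeforeFlush = log.length; // 0

// flush() executes the queued actions in order of their virtual delay.
scheduler.flush();
// log is ['early at frame 5', 'late at frame 10']
```

No real time passes here: the scheduler advances `frame` to each action's delay as it executes it.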
Interfaces
interface ArgumentOutOfRangeError
interface ArgumentOutOfRangeError extends Error {}
interface BasicGroupByOptions
interface BasicGroupByOptions<K, T> {}
interface CompleteNotification
interface CompleteNotification {}
A notification representing a "completion" from an observable. Can be used with dematerialize.
property kind
kind: 'C';
interface CompletionObserver
interface CompletionObserver<T> {}
interface Connectable
interface Connectable<T> extends Observable<T> {}
An observable with a `connect` method that is used to create a subscription to an underlying source, connecting it with all consumers via a multicast.
method connect
connect: () => Subscription;
(Idempotent) Calling this method will connect the underlying source observable to all subscribed consumers through an underlying Subject.
Returns
A subscription, that when unsubscribed, will "disconnect" the source from the connector subject, severing notifications to all consumers.
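A small sketch of how a `Connectable` might be used, assuming it is created with the `connectable()` function and its default configuration. Subscribers receive nothing until `connect()` is called.

```typescript
import { connectable, of } from 'rxjs';

const source$ = connectable(of(1, 2, 3));
const a: number[] = [];
const b: number[] = [];

source$.subscribe((value) => a.push(value));
source$.subscribe((value) => b.push(value));

// The underlying source has not been subscribed to yet.
const beforeConnect = a.length + b.length; // 0

// connect() subscribes to the source once and multicasts to both consumers.
const connection = source$.connect();
// a and b are both [1, 2, 3]

// Unsubscribing the connection would disconnect a still-running source.
connection.unsubscribe();
```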
interface ConnectConfig
interface ConnectConfig<T> {}
An object used to configure connect operator.
interface EmptyError
interface EmptyError extends Error {}
interface ErrorNotification
interface ErrorNotification {}
A notification representing an "error" from an observable. Can be used with dematerialize.
interface ErrorObserver
interface ErrorObserver<T> {}
interface GlobalConfig
interface GlobalConfig {}
The global configuration object for RxJS, used to configure things like how to react on unhandled errors. Accessible via config object.
property onStoppedNotification
onStoppedNotification: | (( notification: ObservableNotification<any>, subscriber: Subscriber<any> ) => void) | null;
A registration point for notifications that cannot be sent to subscribers because they have completed, errored or have been explicitly unsubscribed. By default, next, complete and error notifications sent to stopped subscribers are noops. However, sometimes callers might want a different behavior. For example, with sources that attempt to report errors to stopped subscribers, a caller can configure RxJS to throw an unhandled error instead. This will _always_ be called asynchronously on another job in the runtime. This is because we do not want errors thrown in this user-configured handler to interfere with the behavior of the library.
property onUnhandledError
onUnhandledError: ((err: any) => void) | null;
A registration point for unhandled errors from RxJS. These are errors that were not handled by consuming code in the usual subscription path. For example, if you have this configured, and you subscribe to an observable without providing an error handler, errors from that subscription will end up here. This will _always_ be called asynchronously on another job in the runtime. This is because we do not want errors thrown in this user-configured handler to interfere with the behavior of the library.
property Promise
Promise?: PromiseConstructorLike;
The promise constructor used by default for `toPromise` and `forEach` methods.
Deprecated
As of version 8, RxJS will no longer support this sort of injection of a Promise constructor. If you need a Promise implementation other than native promises, please polyfill/patch Promise as you see appropriate. Will be removed in v8.
property useDeprecatedNextContext
useDeprecatedNextContext: boolean;
If true, enables an as-of-yet undocumented feature from v5: The ability to access `unsubscribe()` via `this` context in `next` functions created in observers passed to `subscribe`.
This is being removed because the performance was severely problematic, and it could also cause issues when types other than POJOs are passed to subscribe as subscribers, as they will likely have their `this` context overwritten.
Deprecated
As of version 8, RxJS will no longer support altering the context of next functions provided as part of an observer to Subscribe. Instead, you will have access to a subscription or a signal or token that will allow you to do things like unsubscribe and test closed status. Will be removed in v8.
property useDeprecatedSynchronousErrorHandling
useDeprecatedSynchronousErrorHandling: boolean;
If true, turns on synchronous error rethrowing, which is a deprecated behavior in v6 and higher. This behavior enables bad patterns like wrapping a subscribe call in a try/catch block. It also enables producer interference, a nasty bug where a multicast can be broken for all observers by a downstream consumer with an unhandled error. DO NOT USE THIS FLAG UNLESS IT'S NEEDED TO BUY TIME FOR MIGRATION REASONS.
Deprecated
As of version 8, RxJS will no longer support synchronous throwing of unhandled errors. All errors will be thrown on a separate call stack to prevent bad behaviors described above. Will be removed in v8.
interface GroupByOptionsWithElement
interface GroupByOptionsWithElement<K, E, T> {}
interface GroupedObservable
interface GroupedObservable<K, T> extends Observable<T> {}
An observable of values that is emitted by the result of a groupBy operator. It contains a `key` property for the grouping.
property key
readonly key: K;
The key value for the grouped notifications.
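To show where the `key` property comes into play, here is a brief sketch that groups numbers by parity and collects each group into an array. The `Group` interface and the `byKey` map are illustrative helpers, not part of RxJS.

```typescript
import { of, groupBy, mergeMap, toArray, map } from 'rxjs';

// Illustrative shape for the collected result of one group.
interface Group {
  key: string;
  values: number[];
}

const groups: Group[] = [];

of(1, 2, 3, 4)
  .pipe(
    groupBy((n) => (n % 2 === 0 ? 'even' : 'odd')),
    // Each emitted group$ is a GroupedObservable carrying its grouping key.
    mergeMap((group$) =>
      group$.pipe(
        toArray(),
        map((values) => ({ key: group$.key, values }))
      )
    )
  )
  .subscribe((group) => groups.push(group));

// Index the collected groups by key for easy lookup.
const byKey = new Map(groups.map((g) => [g.key, g.values] as const));
// byKey.get('odd') is [1, 3], byKey.get('even') is [2, 4]
```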
interface InteropObservable
interface InteropObservable<T> {}
An object that implements the `Symbol.observable` interface.
property [Symbol.observable]
[Symbol.observable]: () => Subscribable<T>;
interface MonoTypeOperatorFunction
interface MonoTypeOperatorFunction<T> extends OperatorFunction<T, T> {}
A function type interface that describes a function that accepts and returns a parameter of the same type.
Used to describe OperatorFunction with only one type: `OperatorFunction<T, T>`.
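These function types are mostly useful for typing custom operators. A minimal sketch, where `onlyEven` and `toLabel` are hypothetical helpers built from existing operators:

```typescript
import { of, filter, map } from 'rxjs';
import type { MonoTypeOperatorFunction, OperatorFunction } from 'rxjs';

// Same input and output type: Observable<number> in, Observable<number> out.
const onlyEven: MonoTypeOperatorFunction<number> = (source) =>
  source.pipe(filter((n) => n % 2 === 0));

// Different input and output types: Observable<number> in, Observable<string> out.
const toLabel: OperatorFunction<number, string> = (source) =>
  source.pipe(map((n) => `#${n}`));

const labels: string[] = [];
of(1, 2, 3, 4)
  .pipe(onlyEven, toLabel)
  .subscribe((label) => labels.push(label));
// labels is ['#2', '#4']
```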
interface NextNotification
interface NextNotification<T> {}
A notification representing a "next" from an observable. Can be used with dematerialize.
interface NextObserver
interface NextObserver<T> {}
interface NotFoundError
interface NotFoundError extends Error {}
interface ObjectUnsubscribedError
interface ObjectUnsubscribedError extends Error {}
interface Observer
interface Observer<T> {}
An object interface that defines a set of callback functions a user can use to get notified of any set of Observable events.
For more info, please refer to the RxJS Observer guide.
property complete
complete: () => void;
A callback function that gets called by the producer if and when it has no more values to provide (by calling the `next` callback function). This means that no error has happened. This callback can't be called more than once, it can't be called if the `error` callback function has been called previously, nor can it be called if the consumer has unsubscribed.
For more info, please refer to the RxJS guide.
property error
error: (err: any) => void;
A callback function that gets called by the producer if and when it encountered a problem of any kind. The errored value will be provided through the `err` parameter. This callback can't be called more than once, it can't be called if the `complete` callback function has been called previously, nor can it be called if the consumer has unsubscribed.
For more info, please refer to the RxJS guide.
property next
next: (value: T) => void;
A callback function that gets called by the producer during the subscription when the producer "has" the `value`. It won't be called if `error` or `complete` callback functions have been called, nor after the consumer has unsubscribed.
For more info, please refer to the RxJS guide.
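The three `Observer` callbacks can be seen together in a short sketch. Passing an observer object like this is also the form recommended over separate callback arguments to `subscribe`. The `log` array is illustrative.

```typescript
import { of } from 'rxjs';
import type { Observer } from 'rxjs';

const log: string[] = [];

const observer: Observer<number> = {
  next: (value) => {
    log.push('next ' + value);
  },
  error: (err) => {
    log.push('error ' + err);
  },
  complete: () => {
    log.push('complete');
  },
};

// of(1, 2) calls next twice, then complete; error is never called here.
of(1, 2).subscribe(observer);
// log is ['next 1', 'next 2', 'complete']
```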
interface Operator
interface Operator<T, R> {}
Deprecated
Internal implementation detail, do not use directly. Will be made internal in v8.
method call
call: (subscriber: Subscriber<R>, source: any) => TeardownLogic;
interface OperatorFunction
interface OperatorFunction<T, R> extends UnaryFunction<Observable<T>, Observable<R>> {}
interface ReadableStreamLike
interface ReadableStreamLike<T> {}
The base signature RxJS will look for to identify and use a [ReadableStream](https://streams.spec.whatwg.org/#rs-class) as an ObservableInput source.
method getReader
getReader: () => ReadableStreamDefaultReaderLike<T>;
interface RepeatConfig
interface RepeatConfig {}
property count
count?: number;
The number of times to repeat the source. Defaults to `Infinity`.
property delay
delay?: number | ((count: number) => ObservableInput<any>);
If a `number`, will delay the repeat of the source by that number of milliseconds. If a function, it will provide the number of times the source has been subscribed to, and the return value should be a valid observable input that will notify when the source should be repeated. If the notifier observable is empty, the result will complete.
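A minimal sketch of `RepeatConfig` in use with the `repeat` operator, using only `count` so that the example stays synchronous. With `count: 2` the source is subscribed to twice in total.

```typescript
import { of, repeat } from 'rxjs';

const values: string[] = [];

of('a', 'b')
  .pipe(repeat({ count: 2 }))
  .subscribe((value) => values.push(value));
// values is ['a', 'b', 'a', 'b']
```

Adding `delay: 1000` to the config would wait one second before each resubscription.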
interface RetryConfig
interface RetryConfig {}
The retry operator configuration object. `retry` either accepts a `number` or an object described by this interface.
property count
count?: number;
The maximum number of times to retry. If `count` is omitted, `retry` will try to resubscribe on errors an infinite number of times.
property delay
delay?: number | ((error: any, retryCount: number) => ObservableInput<any>);
The number of milliseconds to delay before retrying, OR a function to return a notifier for delaying. If a function is given, that function should return a notifier that, when it emits, will retry the source. If the notifier completes _without_ emitting, the resulting observable will complete without error. If the notifier errors, the error will be pushed to the result.
property resetOnSuccess
resetOnSuccess?: boolean;
Whether or not to reset the retry counter when the retried subscription emits its first value.
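A sketch of `RetryConfig` with a source that fails on its first two subscriptions. The `flaky$` observable and the `attempts` counter are hypothetical, built with `defer` so each retry triggers a fresh attempt.

```typescript
import { defer, of, retry, throwError } from 'rxjs';

let attempts = 0;

// Hypothetical source: errors on the first two subscriptions, then succeeds.
const flaky$ = defer(() => {
  attempts++;
  return attempts < 3
    ? throwError(() => new Error('attempt ' + attempts + ' failed'))
    : of('ok');
});

const results: string[] = [];
flaky$.pipe(retry({ count: 2 })).subscribe({
  next: (value) => results.push(value),
  error: (err: Error) => results.push('gave up: ' + err.message),
});
// Two retries were enough: results is ['ok'] and attempts is 3
```

With `count: 1` the same source would exhaust its retries and the error from the second attempt would reach the `error` callback.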
interface SchedulerAction
interface SchedulerAction<T> extends Subscription {}
method schedule
schedule: (state?: T, delay?: number) => Subscription;
interface SchedulerLike
interface SchedulerLike extends TimestampProvider {}
method schedule
schedule: { <T>( work: (this: SchedulerAction<T>, state: T) => void, delay: number, state: T ): Subscription; <T>( work: (this: SchedulerAction<T>, state?: T) => void, delay: number, state?: T ): Subscription; <T>( work: (this: SchedulerAction<T>, state?: T) => void, delay?: number, state?: T ): Subscription;};
interface SequenceError
interface SequenceError extends Error {}
interface ShareConfig
interface ShareConfig<T> {}
property connector
connector?: () => SubjectLike<T>;
The factory used to create the subject that will connect the source observable to multicast consumers.
property resetOnComplete
resetOnComplete?: boolean | (() => ObservableInput<any>);
If `true`, the resulting observable will reset internal state on completion from source and return to a "cold" state. This allows the resulting observable to be "repeated" after it is done. If `false`, when the source completes, it will push the completion through the connecting subject, and the subject will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent repeats or resubscriptions will resubscribe to that same subject. It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
property resetOnError
resetOnError?: boolean | ((error: any) => ObservableInput<any>);
If `true`, the resulting observable will reset internal state on error from source and return to a "cold" state. This allows the resulting observable to be "retried" in the event of an error. If `false`, when an error comes from the source it will push the error into the connecting subject, and the subject will remain the connecting subject, meaning the resulting observable will not go "cold" again, and subsequent retries or resubscriptions will resubscribe to that same subject. In all cases, RxJS subjects will emit the same error again, however ReplaySubject will also push its buffered values before pushing the error. It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
property resetOnRefCountZero
resetOnRefCountZero?: boolean | (() => ObservableInput<any>);
If `true`, when the number of subscribers to the resulting observable reaches zero due to those subscribers unsubscribing, the internal state will be reset and the resulting observable will return to a "cold" state. This means that the next time the resulting observable is subscribed to, a new subject will be created and the source will be subscribed to again. If `false`, when the number of subscribers to the resulting observable reaches zero due to unsubscription, the subject will remain connected to the source, and new subscriptions to the result will be connected through that same subject. It is also possible to pass a notifier factory returning an `ObservableInput` instead which grants more fine-grained control over how and when the reset should happen. This allows behaviors like conditional or delayed resets.
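The reset flags are easiest to see with a source that completes. The sketch below, which assumes a `ReplaySubject` connector and all three reset flags set to `false`, keeps the connecting subject alive after completion so that a late subscriber gets the replayed value without a second subscription to the source. The `subscriptions` counter is illustrative.

```typescript
import { defer, of, share, ReplaySubject } from 'rxjs';

let subscriptions = 0;

// Count how many times the underlying source is subscribed to.
const source$ = defer(() => {
  subscriptions++;
  return of(1, 2, 3);
});

const shared$ = source$.pipe(
  share({
    connector: () => new ReplaySubject<number>(1),
    resetOnComplete: false,
    resetOnError: false,
    resetOnRefCountZero: false,
  })
);

const early: number[] = [];
const late: number[] = [];

shared$.subscribe((value) => early.push(value)); // triggers the single source subscription
shared$.subscribe((value) => late.push(value)); // served from the same ReplaySubject
// early is [1, 2, 3], late is [3], subscriptions is 1
```

With `resetOnComplete: true` the second subscriber would instead cause a fresh subscription to the source.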
interface ShareReplayConfig
interface ShareReplayConfig {}
property bufferSize
bufferSize?: number;
property refCount
refCount: boolean;
property scheduler
scheduler?: SchedulerLike;
property windowTime
windowTime?: number;
interface SubjectLike
interface SubjectLike<T> extends Observer<T>, Subscribable<T> {}
interface Subscribable
interface Subscribable<T> {}
method subscribe
subscribe: (observer: Partial<Observer<T>>) => Unsubscribable;
interface SubscriptionLike
interface SubscriptionLike extends Unsubscribable {}
property closed
readonly closed: boolean;
method unsubscribe
unsubscribe: () => void;
interface TapObserver
interface TapObserver<T> extends Observer<T> {}
An extension to the Observer interface used only by the tap operator.
It provides a useful set of callbacks a user can register to do side-effects in cases other than what the usual Observer callbacks are (`next`, `error` and/or `complete`).
## Example
import { fromEvent, switchMap, tap, interval, take } from 'rxjs';

const source$ = fromEvent(document, 'click');
const result$ = source$.pipe(
  switchMap((_, i) =>
    i % 2 === 0
      ? fromEvent(document, 'mousemove').pipe(
          tap({
            subscribe: () => console.log('Subscribed to the mouse move events after click #' + i),
            unsubscribe: () => console.log('Mouse move events #' + i + ' unsubscribed'),
            finalize: () => console.log('Mouse move events #' + i + ' finalized')
          })
        )
      : interval(1_000).pipe(
          take(5),
          tap({
            subscribe: () => console.log('Subscribed to the 1-second interval events after click #' + i),
            unsubscribe: () => console.log('1-second interval events #' + i + ' unsubscribed'),
            finalize: () => console.log('1-second interval events #' + i + ' finalized')
          })
        )
  )
);

const subscription = result$.subscribe({ next: console.log });

setTimeout(() => {
  console.log('Unsubscribe after 60 seconds');
  subscription.unsubscribe();
}, 60_000);
property finalize
finalize: () => void;
The callback that `tap` operator invokes when any kind of finalization happens - either when the source Observable `error`s or `complete`s or when it gets explicitly unsubscribed by the user. There is no difference in using this callback or the finalize operator, but if you're already using `tap` operator, you can use this callback instead. You'd get the same result in either case.
property subscribe
subscribe: () => void;
The callback that `tap` operator invokes at the moment when the source Observable gets subscribed to.
property unsubscribe
unsubscribe: () => void;
The callback that `tap` operator invokes when an explicit unsubscribe happens. It won't get invoked on `error` or `complete` events.
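The difference between `unsubscribe` and `finalize` can also be shown without a browser. The sketch below is a simplified, synchronous variant: one source completes on its own, the other is unsubscribed explicitly. The `record` helper and log arrays are illustrative.

```typescript
import { of, Subject, tap } from 'rxjs';

// Illustrative helper: builds a TapObserver that records which callbacks fire.
const record = (log: string[]) => ({
  subscribe: () => {
    log.push('subscribe');
  },
  unsubscribe: () => {
    log.push('unsubscribe');
  },
  finalize: () => {
    log.push('finalize');
  },
});

// Case 1: the source completes, so only `finalize` fires at the end.
const completedLog: string[] = [];
of(1).pipe(tap(record(completedLog))).subscribe();
// completedLog is ['subscribe', 'finalize']

// Case 2: the consumer unsubscribes explicitly, so `unsubscribe` fires as well.
const unsubscribedLog: string[] = [];
const never$ = new Subject<number>();
const subscription = never$.pipe(tap(record(unsubscribedLog))).subscribe();
subscription.unsubscribe();
// unsubscribedLog is ['subscribe', 'unsubscribe', 'finalize']
```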
interface ThrottleConfig
interface ThrottleConfig {}
An object interface used by throttle or throttleTime to provide the configuration options of these operators.
property leading
leading?: boolean;
If `true`, the resulting Observable will emit the first value from the source Observable at the **start** of the "throttling" process (when starting an internal timer that prevents other emissions from the source to pass through). If `false`, it will not emit the first value from the source Observable at the start of the "throttling" process.
If not provided, defaults to: `true`.
property trailing
trailing?: boolean;
If `true`, the resulting Observable will emit the last value from the source Observable at the **end** of the "throttling" process (when ending an internal timer that prevents other emissions from the source to pass through). If `false`, it will not emit the last value from the source Observable at the end of the "throttling" process.
If not provided, defaults to: `false`.
interface TimeInterval
interface TimeInterval<T> {}
A value emitted and the amount of time since the last value was emitted.
Emitted by the `timeInterval` operator.
interface TimeoutConfig
interface TimeoutConfig< T, O extends ObservableInput<unknown> = ObservableInput<T>, M = unknown> {}
property each
each?: number;
The time allowed between values from the source before timeout is triggered.
property first
first?: number | Date;
The relative time as a `number` in milliseconds, or a specific time as a `Date` object, by which the first value must arrive from the source before timeout is triggered.
property meta
meta?: M;
Optional additional metadata you can provide to code that handles the timeout, will be provided through the TimeoutError. This can be used to help identify the source of a timeout or pass along other information related to the timeout.
property scheduler
scheduler?: SchedulerLike;
The scheduler to use with time-related operations within this operator. Defaults to asyncScheduler
property with
with?: (info: TimeoutInfo<T, M>) => O;
A factory used to create observable to switch to when timeout occurs. Provides a TimeoutInfo about the source observable's emissions and what delay or exact time triggered the timeout.
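A sketch of a `TimeoutConfig` with `first`, `scheduler` and `with`. To keep it deterministic it assumes a `VirtualTimeScheduler` in place of the default asyncScheduler, and a `Subject` that never emits as the silent source; both are choices made for this illustration.

```typescript
import { of, Subject, timeout, VirtualTimeScheduler } from 'rxjs';

const scheduler = new VirtualTimeScheduler();
const silent$ = new Subject<string>(); // never emits in this example
const received: string[] = [];

silent$
  .pipe(
    timeout({
      first: 100, // the first value must arrive within 100 time units
      scheduler, // virtual time, so no real waiting is involved
      // Switch to a fallback observable instead of erroring with a TimeoutError.
      with: (info) => of(`fallback after ${info.seen} values`),
    })
  )
  .subscribe((value) => received.push(value));

// Advance virtual time; the timeout fires at frame 100.
scheduler.flush();
// received is ['fallback after 0 values']
```

Omitting `with` would make the result error with a `TimeoutError` whose `info` carries the same details.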
interface TimeoutError
interface TimeoutError<T = unknown, M = unknown> extends Error {}
An error emitted when a timeout occurs.
property info
info: TimeoutInfo<T, M> | null;
The information provided to the error by the timeout operation that created the error. Will be `null` if used directly in non-RxJS code with an empty constructor. (Note that using this constructor directly is not recommended, you should create your own errors)
interface TimeoutInfo
interface TimeoutInfo<T, M = unknown> {}
interface Timestamp
interface Timestamp<T> {}
A value and the time at which it was emitted.
Emitted by the `timestamp` operator.
interface TimestampProvider
interface TimestampProvider {}
This is a type that provides a method to allow RxJS to create a numeric timestamp
method now
now: () => number;
Returns a timestamp as a number.
This is used by types like `ReplaySubject` or operators like `timestamp` to calculate the amount of time passed between events.
interface UnaryFunction
interface UnaryFunction<T, R> {}
A function type interface that describes a function that accepts one parameter `T` and returns another parameter `R`.
Usually used to describe OperatorFunction - it always takes a single parameter (the source Observable) and returns another Observable.
call signature
(source: T): R;
interface Unsubscribable
interface Unsubscribable {}
method unsubscribe
unsubscribe: () => void;
interface UnsubscriptionError
interface UnsubscriptionError extends Error {}
property errors
readonly errors: any[];
Enums
enum NotificationKind
enum NotificationKind { NEXT = 'N', ERROR = 'E', COMPLETE = 'C',}
Deprecated
Use a string literal instead. `NotificationKind` will be replaced with a type alias in v8. It will not be replaced with a const enum as those are not compatible with isolated modules.
Type Aliases
type Cons
type Cons<X, Y extends readonly any[]> = ((arg: X, ...rest: Y) => any) extends ( ...args: infer U) => any ? U : never;
Constructs a new tuple with the specified type at the head. If you declare Cons<A, [B, C]> you will get back [A, B, C].
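A small sketch of Cons in use. The alias is copied locally from the signature above; prepend is a hypothetical helper, and the double cast inside it is an assumption made to keep the sketch short rather than a recommended pattern.

```typescript
// Local copy of the alias shown above.
type Cons<X, Y extends readonly any[]> = ((arg: X, ...rest: Y) => any) extends (
  ...args: infer U
) => any
  ? U
  : never;

// Hypothetical helper: put a value in front of an existing tuple.
function prepend<X, Y extends readonly any[]>(head: X, tail: Y): Cons<X, Y> {
  // The runtime value is a plain array; the cast only informs the type checker.
  return [head, ...tail] as unknown as Cons<X, Y>;
}

const tail: [string, boolean] = ["a", true];
const row = prepend(1, tail); // typed as a tuple of number, string, boolean

const [id, label, active] = row;
console.log(id, label, active); // 1 a true
```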
type FactoryOrValue
type FactoryOrValue<T> = T | (() => T);
type Falsy
type Falsy = null | undefined | false | 0 | -0 | 0n | '';
A simple type to represent a gamut of "falsy" values... with a notable exception: NaN is "falsy"; however, it is not and cannot be typed via TypeScript. See comments here: https://github.com/microsoft/TypeScript/issues/28682#issuecomment-707142417
type Head
type Head<X extends readonly any[]> = ((...args: X) => any) extends ( arg: infer U, ...rest: any[]) => any ? U : never;
Extracts the head of a tuple. If you declare Head<[A, B, C]> you will get back A.
type ObservableInput
type ObservableInput<T> = | Observable<T> | InteropObservable<T> | AsyncIterable<T> | PromiseLike<T> | ArrayLike<T> | Iterable<T> | ReadableStreamLike<T>;
Valid types that can be converted to observables.
type ObservableInputTuple
type ObservableInputTuple<T> = { [K in keyof T]: ObservableInput<T[K]>;};
Used to infer types from arguments to functions like forkJoin, so that you can have forkJoin([Observable<A>, PromiseLike<B>]): Observable<[A, B]> et al.
type ObservableLike
type ObservableLike<T> = InteropObservable<T>;
Deprecated
Renamed to InteropObservable. Will be removed in v8.
type ObservableNotification
type ObservableNotification<T> = | NextNotification<T> | ErrorNotification | CompleteNotification;
Valid observable notification types.
type ObservedValueOf
type ObservedValueOf<O> = O extends ObservableInput<infer T> ? T : never;
Extracts the type from an ObservableInput<any>. If you have O extends ObservableInput<any> and you pass in Observable<number>, or Promise<number>, etc., it will type as number.
type ObservedValuesFromArray
type ObservedValuesFromArray<X> = ObservedValueUnionFromArray<X>;
Deprecated
Renamed to ObservedValueUnionFromArray. Will be removed in v8.
type ObservedValueTupleFromArray
type ObservedValueTupleFromArray<X> = { [K in keyof X]: ObservedValueOf<X[K]>;};
Extracts a tuple of element types from an ObservableInput<any>[]. If you have O extends ObservableInput<any>[] and you pass in [Observable<string>, Observable<number>] you would get back a type of [string, number].
type ObservedValueUnionFromArray
type ObservedValueUnionFromArray<X> = X extends Array<ObservableInput<infer T>> ? T : never;
Extracts a union of element types from an ObservableInput<any>[]. If you have O extends ObservableInput<any>[] and you pass in Observable<string>[] or Promise<string>[] you would get back a type of string. If you pass in [Observable<string>, Observable<number>] you would get back a type of string | number.
type PartialObserver
type PartialObserver<T> = NextObserver<T> | ErrorObserver<T> | CompletionObserver<T>;
type SubscribableOrPromise
type SubscribableOrPromise<T> = | Subscribable<T> | Subscribable<never> | PromiseLike<T> | InteropObservable<T>;
Deprecated
Do not use. Most likely you want to use ObservableInput. Will be removed in v8.
type Tail
type Tail<X extends readonly any[]> = ((...args: X) => any) extends ( arg: any, ...rest: infer U) => any ? U : never;
Extracts the tail of a tuple. If you declare Tail<[A, B, C]> you will get back [B, C].
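The sketch below pairs Tail with Head to split a tuple type into its first element and the rest. Both aliases are copied locally from the signatures on this page; Row and splitRow are hypothetical names used only for illustration.

```typescript
// Local copies of the aliases shown on this page.
type Head<X extends readonly any[]> = ((...args: X) => any) extends (
  arg: infer U,
  ...rest: any[]
) => any
  ? U
  : never;

type Tail<X extends readonly any[]> = ((...args: X) => any) extends (
  arg: any,
  ...rest: infer U
) => any
  ? U
  : never;

// Hypothetical tuple type for the example.
type Row = [number, string, boolean];

// Head<Row> resolves to number and Tail<Row> to [string, boolean].
function splitRow(row: Row): { head: Head<Row>; tail: Tail<Row> } {
  const [head, ...tail] = row;
  return { head, tail };
}

const parts = splitRow([7, "seven", true]);
console.log(parts.head); // 7
console.log(parts.tail); // [ 'seven', true ]
```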
type TeardownLogic
type TeardownLogic = Subscription | Unsubscribable | (() => void) | void;
type TruthyTypesOf
type TruthyTypesOf<T> = T extends Falsy ? never : T;
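A sketch of what TruthyTypesOf does when combined with Falsy: members of a union that are assignable to Falsy are removed. Both aliases are copied locally from this page; isTruthy is a hypothetical type guard written for this example.

```typescript
// Local copies of the aliases shown on this page.
type Falsy = null | undefined | false | 0 | -0 | 0n | "";
type TruthyTypesOf<T> = T extends Falsy ? never : T;

// Hypothetical type guard that narrows away the falsy members of T.
function isTruthy<T>(value: T): value is TruthyTypesOf<T> {
  return Boolean(value);
}

const mixed: Array<string | number | null | undefined> = ["a", 0, null, "b", undefined, 7, ""];

// null and undefined are dropped from the element type as well as at runtime.
const kept = mixed.filter(isTruthy);
console.log(kept); // [ 'a', 'b', 7 ]
```

Note that 0 and '' are removed at runtime, but number and string stay in the resulting type, because the wide types number and string are not themselves assignable to Falsy.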
type ValueFromArray
type ValueFromArray<A extends readonly unknown[]> = A extends Array<infer T> ? T : never;
Extracts the generic value from an Array type. If you have T extends Array<any>, and pass a string[] to it, ValueFromArray<T> will return the actual type of string.
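A brief sketch of the extraction described above. The alias is copied locally from its signature; fill is a hypothetical helper whose second parameter is typed from the element type of the first.

```typescript
// Local copy of the alias shown above.
type ValueFromArray<A extends readonly unknown[]> = A extends Array<infer T> ? T : never;

const names: string[] = ["Ada", "Grace"];

// Resolves to string, the element type of names.
type Name = ValueFromArray<typeof names>;
const firstName: Name = names[0];

// Hypothetical helper: build an array of the same length holding one repeated value.
function fill<A extends unknown[]>(template: A, value: ValueFromArray<A>): ValueFromArray<A>[] {
  return template.map(() => value);
}

const filled = fill(names, "unknown");
console.log(firstName); // Ada
console.log(filled); // [ 'unknown', 'unknown' ]
```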
type ValueFromNotification
type ValueFromNotification<T> = T extends { kind: 'N' | 'E' | 'C';} ? T extends NextNotification<any> ? T extends { value: infer V; } ? V : undefined : never : never;
Gets the value type from an ObservableNotification, if possible.
Package Files (167)
- dist/types/index.d.ts
- dist/types/internal/AsyncSubject.d.ts
- dist/types/internal/BehaviorSubject.d.ts
- dist/types/internal/Notification.d.ts
- dist/types/internal/Observable.d.ts
- dist/types/internal/Operator.d.ts
- dist/types/internal/ReplaySubject.d.ts
- dist/types/internal/Scheduler.d.ts
- dist/types/internal/Subject.d.ts
- dist/types/internal/Subscriber.d.ts
- dist/types/internal/Subscription.d.ts
- dist/types/internal/config.d.ts
- dist/types/internal/firstValueFrom.d.ts
- dist/types/internal/lastValueFrom.d.ts
- dist/types/internal/observable/ConnectableObservable.d.ts
- dist/types/internal/observable/bindCallback.d.ts
- dist/types/internal/observable/bindNodeCallback.d.ts
- dist/types/internal/observable/combineLatest.d.ts
- dist/types/internal/observable/concat.d.ts
- dist/types/internal/observable/connectable.d.ts
- dist/types/internal/observable/defer.d.ts
- dist/types/internal/observable/dom/animationFrames.d.ts
- dist/types/internal/observable/empty.d.ts
- dist/types/internal/observable/forkJoin.d.ts
- dist/types/internal/observable/from.d.ts
- dist/types/internal/observable/fromEvent.d.ts
- dist/types/internal/observable/fromEventPattern.d.ts
- dist/types/internal/observable/generate.d.ts
- dist/types/internal/observable/iif.d.ts
- dist/types/internal/observable/interval.d.ts
- dist/types/internal/observable/merge.d.ts
- dist/types/internal/observable/never.d.ts
- dist/types/internal/observable/of.d.ts
- dist/types/internal/observable/onErrorResumeNext.d.ts
- dist/types/internal/observable/pairs.d.ts
- dist/types/internal/observable/partition.d.ts
- dist/types/internal/observable/race.d.ts
- dist/types/internal/observable/range.d.ts
- dist/types/internal/observable/throwError.d.ts
- dist/types/internal/observable/timer.d.ts
- dist/types/internal/observable/using.d.ts
- dist/types/internal/observable/zip.d.ts
- dist/types/internal/operators/audit.d.ts
- dist/types/internal/operators/auditTime.d.ts
- dist/types/internal/operators/buffer.d.ts
- dist/types/internal/operators/bufferCount.d.ts
- dist/types/internal/operators/bufferTime.d.ts
- dist/types/internal/operators/bufferToggle.d.ts
- dist/types/internal/operators/bufferWhen.d.ts
- dist/types/internal/operators/catchError.d.ts
- dist/types/internal/operators/combineAll.d.ts
- dist/types/internal/operators/combineLatestAll.d.ts
- dist/types/internal/operators/combineLatestWith.d.ts
- dist/types/internal/operators/concatAll.d.ts
- dist/types/internal/operators/concatMap.d.ts
- dist/types/internal/operators/concatMapTo.d.ts
- dist/types/internal/operators/concatWith.d.ts
- dist/types/internal/operators/connect.d.ts
- dist/types/internal/operators/count.d.ts
- dist/types/internal/operators/debounce.d.ts
- dist/types/internal/operators/debounceTime.d.ts
- dist/types/internal/operators/defaultIfEmpty.d.ts
- dist/types/internal/operators/delay.d.ts
- dist/types/internal/operators/delayWhen.d.ts
- dist/types/internal/operators/dematerialize.d.ts
- dist/types/internal/operators/distinct.d.ts
- dist/types/internal/operators/distinctUntilChanged.d.ts
- dist/types/internal/operators/distinctUntilKeyChanged.d.ts
- dist/types/internal/operators/elementAt.d.ts
- dist/types/internal/operators/endWith.d.ts
- dist/types/internal/operators/every.d.ts
- dist/types/internal/operators/exhaust.d.ts
- dist/types/internal/operators/exhaustAll.d.ts
- dist/types/internal/operators/exhaustMap.d.ts
- dist/types/internal/operators/expand.d.ts
- dist/types/internal/operators/filter.d.ts
- dist/types/internal/operators/finalize.d.ts
- dist/types/internal/operators/find.d.ts
- dist/types/internal/operators/findIndex.d.ts
- dist/types/internal/operators/first.d.ts
- dist/types/internal/operators/flatMap.d.ts
- dist/types/internal/operators/groupBy.d.ts
- dist/types/internal/operators/ignoreElements.d.ts
- dist/types/internal/operators/isEmpty.d.ts
- dist/types/internal/operators/last.d.ts
- dist/types/internal/operators/map.d.ts
- dist/types/internal/operators/mapTo.d.ts
- dist/types/internal/operators/materialize.d.ts
- dist/types/internal/operators/max.d.ts
- dist/types/internal/operators/mergeAll.d.ts
- dist/types/internal/operators/mergeMap.d.ts
- dist/types/internal/operators/mergeMapTo.d.ts
- dist/types/internal/operators/mergeScan.d.ts
- dist/types/internal/operators/mergeWith.d.ts
- dist/types/internal/operators/min.d.ts
- dist/types/internal/operators/multicast.d.ts
- dist/types/internal/operators/observeOn.d.ts
- dist/types/internal/operators/onErrorResumeNextWith.d.ts
- dist/types/internal/operators/pairwise.d.ts
- dist/types/internal/operators/pluck.d.ts
- dist/types/internal/operators/publish.d.ts
- dist/types/internal/operators/publishBehavior.d.ts
- dist/types/internal/operators/publishLast.d.ts
- dist/types/internal/operators/publishReplay.d.ts
- dist/types/internal/operators/raceWith.d.ts
- dist/types/internal/operators/reduce.d.ts
- dist/types/internal/operators/refCount.d.ts
- dist/types/internal/operators/repeat.d.ts
- dist/types/internal/operators/repeatWhen.d.ts
- dist/types/internal/operators/retry.d.ts
- dist/types/internal/operators/retryWhen.d.ts
- dist/types/internal/operators/sample.d.ts
- dist/types/internal/operators/sampleTime.d.ts
- dist/types/internal/operators/scan.d.ts
- dist/types/internal/operators/sequenceEqual.d.ts
- dist/types/internal/operators/share.d.ts
- dist/types/internal/operators/shareReplay.d.ts
- dist/types/internal/operators/single.d.ts
- dist/types/internal/operators/skip.d.ts
- dist/types/internal/operators/skipLast.d.ts
- dist/types/internal/operators/skipUntil.d.ts
- dist/types/internal/operators/skipWhile.d.ts
- dist/types/internal/operators/startWith.d.ts
- dist/types/internal/operators/subscribeOn.d.ts
- dist/types/internal/operators/switchAll.d.ts
- dist/types/internal/operators/switchMap.d.ts
- dist/types/internal/operators/switchMapTo.d.ts
- dist/types/internal/operators/switchScan.d.ts
- dist/types/internal/operators/take.d.ts
- dist/types/internal/operators/takeLast.d.ts
- dist/types/internal/operators/takeUntil.d.ts
- dist/types/internal/operators/takeWhile.d.ts
- dist/types/internal/operators/tap.d.ts
- dist/types/internal/operators/throttle.d.ts
- dist/types/internal/operators/throttleTime.d.ts
- dist/types/internal/operators/throwIfEmpty.d.ts
- dist/types/internal/operators/timeInterval.d.ts
- dist/types/internal/operators/timeout.d.ts
- dist/types/internal/operators/timeoutWith.d.ts
- dist/types/internal/operators/timestamp.d.ts
- dist/types/internal/operators/toArray.d.ts
- dist/types/internal/operators/window.d.ts
- dist/types/internal/operators/windowCount.d.ts
- dist/types/internal/operators/windowTime.d.ts
- dist/types/internal/operators/windowToggle.d.ts
- dist/types/internal/operators/windowWhen.d.ts
- dist/types/internal/operators/withLatestFrom.d.ts
- dist/types/internal/operators/zipAll.d.ts
- dist/types/internal/operators/zipWith.d.ts
- dist/types/internal/scheduled/scheduled.d.ts
- dist/types/internal/scheduler/VirtualTimeScheduler.d.ts
- dist/types/internal/scheduler/animationFrame.d.ts
- dist/types/internal/scheduler/asap.d.ts
- dist/types/internal/scheduler/async.d.ts
- dist/types/internal/scheduler/queue.d.ts
- dist/types/internal/symbol/observable.d.ts
- dist/types/internal/types.d.ts
- dist/types/internal/util/ArgumentOutOfRangeError.d.ts
- dist/types/internal/util/EmptyError.d.ts
- dist/types/internal/util/NotFoundError.d.ts
- dist/types/internal/util/ObjectUnsubscribedError.d.ts
- dist/types/internal/util/SequenceError.d.ts
- dist/types/internal/util/UnsubscriptionError.d.ts
- dist/types/internal/util/identity.d.ts
- dist/types/internal/util/isObservable.d.ts
- dist/types/internal/util/noop.d.ts
- dist/types/internal/util/pipe.d.ts
Dependencies (1)
Dev Dependencies (61)
- @angular-devkit/build-optimizer
- @angular-devkit/schematics
- @swc/core
- @swc/helpers
- @types/chai
- @types/lodash
- @types/mocha
- @types/node
- @types/shelljs
- @types/sinon
- @types/sinon-chai
- @types/source-map
- @typescript-eslint/eslint-plugin
- @typescript-eslint/parser
- babel-polyfill
- chai
- check-side-effects
- color
- colors
- cross-env
- cz-conventional-changelog
- dependency-cruiser
- escape-string-regexp
- eslint
- eslint-plugin-jasmine
- form-data
- fs-extra
- glob
- google-closure-compiler-js
- husky
- klaw-sync
- lint-staged
- lodash
- minimist
- mocha
- nodemon
- npm-run-all
- opn-cli
- platform
- prettier
- promise
- rollup
- rollup-plugin-alias
- rollup-plugin-inject
- rollup-plugin-node-resolve
- shelljs
- shx
- sinon
- sinon-chai
- source-map-support
- systemjs
- ts-node
- tslint
- tslint-config-prettier
- tslint-etc
- tslint-no-toplevel-property-access
- tslint-no-unused-expression-chai
- typescript
- validate-commit-msg
- web-streams-polyfill
- webpack
Peer Dependencies (0)
No peer dependencies.