zeromq
- Version 6.1.2
- Published
- 11.3 MB
- 2 dependencies
- MIT AND MPL-2.0 license
Install
npm i zeromq
yarn add zeromq
pnpm add zeromq
Overview
Next-generation ZeroMQ bindings for Node.js
Index
Variables
Functions
Classes
Interfaces
Socket
- affinity
- backlog
- connectTimeout
- curvePublicKey
- curveSecretKey
- curveServer
- curveServerKey
- gssapiPlainText
- gssapiPrincipal
- gssapiPrincipalNameType
- gssapiServer
- gssapiServicePrincipal
- gssapiServicePrincipalNameType
- handshakeInterval
- heartbeatInterval
- heartbeatTimeout
- heartbeatTimeToLive
- immediate
- interface
- ipv6
- lastEndpoint
- linger
- loopbackFastPath
- maxMessageSize
- multicastMaxTransportDataUnit
- plainPassword
- plainServer
- plainUsername
- rate
- reconnectInterval
- reconnectMaxInterval
- recoveryInterval
- securityMechanism
- socksProxy
- tcpAcceptFilter
- tcpKeepalive
- tcpKeepaliveCount
- tcpKeepaliveIdle
- tcpKeepaliveInterval
- tcpMaxRetransmitTimeout
- threadSafe
- type
- typeOfService
- vmciBufferMaxSize
- vmciBufferMinSize
- vmciBufferSize
- vmciConnectTimeout
- zapDomain
- zapEnforceDomain
Type Aliases
Variables
variable capability
const capability: Partial<{ ipc: boolean; pgm: boolean; tipc: boolean; norm: boolean; curve: boolean; gssapi: boolean; draft: boolean;}>;
Exposes some of the optionally available ØMQ capabilities, which may depend on the library version and platform.
This is an object with keys corresponding to supported ØMQ features and transport protocols. Available capabilities will be set to
true
. Unavailable capabilities will be absent or set tofalse
.Possible keys include: *
ipc
- Support for theipc://
protocol. *pgm
- Support for thepgm://
protocol. *tipc
- Support for thetipc://
protocol. *norm
- Support for thenorm://
protocol. *curve
- Support for the CURVE security mechanism. *gssapi
- Support for the GSSAPI security mechanism. *draft
- Wether the library is built with support for DRAFT sockets.
variable context
const context: Context;
Any socket that has no explicit context passed in during construction will be associated with this context. The default context is exposed in order to be able to change its behaviour with Context options.
variable version
const version: string;
The version of the ØMQ library the bindings were built with. Formatted as
(major).(minor).(patch)
. For example:"4.3.2"
.
Functions
function curveKeyPair
curveKeyPair: () => { publicKey: string; secretKey: string };
Returns a new random key pair to be used with the CURVE security mechanism.
To correctly connect two sockets with this mechanism:
* Generate a **client** keypair with curveKeyPair(). * Assign the private and public key on the client socket with Socket.curveSecretKey and Socket.curvePublicKey. * Generate a **server** keypair with curveKeyPair(). * Assign the private key on the server socket with Socket.curveSecretKey. * Assign the public key **on the client socket** with Socket.curveServerKey. The server does *not* need to know its own public key. Key distribution is *not* handled by the CURVE security mechanism.
Returns
An object with a
publicKey
and asecretKey
property, each being a 40 character Z85-encoded string.
Classes
class Context
class Context {}
A ØMQ context. Contexts manage the background I/O to send and receive messages of their associated sockets.
It is usually not necessary to instantiate a new context - the global context is used for new sockets by default. The global context is the only context that is shared between threads (when using [worker_threads](https://nodejs.org/api/worker_threads.html)). Custom contexts can only be used in the same thread.
// Use default context (recommended).const socket = new Dealer()// Use custom context.const context = new Context()const socket = new Dealer({context})**Note:** By default all contexts (including the global context) will prevent the process from terminating if there are any messages in an outgoing queue, even if the associated socket was closed. For some applications this is unnecessary or unwanted. Consider setting Context.blocky to
false
or setting Socket.linger for each new socket.
constructor
constructor(options?: { blocky?: boolean; ioThreads?: number; maxMessageSize?: number; maxSockets?: number; ipv6?: boolean; threadPriority?: number; threadSchedulingPolicy?: number;});
Creates a new ØMQ context and sets any provided context options. Sockets need to be explicitly associated with a new context during construction.
Parameter options
An optional object with options that will be set on the context during creation.
method getBoolOption
protected getBoolOption: (option: number) => boolean;
method getInt32Option
protected getInt32Option: (option: number) => number;
method setBoolOption
protected setBoolOption: (option: number, value: boolean) => void;
method setInt32Option
protected setInt32Option: (option: number, value: number) => void;
class Dealer
class Dealer extends Socket {}
A Dealer socket can be used to extend request/reply sockets. Each message sent is round-robined among all connected peers, and each message received is fair-queued from all connected peers.
When a Dealer socket enters the mute state due to having reached the high water mark for all peers, or if there are no peers at all, then any Writable.send() operations on the socket shall block until the mute state ends or at least one peer becomes available for sending; messages are not discarded.
When a Dealer is connected to a Reply socket, each message sent must consist of an empty message part, the delimiter, followed by one or more body parts.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; conflate?: boolean; routingId?: string; probeRouter?: boolean;});
property conflate
conflate: boolean;
ZMQ_CONFLATE
If set to
true
, a socket shall keep only one message in its inbound/outbound queue: the last message to be received/sent. Ignores any high water mark options. Does not support multi-part messages - in particular, only one part of it is kept in the socket internal queue.
property probeRouter
probeRouter: boolean;
ZMQ_PROBE_ROUTER
When set to
true
, the socket will automatically send an empty message when a new connection is made or accepted. You may set this on sockets connected to a Router socket. The application must filter such empty messages. This option provides the Router with an event signaling the arrival of a new peer.*Warning:** Do not set this option on a socket that talks to any other socket type except Router: the results are undefined.
property routingId
routingId: string;
ZMQ_ROUTING_ID
The identity of the specified socket when connecting to a
Router
socket.
class Observer
class Observer {}
constructor
constructor(socket: Socket);
Creates a new ØMQ observer. It should not be necessary to instantiate a new observer. Access an existing observer for a socket with Socket.events.
const socket = new Publisher()const events = socket.eventsParameter socket
The socket to observe.
property closed
readonly closed: boolean;
Whether the observer was closed, either manually or because the associated socket was closed.
Modifiers
@readonly
method close
close: () => void;
Closes the observer. Afterwards no new events will be received or emitted. Calling this method is optional.
method receive
receive: () => Promise<Event>;
Waits for the next event to become availeble on the observer. Reads an event immediately if possible. If no events are queued, it will wait asynchonously. The promise will be resolved with the next event when available.
When reading events with receive() the observer may **not** be in event emitter mode. Avoid mixing calls to receive() with event handlers via attached with on().
for await (event of socket.events) {switch (event.type) {case "bind":console.log(`Socket bound to ${event.address}`)break// ...}}Returns
Resolved with the next event and its details. See Event.
class Pair
class Pair extends Socket {}
A Pair socket can only be connected to one other Pair at any one time. No message routing or filtering is performed on any messages.
When a Pair socket enters the mute state due to having reached the high water mark for the connected peer, or if no peer is connected, then any Writable.send() operations on the socket shall block until the peer becomes available for sending; messages are not discarded.
While Pair sockets can be used over transports other than
inproc://
, their inability to auto-reconnect coupled with the fact new incoming connections will be terminated while any previous connections (including ones in a closing state) exist makes them unsuitable fortcp://
in most cases.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number;});
class Proxy
class Proxy<F extends Socket = Socket, B extends Socket = Socket> {}
Proxy messages between two ØMQ sockets. The proxy connects a front-end socket to a back-end socket. Conceptually, data flows from front-end to back-end. Depending on the socket types, replies may flow in the opposite direction. The direction is conceptual only; the proxy is fully symmetric and there is no technical difference between front-end and back-end.
// Proxy between a router/dealer socket for 5 seconds.const proxy = new Proxy(new Router, new Dealer)await proxy.frontEnd.bind("tcp://*:3001")await proxy.backEnd.bind("tcp://*:3002")setTimeout(() => proxy.terminate(), 5000)await proxy.run()[Review the ØMQ documentation](http://api.zeromq.org/4-3:zmq-proxy#toc3) for an overview of some example applications of a proxy.
constructor
constructor(frontEnd: Socket, backEnd: Socket);
Creates a new ØMQ proxy. Proxying will start between the front-end and back-end sockets when run() is called after both sockets have been bound or connected.
Parameter frontEnd
The front-end socket.
Parameter backEnd
The back-end socket.
property backEnd
readonly backEnd: Socket;
Returns the original back-end socket.
Modifiers
@readonly
property frontEnd
readonly frontEnd: Socket;
Returns the original front-end socket.
Modifiers
@readonly
method pause
pause: () => void;
Temporarily suspends any proxy activity. Resume activity with resume().
method resume
resume: () => void;
Resumes proxy activity after suspending it with pause().
method run
run: () => Promise<void>;
Starts the proxy loop in a worker thread and waits for its termination. Before starting, you must set any socket options, and connect or bind both front-end and back-end sockets.
On termination the front-end and back-end sockets will be closed automatically.
Returns
Resolved when the proxy has terminated.
method terminate
terminate: () => void;
Gracefully shuts down the proxy. The front-end and back-end sockets will be closed automatically. There might be a slight delay between terminating and the run() method resolving.
class Publisher
class Publisher extends Socket {}
A Publisher socket is used to distribute data to Subscribers. Messages sent are distributed in a fan out fashion to all connected peers. This socket cannot receive messages.
When a Publisher enters the mute state due to having reached the high water mark for a connected Subscriber, then any messages that would be sent to the subscriber in question shall instead be dropped until the mute state ends. The Writable.send() method will never block.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; noDrop?: boolean; conflate?: boolean; invertMatching?: boolean;});
property conflate
conflate: boolean;
ZMQ_CONFLATE
If set to
true
, a socket shall keep only one message in its inbound/outbound queue: the last message to be received/sent. Ignores any high water mark options. Does not support multi-part messages - in particular, only one part of it is kept in the socket internal queue.
property invertMatching
invertMatching: boolean;
ZMQ_INVERT_MATCHING
Causes messages to be sent to all connected sockets except those subscribed to a prefix that matches the message.
All Subscriber sockets connecting to the Publisher must also have the option set to
true
. Failure to do so will have the Subscriber sockets reject everything the Publisher socket sends them.
property noDrop
noDrop: boolean;
ZMQ_XPUB_NODROP
Sets the socket behaviour to return an error if the high water mark is reached and the message could not be send. The default is to drop the message silently when the peer high water mark is reached.
class Pull
class Pull extends Socket {}
A Pull socket is used by a pipeline node to receive messages from upstream pipeline nodes. Messages are fair-queued from among all connected upstream nodes. This socket cannot send messages.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; conflate?: boolean;});
class Push
class Push extends Socket {}
A Push socket is used by a pipeline node to send messages to downstream pipeline nodes. Messages are round-robined to all connected downstream nodes. This socket cannot receive messages.
When a Push socket enters the mute state due to having reached the high water mark for all downstream nodes, or if there are no downstream nodes at all, then Writable.send() will block until the mute state ends or at least one downstream node becomes available for sending; messages are not discarded.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; conflate?: boolean;});
class Reply
class Reply extends Socket {}
A Reply socket can act as a server which receives requests from and sends replies to a Request socket. This socket type allows only an alternating sequence of Readable.receive() and subsequent Writable.send() calls. Each request received is fair-queued from among all clients, and each reply sent is routed to the client that issued the last request. If the original requester does not exist any more the reply is silently discarded.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; routingId?: string;});
property routingId
routingId: string;
ZMQ_ROUTING_ID
The identity of the specified socket when connecting to a
Router
socket.
class Request
class Request extends Socket {}
A Request socket acts as a client to send requests to and receive replies from a Reply socket. This socket allows only an alternating sequence of Writable.send() and subsequent Readable.receive() calls. Each request sent is round-robined among all services, and each reply received is matched with the last issued request.
If no services are available, then any send operation on the socket shall block until at least one service becomes available. The REQ socket shall not discard messages.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; routingId?: string; probeRouter?: boolean; correlate?: boolean; relaxed?: boolean;});
property correlate
correlate: boolean;
ZMQ_REQ_CORRELATE
The default behaviour of Request sockets is to rely on the ordering of messages to match requests and responses and that is usually sufficient. When this option is set to
true
the socket will prefix outgoing messages with an extra frame containing a request id. That means the full message is[<request id>,
null, user frames…]
. The Request socket will discard all incoming messages that don't begin with these two frames.
property probeRouter
probeRouter: boolean;
ZMQ_PROBE_ROUTER
When set to
true
, the socket will automatically send an empty message when a new connection is made or accepted. You may set this on sockets connected to a Router socket. The application must filter such empty messages. This option provides the Router with an event signaling the arrival of a new peer.*Warning:** Do not set this option on a socket that talks to any other socket type except Router: the results are undefined.
property relaxed
relaxed: boolean;
ZMQ_REQ_RELAXED
By default, a Request socket does not allow initiating a new request until the reply to the previous one has been received. When set to
true
, sending another message is allowed and previous replies will be discarded. The request-reply state machine is reset and a new request is sent to the next available peer.**Note:** If set to
true
, also enable correlate to ensure correct matching of requests and replies. Otherwise a late reply to an aborted request can be reported as the reply to the superseding request.
property routingId
routingId: string;
ZMQ_ROUTING_ID
The identity of the specified socket when connecting to a
Router
socket.
class Router
class Router extends Socket {}
A Router can be used to extend request/reply sockets. When receiving messages a Router shall prepend a message part containing the routing id of the originating peer to the message. Messages received are fair-queued from among all connected peers. When sending messages, the first part of the message is removed and used to determine the routing id of the peer the message should be routed to.
If the peer does not exist anymore, or has never existed, the message shall be silently discarded. However, if Router.mandatory is set to
true
, the socket shall fail with aEHOSTUNREACH
error in both cases.When a Router enters the mute state due to having reached the high water mark for all peers, then any messages sent to the socket shall be dropped until the mute state ends. Likewise, any messages routed to a peer for which the individual high water mark has been reached shall also be dropped. If Router.mandatory is set to
true
the socket shall block or return anEAGAIN
error in both cases.When a Request socket is connected to a Router, in addition to the routing id of the originating peer each message received shall contain an empty delimiter message part. Hence, the entire structure of each received message as seen by the application becomes: one or more routing id parts, delimiter part, one or more body parts. When sending replies to a Request the delimiter part must be included.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; routingId?: string; probeRouter?: boolean; mandatory?: boolean; handover?: boolean;});
property handover
handover: boolean;
ZMQ_ROUTER_HANDOVER
If two clients use the same identity when connecting to a Router, the results shall depend on the this option. If it set to
false
(default), the Router socket shall reject clients trying to connect with an already-used identity. If it is set totrue
, the Router socket shall hand-over the connection to the new client and disconnect the existing one.
property mandatory
mandatory: boolean;
ZMQ_ROUTER_MANDATORY
A value of
false
is the default and discards the message silently when it cannot be routed or the peer's high water mark is reached. A value oftrue
causes send() to fail if it cannot be routed, or wait asynchronously if the high water mark is reached.
property probeRouter
probeRouter: boolean;
ZMQ_PROBE_ROUTER
When set to
true
, the socket will automatically send an empty message when a new connection is made or accepted. You may set this on sockets connected to a Router socket. The application must filter such empty messages. This option provides the Router with an event signaling the arrival of a new peer.*Warning:** Do not set this option on a socket that talks to any other socket type except Router: the results are undefined.
property routingId
routingId: string;
ZMQ_ROUTING_ID
The identity of the specified socket when connecting to a
Router
socket.
method connect
connect: (address: string, options?: RouterConnectOptions) => void;
Connects to the given remote address. To specificy a specific routing id, provide a
routingId
option. The identity should be unique, from 1 to 255 bytes long and MAY NOT start with binary zero.Parameter address
The
tcp://
address to connect to.Parameter options
Any connection options.
class Socket
abstract class Socket {}
A ØMQ socket. This class should generally not be used directly. Instead, create one of its subclasses that corresponds to the socket type you want to use.
new zmq.Pair(...)new zmq.Publisher(...)new zmq.Subscriber(...)new zmq.Request(...)new zmq.Reply(...)new zmq.Dealer(...)new zmq.Router(...)new zmq.Pull(...)new zmq.Push(...)new zmq.XPublisher(...)new zmq.XSubscriber(...)new zmq.Stream(...)Socket options can be set during construction or via a property after the socket was created. Most socket options do not take effect until the next bind() or connect() call. Setting such an option after the socket is already connected or bound will display a warning.
constructor
protected constructor(type: SocketType, options?: {});
Creates a new socket of the specified type. Subclasses are expected to provide the correct socket type.
Parameter type
The socket type.
Parameter options
Any options to set during construction.
property closed
readonly closed: boolean;
Whether this socket was previously closed with close().
Modifiers
@readonly
property context
readonly context: Context;
Context that this socket belongs to.
Modifiers
@readonly
property events
readonly events: Observer;
Event Observer for this socket. This starts up a ØMQ monitoring socket internally that receives all socket events.
Modifiers
@readonly
property readable
readonly readable: boolean;
Whether any messages are currently available. If
true
, the next call to Readable.receive() will immediately read a message from the socket. For sockets that cannot receive messsages this is alwaysfalse
.Modifiers
@readonly
property writable
readonly writable: boolean;
Whether any messages can be queued for sending. If
true
, the next call to Writable.send() will immediately queue a message on the socket. For sockets that cannot send messsages this is alwaysfalse
.Modifiers
@readonly
method bind
bind: (address: string) => Promise<void>;
Binds the socket to the given address. During bind() the socket cannot be used. Do not call any other methods until the returned promise resolves. Make sure to use
await
.You can use
*
in place of a hostname to bind on all interfaces/addresses, and you can use*
in place of a port to bind to a random port (which can be retrieved with lastEndpoint later).await socket.bind("tcp://127.0.0.1:3456")await socket.bind("tcp://*:3456") // binds on all interfacesawait socket.bind("tcp://127.0.0.1:*") // binds on random portParameter address
Address to bind this socket to.
Returns
Resolved when the socket was successfully bound.
method close
close: () => void;
Closes the socket and disposes of all resources. Any messages that are queued may be discarded or sent in the background depending on the linger setting.
After this method is called, it is no longer possible to call any other methods on this socket.
Sockets that go out of scope and have no Readable.receive() or Writable.send() operations in progress will automatically be closed. Therefore it is not necessary in most applications to call close() manually.
Calling this method on a socket that is already closed is a no-op.
method connect
connect: (address: string) => void;
Connects to the socket at the given remote address and returns immediately. The connection will be made asynchronously in the background.
socket.connect("tcp://127.0.0.1:3456")Parameter address
The address to connect to.
method disconnect
disconnect: (address: string) => void;
Disconnects a previously connected socket from the given address and returns immediately. Disonnection will happen asynchronously in the background.
socket.disconnect("tcp://127.0.0.1:3456")Parameter address
The previously connected address to disconnect from.
method getBoolOption
protected getBoolOption: (option: number) => boolean;
method getInt32Option
protected getInt32Option: (option: number) => number;
method getInt64Option
protected getInt64Option: (option: number) => number;
method getStringOption
protected getStringOption: (option: number) => string | null;
method getUint32Option
protected getUint32Option: (option: number) => number;
method getUint64Option
protected getUint64Option: (option: number) => number;
method setBoolOption
protected setBoolOption: (option: number, value: boolean) => void;
method setInt32Option
protected setInt32Option: (option: number, value: number) => void;
method setInt64Option
protected setInt64Option: (option: number, value: number) => void;
method setStringOption
protected setStringOption: ( option: number, value: string | Buffer | null) => void;
method setUint32Option
protected setUint32Option: (option: number, value: number) => void;
method setUint64Option
protected setUint64Option: (option: number, value: number) => void;
method unbind
unbind: (address: string) => Promise<void>;
Unbinds the socket to the given address. During unbind() the socket cannot be used. Do not call any other methods until the returned promise resolves. Make sure to use
await
.Parameter address
Address to unbind this socket from.
Returns
Resolved when the socket was successfully unbound.
class Stream
class Stream extends Socket {}
A Stream is used to send and receive TCP data from a non-ØMQ peer with the
tcp://
transport. A Stream can act as client and/or server, sending and/or receiving TCP data asynchronously.When sending and receiving data with Writable.send() and Readable.receive(), the first message part shall be the routing id of the peer. Unroutable messages will cause an error.
When a connection is made to a Stream, a zero-length message will be received. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received.
To close a specific connection, Writable.send() the routing id frame followed by a zero-length message.
To open a connection to a server, use Stream.connect().
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; notify?: boolean;});
property notify
notify: boolean;
ZMQ_STREAM_NOTIFY
Enables connect and disconnect notifications on a Stream when set to
true
. When notifications are enabled, the socket delivers a zero-length message when a peer connects or disconnects.
method connect
connect: (address: string, options?: StreamConnectOptions) => void;
Connects to the given remote address. To specificy a specific routing id, provide a
routingId
option. The identity should be unique, from 1 to 255 bytes long and MAY NOT start with binary zero.Parameter address
The
tcp://
address to connect to.Parameter options
Any connection options.
class Subscriber
class Subscriber extends Socket {}
A Subscriber socket is used to subscribe to data distributed by a Publisher. Initially a Subscriber is not subscribed to any messages. Use Subscriber.subscribe() to specify which messages to subscribe to. This socket cannot send messages.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; conflate?: boolean; invertMatching?: boolean;});
property conflate
conflate: boolean;
ZMQ_CONFLATE
If set to
true
, a socket shall keep only one message in its inbound/outbound queue: the last message to be received/sent. Ignores any high water mark options. Does not support multi-part messages - in particular, only one part of it is kept in the socket internal queue.
property invertMatching
invertMatching: boolean;
ZMQ_INVERT_MATCHING
Causes incoming messages that do not match any of the socket's subscriptions to be received by the user.
All Subscriber sockets connecting to a Publisher must also have the option set to
true
. Failure to do so will have the Subscriber sockets reject everything the Publisher socket sends them.
method subscribe
subscribe: (...prefixes: Array<Buffer | string>) => void;
Establish a new message filter. Newly created Subsriber sockets will filtered out all incoming messages. Call this method to subscribe to messages beginning with the given prefix.
Multiple filters may be attached to a single socket, in which case a message shall be accepted if it matches at least one filter. Subscribing without any filters shall subscribe to **all** incoming messages.
const sub = new Subscriber()// Listen to all messages beginning with 'foo'.sub.subscribe("foo")// Listen to all incoming messages.sub.subscribe()Parameter prefixes
The prefixes of messages to subscribe to.
method unsubscribe
unsubscribe: (...prefixes: Array<Buffer | string>) => void;
Remove an existing message filter which was previously established with subscribe(). Stops receiving messages with the given prefix.
Unsubscribing without any filters shall unsubscribe from the "subscribe all" filter that is added by calling subscribe() without arguments.
const sub = new Subscriber()// Listen to all messages beginning with 'foo'.sub.subscribe("foo")// ...// Stop listening to messages beginning with 'foo'.sub.unsubscribe("foo")Parameter prefixes
The prefixes of messages to subscribe to.
class XPublisher
class XPublisher extends Socket {}
Same as Publisher, except that you can receive subscriptions from the peers in form of incoming messages. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix are also received, but have no effect on subscription status.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number; noDrop?: boolean; invertMatching?: boolean; manual?: boolean; welcomeMessage?: string; verbosity?: 'allSubs' | 'allSubsUnsubs';});
property invertMatching
invertMatching: boolean;
ZMQ_INVERT_MATCHING
Causes messages to be sent to all connected sockets except those subscribed to a prefix that matches the message.
property manual
manual: boolean;
ZMQ_XPUB_MANUAL
Sets the XPublisher socket subscription handling mode to manual/automatic. A value of
true
will change the subscription requests handling to manual.
property noDrop
noDrop: boolean;
ZMQ_XPUB_NODROP
Sets the socket behaviour to return an error if the high water mark is reached and the message could not be send. The default is to drop the message silently when the peer high water mark is reached.
property welcomeMessage
welcomeMessage: string;
ZMQ_XPUB_WELCOME_MSG
Sets a welcome message that will be recieved by subscriber when connecting. Subscriber must subscribe to the welcome message before connecting. For welcome messages to work well, poll on incoming subscription messages on the XPublisher socket and handle them.
class XSubscriber
class XSubscriber extends Socket {}
Same as Subscriber, except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription body. Messages without a sub/unsub prefix may also be sent, but have no effect on subscription status.
constructor
constructor(options?: { context?: Context; maxMessageSize?: number; ipv6?: boolean; affinity?: number; rate?: number; recoveryInterval?: number; linger?: number; reconnectInterval?: number; backlog?: number; reconnectMaxInterval?: number; tcpKeepalive?: number; tcpKeepaliveCount?: number; tcpKeepaliveIdle?: number; tcpKeepaliveInterval?: number; tcpAcceptFilter?: string; immediate?: boolean; plainServer?: boolean; plainUsername?: string; plainPassword?: string; curveServer?: boolean; curvePublicKey?: string; curveSecretKey?: string; curveServerKey?: string; gssapiServer?: boolean; gssapiPrincipal?: string; gssapiServicePrincipal?: string; gssapiPlainText?: boolean; gssapiPrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; gssapiServicePrincipalNameType?: 'hostBased' | 'userName' | 'krb5Principal'; zapDomain?: string; typeOfService?: number; handshakeInterval?: number; socksProxy?: string; heartbeatInterval?: number; heartbeatTimeToLive?: number; heartbeatTimeout?: number; connectTimeout?: number; tcpMaxRetransmitTimeout?: number; multicastMaxTransportDataUnit?: number; vmciBufferSize?: number; vmciBufferMinSize?: number; vmciBufferMaxSize?: number; vmciConnectTimeout?: number; interface?: string; zapEnforceDomain?: boolean; loopbackFastPath?: boolean; multicastHops?: number; sendBufferSize?: number; sendHighWaterMark?: number; sendTimeout?: number; receiveBufferSize?: number; receiveHighWaterMark?: number; receiveTimeout?: number;});
Interfaces
interface Context
interface Context {}
property blocky
blocky: boolean;
ZMQ_BLOCKY
By default the context will block forever when closed at process exit. The assumption behind this behavior is that abrupt termination will cause message loss. Most real applications use some form of handshaking to ensure applications receive termination messages, and then terminate the context with Socket.linger set to zero on all sockets. This setting is an easier way to get the same result. When blocky is set to
false
, all new sockets are given a linger timeout of zero. You must still close all sockets before exiting.
property ioThreads
ioThreads: number;
ZMQ_IO_THREADS
Size of the ØMQ thread pool to handle I/O operations. If your application is using only the
inproc
transport for messaging you may set this to zero, otherwise set it to at least one (default).
property ipv6
ipv6: boolean;
ZMQ_IPV6
Enable or disable IPv6. When IPv6 is enabled, a socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
property maxMessageSize
maxMessageSize: number;
ZMQ_MAX_MSGSZ
Maximum allowed size of a message sent in the context.
property maxSockets
maxSockets: number;
ZMQ_MAX_SOCKETS
Maximum number of sockets allowed on the context.
property maxSocketsLimit
readonly maxSocketsLimit: number;
ZMQ_SOCKET_LIMIT
Largest number of sockets that can be set with maxSockets.
Modifiers
@readonly
property threadPriority
threadPriority: number;
ZMQ_THREAD_PRIORITY
Scheduling priority for internal context's thread pool. This option is not available on Windows. Supported values for this option depend on chosen scheduling policy. Details can be found at http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html. This option only applies before creating any sockets on the context.
property threadSchedulingPolicy
threadSchedulingPolicy: number;
ZMQ_THREAD_SCHED_POLICY
Scheduling policy for internal context's thread pool. This option is not available on Windows. Supported values for this option can be found at http://man7.org/linux/man-pages/man2/sched_setscheduler.2.html. This option only applies before creating any sockets on the context.
interface Dealer
interface Dealer extends Readable, Writable {}
interface EventSubscriber
interface EventSubscriber {}
method off
off: < E extends | 'accept' | 'accept:error' | 'bind' | 'bind:error' | 'connect' | 'connect:delay' | 'connect:retry' | 'close' | 'close:error' | 'disconnect' | 'end' | 'handshake' | 'handshake:error:protocol' | 'handshake:error:auth' | 'handshake:error:other' | 'unknown'>( type: E, listener: (data: EventOfType<E>) => void) => EventSubscriber;
Removes the specified listener function from the list of functions to call when the given event is observed.
Parameter type
The type of event that the listener was listening for.
Parameter listener
The previously registered listener function.
method on
on: < E extends | 'accept' | 'accept:error' | 'bind' | 'bind:error' | 'connect' | 'connect:delay' | 'connect:retry' | 'close' | 'close:error' | 'disconnect' | 'end' | 'handshake' | 'handshake:error:protocol' | 'handshake:error:auth' | 'handshake:error:other' | 'unknown'>( type: E, listener: (data: EventOfType<E>) => void) => EventSubscriber;
Adds a listener function which will be invoked when the given event type is observed. Calling this method will convert the Observer to **event emitter mode**, which will make it impossible to call Observer.receive() at the same time.
socket.events.on("bind", event => {console.log(`Socket bound to ${event.address}`)// ...})Parameter type
The type of event to listen for.
Parameter listener
The listener function that will be called with all event data when the event is observed.
interface Observer
interface Observer extends EventSubscriber {}
method [Symbol.asyncIterator]
[Symbol.asyncIterator]: () => AsyncIterator<ReceiveType<this>, undefined>;
Asynchronously iterate over socket events. When the socket is closed or when the observer is closed manually with Observer.close(), the iterator will return.
for await (event of socket.events) {switch (event.type) {case "bind":console.log(`Socket bound to ${event.address}`)break// ...}}
interface Pair
interface Pair extends Writable, Readable {}
interface Publisher
interface Publisher extends Writable {}
interface Pull
interface Pull extends Readable {}
property conflate
conflate: boolean;
ZMQ_CONFLATE
If set to
true
, a socket shall keep only one message in its inbound/outbound queue: the last message to be received/sent. Ignores any high water mark options. Does not support multi-part messages - in particular, only one part of it is kept in the socket internal queue.
interface Push
interface Push extends Writable {}
property conflate
conflate: boolean;
ZMQ_CONFLATE
If set to
true
, a socket shall keep only one message in its inbound/outbound queue: the last message to be received/sent. Ignores any high water mark options. Does not support multi-part messages - in particular, only one part of it is kept in the socket internal queue.
interface Readable
interface Readable<M extends object[] = Message[]> {}
Describes sockets that can receive messages.
property receiveBufferSize
receiveBufferSize: number;
ZMQ_RCVBUF
Underlying kernel receive buffer size in bytes. A value of -1 means leave the OS default unchanged.
property receiveHighWaterMark
receiveHighWaterMark: number;
ZMQ_RCVHWM
The high water mark is a hard limit on the maximum number of incoming messages ØMQ shall queue in memory for any single peer that the specified socket is communicating with. A value of zero means no limit.
If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, ØMQ shall take appropriate action such as blocking or dropping sent messages.
property receiveTimeout
receiveTimeout: number;
ZMQ_RCVTIMEO
Sets the timeout receiving messages on the socket. If the value is 0, receive() will return a rejected promise immediately if there is no message to receive. If the value is -1, it will wait asynchronously until a message is available. For all other values, it will wait for a message for that amount of time before rejecting.
method [Symbol.asyncIterator]
[Symbol.asyncIterator]: () => AsyncIterator<ReceiveType<this>, undefined>;
Asynchronously iterate over messages becoming available on the socket. When the socket is closed with Socket.close(), the iterator will return. Returning early from the iterator will **not** close the socket unless it also goes out of scope.
for await (const [msg] of socket) {// handle messages}
method receive
receive: () => Promise<M>;
Waits for the next single or multipart message to become availeble on the socket. Reads a message immediately if possible. If no messages can be read, it will wait asynchonously. The promise will be resolved with an array containing the parts of the next message when available.
const [msg] = await socket.receive()const [part1, part2] = await socket.receive()Reading may fail (eventually) if the socket has been configured with a receiveTimeout.
A call to receive() is guaranteed to return with a resolved promise immediately if a message could be read from the socket directly.
Only **one** asynchronously blocking call to receive() can be in progress simultaneously. If you call receive() again on the same socket it will return a rejected promise with an
EBUSY
error. For example, if no messages can be read and noawait
is used:socket.receive() // -> pending promise until read is possiblesocket.receive() // -> promise rejection with `EBUSY` error**Note:** Due to the nature of Node.js and to avoid blocking the main thread, this method always attempts to read messages with the
ZMQ_DONTWAIT
flag. It polls asynchronously if reading is not currently possible. This means that all functionality related to timeouts and blocking behaviour is reimplemented in the Node.js bindings. Any differences in behaviour with the native ZMQ library is considered a bug.Returns
Resolved with message parts that were successfully read.
interface Reply
interface Reply extends Readable, Writable {}
interface Request
interface Request extends Readable, Writable {}
interface Router
interface Router extends Readable, Writable {}
interface RouterConnectOptions
interface RouterConnectOptions {}
property routingId
routingId?: string;
interface Socket
interface Socket {}
Socket option names differ somewhat from the native libzmq option names. This is intentional to improve readability and be more idiomatic for JavaScript/TypeScript.
property affinity
affinity: number;
ZMQ_AFFINITY
I/O thread affinity, which determines which threads from the ØMQ I/O thread pool associated with the socket's context shall handle newly created connections.
**Note:** This value is a bit mask, but values higher than
Number.MAX_SAFE_INTEGER
may not be represented accurately! This currently means that configurations beyond 52 threads are unreliable.
property backlog
backlog: number;
ZMQ_BACKLOG
Maximum length of the queue of outstanding peer connections for the specified socket. This only applies to connection-oriented transports.
property connectTimeout
connectTimeout: number;
ZMQ_CONNECT_TIMEOUT
Sets how long to wait before timing-out a connect() system call. The connect() system call normally takes a long time before it returns a time out error. Setting this option allows the library to time out the call at an earlier interval.
property curvePublicKey
curvePublicKey: string | null;
ZMQ_CURVE_PUBLICKEY
Sets the socket's long term public key. You must set this on CURVE client sockets. A server socket does not need to know its own public key. You can create a new keypair with curveKeyPair().
property curveSecretKey
curveSecretKey: string | null;
ZMQ_CURVE_SECRETKEY
Sets the socket's long term secret key. You must set this on both CURVE client and server sockets. You can create a new keypair with curveKeyPair().
property curveServer
curveServer: boolean;
ZMQ_CURVE_SERVER
Defines whether the socket will act as server for CURVE security. A value of
true
means the socket will act as CURVE server. A value offalse
means the socket will not act as CURVE server, and its security role then depends on other option settings.
property curveServerKey
curveServerKey: string | null;
ZMQ_CURVE_SERVERKEY
Sets the socket's long term server key. This is the public key of the CURVE *server* socket. You must set this on CURVE *client* sockets. This key must have been generated together with the server's secret key. You can create a new keypair with curveKeyPair().
property gssapiPlainText
gssapiPlainText: boolean;
property gssapiPrincipal
gssapiPrincipal: string | null;
property gssapiPrincipalNameType
gssapiPrincipalNameType: 'hostBased' | 'userName' | 'krb5Principal';
property gssapiServer
gssapiServer: boolean;
property gssapiServicePrincipal
gssapiServicePrincipal: string | null;
property gssapiServicePrincipalNameType
gssapiServicePrincipalNameType: 'hostBased' | 'userName' | 'krb5Principal';
property handshakeInterval
handshakeInterval: number;
ZMQ_HANDSHAKE_IVL
Handshaking is the exchange of socket configuration information (socket type, identity, security) that occurs when a connection is first opened (only for connection-oriented transports). If handshaking does not complete within the configured time, the connection shall be closed. The value 0 means no handshake time limit.
property heartbeatInterval
heartbeatInterval: number;
ZMQ_HEARTBEAT_IVL
Interval in milliseconds between sending ZMTP heartbeats for the specified socket. If this option is greater than 0, then a PING ZMTP command will be sent after every interval.
property heartbeatTimeout
heartbeatTimeout: number;
ZMQ_HEARTBEAT_TIMEOUT
How long (in milliseconds) to wait before timing-out a connection after sending a PING ZMTP command and not receiving any traffic. This option is only valid if heartbeatInterval is greater than 0. The connection will time out if there is no traffic received after sending the PING command. The received traffic does not have to be a PONG command - any received traffic will cancel the timeout.
property heartbeatTimeToLive
heartbeatTimeToLive: number;
ZMQ_HEARTBEAT_TTL
The timeout in milliseconds on the remote peer for ZMTP heartbeats. If this option is greater than 0, the remote side shall time out the connection if it does not receive any more traffic within the TTL period. This option does not have any effect if heartbeatInterval is 0. Internally, this value is rounded down to the nearest decisecond, any value less than 100 will have no effect.
property immediate
immediate: boolean;
ZMQ_IMMEDIATE
By default queues will fill on outgoing connections even if the connection has not completed. This can lead to "lost" messages on sockets with round-robin routing (Request, Push, Dealer). If this option is set to
true
, messages shall be queued only to completed connections. This will cause the socket to block if there are no other connections, but will prevent queues from filling on pipes awaiting connection.
property interface
interface: string | null;
ZMQ_BINDTODEVICE
Binds the socket to the given network interface (Linux only). Allows to use Linux VRF, see: https://www.kernel.org/doc/Documentation/networking/vrf.txt. Requires the program to be ran as root **or** with
CAP_NET_RAW
.
property ipv6
ipv6: boolean;
ZMQ_IPV6
Enable or disable IPv6. When IPv6 is enabled, the socket will connect to, or accept connections from, both IPv4 and IPv6 hosts.
property lastEndpoint
readonly lastEndpoint: string | null;
ZMQ_LAST_ENDPOINT
The last endpoint bound for TCP and IPC transports.
Modifiers
@readonly
property linger
linger: number;
ZMQ_LINGER
Determines how long pending messages which have yet to be sent to a peer shall linger in memory after a socket is closed with close().
property loopbackFastPath
loopbackFastPath: boolean;
ZMQ_LOOPBACK_FASTPATH
Enable faster TCP connections on loopback devices. An application can enable this option to reduce the latency and improve the performance of loopback operations on a TCP socket on Windows.
property maxMessageSize
maxMessageSize: number;
ZMQ_MAXMSGSIZE
Limits the size of the inbound message. If a peer sends a message larger than the limit it is disconnected. Value of -1 means no limit.
property multicastMaxTransportDataUnit
multicastMaxTransportDataUnit: number;
ZMQ_MULTICAST_MAXTPDU
Sets the maximum transport data unit size used for outbound multicast packets. This must be set at or below the minimum Maximum Transmission Unit (MTU) for all network paths over which multicast reception is required.
property plainPassword
plainPassword: string | null;
ZMQ_PLAIN_PASSWORD
Sets the password for outgoing connections over TCP or IPC. If you set this to a non-null value, the security mechanism used for connections shall be PLAIN.
property plainServer
plainServer: boolean;
ZMQ_PLAIN_SERVER
Defines whether the socket will act as server for PLAIN security. A value of
true
means the socket will act as PLAIN server. A value offalse
means the socket will not act as PLAIN server, and its security role then depends on other option settings.
property plainUsername
plainUsername: string | null;
ZMQ_PLAIN_USERNAME
Sets the username for outgoing connections over TCP or IPC. If you set this to a non-null value, the security mechanism used for connections shall be PLAIN.
property rate
rate: number;
ZMQ_RATE
Maximum send or receive data rate for multicast transports such as
pgm
.
property reconnectInterval
reconnectInterval: number;
ZMQ_RECONNECT_IVL
Period ØMQ shall wait between attempts to reconnect disconnected peers when using connection-oriented transports. The value -1 means no reconnection.
property reconnectMaxInterval
reconnectMaxInterval: number;
ZMQ_RECONNECT_IVL_MAX
Maximum period ØMQ shall wait between attempts to reconnect. On each reconnect attempt, the previous interval shall be doubled until reconnectMaxInterval is reached. This allows for exponential backoff strategy. Zero (the default) means no exponential backoff is performed and reconnect interval calculations are only based on reconnectInterval.
property recoveryInterval
recoveryInterval: number;
ZMQ_RECOVERY_IVL
Maximum time in milliseconds that a receiver can be absent from a multicast group before unrecoverable data loss will occur.
property securityMechanism
readonly securityMechanism: null | 'plain' | 'curve' | 'gssapi';
ZMQ_MECHANISM
Returns the current security mechanism for the socket, if any. The security mechanism is set implictly by using any of the relevant security options. The returned value is one of: *
null
- No security mechanism is used. *"plain"
- The PLAIN mechanism defines a simple username/password mechanism that lets a server authenticate a client. PLAIN makes no attempt at security or confidentiality. *"curve"
- The CURVE mechanism defines a mechanism for secure authentication and confidentiality for communications between a client and a server. CURVE is intended for use on public networks. *"gssapi"
- The GSSAPI mechanism defines a mechanism for secure authentication and confidentiality for communications between a client and a server using the Generic Security Service Application Program Interface (GSSAPI). The GSSAPI mechanism can be used on both public and private networks.Modifiers
@readonly
property socksProxy
socksProxy: string | null;
ZMQ_SOCKS_PROXY
The SOCKS5 proxy address that shall be used by the socket for the TCP connection(s). Does not support SOCKS5 authentication. If the endpoints are domain names instead of addresses they shall not be resolved and they shall be forwarded unchanged to the SOCKS proxy service in the client connection request message (address type 0x03 domain name).
property tcpAcceptFilter
tcpAcceptFilter: string | null;
ZMQ_TCP_ACCEPT_FILTER
Assign a filter that will be applied for each new TCP transport connection on a listening socket. If no filters are applied, then the TCP transport allows connections from any IP address. If at least one filter is applied then new connection source IP should be matched. To clear all filters set to
null
. Filter is a string with IPv6 or IPv4 CIDR.
property tcpKeepalive
tcpKeepalive: number;
ZMQ_TCP_KEEPALIVE
Override SO_KEEPALIVE socket option (if supported by OS). The default value of -1 leaves it to the OS default.
property tcpKeepaliveCount
tcpKeepaliveCount: number;
ZMQ_TCP_KEEPALIVE_CNT
Overrides TCP_KEEPCNT socket option (if supported by OS). The default value of -1 leaves it to the OS default.
property tcpKeepaliveIdle
tcpKeepaliveIdle: number;
ZMQ_TCP_KEEPALIVE_IDLE
Overrides TCP_KEEPIDLE / TCP_KEEPALIVE socket option (if supported by OS). The default value of -1 leaves it to the OS default.
property tcpKeepaliveInterval
tcpKeepaliveInterval: number;
ZMQ_TCP_KEEPALIVE_INTVL
Overrides TCP_KEEPINTVL socket option (if supported by the OS). The default value of -1 leaves it to the OS default.
property tcpMaxRetransmitTimeout
tcpMaxRetransmitTimeout: number;
ZMQ_TCP_MAXRT
Sets how long before an unacknowledged TCP retransmit times out (if supported by the OS). The system normally attempts many TCP retransmits following an exponential backoff strategy. This means that after a network outage, it may take a long time before the session can be re-established. Setting this option allows the timeout to happen at a shorter interval.
property threadSafe
readonly threadSafe: boolean;
ZMQ_THREAD_SAFE
Whether or not the socket is threadsafe. Currently only DRAFT sockets is thread-safe.
Modifiers
@readonly
property type
readonly type: SocketType;
ZMQ_TYPE
Retrieve the socket type. This is fairly useless because you can test the socket class with e.g.
socket instanceof Dealer
.Modifiers
@readonly
property typeOfService
typeOfService: number;
ZMQ_TOS
Sets the ToS fields (the *Differentiated Services* (DS) and *Explicit Congestion Notification* (ECN) field) of the IP header. The ToS field is typically used to specify a packet's priority. The availability of this option is dependent on intermediate network equipment that inspect the ToS field and provide a path for low-delay, high-throughput, highly-reliable service, etc.
property vmciBufferMaxSize
vmciBufferMaxSize: number;
ZMQ_VMCI_BUFFER_MAX_SIZE
Maximum size of the underlying buffer for the socket. Used during negotiation before the connection is established. For
vmci://
transports only.
property vmciBufferMinSize
vmciBufferMinSize: number;
ZMQ_VMCI_BUFFER_MIN_SIZE
Minimum size of the underlying buffer for the socket. Used during negotiation before the connection is established. For
vmci://
transports only.
property vmciBufferSize
vmciBufferSize: number;
ZMQ_VMCI_BUFFER_SIZE
The size of the underlying buffer for the socket. Used during negotiation before the connection is established. For
vmci://
transports only.
property vmciConnectTimeout
vmciConnectTimeout: number;
ZMQ_VMCI_CONNECT_TIMEOUT
Connection timeout for the socket. For
vmci://
transports only.
property zapDomain
zapDomain: string | null;
ZMQ_ZAP_DOMAIN
Sets the domain for ZAP (ZMQ RFC 27) authentication. For NULL security (the default on all
tcp://
connections), ZAP authentication only happens if you set a non-empty domain. For PLAIN and CURVE security, ZAP requests are always made, if there is a ZAP handler present. See http://rfc.zeromq.org/spec:27 for more details.
property zapEnforceDomain
zapEnforceDomain: boolean;
ZMQ_ZAP_ENFORCE_DOMAIN
The ZAP (ZMQ RFC 27) authentication protocol specifies that a domain must always be set. Older versions of libzmq did not follow the spec and allowed an empty domain to be set. This option can be used to enabled or disable the stricter, backward incompatible behaviour. For now it is disabled by default, but in a future version it will be enabled by default.
interface Stream
interface Stream extends Readable<[Message, Message]>, Writable<[MessageLike, MessageLike]> {}
interface StreamConnectOptions
interface StreamConnectOptions {}
property routingId
routingId?: string;
interface Subscriber
interface Subscriber extends Readable {}
interface Writable
interface Writable< M extends MessageLike | MessageLike[] = MessageLike | MessageLike[], O extends [...object[]] = []> {}
Describes sockets that can send messages.
property multicastHops
multicastHops: number;
ZMQ_MULTICAST_HOPS
Sets the time-to-live field in every multicast packet sent from this socket. The default is 1 which means that the multicast packets don't leave the local network.
property sendBufferSize
sendBufferSize: number;
ZMQ_SNDBUF
Underlying kernel transmit buffer size in bytes. A value of -1 means leave the OS default unchanged.
property sendHighWaterMark
sendHighWaterMark: number;
ZMQ_SNDHWM
The high water mark is a hard limit on the maximum number of outgoing messages ØMQ shall queue in memory for any single peer that the specified socket is communicating with. A value of zero means no limit.
If this limit has been reached the socket shall enter an exceptional state and depending on the socket type, ØMQ shall take appropriate action such as blocking or dropping sent messages.
property sendTimeout
sendTimeout: number;
ZMQ_SNDTIMEO
Sets the timeout for sending messages on the socket. If the value is 0, send() will return a rejected promise immediately if the message cannot be sent. If the value is -1, it will wait asynchronously until the message is sent. For all other values, it will try to send the message for that amount of time before rejecting.
method send
send: (message: M, ...options: O) => Promise<void>;
Sends a single message or a multipart message on the socket. Queues the message immediately if possible, and returns a resolved promise. If the message cannot be queued because the high water mark has been reached, it will wait asynchronously. The promise will be resolved when the message was queued successfully.
await socket.send("hello world")await socket.send(["hello", "world"])Queueing may fail eventually if the socket has been configured with a sendTimeout.
A call to send() is guaranteed to return with a resolved promise immediately if the message could be queued directly.
Only **one** asynchronously blocking call to send() may be executed simultaneously. If you call send() again on a socket that is in the mute state it will return a rejected promise with an
EBUSY
error.The reason for disallowing multiple send() calls simultaneously is that it could create an implicit queue of unsendable outgoing messages. This would circumvent the socket's sendHighWaterMark. Such an implementation could even exhaust all system memory and cause the Node.js process to abort.
For most application you should not notice this implementation detail. Only in rare occasions will a call to send() that does not resolve immediately be undesired. Here are some common scenarios:
* If you wish to **send a message**, use
await send(...)
. ZeroMQ socket types have been carefully designed to give you the correct blocking behaviour on the chosen socket type in almost all cases:* If sending is not possible, it is often better to wait than to continue as if nothing happened. For example, on a Request socket, you can only receive a reply once a message has been sent; so waiting until a message could be queued before continuing with the rest of the program (likely to read from the socket) is required.
* Certain socket types (such as Router) will always allow queueing messages and
await send(...)
won't delay any code that comes after. This makes sense for routers, since typically you don't want a single send operation to stop the handling of other incoming or outgoing messages.* If you wish to send on an occasionally **blocking** socket (for example on a Router with the Router.mandatory option set, or on a Dealer) and you're 100% certain that **dropping a message is better than blocking**, then you can set the sendTimeout option to
0
to effectively force send() to always resolve immediately. Be prepared to catch exceptions if sending a message is not immediately possible.* If you wish to send on a socket and **messages should be queued before they are dropped**, you should implement a [simple queue](examples/queue/queue.ts) in JavaScript. Such a queue is not provided by this library because most real world applications need to deal with undeliverable messages in more complex ways - for example, they might need to reply with a status message; or first retry delivery a certain number of times before giving up.
Parameter message
Single message or multipart message to queue for sending.
Parameter options
Any options, if applicable to the socket type (DRAFT only).
Returns
Resolved when the message was successfully queued.
interface XPublisher
interface XPublisher extends Readable, Writable {}
interface XSubscriber
interface XSubscriber extends Readable, Writable {}
Type Aliases
type Event
type Event = | EventFor<'accept', EventAddress> | EventFor<'accept:error', EventAddress & EventError> | EventFor<'bind', EventAddress> | EventFor<'bind:error', EventAddress & EventError> | EventFor<'connect', EventAddress> | EventFor<'connect:delay', EventAddress> | EventFor<'connect:retry', EventAddress & EventInterval> | EventFor<'close', EventAddress> | EventFor<'close:error', EventAddress & EventError> | EventFor<'disconnect', EventAddress> | EventFor<'end'> | EventFor<'handshake', EventAddress> | EventFor<'handshake:error:protocol', EventAddress & EventError<ProtoError>> | EventFor<'handshake:error:auth', EventAddress & EventError<AuthError>> | EventFor<'handshake:error:other', EventAddress & EventError> | EventFor<'unknown'>;
A union type that represents all possible even types and the associated data. Events always have a
type
property with an EventType value.The following socket events can be generated. This list may be different depending on the ZeroMQ version that is used.
Note that the **error** event is avoided by design, since this has a [special behaviour](https://nodejs.org/api/events.html#events_error_events) in Node.js causing an exception to be thrown if it is unhandled.
Other error names are adjusted to be as close to possible as other [networking related](https://nodejs.org/api/net.html) event names in Node.js and/or to the corresponding ZeroMQ.js method call. Events (including any errors) that correspond to a specific operation are namespaced with a colon
:
, e.g.bind:error
orconnect:retry
.* **accept** - ZMQ_EVENT_ACCEPTED The socket has accepted a connection from a remote peer.
* **accept:error** - ZMQ_EVENT_ACCEPT_FAILED The socket has rejected a connection from a remote peer.
The following additional details will be included with this event:
*
error
- An error object that describes the specific error that occurred.* **bind** - ZMQ_EVENT_LISTENING The socket was successfully bound to a network interface.
* **bind:error** - ZMQ_EVENT_BIND_FAILED The socket could not bind to a given interface.
The following additional details will be included with this event:
*
error
- An error object that describes the specific error that occurred.* **connect** - ZMQ_EVENT_CONNECTED The socket has successfully connected to a remote peer.
* **connect:delay** - ZMQ_EVENT_CONNECT_DELAYED A connect request on the socket is pending.
* **connect:retry** - ZMQ_EVENT_CONNECT_RETRIED A connection attempt is being handled by reconnect timer. Note that the reconnect interval is recalculated at each retry.
The following additional details will be included with this event:
*
interval
- The current reconnect interval.* **close** - ZMQ_EVENT_CLOSED The socket was closed.
* **close:error** - ZMQ_EVENT_CLOSE_FAILED The socket close failed. Note that this event occurs **only on IPC** transports..
The following additional details will be included with this event:
*
error
- An error object that describes the specific error that occurred.* **disconnect** - ZMQ_EVENT_DISCONNECTED The socket was disconnected unexpectedly.
* **handshake** - ZMQ_EVENT_HANDSHAKE_SUCCEEDED The ZMTP security mechanism handshake succeeded. NOTE: This event may still be in DRAFT statea and not yet available in stable releases.
* **handshake:error:protocol** - ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL The ZMTP security mechanism handshake failed due to some mechanism protocol error, either between the ZMTP mechanism peers, or between the mechanism server and the ZAP handler. This indicates a configuration or implementation error in either peer resp. the ZAP handler. NOTE: This event may still be in DRAFT state and not yet available in stable releases.
* **handshake:error:auth** - ZMQ_EVENT_HANDSHAKE_FAILED_AUTH The ZMTP security mechanism handshake failed due to an authentication failure. NOTE: This event may still be in DRAFT state and not yet available in stable releases.
* **handshake:error:other** - ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL Unspecified error during handshake. NOTE: This event may still be in DRAFT state and not yet available in stable releases.
* **end** - ZMQ_EVENT_MONITOR_STOPPED Monitoring on this socket ended.
* **unknown** An event was generated by ZeroMQ that the Node.js library could not interpret. Please submit a pull request for new event types if they are not yet included.
type EventOfType
type EventOfType<E extends EventType = EventType> = Expand< Extract<Event, Event & EventFor<E>>>;
Represents the event data object given one particular event type, for example
EventOfType<"accept">
.
type EventType
type EventType = Event['type'];
A union type of all available event types. See Event for an overview of the events that can be observed.
type Message
type Message = Buffer;
A type representing the messages that are returned inside promises by Readable.receive().
type MessageLike
type MessageLike = | ArrayBufferView | ArrayBuffer | SharedArrayBuffer | string | number | null;
Union type representing all message types that are accepted by Writable.send().
type SocketOptions
type SocketOptions<S extends Socket> = Options< S, { context: Context; }>;
Represents the options that can be assigned in the constructor of a given socket type, for example
new Dealer({...})
. Readonly options for the particular socket will be omitted.
Package Files (2)
Dependencies (2)
Dev Dependencies (36)
- @types/benchmark
- @types/chai
- @types/deasync
- @types/fs-extra
- @types/gh-pages
- @types/mocha
- @types/node
- @types/proper-lockfile
- @types/semver
- @types/shelljs
- @types/which
- benchmark
- chai
- cross-env
- deasync
- downlevel-dts
- electron
- electron-mocha
- eslint
- eslint-config-atomic
- eslint-plugin-prettier
- execa
- fs-extra
- gh-pages
- minify-all-cli
- mocha
- npm-run-all2
- prebuildify
- prettier
- proper-lockfile
- semver
- shx
- ts-node
- typedoc
- typescript
- which
Peer Dependencies (0)
No peer dependencies.
Badge
To add a badge like this oneto your package's README, use the codes available below.
You may also use Shields.io to create a custom badge linking to https://www.jsdocs.io/package/zeromq
.
- Markdown[![jsDocs.io](https://img.shields.io/badge/jsDocs.io-reference-blue)](https://www.jsdocs.io/package/zeromq)
- HTML<a href="https://www.jsdocs.io/package/zeromq"><img src="https://img.shields.io/badge/jsDocs.io-reference-blue" alt="jsDocs.io"></a>
- Updated .
Package analyzed in 7285 ms. - Missing or incorrect documentation? Open an issue for this package.