policy/frameworks/cluster/backend/zeromq/main.zeek

Cluster::Backend::ZeroMQ

ZeroMQ cluster backend support.

For publish-subscribe functionality, one node in the Zeek cluster spawns a thread running a central broker listening on an XPUB and an XSUB socket. These sockets are connected via zmq_proxy(). All other nodes connect to this central broker with their own XSUB and XPUB sockets, establishing a global many-to-many publish-subscribe system where each node sees subscriptions and messages from all other nodes in a Zeek cluster. ZeroMQ’s publish-subscribe pattern documentation may be a good starting point. Elsewhere in ZeroMQ’s documentation, the central broker is also called a forwarder.

For remote logging functionality, the ZeroMQ pipeline pattern is used. All logger nodes listen on a PULL socket. Other nodes connect via PUSH sockets to all of the loggers. Concretely, remote logging functionality is not publish-subscribe, but instead leverages ZeroMQ’s built-in load-balancing functionality provided by PUSH and PULL sockets.

The ZeroMQ cluster backend technically allows running a non-Zeek central broker (it only needs to offer XPUB and XSUB sockets). Further, it is possible to run non-Zeek logger nodes. All a logger node needs to do is open a ZeroMQ PULL socket and interpret the format used by Zeek nodes to send their log writes.

Namespace:

Cluster::Backend::ZeroMQ

Summary

Redefinable Options

Cluster::Backend::ZeroMQ::connect_log_endpoints: vector &redef

Vector of ZeroMQ endpoints to connect to for logging.

Cluster::Backend::ZeroMQ::connect_xpub_endpoint: string &redef

The central broker’s XPUB endpoint to connect to.

Cluster::Backend::ZeroMQ::connect_xsub_endpoint: string &redef

The central broker’s XSUB endpoint to connect to.

Cluster::Backend::ZeroMQ::debug_flags: count &redef

Bitmask to enable low-level stderr based debug printing.

Cluster::Backend::ZeroMQ::hello_expiration: interval &redef

Expiration for hello state.

Cluster::Backend::ZeroMQ::linger_ms: int &redef

Configure the linger value of the ZeroMQ sockets.

Cluster::Backend::ZeroMQ::listen_log_endpoint: string &redef

PULL socket address to listen on for log messages.

Cluster::Backend::ZeroMQ::listen_xpub_endpoint: string &redef

XPUB listen endpoint for the central broker.

Cluster::Backend::ZeroMQ::listen_xpub_nodrop: bool &redef

Do not silently drop messages if high-water-mark is reached.

Cluster::Backend::ZeroMQ::listen_xsub_endpoint: string &redef

XSUB listen endpoint for the central broker.

Cluster::Backend::ZeroMQ::log_immediate: bool &redef

Configure ZeroMQ’s immediate setting on PUSH sockets.

Cluster::Backend::ZeroMQ::log_rcvbuf: int &redef

Kernel receive buffer size for log sockets.

Cluster::Backend::ZeroMQ::log_rcvhwm: int &redef

Receive high water mark value for the log PULL sockets.

Cluster::Backend::ZeroMQ::log_sndbuf: int &redef

Kernel transmit buffer size for log sockets.

Cluster::Backend::ZeroMQ::log_sndhwm: int &redef

Send high water mark value for the log PUSH sockets.

Cluster::Backend::ZeroMQ::poll_max_messages: count &redef

Messages to receive before yielding.

Cluster::Backend::ZeroMQ::run_proxy_thread: bool &redef

Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.

Cluster::Backend::ZeroMQ::xpub_nodrop: bool &redef

Do not silently drop messages if high-water-mark is reached.

State Variables

Cluster::Backend::ZeroMQ::node_topic_prefix: string &redef

The node topic prefix to use.

Cluster::Backend::ZeroMQ::nodeid_topic_prefix: string &redef

The node_id topic prefix to use.

Redefinitions

Cluster::Backend::ZeroMQ::run_proxy_thread: bool &redef

Cluster::backend: Cluster::BackendTag &redef

Cluster::logger_pool_spec: Cluster::PoolSpec &redef

Cluster::logger_topic: string &redef

Cluster::manager_topic: string &redef

Cluster::node_id: function &redef

Cluster::node_topic: function &redef

Cluster::nodeid_topic: function &redef

Cluster::proxy_pool_spec: Cluster::PoolSpec &redef

Cluster::proxy_topic: string &redef

Cluster::worker_pool_spec: Cluster::PoolSpec &redef

Cluster::worker_topic: string &redef

Events

Cluster::Backend::ZeroMQ::hello: event

Low-level event sent to a node in response to their subscription.

Cluster::Backend::ZeroMQ::subscription: event

Low-level event when a subscription is added.

Cluster::Backend::ZeroMQ::unsubscription: event

Low-level event when a subscription vanishes.

Detailed Interface

Redefinable Options

Cluster::Backend::ZeroMQ::connect_log_endpoints
Type:

vector of string

Attributes:

&redef

Default:

[]

Vector of ZeroMQ endpoints to connect to for logging.

A node’s PUSH socket used for logging connects to each of the ZeroMQ endpoints listed in this vector.
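As a minimal sketch of how this option might be set on non-logger nodes, the fragment below lists two logger endpoints. The addresses and ports are hypothetical placeholders, and the @load line assumes the backend is enabled through the policy path named at the top of this page.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Hypothetical logger addresses; replace with the PULL endpoints
# your logger nodes actually listen on.
redef Cluster::Backend::ZeroMQ::connect_log_endpoints = vector(
	"tcp://127.0.0.1:5560",
	"tcp://127.0.0.1:5561"
);
```

With more than one endpoint listed, ZeroMQ's PUSH socket distributes log writes across the connected loggers.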

Cluster::Backend::ZeroMQ::connect_xpub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5556"

The central broker’s XPUB endpoint to connect to.

A node connects with its XSUB socket to the XPUB socket of the central broker.

Cluster::Backend::ZeroMQ::connect_xsub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5555"

The central broker’s XSUB endpoint to connect to.

A node connects with its XPUB socket to the XSUB socket of the central broker.

Cluster::Backend::ZeroMQ::debug_flags
Type:

count

Attributes:

&redef

Default:

0

Bitmask to enable low-level stderr based debug printing.

poll debugging: 1 (produce verbose zmq::poll() output)

Bitwise-OR values from the above list together and set debug_flags to the result. E.g., use 7 to select 4, 2 and 1. Only use this during development if something seems off. The thread used internally will write its output to stderr.
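A short sketch of enabling the one flag listed above (poll debugging, value 1). The @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Enable verbose zmq::poll() output on stderr (flag value 1).
# Intended for development only.
redef Cluster::Backend::ZeroMQ::debug_flags = 1;
```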

Cluster::Backend::ZeroMQ::hello_expiration
Type:

interval

Attributes:

&redef

Default:

10.0 secs

Expiration for hello state.

How long to wait before expiring information about subscriptions and hello messages from other nodes. These expirations trigger reporter warnings.

Cluster::Backend::ZeroMQ::linger_ms
Type:

int

Attributes:

&redef

Default:

500

Configure the linger value of the ZeroMQ sockets.

The default used by libzmq is 30 seconds (30000 milliseconds), which is very long when loggers vanish before workers during a shutdown, so we reduce this to 500 milliseconds by default.

A value of -1 configures blocking forever, while 0 would immediately discard any pending messages.

See ZeroMQ’s ZMQ_LINGER documentation for more details.
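For illustration, the linger value could be raised when losing pending messages at shutdown is a bigger concern than a slow exit. The value of 2000 milliseconds is an arbitrary example, not a recommendation, and the @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Wait up to 2 seconds (example value) for pending messages
# to be delivered when sockets are closed.
redef Cluster::Backend::ZeroMQ::linger_ms = 2000;
```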

Cluster::Backend::ZeroMQ::listen_log_endpoint
Type:

string

Attributes:

&redef

Default:

""

PULL socket address to listen on for log messages.

If empty, don’t listen for log messages. Otherwise, this is the ZeroMQ address to bind to, e.g., tcp://127.0.0.1:5555.
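A hedged sketch of binding the PULL socket only on logger nodes. The port 5560 is a hypothetical choice, and the @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Only logger nodes bind a PULL socket for incoming log writes.
@if ( Cluster::local_node_type() == Cluster::LOGGER )
redef Cluster::Backend::ZeroMQ::listen_log_endpoint = "tcp://127.0.0.1:5560";
@endif
```

Other nodes would then include the same address in Cluster::Backend::ZeroMQ::connect_log_endpoints.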

Cluster::Backend::ZeroMQ::listen_xpub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5556"

XPUB listen endpoint for the central broker.

This setting is used for the XPUB socket of the central broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

Cluster::Backend::ZeroMQ::listen_xpub_nodrop
Type:

bool

Attributes:

&redef

Default:

T

Do not silently drop messages if high-water-mark is reached.

Whether to configure ZMQ_XPUB_NODROP on the XPUB socket to detect when sending a message fails due to reaching the high-water-mark.

This setting applies to the XPUB/XSUB broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

See ZeroMQ’s ZMQ_XPUB_NODROP documentation for more details.

Cluster::Backend::ZeroMQ::listen_xsub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5555"

XSUB listen endpoint for the central broker.

This setting is used for the XSUB socket of the central broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

Cluster::Backend::ZeroMQ::log_immediate
Type:

bool

Attributes:

&redef

Default:

F

Configure ZeroMQ’s immediate setting on PUSH sockets.

Setting this to T will queue log writes only to completed connections. By default, log writes are queued to all potential endpoints listed in Cluster::Backend::ZeroMQ::connect_log_endpoints.

See ZeroMQ’s ZMQ_IMMEDIATE documentation for more details.
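As a sketch, the setting could be enabled so that log writes are not queued for loggers that have not finished connecting. The @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Queue log writes only to completed connections.
redef Cluster::Backend::ZeroMQ::log_immediate = T;
```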

Cluster::Backend::ZeroMQ::log_rcvbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel receive buffer size for log sockets.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_RCVBUF documentation for more details.

Cluster::Backend::ZeroMQ::log_rcvhwm
Type:

int

Attributes:

&redef

Default:

1000

Receive high water mark value for the log PULL sockets.

If reached, Zeek workers will block or drop messages.

See ZeroMQ’s ZMQ_RCVHWM documentation for more details.

TODO: Make action configurable (block vs drop)

Cluster::Backend::ZeroMQ::log_sndbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel transmit buffer size for log sockets.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_SNDBUF documentation for more details.

Cluster::Backend::ZeroMQ::log_sndhwm
Type:

int

Attributes:

&redef

Default:

1000

Send high water mark value for the log PUSH sockets.

If reached, Zeek nodes will block or drop messages.

See ZeroMQ’s ZMQ_SNDHWM documentation for more details.

TODO: Make action configurable (block vs drop)
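The high water marks for the log sockets can be adjusted together. The fragment below is only a sketch: the value 5000 is an arbitrary example rather than a tuned recommendation, and the @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Allow more queued log messages on the sending (PUSH) side ...
redef Cluster::Backend::ZeroMQ::log_sndhwm = 5000;

# ... and on the receiving (PULL) side of the loggers.
redef Cluster::Backend::ZeroMQ::log_rcvhwm = 5000;
```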

Cluster::Backend::ZeroMQ::poll_max_messages
Type:

count

Attributes:

&redef

Default:

100

Messages to receive before yielding.

Yield from the receive loop when this many messages have been received from one of the used sockets.

Cluster::Backend::ZeroMQ::run_proxy_thread
Type:

bool

Attributes:

&redef

Default:

F

Redefinition:

from policy/frameworks/cluster/backend/zeromq/main.zeek

=:

Cluster::local_node_type() == Cluster::MANAGER

Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.

If set to T, Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread is called during zeek_init. The node will listen on Cluster::Backend::ZeroMQ::listen_xsub_endpoint and Cluster::Backend::ZeroMQ::listen_xpub_endpoint and forward subscriptions and messages between nodes.

By default, this is set to T on the manager and F elsewhere.
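To illustrate the case of an externally run central broker, the sketch below disables the proxy thread on every node and points the nodes at a broker elsewhere. The address 192.0.2.10 is a hypothetical documentation address, and the @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Do not spawn the XPUB-XSUB broker thread, not even on the manager.
redef Cluster::Backend::ZeroMQ::run_proxy_thread = F;

# Hypothetical external broker offering XPUB and XSUB sockets.
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = "tcp://192.0.2.10:5556";
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = "tcp://192.0.2.10:5555";
```

These redefinitions need to be loaded after the backend's own script, since that script already redefines run_proxy_thread.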

Cluster::Backend::ZeroMQ::xpub_nodrop
Type:

bool

Attributes:

&redef

Default:

T

Do not silently drop messages if high-water-mark is reached.

Whether to configure ZMQ_XPUB_NODROP on the XPUB socket to detect when sending a message fails due to reaching the high-water-mark.

See ZeroMQ’s ZMQ_XPUB_NODROP documentation for more details.

State Variables

Cluster::Backend::ZeroMQ::node_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek.cluster.node"

The node topic prefix to use.

Cluster::Backend::ZeroMQ::nodeid_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek.cluster.nodeid"

The node_id topic prefix to use.

Events

Cluster::Backend::ZeroMQ::hello
Type:

event (name: string, id: string)

Low-level event sent to a node in response to their subscription.

Parameters:

name – The sending node’s name.

id – The sending node’s identifier.

Cluster::Backend::ZeroMQ::subscription
Type:

event (topic: string)

Low-level event when a subscription is added.

Every node observes all subscriptions from other nodes in a cluster through its XPUB socket. Whenever a new subscription topic is added, this event is raised with the topic.

Parameters:

topic – The topic.

Cluster::Backend::ZeroMQ::unsubscription
Type:

event (topic: string)

Low-level event when a subscription vanishes.

Every node observes all subscriptions from other nodes in a cluster through its XPUB socket. Whenever a subscription is removed from the local XPUB socket, this event is raised with the topic set to the removed subscription.

Parameters:

topic – The topic.
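The events above can be observed with ordinary event handlers. The following is a minimal sketch for debugging purposes that prints each event as it is raised. The output format is illustrative, and the @load line is an assumption about how the backend is enabled.

```zeek
# Assumption: the ZeroMQ backend is loaded via its policy path.
@load frameworks/cluster/backend/zeromq

# Raised when another node's subscription becomes visible locally.
event Cluster::Backend::ZeroMQ::subscription(topic: string)
	{
	print fmt("subscription added: %s", topic);
	}

# Raised when a subscription is removed from the local XPUB socket.
event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
	{
	print fmt("subscription removed: %s", topic);
	}

# Raised when a node answers one of our subscriptions.
event Cluster::Backend::ZeroMQ::hello(name: string, id: string)
	{
	print fmt("hello from node %s (id %s)", name, id);
	}
```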