policy/frameworks/cluster/backend/zeromq/main.zeek

Cluster::Backend::ZeroMQ

ZeroMQ cluster backend support.

Overview

For publish-subscribe functionality, one node in the Zeek cluster spawns a thread running a central broker listening on an XPUB and an XSUB socket. These sockets are connected via zmq_proxy(). All other nodes connect to this central broker with their own XSUB and XPUB sockets, establishing a global many-to-many publish-subscribe system where each node sees subscriptions and messages from all other nodes in a Zeek cluster. ZeroMQ’s publish-subscribe pattern documentation may be a good starting point. Elsewhere in ZeroMQ’s documentation, the central broker is also called a forwarder.

For remote logging functionality, the ZeroMQ pipeline pattern is used. All logger nodes listen on a PULL socket. Other nodes connect via PUSH sockets to all of the loggers. Concretely, remote logging functionality is not publish-subscribe, but instead leverages ZeroMQ’s built-in load-balancing functionality provided by PUSH and PULL sockets.

The ZeroMQ cluster backend technically allows running a non-Zeek central broker (it only needs to offer XPUB and XSUB sockets). Further, it is possible to run non-Zeek logger nodes. All a logger node needs to do is open a ZeroMQ PULL socket and interpret the format used by Zeek nodes to send their log writes.
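As a rough sketch of the external-broker setup described above, a node could disable the built-in proxy thread and point its sockets at the external broker. The address 192.0.2.10 and the ports 6001 and 6002 are hypothetical placeholders, and the sketch assumes it is loaded after policy/frameworks/cluster/backend/zeromq/main.zeek so that its redefs take effect last.

```zeek
# Do not start the built-in XPUB/XSUB broker on any node; an external
# (possibly non-Zeek) process is assumed to provide it instead.
redef Cluster::Backend::ZeroMQ::run_proxy_thread = F;

# Hypothetical endpoints of the external broker: its XPUB socket is assumed
# to listen on port 6001 and its XSUB socket on port 6002.
redef Cluster::Backend::ZeroMQ::connect_xpub_endpoint = "tcp://192.0.2.10:6001";
redef Cluster::Backend::ZeroMQ::connect_xsub_endpoint = "tcp://192.0.2.10:6002";
```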

Overload Behavior

The ZeroMQ cluster backend by default drops outgoing and incoming events when the Zeek cluster is overloaded. Dropping of outgoing events is governed by the Cluster::Backend::ZeroMQ::xpub_sndhwm setting. This is the High Water Mark (HWM) for the local XPUB socket’s queue. Once reached, any outgoing events are dropped until there’s room in the socket’s queue again. The metric zeek_cluster_zeromq_xpub_drops_total is incremented for every dropped event.

For incoming events, the Cluster::Backend::ZeroMQ::onloop_queue_hwm setting is used. Remote events received via the local XSUB socket are first enqueued as raw event messages for processing on Zeek’s main event loop. When this queue is full due to more remote events incoming than Zeek can possibly process in an event loop iteration, incoming events are dropped and the zeek_cluster_zeromq_onloop_drops_total metric is incremented.

Incoming log batches and subscription and unsubscription events are also passed through the onloop queue, but the HWM currently does not apply to them. The assumption is that these are 1) infrequent and 2) more important than arbitrary publish-subscribe events.

To avoid dropping any events (e.g. for performance testing or offline PCAP processing), the recommended strategy is to set both Cluster::Backend::ZeroMQ::xpub_sndhwm and Cluster::Backend::ZeroMQ::onloop_queue_hwm to 0, disabling the HWM and dropping logic. It is up to the user to monitor CPU and memory usage of individual nodes to avoid overloading and running into out-of-memory situations.
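The recommendation above could be written as the following redefs, for example in a site-local script. This is a minimal sketch; where the redefs are placed is an assumption.

```zeek
# Disable the high water mark and the dropping logic for outgoing events
# on the local XPUB socket.
redef Cluster::Backend::ZeroMQ::xpub_sndhwm = 0;

# Disable the high water mark for incoming events queued for Zeek's
# main event loop. Memory usage is then bounded only by available RAM.
redef Cluster::Backend::ZeroMQ::onloop_queue_hwm = 0;
```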

As a Zeek operator, you should monitor zeek_cluster_zeromq_xpub_drops_total and zeek_cluster_zeromq_onloop_drops_total. Any non-zero values for these metrics indicate an overloaded Zeek cluster. See the cluster telemetry options Cluster::Telemetry::core_metrics and Cluster::Telemetry::websocket_metrics for ways to get a better understanding of the events published and received.

Encryption using the CURVE mechanism

When a Zeek cluster spans multiple systems on a shared and untrusted network, it’s strongly recommended to encrypt the network traffic between individual Zeek systems to avoid eavesdropping. ZeroMQ features built-in support for elliptic-curve public-key encryption, offering confidentiality and authentication.

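To enable it, generate keypairs for the server and client roles using Cluster::Backend::ZeroMQ::generate_keypair and set the following script-level variables:

  • Cluster::Backend::ZeroMQ::curve_client_publickey

  • Cluster::Backend::ZeroMQ::curve_client_secretkey

  • Cluster::Backend::ZeroMQ::curve_server_publickey

  • Cluster::Backend::ZeroMQ::curve_server_secretkey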

Alternatively, set the environment variables:

  • ZEEK_CLUSTER_BACKEND_ZEROMQ_CURVE_CLIENT_PUBLICKEY

  • ZEEK_CLUSTER_BACKEND_ZEROMQ_CURVE_CLIENT_SECRETKEY

  • ZEEK_CLUSTER_BACKEND_ZEROMQ_CURVE_SERVER_PUBLICKEY

  • ZEEK_CLUSTER_BACKEND_ZEROMQ_CURVE_SERVER_SECRETKEY

To avoid confusion, either script-level or environment variable configuration should be used. Mixing the approaches will result in a fatal error at startup.
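A script-level configuration might look like the sketch below. The values in angle brackets are placeholders, not valid keys, and must be replaced with Z85-encoded keys produced by Cluster::Backend::ZeroMQ::generate_keypair. The same values are assumed to be deployed to every Zeek node.

```zeek
# Placeholder values only: substitute the keys generated for the
# server role (central XPUB/XSUB sockets and logger PULL sockets).
redef Cluster::Backend::ZeroMQ::curve_server_publickey = "<server public key>";
redef Cluster::Backend::ZeroMQ::curve_server_secretkey = "<server secret key>";

# Placeholder values only: substitute the keys generated for the
# client role (Zeek processes connecting to those sockets).
redef Cluster::Backend::ZeroMQ::curve_client_publickey = "<client public key>";
redef Cluster::Backend::ZeroMQ::curve_client_secretkey = "<client secret key>";
```

When using this form, leave the ZEEK_CLUSTER_BACKEND_ZEROMQ_CURVE_* environment variables unset, since mixing both approaches is a fatal error at startup.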

The central XPUB/XSUB sockets created by the proxy thread act as the CURVE server. If you’re hosting the XPUB/XSUB sockets elsewhere or using a non-Zeek process, make sure to configure it with the proper secret key and provide the public key to the clients. The logger’s PULL socket uses the same secret key as the XPUB/XSUB sockets. This means logger nodes require access to the secret key even if the XPUB/XSUB component is external to Zeek. Additionally, the current implementation uses the client’s public key for authentication.

You may generate the keys as follows. Use to_json when there’s a need to consume the key material elsewhere.

$ zeek -e 'print to_json(Cluster::Backend::ZeroMQ::generate_keypair())'
{"public":"l2A9cf[>&X7u=.GZFdHI=nz6QT6{$u^weYPEWJb/","secret":"Z0eCkbrKkQBkO90Qb[j5mngd[0%Cl*bo}0<D+&vp"}

The encoding used is Z85.
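Since both a server and a client keypair are needed, a small helper script could generate them in one run. This is a sketch only: the file name gen-keys.zeek is hypothetical, and it assumes generate_keypair is callable without loading additional scripts, as in the one-liner above.

```zeek
# gen-keys.zeek (hypothetical file name); run with: zeek gen-keys.zeek
event zeek_init()
	{
	# Each call returns a fresh keypair record with the Z85-encoded
	# public and secret key.
	local server_keys = Cluster::Backend::ZeroMQ::generate_keypair();
	local client_keys = Cluster::Backend::ZeroMQ::generate_keypair();

	# Print both keypairs as JSON, labeled by their intended role.
	print "server", to_json(server_keys);
	print "client", to_json(client_keys);
	}
```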

All Zeek processes share and have access to the same credentials. Note that while the underlying protocol uses asymmetric cryptographic primitives, we use this more like shared symmetric encryption. Any external process with access to the server public key and the client secret and public key can connect to the central XPUB/XSUB sockets or any logger PULL socket.

Implementation Note

ZeroMQ’s ZAP protocol supports per-client authentication. We implement this lightly such that any client needs to possess the configured client secret and public key. Today, this means every node holds the client keys because, by default, the manager not only hosts the central XPUB/XSUB sockets but also connects to them. It’s not clear if anything more is really useful. Advanced authentication or authorization concepts should probably be added to the WebSocket API instead.

Namespace:

Cluster::Backend::ZeroMQ

Imports:

base/utils/addrs.zeek

Summary

Redefinable Options

Cluster::Backend::ZeroMQ::connect_log_endpoints: vector &redef

Vector of ZeroMQ endpoints to connect to for logging.

Cluster::Backend::ZeroMQ::connect_xpub_endpoint: string &redef

The central broker’s XPUB endpoint to connect to.

Cluster::Backend::ZeroMQ::connect_xpub_nodrop: bool &redef

Do not silently drop messages if high-water-mark is reached.

Cluster::Backend::ZeroMQ::connect_xsub_endpoint: string &redef

The central broker’s XSUB endpoint to connect to.

Cluster::Backend::ZeroMQ::curve_client_publickey: string &redef

Client public key to use by Zeek processes connecting to the central XPUB/XSUB sockets and PULL sockets.

Cluster::Backend::ZeroMQ::curve_client_secretkey: string &redef

Client secret key to use by Zeek processes connecting to the central XPUB/XSUB sockets and PULL sockets.

Cluster::Backend::ZeroMQ::curve_server_publickey: string &redef

Server public key to use for the central XPUB/XSUB sockets and the logger’s PULL sockets.

Cluster::Backend::ZeroMQ::curve_server_secretkey: string &redef

Server secret key to use for the central XPUB/XSUB sockets and logger PULL sockets.

Cluster::Backend::ZeroMQ::debug_flags: count &redef

Bitmask to enable low-level stderr based debug printing.

Cluster::Backend::ZeroMQ::hello_expiration: interval &redef

Expiration for hello state.

Cluster::Backend::ZeroMQ::internal_topic_prefix: string &redef

The topic prefix used for internal ZeroMQ specific communication.

Cluster::Backend::ZeroMQ::ipv6: bool &redef

Sets the ZMQ_IPV6 option on ZeroMQ contexts created by Zeek.

Cluster::Backend::ZeroMQ::linger_ms: int &redef

Configure the linger value of ZeroMQ sockets.

Cluster::Backend::ZeroMQ::listen_log_endpoint: string &redef

PULL socket address to listen on for log messages.

Cluster::Backend::ZeroMQ::listen_xpub_endpoint: string &redef

XPUB listen endpoint for the central broker.

Cluster::Backend::ZeroMQ::listen_xpub_nodrop: bool &redef

Do not silently drop messages if high-water-mark is reached.

Cluster::Backend::ZeroMQ::listen_xsub_endpoint: string &redef

XSUB listen endpoint for the central broker.

Cluster::Backend::ZeroMQ::log_immediate: bool &redef

Configure ZeroMQ’s immediate setting on PUSH sockets.

Cluster::Backend::ZeroMQ::log_rcvbuf: int &redef

Kernel receive buffer size for log sockets.

Cluster::Backend::ZeroMQ::log_rcvhwm: int &redef

Receive high water mark value for the log PULL sockets.

Cluster::Backend::ZeroMQ::log_sndbuf: int &redef

Kernel transmit buffer size for log sockets.

Cluster::Backend::ZeroMQ::log_sndhwm: int &redef

Send high water mark value for the log PUSH sockets.

Cluster::Backend::ZeroMQ::onloop_queue_hwm: count &redef

Maximum number of incoming events queued for Zeek’s event loop.

Cluster::Backend::ZeroMQ::poll_max_messages: count &redef

Messages to receive before yielding.

Cluster::Backend::ZeroMQ::proxy_io_threads: count &redef

How many IO threads to configure for the ZeroMQ context that acts as a central broker.

Cluster::Backend::ZeroMQ::run_proxy_thread: bool &redef

Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.

Cluster::Backend::ZeroMQ::xpub_sndbuf: int &redef

Kernel transmit buffer size for the XPUB socket.

Cluster::Backend::ZeroMQ::xpub_sndhwm: int &redef

Send high water mark value for the XPUB socket.

Cluster::Backend::ZeroMQ::xsub_rcvbuf: int &redef

Kernel receive buffer size for the XSUB socket.

Cluster::Backend::ZeroMQ::xsub_rcvhwm: int &redef

Receive high water mark value for the XSUB socket.

State Variables

Cluster::Backend::ZeroMQ::node_topic_prefix: string &redef

The node topic prefix to use.

Cluster::Backend::ZeroMQ::nodeid_topic_prefix: string &redef

The node_id topic prefix to use.

Redefinitions

Cluster::Backend::ZeroMQ::run_proxy_thread: bool &redef

Cluster::Telemetry::topic_normalizations: table &ordered &redef

Cluster::backend: Cluster::BackendTag &redef

Cluster::logger_pool_spec: Cluster::PoolSpec &redef

Cluster::logger_topic: string &redef

Cluster::manager_topic: string &redef

Cluster::node_id: function &redef

Cluster::node_topic: function &redef

Cluster::nodeid_topic: function &redef

Cluster::proxy_pool_spec: Cluster::PoolSpec &redef

Cluster::proxy_topic: string &redef

Cluster::worker_pool_spec: Cluster::PoolSpec &redef

Cluster::worker_topic: string &redef

Events

Cluster::Backend::ZeroMQ::hello: event

Low-level event sent to a node in response to their subscription.

Cluster::Backend::ZeroMQ::monitoring_event: event

Low-level event raised for ZeroMQ socket events.

Cluster::Backend::ZeroMQ::subscription: event

Low-level event when a subscription is added.

Cluster::Backend::ZeroMQ::unsubscription: event

Low-level event when a subscription vanishes.

Detailed Interface

Redefinable Options

Cluster::Backend::ZeroMQ::connect_log_endpoints
Type:

vector of string

Attributes:

&redef

Default:

[]

Vector of ZeroMQ endpoints to connect to for logging.

A node’s PUSH socket used for logging connects to each of the ZeroMQ endpoints listed in this vector.

Cluster::Backend::ZeroMQ::connect_xpub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5556"

The central broker’s XPUB endpoint to connect to.

A node connects with its XSUB socket to the XPUB socket of the central broker.

Cluster::Backend::ZeroMQ::connect_xpub_nodrop
Type:

bool

Attributes:

&redef

Default:

T

Do not silently drop messages if high-water-mark is reached.

Whether to configure ZMQ_XPUB_NODROP on the XPUB socket connecting to the proxy to detect when sending a message fails due to reaching the high-water-mark. If you set this to F, then the XPUB drops metric will stop working as sending on the XPUB socket will always succeed. Unless you’re developing on the ZeroMQ cluster backend, keep this set to T.

See ZeroMQ’s ZMQ_XPUB_NODROP documentation for more details.

Cluster::Backend::ZeroMQ::connect_xsub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5555"

The central broker’s XSUB endpoint to connect to.

A node connects with its XPUB socket to the XSUB socket of the central broker.

Cluster::Backend::ZeroMQ::curve_client_publickey
Type:

string

Attributes:

&redef

Default:

""

Client public key to use by Zeek processes connecting to the central XPUB/XSUB sockets and PULL sockets.

Cluster::Backend::ZeroMQ::curve_client_secretkey
Type:

string

Attributes:

&redef

Default:

""

Client secret key to use by Zeek processes connecting to the central XPUB/XSUB sockets and PULL sockets.

Cluster::Backend::ZeroMQ::curve_server_publickey
Type:

string

Attributes:

&redef

Default:

""

Server public key to use for the central XPUB/XSUB sockets and the logger’s PULL sockets.

This key is used by Zeek processes for the connection to the central XPUB/XSUB sockets and individual logger PULL sockets as the public CURVE server key.

Cluster::Backend::ZeroMQ::curve_server_secretkey
Type:

string

Attributes:

&redef

Default:

""

Server secret key to use for the central XPUB/XSUB sockets and logger PULL sockets.

Cluster::Backend::ZeroMQ::debug_flags
Type:

count

Attributes:

&redef

Default:

0

Bitmask to enable low-level stderr based debug printing.

  • poll: 1 (produce verbose zmq::poll() output)

  • thread: 2 (produce thread related output)

OR the values from the above list together and set debug_flags to the result. E.g., use 3 to select 2 and 1. Only use this in development if something seems off. The thread used internally will produce output on stderr.
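For instance, enabling both documented flags (1 for poll and 2 for thread) could be sketched as follows. This is intended for development use only.

```zeek
# 1 (poll) | 2 (thread) = 3: enable both kinds of debug output on stderr.
redef Cluster::Backend::ZeroMQ::debug_flags = 3;
```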

Cluster::Backend::ZeroMQ::hello_expiration
Type:

interval

Attributes:

&redef

Default:

10.0 secs

Expiration for hello state.

How long to wait before expiring information about subscriptions and hello messages from other nodes. These expirations trigger reporter warnings.

Cluster::Backend::ZeroMQ::internal_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek.zeromq.internal."

The topic prefix used for internal ZeroMQ specific communication.

This is used for the “ready to publish callback” topics.

Zeek creates a short-lived subscription for an auto-generated topic name with this prefix and waits for it to be confirmed on its XPUB socket. Once this happens, the XPUB socket should’ve also received all other active subscriptions of other nodes in a cluster from the central XPUB/XSUB proxy and therefore can be deemed ready for publish operations.

Cluster::Backend::ZeroMQ::ipv6
Type:

bool

Attributes:

&redef

Default:

F

Sets the ZMQ_IPV6 option on ZeroMQ contexts created by Zeek.

By default, IPv6 support on the ZeroMQ context is disabled. If you’re using IPv6 addresses for any of the endpoint options, or you’re using Zeek in a dual-stack environment and want to listen on IPv6 and IPv4 addresses at the same time, set this option to T. If you’re deploying Zeek using ZeekControl, this will happen automatically.

See ZeroMQ’s ZMQ_IPV6 documentation for more details.
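For a manually configured setup (without ZeekControl), enabling IPv6 might look like the sketch below. The loopback address and the ports are illustrative assumptions. The bracketed form is how ZeroMQ TCP endpoints express IPv6 literals.

```zeek
# Enable ZMQ_IPV6 on the ZeroMQ contexts created by Zeek.
redef Cluster::Backend::ZeroMQ::ipv6 = T;

# Illustrative IPv6 listen endpoints for the central broker.
redef Cluster::Backend::ZeroMQ::listen_xpub_endpoint = "tcp://[::1]:5555";
redef Cluster::Backend::ZeroMQ::listen_xsub_endpoint = "tcp://[::1]:5556";
```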

Cluster::Backend::ZeroMQ::linger_ms
Type:

int

Attributes:

&redef

Default:

500

Configure the linger value of ZeroMQ sockets.

The default used by libzmq is 30 seconds (30000 milliseconds), which is very long when loggers vanish before workers during a shutdown, so we reduce this to 500 milliseconds by default.

A value of -1 configures blocking forever, while 0 would immediately discard any pending messages.

See ZeroMQ’s ZMQ_LINGER documentation for more details.

Cluster::Backend::ZeroMQ::listen_log_endpoint
Type:

string

Attributes:

&redef

Default:

""

PULL socket address to listen on for log messages.

If empty, don’t listen for log messages, otherwise a ZeroMQ address to bind to. E.g., tcp://127.0.0.1:5555.
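A hand-written single-logger setup could pair this option with Cluster::Backend::ZeroMQ::connect_log_endpoints as sketched below. The port 5560 is a hypothetical choice, and the sketch assumes that only non-logger nodes need to connect their PUSH sockets.

```zeek
@if ( Cluster::local_node_type() == Cluster::LOGGER )
# The logger binds its PULL socket to receive log writes.
redef Cluster::Backend::ZeroMQ::listen_log_endpoint = "tcp://127.0.0.1:5560";
@else
# All other nodes connect their PUSH socket to every listed logger.
redef Cluster::Backend::ZeroMQ::connect_log_endpoints = vector("tcp://127.0.0.1:5560");
@endif
```

With several loggers, each logger would listen on its own endpoint and the vector would list all of them, letting the PUSH sockets load-balance log writes across them.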

Cluster::Backend::ZeroMQ::listen_xpub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5555"

XPUB listen endpoint for the central broker.

This setting is used for the XPUB socket of the central broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

Cluster::Backend::ZeroMQ::listen_xpub_nodrop
Type:

bool

Attributes:

&redef

Default:

T

Do not silently drop messages if high-water-mark is reached.

Whether to configure ZMQ_XPUB_NODROP on the XPUB socket to detect when sending a message fails due to reaching the high-water-mark.

This setting applies to the XPUB/XSUB broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

See ZeroMQ’s ZMQ_XPUB_NODROP documentation for more details.

Cluster::Backend::ZeroMQ::listen_xsub_endpoint
Type:

string

Attributes:

&redef

Default:

"tcp://127.0.0.1:5556"

XSUB listen endpoint for the central broker.

This setting is used for the XSUB socket of the central broker started when Cluster::Backend::ZeroMQ::run_proxy_thread is T.

Cluster::Backend::ZeroMQ::log_immediate
Type:

bool

Attributes:

&redef

Default:

F

Configure ZeroMQ’s immediate setting on PUSH sockets.

Setting this to T will queue log writes only to completed connections. By default, log writes are queued to all potential endpoints listed in Cluster::Backend::ZeroMQ::connect_log_endpoints.

See ZeroMQ’s ZMQ_IMMEDIATE documentation for more details.

Cluster::Backend::ZeroMQ::log_rcvbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel receive buffer size for log sockets.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_RCVBUF documentation for more details.

Cluster::Backend::ZeroMQ::log_rcvhwm
Type:

int

Attributes:

&redef

Default:

1000

Receive high water mark value for the log PULL sockets.

If reached, Zeek workers will block or drop messages.

See ZeroMQ’s ZMQ_RCVHWM documentation for more details.

TODO: Make action configurable (block vs drop)

Cluster::Backend::ZeroMQ::log_sndbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel transmit buffer size for log sockets.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_SNDBUF documentation.

Cluster::Backend::ZeroMQ::log_sndhwm
Type:

int

Attributes:

&redef

Default:

1000

Send high water mark value for the log PUSH sockets.

If reached, Zeek nodes will block or drop messages.

See ZeroMQ’s ZMQ_SNDHWM documentation for more details.

TODO: Make action configurable (block vs drop)

Cluster::Backend::ZeroMQ::onloop_queue_hwm
Type:

count

Attributes:

&redef

Default:

10000

Maximum number of incoming events queued for Zeek’s event loop.

This constant defines the maximum number of remote events queued by the ZeroMQ cluster backend for Zeek’s event loop to drain in one go. If you set this value to 0 (unlimited), closely monitor CPU and memory usage of cluster nodes, as high remote event rates may starve packet processing.

If more events are received than can fit the queue, new events will be dropped and the zeek_cluster_zeromq_onloop_drops_total metric incremented.

Cluster::Backend::ZeroMQ::poll_max_messages
Type:

count

Attributes:

&redef

Default:

100

Messages to receive before yielding.

Yield from the receive loop when this many messages have been received from one of the used sockets.

Cluster::Backend::ZeroMQ::proxy_io_threads
Type:

count

Attributes:

&redef

Default:

2

How many IO threads to configure for the ZeroMQ context that acts as a central broker. See ZeroMQ’s ZMQ_IO_THREADS documentation and the I/O threads section in the ZeroMQ guide for details.

Cluster::Backend::ZeroMQ::run_proxy_thread
Type:

bool

Attributes:

&redef

Default:

F

Redefinition:

from policy/frameworks/cluster/backend/zeromq/main.zeek

=:

Cluster::local_node_type() == Cluster::MANAGER
Redefinition:

from policy/frameworks/cluster/websocket/server.zeek

=:

T

Toggle for running a central ZeroMQ XPUB-XSUB broker on this node.

If set to T, Cluster::Backend::ZeroMQ::spawn_zmq_proxy_thread is called during zeek_init. The node will listen on Cluster::Backend::ZeroMQ::listen_xsub_endpoint and Cluster::Backend::ZeroMQ::listen_xpub_endpoint and forward subscriptions and messages between nodes.

By default, this is set to T on the manager and F elsewhere.

Cluster::Backend::ZeroMQ::xpub_sndbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel transmit buffer size for the XPUB socket.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_SNDBUF documentation for more details.

Cluster::Backend::ZeroMQ::xpub_sndhwm
Type:

int

Attributes:

&redef

Default:

1000

Send high water mark value for the XPUB socket.

Events published when the XPUB queue is full will be dropped and the zeek_cluster_zeromq_xpub_drops_total metric incremented.

See ZeroMQ’s ZMQ_SNDHWM documentation for more details.

Cluster::Backend::ZeroMQ::xsub_rcvbuf
Type:

int

Attributes:

&redef

Default:

-1

Kernel receive buffer size for the XSUB socket.

Using -1 will use the kernel’s default.

See ZeroMQ’s ZMQ_RCVBUF documentation for more details.

Cluster::Backend::ZeroMQ::xsub_rcvhwm
Type:

int

Attributes:

&redef

Default:

1000

Receive high water mark value for the XSUB socket.

If reached, the Zeek node will start reporting back pressure to the central XPUB socket.

See ZeroMQ’s ZMQ_RCVHWM documentation for more details.

State Variables

Cluster::Backend::ZeroMQ::node_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek.cluster.node"

The node topic prefix to use.

Cluster::Backend::ZeroMQ::nodeid_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek.cluster.nodeid"

The node_id topic prefix to use.

Events

Cluster::Backend::ZeroMQ::hello
Type:

event (name: string, id: string)

Low-level event sent to a node in response to their subscription.

Parameters:

Cluster::Backend::ZeroMQ::monitoring_event
Type:

event (number: count, value: count, address: string)

Low-level event raised for ZeroMQ socket events.

See zmq_socket_monitor() and the ZMQ_EVENT_* constants in zmq.h for possible values of number and value.

Parameters:
  • number – The event number.

  • value – The event value.

  • address – The socket address of the event.
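A handler for this event could be sketched as follows, for example to observe connects and disconnects while debugging. The @load path is an assumption about how this script is reachable via ZEEKPATH, and the conditions under which the backend raises the event are not covered here.

```zeek
@load frameworks/cluster/backend/zeromq/main

event Cluster::Backend::ZeroMQ::monitoring_event(number: count, value: count, address: string)
	{
	# number and value correspond to the ZMQ_EVENT_* constants in zmq.h.
	print fmt("zmq socket event number=%d value=%d address=%s", number, value, address);
	}
```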

Cluster::Backend::ZeroMQ::subscription
Type:

event (topic: string)

Low-level event when a subscription is added.

Every node observes all subscriptions from other nodes in a cluster through its XPUB socket. Whenever a new subscription topic is added, this event is raised with the topic.

Parameters:

topic – The topic.

Cluster::Backend::ZeroMQ::unsubscription
Type:

event (topic: string)

Low-level event when a subscription vanishes.

Every node observes all subscriptions from other nodes in a cluster through its XPUB socket. Whenever a subscription is removed from the local XPUB socket, this event is raised with the topic set to the removed subscription.

Parameters:

topic – The topic.
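The two subscription events can be handled together to trace which topics come and go across the cluster. The following is a minimal sketch. The @load path is an assumption about how this script is reachable via ZEEKPATH.

```zeek
@load frameworks/cluster/backend/zeromq/main

event Cluster::Backend::ZeroMQ::subscription(topic: string)
	{
	# Raised when a new subscription topic shows up on the local XPUB socket.
	print fmt("subscription added: %s", topic);
	}

event Cluster::Backend::ZeroMQ::unsubscription(topic: string)
	{
	# Raised when a subscription is removed from the local XPUB socket.
	print fmt("subscription removed: %s", topic);
	}
```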