base/frameworks/broker/main.zeek

Broker

The Broker-based communication API and its various options.

Namespace:

Broker

Imports:

base/bif/comm.bif.zeek, base/bif/messaging.bif.zeek

Summary

Runtime Options

Broker::peer_counts_as_iosource: bool &redef

Whether calling Broker::peer will register the Broker system as an I/O source that will block the process from shutting down.

Redefinable Options

Broker::aggressive_interval: count &redef

Frequency of work-stealing polling attempts for Broker/CAF threads in “aggressive” mode.

Broker::aggressive_polls: count &redef

Number of work-stealing polling attempts for Broker/CAF threads in “aggressive” mode.

Broker::congestion_queue_size: count &redef

The number of buffered messages at the Broker/CAF layer after which a subscriber considers itself congested.

Broker::default_connect_retry: interval &redef

Default interval at which to retry connecting to a peer if the connection cannot be established initially or is later lost.

Broker::default_listen_address: string &redef

Default address on which to listen.

Broker::default_listen_address_websocket: string &redef

Default address on which to listen for WebSocket connections.

Broker::default_listen_retry: interval &redef

Default interval at which to retry listening on a port if it is already in use.

Broker::default_log_topic_prefix: string &redef

The default topic prefix where logs will be published.

Broker::default_port: port &redef

Default port for native Broker communication.

Broker::default_port_websocket: port &redef

Default port for Broker WebSocket communication.

Broker::disable_ssl: bool &redef

If true, do not use SSL for network connections.

Broker::forward_messages: bool &redef

Forward all received messages to subscribing peers.

Broker::log_batch_interval: interval &redef

Max time to buffer log messages before sending the current set out as a batch.

Broker::log_batch_size: count &redef

The max number of log entries per log stream to batch together when sending log messages to a remote logger.

Broker::max_threads: count &redef

Max number of threads to use for Broker/CAF functionality.

Broker::moderate_interval: count &redef

Frequency of work-stealing polling attempts for Broker/CAF threads in “moderate” mode.

Broker::moderate_polls: count &redef

Number of work-stealing polling attempts for Broker/CAF threads in “moderate” mode.

Broker::moderate_sleep: interval &redef

Interval of time for under-utilized Broker/CAF threads to sleep when in “moderate” mode.

Broker::peer_buffer_size: count &redef

Maximum number of items buffered per peer.

Broker::peer_overflow_policy: string &redef

Configures how Broker responds to peers that cannot keep up with the incoming message rate.

Broker::relaxed_interval: count &redef

Frequency of work-stealing polling attempts for Broker/CAF threads in “relaxed” mode.

Broker::relaxed_sleep: interval &redef

Interval of time for under-utilized Broker/CAF threads to sleep when in “relaxed” mode.

Broker::scheduler_policy: string &redef

The CAF scheduling policy to use.

Broker::ssl_cafile: string &redef

Path to a file containing concatenated trusted certificates in PEM format.

Broker::ssl_capath: string &redef

Path to an OpenSSL-style directory of trusted certificates.

Broker::ssl_certificate: string &redef

Path to a file containing an X.509 certificate for this node in PEM format.

Broker::ssl_keyfile: string &redef

Path to the file containing the private key for this node’s certificate.

Broker::ssl_passphrase: string &redef

Passphrase to decrypt the private key specified by Broker::ssl_keyfile.

Broker::web_socket_buffer_size: count &redef

Same as peer_buffer_size but for WebSocket clients.

Broker::web_socket_overflow_policy: string &redef

Same as peer_overflow_policy but for WebSocket clients.

Types

Broker::Data: record

Opaque communication data.

Broker::DataVector: vector

Opaque communication data sequence.

Broker::EndpointInfo: record

Broker::ErrorCode: enum

Enumerates the possible error types.

Broker::Event: record

Opaque event communication data.

Broker::NetworkInfo: record

Broker::PeerInfo: record

Broker::PeerInfos: vector

Broker::PeerStatus: enum

The possible states of a peer endpoint.

Broker::TableItem: record

Opaque communication data used as a convenient way to wrap key-value pairs that comprise table entries.

Functions

Broker::auto_publish: function &deprecated

Automatically send an event to any interested peers whenever it is locally dispatched.

Broker::auto_unpublish: function &deprecated

Stop automatically sending an event to peers upon local dispatch.

Broker::default_log_topic: function

The default implementation for Broker::log_topic.

Broker::flush_logs: function

Sends all pending log messages to remote peers.

Broker::forward: function

Register a topic prefix subscription for events that should only be forwarded to any subscribing peers and not raise any event handlers on the receiving/forwarding node.

Broker::listen: function

Listen for remote connections using the native Broker protocol.

Broker::listen_websocket: function

Listen for remote connections using WebSocket.

Broker::log_topic: function &redef

A function that will be called for each log entry to determine what broker topic string will be used for sending it to peers.

Broker::node_id: function

Get a unique identifier for the local broker endpoint.

Broker::peer: function

Initiate a remote connection.

Broker::peers: function

Get a list of all peer connections.

Broker::publish_id: function

Publishes the value of an identifier to a given topic.

Broker::subscribe: function

Register interest in all peer event messages that use a certain topic prefix.

Broker::unpeer: function

Remove a remote connection.

Broker::unsubscribe: function

Unregister interest in all peer event messages that use a topic prefix.

Detailed Interface

Runtime Options

Broker::peer_counts_as_iosource
Type:

bool

Attributes:

&redef

Default:

T

Whether calling Broker::peer will register the Broker system as an I/O source that will block the process from shutting down. For example, set this to false when you are reading a pcap but also want to initiate a Broker peering and still have Zeek shut down once the pcap has been read.
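
As a minimal sketch of the pcap use case described above, the option can be redefined before peering. The peer address and port are hypothetical values chosen for illustration.

```zeek
# Do not let the Broker peering keep Zeek alive after the pcap is consumed.
redef Broker::peer_counts_as_iosource = F;

event zeek_init()
    {
    # Hypothetical peer address and port, for illustration only.
    Broker::peer("127.0.0.1", 9999/tcp);
    }
```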

Redefinable Options

Broker::aggressive_interval
Type:

count

Attributes:

&redef

Default:

4

Frequency of work-stealing polling attempts for Broker/CAF threads in “aggressive” mode. Only used for the “stealing” scheduler policy.

Broker::aggressive_polls
Type:

count

Attributes:

&redef

Default:

5

Number of work-stealing polling attempts for Broker/CAF threads in “aggressive” mode. Only used for the “stealing” scheduler policy.

Broker::congestion_queue_size
Type:

count

Attributes:

&redef

Default:

200

The number of buffered messages at the Broker/CAF layer after which a subscriber considers itself congested. Adjust this value to tune the congestion control mechanisms.

Broker::default_connect_retry
Type:

interval

Attributes:

&redef

Default:

30.0 secs

Default interval at which to retry connecting to a peer if the connection cannot be established initially or is later lost. Setting the ZEEK_DEFAULT_CONNECT_RETRY environment variable (as a number of seconds) overrides this option and any value given to Broker::peer.

Broker::default_listen_address
Type:

string

Attributes:

&redef

Default:

""

Redefinition:

from policy/frameworks/management/agent/boot.zeek

=:

127.0.0.1

Default address on which to listen.

See also: Broker::listen

Broker::default_listen_address_websocket
Type:

string

Attributes:

&redef

Default:

""

Default address on which to listen for WebSocket connections.

See also: Broker::listen_websocket

Broker::default_listen_retry
Type:

interval

Attributes:

&redef

Default:

30.0 secs

Default interval at which to retry listening on a port if it is already in use. Setting the ZEEK_DEFAULT_LISTEN_RETRY environment variable (as a number of seconds) overrides this option and any value given to Broker::listen.

Broker::default_log_topic_prefix
Type:

string

Attributes:

&redef

Default:

"zeek/logs/"

The default topic prefix where logs will be published. The log’s stream id is appended when writing to a particular stream.

Broker::default_port
Type:

port

Attributes:

&redef

Default:

9999/tcp

Default port for native Broker communication. Where not specified otherwise, this is the port to connect to and listen on.

Broker::default_port_websocket
Type:

port

Attributes:

&redef

Default:

9997/tcp

Default port for Broker WebSocket communication. Where not specified otherwise, this is the port to connect to and listen on for WebSocket connections.

See the Broker documentation for a specification of the message format over WebSocket connections.

Broker::disable_ssl
Type:

bool

Attributes:

&redef

Default:

F

If true, do not use SSL for network connections. By default, SSL is used even if no certificates or CAs have been configured. In that default case, communication is encrypted but not authenticated.

Broker::forward_messages
Type:

bool

Attributes:

&redef

Default:

F

Forward all received messages to subscribing peers.

Broker::log_batch_interval
Type:

interval

Attributes:

&redef

Default:

1.0 sec

Max time to buffer log messages before sending the current set out as a batch.

Broker::log_batch_size
Type:

count

Attributes:

&redef

Default:

400

The max number of log entries per log stream to batch together when sending log messages to a remote logger.
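
The two batching options can be tuned together. The following fragment is only a sketch; the values are arbitrary illustrations, not recommendations.

```zeek
# Send smaller batches more often (illustrative values only).
redef Broker::log_batch_size = 100;
redef Broker::log_batch_interval = 0.5sec;
```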

Broker::max_threads
Type:

count

Attributes:

&redef

Default:

1

Max number of threads to use for Broker/CAF functionality. The ZEEK_BROKER_MAX_THREADS environment variable overrides this setting.

Broker::moderate_interval
Type:

count

Attributes:

&redef

Default:

2

Frequency of work-stealing polling attempts for Broker/CAF threads in “moderate” mode. Only used for the “stealing” scheduler policy.

Broker::moderate_polls
Type:

count

Attributes:

&redef

Default:

5

Number of work-stealing polling attempts for Broker/CAF threads in “moderate” mode. Only used for the “stealing” scheduler policy.

Broker::moderate_sleep
Type:

interval

Attributes:

&redef

Default:

16.0 msecs

Interval of time for under-utilized Broker/CAF threads to sleep when in “moderate” mode. Only used for the “stealing” scheduler policy.

Broker::peer_buffer_size
Type:

count

Attributes:

&redef

Default:

2048

Maximum number of items buffered per peer. The action taken when the buffer reaches its maximum size is determined by peer_overflow_policy.

Broker::peer_overflow_policy
Type:

string

Attributes:

&redef

Default:

"disconnect"

Configures how Broker responds to peers that cannot keep up with the incoming message rate. Available strategies:
  • disconnect – drop the connection to the unresponsive peer.

  • drop_newest – replace the newest message in the buffer.

  • drop_oldest – remove the oldest message from the buffer, then append the new one.
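
A short configuration sketch, assuming a deployment that prefers losing old messages over dropping slow peers. The buffer size is an arbitrary illustrative value.

```zeek
# Keep slow peers connected and discard their oldest buffered messages instead.
redef Broker::peer_buffer_size = 4096;
redef Broker::peer_overflow_policy = "drop_oldest";
```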

Broker::relaxed_interval
Type:

count

Attributes:

&redef

Default:

1

Frequency of work-stealing polling attempts for Broker/CAF threads in “relaxed” mode. Only used for the “stealing” scheduler policy.

Broker::relaxed_sleep
Type:

interval

Attributes:

&redef

Default:

64.0 msecs

Interval of time for under-utilized Broker/CAF threads to sleep when in “relaxed” mode. Only used for the “stealing” scheduler policy.

Broker::scheduler_policy
Type:

string

Attributes:

&redef

Default:

"sharing"

The CAF scheduling policy to use. Available options are “sharing” and “stealing”. The “sharing” policy uses a single, global work queue guarded by a mutex and condition variable, which may be better for cases that don’t require much concurrency or that need lower power consumption. The “stealing” policy uses multiple work queues protected by spinlocks, which may be better for use cases with higher concurrency needs. For example, it may be worth testing the “stealing” policy together with dedicating more threads if a lot of data store processing is required.
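
To try the combination suggested above, a sketch along these lines may be a starting point. The thread count is an assumption for illustration and should be chosen by measurement.

```zeek
# Switch to work stealing and allow more Broker/CAF threads (illustrative count).
redef Broker::scheduler_policy = "stealing";
redef Broker::max_threads = 4;
```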

Broker::ssl_cafile
Type:

string

Attributes:

&redef

Default:

""

Path to a file containing concatenated trusted certificates in PEM format. If set, Zeek will require valid certificates for all peers.

Broker::ssl_capath
Type:

string

Attributes:

&redef

Default:

""

Path to an OpenSSL-style directory of trusted certificates. If set, Zeek will require valid certificates for all peers.

Broker::ssl_certificate
Type:

string

Attributes:

&redef

Default:

""

Path to a file containing an X.509 certificate for this node in PEM format. If set, Zeek will require valid certificates for all peers.

Broker::ssl_keyfile
Type:

string

Attributes:

&redef

Default:

""

Path to the file containing the private key for this node’s certificate. If set, Zeek will require valid certificates for all peers.

Broker::ssl_passphrase
Type:

string

Attributes:

&redef

Default:

""

Passphrase to decrypt the private key specified by Broker::ssl_keyfile. If set, Zeek will require valid certificates for all peers.
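
The SSL options above are typically set together. The fragment below is a sketch only; every path and the passphrase are hypothetical placeholders.

```zeek
# Hypothetical file locations; replace with the paths used in your deployment.
redef Broker::ssl_cafile = "/path/to/ca.pem";
redef Broker::ssl_certificate = "/path/to/node-cert.pem";
redef Broker::ssl_keyfile = "/path/to/node-key.pem";

# Only relevant if the private key is encrypted (placeholder value).
redef Broker::ssl_passphrase = "example-passphrase";
```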

Broker::web_socket_buffer_size
Type:

count

Attributes:

&redef

Default:

512

Same as peer_buffer_size but for WebSocket clients.

Broker::web_socket_overflow_policy
Type:

string

Attributes:

&redef

Default:

"disconnect"

Same as peer_overflow_policy but for WebSocket clients.

Types

Broker::Data
Type:

record

data: opaque of Broker::Data &optional

Opaque communication data.

Broker::DataVector
Type:

vector of Broker::Data

Opaque communication data sequence.

Broker::EndpointInfo
Type:

record

id: string

A unique identifier of the node.

network: Broker::NetworkInfo &optional

Network-level information.

Broker::ErrorCode
Type:

enum

Broker::NO_ERROR

(present if base/bif/comm.bif.zeek is loaded)

Broker::UNSPECIFIED

The unspecified default error code.

Broker::PEER_INCOMPATIBLE

Version incompatibility.

Broker::PEER_INVALID

Referenced peer does not exist.

Broker::PEER_UNAVAILABLE

Remote peer not listening.

Broker::PEER_DISCONNECT_DURING_HANDSHAKE

(present if base/bif/comm.bif.zeek is loaded)

Broker::PEER_TIMEOUT

A peering request timed out.

Broker::MASTER_EXISTS

Master with given name already exists.

Broker::NO_SUCH_MASTER

Master with given name does not exist.

Broker::NO_SUCH_KEY

The given data store key does not exist.

Broker::REQUEST_TIMEOUT

The store operation timed out.

Broker::TYPE_CLASH

The operation expected a different type than provided.

Broker::INVALID_DATA

The data value cannot be used to carry out the desired operation.

Broker::BACKEND_FAILURE

The storage backend failed to execute the operation.

Broker::STALE_DATA

The clone store has not yet synchronized with its master, or it has been disconnected for too long.

Broker::CANNOT_OPEN_FILE

(present if base/bif/comm.bif.zeek is loaded)

Broker::CANNOT_WRITE_FILE

(present if base/bif/comm.bif.zeek is loaded)

Broker::INVALID_TOPIC_KEY

(present if base/bif/comm.bif.zeek is loaded)

Broker::END_OF_FILE

(present if base/bif/comm.bif.zeek is loaded)

Broker::INVALID_TAG

(present if base/bif/comm.bif.zeek is loaded)

Broker::INVALID_STATUS

(present if base/bif/comm.bif.zeek is loaded)

Broker::CAF_ERROR

Catch-all for a CAF-level problem.

Enumerates the possible error types.

Broker::Event
Type:

record

name: string &optional

The name of the event. Not set if invalid event or arguments.

args: Broker::DataVector

The arguments to the event.

Opaque event communication data.

Broker::NetworkInfo
Type:

record

address: string &log

The IP address or hostname where the endpoint listens.

bound_port: port &log

The port where the endpoint is bound to.

Broker::PeerInfo
Type:

record

peer: Broker::EndpointInfo

status: Broker::PeerStatus

Broker::PeerInfos
Type:

vector of Broker::PeerInfo

Broker::PeerStatus
Type:

enum

Broker::INITIALIZING

The peering process is initiated.

Broker::CONNECTING

Connection establishment in process.

Broker::CONNECTED

Connection established, peering pending.

Broker::PEERED

Successfully peered.

Broker::DISCONNECTED

Connection to remote peer lost.

Broker::RECONNECTING

Reconnecting to peer after a lost connection.

The possible states of a peer endpoint.

Broker::TableItem
Type:

record

key: Broker::Data

val: Broker::Data

Opaque communication data used as a convenient way to wrap key-value pairs that comprise table entries.

Functions

Broker::auto_publish
Type:

function (topic: string, ev: any) : bool

Attributes:

&deprecated = “Remove in v8.1. Switch to explicit Broker::publish() calls. Auto-publish won’t work with all cluster backends.”

Automatically send an event to any interested peers whenever it is locally dispatched. (For example, using “event my_event(…);” in a script.)

Parameters:
  • topic – a topic string associated with the event message. Peers advertise interest by registering a subscription to some prefix of this topic name.

  • ev – a Zeek event value.

Returns:

true if automatic event sending is now enabled.
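
The deprecation notice recommends explicit Broker::publish() calls instead. A minimal sketch of that replacement follows; the event name and topic are hypothetical, and Broker::publish is provided by base/bif/messaging.bif.zeek rather than documented on this page.

```zeek
# Hypothetical event used only for this illustration.
global my_event: event(msg: string);

event zeek_init()
    {
    # Explicitly publish one instance of the event to the topic,
    # instead of relying on auto-publishing on local dispatch.
    Broker::publish("example/topic", my_event, "hello");
    }
```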

Broker::auto_unpublish
Type:

function (topic: string, ev: any) : bool

Attributes:

&deprecated = “Remove in v8.1. See Broker::auto_publish()”

Stop automatically sending an event to peers upon local dispatch.

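Parameters:
  • topic – a topic originally given to Broker::auto_publish.

  • ev – an event originally given to Broker::auto_publish.

Returns: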

true if automatic events will not occur for the topic/event pair.

Broker::default_log_topic
Type:

function (id: Log::ID, path: string) : string

The default implementation for Broker::log_topic.

Broker::flush_logs
Type:

function () : count

Sends all pending log messages to remote peers. This normally doesn’t need to be used except for test cases that are time-sensitive.

Broker::forward
Type:

function (topic_prefix: string) : bool

Register a topic prefix subscription for events that should only be forwarded to any subscribing peers and not raise any event handlers on the receiving/forwarding node. That is, it behaves like Broker::subscribe except that matching events are not raised on the receiver, only forwarded. Use Broker::unsubscribe with the same argument to undo this operation.

Parameters:

topic_prefix – a prefix to match against remote message topics. e.g. an empty prefix matches everything and “a” matches “alice” and “amy” but not “bob”.

Returns:

true if a new event forwarding/subscription is now registered.
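
A small sketch of registering a forwarding-only prefix. The topic prefix is a hypothetical example.

```zeek
event zeek_init()
    {
    # Relay matching events to subscribing peers without raising
    # local handlers for them (hypothetical prefix).
    if ( ! Broker::forward("example/relay/") )
        print "no new forwarding subscription was registered";
    }
```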

Broker::listen
Type:

function (a: string &default = Broker::default_listen_address &optional, p: port &default = Broker::default_port &optional, retry: interval &default = Broker::default_listen_retry &optional) : port

Listen for remote connections using the native Broker protocol.

Parameters:
  • a – an address string on which to accept connections, e.g. “127.0.0.1”. An empty string refers to INADDR_ANY.

  • p – the TCP port to listen on. The value 0 means that the OS should choose the next available free port.

  • retry – if non-zero, retries listening at regular intervals if the port cannot be acquired immediately. 0 disables retries. If the ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as a number of seconds), it overrides any value given here.

Returns:

the bound port or 0/? on failure.

See also: Broker::status
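
As a hedged usage sketch, the returned port can be checked for the failure case. The address and port are illustrative, and the check assumes that failure is reported as port number 0, as described above.

```zeek
event zeek_init()
    {
    # Illustrative address and port.
    local bound = Broker::listen("127.0.0.1", 9999/tcp);

    # A port number of 0 signals failure.
    if ( port_to_count(bound) == 0 )
        print "Broker::listen failed";
    else
        print fmt("Broker listening on %s", bound);
    }
```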

Broker::listen_websocket
Type:

function (a: string &default = Broker::default_listen_address_websocket &optional, p: port &default = Broker::default_port_websocket &optional, retry: interval &default = Broker::default_listen_retry &optional) : port

Listen for remote connections using WebSocket.

Parameters:
  • a – an address string on which to accept connections, e.g. “127.0.0.1”. An empty string refers to INADDR_ANY.

  • p – the TCP port to listen on. The value 0 means that the OS should choose the next available free port.

  • retry – if non-zero, retries listening at regular intervals if the port cannot be acquired immediately. 0 disables retries. If the ZEEK_DEFAULT_LISTEN_RETRY environment variable is set (as a number of seconds), it overrides any value given here.

Returns:

the bound port or 0/? on failure.

See also: Broker::status
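
Because all parameters have defaults, the WebSocket listener can be configured through the redefinable options alone. The sketch below assumes a loopback-only listener, which is an illustrative choice.

```zeek
# Restrict the WebSocket listener to loopback (illustrative choice).
redef Broker::default_listen_address_websocket = "127.0.0.1";

event zeek_init()
    {
    # Address, port, and retry interval all come from their defaults.
    local bound = Broker::listen_websocket();

    if ( port_to_count(bound) == 0 )
        print "Broker::listen_websocket failed";
    }
```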

Broker::log_topic
Type:

function (id: Log::ID, path: string) : string

Attributes:

&redef

A function that will be called for each log entry to determine what broker topic string will be used for sending it to peers. The default implementation will return a value based on Broker::default_log_topic_prefix.

Parameters:
  • id – the ID associated with the log stream entry that will be sent.

  • path – the path to which the log stream entry will be output.

Returns:

a string representing the broker topic to which the log will be sent.
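
Since log_topic is redefinable, a custom function with the same signature can replace the default. This is a sketch only; the function name and topic layout are hypothetical.

```zeek
# Hypothetical replacement that derives the topic from the log path
# rather than from the stream ID.
function example_log_topic(id: Log::ID, path: string): string
    {
    return "example/logs/" + path;
    }

redef Broker::log_topic = example_log_topic;
```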

Broker::node_id
Type:

function () : string

Get a unique identifier for the local broker endpoint.

Returns:

a unique identifier for the local broker endpoint.

Broker::peer
Type:

function (a: string, p: port &default = Broker::default_port &optional, retry: interval &default = Broker::default_connect_retry &optional) : bool

Initiate a remote connection.

Parameters:
  • a – an address to connect to, e.g. “localhost” or “127.0.0.1”.

  • p – the TCP port on which the remote side is listening.

  • retry – an interval at which to retry establishing the connection with the remote peer if it cannot be made initially, or if it ever becomes disconnected. If the ZEEK_DEFAULT_CONNECT_RETRY environment variable is set (as a number of seconds), it overrides any value given here.

Returns:

true if it’s possible to try connecting with the peer and it’s a new peer. The actual connection may not be established until a later point in time.

See also: Broker::status
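
Because the connection may only be established later, a script usually reacts to peering events rather than to the return value alone. The sketch below uses a hypothetical address, port, and retry interval; the Broker::peer_added and Broker::peer_lost events come from base/bif/comm.bif.zeek and are not documented on this page.

```zeek
event zeek_init()
    {
    # Hypothetical remote endpoint; retry every 10 seconds.
    if ( ! Broker::peer("127.0.0.1", 9999/tcp, 10sec) )
        print "peering could not be initiated, or the peer is not new";
    }

event Broker::peer_added(endpoint: Broker::EndpointInfo, msg: string)
    {
    print fmt("peer added: %s", endpoint$id);
    }

event Broker::peer_lost(endpoint: Broker::EndpointInfo, msg: string)
    {
    print fmt("peer lost: %s", endpoint$id);
    }
```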

Broker::peers
Type:

function () : vector of Broker::PeerInfo

Get a list of all peer connections.

Returns:

a list of all peer connections.
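
The returned vector can be walked using the record fields listed under Types. The helper function name below is hypothetical.

```zeek
# Hypothetical helper that prints one line per known peer.
function print_broker_peers()
    {
    local infos = Broker::peers();

    for ( i in infos )
        {
        local info = infos[i];

        # The network field is optional, so test for it before use.
        local location = info$peer?$network ?
            fmt("%s:%s", info$peer$network$address, info$peer$network$bound_port) :
            "<no network info>";

        print fmt("%s %s %s", info$peer$id, location, info$status);
        }
    }
```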

Broker::publish_id
Type:

function (topic: string, id: string) : bool

Publishes the value of an identifier to a given topic. The subscribers will update their local value for that identifier on receipt.

Parameters:
  • topic – a topic associated with the message.

  • id – the identifier to publish.

Returns:

true if the message is sent.
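
A minimal sketch of publishing an identifier's value. The global and the topic are hypothetical, and receiving nodes are assumed to subscribe to a matching topic prefix.

```zeek
# Hypothetical global whose value should be shared with subscribers.
global example_threshold: count = 10;

event zeek_init()
    {
    if ( ! Broker::publish_id("example/ids", "example_threshold") )
        print "identifier value was not sent";
    }
```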

Broker::subscribe
Type:

function (topic_prefix: string) : bool

Register interest in all peer event messages that use a certain topic prefix. Note that the change to subscriptions may not take effect immediately after the call (except during zeek_init).

Parameters:

topic_prefix – a prefix to match against remote message topics. e.g. an empty prefix matches everything and “a” matches “alice” and “amy” but not “bob”.

Returns:

true if it’s a new event subscription and it is now registered.
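
The following sketch subscribes to a prefix, handles a matching event, and later withdraws the subscription with Broker::unsubscribe. The event and topic prefix are hypothetical.

```zeek
# Hypothetical event that a peer is assumed to publish under "example/events/".
global example_ping: event(n: count);

event zeek_init()
    {
    Broker::subscribe("example/events/");
    }

event example_ping(n: count)
    {
    print fmt("got ping %d", n);

    # Stop advertising interest after a few pings.
    if ( n >= 3 )
        Broker::unsubscribe("example/events/");
    }
```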

Broker::unpeer
Type:

function (a: string, p: port) : bool

Remove a remote connection.

Note that this does not terminate the connection to the peer; it only means that no further information is exchanged with it unless peering resumes later. There is not yet a function to terminate a connection.

Parameters:
  • a – the address used in a previous successful call to Broker::peer.

  • p – the port used in a previous successful call to Broker::peer.

Returns:

true if the arguments match a previously successful call to Broker::peer.

Broker::unsubscribe
Type:

function (topic_prefix: string) : bool

Unregister interest in all peer event messages that use a topic prefix. Note that the change to subscriptions may not take effect immediately after the call (except during zeek_init).

Parameters:

topic_prefix – a prefix previously supplied to a successful call to Broker::subscribe or Broker::forward.

Returns:

true if interest in the topic prefix is no longer advertised.