2. Communication

Broker’s primary objective is to facilitate efficient communication through a publish/subscribe model. In this model, entities send data by publishing to a specific topic, and receive data by subscribing to topics of interest. The asynchronous nature of publish/subscribe makes it a popular choice for loosely coupled, distributed systems.

Broker is the successor of Broccoli. Broker enables arbitrary applications to communicate in Zeek’s data model. In this chapter, we first describe generic Broker communication between peers that don’t assume any specific message layout. Afterwards, we show how to exchange events with Zeek through an additional Zeek-specific shim on top of Broker’s generic messages.

2.1. Exchanging Broker Messages

We start with a discussion of generic message exchange between Broker clients. At the Broker level, messages are just arbitrary values that have no further semantics attached. It’s up to senders and receivers to agree on a specific layout of messages (e.g., a set of doubles for a measurement series).

2.1.1. Endpoints

Broker encapsulates its entire peering setup in an endpoint object. Multiple instances of an endpoint can exist in the same process, but each endpoint features a thread-pool and (configurable) scheduler, which determines the execution of Broker’s components. Using a single endpoint per OS process guarantees the most efficient usage of available hardware resources. Nonetheless, multiple Broker applications can seamlessly operate when linked together, as there exists no global library state.

Note

Instances of type endpoint have reference semantics: that is, they behave like a reference in that it’s impossible to obtain an invalid one (unlike a null pointer). An endpoint can also be copied around cheaply, but is not safe against access from concurrent threads.

2.1.2. Peerings

In order to publish or receive messages an endpoint needs to peer with other endpoints. A peering is a bidirectional relationship between two endpoints. Peering endpoints exchange subscriptions and then forward messages accordingly. This allows for creating flexible communication topologies that use topic-based message routing.

An endpoint can either initiate a peering itself by connecting to remote locations, or wait for an incoming request:

// Open port and subscribe to 'foo' with all
// incoming peerings.
// Establish outgoing peering and subscribe to 'bar'.
endpoint ep1;
auto sub1 = ep1.make_subscriber({"/topic/test"});
ep1.peer("127.0.0.1", 9999);

endpoint ep0;
auto sub0 = ep0.make_subscriber({"/topic/test"});
ep0.listen("127.0.0.1", 9999);

2.1.3. Sending Data

In Broker a message consists of a topic-data pair. That is, endpoints publish values as data instances along with a topic that steers them to interested subscribers:

ep.publish("/topic/test", "42"); // Message is a single number.
ep.publish("/topic/test", vector{1, 2, 3}); // Message is a vector of values.

Note

Publishing a message can be a no-op if there exists no subscriber. Because Broker has fire-and-forget messaging semantics, the runtime does not generate a notification if no subscribers exist.

One can also explicitly create a dedicated publisher for a specific topic first, and then use that to send subsequent messages. This approach is better suited for high-volume streams, as it leverages CAF’s demand management internally:

auto pub = ep.make_publisher("/topic/test");
pub.publish("42"); // Message is a single number.
pub.publish(vector{1, 2, 3}); // Message is a vector.

Finally, there’s also a streaming version of the publisher that pulls messages from a producer as capacity becomes available on the output channel; see endpoint::publish_all and endpoint::publish_all_no_sync.

See Section 3 for a detailed discussion on how to construct values for messages in the form of various types of data instances.

2.1.4. Receiving Data

Endpoints receive data by creating a subscriber attached to the topics of interest. Subscriptions are prefix-based, matching all topics that start with a given string. A subscriber can either retrieve incoming messages explicitly by calling get or poll (synchronous API), or spawn a background worker to process messages as they come in (asynchronous API).

2.1.4.1. Synchronous API

The synchronous API exists for applications that want to poll for messages explicitly. Once a subscriber is registered for topics, calling get will wait for a new message:

endpoint ep;
auto sub = ep.make_subscriber({"/topic/test"});
auto msg = sub.get();
auto topic = get_topic(msg);
auto data_ = get_data(msg);
std::cout << "topic: " << topic << " data: " << data_ << std::endl;

By default the function get blocks until the subscriber has at least one message available, which it then returns. Each retrieved message consists of the same two elements that the publisher passed along: the topic that the message has been published to, and the message’s payload in the form of an arbitray Broker value, (i.e., a data instance). The example just prints them both out.

Blocking indefinitely until messages arrive often won’t work well, in particular not in combination with existing event loops or polling. Therefore, get takes an additional optional timeout parameter to wait only for a certain amount of time. Alternatively, one can also use available to explicitly check for available messages, or poll to extract just all currently pending messages (which may be none):

if ( sub.available() )
    msg = sub.get(); // Won't block now.

for ( auto m : sub.poll() ) // Iterate over all available messages
    std::cout << "topic: " << get_topic(m) << " data: " << get_data(m) << std::endl;

For integration into event loops, subscriber also provides a file descriptor that signals whether messages are available:

auto fd = sub.fd();
::pollfd p = {fd, POLLIN, 0};
auto n = ::poll(&p, 1, -1);
if (n < 0)
    std::terminate(); // poll failed

if (n == 1 && p.revents & POLLIN) {
    auto msg = sub.get(); // Won't block now.
    // ...
    }

2.1.4.2. Asynchronous API

TODO: Document.

2.1.5. Status and Error Messages

Broker informs clients about any communication errors—and optionally also about non-critical connectivity changes—through separate status messages. To get access to that information, one creates a status_subscriber, which provides a similar synchronous get/available/poll API as the standard message subscriber. By default, a status_subscriber returns only errors:

auto ss = ep.make_status_subscriber();

if ( ss.available() ) {
    auto ss_res = ss.get();
    auto err = caf::get<error>(ss_res); // Won't block now.
    std::cerr << "Broker error:" << err.code() << ", " << to_string(err) << std::endl;
}

Errors reflect failures that may impact the correctness of operation. err.code() returns an enum ec that codifies existing error codes:

/// @relates status
enum class ec : uint8_t {
  /// The unspecified default error code.
  unspecified = 1,
  /// Version incompatibility.
  peer_incompatible,
  /// Referenced peer does not exist.
  peer_invalid,
  /// Remote peer not listening.
  peer_unavailable,
  /// An peering request timed out.
  peer_timeout,
  /// Master with given name already exist.
  master_exists,
  /// Master with given name does not exist.
  no_such_master,
  /// The given data store key does not exist.
  no_such_key,
  /// The store operation timed out.
  request_timeout,
  /// The operation expected a different type than provided
  type_clash,
  /// The data value cannot be used to carry out the desired operation.
  invalid_data,
  /// The storage backend failed to execute the operation.
  backend_failure,
  /// The clone store has not yet synchronized with its master, or it has
  /// been disconnected for too long.
  stale_data,

To receive non-critical status messages as well, specify that when creating the status_subscriber:

auto ss = ep.make_status_subscriber(true); // Get status updates and errors.

if ( ss.available() ) {
    auto s = ss.get();

    if ( auto err = caf::get_if<error>(&s) )
        std::cerr << "Broker error:" << err->code() << ", " << to_string(*err) << std::endl;

    if ( auto st = caf::get_if<status>(&s) ) {
	if ( auto ctx = st->context<endpoint_info>() ) // Get the peer this is about if available.
           std::cerr << "Broker status update regarding "
	             << ctx->network->address
	             << ":" << to_string(*st) << std::endl;
	else
           std::cerr << "Broker status update:"
	             << to_string(*st) << std::endl;
    }

Status messages represent non-critical changes to the topology. For example, after a successful peering, both endpoints receive a peer_added status message. The concrete semantics of a status depend on its embedded code, which the enum sc codifies:

enum class sc : uint8_t {
  /// The unspecified default error code.
  unspecified = 0,
  /// Successfully added a new peer.
  peer_added,
  /// Successfully removed a peer.
  peer_removed,
  /// Lost connection to peer.
  peer_lost,
};

Status messages have an optional context and an optional descriptive message. The member function context<T> returns a const T* if the context is available. The type of available context information is dependent on the status code enum sc. For example, all sc::peer_* status codes include an endpoint_info context as well as a message.

2.2. Forwarding

In topologies where multiple endpoints are connected, an endpoint forwards incoming messages to peers by default for topics that it is itself subscribed to. One can configure additional topics to forward, independent of the local subscription status, through the method endpoint::forward(std::vector<topics>). One can also disable forwarding of remote messages altogether through the Broker configuration option forward when creating an endpoint.

When forwarding messages Broker assumes all connected endpoints form a tree topology without any loops. Still, to avoid messages circling indefinitely if a loop happens accidentally, Broker’s message forwarding adds a TTL value to messages, and drops any that have traversed that many hops. The default TTL is 20; it can be changed by setting the Broker configuration option ttl. Note that it is the first hop’s TTL configuration that determines a message’s lifetime (not the original sender’s).

2.3. Exchanging Zeek Events

The communication model discussed so far remains generic for all Broker clients in that it doesn’t associate any semantics with the values exchanged through messages. In practice, however, senders and receivers will need to agree on a specific data layout for the values exchanged, so that they interpret them in the same way. This is in particular true for exchanging events with Zeek—which is one of the main applications for Broker in the first place. To support that, Broker provides built-in support for sending and receiving Zeek events through a small Zeek-specific shim on top of the generic message model. The shim encapsulates Zeek events and takes care of converting them into the expected lower-level message layout that gets transmitted. This way, Zeek events can be exchanged between an external Broker client and Zeek itself—and also even just between Broker clients without any Zeek instances at all.

Here’s a complete ping/ping example between a C++ Broker client and Zeek:

# ping.zeek

redef exit_only_after_terminate = T;

global pong: event(n: int);

event ping(n: int)
	{
	event pong(n);
	}

event zeek_init()
	{
	Broker::subscribe("/topic/test");
	Broker::listen("127.0.0.1", 9999/tcp);
	Broker::auto_publish("/topic/test", pong);
	}
// ping.cc

#include <assert.h>

#include "broker/broker.hh"
#include "broker/zeek.hh"

using namespace broker;

int main() {
    // Setup endpoint and connect to Zeek.
    endpoint ep;
    auto sub = ep.make_subscriber({"/topic/test"});
    auto ss = ep.make_status_subscriber(true);
    ep.peer("127.0.0.1", 9999);

    // Wait until connection is established.
    auto ss_res = ss.get();
    auto st = caf::get_if<status>(&ss_res);
    if ( ! (st && st->code() == sc::peer_added) ) {
        std::cerr << "could not connect" << std::endl;
        return 1;
    }

    for ( int n = 0; n < 5; n++ ) {
        // Send event "ping(n)".
        zeek::Event ping("ping", {n});
        ep.publish("/topic/test", ping);

        // Wait for "pong" reply event.
        auto msg = sub.get();
        zeek::Event pong(move_data(msg));
        std::cout << "received " << pong.name() << pong.args() << std::endl;
    }

    return 0;
}
# g++ -std=c++11 -lbroker -lcaf_core -lcaf_io -lcaf_openssl -o ping ping.cc
# zeek ping.zeek &
# ./ping
received pong[0]
received pong[1]
received pong[2]
received pong[3]
received pong[4]