2. Communication

Broker’s primary objective is to facilitate efficient communication through a publish/subscribe model. In this model, entities send data by publishing to a specific topic, and receive data by subscribing to topics of interest. The asynchronous nature of publish/subscribe makes it a popular choice for loosely coupled, distributed systems.

Broker is the successor of Broccoli. Broker enables arbitrary applications to communicate in Zeek’s data model. In this chapter, we first describe generic Broker communication between peers that don’t assume any specific message layout. Afterwards, we show how to exchange events with Zeek through an additional Zeek-specific shim on top of Broker’s generic messages.

2.1. Exchanging Broker Messages

We start with a discussion of generic message exchange between Broker clients. At the Broker level, messages are just arbitrary values that have no further semantics attached. It’s up to senders and receivers to agree on a specific layout of messages (e.g., a set of doubles for a measurement series).

2.1.1. Endpoints

Broker encapsulates its entire peering setup in an endpoint object. Multiple instances of an endpoint can exist in the same process, but each endpoint has its own thread pool and (configurable) scheduler, which determine how Broker's components execute. Using a single endpoint per OS process therefore makes the most efficient use of available hardware resources. Nonetheless, multiple Broker applications can seamlessly operate when linked together, as there is no global library state.

Note

Instances of type endpoint have reference semantics: that is, they behave like a reference in that it’s impossible to obtain an invalid one (unlike a null pointer). An endpoint can also be copied around cheaply, but is not safe against access from concurrent threads.
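One common way to get these semantics in C++ is a handle that owns its state through a std::shared_ptr. The following is a hypothetical sketch (EndpointState and EndpointHandle are illustrative names, not Broker's types or its actual implementation): every constructor produces valid state, copies share that state, and nothing about the handle makes concurrent access safe.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an endpoint's internal state (not Broker's type).
struct EndpointState {
    std::vector<std::string> subscriptions;
};

// Handle with reference semantics: the only constructor allocates fresh state,
// so a handle can never be "null", and copying it copies just a pointer.
class EndpointHandle {
public:
    EndpointHandle() : state_(std::make_shared<EndpointState>()) {}

    // Declaring the copy operations suppresses the implicit move operations,
    // so "moving" a handle actually copies it and the source stays valid.
    EndpointHandle(const EndpointHandle&) = default;
    EndpointHandle& operator=(const EndpointHandle&) = default;

    void subscribe(std::string topic) {
        state_->subscriptions.push_back(std::move(topic));
    }

    std::size_t subscription_count() const {
        return state_->subscriptions.size();
    }

    // True if both handles refer to the same underlying state.
    bool same_as(const EndpointHandle& other) const {
        return state_ == other.state_;
    }

private:
    // shared_ptr makes copies cheap, but it does NOT make concurrent access
    // to the pointed-to state thread-safe.
    std::shared_ptr<EndpointState> state_;
};
```

A subscription added through any copy is visible through all other copies, which is what "behaves like a reference" means in practice.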

2.1.2. Peerings

In order to publish or receive messages an endpoint needs to peer with other endpoints. A peering is a bidirectional relationship between two endpoints. Peering endpoints exchange subscriptions and then forward messages accordingly. This allows for creating flexible communication topologies that use topic-based message routing.

An endpoint can either initiate a peering itself by connecting to remote locations, or wait for an incoming request:

// Open a port and accept incoming peerings;
// subscribe to '/topic/test'.
endpoint ep0;
auto sub0 = ep0.make_subscriber({"/topic/test"});
ep0.listen("127.0.0.1", 9999);

// Establish an outgoing peering to ep0;
// also subscribe to '/topic/test'.
endpoint ep1;
auto sub1 = ep1.make_subscriber({"/topic/test"});
ep1.peer("127.0.0.1", 9999);

2.1.3. Sending Data

In Broker a message consists of a topic-data pair. That is, endpoints publish values as data instances along with a topic that steers them to interested subscribers:

ep.publish("/topic/test", 42); // Message is a single number.
ep.publish("/topic/test", vector{1, 2, 3}); // Message is a vector of values.

Note

Publishing a message can be a no-op if there exists no subscriber. Because Broker has fire-and-forget messaging semantics, the runtime does not generate a notification if no subscribers exist.
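To make the fire-and-forget behavior concrete, here is a hypothetical in-process model (ToyBus, Message, and Handler are illustrative names, not Broker's API). It treats a message as a topic/payload pair and uses the prefix matching described in Section 2.1.4; a publish that matches no subscription is dropped without any feedback to the caller.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// A message is a (topic, payload) pair; the payload is a string for brevity.
using Message = std::pair<std::string, std::string>;
using Handler = std::function<void(const Message&)>;

class ToyBus {
public:
    void subscribe(std::string prefix, Handler h) {
        subs_.emplace_back(std::move(prefix), std::move(h));
    }

    // Fire-and-forget: no return value, no error. If no subscription prefix
    // matches the topic, the message is silently dropped.
    void publish(const std::string& topic, const std::string& payload) {
        Message msg{topic, payload};
        for (const auto& [prefix, handler] : subs_)
            if (topic.compare(0, prefix.size(), prefix) == 0)
                handler(msg);
    }

private:
    std::vector<std::pair<std::string, Handler>> subs_;
};
```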

One can also explicitly create a dedicated publisher for a specific topic first, and then use that to send subsequent messages. This approach is better suited for high-volume streams, as it leverages CAF’s demand management internally:

auto pub = ep.make_publisher("/topic/test");
pub.publish(42); // Message is a single number.
pub.publish(vector{1, 2, 3}); // Message is a vector.

Finally, there’s also a streaming version of the publisher that pulls messages from a producer as capacity becomes available on the output channel; see endpoint::publish_all and endpoint::publish_all_no_sync.
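The pull-based idea can be illustrated with a small, hypothetical model: the channel only calls the producer while it has free capacity, so a fast producer cannot overrun a slow consumer. PullChannel and its methods are illustrative names; this is not how endpoint::publish_all is implemented, and its real signature may differ.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <optional>

// Bounded channel that pulls items from a producer only when it has room.
class PullChannel {
public:
    explicit PullChannel(std::size_t capacity) : capacity_(capacity) {}

    // Ask `produce` for items until the buffer is full or the producer
    // returns nullopt. Returns the number of items pulled in this round.
    std::size_t pull_from(const std::function<std::optional<int>()>& produce) {
        std::size_t pulled = 0;
        while (buffer_.size() < capacity_) {
            auto item = produce();
            if (!item)
                break; // producer has nothing more right now
            buffer_.push_back(*item);
            ++pulled;
        }
        return pulled;
    }

    // The consumer drains up to `n` items, which frees capacity (demand).
    std::size_t consume(std::size_t n) {
        std::size_t taken = 0;
        while (taken < n && !buffer_.empty()) {
            buffer_.pop_front();
            ++taken;
        }
        return taken;
    }

    std::size_t size() const { return buffer_.size(); }

private:
    std::size_t capacity_;
    std::deque<int> buffer_;
};
```

Note that when the buffer is full the producer is not called at all; it only runs again once the consumer has signaled demand by draining items.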

See Section 3 for a detailed discussion on how to construct values for messages in the form of various types of data instances.

2.1.4. Receiving Data

Endpoints receive data by creating a subscriber attached to the topics of interest. Subscriptions are prefix-based, matching all topics that start with a given string. A subscriber can either retrieve incoming messages explicitly by calling get or poll (synchronous API), or spawn a background worker to process messages as they come in (asynchronous API).
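Because matching is a plain string prefix, as stated above, a subscription to /topic/test also matches /topic/testing, not only /topic/test/... topics. The sketch below shows that rule next to a hypothetical segment-aware variant for comparison; segment_match is an illustration only and is not how Broker matches topics.

```cpp
#include <cassert>
#include <string_view>

// Plain string-prefix match: `prefix` matches every topic starting with it.
bool prefix_match(std::string_view prefix, std::string_view topic) {
    return topic.substr(0, prefix.size()) == prefix;
}

// Hypothetical alternative (NOT Broker's rule): match only on whole
// '/'-separated segments, so "/topic/test" does not match "/topic/testing".
bool segment_match(std::string_view prefix, std::string_view topic) {
    if (!prefix_match(prefix, topic))
        return false;
    return topic.size() == prefix.size() || prefix.empty()
           || prefix.back() == '/' || topic[prefix.size()] == '/';
}
```

If such collisions matter, choose topic names so that no topic is a plain string prefix of an unrelated one, or filter on the receiving side.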

2.1.4.1. Synchronous API

The synchronous API exists for applications that want to poll for messages explicitly. Once a subscriber is registered for topics, calling get will wait for a new message:

endpoint ep;
auto sub = ep.make_subscriber({"/topic/test"});
auto msg = sub.get();
auto topic = get_topic(msg);
auto data_ = get_data(msg);
std::cout << "topic: " << topic << " data: " << data_ << std::endl;

By default, get blocks until the subscriber has at least one message available, which it then returns. Each retrieved message consists of the same two elements that the publisher passed along: the topic that the message was published to, and the message's payload in the form of an arbitrary Broker value (i.e., a data instance). The example simply prints both.

Blocking indefinitely until messages arrive often won't work well, particularly in combination with existing event loops or polling. Therefore, get takes an optional timeout parameter to wait only for a certain amount of time. Alternatively, one can use available to check explicitly for pending messages, or poll to extract all currently pending messages (which may be none):
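The three access styles can be modeled with a mutex-protected queue and a condition variable. This is a hypothetical sketch of the semantics (ToyMailbox is an illustrative name, and the real subscriber's internals and return types may differ): get waits at most a given timeout, available reports how many messages can be taken without blocking, and poll drains whatever is pending.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iterator>
#include <mutex>
#include <optional>
#include <string>
#include <utility>
#include <vector>

class ToyMailbox {
public:
    // Producer side: enqueue a message and wake up one waiting reader.
    void push(std::string msg) {
        {
            std::lock_guard<std::mutex> guard(mtx_);
            queue_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }

    // Wait at most `timeout`; return nullopt if nothing arrived in time.
    std::optional<std::string> get(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mtx_);
        if (!cv_.wait_for(lock, timeout, [this] { return !queue_.empty(); }))
            return std::nullopt;
        std::string msg = std::move(queue_.front());
        queue_.pop_front();
        return msg;
    }

    // Number of messages that can be retrieved without blocking.
    std::size_t available() {
        std::lock_guard<std::mutex> guard(mtx_);
        return queue_.size();
    }

    // Return all currently pending messages (possibly none) without blocking.
    std::vector<std::string> poll() {
        std::lock_guard<std::mutex> guard(mtx_);
        std::vector<std::string> out(std::make_move_iterator(queue_.begin()),
                                     std::make_move_iterator(queue_.end()));
        queue_.clear();
        return out;
    }

private:
    std::mutex mtx_;
    std::condition_variable cv_;
    std::deque<std::string> queue_;
};
```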

if ( sub.available() )
    msg = sub.get(); // Won't block now.

for ( auto m : sub.poll() ) // Iterate over all available messages
    std::cout << "topic: " << get_topic(m) << " data: " << get_data(m) << std::endl;

For integration into event loops, subscriber also provides a file descriptor that signals whether messages are available:

auto fd = sub.fd();
::pollfd p = {fd, POLLIN, 0};
auto n = ::poll(&p, 1, -1);
if ( n < 0 )
    std::terminate(); // poll failed

if ( n == 1 && (p.revents & POLLIN) ) {
    auto msg = sub.get(); // Won't block now.
    // ...
}
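For readers unfamiliar with descriptor-based readiness, the following hypothetical sketch shows the common self-pipe pattern: one byte is written into a pipe per queued message, so poll() reports the read end as readable exactly while messages are pending. ReadySignal and is_readable are illustrative names; whether sub.fd() is backed by a pipe is an assumption, and only the observable poll() behavior is the point here.

```cpp
#include <cassert>
#include <poll.h>
#include <unistd.h>

class ReadySignal {
public:
    ReadySignal() {
        int fds[2];
        ok_ = ::pipe(fds) == 0;
        if (ok_) {
            read_fd_ = fds[0];
            write_fd_ = fds[1];
        }
    }
    ~ReadySignal() {
        if (ok_) {
            ::close(read_fd_);
            ::close(write_fd_);
        }
    }
    ReadySignal(const ReadySignal&) = delete;
    ReadySignal& operator=(const ReadySignal&) = delete;

    bool ok() const { return ok_; }
    int fd() const { return read_fd_; } // hand this to poll()/select()/epoll

    // Producer side: call once per enqueued message.
    bool signal() {
        char byte = 1;
        return ::write(write_fd_, &byte, 1) == 1;
    }

    // Consumer side: call once per dequeued message (blocks if none pending).
    bool consume() {
        char byte;
        return ::read(read_fd_, &byte, 1) == 1;
    }

private:
    bool ok_ = false;
    int read_fd_ = -1;
    int write_fd_ = -1;
};

// Non-blocking readiness check (timeout 0), analogous to the loop above.
bool is_readable(int fd) {
    ::pollfd p = {fd, POLLIN, 0};
    return ::poll(&p, 1, 0) == 1 && (p.revents & POLLIN);
}
```

A production version would typically also set O_NONBLOCK on both ends so that a full pipe cannot stall the producer.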

2.1.4.2. Asynchronous API

TODO: Document.

2.1.5. Status and Error Messages

Broker informs clients about any communication errors—and optionally also about non-critical connectivity changes—through separate status messages. To get access to that information, one creates a status_subscriber, which provides a similar synchronous get/available/poll API as the standard message subscriber. By default, a status_subscriber returns only errors:

auto ss = ep.make_status_subscriber();

if ( ss.available() ) {
    auto ss_res = ss.get(); // Won't block now.
    auto err = get<error>(ss_res);
    std::cerr << "Broker error: " << err.code() << ", " << to_string(err) << std::endl;
}

Errors reflect failures that may impact the correctness of operation. err.code() returns a value of the enum ec, which defines all error codes:

enum class ec : uint8_t {
  /// Not-an-error.
  none,
  /// The unspecified default error code.
  unspecified = 1,
  /// Version incompatibility.
  peer_incompatible,
  /// Referenced peer does not exist.
  peer_invalid,
  /// Remote peer not listening.
  peer_unavailable,
  /// Remote peer closed the connection during handshake.
  peer_disconnect_during_handshake,
  /// A peering request timed out.
  peer_timeout,
  /// Master with given name already exists.
  master_exists,
  /// Master with given name does not exist.
  no_such_master,
  /// The given data store key does not exist.
  no_such_key,
  /// The store operation timed out.
  request_timeout = 10,
  /// The operation expected a different type than provided.
  type_clash,
  /// The data value cannot be used to carry out the desired operation.
  invalid_data,
  /// The storage backend failed to execute the operation.
  backend_failure,
  /// The clone store has not yet synchronized with its master, or it has
  /// been disconnected for too long.
  stale_data,
  /// Opening a file failed.
  cannot_open_file,
  /// Writing to an open file failed.
  cannot_write_file,
  /// Received an unknown key for a topic.
  invalid_topic_key,
  /// Reached the end of an input file.
  end_of_file,
  /// Received an unknown type tag value.
  invalid_tag,
  /// Received an invalid message.
  invalid_message = 20,
  /// Deserialized an invalid status.
  invalid_status,
  /// Converting between two data types or formats failed.
  conversion_failed,
  /// Adding a consumer to a producer failed because the producer already added
  /// the consumer.
  consumer_exists,
  /// A producer or consumer did not receive any message from a consumer within
  /// the configured timeout.
  connection_timeout,
  /// Called a member function without satisfying its preconditions.
  bad_member_function_call,
  /// Attempted to use the same request_id twice.
  repeated_request_id,
  /// A clone ran out of sync with the master.
  broken_clone,
  /// Canceled an operation because the system is shutting down.
  shutting_down,
  /// Canceled a peering request due to invalid or inconsistent data.
  invalid_peering_request,
  /// Broker attempted to trigger a second handshake to a peer while the first
  /// handshake did not complete.
  repeated_peering_handshake_request = 30,
  /// Received an unexpected or duplicate message during endpoint handshake.
  unexpected_handshake_message,
  /// Handshake failed due to invalid state transitions.
  invalid_handshake_state,
  /// Dispatching a message failed because no path to the receiver exists.
  no_path_to_peer,
  /// Unable to accept or establish peerings since no connector is available.
  no_connector_available,
  /// Opening a resource failed.
  cannot_open_resource,
  /// Failed to serialize an object to text or binary output.
  serialization_failed,
  /// Failed to deserialize an object from text or binary input.
  deserialization_failed,
  /// Broker refused binary input due to a magic number mismatch.
  wrong_magic_number,
  /// Broker closes a connection because a prior connection exists.
  redundant_connection,
  /// Broker encountered a logic error.
  logic_error = 40,
};

To receive non-critical status messages as well, specify that when creating the status_subscriber:

auto ss = ep.make_status_subscriber(true); // Get status updates and errors.

if ( ss.available() ) {
    auto s = ss.get();

    if ( auto err = get_if<error>(&s) )
        std::cerr << "Broker error: " << err->code() << ", " << to_string(*err) << std::endl;

    if ( auto st = get_if<status>(&s) ) {
        auto ctx = st->context<endpoint_info>(); // The peer this is about, if available.
        if ( ctx && ctx->network )
            std::cerr << "Broker status update regarding "
                      << ctx->network->address
                      << ": " << to_string(*st) << std::endl;
        else
            std::cerr << "Broker status update: "
                      << to_string(*st) << std::endl;
    }
}

Status messages represent non-critical changes to the topology. For example, after a successful peering, both endpoints receive a peer_added status message. The concrete semantics of a status depend on its embedded code, which the enum sc codifies:

/// Broker's status codes.
/// @relates status
enum class sc : uint8_t {
  /// Indicates a default-constructed ::status.
  unspecified,
  /// Successfully added a direct connection to a peer.
  peer_added,
  /// Successfully removed a direct connection to a peer.
  peer_removed,
  /// Lost direct connection to a peer.
  peer_lost,
  /// Discovered a new Broker endpoint in the network.
  endpoint_discovered,
  /// Lost all paths to a Broker endpoint.
  endpoint_unreachable,
};

Status messages have an optional context and an optional descriptive message. The member function context<T> returns a const T* if the context is available. The type of the available context information depends on the status code (enum sc). For example, all sc::peer_* status codes include an endpoint_info context as well as a message.
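The "typed optional context" pattern can be sketched with std::any, whose pointer form of any_cast returns nullptr on a type mismatch. The types below (toy_sc, toy_endpoint_info, toy_status) are hypothetical mirrors used for illustration, not Broker's classes; the track function shows how a client might maintain its set of directly connected peers from status updates.

```cpp
#include <any>
#include <cassert>
#include <cstdint>
#include <set>
#include <string>
#include <utility>

// Illustrative subset of status codes, mirroring the enum above.
enum class toy_sc : uint8_t { unspecified, peer_added, peer_removed, peer_lost };

struct toy_endpoint_info {
    std::string address;
};

class toy_status {
public:
    toy_status(toy_sc code, std::any ctx = {}, std::string msg = {})
        : code_(code), ctx_(std::move(ctx)), msg_(std::move(msg)) {}

    toy_sc code() const { return code_; }
    const std::string& message() const { return msg_; }

    // Returns a pointer to the context if it holds a T, otherwise nullptr.
    template <class T>
    const T* context() const {
        return std::any_cast<T>(&ctx_);
    }

private:
    toy_sc code_;
    std::any ctx_;
    std::string msg_;
};

// Update the set of directly connected peers from one status update.
void track(std::set<std::string>& peers, const toy_status& st) {
    const auto* info = st.context<toy_endpoint_info>();
    if (!info)
        return; // no peer context attached
    switch (st.code()) {
        case toy_sc::peer_added:
            peers.insert(info->address);
            break;
        case toy_sc::peer_removed:
        case toy_sc::peer_lost:
            peers.erase(info->address);
            break;
        default:
            break;
    }
}
```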

2.2. Exchanging Zeek Events

The communication model discussed so far remains generic for all Broker clients in that it doesn't associate any semantics with the values exchanged through messages. In practice, however, senders and receivers need to agree on a specific data layout for the exchanged values so that they interpret them in the same way. This is particularly true for exchanging events with Zeek, which is one of the main applications for Broker in the first place. To support that, Broker provides built-in support for sending and receiving Zeek events through a small Zeek-specific shim on top of the generic message model. The shim encapsulates Zeek events and takes care of converting them into the lower-level message layout that is actually transmitted. This way, Zeek events can be exchanged between an external Broker client and Zeek itself, and even between Broker clients without any Zeek instance at all.
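Conceptually, such a shim wraps each event in an agreed-upon envelope on the sending side and validates that envelope on the receiving side. The sketch below is a hypothetical illustration of that idea: the envelope shape [version, type, [name, args]] and the constant values are assumptions for this example, not a specification of Broker's wire format, and ToyEvent, ToyEnvelope, encode, and decode are illustrative names.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Assumed envelope constants for this sketch only.
constexpr uint64_t toy_protocol_version = 1;
constexpr uint64_t toy_type_event = 1;

// Illustrative envelope: [version, type, [name, args]].
struct ToyEnvelope {
    uint64_t version;
    uint64_t type;
    std::string name;          // event name, e.g. "ping"
    std::vector<int64_t> args; // event arguments (integers only, for brevity)
};

struct ToyEvent {
    std::string name;
    std::vector<int64_t> args;
};

// Sender side: wrap the event in the agreed-upon envelope.
ToyEnvelope encode(const ToyEvent& ev) {
    return {toy_protocol_version, toy_type_event, ev.name, ev.args};
}

// Receiver side: accept only envelopes whose version and type match.
std::optional<ToyEvent> decode(const ToyEnvelope& env) {
    if (env.version != toy_protocol_version || env.type != toy_type_event)
        return std::nullopt;
    return ToyEvent{env.name, env.args};
}
```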

Here's a complete ping/pong example between a C++ Broker client and Zeek:

# ping.zeek

redef exit_only_after_terminate = T;

global pong: event(n: int);

event ping(n: int)
	{
	event pong(n);
	}

event zeek_init()
	{
	Broker::subscribe("/topic/test");
	Broker::listen("127.0.0.1", 9999/tcp);
	Broker::auto_publish("/topic/test", pong);
	}

// ping.cc

#include <cassert>
#include <iostream>

#include "broker/broker.hh"
#include "broker/zeek.hh"

using namespace broker;

int main() {
    // Setup endpoint and connect to Zeek.
    endpoint ep;
    auto sub = ep.make_subscriber({"/topic/test"});
    auto ss = ep.make_status_subscriber(true);
    ep.peer("127.0.0.1", 9999);

    // Wait until connection is established.
    for ( bool has_peer = false; !has_peer; ) {
        auto val = ss.get();
        if ( auto st = get_if<status>(&val) )
            has_peer = st->code() == sc::peer_added;
    }

    // Do five ping / pong.
    for ( int n = 0; n < 5; n++ ) {
        // Send event "ping(n)".
        zeek::Event ping("ping", {n});
        ep.publish("/topic/test", ping);

        // Wait for "pong" reply event.
        auto msg = sub.get();
        zeek::Event pong(move_data(msg));
        std::cout << "received " << pong.name() << pong.args() << std::endl;
    }
}

# g++ -std=c++17 -o ping ping.cc -lbroker -lcaf_core -lcaf_io -lcaf_openssl
# zeek ping.zeek &
# ./ping
received pong[0]
received pong[1]
received pong[2]
received pong[3]
received pong[4]

2.3. Gateways

Broker was designed with peer-to-peer communication in mind. All endpoints in the network form a single publish/subscribe layer. This implies that each endpoint is aware of every other endpoint in the network as well as what topics they have subscribed to.

This level of transparency enables source routing, but it comes at a cost. Endpoints flood subscriptions and topology changes to the entire network. The volume of flooded messages remains small as long as the network consists primarily of endpoints with high availability and a stable set of subscriptions. However, short-lived or unstable endpoints can quickly increase the number of messages in the network. Furthermore, the more endpoints join the network, the more state and bookkeeping overhead accumulates.
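To get a rough feel for how flooding cost grows with the topology, the following toy model counts the messages needed to spread one update. It is an assumption-laden sketch, not Broker's routing code: every node forwards an update to all neighbors except the one it came from, but only the first time it sees it. flood_cost, ring, and mesh are illustrative helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <queue>
#include <utility>
#include <vector>

using Graph = std::vector<std::vector<int>>;

// Returns the total number of messages sent over all links for one update
// originating at `origin`.
std::size_t flood_cost(const Graph& adj, int origin) {
    std::vector<bool> seen(adj.size(), false);
    std::queue<std::pair<int, int>> pending; // (node, node it came from)
    std::size_t sent = 0;
    seen[origin] = true;
    pending.push({origin, -1});
    while (!pending.empty()) {
        auto [node, from] = pending.front();
        pending.pop();
        for (int next : adj[node]) {
            if (next == from)
                continue;
            ++sent; // one message on the link node -> next
            if (!seen[next]) {
                seen[next] = true;
                pending.push({next, node});
            }
        }
    }
    return sent;
}

// Ring of n endpoints: each one peers with its two neighbors.
Graph ring(int n) {
    Graph g(n);
    for (int i = 0; i < n; ++i)
        g[i] = {(i + n - 1) % n, (i + 1) % n};
    return g;
}

// Full mesh of n endpoints: everyone peers with everyone else.
Graph mesh(int n) {
    Graph g(n);
    for (int i = 0; i < n; ++i)
        for (int j = 0; j < n; ++j)
            if (i != j)
                g[i].push_back(j);
    return g;
}
```

In this model a ring of n endpoints needs n + 1 messages per update, while a full mesh needs (n - 1)^2, so every subscription change from a short-lived endpoint becomes more expensive as the network grows larger and denser.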

The overhead becomes especially prominent for endpoints that join the network only to publish data and sit at the edge of the network. Such endpoints usually end up sending all, or nearly all, of their messages to another, well-connected endpoint that distributes them. Nevertheless, these producing endpoints still have to flood their subscriptions to the entire network and end up in every routing table. In the Zeek ecosystem, the Zeek Agent fits this exact role: agents run at the edge of the network and ideally should not consume more network bandwidth and CPU cycles than necessary.

Gateways provide a way to separate the well-connected “inner” endpoints from endpoints at the edges that generally cannot contribute to the overall connectivity of the network but still incur messaging and bookkeeping overhead.

2.3.1. Topology

Gateways separate the overlay into two domains: external and internal. The external domain consists of stable, well-connected endpoints that form the core of the publish/subscribe layer. The internal domain consists of endpoints that need no knowledge of the entire overlay, because all paths would pass through the gateway anyway. In other words, the gateway is the only way in or out for endpoints in the internal domain, as illustrated in the figure below.

[Figure: gateway.png, a gateway connecting the internal and the external domain]

Aside from forwarding messages between the two domains, gateways render all endpoints of the internal domain completely opaque to endpoints in the external domain and vice versa.

To endpoints in the external domain, a gateway appears as the regular endpoint E. It subscribes to all topics that any endpoint in the internal domain has subscribed to, and all messages published in the internal domain appear as if E were the publisher.

The endpoint in the internal domain, I, is the mirror image of E: it hides all endpoints of the external domain.

Both endpoints E and I actually exist: the gateway starts them in the same process and creates a “shortcut” between the two. Every subscription and every event published on one of them is forwarded to the other. However, E and I are not aware of each other, and the forwarded events and subscriptions appear as if they came from a local subscriber or publisher.
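The bookkeeping described above can be summarized in a small, hypothetical model (ToyGateway, ToyEndpoint, and Published are illustrative names, not broker-gateway's code): both halves share one ID, E advertises the union of internal subscriptions as its own, and anything forwarded across the shortcut carries the forwarding half's ID rather than the original publisher's.

```cpp
#include <cassert>
#include <set>
#include <string>

struct ToyEndpoint {
    std::string id;                   // identity seen by peers
    std::set<std::string> local_subs; // subscriptions this endpoint advertises
};

struct Published {
    std::string topic;
    std::string publisher_id; // who the receiving domain believes sent it
};

class ToyGateway {
public:
    // E and I deliberately share the same ID.
    explicit ToyGateway(const std::string& shared_id)
        : external_{shared_id, {}}, internal_{shared_id, {}} {}

    // A subscription from one domain is advertised by the other half as its own.
    void on_internal_subscription(const std::string& topic) {
        external_.local_subs.insert(topic);
    }
    void on_external_subscription(const std::string& topic) {
        internal_.local_subs.insert(topic);
    }

    // Re-publish across the shortcut, hiding the original sender.
    Published forward_to_external(const std::string& topic,
                                  const std::string& /*origin_id*/) const {
        return {topic, external_.id};
    }
    Published forward_to_internal(const std::string& topic,
                                  const std::string& /*origin_id*/) const {
        return {topic, internal_.id};
    }

    const ToyEndpoint& external() const { return external_; }
    const ToyEndpoint& internal() const { return internal_; }

private:
    ToyEndpoint external_; // E
    ToyEndpoint internal_; // I
};
```

The shared ID in this model is also why the warning below matters: if E and I could see each other's messages through some other path, peers would observe the same ID from two places.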

Warning

The endpoints E and I use the same ID. When setting up a gateway, make sure that no other endpoint provides connectivity between the internal and the external domain. Otherwise, E could receive messages from I and vice versa. Since they share one ID, endpoints in the network would receive contradictory messages from what appears to be the same endpoint.

2.3.2. Setup

Broker includes the standalone tool broker-gateway. When started, it creates the two endpoints E and I in the same process. Each of the two endpoints listens on its own port for incoming peerings. A minimal setup only sets the two ports, as shown below.

broker-gateway --internal.port=8080 --external.port=9090

Users can also configure the gateway to connect to a list of predefined peers on startup. For example:

broker-gateway --internal.port=8080 \
               --internal.peers='[tcp://mars:1234, tcp://venus:2345]' \
               --external.port=9090 \
               --external.peers=[tcp://proxima-centauri:3456]

The invocation above would listen on port 8080 for incoming peerings in the internal domain and try to connect to mars on port 1234 as well as to venus on port 2345. In the external domain, the gateway would listen on port 9090 and try to connect to proxima-centauri on port 3456.

Instead of using the command line, users could also provide a broker.conf file with the following content:

internal {
  port = 8080
  peers = [
    <tcp://mars:1234>,
    <tcp://venus:2345>,
  ]
}
external {
  port = 9090
  peers = [
    <tcp://proxima-centauri:3456>,
  ]
}

There is also a third parameter for the domains: disable-forwarding. In particular, setting internal.disable-forwarding to true causes the gateway not only to isolate endpoints in the internal domain from endpoints in the external domain, but also to isolate endpoints within the internal domain from each other.

In setups where all endpoints of the internal domain connect only to the gateway and do not need to interact with each other, setting this flag reduces any messaging to the bare minimum by leading each endpoint in the internal domain to believe that there is exactly one other endpoint in the network—the gateway.