Broker User Manual

Broker is a library for type-rich publish/subscribe communication in Zeek’s data model.

Outline

Section 1 introduces Broker’s key components and basic terminology, such as endpoints, messages, topics, and data stores.

Section 2 shows how one can send and receive data with Broker’s publish/subscribe communication primitives. By structuring applications in independent endpoints and peering with other endpoints, one can create a variety of different communication topologies that perform topic-based message routing.

Section 3 presents Broker’s data model, which applications can pack into messages and publish under given topics. The same data model is also used by Broker’s data stores.

Section 4 introduces data stores, a distributed key-value abstraction operating on the complete data model for both keys and values. Users interact with a data store frontend, which is either an authoritative master or a clone replica. The master can choose to keep its data in various backends: currently in-memory, or persistently through SQLite or RocksDB.

Section 6 discusses Broker’s Python bindings, which transparently expose all of the library’s functionality to Python scripts.

Finally, Section 7 dives deep into the architecture and implementation of Broker. This section is meant to guide C++ software developers who wish to contribute to Broker.

Synopsis

#include <iostream>
#include <broker/broker.hh>

using namespace broker;

int main()
{
    endpoint ep;
    ep.peer("1.2.3.4", 9999); // Connect to remote endpoint on given address/port.

    // Messages

    ep.publish("/test/t1", set{1, 2, 3}); // Publish data under a given topic.

    auto sub = ep.make_subscriber({"/test/t2"}); // Subscribe to incoming messages for topic.
    auto msg = sub.get(); // Wait for one incoming message.
    std::cout << "got data for topic " << get_topic(msg) << ": " << get_data(msg) << std::endl;

    // Data stores

    auto m = ep.attach_master("yoda", backend::memory); // Create data store.

    m->put(4.2, -42); // Write into store.
    m->put("bar", vector{true, 7u, now()});

    if ( auto d = m->get(4.2) ) // Look up in store.
        std::cout << "value of 4.2 is " << to_string(d) << std::endl;
    else
        std::cout << "no such key: 4.2" << std::endl;
}

Overview

The Broker library enables applications to communicate in Zeek’s type-rich data model via publish/subscribe messaging. Moreover, Broker offers distributed key-value stores to facilitate unified data management and persistence.

The figure below introduces the graphic terminology we use throughout this manual.

[Figure: _images/terminology.png]

Moreover, all C++ code examples assume using namespace broker for conciseness.

Communication

Broker structures an application in terms of endpoints, which represent data senders and receivers. Endpoints can peer with other endpoints to communicate with their neighbors. An endpoint can send a message to its peers by publishing data under a specific topic. If any endpoint holds a subscription to the topic, it will receive the corresponding data.

Endpoints can efficiently communicate within the same OS process, as well as transparently communicate with endpoints in a different OS process or on a remote machine. For in-memory endpoints, sending a message boils down to passing a pointer. For remote communication, Broker serializes messages transparently. This allows for a variety of different communication patterns. The following figure illustrates an exemplary topology.

[Figure: _images/high-level-comm.png]

A process hosts one or more endpoints. Endpoints can communicate within or across processes as well as machine boundaries.

The fundamental unit of exchange is a message, which consists of a topic and data. Endpoints may choose to forward received messages to their own peers that share a matching topic.

The API allows for both synchronous and asynchronous communication. Internally, Broker operates entirely asynchronously by leveraging the C++ Actor Framework (CAF). Users can receive messages either by explicitly polling for them or by installing a callback that executes as they come in.

See Section 2 for concrete usage examples.

Data Model

Broker comes with a rich data model, since the library’s primary objective involves communication with Zeek and related applications. The fundamental unit of communication is data, which can hold any of the following concrete types:

  • none

  • boolean

  • count

  • integer

  • real

  • timespan

  • timestamp

  • string

  • address

  • subnet

  • port

  • vector

  • set

  • table

Section 3 discusses the various types and their API in depth.

From these data units, one then composes messages to be exchanged. Broker generally does not impose any further structure on messages; it is up to sender and receiver to agree on one. For communication with Zeek, however, Broker provides an additional event abstraction that defines the specific message layout Zeek expects for exchanging Zeek events.

Data Stores

Data stores complement endpoint communication with a distributed key-value abstraction operating in the full data model. One can attach one or more data stores to an endpoint. A data store has a frontend, which determines its behavior, and a backend, which represents the type of database for storing data. There exist two types of frontends: master and clone. A master is the authoritative source for the key-value store, whereas a clone represents a local cache. Only the master can perform mutating operations on the store, which it then pushes to all its clones over the existing peering communication channel. A clone has a full copy of the data for faster access, but transparently sends any modifying operations to its master first. The result of an operation becomes visible at the clone only once the master has propagated the change back. The figure below illustrates how one can deploy a master with several clones.

[Figure: _images/stores.png]

Each data store has a name that identifies the master. This name must be unique among the endpoint’s peers. The master can choose to keep its data in various backends, currently in-memory and SQLite.

Section 4 illustrates how to use data stores in different settings.

Troubleshooting

By default, Broker keeps console output to a minimum. When running a Broker cluster, this bare minimum may omit too much information for troubleshooting.

Users can enable more output either by setting environment variables or by providing a broker.conf file. Custom Broker applications may also support passing command line arguments (Zeek does not forward command line arguments to Broker).

In order to get a high-level view of what Broker is doing internally, we recommend setting:

BROKER_CONSOLE_VERBOSITY=info

Setting this environment variable before running Zeek (or any other Broker application) prints high-level events such as new network connections, peering requests, etc. The runtime cost of enabling this option and the volume of printed lines are moderate.

Troubleshooting a Broker application (or Zeek scripts that communicate over Broker) sometimes requires tapping into the exchanged messages directly. Setting the verbosity to debug instead will provide such details:

BROKER_CONSOLE_VERBOSITY=debug

Note that using this verbosity level will slow down Broker and produce a high volume of printed output.

Setting BROKER_FILE_VERBOSITY instead (or in addition) causes Broker to print the output to a file. This is particularly useful when troubleshooting a cluster, since it allows running a test setup first and then collecting all files for analysis.

The file output is also more detailed than the console output, as it includes information such as source file locations, timestamps, and function names.

In case setting environment variables is impossible or file-based configuration is simply more convenient, creating a file called broker.conf in the working directory of the application (before running it) provides an alternative way of configuring Broker.

A minimal configuration file that sets console and file verbosity looks like this:

logger {
  ; note the single quotes!
  console-verbosity = 'info'
  file-verbosity = 'debug'
}

The environment variables take precedence over configuration file entries (but command line arguments have the highest priority).

Broker is based on CAF, so experienced users can also use broker.conf to tweak various CAF settings. Making use of these advanced features is most helpful for developers who contribute to Broker’s CAF-based C++ source code. To see the full picture, including CAF log output, developers can build CAF with log level debug or trace (either by calling configure --with-log-level=LVL, or by passing CAF_LOG_LEVEL=LVL to CMake directly when using the embedded CAF version) and add the entry component-blacklist = [] to the logger section of the broker.conf file.

Communication

Broker’s primary objective is to facilitate efficient communication through a publish/subscribe model. In this model, entities send data by publishing to a specific topic, and receive data by subscribing to topics of interest. The asynchronous nature of publish/subscribe makes it a popular choice for loosely coupled, distributed systems.

Broker is the successor of Broccoli. Broker enables arbitrary applications to communicate in Zeek’s data model. In this chapter, we first describe generic Broker communication between peers that don’t assume any specific message layout. Afterwards, we show how to exchange events with Zeek through an additional Zeek-specific shim on top of Broker’s generic messages.

Exchanging Broker Messages

We start with a discussion of generic message exchange between Broker clients. At the Broker level, messages are just arbitrary values that have no further semantics attached. It’s up to senders and receivers to agree on a specific layout of messages (e.g., a set of doubles for a measurement series).

Endpoints

Broker encapsulates its entire peering setup in an endpoint object. Multiple instances of an endpoint can exist in the same process, but each endpoint features a thread-pool and (configurable) scheduler, which determines the execution of Broker’s components. Using a single endpoint per OS process guarantees the most efficient usage of available hardware resources. Nonetheless, multiple Broker applications can seamlessly operate when linked together, as there exists no global library state.

Note

Instances of type endpoint have reference semantics: that is, they behave like a reference in that it’s impossible to obtain an invalid one (unlike a null pointer). An endpoint can also be copied around cheaply, but is not safe against access from concurrent threads.

Peerings

In order to publish or receive messages an endpoint needs to peer with other endpoints. A peering is a bidirectional relationship between two endpoints. Peering endpoints exchange subscriptions and then forward messages accordingly. This allows for creating flexible communication topologies that use topic-based message routing.

An endpoint can either initiate a peering itself by connecting to remote locations, or wait for an incoming request:

// Establish an outgoing peering and subscribe to "/topic/test".
endpoint ep1;
auto sub1 = ep1.make_subscriber({"/topic/test"});
ep1.peer("127.0.0.1", 9999);

// Open a port, accept all incoming peerings, and subscribe to "/topic/test".
endpoint ep0;
auto sub0 = ep0.make_subscriber({"/topic/test"});
ep0.listen("127.0.0.1", 9999);

Sending Data

In Broker a message consists of a topic-data pair. That is, endpoints publish values as data instances along with a topic that steers them to interested subscribers:

ep.publish("/topic/test", "42"); // Message is a single string.
ep.publish("/topic/test", vector{1, 2, 3}); // Message is a vector of values.

Note

Publishing a message can be a no-op if there exists no subscriber. Because Broker has fire-and-forget messaging semantics, the runtime does not generate a notification if no subscribers exist.

One can also explicitly create a dedicated publisher for a specific topic first, and then use that to send subsequent messages. This approach is better suited for high-volume streams, as it leverages CAF’s demand management internally:

auto pub = ep.make_publisher("/topic/test");
pub.publish("42"); // Message is a single string.
pub.publish(vector{1, 2, 3}); // Message is a vector.

Finally, there’s also a streaming version of the publisher that pulls messages from a producer as capacity becomes available on the output channel; see endpoint::publish_all and endpoint::publish_all_no_sync.

See Section 3 for a detailed discussion on how to construct values for messages in the form of various types of data instances.

Receiving Data

Endpoints receive data by creating a subscriber attached to the topics of interest. Subscriptions are prefix-based, matching all topics that start with a given string. A subscriber can either retrieve incoming messages explicitly by calling get or poll (synchronous API), or spawn a background worker to process messages as they come in (asynchronous API).

Synchronous API

The synchronous API exists for applications that want to poll for messages explicitly. Once a subscriber is registered for topics, calling get will wait for a new message:

endpoint ep;
auto sub = ep.make_subscriber({"/topic/test"});
auto msg = sub.get();
auto topic = get_topic(msg);
auto data_ = get_data(msg);
std::cout << "topic: " << topic << " data: " << data_ << std::endl;

By default the function get blocks until the subscriber has at least one message available, which it then returns. Each retrieved message consists of the same two elements the publisher passed along: the topic that the message has been published to, and the message’s payload in the form of an arbitrary Broker value (i.e., a data instance). The example just prints both out.

Blocking indefinitely until messages arrive often won’t work well, in particular not in combination with existing event loops or polling. Therefore, get takes an optional timeout parameter to wait only for a certain amount of time. Alternatively, one can use available to explicitly check for available messages, or poll to extract all currently pending messages (which may be none):

if ( sub.available() )
    msg = sub.get(); // Won't block now.

for ( auto m : sub.poll() ) // Iterate over all available messages
    std::cout << "topic: " << get_topic(m) << " data: " << get_data(m) << std::endl;

For integration into event loops, subscriber also provides a file descriptor that signals whether messages are available:

auto fd = sub.fd();
::pollfd p = {fd, POLLIN, 0};
auto n = ::poll(&p, 1, -1);
if (n < 0)
    std::terminate(); // poll failed

if (n == 1 && p.revents & POLLIN) {
    auto msg = sub.get(); // Won't block now.
    // ...
}
Asynchronous API

TODO: Document.

Status and Error Messages

Broker informs clients about any communication errors—and optionally also about non-critical connectivity changes—through separate status messages. To get access to that information, one creates a status_subscriber, which provides a similar synchronous get/available/poll API as the standard message subscriber. By default, a status_subscriber returns only errors:

auto ss = ep.make_status_subscriber();

if ( ss.available() ) {
    auto ss_res = ss.get(); // Won't block now.
    auto err = get<error>(ss_res);
    std::cerr << "Broker error: " << err.code() << ", " << to_string(err) << std::endl;
}

Errors reflect failures that may impact the correctness of operation. err.code() returns an enum ec that codifies existing error codes:

enum class ec : uint8_t {
  /// Not-an-error.
  none,
  /// The unspecified default error code.
  unspecified = 1,
  /// Version incompatibility.
  peer_incompatible,
  /// Referenced peer does not exist.
  peer_invalid,
  /// Remote peer not listening.
  peer_unavailable,
  /// Remote peer closed the connection during handshake.
  peer_disconnect_during_handshake,
  /// A peering request timed out.
  peer_timeout,
  /// Master with given name already exists.
  master_exists,
  /// Master with given name does not exist.
  no_such_master,
  /// The given data store key does not exist.
  no_such_key,
  /// The store operation timed out.
  request_timeout = 10,
  /// The operation expected a different type than provided.
  type_clash,
  /// The data value cannot be used to carry out the desired operation.
  invalid_data,
  /// The storage backend failed to execute the operation.
  backend_failure,
  /// The clone store has not yet synchronized with its master, or it has
  /// been disconnected for too long.
  stale_data,
  /// Opening a file failed.
  cannot_open_file,
  /// Writing to an open file failed.
  cannot_write_file,
  /// Received an unknown key for a topic.
  invalid_topic_key,
  /// Reached the end of an input file.
  end_of_file,
  /// Received an unknown type tag value.
  invalid_tag,
  /// Received an invalid message.
  invalid_message = 20,
  /// Deserialized an invalid status.
  invalid_status,
  /// Converting between two data types or formats failed.
  conversion_failed,
  /// Adding a consumer to a producer failed because the producer already added
  /// the consumer.
  consumer_exists,
  /// A producer or consumer did not receive any message from a consumer within
  /// the configured timeout.
  connection_timeout,
  /// Called a member function without satisfying its preconditions.
  bad_member_function_call,
  /// Attempted to use the same request_id twice.
  repeated_request_id,
  /// A clone ran out of sync with the master.
  broken_clone,
  /// Canceled an operation because the system is shutting down.
  shutting_down,
  /// Canceled a peering request due to invalid or inconsistent data.
  invalid_peering_request,
  /// Broker attempted to trigger a second handshake to a peer while the first
  /// handshake did not complete.
  repeated_peering_handshake_request = 30,
  /// Received an unexpected or duplicate message during endpoint handshake.
  unexpected_handshake_message,
  /// Handshake failed due to invalid state transitions.
  invalid_handshake_state,
  /// Dispatching a message failed because no path to the receiver exists.
  no_path_to_peer,
  /// Unable to accept or establish peerings since no connector is available.
  no_connector_available,
  /// Opening a resource failed.
  cannot_open_resource,
  /// Failed to serialize an object to text or binary output.
  serialization_failed,
  /// Failed to deserialize an object from text or binary input.
  deserialization_failed,
  /// Broker refused binary input due to a magic number mismatch.
  wrong_magic_number,
  /// Broker closes a connection because a prior connection exists.
  redundant_connection,
  /// Broker encountered a logic error.
  logic_error = 40,
};

To receive non-critical status messages as well, specify that when creating the status_subscriber:

auto ss = ep.make_status_subscriber(true); // Get status updates and errors.

if ( ss.available() ) {
    auto s = ss.get();

    if ( auto err = get_if<error>(&s) )
        std::cerr << "Broker error: " << err->code() << ", " << to_string(*err) << std::endl;

    if ( auto st = get_if<status>(&s) ) {
        if ( auto ctx = st->context<endpoint_info>() ) // Get the peer this is about, if available.
            std::cerr << "Broker status update regarding "
                      << ctx->network->address
                      << ": " << to_string(*st) << std::endl;
        else
            std::cerr << "Broker status update: "
                      << to_string(*st) << std::endl;
    }
}

Status messages represent non-critical changes to the topology. For example, after a successful peering, both endpoints receive a peer_added status message. The concrete semantics of a status depend on its embedded code, which the enum sc codifies:

/// Broker's status codes.
/// @relates status
enum class sc : uint8_t {
  /// Indicates a default-constructed ::status.
  unspecified,
  /// Successfully added a direct connection to a peer.
  peer_added,
  /// Successfully removed a direct connection to a peer.
  peer_removed,
  /// Lost direct connection to a peer.
  peer_lost,
  /// Discovered a new Broker endpoint in the network.
  endpoint_discovered,
  /// Lost all paths to a Broker endpoint.
  endpoint_unreachable,
};

Status messages have an optional context and an optional descriptive message. The member function context<T> returns a const T* if the context is available. The type of available context information depends on the status code enum sc. For example, all sc::peer_* status codes include an endpoint_info context as well as a message.

Exchanging Zeek Events

The communication model discussed so far remains generic for all Broker clients in that it doesn’t associate any semantics with the values exchanged through messages. In practice, however, senders and receivers will need to agree on a specific data layout for the values exchanged, so that they interpret them in the same way. This is in particular true for exchanging events with Zeek—which is one of the main applications for Broker in the first place. To support that, Broker provides built-in support for sending and receiving Zeek events through a small Zeek-specific shim on top of the generic message model. The shim encapsulates Zeek events and takes care of converting them into the expected lower-level message layout that gets transmitted. This way, Zeek events can be exchanged between an external Broker client and Zeek itself—and also even just between Broker clients without any Zeek instances at all.

Here’s a complete ping/pong example between a C++ Broker client and Zeek:

# ping.zeek

redef exit_only_after_terminate = T;

global pong: event(n: int);

event ping(n: int)
	{
	event pong(n);
	}

event zeek_init()
	{
	Broker::subscribe("/topic/test");
	Broker::listen("127.0.0.1", 9999/tcp);
	Broker::auto_publish("/topic/test", pong);
	}
// ping.cc

#include <cassert>
#include <iostream>

#include "broker/broker.hh"
#include "broker/zeek.hh"

using namespace broker;

int main() {
    // Setup endpoint and connect to Zeek.
    endpoint ep;
    auto sub = ep.make_subscriber({"/topic/test"});
    auto ss = ep.make_status_subscriber(true);
    ep.peer("127.0.0.1", 9999);

    // Wait until connection is established.
    for ( bool has_peer = false; !has_peer; ) {
      auto val = ss.get();
      if ( auto st = get_if<status>(&val) )
        has_peer = st->code() == sc::peer_added;
    }

    // Do five ping / pong.
    for ( int n = 0; n < 5; n++ ) {
        // Send event "ping(n)".
        zeek::Event ping("ping", {n});
        ep.publish("/topic/test", ping);

        // Wait for "pong" reply event.
        auto msg = sub.get();
        zeek::Event pong(move_data(msg));
        std::cout << "received " << pong.name() << pong.args() << std::endl;
    }
}
# g++ -std=c++11 -lbroker -lcaf_core -lcaf_io -lcaf_openssl -o ping ping.cc
# zeek ping.zeek &
# ./ping
received pong[0]
received pong[1]
received pong[2]
received pong[3]
received pong[4]

Gateways

Broker was designed with peer-to-peer communication in mind. All endpoints in the network form a single publish/subscribe layer. This implies that each endpoint is aware of every other endpoint in the network as well as what topics they have subscribed to.

This level of transparency enables source routing, but it comes at a cost. Endpoints flood subscriptions and topology changes to the entire network. The volume of flooded messages remains small, as long as primarily endpoints with high availability and a stable set of subscriptions join the network. However, short-lived or unstable endpoints may increase the amount of messages in the network quickly. Furthermore, the more endpoints join the network, the more state and bookkeeping overhead accumulates.

The overhead becomes especially prominent for endpoints that join the network only to publish data and sit at the edges of the network. Such endpoints usually end up sending all, or nearly all, of their messages to another, well-connected endpoint that distributes them. Nevertheless, these producing endpoints still have to flood their subscriptions to the entire network, and their state ends up in every routing table. In the Zeek ecosystem, the Zeek Agent fits this exact role: agents run at the edge of the network and ideally should not consume more network bandwidth and CPU cycles than necessary.

Gateways provide a way to separate the well-connected “inner” endpoints from endpoints at the edges that generally cannot contribute to the overall connectivity of the network but still incur messaging and bookkeeping overhead.

Topology

Gateways separate the overlay into two domains: external and internal. The external domain consists of stable, well-connected endpoints that build the core of the publish/subscribe layer. The internal domain consists of endpoints that need no knowledge of the entire overlay, because all paths would pass through the gateway anyway. This means the gateway is the only way in or out for endpoints in the internal domain, as illustrated in the figure below.

[Figure: _images/gateway.png]

Aside from forwarding messages between the two domains, gateways render all endpoints of the internal domain completely opaque to endpoints in the external domain and vice versa.

To endpoints in the external domain, a gateway appears as a regular endpoint E. It subscribes to all topics that any endpoint in the internal domain has subscribed to, and all messages published in the internal domain appear as if E were the publisher.

The endpoint in the internal domain, I, is the mirror image of E: it hides all endpoints from the external domain.

The two endpoints E and I actually exist, i.e., the gateway starts both endpoints in the same process and creates a “shortcut” between the two. Every subscription or published event on one gets forwarded to the other. However, E and I are not aware of each other, and the forwarded events and subscriptions appear as if they came from a local subscriber or publisher.

Warning

The endpoints E and I use the same ID. When setting up a gateway, make sure that no other endpoint provides connectivity between the internal and the external domain. Otherwise, E could receive messages from I and vice versa. Since they share one ID, endpoints in the network would receive contradictory messages from what appears to be the same endpoint.

Setup

Broker includes the standalone tool broker-gateway. When started, it creates the two endpoints E and I in the same process. Each of the two endpoints listens to its own port for incoming peerings. A minimal setup would only set the two ports, as shown below.

broker-gateway --internal.port=8080 --external.port=9090

Users can also configure the gateway to connect to a list of predefined peers on startup. For example:

broker-gateway --internal.port=8080 \
               --internal.peers=[tcp://mars:1234, tcp://venus:2345] \
               --external.port=9090 \
               --external.peers=[tcp://proxima-centauri:3456]

The invocation above listens on port 8080 for incoming peerings in the internal domain and tries to connect to mars on port 1234 as well as to venus on port 2345. In the external domain, the gateway listens on port 9090 and tries to connect to proxima-centauri on port 3456.

Instead of using the command line, users could also provide a broker.conf file with the following content:

internal {
  port = 8080
  peers = [
    <tcp://mars:1234>,
    <tcp://venus:2345>,
  ]
}
external {
  port = 9090
  peers = [
    <tcp://proxima-centauri:3456>,
  ]
}

There is also a third parameter for the domains: disable-forwarding. In particular, setting internal.disable-forwarding to true causes the gateway not only to isolate endpoints in the internal domain from endpoints in the external domain, but also to isolate endpoints within the internal domain from each other.

In setups where all endpoints of the internal domain connect only to the gateway and do not need to interact with each other, setting this flag reduces any messaging to the bare minimum by leading each endpoint in the internal domain to believe that there is exactly one other endpoint in the network—the gateway.
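Assuming the file follows the same conventions as the broker.conf example above, the flag could be set like this (a sketch, not a verified configuration):

```
internal {
  port = 8080
  disable-forwarding = true
}
```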

Data Model

Broker offers a data model that is rich in types, closely modeled after Zeek’s. Both endpoints and data stores operate on the data abstraction as their basic building block: a type-erased variant structure that can hold many different values.

There exists a total ordering on data, induced first by the type discriminator and then by the value domain. For example, an integer will always be smaller than a count. While a meaningful ordering exists only when comparing two values of the same type, the total ordering makes it possible to use data as a key in associative containers.

Types

None

The none type has exactly one value: nil. A default-constructed instance of data is of type none. One can use this value to represent optional or invalid data.

Arithmetic

The following types have arithmetic behavior.

Boolean

The type boolean can take on exactly two values: true and false. A boolean is a type alias for bool.

Count

A count is a 64-bit unsigned integer and type alias for uint64_t.

Integer

An integer is a 64-bit signed integer and type alias for int64_t.

Real

A real is an IEEE 754 double-precision floating point value, also commonly known as double.

Time

Broker offers two data types for expressing time: timespan and timestamp.

Both types seamlessly interoperate with the C++ standard library time facilities. In fact, they are concrete specializations of the time types in std::chrono:

using clock = std::chrono::system_clock;
using timespan = std::chrono::duration<int64_t, std::nano>;
using timestamp = std::chrono::time_point<clock, timespan>;

Timespan

A timespan represents relative time duration in nanoseconds. Given that the internal representation is a 64-bit signed integer, this allows for representing approximately 292 years.

Timestamp

A timestamp represents an absolute point in time. The frame of reference for a timestamp is the UNIX epoch, January 1, 1970. That is, a timestamp is simply an anchored timespan. The function now() returns the current wallclock time as a timestamp.

String

Broker directly supports std::string as one possible type of data.

Enum Value

An enum_value wraps enum types defined by Zeek by storing the enum value’s name as a std::string. The receiver is responsible for knowing how to map the name to the actual numeric value if it needs that information.

Networking

Broker comes with a few custom types from the networking domain.

Address

The type address is an IP address, which holds either an IPv4 or IPv6 address. One can construct an address from a byte sequence, specifying the byte order and address family. An address can be masked by zeroing a given number of bottom bits.

Subnet

A subnet represents an IP prefix in CIDR notation. It consists of two components: a network address and a prefix length.

Port

A port represents a transport-level port number. Besides TCP and UDP ports, there is a concept of an ICMP “port” where the source port is the ICMP message type and the destination port the ICMP message code.

Containers

Broker features the following container types: vector, set, and table.

Vector

A vector is a sequence of data.

It is a type alias for std::vector<data>.

Set

A set is a mathematical set with elements of type data. A fixed data value can occur at most once in a set.

It is a type alias for std::set<data>.

Table

A table is an associative array with keys and values of type data. That is, it maps data to data.

It is a type alias for std::map<data, data>.

Interface

The data abstraction offers two ways of interacting with the contained type instance:

  1. Querying a specific type T. Similar to C++17’s std::variant, the function get_if<T> returns either a T* if the contained type is T and nullptr otherwise:

    auto x = data{...};
    if (auto i = get_if<integer>(x))
      f(*i); // safe use of x
    

    Alternatively, the function get<T> returns a reference of type T& or const T&, based on whether the given data argument is const-qualified:

    auto x = data{...};
    auto& str = get<std::string>(x); // throws std::bad_cast on type clash
    f(str); // safe use of x
    
  2. Applying a visitor. Since data is a variant type, one can apply a visitor to it, i.e., dispatch a function call based on the type discriminator to the active type. A visitor is a polymorphic function object with overloaded operator() and a result_type type alias:

    struct visitor {
      using result_type = void;
    
      template <class T>
      result_type operator()(const T&) const {
        std::cout << ":-(" << std::endl;
      }
    
      result_type operator()(real r) const {
        std::cout << r << std::endl;
      }
    
      result_type operator()(integer i) const {
        std::cout << i << std::endl;
      }
    };
    
    auto x = data{42};
    visit(visitor{}, x); // prints 42
    x = 4.2;
    visit(visitor{}, x); // prints 4.2
    x = "42";
    visit(visitor{}, x); // prints :-(
    

Data Stores

In addition to transmitting data via publish/subscribe communication, Broker also offers a mechanism to store this very data. Data stores provide a distributed key-value interface that leverages the existing peer communication.

Aspects

A data store has two aspects: a frontend for interacting with the user, and a backend that defines the database type for the key-value store.

Frontend

Users interact with a data store through the frontend, which is either a master or a clone. A master is authoritative for the store, whereas a clone represents a local cache that is connected to the master. A clone cannot exist without a master. Only the master can perform mutating operations on the store, which it pushes out to all its clones. A clone has a full copy of the data in memory for faster access, but sends any modifying operations to its master first. Only when the master propagates the change back does the result of the operation become visible at the clone.

It is possible to attach one or more data stores to an endpoint, but each store must have a unique master name. For example, two peers cannot both have a master with the same name. When a clone connects to its master, it receives a full dump of the store:

[Figure: a clone attaching to its master and receiving a full dump of the store]

While the master can apply mutating operations to the store directly, clones have to first send the operation to the master and wait for the reply before the operation takes effect:

[Figure: a clone forwarding a mutating operation to the master and receiving the update back]

Backend

The master can choose to keep its data in various backends:

  1. Memory. This backend uses a hash-table to keep its data in memory. It is the fastest of all backends, but offers limited scalability and does not support persistence.

  2. SQLite. The SQLite backend stores its data in a SQLite3 format on disk. While offering persistence, it does not scale well to large volumes.

Operations

Key operations on data stores include attaching them to an endpoint, performing mutating operations, and retrieving values at specific keys.

Construction

The example below illustrates how to attach a master frontend with a memory backend:

  endpoint ep;
  auto ds = ep.attach_master("foo", backend::memory);

The factory function endpoint::attach_master has the following signature:

expected<store> attach_master(std::string name, backend type,
                              backend_options opts=backend_options());

The function takes as first argument the global name of the store, as second argument the type of store (broker::backend::{memory,sqlite,rocksdb}), and as third argument optionally a set of backend options, such as the path where to keep the backend on the filesystem. The function returns an expected<store>, which encapsulates a type-erased reference to the data store.

Note

The type expected<T> encapsulates an instance of type T or an error, with an interface that has “pointer semantics” for syntactic convenience:

auto f(...) -> expected<T>;

auto x = f();
if (x)
  f(*x); // use instance of type T
else
  std::cout << to_string(x.error()) << std::endl;

In the failure case, expected<T>::error() returns the error.

Modification

Data stores support the following mutating operations:

void put(data key, data value, optional<timespan> expiry = {}) const;

Stores the value at key, overwriting a potentially previously existing value at that location. If expiry is given, the new entry will automatically be removed after that amount of time.

void erase(data key) const;

Removes the value for the given key, if it exists.

void clear() const;

Removes all current store values.

void increment(data key, data amount, optional<timespan> expiry = {}) const;

Increments the existing value at key by the given amount. This is supported for numerical data types and for timestamps. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void decrement(data key, data amount, optional<timespan> expiry = {}) const;

Decrements the existing value at key by the given amount. This is supported for numerical data types and for timestamps. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void append(data key, data str, optional<timespan> expiry = {}) const;

Appends a new string str to an existing string value at key. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void insert_into(data key, data index, optional<timespan> expiry = {}) const;

For an existing set value stored at key, inserts the value index into it. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void insert_into(data key, data index, data value, optional<timespan> expiry = {}) const;

For an existing vector or table value stored at key, inserts value into it at index. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void remove_from(data key, data index, optional<timespan> expiry = {}) const;

For an existing vector, set or table value stored at key, removes the value at index from it. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void push(data key, data value, optional<timespan> expiry = {}) const;

For an existing vector at key, appends value to its end. If expiry is given, the modified entry’s expiration time will be updated accordingly.

void pop(data key, optional<timespan> expiry = {}) const;

For an existing vector at key, removes its last value. If expiry is given, the modified entry’s expiration time will be updated accordingly.

Direct Retrieval

Data stores support the following retrieval methods:

expected<data> get(data key) const;

Retrieves the value at key. If the key does not exist, returns an error ec::no_such_key.

auto result = ds->get("foo");
if (result)
    std::cout << *result << std::endl; // Print current value of 'foo'.
else if (result.error() == ec::no_such_key)
    std::cout << "key 'foo' does not exist" << std::endl;
else if (result.error() == ec::backend_failure)
    std::cout << "something went wrong with the backend" << std::endl;
else
    std::cout << "could not retrieve value at key 'foo'" << std::endl;

expected<data> exists(data key) const;

Returns a boolean data value indicating whether key exists in the store.

expected<data> get_index_from_value(data key, data index) const;

For container values (sets, tables, vectors) at key, retrieves a specific index from the value. For sets, the returned value is a boolean data instance indicating whether the index exists in the set. If key does not exist, returns an error ec::no_such_key.

expected<data> keys() const;

Retrieves a copy of all the store’s current keys, returned as a set. Note that this is a potentially expensive operation if the store is large.

All of these methods may return the ec::stale_data error when querying a clone if it has not yet synchronized with its master or if it has been disconnected from its master for too long. The length of time before a clone’s cache is deemed stale depends on an argument given to the endpoint::attach_clone method.

All these methods share the property that they return the corresponding result directly. Because Broker operates asynchronously internally, this means they may block for short amounts of time until the result becomes available. If that is a problem, you can also receive results asynchronously; see the next section.

Note, however, that even with this direct interface, results may sometimes take a bit to reflect operations that clients perform (including the same client!). This effect is most pronounced when working through a clone: any local manipulations will need to go through the master before they become visible to the clone.

Proxy Retrieval

When integrating data store queries into an event loop, the direct retrieval API may not prove a good fit: request and response are coupled at lookup time, leading to potentially blocking operations. Therefore, Broker offers a second mechanism to look up values in data stores. A store::proxy decouples lookup requests from responses and exposes a mailbox to integrate into event loops. When using a proxy, each request receives a unique, monotonically increasing 64-bit ID that is carried through to the response:

// Add a value to a data store (master or clone).
ds->put("foo", 42);
// Create a proxy.
auto proxy = store::proxy{*ds};
// Perform an asynchronous request to look up a value.
auto id = proxy.get("foo");
// Get a file descriptor for event loops.
auto fd = proxy.mailbox().descriptor();
// Wait for result.
::pollfd p = {fd, POLLIN, 0};
auto n = ::poll(&p, 1, -1);
if (n < 0)
    std::terminate(); // poll failed

if (n == 1 && p.revents & POLLIN) {
    auto response = proxy.receive(); // Retrieve result, won't block now.
    assert(response.id == id);
    // Check whether we got data or an error.
    if (response.answer)
        std::cout << *response.answer << std::endl; // may print 42
    else if (response.answer.error() == ec::no_such_key)
        std::cout << "no such key: 'foo'" << std::endl;
    else
        std::cout << "failed to retrieve value at key 'foo'" << std::endl;
}

The proxy provides the same set of retrieval methods as the direct interface, with all of them returning the corresponding ID to retrieve the result once it has come in.

Intra-store Communication

Broker uses two reserved topics to model communication between masters and clones: M and C. Masters subscribe to M when attached to an endpoint and publish to C to broadcast state transitions to their clones. Clones subscribe to C when attached to an endpoint and publish to M for propagating mutating operations to the master.

These topics also enable clones to find the master without knowing which endpoint it was attached to. When starting, a clone periodically publishes a handshake to M until the master responds. This rendezvous process also makes it easier to set up cluster instances, because users can attach clones before a connection to the master has been established.

The publish/subscribe layer of Broker generally targets loosely coupled deployments and neither ensures ordering nor guarantees delivery. Without these two properties, masters and clones could quickly run out of sync when messages arrive out of order or get lost. Hence, masters and clones use “virtual channels” to implement reliable and ordered communication on top of the publish/subscribe layer. The master includes a sequence number when publishing to C in order to enable clones to detect out-of-order delivery and ask the master to retransmit lost messages. Cumulative ACKs and per-clone state allow the master to delete transmitted messages as well as to detect unresponsive clones using timeouts.

For the low-level details of the channel abstraction, see the channels section in the developer guide.

Web Socket

Broker offers access to the publish/subscribe layer via WebSocket in order to make its data accessible to third parties.

WebSocket clients are treated as lightweight peers. Each Broker endpoint can be configured to act as a WebSocket server by either (1) setting the environment variable BROKER_WEB_SOCKET_PORT; (2) setting broker.web-socket.port on the command line or in the configuration file; or (3) from C++ by calling endpoint::web_socket_listen(). When running inside Zeek, scripts may call Broker::listen_websocket() to have Zeek start listening for incoming WebSocket connections.

Note

Broker uses the same SSL parameters for native and WebSocket peers.

JSON API v1

To access the JSON API, clients may connect to wss://<host>:<port>/v1/messages/json (SSL enabled, default) or ws://<host>:<port>/v1/messages/json (SSL disabled). On this WebSocket endpoint, Broker allows JSON-formatted text messages only.

Handshake

The broker endpoint expects a JSON array of strings as the first message. This array encodes the subscriptions as a list of topic prefixes that the client subscribes to. Clients that only publish data must send an empty JSON array.

After receiving the subscriptions, the Broker endpoint sends a single ACK message:

{
  "type": "ack",
  "endpoint": "<uuid>",
  "version": "<broker-version>"
}

In this message, <uuid> is the unique endpoint ID of the WebSocket server and <broker-version> is a string representation of the libbroker version, i.e., the result of broker::version::string(). For example:

{
  "type": "ack",
  "endpoint": "925c9110-5b87-57d9-9d80-b65568e87a44",
  "version": "2.2.0-22"
}
Protocol

After the handshake, the WebSocket client may only send Data Messages. The Broker endpoint converts every message to its native representation and publishes it.

The WebSocket server may send Data Messages (whenever a data message matches the subscriptions of the client) and Error Messages to the client.

Data Representation

Broker uses a recursive data type to represent its values (see Data Model). This data model does not map to JSON-native types without ambiguity, e.g., because Broker distinguishes between signed and unsigned number types.

In JSON, we represent each value as a JSON object with two keys: @data-type and data. The former identifies one of Broker’s data types (see below) and denotes how Broker parses the data field.
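As a quick sketch, the envelope is easy to produce with standard-library JSON tools. The helper name below is made up for illustration and is not part of Broker:

```python
import json

def encode(data_type, value):
    """Wrap a value in Broker's {"@data-type": ..., "data": ...} envelope."""
    return json.dumps({"@data-type": data_type, "data": value})

print(encode("count", 123))   # {"@data-type": "count", "data": 123}
print(encode("integer", -7))  # {"@data-type": "integer", "data": -7}
```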

None

There is exactly one valid input for encoding a none:

{
  "@data-type": "none",
  "data": {}
}
Boolean

The type boolean can take on exactly two values and maps to the native JSON boolean type:

{
  "@data-type": "boolean",
  "data": true
}
{
  "@data-type": "boolean",
  "data": false
}
Count

A count is a 64-bit unsigned integer and maps to a (positive) JSON integer. For example, Broker encodes the count 123 as:

{
  "@data-type": "count",
  "data": 123
}

Note

Passing a number with a decimal point (e.g. ‘1.0’) is an error.

Integer

The type integer maps to JSON integers. For example, Broker encodes the integer -7 as:

{
  "@data-type": "integer",
  "data": -7
}

Note

Passing a number with a decimal point (e.g. ‘1.0’) is an error.

Real

The type real maps to JSON numbers. For example, Broker encodes -7.5 as:

{
  "@data-type": "real",
  "data": -7.5
}
Timespan

A timespan has no equivalent in JSON and Broker thus encodes them as strings. The format for the string is <value><suffix>, where <value> is an integer and <suffix> is one of:

ns

Nanoseconds.

ms

Milliseconds.

s

Seconds.

min

Minutes.

h

Hours.

d

Days.

For example, 1.5 seconds may be encoded as:

{
  "@data-type": "timespan",
  "data": "1500ms"
}
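A sketch of this encoding in Python, using only the standard library (the helper is illustrative, not part of Broker’s bindings); it picks the coarsest suffix that represents the duration exactly:

```python
import datetime

# Suffixes from the list above, with their length in nanoseconds.
_UNITS = [(86_400_000_000_000, "d"), (3_600_000_000_000, "h"),
          (60_000_000_000, "min"), (1_000_000_000, "s"),
          (1_000_000, "ms"), (1, "ns")]

def to_broker_timespan(td):
    """Render a datetime.timedelta in the documented <value><suffix> format."""
    ns = round(td.total_seconds() * 1_000_000_000)
    for divisor, suffix in _UNITS:
        if ns % divisor == 0:
            return f"{ns // divisor}{suffix}"

print(to_broker_timespan(datetime.timedelta(seconds=1.5)))  # 1500ms
```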
Timestamp

Like timespan, Broker uses formatted strings to represent timestamps since there is no native JSON equivalent. Timestamps are encoded in ISO 8601 format as YYYY-MM-DDThh:mm:ss.sss.

For example, Broker represents April 10, 2022 at precisely 7AM as:

{
  "@data-type": "timestamp",
  "data": "2022-04-10T07:00:00.000"
}
String

Strings simply map to JSON strings, e.g.:

{
  "@data-type": "string",
  "data": "Hello World!"
}
Enum Value

Broker internally represents enumeration values as strings. Hence, this type also maps to JSON strings:

{
  "@data-type": "enum-value",
  "data": "foo"
}
Address

Network addresses are encoded as strings and use the IETF-recommended string format for IPv4 and IPv6 addresses, respectively. For example:

{
  "@data-type": "address",
  "data": "2001:db8::"
}
Subnet

Network subnets are encoded in strings with “slash notation”, i.e., <address>/<prefix-length>. For example:

{
  "@data-type": "subnet",
  "data": "255.255.255.0/24"
}
Port

Ports are rendered as strings with the format <port-number>/<protocol>, where <port-number> is a 16-bit unsigned integer and <protocol> is one of tcp, udp, icmp, or ?. For example:

{
  "@data-type": "port",
  "data": "8080/tcp"
}
Vector

A vector is a sequence of data. This maps to a JSON array consisting of JSON objects (that in turn each have the @data-type and data keys again). For example:

{
  "@data-type": "vector",
  "data": [
    {
      "@data-type": "count",
      "data": 42
    },
    {
      "@data-type": "integer",
      "data": 23
    }
  ]
}
Set

Sets are similar to vectors, but each object in the list may only appear once. For example:

{
  "@data-type": "set",
  "data": [
    {
      "@data-type": "string",
      "data": "foo"
    },
    {
      "@data-type": "string",
      "data": "bar"
    }
  ]
}
Table

Since Broker allows arbitrary types for the key (even a nested table), Broker cannot render tables as JSON objects. Hence, tables are mapped to JSON arrays of key-value pairs, i.e., JSON objects with key and value. For example:

{
  "@data-type": "table",
  "data": [
    {
      "key": {
        "@data-type": "string",
        "data": "first-name"
      },
      "value": {
        "@data-type": "string",
        "data": "John"
      }
    },
    {
      "key": {
        "@data-type": "string",
        "data": "last-name"
      },
      "value": {
        "@data-type": "string",
        "data": "Doe"
      }
    }
  ]
}
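Under the same envelope scheme, a small sketch (the helper name is invented for illustration) can produce this array-of-pairs form from key-value pairs that are already encoded as {"@data-type", "data"} objects:

```python
def table_to_json(pairs):
    """Encode a sequence of (key, value) pairs, each already wrapped in
    Broker's JSON envelope, as the table representation shown above."""
    return {"@data-type": "table",
            "data": [{"key": k, "value": v} for k, v in pairs]}

t = table_to_json([
    ({"@data-type": "string", "data": "first-name"},
     {"@data-type": "string", "data": "John"}),
    ({"@data-type": "string", "data": "last-name"},
     {"@data-type": "string", "data": "Doe"}),
])
```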
Data Messages

Represents a user-defined message with topic and data.

A data message consists of these keys:

type

Always data-message.

topic

The Broker topic for the message. A client will only receive topics that match its subscriptions.

@data-type

Meta field that encodes how to parse the data field (see Data Representation).

data

Contains the actual payload of the message.

Example:

{
  "type": "data-message",
  "topic": "/foo/bar",
  "@data-type": "count",
  "data": 1
}
Error Messages

The error messages on the WebSocket connection give feedback to the client if the server discarded malformed input from the client or if there has been an error while processing the JSON text.

An error message consists of these keys:

type

Always error.

code

A string representation of one of Broker’s error codes. See Section 2.1.5.

context

A string that gives additional information as to what went wrong.

For example, sending the server How is it going? instead of a valid data message would cause it to send this error back to the client:

{
  "type": "error",
  "code": "deserialization_failed",
  "context": "input #1 contained malformed JSON -> caf::pec::unexpected_character(1, 1)"
}
Encoding of Zeek Events

Broker encodes Zeek events as nested vectors using the following structure: [<format-nr>, <type>, [<name>, <args>]]:

format-nr

A count denoting the format version. Currently, this is always 1.

type

A count denoting the encoded Zeek message type. For events, this is always 1. Other message types in Zeek are currently not safe for 3rd-party use.

name

Identifies the Zeek event.

args

Contains the arguments for the event in the form of another vector.

For example, an event called event_1 that has been published to topic /foo/bar with an integer argument 42 and a string argument test would render as:

{
  "type": "data-message",
  "topic": "/foo/bar",
  "@data-type": "vector",
  "data": [
    {
      "@data-type": "count",
      "data": 1
    },
    {
      "@data-type": "count",
      "data": 1
    },
    {
      "@data-type": "vector",
      "data": [
        {
          "@data-type": "string",
          "data": "event_1"
        },
        {
          "@data-type": "vector",
          "data": [
            {
              "@data-type": "integer",
              "data": 42
            },
            {
              "@data-type": "string",
              "data": "test"
            }
          ]
        }
      ]
    }
  ]
}
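The nesting above can be generated mechanically. The following sketch builds such a data message from already-encoded arguments; the function names are illustrative and not part of Broker:

```python
def count(n):
    return {"@data-type": "count", "data": n}

def vector(items):
    return {"@data-type": "vector", "data": list(items)}

def zeek_event_message(topic, name, args):
    """Build the [<format-nr>, <type>, [<name>, <args>]] encoding of an event."""
    body = vector([count(1),  # format-nr: currently always 1
                   count(1),  # message type: 1 = event
                   vector([{"@data-type": "string", "data": name},
                           vector(args)])])
    return {"type": "data-message", "topic": topic, **body}

msg = zeek_event_message("/foo/bar", "event_1",
                         [{"@data-type": "integer", "data": 42},
                          {"@data-type": "string", "data": "test"}])
```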

Python Bindings

Almost all functionality of Broker is also accessible through Python bindings. The Python API mostly mimics the C++ interface, but adds transparent conversion between Python values and Broker values. In the following we demonstrate the main parts of the Python API, assuming a general understanding of Broker’s concepts and the C++ interface.

Note

Broker’s Python bindings require Python 3.5 or greater.

Installation in a Virtual Environment

To install Broker’s Python bindings in a virtual environment, specify the python-prefix configuration option and make sure the Python header files for the virtual environment’s Python version are installed on the system. You can also use the prefix configuration option to install the main Broker library and headers into an isolated location.

$ virtualenv -p python3 /Users/user/sandbox/broker/venv
$ . /Users/user/sandbox/broker/venv/bin/activate
$ ./configure --prefix=/Users/user/sandbox/broker --python-prefix=$(python3 -c 'import sys; print(sys.exec_prefix)')
$ make install
$ python3 -c 'import broker; print(broker.__file__)'
/Users/user/sandbox/broker/venv/lib/python3.7/site-packages/broker/__init__.py

Communication

Just as in C++, you first set up peerings between endpoints and create subscribers for the topics of interest:

        with broker.Endpoint() as ep1, \
             broker.Endpoint() as ep2, \
             ep1.make_subscriber("/test") as s1, \
             ep2.make_subscriber("/test") as s2:
            port = ep1.listen("127.0.0.1", 0)
            self.assertTrue(ep2.peer("127.0.0.1", port, 1.0))

            ep1.await_peer(ep2.node_id())
            ep2.await_peer(ep1.node_id())

You can then start publishing messages. In Python a message is just a list of values, along with the corresponding topic. The following publishes a simple message consisting of just one string, and then has the receiving endpoint wait for it to arrive:

            ep2.publish("/test", ["ping"])
            (t, d) = s1.get()
            # t == "/test", d == ["ping"]

Example of publishing a small batch of two slightly more complex messages with two separate topics:

            msg1 = ("/test/2", (1, 2, 3))
            msg2 = ("/test/3", (42, "foo", {"a": "A", "b": ipaddress.IPv4Address('1.2.3.4')}))
            ep2.publish_batch(msg1, msg2)

As you see with the 2nd message there, elements can be either standard Python values or instances of Broker wrapper classes; see the data model section below for more.

The subscriber instances have more methods matching their C++ equivalent, including available for checking for pending messages, poll() for getting available messages without blocking, fd() for retrieving a select-able file descriptor, and {add,remove}_topic for changing the subscription list.

Exchanging Zeek Events

The Broker Python bindings come with support for representing Zeek events as well. Here’s the Python version of the C++ ping example shown earlier:

# ping.zeek

redef exit_only_after_terminate = T;

global pong: event(n: int);

event ping(n: int)
	{
	event pong(n);
	}

event zeek_init()
	{
	Broker::subscribe("/topic/test");
	Broker::listen("127.0.0.1", 9999/tcp);
	Broker::auto_publish("/topic/test", pong);
	}
# ping.py

import sys
import broker

# Setup endpoint and connect to Zeek.
with broker.Endpoint() as ep, \
     ep.make_subscriber("/topic/test") as sub, \
     ep.make_status_subscriber(True) as ss:

    ep.peer("127.0.0.1", 9999)

    # Wait until connection is established.
    st = ss.get()

    if not (type(st) == broker.Status and st.code() == broker.SC.PeerAdded):
        print("could not connect")
        sys.exit(0)

    for n in range(5):
        # Send event "ping(n)".
        ping = broker.zeek.Event("ping", n);
        ep.publish("/topic/test", ping);

        # Wait for "pong" reply event.
        (t, d) = sub.get()
        pong = broker.zeek.Event(d)
        print("received {}{}".format(pong.name(), pong.args()))
# python3 ping.py
received pong[0]
received pong[1]
received pong[2]
received pong[3]
received pong[4]

Data Model

The Python API can represent the same type model as the C++ code. For all Broker types that have a direct mapping to a Python type, conversion is handled transparently as values are passed into, or retrieved from, Broker. For example, the message [1, 2, 3] above is automatically converted into a Broker list of three Broker integer values. In cases where there is not a direct Python equivalent for a Broker type (e.g., for count; Python does not have an unsigned integer class), the Broker module provides wrapper classes. The following table summarizes how Broker and Python values are mapped to each other:

Broker Type    Python representation
-----------    ---------------------------------------------------
boolean        True/False
count          broker.Count(x)
integer        int
real           float
timespan       datetime.timedelta
timestamp      datetime.datetime
string         str
address        ipaddress.IPv4Address/ipaddress.IPv6Address
subnet         ipaddress.IPv4Network/ipaddress.IPv6Network
port           broker.Port(x, broker.Port.{TCP,UDP,ICMP,Unknown})
vector         tuple
set            set
table          dict

Note that either a Python tuple or a Python list may convert to a Broker vector, but the canonical Python type representing a vector is a tuple: whenever a Broker vector value is converted into a Python value, you get a tuple. The tuple is canonical because it is immutable, whereas a list is mutable. Broker tables may be indexed by vectors, tables map to Python dictionaries, and Python dictionaries only allow immutable key types, so a vector must be represented by a tuple.
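The immutability constraint is easy to demonstrate in plain Python: only a hashable (immutable) value can serve as a dict key, so a table keyed by a vector needs tuple keys:

```python
# A tuple (immutable, hashable) can index a dict...
table = {(1, 2, 3): "value"}
assert table[(1, 2, 3)] == "value"

# ...but a list (mutable, unhashable) cannot.
try:
    bad = {[1, 2, 3]: "value"}
    ok = False
except TypeError:
    ok = True
assert ok
```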

Status and Error Messages

Status and error handling works through a status subscriber, again similar to the C++ interface:

        with broker.Endpoint() as ep1, \
             ep1.make_status_subscriber() as es1:
            r = ep1.peer("127.0.0.1", 1947, 0.0) # Try unavailable port, no retry
            self.assertEqual(r, False) # Not shown in docs.
            # s1.code() == broker.EC.PeerUnavailable
        with broker.Endpoint() as ep1, \
             broker.Endpoint() as ep2, \
             ep1.make_status_subscriber(True) as es1, \
             ep2.make_status_subscriber(True) as es2:

            port = ep1.listen("127.0.0.1", 0)
            self.assertEqual(ep2.peer("127.0.0.1", port, 1.0), True)

            ep1.await_peer(ep2.node_id())
            ep2.await_peer(ep1.node_id())

            st1 = es1.get(2)
            st2 = es2.get(2)
            # st1.code() == [broker.SC.EndpointDiscovered, broker.SC.PeerAdded]
            # st2.code() == [broker.SC.EndpointDiscovered, broker.SC.PeerAdded]

Data Stores

For data stores, the C++ API also directly maps to Python. The following instantiates a master store to then operate on:

        with broker.Endpoint() as ep1, \
             ep1.attach_master("test", broker.Backend.Memory) as m:

            m.put("key", "value")
            x = m.get("key")
            # x == "value"

In Python, both master and clone stores provide all the same accessor and mutator methods as C++. Some examples:

            m.increment("e", 1)
            m.decrement("f", 1)
            m.append("str", "ar")
            m.insert_into("set", 3)
            m.remove_from("set", 1)
            m.insert_into("table", 3, "D")
            m.remove_from("table", 1)
            m.push("vec", 3)
            m.push("vec", 4)
            m.pop("vec")

Here’s a more complete example of using a SQLite-backed data store from Python, with the database being stored in mystore.sqlite:

# sqlite-listen.py

import broker

with broker.Endpoint() as ep, \
     ep.make_subscriber('/test') as s, \
     ep.make_status_subscriber(True) as ss:

    ep.listen('127.0.0.1', 9999)

    m = ep.attach_master('mystore',
                         broker.Backend.SQLite, {'path': 'mystore.sqlite'})

    while True:
        print(ss.get())
        print(m.get('foo'))
# sqlite-connect.py

import broker
import sys
import time

with broker.Endpoint() as ep, \
     ep.make_subscriber('/test') as s, \
     ep.make_status_subscriber(True) as ss:

    ep.peer('127.0.0.1', 9999, 1.0)

    st = ss.get();

    if not (type(st) == broker.Status and st.code() == broker.SC.PeerAdded):
        print('could not connect')
        sys.exit(1)

    c = ep.attach_clone('mystore')

    while True:
        time.sleep(1)
        c.increment('foo', 1)
        print(c.get('foo'))

Developer Guide

Broker is based on CAF, the C++ Actor Framework. Experience with CAF certainly helps, but a basic understanding of the actor model as well as publish/subscribe suffices for understanding (and contributing to) Broker.

In the code base of Broker, we frequently use templates, lambdas, and common C++ idioms such as CRTP and RAII.

Architecture

From a user perspective, the Broker endpoint is the primary component in the API (see Section 2.1.1). Internally, an endpoint is a container for an actor system that hosts the core actor plus any number of subscribers and publishers. The figure below shows a simplified architecture of Broker in terms of actors.

Simplified architecture of a Broker endpoint in terms of actors.

An endpoint always contains exactly one core actor. From the perspective of the implementation, this actor is the primary component. It manages publishers and subscribers, maintains peering relations, forwards messages to remote peers, etc.

Broker uses four types of messages internally, where a message here simply means a copy-on-write (COW) tuple.

  1. A data message consists of a topic and user-defined data (cf. Data Model). Any direct user interaction on publishers or subscribers uses this message type internally.

  2. A command message consists of a topic and a private data type called internal_command. This type of message usually remains hidden to Broker users since this message type represents internal communication of data stores (between masters and clones).

  3. A packed message represents a data, command, or routing-related message in a serialized form. Each packed message consists of a type tag, a TTL field, a topic, and a byte buffer. The type tag stores the type information needed to deserialize the byte buffer. The TTL field is a counter that decrements whenever Broker forwards a message between peers. Once the counter reaches zero, Broker no longer forwards the message.

  4. A node message represents a message between two Broker nodes (endpoints). The routing and dispatching logic in the core actor operates on this message type. Node messages are tuples that consist of two endpoint IDs, one for the sender and one for the receiver, as well as a packed message that represents the actual content.

Broker organizes those message types in data flows, as depicted below:

All classes involved in implementing the core actor.

The core actor represents the main hub that connects local and remote flows. In general, publishers generate data for peers and subscribers consume data from peers. In the core actor, there is a central merge point where all messages flow through. Peers directly tap into this dispatching point. Since peers operate on node messages, they do not need to serialize or deserialize any payloads when writing to or reading from the network. The core actor selects all messages with local subscriptions from the central merge point and only deserializes node messages into data or command messages once.

Likewise, incoming messages from publishers get serialized once immediately after receiving them and then they flow as node messages into the central merge point.

Implementation

Endpoints, master stores, and clone stores (see Section 1) all map to actors. Endpoints wrap the actor system and the main component: the core actor (see the Architecture Section).

The Core Actor

As the name suggests, this actor embodies the central hub for the publish/subscribe communication. Everything flows through this actor: local subscriptions, peering requests, local and remote publications, control messages for the stores, and so on.

The data flow stages shown in the Architecture Section also appear in the source code. However, in the actual implementation we need to distinguish between data and command messages since they use different C++ types. Hence, the core actor primarily revolves around these member variables:

data_inputs

Merges all inputs from local publishers. We also push data directly into this merger when receiving publication messages that bypass the flow abstractions. These messages get generated from endpoint::publish. The central_merge consumes messages from this merger (after converting each data_message to a node_message).

command_inputs

Merges all inputs from data store actors. Just like data_inputs, we convert every incoming message to a node_message and then feed it into the central_merge.

central_merge

Merges inputs from connected peers, local publishers and local data store actors. Everything flows through this central point. This makes it easy to tap into the message flow of Broker: each new downstream simply filters messages of interest and then operates on the selected subset.

data_outputs

This stage makes all data messages that match local subscriptions (and that should be visible on this endpoint) available by filtering from the central_merge and deserializing the payloads. Broker initializes this stage lazily. As long as no local subscriber appears, Broker does not deserialize anything.

command_outputs

Similar to data_outputs, this stage makes command messages available to local data store actors. We also construct this member lazily.
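The tap pattern around central_merge can be illustrated with a minimal sketch. Broker implements this with CAF flows; plain vectors and a predicate stand in here, so the types and names below are illustrative only.

```cpp
#include <functional>
#include <string>
#include <vector>

// Conceptual sketch of the tap pattern described above: every message flows
// through one merged stream, and each new downstream simply filters the
// subset of messages it cares about and operates on that subset.
struct message {
  std::string topic;
  std::string payload;
};

std::vector<message> tap(const std::vector<message>& merged,
                         const std::function<bool(const message&)>& pred) {
  std::vector<message> selected;
  for (const auto& msg : merged)
    if (pred(msg))
      selected.push_back(msg);
  return selected;
}
```

For example, the data_outputs stage corresponds to tapping the merged stream with a predicate that checks local subscriptions, followed by deserialization of the selected payloads.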

New peers are modeled as a pair of flows: one for incoming node messages and one for outgoing node messages. The peers themselves are trivially implemented. We receive a connected socket from the connector after a successful peering handshake. We hand this socket over to a CAF socket manager that translates from the data flows to socket reads and writes. All we need in addition to the flow management is a trait class that informs CAF how to serialize and deserialize the data.

The core actor also emits messages for peering-related events that users can consume with status subscribers. For the peering-related events, the core actor implements the following callbacks that also make it easy to add additional logic to any of those events:

  • peer_discovered

  • peer_connected

  • peer_disconnected

  • peer_removed

  • peer_unreachable

  • cannot_remove_peer

  • peer_unavailable

Handshakes

Handshakes are performed by the connector. The core actor implements a listener interface to enable it to receive connected sockets after successful handshakes.

Broker uses a three-way handshake to make sure there is at most one connection between two peers. Each Broker endpoint has a unique ID (a randomly generated UUID). After establishing a TCP connection, Broker peers send a hello message with their own endpoint ID. By convention, the endpoint with the smaller ID becomes the originator. The example below depicts all handshake messages with two nodes, Peer A (establishes the TCP connection) and Peer B (has a smaller ID than A).

         +-------------+                    +-------------+
         |   Peer A    |                    |   Peer B    |
         +------+------+                    +------+------+
                |                                  |
 endpoint::peer |                                  |
+-------------->+                                  |
                |                                  |
                +---+                              |
                |   | try to connect via TCP       |
                +<--+                              |
                |                                  |
                | (hello)                          |
                +--------------------------------->+
                |                                  |
                |                 (originator_syn) |
                +<---------------------------------+
                |                                  |
                | (responder_syn_ack)              |
                +--------------------------------->+
                |                                  |
                |               (originator_ack)   |
                +<---------------------------------+
                |                                  |
                |                                  |

Peers abort handshakes with drop_conn messages when detecting redundant connections.
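Since both sides learn each other's ID from the hello message, each peer can decide locally which role it plays. A minimal sketch (IDs shown as strings here; Broker uses randomly generated UUIDs):

```cpp
#include <string>

// Sketch of the originator convention described above: after exchanging
// hello messages, the endpoint with the smaller ID becomes the originator
// and sends originator_syn; the other side answers with responder_syn_ack.
// Exactly one of the two peers computes true here, so both sides agree on
// their roles without further coordination.
bool is_originator(const std::string& my_id, const std::string& peer_id) {
  return my_id < peer_id;
}
```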

Logical Time

Broker has two types for modelling logical clocks:

  1. broker::lamport_timestamp

  2. broker::vector_timestamp

The former type is a thin wrapper (also known as a strong typedef) for a 64-bit unsigned integer. It provides operator++ as well as the comparison operators. The latter, as the name suggests, is a list of lamport_timestamp values.
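A minimal sketch of such a strong typedef (not the actual broker::lamport_timestamp definition) might look like this; the merge rule is the general Lamport-clock update, shown for illustration:

```cpp
#include <cstdint>

// Minimal sketch of a strong typedef for a Lamport clock, along the lines
// of broker::lamport_timestamp (the real class lives in the Broker sources).
struct lamport_timestamp {
  uint64_t value = 0;

  lamport_timestamp& operator++() {
    ++value;
    return *this;
  }

  friend bool operator==(lamport_timestamp x, lamport_timestamp y) {
    return x.value == y.value;
  }

  friend bool operator<(lamport_timestamp x, lamport_timestamp y) {
    return x.value < y.value;
  }
};

// General Lamport-clock rule: on receiving a remote timestamp, a clock
// advances to max(local, remote) + 1.
lamport_timestamp merge(lamport_timestamp local, lamport_timestamp remote) {
  auto result = local < remote ? remote : local;
  ++result;
  return result;
}
```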

Channels

Channels model logical connections between one producer and any number of consumers on top of an unreliable transport. Changes in the topology of Broker at runtime can cause reordering of messages if a faster path appears or loss of messages if a path disappears.

In places where Broker requires ordered and reliable communication, e.g., communication between clone and master actors, the class broker::internal::channel provides a building block to add ordering and reliability.

A channel is unaware of the underlying transport and leaves the rendezvous process (i.e., how producers learn handles of new consumers) to the user. The class channel defines message types as well as interfaces for producer and consumer implementations (both use CRTP to interface with user code).

Producer

The producer requires a Backend template parameter and expects a pointer of type Backend* in its constructor. This backend implements a transport layer for the channel and must provide the following interface (pseudo code):

interface Backend {
  // Sends a unicast message to `hdl`.
  void send(producer*, const Handle& hdl, const T& msg);

  // Sends a multicast message to all consumers.
  void broadcast(producer*, const T& msg);

  // Called to indicate that a consumer got removed by the producer.
  void drop(producer*, const Handle& hdl, ec reason);

  // Called to indicate that the producer received the initial ACK.
  void handshake_completed(producer*, const Handle& hdl);
};

The first argument is always the this pointer of the producer. This enables the backend to multiplex more than one producer at a time. The type Handle identifies a single consumer. In the data store actors, this is an entity_id. Finally, T is one of the following message types:

Type                 Semantics
-------------------  --------------------------------------------------
handshake            Transmits the first sequence number to a consumer.
event                Transmits ordered data to consumers.
retransmit_failed    Notifies that an event is no longer available.
heartbeat            Keeps connections to consumers alive.

Consumer

Similar to the producer, the consumer also requires a Backend for providing a transport and consuming incoming events (pseudo code):

interface Backend {
  // Processes a single event.
  void consume(consumer*, Payload);

  // Sends a control message to the producer.
  void send(consumer*, T);

  // Processes a lost event. The callback may abort further processing by
  // returning a non-default error. In this case, the consumer immediately
  // calls `close` with the returned error.
  error consume_nil(consumer*);

  // Drops this consumer. After calling this function, no further function
  // calls on the consumer are allowed (except calling the destructor).
  void close(consumer*, error);
};

The first argument is always the this pointer of the consumer. This enables the backend to multiplex more than one consumer at a time. The member function send always implicitly transmits control messages to the single producer. The type Payload is a template parameter of channel and denotes the content of event messages of the producer. Finally, T is one of the following message types:

Type              Semantics
----------------  ---------------------------------------------------
cumulative_ack    Notifies the producer which events were processed.
nack              Notifies the producer that events got lost.

Consumers send cumulative_ack messages periodically, even if no messages were received. This enables the producer to keep track of which consumers are still alive and reachable.
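The producer-side effect of a cumulative ACK can be sketched as follows, assuming the producer buffers unacknowledged events by sequence number (a simplified model, not the actual Broker implementation):

```cpp
#include <cstdint>
#include <map>
#include <string>

// Simplified sketch of how a producer may use cumulative ACKs: each event
// carries a sequence number, and an ACK for sequence number N confirms that
// the consumer processed everything up to and including N, so the producer
// can drop all buffered events with sequence number <= N.
struct producer_buffer {
  std::map<uint64_t, std::string> pending; // seq -> event payload

  void produced(uint64_t seq, std::string payload) {
    pending.emplace(seq, std::move(payload));
  }

  // Handles a cumulative_ack covering everything up to and including `seq`.
  void acked(uint64_t seq) {
    pending.erase(pending.begin(), pending.upper_bound(seq));
  }
};
```

Events that remain in the buffer after an ACK are candidates for retransmission when the consumer sends a nack.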

Channels in Data Store Actors

In general, the master actor broadcasts state updates to its clones. This maps directly to the one-producer-many-consumers model of channel. However, clones can also take the role of a producer when forwarding mutating operations to the master.

In a nutshell, the master actor (see master_actor.hh) always has a producer attached to it and any number of consumers:

using producer_type = channel_type::producer<master_state>;
using consumer_type = channel_type::consumer<master_state>;

producer_type output;
std::unordered_map<entity_id, consumer_type> inputs;

Conversely, the clone actor (see clone_actor.hh) always has a consumer attached to it and it may have a producer:

using consumer_type = channel_type::consumer<clone_state>;
using producer_type = channel_type::producer<clone_state, producer_base>;

consumer_type input;
std::unique_ptr<producer_type> output_ptr;

Clones initialize the field output_ptr lazily on the first mutating operation they need to forward to the master.

Mapping Channel to Command Messages

The message types defined in channel are never used for actor-to-actor communication directly. Instead, masters and clones exchange command_message objects, which consist of a topic and an internal_command (the Payload type for the channels). Masters and clones convert between Broker message types and channel message types on the fly (using a surjective mapping). The essential interface for internal_command is defined as follows:

enum class command_tag {
  action,
  producer_control,
  consumer_control,
};

class internal_command {
public:
  // ...
  using variant_type
    = std::variant<put_command, put_unique_command, put_unique_result_command,
                   erase_command, expire_command, add_command, subtract_command,
                   clear_command, attach_clone_command, attach_writer_command,
                   keepalive_command, cumulative_ack_command, nack_command,
                   ack_clone_command, retransmit_failed_command>;

  sequence_number_type seq;

  entity_id sender;

  variant_type content;
};

command_tag tag_of(const internal_command& cmd);

Furthermore, data store actors define channel_type as channel<entity_id, internal_command>. When processing an internal_command, the receiver first looks at the tag.
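The tag-based dispatch can be sketched with a reduced variant. Only three of the alternatives are shown, and the grouping into the three tags is illustrative; the exact classification in Broker's sources may differ.

```cpp
#include <type_traits>
#include <variant>

// Reduced sketch of the tag-based dispatch described above, with only three
// of the alternatives from internal_command::variant_type.
struct put_command {};            // An action: mutates the store state.
struct attach_writer_command {};  // Producer control (maps to handshake).
struct cumulative_ack_command {}; // Consumer control (flow feedback).

enum class command_tag { action, producer_control, consumer_control };

using variant_type =
  std::variant<put_command, attach_writer_command, cumulative_ack_command>;

// Classifies the active alternative, mirroring tag_of for internal_command.
command_tag tag_of(const variant_type& content) {
  return std::visit(
    [](const auto& cmd) {
      using T = std::decay_t<decltype(cmd)>;
      if constexpr (std::is_same_v<T, put_command>)
        return command_tag::action;
      else if constexpr (std::is_same_v<T, attach_writer_command>)
        return command_tag::producer_control;
      else
        return command_tag::consumer_control;
    },
    content);
}
```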

Control messages directly map to channel messages:

Internal Command Type      Channel Message Type
-------------------------  --------------------------
attach_writer_command      channel::handshake
ack_clone_command          channel::handshake
cumulative_ack_command     channel::cumulative_ack
nack_command               channel::nack
keepalive_command          channel::heartbeat
retransmit_failed_command  channel::retransmit_failed

Note that attach_clone_command does not map to any channel message type. This message is the discovery message used by clones to find the master. When receiving it, the master initiates the handshake on the channel by sending ack_clone_command (which contains a snapshot of the state and is thus not broadcast).

When a clone adds a writer, it already knows the master and thus skips the discovery phase by directly sending the attach_writer_command handshake.

All internal commands that contain an action, such as put_command, get forwarded to the channel as payload, either by calling produce on a producer or by calling handle_event on a consumer. The latter then calls consume on the data store actor with the internal_command messages in the order defined by the sequence numbers.

Cluster Setup and Testing

Peering, path discovery, subscription propagation, etc. take some unspecified amount of time when setting up a cluster. If a single manager is responsible for this setup, the workflow usually relies on some feedback to the manager to signal when the cluster is fully connected and ready to use. The same applies when writing high-level integration tests.

In order to wait for two nodes to add each other to their routing tables and exchange subscriptions, the class endpoint provides the member function await_peer:

  /// Blocks execution of the current thread until either `whom` was added to
  /// the routing table and its subscription flooding reached this endpoint or a
  /// timeout occurs.
  /// @param whom ID of another endpoint.
  /// @param timeout An optional timeout for configuring the maximum time
  ///                this function may block.
  /// @returns `true` if `whom` was added before the timeout, `false` otherwise.
  [[nodiscard]] bool
  await_peer(endpoint_id whom, timespan timeout = defaults::await_peer_timeout);

  /// Asynchronously runs `callback()` when `whom` was added to the routing
  /// table and its subscription flooding reached this endpoint.
  /// @param whom ID of another endpoint.
  /// @param callback A function object wrapping code for asynchronous
  ///                 execution. The argument for the callback is `true` if
  ///                 `whom` was added before the timeout, `false` otherwise.
  void await_peer(endpoint_id whom, std::function<void(bool)> callback,
                  timespan timeout = defaults::await_peer_timeout);

The first overload blocks the caller until a timeout (or error) occurs or the awaited peer has connected. The second overload is an asynchronous version that takes a callback instead. On success, the endpoint calls the callback with true; otherwise, it calls the callback with false.

To retrieve the endpoint_id from an endpoint object, simply call node_id(). For example, if both endpoints belong to the same process:

endpoint ep0;
endpoint ep1;
// ... call listen and peer ...
ep0.await_peer(ep1.node_id());
ep1.await_peer(ep0.node_id());

Note that ep0.await_peer(...) only confirms that ep0 has a path to the other endpoint and received a list of subscribed topics. To confirm a mutual relation, always call await_peer on both endpoints.

The Python bindings also expose the blocking overload of await_peer. For example, connecting three endpoints with data stores attached to them in a unit test can follow this recipe:

def run_tri_setup(self, f):
    with broker.Endpoint() as ep0, \
         broker.Endpoint() as ep1, \
         broker.Endpoint() as ep2, \
         ep0.attach_master("test", broker.Backend.Memory) as m, \
         ep1.attach_clone("test") as c1, \
         ep2.attach_clone("test") as c2:

        # connect the nodes
        port = ep0.listen("127.0.0.1", 0)
        self.assertTrue(ep1.peer("127.0.0.1", port))
        self.assertTrue(ep2.peer("127.0.0.1", port))

        # wait until the nodes are fully connected
        self.assertTrue(ep0.await_peer(ep1.node_id()))
        self.assertTrue(ep0.await_peer(ep2.node_id()))
        self.assertTrue(ep1.await_peer(ep0.node_id()))
        self.assertTrue(ep2.await_peer(ep0.node_id()))

        # wait until the clones have connected to the master
        self.assertTrue(c1.await_idle())
        self.assertTrue(c2.await_idle())

        f(m, c1, c2)

Note

When setting up a cluster, make sure to add subscribers (and data stores) before establishing the peering relations. Otherwise, the subscriptions get flooded only after all connections have been established. This means any broadcast event that arrives before the subscriptions gets lost.

Data Stores

When working with data stores, the member function store::await_idle allows establishing a predefined order:

  /// Blocks execution of the current thread until the frontend actor reached an
  /// IDLE state. On a master, this means that all clones have caught up with
  /// the master and have ACKed the most recent command. On a clone, this means
  /// that the master has ACKed any pending put commands from this store and
  /// that the clone is not waiting on any out-of-order messages from the
  /// master.
  /// @param timeout The maximum amount of time this function may block.
  /// @returns `true` if the frontend actor responded before the timeout,
  ///          `false` otherwise.
  [[nodiscard]] bool await_idle(timespan timeout
                                = defaults::store::await_idle_timeout);

  /// Asynchronously runs `callback(true)` when the frontend actor reached an
  /// IDLE state or `callback(false)` if the optional timeout triggered first
  /// (or in case of an error).
  /// @param timeout The maximum amount of time this function may block.
  /// @param callback A function object wrapping code for asynchronous
  ///                 execution. The argument for the callback is `true` if the
  ///                 frontend actor responded before the timeout, `false`
  ///                 otherwise.
  void await_idle(std::function<void(bool)> callback,
                  timespan timeout = defaults::store::await_idle_timeout);

What idle means depends on the role:

For a master, idle means the following:
  • There are no pending handshakes to clones.

  • All clones have ACKed the latest command.

  • All input buffers are empty, i.e., there exists no buffered command from a writer.

For a clone, idle means the following:
  • The clone successfully connected to the master.

  • The input buffer is empty, i.e., there exists no buffered command from the master.

  • All local writes (if any) have been ACKed by the master.

Just like await_peer, calling await_idle on only one store object usually does not guarantee the desired state. For example, consider a setup with one master (m) and three clones (c0, c1, and c2). When calling put on c0, await_idle would return after m has ACKed that it received the put command. At this point, c1 and c2 might not yet have seen the command. Hence, the process must also call await_idle on the master before assuming that all data stores are in sync:

c0.put("foo", "bar");
if (!c0.await_idle()) {
  // ... handle timeout ...
}
if (!m.await_idle()) {
  // ... handle timeout ...
}

Note

In the example above, calling await_idle on c1 and c2 as well is not necessary. The master enters the idle mode after all clones have ACKed the latest command.

Glossary

Message

A message consists of a broker::topic and a broker::data. Broker stores messages as copy-on-write tuples (broker::data_message). This allows Broker to pass messages to many receivers without having to copy the content for each subscriber.

Filter

Each endpoint (see Section 2.1.1) controls the amount of data it receives from others by providing a list of topic prefixes. Whenever an endpoint publishes data, this list (the filter) determines which peering endpoints receive the data. For example, if the endpoints A and B have a peering relationship and B has announced the filter [/zeek/events/123/, /zeek/events/234/], then A forwards messages for the topic /zeek/events/123/foo to B, but does not forward messages for the topic /zeek/events/456/foo.
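Prefix matching against a filter can be sketched with a hypothetical helper (not part of the Broker API):

```cpp
#include <string>
#include <vector>

// Hypothetical helper (not part of the Broker API) illustrating how a filter
// of topic prefixes selects messages: a topic matches if any entry in the
// filter is a prefix of it.
bool matches(const std::vector<std::string>& filter, const std::string& topic) {
  for (const auto& prefix : filter)
    if (topic.compare(0, prefix.size(), prefix) == 0)
      return true;
  return false;
}
```

With the filter from the example above, the topic /zeek/events/123/foo matches, while /zeek/events/456/foo does not.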