6. Python Bindings

Almost all functionality of Broker is also accessible through Python bindings. The Python API mostly mimics the C++ interface, but adds transparent conversion between Python values and Broker values. In the following we demonstrate the main parts of the Python API, assuming a general understanding of Broker’s concepts and the C++ interface.

Note

Broker’s Python bindings require Python 3.5 or greater.

6.1. Installation in a Virtual Environment

To install Broker’s Python bindings in a virtual environment, pass the --python-prefix option to configure and make sure the Python header files for the virtual environment’s Python version are installed on the system. You can also use the --prefix option to install the main Broker library and headers into an isolated location.

$ virtualenv -p python3 /Users/user/sandbox/broker/venv
$ . /Users/user/sandbox/broker/venv/bin/activate
$ ./configure --prefix=/Users/user/sandbox/broker --python-prefix=$(python3 -c 'import sys; print(sys.exec_prefix)')
$ make install
$ python3 -c 'import broker; print(broker.__file__)'
/Users/user/sandbox/broker/venv/lib/python3.7/site-packages/broker/__init__.py
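
Before running configure, it can help to check which directory the command substitution passed to --python-prefix will expand to, and whether a virtual environment is actually active. The following is a minimal sketch using only the standard library. The helper name describe_python_prefix is made up for this illustration and is not part of Broker, and the virtual environment check assumes an environment created by the venv module or a recent virtualenv release.

```python
import sys


def describe_python_prefix():
    """Report the prefix that --python-prefix would receive.

    Hypothetical helper for illustration; not part of Broker.
    """
    # Inside a virtual environment, sys.prefix points at the environment
    # while sys.base_prefix still points at the base installation.
    in_venv = sys.prefix != getattr(sys, "base_prefix", sys.prefix)
    return {"exec_prefix": sys.exec_prefix, "in_virtualenv": in_venv}


if __name__ == "__main__":
    info = describe_python_prefix()
    print("exec_prefix:  ", info["exec_prefix"])
    print("in virtualenv:", info["in_virtualenv"])
```

If in_virtualenv is reported as False, the bindings would be installed under the system interpreter's prefix rather than under the sandbox.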

6.2. Communication

Just as in C++, you first set up peerings between endpoints and create subscribers for the topics of interest:

        with broker.Endpoint() as ep1, \
             broker.Endpoint() as ep2, \
             ep1.make_subscriber("/test") as s1, \
             ep2.make_subscriber("/test") as s2:
            port = ep1.listen("127.0.0.1", 0)
            self.assertTrue(ep2.peer("127.0.0.1", port, 1.0))

            ep1.await_peer(ep2.node_id())
            ep2.await_peer(ep1.node_id())

You can then start publishing messages. In Python a message is just a list of values, along with the corresponding topic. The following publishes a simple message consisting of just one string, and then has the receiving endpoint wait for it to arrive:

            ep2.publish("/test", ["ping"])
            (t, d) = s1.get()
            # t == "/test", d == ["ping"]

The following publishes a small batch of two slightly more complex messages on two separate topics:

            msg1 = ("/test/2", (1, 2, 3))
            msg2 = ("/test/3", (42, "foo", {"a": "A", "b": ipaddress.IPv4Address('1.2.3.4')}))
            ep2.publish_batch(msg1, msg2)

As the second message shows, elements can be either standard Python values or instances of Broker wrapper classes; see the Data Model section below for more.

The subscriber instances have more methods matching their C++ equivalents, including available() for checking for pending messages, poll() for getting available messages without blocking, fd() for retrieving a select-able file descriptor, and {add,remove}_topic for changing the subscription list.
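
To illustrate how fd() and poll() can be combined with select, here is a sketch that runs without Broker installed. FakeSubscriber is a stand-in written for this example only. It assumes that the descriptor returned by fd() becomes readable while messages are pending and that poll() returns a list of (topic, data) pairs. The helper wait_for_messages is likewise hypothetical, and the pipe-based stand-in assumes a POSIX system.

```python
import os
import select


class FakeSubscriber:
    """Stand-in for a Broker subscriber; NOT part of Broker.

    It imitates only the two methods used below: fd() and poll().
    """

    def __init__(self):
        self._read_fd, self._write_fd = os.pipe()
        self._queue = []

    def deliver(self, topic, data):
        # Queue a message and make the descriptor readable.
        self._queue.append((topic, data))
        os.write(self._write_fd, b"x")

    def fd(self):
        return self._read_fd

    def poll(self):
        # Return everything that is pending without blocking.
        msgs, self._queue = self._queue, []
        if msgs:
            # Consume one wake-up byte per returned message.
            os.read(self._read_fd, len(msgs))
        return msgs

    def close(self):
        os.close(self._read_fd)
        os.close(self._write_fd)


def wait_for_messages(sub, timeout):
    """Wait up to `timeout` seconds for sub.fd() to become readable,
    then return whatever poll() yields (an empty list on timeout)."""
    readable, _, _ = select.select([sub.fd()], [], [], timeout)
    return sub.poll() if readable else []


if __name__ == "__main__":
    sub = FakeSubscriber()
    sub.deliver("/test", ["ping"])
    print(wait_for_messages(sub, 1.0))
    sub.close()
```

With a real subscriber, the same wait_for_messages pattern would let one event loop watch several descriptors at once instead of blocking in get().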

6.3. Exchanging Zeek Events

The Broker Python bindings come with support for representing Zeek events as well. Here’s the Python version of the C++ ping example shown earlier:

# ping.zeek

redef exit_only_after_terminate = T;

global pong: event(n: int);

event ping(n: int)
	{
	event pong(n);
	}

event zeek_init()
	{
	Broker::subscribe("/topic/test");
	Broker::listen("127.0.0.1", 9999/tcp);
	Broker::auto_publish("/topic/test", pong);
	}

# ping.py

import sys
import broker

# Setup endpoint and connect to Zeek.
with broker.Endpoint() as ep, \
     ep.make_subscriber("/topic/test") as sub, \
     ep.make_status_subscriber(True) as ss:

    ep.peer("127.0.0.1", 9999)

    # Wait until connection is established.
    st = ss.get()

    if not (isinstance(st, broker.Status) and st.code() == broker.SC.PeerAdded):
        print("could not connect")
        sys.exit(1)

    for n in range(5):
        # Send event "ping(n)".
        ping = broker.zeek.Event("ping", n)
        ep.publish("/topic/test", ping)

        # Wait for "pong" reply event.
        (t, d) = sub.get()
        pong = broker.zeek.Event(d)
        print("received {}{}".format(pong.name(), pong.args()))

# python3 ping.py
received pong[0]
received pong[1]
received pong[2]
received pong[3]
received pong[4]

6.4. Data Model

The Python API can represent the same type model as the C++ code. For all Broker types that have a direct mapping to a Python type, conversion is handled transparently as values are passed into, or retrieved from, Broker. For example, the message (1, 2, 3) above is automatically converted into a Broker vector of three Broker integer values. In cases where there is no direct Python equivalent for a Broker type (e.g., for count; Python does not have an unsigned integer class), the Broker module provides wrapper classes. The following table summarizes how Broker and Python values are mapped to each other:

Broker Type    Python representation
-----------    ---------------------
boolean        True/False
count          broker.Count(x)
integer        int
real           float
timespan       datetime.timedelta
timestamp      datetime.datetime
string         str
address        ipaddress.IPv4Address/ipaddress.IPv6Address
subnet         ipaddress.IPv4Network/ipaddress.IPv6Network
port           broker.Port(x, broker.Port.{TCP,UDP,ICMP,Unknown})
vector         tuple
set            set
table          dict
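
To make the mapping concrete, the sketch below classifies plain Python values by the Broker type the table pairs them with. It is an illustration only: broker_type_name is a hypothetical helper that does not call into Broker, and it leaves out the wrapper classes broker.Count and broker.Port so that it runs with the standard library alone.

```python
import datetime
import ipaddress


def broker_type_name(value):
    """Return the Broker type name the table pairs with a Python value.

    Hypothetical helper for illustration; it does not call into Broker.
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return "boolean"
    if isinstance(value, int):
        return "integer"
    if isinstance(value, float):
        return "real"
    if isinstance(value, datetime.timedelta):
        return "timespan"
    if isinstance(value, datetime.datetime):
        return "timestamp"
    if isinstance(value, str):
        return "string"
    if isinstance(value, (ipaddress.IPv4Address, ipaddress.IPv6Address)):
        return "address"
    if isinstance(value, (ipaddress.IPv4Network, ipaddress.IPv6Network)):
        return "subnet"
    # Both tuples and lists are accepted as vectors on the way in.
    if isinstance(value, (tuple, list)):
        return "vector"
    if isinstance(value, set):
        return "set"
    if isinstance(value, dict):
        return "table"
    raise TypeError("no direct mapping for {!r}".format(type(value)))


if __name__ == "__main__":
    payload = (42, "foo", {"a": "A", "b": ipaddress.IPv4Address("1.2.3.4")})
    print([broker_type_name(v) for v in payload])
```

Applied to the payload of the second message from the Communication section, the helper reports an integer, a string, and a table.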

Note that either a Python tuple or a Python list may convert to a Broker vector, but the canonical Python type representing a vector is a tuple: whenever a Broker vector value is converted into a Python value, you get a tuple. The reason is that Broker tables can be indexed by vectors. Tables are mapped to Python dictionaries, and dictionary keys must be hashable, which rules out mutable lists. A tuple is immutable and, as long as its elements are hashable, can serve as a dictionary key.
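
The constraint behind this choice is plain Python behavior and can be checked without Broker. In the sketch below, the routes dictionary and the try_list_key helper are names invented for this illustration.

```python
import ipaddress

# A tuple of hashable elements can index a dict, which is the Python
# representation of a Broker table indexed by vectors.
routes = {
    (ipaddress.IPv4Address("10.0.0.1"), 80): "web",
    (ipaddress.IPv4Address("10.0.0.2"), 22): "ssh",
}


def try_list_key():
    """Return the exception raised when a list is used as a dict key."""
    try:
        {[1, 2]: "x"}
    except TypeError as err:
        return err
    return None


if __name__ == "__main__":
    print(routes[(ipaddress.IPv4Address("10.0.0.1"), 80)])
    print(type(try_list_key()).__name__)
```

The lookup with a tuple key succeeds, while the attempt to use a list as a key raises a TypeError.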

6.5. Status and Error Messages

Status and error handling works through a status subscriber, again similar to the C++ interface:

        with broker.Endpoint() as ep1, \
             ep1.make_status_subscriber() as es1:
            r = ep1.peer("127.0.0.1", 1947, 0.0) # Try unavailable port, no retry
            st1 = es1.get()
            # r == False, st1.code() == broker.EC.PeerUnavailable

Passing True to make_status_subscriber() makes the subscriber deliver status updates in addition to errors:

        with broker.Endpoint() as ep1, \
             broker.Endpoint() as ep2, \
             ep1.make_status_subscriber(True) as es1, \
             ep2.make_status_subscriber(True) as es2:

            port = ep1.listen("127.0.0.1", 0)
            self.assertEqual(ep2.peer("127.0.0.1", port, 1.0), True)

            ep1.await_peer(ep2.node_id())
            ep2.await_peer(ep1.node_id())

            st1 = es1.get(2)
            st2 = es2.get(2)
            # [s.code() for s in st1] == [broker.SC.EndpointDiscovered, broker.SC.PeerAdded]
            # [s.code() for s in st2] == [broker.SC.EndpointDiscovered, broker.SC.PeerAdded]

6.6. Data Stores

For data stores, the C++ API also directly maps to Python. The following instantiates a master store to then operate on:

        with broker.Endpoint() as ep1, \
             ep1.attach_master("test", broker.Backend.Memory) as m:

            m.put("key", "value")
            x = m.get("key")
            # x == "value"

In Python, both master and clone stores provide all the same accessor and mutator methods as C++. Some examples:

            m.increment("e", 1)
            m.decrement("f", 1)
            m.append("str", "ar")
            m.insert_into("set", 3)
            m.remove_from("set", 1)
            m.insert_into("table", 3, "D")
            m.remove_from("table", 1)
            m.push("vec", 3)
            m.push("vec", 4)
            m.pop("vec")
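
The effect of these mutators can be pictured with a small dict-backed model. ToyStore below is not Broker code: it is a stand-in written for this illustration that mirrors only the method names used above, and the semantics it implements (arithmetic on numbers, string concatenation, set and table insertion and removal, push and pop at the end of a vector) are assumptions about how the real store behaves. The initial values are also chosen for the example.

```python
class ToyStore:
    """Dict-backed stand-in for a Broker store; NOT Broker code.

    Mirrors only the method names used above, under assumed semantics.
    """

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data[key]

    def increment(self, key, amount):
        self._data[key] += amount

    def decrement(self, key, amount):
        self._data[key] -= amount

    def append(self, key, text):
        self._data[key] += text

    def insert_into(self, key, index, value=None):
        container = self._data[key]
        if isinstance(container, set):
            container.add(index)
        else:
            container[index] = value

    def remove_from(self, key, index):
        container = self._data[key]
        if isinstance(container, set):
            container.discard(index)
        else:
            container.pop(index, None)

    def push(self, key, value):
        # Vectors are tuples, so build a new tuple with the value appended.
        self._data[key] += (value,)

    def pop(self, key):
        # Drop the last element of the vector.
        self._data[key] = self._data[key][:-1]


if __name__ == "__main__":
    m = ToyStore()
    m.put("set", {1, 2})
    m.insert_into("set", 3)
    m.remove_from("set", 1)
    m.put("vec", (1, 2))
    m.push("vec", 3)
    m.push("vec", 4)
    m.pop("vec")
    print(m.get("set"), m.get("vec"))
```

Under these assumptions, the set ends up as {2, 3} and the vector as (1, 2, 3).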

Here’s a more complete example of using a SQLite-backed data store from Python, with the database being stored in mystore.sqlite:

# sqlite-listen.py

import broker

with broker.Endpoint() as ep, \
     ep.make_subscriber('/test') as s, \
     ep.make_status_subscriber(True) as ss:

    ep.listen('127.0.0.1', 9999)

    m = ep.attach_master('mystore',
                         broker.Backend.SQLite, {'path': 'mystore.sqlite'})

    while True:
        print(ss.get())
        print(m.get('foo'))

# sqlite-connect.py

import broker
import sys
import time

with broker.Endpoint() as ep, \
     ep.make_subscriber('/test') as s, \
     ep.make_status_subscriber(True) as ss:

    ep.peer('127.0.0.1', 9999, 1.0)

    st = ss.get()

    if not (isinstance(st, broker.Status) and st.code() == broker.SC.PeerAdded):
        print('could not connect')
        sys.exit(1)

    c = ep.attach_clone('mystore')

    while True:
        time.sleep(1)
        c.increment('foo', 1)
        print(c.get('foo'))