Consumers

Applications that need to consume events from a topic will need to create a Consumer instance. This object is an interface to consume events from a designated list of partitions of a topic.

Creating a consumer

A Consumer instance is obtained from a TopicHandle in the same way as a producer, as shown below.

diaspora::TopicHandle topic = driver.openTopic("collisions");
diaspora::BatchSize  batch_size = diaspora::BatchSize::Adaptive();
diaspora::ThreadPool thread_pool = driver.defaultThreadPool();
diaspora::DataSelector data_selector =
    [](const diaspora::Metadata& md, const diaspora::DataDescriptor& dd) -> diaspora::DataDescriptor {
        if(md.json()["energy"] > 20) {
            return dd;
        } else {
            return diaspora::DataDescriptor{};
        }
    };
diaspora::DataAllocator data_allocator =
    [](const diaspora::Metadata& md, const diaspora::DataDescriptor& dd) -> diaspora::DataView {
        (void)md;
        char* ptr = new char[dd.size()];
        return diaspora::DataView{ptr, dd.size()};
    };
diaspora::Consumer consumer = topic.consumer(
    "app2", thread_pool, batch_size, data_selector, data_allocator);

A consumer can be created with up to six parameters, of which only the name is mandatory.

  • Name: the consumer name is mandatory. Backends keep track of the last event acknowledged by each consumer, so that if an application stops and restarts with the same consumer name, it resumes consuming events after the last acknowledged event. At present, consumers with the same name should not pull from the same partition.

  • Batch size: the batch size is the number of events to batch together on the server side before the batch is sent to the consumer. diaspora::BatchSize::Adaptive() (AdaptiveBatchSize in Python) can be used to tell the server to adapt the batch size at run time: the server will aim to send batches as soon as possible but will increase the batch size if the consumer is not responding fast enough.

  • Maximum batches: the maximum number of batches that can be pre-fetched by the consumer at any one time.

  • Data selector: the consumer first receives the metadata part of an event and runs the user-provided data selector function on the metadata to know whether the data should be pulled. This function takes the metadata part of the event as well as a DataDescriptor instance. The latter is an opaque key that is implementation-dependent (i.e. set by the backend to locate the actual data). The above code is an example of a data selector that tells the consumer to pull the data only if the “energy” field in the metadata is greater than 20. It does so by returning the provided DataDescriptor if the field is greater than 20, and by returning diaspora::DataDescriptor{} (or None in Python) if it isn’t. The data selector could also tell the consumer to pull only a subset of an event’s data. More on this in the Data descriptors section.

  • Data allocator: if the data selector returned a non-null DataDescriptor, the user-provided data allocator function is invoked by the consumer. This function takes the event’s metadata and the DataDescriptor returned by the data selector, and must return a diaspora::DataView object pointing to the location in memory where the application wishes the data to be placed. This memory could be non-contiguous. It could be allocated by the data allocator, or it could point to memory that has already been allocated elsewhere. Remember that a DataView object does not own the memory it points to. The application is therefore responsible for keeping the memory alive while the consumer pulls the data into it, and for freeing it later if necessary.

  • Thread pool: a thread pool can be provided to run the data selector and data allocator on multiple events in parallel.

Important

In Python, the consumer cannot yet make use of a ThreadPool for consuming data (this is because Python has a global interpreter lock that prevents concurrent invocation of Python functions from multiple C++ threads). Hence we use the driver’s default thread pool in the code above (alternatively, we could have omitted the thread_pool argument altogether).

In Python, if your consumer intends to always request the full data part of each event, and would like said data in the form of a bytearray, you may use the FullDataSelector and ByteArrayAllocator from the diaspora_stream.api module as data selector and data allocator respectively. These are variables, not classes. The latter will create a Python bytearray to host the data.
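Returning to the C++ data allocator described above: because a DataView does not own its memory, one way to avoid a manual delete[] is to let a longer-lived object own the buffers. The following is a minimal stand-alone sketch of that idea. It does not use the Diaspora API; RawView and BufferPool are hypothetical names introduced here for illustration only.

```cpp
#include <cstddef>
#include <memory>
#include <vector>

// Hypothetical stand-in for a non-owning view: a pointer plus a size.
struct RawView {
    char*       ptr;
    std::size_t size;
};

// Owns every buffer it hands out. Buffers stay valid for as long as the
// pool is alive and has not been cleared.
class BufferPool {
    std::vector<std::unique_ptr<char[]>> m_buffers;

public:
    // Allocate `size` bytes and return a non-owning view of them.
    RawView allocate(std::size_t size) {
        m_buffers.push_back(std::make_unique<char[]>(size));
        return RawView{m_buffers.back().get(), size};
    }

    // Number of buffers currently owned by the pool.
    std::size_t count() const { return m_buffers.size(); }

    // Free all buffers; every view handed out so far becomes dangling.
    void clear() { m_buffers.clear(); }
};
```

Under these assumptions, a data allocator lambda could capture such a pool by reference and wrap the returned pointer and size in a diaspora::DataView, as the example above does with a raw new char[]. The pool must then outlive every event whose data it holds.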

Pulling events

Now that we have a consumer fetching events (and potentially their data) in the background, we can pull the events out of the consumer. The following code shows how to do this.

diaspora::Future<std::optional<diaspora::Event>> future = consumer.pull();
future.completed(); // returns true if the future has completed

// value() throws std::bad_optional_access if the future completed without an event
diaspora::Event event        = future.wait(-1).value();
diaspora::DataView data      = event.data();
diaspora::Metadata metadata  = event.metadata();
diaspora::EventID event_id   = event.id();

event.acknowledge();

// Free the buffer allocated by data_allocator (one contiguous segment here)
delete[] static_cast<char*>(data.segments()[0].ptr);

consumer.pull() is a non-blocking function that returns a Future<std::optional<Event>> (FutureEvent in Python) that can be tested for completion and waited on. Waiting on the future yields a std::optional<Event> instance which, if set, contains the event’s metadata and data.

The call to event.acknowledge() tells the backend that all the events in the partition up to this one have been processed by this consumer and should not be sent again, should the consumer restart.
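The cumulative meaning of an acknowledgment can be illustrated with a toy model. This is only a sketch of the semantics described above, not Diaspora code and not a description of how any backend is implemented. It assumes, for illustration, that event IDs within a partition are consecutive integers starting at 0; AckCursor and resumeFrom are hypothetical names.

```cpp
#include <cstdint>
#include <optional>

// Toy model of the bookkeeping kept for one (consumer name, partition)
// pair. Hypothetical; real backends may store this differently.
class AckCursor {
    std::optional<std::uint64_t> m_last_acked; // empty until the first acknowledge

public:
    // Acknowledging event `id` covers every event up to and including it,
    // so only the highest acknowledged ID needs to be remembered.
    void acknowledge(std::uint64_t id) {
        if (!m_last_acked || id > *m_last_acked) m_last_acked = id;
    }

    // ID of the first event that would be delivered after a restart.
    std::uint64_t resumeFrom() const {
        return m_last_acked ? *m_last_acked + 1 : 0;
    }
};
```

In this model, acknowledging event 4 and then restarting resumes at event 5, even if events 0 to 3 were never acknowledged individually. An application that processes events out of order should therefore acknowledge an event only once all earlier events in the partition have been handled.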

Note

In the C++ example we have allocated the memory for the data in our data allocator function, so we need to free it when we no longer need it. In Python, the event will share ownership of the data returned by the allocator and garbage collection will free the buffer at a later time.

Data descriptors

The DataDescriptor class is an opaque key created by the backend to reference the data associated with an event. In the above example, the data selector either selects the full data associated with an event by returning the descriptor that was passed to it, or declines the data entirely by returning a default-constructed diaspora::DataDescriptor in C++ or None in Python.

The DataDescriptor class, however, provides methods to build a new DataDescriptor referencing a subset of the data. Consider events whose data represents an image of dimensions W*H, stored as a row-major array of uint8_t values (assuming a monochrome image for simplicity). We wish to access only a rectangular region of dimensions w*h at offset (x,y), as shown in the picture below.

[Figure: a rectangular region of dimensions w*h at offset (x,y) within an image of dimensions W*H]

The data selector is given a descriptor D for the full data. D.size() (D.size in Python) will return W*H. We can first use auto d1 = D.makeSubView(y*W+x, W*h) (D.make_sub_view in Python) to select a contiguous range of W*h bytes starting at the top-left corner of the rectangle, which covers the whole rectangle. This function takes the offset at which to start the selection and the size of the selection.

We can then use auto d2 = d1.makeStridedView(0, h, w, W-w) (d1.make_strided_view in Python). This function takes the offset at which to start the selection, the number of “blocks”, the length of each block, and the gap between each block.

By having the data selector return d2, the backend will know that the consumer is only interested in this sub-region of the data and will transfer only the requested data.

Note

The above selection could have been simplified as D.makeStridedView(y*W+x, h, w, W-w); we presented it in two steps to showcase both makeSubView and makeStridedView.
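To make the offset arithmetic concrete, here is a stand-alone sketch that does not use the Diaspora API. It models a selection as a plain list of byte offsets into the full buffer. The helpers fullView, subView, stridedView and rectangle are hypothetical stand-ins that assume the parameter meanings described above (offset, number of blocks, block length, gap). Clamping at the end of the parent is a choice of this model, not a statement about the library.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model: a selection is the ordered list of byte offsets (into the
// full W*H buffer) that it covers. Not the Diaspora implementation.
using Selection = std::vector<std::size_t>;

// Full data of `size` bytes: offsets 0, 1, ..., size-1.
Selection fullView(std::size_t size) {
    Selection s(size);
    for (std::size_t i = 0; i < size; ++i) s[i] = i;
    return s;
}

// Keep `size` elements of `parent` starting at `offset`
// (clamped to the end of the parent in this model).
Selection subView(const Selection& parent, std::size_t offset, std::size_t size) {
    assert(offset <= parent.size());
    Selection s;
    for (std::size_t i = offset; i < offset + size && i < parent.size(); ++i)
        s.push_back(parent[i]);
    return s;
}

// Keep `numBlocks` blocks of `blockSize` elements, separated by `gap`
// skipped elements, starting at `offset` in the parent.
Selection stridedView(const Selection& parent, std::size_t offset,
                      std::size_t numBlocks, std::size_t blockSize,
                      std::size_t gap) {
    Selection s;
    for (std::size_t b = 0; b < numBlocks; ++b) {
        const std::size_t start = offset + b * (blockSize + gap);
        for (std::size_t i = start; i < start + blockSize && i < parent.size(); ++i)
            s.push_back(parent[i]);
    }
    return s;
}

// Reference answer: offsets of the w*h rectangle at (x, y) in a
// row-major image of width W.
Selection rectangle(std::size_t W, std::size_t x, std::size_t y,
                    std::size_t w, std::size_t h) {
    Selection s;
    for (std::size_t row = y; row < y + h; ++row)
        for (std::size_t col = x; col < x + w; ++col)
            s.push_back(row * W + col);
    return s;
}
```

With, say, W=8, H=6 and a 3*4 rectangle at (2,1), applying subView(D, y*W+x, W*h) followed by stridedView(d1, 0, h, w, W-w) in this model produces the same offsets as rectangle(W, x, y, w, h), and so does the single-step stridedView(D, y*W+x, h, w, W-w).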

A third function, makeUnstructuredView, takes an arbitrary list of (offset, size) pairs to make an unstructured selection of the data.
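As an illustration, the same rectangle can be described as one (offset, size) pair per row. The sketch below only builds that list and does not call the Diaspora API. The container type used here is an assumption, since this section does not show the exact argument type that makeUnstructuredView expects, and rectangleSegments and totalSize are hypothetical helper names.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// One (offset, size) pair per selected segment. The container type is an
// assumption made for this sketch.
using Segments = std::vector<std::pair<std::size_t, std::size_t>>;

// Build the segments covering a w*h rectangle at (x, y) in a row-major
// image of dimensions W*H.
Segments rectangleSegments(std::size_t W, std::size_t H,
                           std::size_t x, std::size_t y,
                           std::size_t w, std::size_t h) {
    // The rectangle must fit inside the image.
    assert(x + w <= W && y + h <= H);
    Segments segments;
    segments.reserve(h);
    for (std::size_t row = y; row < y + h; ++row)
        segments.emplace_back(row * W + x, w); // row start offset, row length
    return segments;
}

// Total number of bytes covered by a list of segments.
std::size_t totalSize(const Segments& segments) {
    std::size_t total = 0;
    for (const auto& seg : segments) total += seg.second;
    return total;
}
```

For regular patterns such as this rectangle, the strided form is more compact. A list of pairs is mainly useful when the regions of interest do not follow a fixed stride.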