Consumers
Applications that need to consume events from a topic must
create a Consumer instance. This object is an interface for consuming
events from a designated list of partitions of a topic.
Creating a consumer
A Consumer instance is obtained from a TopicHandle in the same way
as a producer, as shown below.
```cpp
// Open the topic and choose how events are batched and processed.
diaspora::TopicHandle topic = driver.openTopic("collisions");
diaspora::BatchSize batch_size = diaspora::BatchSize::Adaptive();
diaspora::ThreadPool thread_pool = driver.defaultThreadPool();

// Pull the data only for events whose "energy" metadata field exceeds 20.
diaspora::DataSelector data_selector =
    [](const diaspora::Metadata& md, const diaspora::DataDescriptor& dd) -> diaspora::DataDescriptor {
        if(md.json()["energy"] > 20) {
            return dd;                          // select the full data
        } else {
            return diaspora::DataDescriptor{};  // do not pull the data
        }
    };

// Allocate a contiguous buffer for the selected data.
// The application is responsible for freeing it later.
diaspora::DataAllocator data_allocator =
    [](const diaspora::Metadata& md, const diaspora::DataDescriptor& dd) -> diaspora::DataView {
        (void)md;
        char* ptr = new char[dd.size()];
        return diaspora::DataView{ptr, dd.size()};
    };

diaspora::Consumer consumer = topic.consumer(
    "app2", thread_pool, batch_size, data_selector, data_allocator);
```
```python
from diaspora_stream.api import ThreadPool, AdaptiveBatchSize, DataDescriptor

batch_size = AdaptiveBatchSize
thread_pool = driver.make_thread_pool(0)

def data_selector(metadata, descriptor):
    # Pull the data only for events whose "energy" field exceeds 20.
    if metadata["energy"] > 20:
        return descriptor
    else:
        return None

def data_allocator(metadata, descriptor):
    # note that we return a *list* of objects satisfying the buffer protocol
    return [ bytearray(descriptor.size) ]

consumer = topic.consumer(
    name="app2",
    thread_pool=thread_pool,
    batch_size=batch_size,
    data_selector=data_selector,
    data_allocator=data_allocator)
```
A consumer can be created with the following parameters, of which only the name is mandatory.

- Name: the consumer name is mandatory. Backends keep track of the last event acknowledged by each consumer, so that if an application stops and restarts with the same consumer name, it resumes consuming from the last acknowledged event. At present, consumers with the same name should not pull from the same partition.
- Batch size: the number of events the server batches together before sending the batch to the consumer. diaspora::BatchSize::Adaptive() (AdaptiveBatchSize in Python) tells the server to adapt the batch size at run time: the server aims to send batches as soon as possible but increases the batch size if the consumer is not responding fast enough.
- Maximum batches: the maximum number of batches that the consumer can pre-fetch at any one time.
- Data selector: the consumer first receives the metadata part of an event and runs the user-provided data selector function on it to decide whether the data should be pulled. This function takes the event's metadata and a DataDescriptor instance, an opaque, implementation-dependent key set by the backend to locate the actual data. The code above is an example of a data selector that tells the consumer to pull the data only if the “energy” field in the metadata is greater than 20. It does so by returning the provided DataDescriptor if the field is greater than 20, and by returning diaspora::DataDescriptor{} (None in Python) otherwise. A data selector can also tell the consumer to pull only a subset of an event's data; more on this in the Data descriptors section.
- Data allocator: if the data selector returned a non-null DataDescriptor, the consumer invokes the user-provided data allocator function. This function takes the event's metadata and the DataDescriptor returned by the data selector, and must return a diaspora::DataView object (a list of objects satisfying the buffer protocol in Python) pointing to the memory where the application wants the data to be placed. This memory may be non-contiguous; it may be allocated by the data allocator or point to memory allocated elsewhere. Remember that a DataView object does not own the memory it points to: the application is responsible for keeping that memory alive while the consumer pulls the data into it, and for freeing it afterwards if necessary.
- Thread pool: a thread pool can be provided to run the data selector and data allocator on multiple events in parallel.
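In Python the allocator returns a list of buffer-protocol objects, which is how non-contiguous or pre-allocated destinations can be expressed. The sketch below shows two such allocators. It assumes that the consumer fills the returned buffers in list order until descriptor.size bytes have been written, and that descriptor exposes size as in the example above; split_allocator and ArenaAllocator are hypothetical names, not part of diaspora_stream.

```python
# Hypothetical allocators illustrating the Python data allocator contract.
# Assumption: the consumer writes the selected bytes into the returned
# buffers in list order, and descriptor.size is the number of selected bytes.


def split_allocator(metadata, descriptor):
    """Place the data in two separate (non-contiguous) bytearrays."""
    half = descriptor.size // 2
    return [bytearray(half), bytearray(descriptor.size - half)]


class ArenaAllocator:
    """Hand out slices of one pre-allocated bytearray instead of allocating.

    The arena must outlive every event whose data was placed in it, since the
    consumer writes into this memory but does not own it. No lock is used, on
    the assumption that the allocator is never invoked concurrently in Python.
    """

    def __init__(self, capacity):
        self.arena = bytearray(capacity)
        self.offset = 0

    def __call__(self, metadata, descriptor):
        size = descriptor.size
        if self.offset + size > len(self.arena):
            raise MemoryError("arena exhausted")
        # A writable memoryview slice shares its memory with the arena.
        view = memoryview(self.arena)[self.offset:self.offset + size]
        self.offset += size
        return [view]

    def reset(self):
        # Only safe once the data of all previous events is no longer needed.
        self.offset = 0
```

An instance could then be passed as data_allocator=ArenaAllocator(1 << 20) when calling topic.consumer(...), assuming the binding accepts any Python callable rather than only plain functions.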
Important
In Python, the consumer cannot yet make use of a ThreadPool for consuming data,
because Python's global interpreter lock prevents C++ threads from invoking Python
functions concurrently. Hence the Python code above passes a pool created with
driver.make_thread_pool(0) rather than a multi-threaded pool (alternatively, we could
have omitted the thread_pool argument altogether).
In Python, if your consumer always requests the full data part of each event and wants
that data as a bytearray, you may use FullDataSelector and ByteArrayAllocator from the
diaspora_stream.api module as the data selector and data allocator respectively.
These are variables, not classes; ByteArrayAllocator creates a Python bytearray
to host the data.
Pulling events
Now that we have a consumer fetching events (and potentially their data) in the background, we can pull the events out of the consumer. The following code shows how to do this.
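```cpp
// pull() is non-blocking and returns a future.
diaspora::Future<std::optional<diaspora::Event>> future = consumer.pull();
future.completed(); // returns true if the future has completed
// Wait on the future and unwrap the resulting std::optional.
diaspora::Event event = future.wait(-1).value();
diaspora::DataView data = event.data();
diaspora::Metadata metadata = event.metadata();
diaspora::EventID event_id = event.id();
// Mark this event (and all earlier ones in its partition) as processed.
event.acknowledge();
// Free the buffer our data allocator created with new[] (only exists if the selector kept the data).
delete[] static_cast<char*>(data.segments()[0].ptr);
```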
```python
future = consumer.pull()
future.completed # True if the future has completed
event = future.wait(-1)
data = event.data
metadata = event.metadata
event_id = event.event_id
event.acknowledge()
```
consumer.pull() is a non-blocking function that returns a
Future<std::optional<Event>> (FutureEvent in Python) that can be tested for
completion and waited on. Waiting on the future gets us an std::optional<Event> instance
which, if set, contains the event’s metadata and data.
The call to event.acknowledge() tells the backend that
all the events in the partition up to this one have been processed by this consumer
and should not be sent again, should the consumer restart.
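Putting pull(), wait() and acknowledge() together, a consumer typically loops over events and acknowledges each one only after processing it, so that an event interrupted mid-processing is delivered again after a restart. The sketch below uses only the Python calls shown above; consume_n and the process callback are hypothetical names, and treating a None result from wait(-1) as "no event" is an assumption mirroring the std::optional returned in C++.

```python
def consume_n(consumer, process, max_events):
    """Pull, process, then acknowledge up to max_events events.

    Returns the number of events processed. `process` is a hypothetical
    user callback receiving (event_id, metadata, data).
    """
    processed = 0
    while processed < max_events:
        future = consumer.pull()      # non-blocking: returns a future
        event = future.wait(-1)       # wait for the pulled event
        if event is None:             # assumption: None means no event
            break
        # Process first, acknowledge second: if the application crashes in
        # between, the event is not acknowledged and will be delivered
        # again when a consumer with the same name restarts.
        process(event.event_id, event.metadata, event.data)
        event.acknowledge()
        processed += 1
    return processed
```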
Note
In the C++ example we have allocated the memory for the data in our data allocator function, so we need to free it when we no longer need it. In Python, the event will share ownership of the data returned by the allocator and garbage collection will free the buffer at a later time.
Data descriptors
The DataDescriptor class is an opaque key created by the backend
to reference the data associated with an event. In the above example, the data selector
either selects the full data associated with an event by returning the descriptor that
was passed to it, or declines the data entirely by returning a default-constructed
diaspora::DataDescriptor in C++ or None in Python.
However, the DataDescriptor class also provides methods to build a new
DataDescriptor referencing a subset of the data. Consider events whose data
represents an image of dimensions W*H, stored as a row-major array of uint8_t
values (for simplicity, assume a monochrome image).
We wish to access only a rectangular region of dimensions w*h at offset (x,y),
as shown in the picture below.
The data selector is given a descriptor D for the full data. D.size()
(D.size in Python) returns W*H. We can first use
auto d1 = D.makeSubView(y*W, W*h) (D.make_sub_view in Python) to select
only the h rows containing the rectangle. This function takes the offset
at which to start the selection and the size of the selection.
We can then use auto d2 = d1.makeStridedView(x, h, w, W-w) (d1.make_strided_view in Python).
This function takes the offset at which to start the selection (relative to d1), the number of “blocks”,
the length of each block, and the gap between consecutive blocks. Here each block is one row of
the rectangle, and the gap of W-w bytes skips the rest of the image row.
By having the data selector return d2, the backend will know that the consumer
is only interested in this sub-region of the data and will transfer only the requested data.
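The two-step rectangle selection can be sketched as a Python data selector using the make_sub_view, make_strided_view and size names given above. Because a real DataDescriptor only makes sense inside a backend, the sketch checks the selector against RecordingDescriptor, a hypothetical stub that records which absolute byte offsets each view selects; it is not part of diaspora_stream, and its bounds checks are only an assumption about how a backend might validate views. The sub-view starts at the beginning of row y (offset y*W) and the strided view at column x, which keeps the sub-view inside the image even when the rectangle touches its bottom edge.

```python
class RecordingDescriptor:
    """Hypothetical stand-in for DataDescriptor, for illustration only.

    It stores, for each selected byte, its absolute offset in the event's
    data, so that views can be composed and checked.
    """

    def __init__(self, offsets):
        self._offsets = list(offsets)

    @property
    def size(self):
        return len(self._offsets)

    def make_sub_view(self, offset, size):
        if offset < 0 or offset + size > self.size:
            raise ValueError("sub-view out of range")
        return RecordingDescriptor(self._offsets[offset:offset + size])

    def make_strided_view(self, offset, num_blocks, block_size, gap):
        stride = block_size + gap
        picked = []
        for k in range(num_blocks):
            start = offset + k * stride
            if start + block_size > self.size:
                raise ValueError("strided view out of range")
            picked.extend(self._offsets[start:start + block_size])
        return RecordingDescriptor(picked)

    def segments(self):
        """Coalesce the selected offsets into (offset, size) runs."""
        runs = []
        for off in self._offsets:
            if runs and runs[-1][0] + runs[-1][1] == off:
                runs[-1] = (runs[-1][0], runs[-1][1] + 1)
            else:
                runs.append((off, 1))
        return runs


def make_rect_selector(W, x, y, w, h):
    """Return a data selector keeping only the w*h rectangle at (x, y)."""

    def selector(metadata, descriptor):
        d1 = descriptor.make_sub_view(y * W, W * h)   # the h rows of interest
        return d1.make_strided_view(x, h, w, W - w)   # w bytes from each row
    return selector


# Example: 8x6 image, 3x2 rectangle in the bottom-right corner.
W, H = 8, 6
selector = make_rect_selector(W, x=5, y=4, w=3, h=2)
d2 = selector({}, RecordingDescriptor(range(W * H)))
print(d2.size, d2.segments())  # prints: 6 [(37, 3), (45, 3)]
```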
Note
The above selection could have been simplified to D.makeStridedView(y*W+x, h, w, W-w);
we presented it in two steps to showcase both makeSubView and makeStridedView.
A third function, makeUnstructuredView, takes an arbitrary list of (offset, size)
pairs to make an unstructured selection of the data.
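An unstructured view is useful when the selected blocks do not all share the same length or spacing, which a strided view cannot express. For instance, the lower triangle of an n*n row-major matrix is one block per row whose length grows with the row index. The Python sketch below only computes such (offset, size) pairs; lower_triangle_pairs and coalesce are hypothetical helpers, and the exact argument format expected by makeUnstructuredView (and the name of its Python counterpart) is not shown in this section, so handing the list to it is left as an assumption. Merging touching pairs may reduce the number of segments to transfer.

```python
def lower_triangle_pairs(n, itemsize=1):
    """(offset, size) pairs, in bytes, selecting the lower triangle
    (diagonal included) of an n*n row-major matrix of itemsize-byte values."""
    return [(r * n * itemsize, (r + 1) * itemsize) for r in range(n)]


def coalesce(pairs):
    """Sort (offset, size) pairs and merge those that touch or overlap."""
    merged = []
    for off, size in sorted(pairs):
        if merged and off <= merged[-1][0] + merged[-1][1]:
            start, length = merged[-1]
            merged[-1] = (start, max(length, off + size - start))
        else:
            merged.append((off, size))
    return merged


pairs = lower_triangle_pairs(4)
print(pairs)  # prints: [(0, 1), (4, 2), (8, 3), (12, 4)]
```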