Producers
Applications that need to produce events into one or more topics will need
to create a Producer instance. This object is an interface to produce
events into a designated topic. Internally, it runs the topic's validator, partition
selector, and serializer on the events it is passed, in order to validate each event's
metadata and data, select a destination partition for each event, and serialize
the metadata of events that target the same partition into batches.
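The three stages described above can be pictured with a toy, pure-Python model. This is not the Diaspora implementation: the validator (which requires a dict with an integer "energy" field), the CRC32-based partition selector, the JSON serializer, and `NUM_PARTITIONS` are all hypothetical stand-ins chosen for illustration.

```python
import json
import zlib
from collections import defaultdict

NUM_PARTITIONS = 4  # hypothetical number of partitions in the topic

def validate(metadata):
    # Hypothetical validator: metadata must be a dict with an int "energy" field.
    if not isinstance(metadata, dict) or not isinstance(metadata.get("energy"), int):
        raise ValueError(f"invalid metadata: {metadata!r}")

def select_partition(metadata):
    # Hypothetical partition selector: stable hash of the metadata, modulo partitions.
    key = json.dumps(metadata, sort_keys=True).encode()
    return zlib.crc32(key) % NUM_PARTITIONS

def serialize(metadata):
    # Hypothetical serializer: compact JSON encoding.
    return json.dumps(metadata, separators=(",", ":")).encode()

def process(events):
    """Validate each event, select its partition, and group the serialized
    metadata into one batch per destination partition."""
    batches = defaultdict(list)
    for metadata in events:
        validate(metadata)
        batches[select_partition(metadata)].append(serialize(metadata))
    return dict(batches)

batches = process([{"energy": e} for e in range(10)])
for partition, batch in sorted(batches.items()):
    print(partition, len(batch))
```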
Creating a producer
To obtain a Producer instance, one must first instantiate a Driver,
then open a topic to obtain a TopicHandle. The TopicHandle
can then be used to create a Producer, as exemplified hereafter in C++ and in Python.
diaspora::TopicHandle topic = driver.openTopic("collisions");
diaspora::ThreadPool thread_pool = driver.makeThreadPool(diaspora::ThreadCount{4});
diaspora::BatchSize batch_size = diaspora::BatchSize::Adaptive();
diaspora::MaxNumBatches max_num_batches = diaspora::MaxNumBatches{2};
diaspora::Ordering ordering = diaspora::Ordering::Loose; // or Strict
diaspora::Producer producer = topic.producer(
"app1", thread_pool, batch_size, max_num_batches, ordering);
from diaspora_stream.api import ThreadPool, AdaptiveBatchSize, Ordering
topic = driver.open_topic("collisions")
thread_pool = driver.make_thread_pool(4)
batch_size = AdaptiveBatchSize # or an integer > 0
max_num_batches = 2
ordering = Ordering.Strict # or Ordering.Loose
producer = topic.producer(
name="app1",
batch_size=batch_size,
max_num_batches=max_num_batches,
thread_pool=thread_pool,
ordering=ordering)
A producer can be created with five optional parameters.
Name: the producer name is not used in current implementations of the API but may be in the future; it is therefore advisable to give your producer a name. If you have a multi-process producer application, it is recommended to give all producer instances the same name.
Thread pool: the producer will run the topic’s validator, partition selector, and serializer in user-level threads pushed into a thread pool. The thread pool is backed by a number of hardware threads. If not specified, the default thread pool of the driver will be used.
Batch size: the batch size is the number of events to batch together before the batch is sent to the target partition. diaspora::BatchSize::Adaptive() (AdaptiveBatchSize in Python) tells the producer to adapt the batch size at run time: the producer will aim to send batches as soon as possible but will increase the batch size if the server is not responding fast enough.
Maximum batches: this parameter controls the maximum number of batches that can be pending on the client before push calls start blocking. Increasing this number may be useful in bursty applications.
Ordering: because the producer may run the validator, partition selector, and serializer in tasks posted to the thread pool, an application could push event A and then event B, yet event B could end up in the batch before event A if event A takes longer to validate. diaspora::Ordering::Loose (Ordering.Loose in Python) allows this behavior. diaspora::Ordering::Strict (Ordering.Strict in Python) forces events that target the same batch to be added to the batch in the same order in which the application produced them. This constraint may limit parallelism in the producer and should be used only when necessary.
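The difference between the two ordering modes can be sketched in plain Python. The model below is hypothetical and does not reproduce Diaspora's internals: events are processed by worker threads with random delays, Loose mode appends each event as soon as its processing finishes, and Strict mode tags events with a sequence number and holds early finishers in a reorder buffer until all their predecessors have been appended.

```python
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

class Batch:
    """Hypothetical batch that receives events from worker threads."""

    def __init__(self, strict):
        self.strict = strict
        self.events = []
        self._pending = {}   # Strict mode: seq -> event that finished early
        self._next_seq = 0   # Strict mode: next sequence number to append
        self._lock = threading.Lock()

    def add(self, seq, event):
        with self._lock:
            if not self.strict:
                self.events.append(event)  # Loose: completion order
                return
            self._pending[seq] = event
            # Strict: append every event whose predecessors are all in.
            while self._next_seq in self._pending:
                self.events.append(self._pending.pop(self._next_seq))
                self._next_seq += 1

def process(seq, event, batch):
    time.sleep(random.uniform(0, 0.01))  # simulated validation/serialization cost
    batch.add(seq, event)

def run(strict, n=20):
    batch = Batch(strict)
    with ThreadPoolExecutor(max_workers=4) as pool:
        for seq in range(n):
            pool.submit(process, seq, f"event-{seq}", batch)
    return batch.events  # the with-block waits for all tasks to finish

print(run(strict=True))   # always event-0 ... event-19 in order
print(run(strict=False))  # same events, possibly in a different order
```

In this sketch, Strict mode still processes events in parallel and only orders the final append; the cost is the reorder buffer and the wait behind a slow event, which is the kind of lost parallelism the Ordering parameter warns about.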
Producing events
As explained earlier, the Diaspora Stream API splits events into two parts: metadata and data. The metadata part is a small JSON document, and can be batched with the metadata of other events to issue fewer RPCs to partition managers. The data part is optional and represents potentially larger, raw data that can benefit from being transferred via zero-copy mechanisms such as RDMA.
As an example, imagine an application that produces high-resolution images out of a series of detectors at regular intervals. The metadata part of an event might be a JSON fragment containing the timestamp and detector information (e.g., calibration parameters), as well as information about the images (e.g., dimensions, pixel format). The data part of an event would be the image itself.
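The detector example could translate into a metadata/data pair like the one below. This is an illustrative sketch only: the field names, the 640x480 frame size, and the "mono16" pixel format are invented, and the pair is not pushed anywhere. It simply shows how small the JSON metadata is compared with the raw data.

```python
import json
import time

# Hypothetical detector frame: 640x480 pixels, 16-bit grayscale.
WIDTH, HEIGHT, BYTES_PER_PIXEL = 640, 480, 2
image = bytearray(WIDTH * HEIGHT * BYTES_PER_PIXEL)  # raw pixel buffer (zeros)

# Small JSON-serializable metadata describing the frame (illustrative fields).
metadata = {
    "timestamp": time.time(),
    "detector": {"id": "det-7", "calibration": {"gain": 1.25, "offset": -3}},
    "image": {"width": WIDTH, "height": HEIGHT, "pixel_format": "mono16"},
}

print(len(json.dumps(metadata)), "bytes of metadata")
print(len(image), "bytes of data")  # 614400
```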
The code below shows how to create the data and metadata parts of an event, first in C++, then in Python.
std::vector<char> segment1 = { 'a', 'b', 'c', 'd' };
// expose 1 segment using its pointer and size
diaspora::DataView data1{segment1.data(), segment1.size()};
std::vector<char> segment2 = { 'e', 'f' };
// expose 2 non-contiguous segments using diaspora::DataView::Segment
diaspora::DataView data2{{
diaspora::DataView::Segment{segment1.data(), segment1.size()},
diaspora::DataView::Segment{segment2.data(), segment2.size()}
}};
diaspora::Metadata metadata1{R"({"energy": 42})"};
using json = nlohmann::json;
auto md = json::object();
md["energy"] = 42;
diaspora::Metadata metadata2{md};
The first diaspora::DataView data1 object is a view of a single contiguous
segment of memory underlying the segment1 vector. The second
diaspora::DataView data2 object is a view of two non-contiguous segments.
The first diaspora::Metadata object, metadata1, is created from a
raw string representing a JSON object with an “energy” field. The second Metadata
object contains the same information but is initialized using an nlohmann::json
instance; nlohmann::json is the library Diaspora uses to manage JSON data in C++.
# data can be any object adhering to the buffer protocol
data1 = b"abcd"
data2 = bytearray("efgh", encoding="ascii")
data3 = memoryview(data1)
# or a list of such objects
data4 = [data1, data2, data3]
metadata1 = """{"energy": 42}""" # use a string
metadata2 = {"energy": 42} # or use a dictionary
The first variable data1 is a read-only bytes buffer. data2
is a bytearray, and data3 is a memoryview of data1.
All three types adhere to the buffer protocol and can be used for the data part of
an event. Other types such as NumPy arrays also adhere to this protocol.
data4, a list of objects following the buffer protocol, can also be used
to handle non-contiguous memory.
The first metadata object, metadata1, is a string containing JSON information.
The second, metadata2, is a dictionary. Both can be used for the metadata part
of the event.
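Since the data part may be either a single buffer or a list of buffers, code that handles it can normalize both forms through memoryview. The helpers `as_segments` and `total_nbytes` below are hypothetical and not part of diaspora_stream; they only illustrate that each accepted object exposes its bytes through the buffer protocol. A NumPy array would be accepted as well; it is left out so the sketch needs only the standard library.

```python
def as_segments(data):
    """Normalize the data part of an event into a list of memoryviews.

    Accepts one object supporting the buffer protocol or a list of them
    (hypothetical helper for illustration)."""
    items = data if isinstance(data, list) else [data]
    # memoryview() raises TypeError for objects without the buffer protocol.
    return [memoryview(item) for item in items]

def total_nbytes(data):
    # nbytes is the size in bytes, regardless of the buffer's item format.
    return sum(segment.nbytes for segment in as_segments(data))

data1 = b"abcd"
data2 = bytearray("efgh", encoding="ascii")
data3 = memoryview(data1)
data4 = [data1, data2, data3]

print(total_nbytes(data1))  # 4
print(total_nbytes(data4))  # 12
```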
Important
In C++, a diaspora::DataView object is a non-owning view of a potentially
non-contiguous series of memory segments. You can think of it as a list of
std::span<char>. This means that (1) you need to make sure that the application
does not free the memory before it has been transferred, and (2) you need to make sure
not to write to the memory while it is being transferred.
In Python, the equivalent of a diaspora::DataView is a list of any objects
satisfying the buffer protocol
(e.g., bytes, bytearray, numpy arrays, etc.).
When the data is pushed into a producer, the producer shares ownership of
this list, so there is no danger that the memory underlying these objects
is freed. However, the user should still take care not to write to them
until the data has been transferred.
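The reason not to modify a buffer before it has been transferred can be shown with memoryview alone, since it provides the same kind of zero-copy access a producer relies on. In this sketch, `in_flight` is a hypothetical stand-in for the producer's reference to the data; it is an illustration, not Diaspora code.

```python
payload = bytearray(b"abcd")

# A zero-copy view, standing in for the reference a producer keeps
# until the data has actually been sent.
in_flight = memoryview(payload)

# Writing to the buffer before the transfer completes...
payload[0:2] = b"XY"

# ...changes what the transfer would read, because no copy was made.
print(bytes(in_flight))  # b'XYcd'

# Resizing a bytearray while a view on it exists is refused outright.
try:
    payload.extend(b"ef")
except BufferError as exc:
    print("BufferError:", exc)

in_flight.release()  # once the view is released, resizing is allowed again
payload.extend(b"ef")
print(payload)  # bytearray(b'XYcdef')
```

Immutable bytes objects avoid the first hazard entirely, at the cost of having to build a new object for every event.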
Having created the metadata and data parts of an event, we can now push the event into the producer, as shown in the code below.
diaspora::Future<std::optional<diaspora::EventID>> future = producer.push(metadata1, data1);
future.completed(); // returns true if the future has completed
producer.push(metadata2, data2);
// the value passed to wait is a timeout in milliseconds,
// -1 indicates that the wait call should block until the value is available
diaspora::EventID event_id_1 = future.wait(-1).value();
producer.flush();
future = producer.push(metadata=metadata1, data=data1)
future.completed # returns True if the future has completed
event_id = future.wait()
producer.flush().wait(-1)
The producer’s push function takes the metadata and data objects and returns a
Future. Such a future can be tested for completion (future.completed()) and
can be blocked on until it completes (future.wait()). The latter method takes a timeout
(in milliseconds) and returns an std::optional<diaspora::EventID> holding the
ID of the created event (a 64-bit unsigned integer).
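The completed/wait pattern can be mimicked with Python's standard concurrent.futures module. This is only an analogy: `EventFuture`, its `wait(timeout_ms)` method, and the simulated 200 ms send are hypothetical, and the sketch assumes that an empty optional (None here) signals that the timeout expired before the event ID was available.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

class EventFuture:
    """Hypothetical wrapper mirroring the completed()/wait(timeout) pattern."""

    def __init__(self, inner):
        self._inner = inner

    def completed(self):
        return self._inner.done()

    def wait(self, timeout_ms=-1):
        # -1 blocks until the event ID is available; otherwise return None
        # (the analogue of an empty std::optional) if the timeout expires.
        timeout = None if timeout_ms < 0 else timeout_ms / 1000
        try:
            return self._inner.result(timeout=timeout)
        except TimeoutError:
            return None

def send_event():
    time.sleep(0.2)  # simulated transfer to the partition
    return 0         # simulated event ID

with ThreadPoolExecutor(max_workers=1) as pool:
    future = EventFuture(pool.submit(send_event))
    print(future.completed())  # almost certainly False right after submission
    print(future.wait(1))      # None expected: 1 ms is far shorter than the send
    print(future.wait(-1))     # 0
    print(future.completed())  # True
```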
It is perfectly OK to drop the future if you do not care to wait for its completion or
for the resulting event ID, as exemplified with the second event. Event IDs are monotonically
increasing and are per-partition, so two events stored in distinct partitions can end up with the same ID.
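Per-partition numbering can be modeled with one counter per partition. The `PartitionLog` class below is a hypothetical illustration, not Diaspora's storage logic; it shows why the same ID can appear in two partitions and why, in this model, a (partition, event ID) pair is what identifies an event uniquely within a topic.

```python
from collections import defaultdict

class PartitionLog:
    """Hypothetical model: each partition hands out its own increasing IDs."""

    def __init__(self):
        self._next_id = defaultdict(int)  # partition -> next event ID

    def append(self, partition):
        event_id = self._next_id[partition]
        self._next_id[partition] += 1
        return event_id

log = PartitionLog()
ids = [(p, log.append(p)) for p in [0, 1, 0, 0, 1]]
print(ids)  # [(0, 0), (1, 0), (0, 1), (0, 2), (1, 1)]
```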
Calling producer.flush() forces all the pending batches of events
to be sent, regardless of whether they have reached the requested size. This is useful to ensure
that all the events have been sent, either periodically or before terminating the application.
Note that flush is also a non-blocking call returning a Future, which can be waited on
(producer.flush().wait(-1) in the Python example).
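Why flush matters can be seen in a simplified, synchronous model of size-triggered batching. The `Batcher` class and its `send` callback are hypothetical; the real producer sends batches asynchronously and per partition, which this sketch ignores.

```python
class Batcher:
    """Hypothetical sketch of size-triggered batching with an explicit flush."""

    def __init__(self, batch_size, send):
        self.batch_size = batch_size
        self.send = send  # callback standing in for the RPC to a partition
        self.pending = []

    def push(self, metadata):
        self.pending.append(metadata)
        if len(self.pending) >= self.batch_size:
            self.flush()  # batch is full: send it now

    def flush(self):
        # Send whatever is pending, even if the batch is not full.
        if self.pending:
            self.send(self.pending)
            self.pending = []

sent = []
batcher = Batcher(batch_size=4, send=sent.append)
for energy in range(10):
    batcher.push({"energy": energy})
print([len(b) for b in sent])  # [4, 4]: two events are still pending
batcher.flush()
print([len(b) for b in sent])  # [4, 4, 2]
```

Without the final flush, the last two events would sit on the client until more events arrived, which is exactly the situation a flush before terminating the application avoids.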