Producers#

Applications that need to produce events into one or more topics will need to create a Producer instance. This object is an interface to produce events into a designated topic. It will internally run the validator, partition selector, and serializer on the events it is being passed to validate the event’s metadata and data, select a destination partition for each event, and serialize the event’s metadata into batches aimed at the same partition.

Creating a producer#

To obtain a Producer instance, one must first instantiate a Driver, before obtaining a TopicHandle by opening a topic. The TopicHandle can then be used to create a Producer, as exemplified hereafter.

diaspora::TopicHandle topic = driver.openTopic("collisions");

diaspora::ThreadPool thread_pool = driver.makeThreadPool(diaspora::ThreadCount{4});
diaspora::BatchSize  batch_size = diaspora::BatchSize::Adaptive();
diaspora::MaxNumBatches max_num_batches = diaspora::MaxNumBatches{2};
diaspora::Ordering   ordering = diaspora::Ordering::Loose; // or Strict

diaspora::Producer producer = topic.producer(
        "app1", thread_pool, batch_size, max_num_batches, ordering);

A producer can be created with five optional parameters.

  • Name: the producer name is not used in current implementations of the API but may be in the future, it is therefore advised to give your producer a name. If you have a multi-process producer application, it is recommended to give all producer instances the same name.

  • Thread pool: the producer will run the topic’s validator, partition selector, and serializer in user-level threads pushed into a thread pool. The thread pool is backed by a number of hardware threads. If not specified, the default thread pool of the driver will be used.

  • Batch size: the batch size is the number of events to batch together before the batch is sent to the target partition. diaspora::BatchSize::Adaptive() (AdaptiveBatchSize in Python) can be used to tell the producer to adapt the batch size at run time: the producer will aim to send batches as soon as possible but will increase the batch size if the server is not responding fast enough.

  • Maximum batches: this parameter controls the maximum number of batches that can be pending on the client before push calls start blocking. Increasing this number may be useful in bursty applications.

  • Ordering: because the producer may run the validator, partition selector, and serializer in tasks posted to the thread pool, one could imagine an application posting event A then event B, but event A takes more time being validated than event B and event B ends up in the batch before event A. diaspora::Ordering::Loose (Ordering.Loose in Python) allows this behavior. diaspora::Ordering::Strict (Ordering.Strict) forces events that target the same batch to be added to the batch in the same order they were produced by the application. This constraint may limit parallelism opportunities in the producer and should be used only if necessary.

Producing events#

As explained earlier, the Diaspora Stream API splits events into two parts: metadata and data. The metadata part is a small JSON document, and can be batched with the metadata of other events to issue fewer RPCs to partition managers. The data part is optional and represents potentially larger, raw data that can benefit from being transferred via zero-copy mechanisms such as RDMA.

As an example, imagine an application that produces high-resolution images out of a series of detectors at regular intervals. The metadata part of an event might be a JSON fragment containing the timestamp and detector information (e.g., calibration parameters), as well as information about the images (e.g., dimensions, pixel format). The data part of an event would be the image itself.

The code bellow shows how to create the data and metadata pieces of an event.

std::vector<char> segment1 = { 'a', 'b', 'c', 'd' };

// expose 1 segment using its pointer and size
diaspora::DataView data1{segment1.data(), segment1.size()};

std::vector<char> segment2 = { 'e', 'f' };

// expose 2 non-contiguous segments using diaspora::Data::Segment
diaspora::DataView data2{{
    diaspora::DataView::Segment{segment1.data(), segment1.size()},
    diaspora::DataView::Segment{segment2.data(), segment2.size()}
}};
diaspora::Metadata metadata1{R"({"energy": 42})"};

using json = nlohmann::json;
auto md = json::object();
md["energy"] = 42;
diaspora::Metadata metadata2{md};

The first diaspora::DataView data1 object is a view of a single contiguous segment of memory underlying the segment1 vector. The second diaspora::DataView data2 object is a view of two non-contiguous segments.

The first diaspora::Metadata object, metadata1, is created from a raw string representing a JSON object with and “energy” field. The second Metadata object contains the same information but is initialized using an nlohmann::json instance, which is the library used by Diaspora to manage JSON data in C++.

Important

In C++, a diaspora::DataView object is a non-owning view of a potentially non-contiguous series of memory segments. You can think of it as a list of std::span<char>. This means that (1) you need to make sure that the application does not free the memory before it has been transferred, and (2) you need to make sure not to write the memory while it is being transferred.

In Python, the equivalent of a diaspora::DataView is a list of any objects satisfying the buffer protocol (e.g., bytes, bytearray, numpy arrays, etc.). When pushing the data into a producer, the producer will share ownership of this list, there is therefore no danger that the memory underlying these objects is freed. However the user should still take care that they are not written to until the data has been transferred.

Having created the metadata and the data part of an event, we can now push the event into the producer, as shown in the code bellow.

diaspora::Future<std::optional<diaspora::EventID>> future = producer.push(metadata1, data1);
future.completed(); // returns true if the future has completed

producer.push(metadata2, data2);

// the value passed to wait is a timeout in milliseconds,
// -1 indicates that the wait call should block until the value is available
diaspora::EventID event_id_1 = future.wait(-1).value();

producer.flush();

The producer’s push function takes the metadata and data objects and returns a Future. Such a future can be tested for completion (future.completed()) and can be blocked on until it completes (future.wait()). The latter method takes a timeout (in milliseconds) and returns an std::optional<diaspora::EventID> representing the event ID of the created event (64-bits unsigned integer). It is perfectly OK to drop the future if you do not care to wait for its completion or for the resulting event ID, as exemplified with the second event. Event IDs are monotonically increasing and are per-partition, so two events stored in distinct partitions can end up with the same ID.

Calling producer.flush() will force all the pending batches of events to be sent, regardless of whether they have reached the requested size. It can be useful to ensure that all the events have been sent either periodically or before terminating the application. Note that this is also a non-blocking call returning a Future.