Topics

Events in the Diaspora Stream API are pushed into topics. A topic is a distributed collection of partitions to which events are appended. When creating a topic, users must give it a name and may optionally provide three objects:

  • Validator: a validator is an object that checks that the metadata and data parts of an event comply with what is expected for the topic. Metadata are JSON documents by default, so a validator could, for instance, check that some expected fields are present. If the metadata describes the data in some way, a validator could also check that this description is correct. Validation happens before the event is sent to any server, and an invalid event results in an exception. If no validator is provided, the default validator accepts every event it is presented with.

  • Partition selector: a partition selector is an object that is given the list of available partitions for a topic and decides which partition each event is sent to, based on the event's metadata or on any other strategy. If no partition selector is provided, the default one cycles through the partitions in a round-robin manner.

  • Serializer: a serializer is an object that can serialize a Metadata object into a binary representation, and deserialize a binary representation back into a Metadata object. If not provided, the default serializer will convert the Metadata into a string representation.
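The default round-robin strategy can be illustrated with a minimal standalone sketch. RoundRobinSelector is a hypothetical name, the code does not use the Diaspora headers, and it is not the library's actual default selector; it only shows the cycling behavior described above.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Hypothetical sketch of round-robin partition selection: each call returns
// the next partition index and wraps around after the last one. The event's
// metadata is ignored entirely.
class RoundRobinSelector {
    std::size_t m_num_partitions;
    std::size_t m_next = 0;

  public:
    explicit RoundRobinSelector(std::size_t num_partitions)
    : m_num_partitions(num_partitions) {
        if (num_partitions == 0)
            throw std::invalid_argument("a topic needs at least one partition");
    }

    // Returns the partition for the next event.
    std::size_t select() {
        std::size_t partition = m_next;
        m_next = (m_next + 1) % m_num_partitions;
        return partition;
    }
};
```

With three partitions, successive events go to partitions 0, 1, 2, 0, 1, and so on.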

[Figure: TopicPipeline diagram showing the validator, partition selector, and serializer applied to events]

Diaspora API implementations may use multiple threads to parallelize and pipeline the execution of the validator, partition selector, and serializer over many events. Each of these objects can be customized and parameterized. For instance, a validator that checks the content of JSON metadata could be given the list of fields it expects to find in the metadata of each event.
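As a sketch of such a parameterized validator, the hypothetical RequiredFieldsValidator below receives its list of expected fields at construction time and rejects any event whose metadata lacks one of them. To stay self-contained, it uses a flat std::map<std::string, std::string> in place of JSON metadata and throws std::invalid_argument; a real Diaspora validator would instead implement diaspora::ValidatorInterface, as EnergyValidator does later on this page.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for JSON metadata: a flat map of field names to values.
using FlatMetadata = std::map<std::string, std::string>;

// Hypothetical validator parameterized by the fields it expects to find.
class RequiredFieldsValidator {
    std::vector<std::string> m_required;

  public:
    explicit RequiredFieldsValidator(std::vector<std::string> required)
    : m_required(std::move(required)) {}

    // Throws on the first missing field; returns normally if all are present.
    void validate(const FlatMetadata& metadata) const {
        for (const auto& field : m_required) {
            if (metadata.find(field) == metadata.end())
                throw std::invalid_argument("missing field \"" + field + "\"");
        }
    }
};
```

The list of fields plays the same role as the energy_max parameter of the validator shown later: it is configuration supplied once, when the object is created, rather than per event.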

Hereafter we continue to use the “files” driver for the Diaspora Stream API, though the same code will work with any implementation of the API.

Creating a topic

The following code snippets show how to create a topic. Topics should generally be created using the diaspora-ctl command-line tool; however, it is also possible to create them in C++ and in Python. Our custom validator, partition selector, and serializer are specified using the "name:library.so" format, which tells the API to dynamically load the specified library to access the corresponding implementation.

Important

If you get an error indicating that dlopen has failed to find the library for your validator, partition selector, or serializer, make sure that LD_LIBRARY_PATH includes the directories containing these libraries.
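To make the "name:library.so" format concrete, the hypothetical parsePluginSpec function below splits such a specification into the registered name and the library file name. It only illustrates the format and does not reproduce Diaspora's own parsing. When a library name that contains no slash, such as libenergy_validator.so, is passed to dlopen, the dynamic loader searches the directories listed in LD_LIBRARY_PATH (among other locations), which is why that variable matters.

```cpp
#include <cassert>
#include <stdexcept>
#include <string>

// Hypothetical result of parsing a "name:library.so" specification.
struct PluginSpec {
    std::string name;     // name the class was registered under
    std::string library;  // shared library to load, e.g. with dlopen
};

// Splits "name:library.so" at the first ':'. This sketch requires both parts
// to be non-empty; Diaspora's own parser may accept other forms.
PluginSpec parsePluginSpec(const std::string& spec) {
    const auto colon = spec.find(':');
    if (colon == std::string::npos || colon == 0 || colon + 1 == spec.size())
        throw std::invalid_argument("expected \"name:library.so\", got \"" + spec + "\"");
    return PluginSpec{spec.substr(0, colon), spec.substr(colon + 1)};
}
```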

# long arguments version
diaspora-ctl topic create --name collisions \
    --driver files --driver.root_path /tmp/my-data \
    --topic.num_partitions 3 \
    --validator energy_validator:libenergy_validator.so \
    --validator.energy_max 100 \
    --partition-selector energy_partition_selector:libenergy_partition_selector.so \
    --partition-selector.energy_max 100 \
    --serializer energy_serializer:libenergy_serializer.so \
    --serializer.energy_max 100

Configuration parameters for each object are passed using hierarchical command-line options. For instance, --validator.x 42 --validator.y.z abc produces the configuration { "x": 42, "y": { "z": "abc" } }.
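Assuming the tool simply splits option names on dots, the folding of hierarchical options into a nested JSON object can be sketched as follows. The functions are hypothetical and are not part of diaspora-ctl; as an additional assumption, this sketch treats integer-looking values as JSON numbers and everything else as strings, and it does not escape strings.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// One node of the configuration tree: either a leaf value or a JSON object.
struct ConfigNode {
    std::string value;                                            // used by leaves
    std::map<std::string, std::unique_ptr<ConfigNode>> children;  // used by objects
};

// Adds `value` at the position named by a dotted key such as "y.z".
void insertDotted(ConfigNode& root, const std::string& dotted_key, const std::string& value) {
    ConfigNode* node = &root;
    std::istringstream keys(dotted_key);
    std::string key;
    while (std::getline(keys, key, '.')) {
        auto& child = node->children[key];
        if (!child) child = std::make_unique<ConfigNode>();
        node = child.get();
    }
    node->value = value;
}

// Assumption of this sketch: integer-looking values become JSON numbers.
bool looksLikeInteger(const std::string& s) {
    std::size_t start = (!s.empty() && s[0] == '-') ? 1 : 0;
    if (start == s.size()) return false;
    for (std::size_t i = start; i < s.size(); ++i)
        if (!std::isdigit(static_cast<unsigned char>(s[i]))) return false;
    return true;
}

// Serializes the tree; object keys come out in sorted order.
std::string toJson(const ConfigNode& node) {
    if (node.children.empty())
        return looksLikeInteger(node.value) ? node.value : "\"" + node.value + "\"";
    std::string out = "{";
    for (auto it = node.children.begin(); it != node.children.end(); ++it) {
        if (it != node.children.begin()) out += ", ";
        out += "\"" + it->first + "\": " + toJson(*it->second);
    }
    return out + "}";
}

// Folds every "--<prefix>.<key> <value>" pair of a command line into JSON.
std::string configFor(const std::string& prefix, const std::vector<std::string>& args) {
    ConfigNode root;
    const std::string flag = "--" + prefix + ".";
    for (std::size_t i = 0; i + 1 < args.size(); ++i) {
        if (args[i].compare(0, flag.size(), flag) == 0) {
            insertDotted(root, args[i].substr(flag.size()), args[i + 1]);
            ++i;  // skip the value that was just consumed
        }
    }
    return root.children.empty() ? "{}" : toJson(root);
}
```

Applied to the diaspora-ctl command above, configFor("validator", ...) yields {"energy_max": 100}, the configuration that the validator's create function expects.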

Let’s take a look at the implementation of the validator, partition selector, and serializer classes.

Important

Validators, partition selectors, and serializers must currently be implemented in C++, even when used in Python.

#include <diaspora/Validator.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <memory>
#include <string>


class EnergyValidator final : public diaspora::ValidatorInterface {

    const size_t energy_max;

    public:

    EnergyValidator(size_t _energy_max)
    : energy_max(_energy_max) {}

    void validate(const diaspora::Metadata& metadata, const diaspora::DataView& data) const override {
        if(!metadata.json().is_object())
            throw diaspora::InvalidMetadata{
                "EnergyValidator expects metadata to be a JSON object"};
        if(!metadata.json().contains("energy"))
            throw diaspora::InvalidMetadata{
                "EnergyValidator expects metadata to contain an \"energy\" field"};
        if(!metadata.json()["energy"].is_number_integer())
            throw diaspora::InvalidMetadata{
                "EnergyValidator expects the \"energy\" field to be an integer"};
        if(metadata.json()["energy"].get<size_t>() >= energy_max)
            throw diaspora::InvalidMetadata{
                std::string{"EnergyValidator expects energy value to be lower than "}
                + std::to_string(energy_max)};
        (void)data; // the validator could also validate the content of the data
    }

    diaspora::Metadata metadata() const override {
        return diaspora::Metadata{
            {{"energy_max", energy_max}}
        };
    }

    static std::unique_ptr<diaspora::ValidatorInterface> create(const diaspora::Metadata& metadata) {
        if(!metadata.json().is_object())
            throw diaspora::InvalidMetadata{
                "EnergyValidator configuration should be a JSON object"};
        if(!metadata.json().contains("energy_max"))
            throw diaspora::InvalidMetadata{
                "EnergyValidator configuration should contain an \"energy_max\" field"};
        if(!metadata.json()["energy_max"].is_number_integer())
            throw diaspora::InvalidMetadata{
                "EnergyValidator configuration's energy_max field should be an integer"};
        return std::make_unique<EnergyValidator>(metadata.json()["energy_max"].get<size_t>());
    }

};

DIASPORA_REGISTER_VALIDATOR(_, energy_validator, EnergyValidator);

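The EnergyValidator class inherits from diaspora::ValidatorInterface and implements the validate member function, which checks that the metadata is a JSON object containing an integer energy field whose value is strictly lower than the energy_max value provided when the validator was created. If validation fails, validate throws a diaspora::InvalidMetadata exception. The static create function builds the validator from its JSON configuration (for instance {"energy_max": 100}, as produced by --validator.energy_max 100), and the metadata member function returns that same configuration.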

Important

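The DIASPORA_REGISTER_VALIDATOR macro must be used to tell the API about the EnergyValidator class. In the call above, its second argument is the name by which we refer to the class in user code ("energy_validator", as in the energy_validator:libenergy_validator.so specification passed to diaspora-ctl), and its third argument is the class itself (EnergyValidator). The first argument is _ in every example on this page.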
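This page does not show how DIASPORA_REGISTER_VALIDATOR is implemented. Registration macros of this kind are commonly built on a static "registrar" object that, when the program starts or the shared library is loaded, inserts a factory function into a global name-to-factory table. The sketch below shows that general pattern with entirely hypothetical names (Plugin, Registrar, REGISTER_PLUGIN, makePlugin); it is not Diaspora's implementation.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical base interface, standing in for e.g. diaspora::ValidatorInterface.
struct Plugin {
    virtual ~Plugin() = default;
    virtual std::string kind() const = 0;
};

using Factory = std::function<std::unique_ptr<Plugin>()>;

// Global name -> factory table. A function-local static avoids the
// static initialization order problem between translation units.
std::map<std::string, Factory>& registry() {
    static std::map<std::string, Factory> table;
    return table;
}

// Constructing a Registrar adds a factory to the table.
struct Registrar {
    Registrar(const std::string& name, Factory factory) {
        registry()[name] = std::move(factory);
    }
};

// Hypothetical macro: defines a static Registrar for Class under `name`.
#define REGISTER_PLUGIN(name, Class)                                \
    static Registrar registrar_##name{#name, [] {                   \
        return std::unique_ptr<Plugin>(std::make_unique<Class>());  \
    }}

struct EnergyPlugin final : Plugin {
    std::string kind() const override { return "energy"; }
};

REGISTER_PLUGIN(energy_plugin, EnergyPlugin);

// Looks up a factory by name, as a loader could do once the library is loaded.
std::unique_ptr<Plugin> makePlugin(const std::string& name) {
    auto it = registry().find(name);
    if (it == registry().end())
        throw std::invalid_argument("no plugin registered as \"" + name + "\"");
    return it->second();
}
```

The point of the pattern is that user code only needs the registered name (here "energy_plugin") to obtain an instance; it never names the concrete class.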

#include <diaspora/PartitionSelector.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <memory>
#include <optional>
#include <vector>


class EnergyPartitionSelector final : public diaspora::PartitionSelectorInterface {

    const size_t                      energy_max;
    std::vector<diaspora::PartitionInfo> m_targets;

    public:

    EnergyPartitionSelector(size_t _energy_max)
    : energy_max(_energy_max) {}

    void setPartitions(const std::vector<diaspora::PartitionInfo>& targets) override {
        m_targets = targets;
    }

    size_t selectPartitionFor(const diaspora::Metadata& metadata, std::optional<size_t> requested) override {
        if(requested.has_value()) {
            // an explicitly requested partition must exist
            if(*requested >= m_targets.size())
                throw diaspora::Exception("Invalid requested partition number");
            return *requested;
        }
        if(m_targets.empty())
            throw diaspora::Exception("No partition available");
        // map energies in [0, energy_max) uniformly onto [0, m_targets.size())
        auto energy = metadata.json()["energy"].get<size_t>();
        auto i = energy*m_targets.size()/energy_max;
        // guard against energies >= energy_max (e.g. if no validator checked them)
        return i < m_targets.size() ? i : m_targets.size() - 1;
    }

    diaspora::Metadata metadata() const override {
        return diaspora::Metadata{
            {{"energy_max", energy_max}}
        };
    }

    static std::unique_ptr<diaspora::PartitionSelectorInterface> create(
            const diaspora::Metadata& metadata) {
        if(!metadata.json().is_object())
            throw diaspora::InvalidMetadata{
                "EnergyPartitionSelector configuration should be a JSON object"};
        if(!metadata.json().contains("energy_max"))
            throw diaspora::InvalidMetadata{
                "EnergyPartitionSelector configuration should contain an \"energy_max\" field"};
        if(!metadata.json()["energy_max"].is_number_integer())
            throw diaspora::InvalidMetadata{
                "EnergyPartitionSelector configuration's energy_max field should be an integer"};
        return std::make_unique<EnergyPartitionSelector>(metadata.json()["energy_max"].get<size_t>());
    }

};

DIASPORA_REGISTER_PARTITION_SELECTOR(_, energy_partition_selector, EnergyPartitionSelector);

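Like the validator, the EnergyPartitionSelector is initialized with an energy_max value. Once setPartitions has given it the list of available partitions, it splits the energy range [0, energy_max) into as many equal-width bins as there are partitions, so that events with similar energies end up in the same partition. If the caller explicitly requests a partition, that partition is used instead, provided it exists. EnergyPartitionSelector inherits from diaspora::PartitionSelectorInterface, and DIASPORA_REGISTER_PARTITION_SELECTOR makes it available under the name energy_partition_selector.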
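With the topic created earlier (3 partitions, energy_max set to 100), the binning formula energy * num_partitions / energy_max can be checked by hand. The standalone function below (energyBin is a hypothetical name) reproduces that integer arithmetic; the clamp to the last partition is an extra guard added in this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Integer binning used by EnergyPartitionSelector: [0, energy_max) is split
// into num_partitions bins of (roughly) equal width.
std::size_t energyBin(std::size_t energy, std::size_t energy_max, std::size_t num_partitions) {
    if (energy_max == 0 || num_partitions == 0)
        throw std::invalid_argument("energy_max and num_partitions must be positive");
    std::size_t bin = energy * num_partitions / energy_max;
    // Extra guard in this sketch: energies >= energy_max go to the last partition.
    return bin < num_partitions ? bin : num_partitions - 1;
}
```

Energies 0 to 33 go to partition 0, 34 to 66 to partition 1, and 67 to 99 to partition 2. Because of integer division, the bins hold 34, 33, and 33 values respectively.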

#include <diaspora/Serializer.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <cstdint>
#include <limits>
#include <memory>


class EnergySerializer final : public diaspora::SerializerInterface {

    size_t energy_max;

    public:

    EnergySerializer(size_t _energy_max)
    : energy_max(_energy_max) {}

    void serialize(diaspora::Archive& archive, const diaspora::Metadata& metadata) const override {
        size_t energy = metadata.json()["energy"].get<size_t>();
        if(energy_max <= std::numeric_limits<uint8_t>::max()) {
            uint8_t val = static_cast<uint8_t>(energy);
            archive.write(&val, sizeof(val));
        } else if(energy_max <= std::numeric_limits<uint16_t>::max()) {
            uint16_t val = static_cast<uint16_t>(energy);
            archive.write(&val, sizeof(val));
        } else if(energy_max <= std::numeric_limits<uint32_t>::max()) {
            uint32_t val = static_cast<uint32_t>(energy);
            archive.write(&val, sizeof(val));
        } else {
            uint64_t val = static_cast<uint64_t>(energy);
            archive.write(&val, sizeof(val));
        }
    }

    void deserialize(diaspora::Archive& archive, diaspora::Metadata& metadata) const override {
        metadata = diaspora::Metadata{}; // ensure we have an empty JSON object
        if(energy_max <= std::numeric_limits<uint8_t>::max()) {
            uint8_t val;
            archive.read(&val, sizeof(val));
            metadata.json()["energy"] = val;
        } else if(energy_max <= std::numeric_limits<uint16_t>::max()) {
            uint16_t val;
            archive.read(&val, sizeof(val));
            metadata.json()["energy"] = val;
        } else if(energy_max <= std::numeric_limits<uint32_t>::max()) {
            uint32_t val;
            archive.read(&val, sizeof(val));
            metadata.json()["energy"] = val;
        } else {
            uint64_t val;
            archive.read(&val, sizeof(val));
            metadata.json()["energy"] = val;
        }
    }

    diaspora::Metadata metadata() const override {
        return diaspora::Metadata{
            {{"energy_max", energy_max}}
        };
    }

    static std::unique_ptr<diaspora::SerializerInterface> create(const diaspora::Metadata& metadata) {
        if(!metadata.json().is_object())
            throw diaspora::InvalidMetadata{
                "EnergySerializer configuration should be a JSON object"};
        if(!metadata.json().contains("energy_max"))
            throw diaspora::InvalidMetadata{
                "EnergySerializer configuration should contain an \"energy_max\" field"};
        if(!metadata.json()["energy_max"].is_number_integer())
            throw diaspora::InvalidMetadata{
                "EnergySerializer configuration's energy_max field should be an integer"};
        return std::make_unique<EnergySerializer>(metadata.json()["energy_max"].get<size_t>());
    }

};

DIASPORA_REGISTER_SERIALIZER(_, energy_serializer, EnergySerializer);

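Like the other two classes, the EnergySerializer is initialized with an energy_max value. It uses this value to pick the smallest unsigned integer type (1, 2, 4, or 8 bytes) able to represent energy_max, which is therefore large enough for any valid energy, and writes the energy in that raw binary form. Note that only the energy field is serialized: any other field of the metadata is lost on deserialization. EnergySerializer inherits from diaspora::SerializerInterface and is registered using DIASPORA_REGISTER_SERIALIZER.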
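For the topic created above (energy_max of 100), each event's metadata therefore shrinks to a single byte instead of a textual JSON representation. The sketch below reproduces the width selection and a write/read round trip, with a plain std::vector<unsigned char> standing in for diaspora::Archive (whose interface is only partially shown on this page); bytesFor, writeEnergy, and readEnergy are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <stdexcept>
#include <vector>

// Width in bytes that EnergySerializer's logic picks for a given energy_max.
std::size_t bytesFor(std::size_t energy_max) {
    if (energy_max <= std::numeric_limits<std::uint8_t>::max()) return 1;
    if (energy_max <= std::numeric_limits<std::uint16_t>::max()) return 2;
    if (energy_max <= std::numeric_limits<std::uint32_t>::max()) return 4;
    return 8;
}

// Appends the energy as a T in native byte order, like archive.write(&val, sizeof(val)).
template <typename T>
void writeAs(std::vector<unsigned char>& buffer, std::size_t energy) {
    T val = static_cast<T>(energy);
    const auto* bytes = reinterpret_cast<const unsigned char*>(&val);
    buffer.insert(buffer.end(), bytes, bytes + sizeof(val));
}

// Reads a T in native byte order from the start of the buffer.
template <typename T>
std::size_t readAs(const std::vector<unsigned char>& buffer) {
    if (buffer.size() < sizeof(T)) throw std::out_of_range("buffer too small");
    T val;
    std::memcpy(&val, buffer.data(), sizeof(val));
    return static_cast<std::size_t>(val);
}

void writeEnergy(std::vector<unsigned char>& buffer, std::size_t energy, std::size_t energy_max) {
    switch (bytesFor(energy_max)) {
        case 1: writeAs<std::uint8_t>(buffer, energy); break;
        case 2: writeAs<std::uint16_t>(buffer, energy); break;
        case 4: writeAs<std::uint32_t>(buffer, energy); break;
        default: writeAs<std::uint64_t>(buffer, energy); break;
    }
}

std::size_t readEnergy(const std::vector<unsigned char>& buffer, std::size_t energy_max) {
    switch (bytesFor(energy_max)) {
        case 1: return readAs<std::uint8_t>(buffer);
        case 2: return readAs<std::uint16_t>(buffer);
        case 4: return readAs<std::uint32_t>(buffer);
        default: return readAs<std::uint64_t>(buffer);
    }
}
```

Like EnergySerializer, this sketch writes values in the machine's native byte order, so the format assumes that producers and consumers share the same endianness.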