Topics
Events in the Diaspora Stream API are pushed into topics. A topic is a distributed collection of partitions to which events are appended. When creating a topic, users must give it a name and may optionally provide three objects:
Validator: a validator is an object that checks that the metadata and data parts of an event comply with what is expected for the topic. Metadata are JSON documents by default, so, for instance, a validator could check that some expected fields are present. If the metadata describes the data in some way, a validator could also check that this description is correct. Validation happens before the event is sent to any server, and an exception is thrown if the event is not valid. If not provided, the default validator accepts all events.
Partition selector: a partition selector is an object that is given the list of available partitions for a topic and decides which partition each event will be sent to, based on the event’s metadata or on any other strategy. If not provided, the default partition selector cycles through the partitions in a round-robin fashion.
Serializer: a serializer is an object that can serialize a Metadata object into a binary representation, and deserialize a binary representation back into a Metadata object. If not provided, the default serializer will convert the Metadata into a string representation.
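To make the default partitioning strategy concrete, here is a minimal, self-contained C++ sketch of round-robin selection. The RoundRobinSelector class and its methods are hypothetical: they are not the API's default selector, and for simplicity the selector receives only the number of partitions rather than a list of partition descriptions.

```cpp
#include <cassert>
#include <cstddef>
#include <stdexcept>

// Hypothetical stand-in for a default partition selector: it ignores the
// event's metadata and cycles through the partitions in order.
class RoundRobinSelector {
    std::size_t m_num_partitions = 0;
    std::size_t m_next = 0;

public:
    // Called with the number of partitions available in the topic.
    void setPartitions(std::size_t num_partitions) {
        m_num_partitions = num_partitions;
        m_next = 0;
    }

    // Returns the index of the partition the next event should go to.
    std::size_t selectPartition() {
        if (m_num_partitions == 0)
            throw std::logic_error("no partitions available");
        const std::size_t selected = m_next;
        m_next = (m_next + 1) % m_num_partitions;
        return selected;
    }
};
```

Because this selector keeps a counter between calls, an implementation that ran selection on several threads at once would need to protect that counter; the sketch ignores this.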
Diaspora API implementations may take advantage of multithreading to parallelize and pipeline the execution of the validator, partition selector, and serializer over many events. These objects can be customized and parameterized. For instance, a validator that checks the content of JSON metadata could be given the list of fields it expects to find in the metadata of each event.
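The parameterized validator mentioned above could look like the following sketch. To keep it self-contained, metadata is modeled as a flat std::map of strings (FlatMetadata) instead of a JSON document, and RequiredFieldsValidator is a hypothetical class that does not derive from any Diaspora interface; it only illustrates how a list of expected fields passed at construction drives the check.

```cpp
#include <cassert>
#include <map>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-in for JSON metadata: a flat map from field name to value.
using FlatMetadata = std::map<std::string, std::string>;

// Hypothetical validator parameterized by the list of fields each event must carry.
class RequiredFieldsValidator {
    std::vector<std::string> m_required;

public:
    explicit RequiredFieldsValidator(std::vector<std::string> required)
    : m_required(std::move(required)) {}

    // Throws std::invalid_argument naming the first missing field.
    void validate(const FlatMetadata& metadata) const {
        for (const auto& field : m_required)
            if (metadata.count(field) == 0)
                throw std::invalid_argument("missing field \"" + field + "\"");
    }
};
```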
Hereafter we continue to use the “files” driver for the Diaspora Stream API, though the same code will work with any implementation of the API.
Creating a topic
The following code snippets show how to create a topic. Topics should generally
be created using the diaspora-ctl command-line tool, but it is also possible to create
them from C++ and Python. Our custom validator, partition selector,
and serializer are specified using the "name:library.so" format, which tells the
API to dynamically load the specified library to access their implementation.
Important
If you get an error indicating that dlopen has failed to find your library for
the validator, partition selector, or serializer, make sure that LD_LIBRARY_PATH
contains the directory where these libraries are located.
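As an illustration of the "name:library.so" convention, the sketch below splits such a string into the component name and the library file name. ComponentSpec and parseSpec are hypothetical helpers, and treating a string without a colon as a bare name is an assumption; Diaspora's own parsing may differ.

```cpp
#include <cassert>
#include <optional>
#include <stdexcept>
#include <string>

// Hypothetical result of splitting a "name:library.so" specification.
struct ComponentSpec {
    std::string name;                    // name used at registration time
    std::optional<std::string> library;  // shared library to load, if any
};

// Splits the specification at the first ':'.
// Assumption: a string without ':' is treated as a bare name with no library.
ComponentSpec parseSpec(const std::string& spec) {
    const auto colon = spec.find(':');
    if (colon == std::string::npos)
        return {spec, std::nullopt};
    if (colon == 0 || colon + 1 == spec.size())
        throw std::invalid_argument("malformed specification: \"" + spec + "\"");
    return {spec.substr(0, colon), spec.substr(colon + 1)};
}
```

Once split, the library name would be handed to dlopen. For a file name without a slash, dlopen searches, among other places, the directories listed in LD_LIBRARY_PATH, which is why that variable must contain the directory holding your libraries.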
# long arguments version
diaspora-ctl topic create --name collisions \
--driver files --driver.root_path /tmp/my-data \
--topic.num_partitions 3 \
--validator energy_validator:libenergy_validator.so \
--validator.energy_max 100 \
--partition-selector energy_partition_selector:libenergy_partition_selector.so \
--partition-selector.energy_max 100 \
--serializer energy_serializer:libenergy_serializer.so \
--serializer.energy_max 100
The configuration parameters of each object are passed using hierarchical
command-line options. For instance,
--validator.x 42 --validator.y.z abc
will produce the validator configuration { "x": 42, "y": { "z": "abc" } }.
diaspora::Validator validator =
diaspora::Validator::FromMetadata(
diaspora::Metadata{{
{"type","energy_validator:libenergy_validator.so"},
{"energy_max", 100}}}
);
diaspora::PartitionSelector selector =
diaspora::PartitionSelector::FromMetadata(
diaspora::Metadata{{
{"type","energy_partition_selector:libenergy_partition_selector.so"},
{"energy_max", 100}}}
);
diaspora::Serializer serializer =
diaspora::Serializer::FromMetadata(
diaspora::Metadata{{
{"type","energy_serializer:libenergy_serializer.so"},
{"energy_max", 100}}}
);
driver.createTopic("collisions", diaspora::Metadata{}, validator, selector, serializer);
from diaspora_stream.api import Validator, PartitionSelector, Serializer
validator = Validator.from_metadata(
type="energy_validator:libenergy_validator.so", energy_max=100)
selector = PartitionSelector.from_metadata(
type="energy_partition_selector:libenergy_partition_selector.so", energy_max=100)
serializer = Serializer.from_metadata(
type="energy_serializer:libenergy_serializer.so", energy_max=100)
driver.create_topic(
name="collisions",
validator=validator,
partition_selector=selector,
serializer=serializer)
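The hierarchical command-line options accepted by diaspora-ctl can be pictured with the following self-contained sketch, which turns dotted option names into a nested JSON object. ConfigNode, configFor, and the other helpers are hypothetical, and the rule that values made only of digits become JSON integers while everything else becomes a string is an assumption; the actual type handling of diaspora-ctl may differ.

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

// Hypothetical configuration tree: a leaf holds a value, an inner node holds children.
struct ConfigNode {
    std::string value;
    std::map<std::string, std::unique_ptr<ConfigNode>> children;
};

// Stores `value` at the path named by a dotted key such as "y.z".
void insertPath(ConfigNode& root, const std::string& dotted_key, const std::string& value) {
    ConfigNode* node = &root;
    std::stringstream ss(dotted_key);
    std::string part;
    while (std::getline(ss, part, '.')) {
        auto& child = node->children[part];
        if (!child) child = std::make_unique<ConfigNode>();
        node = child.get();
    }
    node->value = value;
}

// Assumption: values made only of digits are emitted as JSON integers.
bool looksLikeInteger(const std::string& s) {
    if (s.empty()) return false;
    for (char c : s)
        if (!std::isdigit(static_cast<unsigned char>(c))) return false;
    return true;
}

// Renders a node as JSON; children are emitted in key order.
std::string toJson(const ConfigNode& node) {
    if (node.children.empty())
        return looksLikeInteger(node.value) ? node.value : "\"" + node.value + "\"";
    std::string out = "{";
    for (auto it = node.children.begin(); it != node.children.end(); ++it) {
        if (it != node.children.begin()) out += ", ";
        out += "\"" + it->first + "\": " + toJson(*it->second);
    }
    return out + "}";
}

// Collects every "--<prefix>.<key> <value>" pair into one JSON object.
std::string configFor(const std::string& prefix, const std::vector<std::string>& args) {
    ConfigNode root;
    const std::string flag = "--" + prefix + ".";
    for (std::size_t i = 0; i + 1 < args.size(); ++i) {
        if (args[i].rfind(flag, 0) == 0) {  // argument starts with the flag
            insertPath(root, args[i].substr(flag.size()), args[i + 1]);
            ++i;                            // skip the value just consumed
        }
    }
    return root.children.empty() ? "{}" : toJson(root);
}
```

For example, configFor("validator", {"--validator.x", "42", "--validator.y.z", "abc"}) yields {"x": 42, "y": {"z": "abc"}}, while options belonging to other objects (such as --serializer.energy_max) are ignored.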
Let’s take a look at the implementation of the validator, partition selector, and serializer classes.
Important
Validators, partition selectors, and serializers must currently be implemented in C++, even when used in Python.
#include <diaspora/Validator.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <memory>
#include <string>
class EnergyValidator final : public diaspora::ValidatorInterface {
const size_t energy_max;
public:
EnergyValidator(size_t _energy_max)
: energy_max(_energy_max) {}
void validate(const diaspora::Metadata& metadata, const diaspora::DataView& data) const override {
if(!metadata.json().is_object())
throw diaspora::InvalidMetadata{
"EnergyValidator expects metadata to be a JSON object"};
if(!metadata.json().contains("energy"))
throw diaspora::InvalidMetadata{
"EnergyValidator expects metadata to contain an \"energy\" field"};
if(!metadata.json()["energy"].is_number_integer())
throw diaspora::InvalidMetadata{
"EnergyValidator expects the energy field to be an integer"};
if(metadata.json()["energy"].get<size_t>() >= energy_max)
throw diaspora::InvalidMetadata{
std::string{"EnergyValidator expects energy value to be lower than "}
+ std::to_string(energy_max)};
(void)data; // the validator could also validate the content of the data
}
diaspora::Metadata metadata() const override {
return diaspora::Metadata{
{{"energy_max", energy_max}}
};
}
static std::unique_ptr<diaspora::ValidatorInterface> create(const diaspora::Metadata& metadata) {
if(!metadata.json().is_object())
throw diaspora::InvalidMetadata{
"EnergyValidator configuration should be a JSON object"};
if(!metadata.json().contains("energy_max"))
throw diaspora::InvalidMetadata{
"EnergyValidator configuration should contain an \"energy_max\" field"};
if(!metadata.json()["energy_max"].is_number_integer())
throw diaspora::InvalidMetadata{
"EnergyValidator configuration's energy_max field should be an integer"};
return std::make_unique<EnergyValidator>(metadata.json()["energy_max"].get<size_t>());
}
};
DIASPORA_REGISTER_VALIDATOR(_, energy_validator, EnergyValidator);
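The EnergyValidator class inherits from diaspora::ValidatorInterface
and provides the validate member function. This function checks that the
metadata is a JSON object containing an integer energy field whose value
is less than the energy_max value provided when creating the validator.
If validation fails, validate throws a diaspora::InvalidMetadata exception.
The static create function builds an EnergyValidator from its configuration
(a JSON object with an integer energy_max field), and the metadata function
returns the validator's parameters.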
Important
The DIASPORA_REGISTER_VALIDATOR macro must be used to tell the API
about the EnergyValidator class. After the leading _ argument, it takes the name
by which we will refer to the class in user code (“energy_validator”, the part
before the colon in "energy_validator:libenergy_validator.so"), followed by the
name of the class itself (EnergyValidator).
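Registration macros of this kind commonly rely on a static-registration pattern: a global object whose constructor adds a factory function to a name-indexed table, so the class becomes available as soon as the program or shared library is loaded. We have not checked how DIASPORA_REGISTER_VALIDATOR is actually implemented; the sketch below, with its hypothetical SKETCH_REGISTER_VALIDATOR macro and ValidatorBase class, only illustrates the general pattern.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical base class standing in for diaspora::ValidatorInterface.
struct ValidatorBase {
    virtual ~ValidatorBase() = default;
};

using ValidatorFactory = std::function<std::unique_ptr<ValidatorBase>()>;

// Name -> factory table; a function-local static is initialized on first use,
// which avoids depending on the initialization order of global objects.
std::map<std::string, ValidatorFactory>& validatorRegistry() {
    static std::map<std::string, ValidatorFactory> registry;
    return registry;
}

// A namespace-scope Registrar adds its factory to the table when its
// constructor runs, typically when the program or shared library is loaded.
struct Registrar {
    Registrar(const std::string& name, ValidatorFactory factory) {
        validatorRegistry()[name] = std::move(factory);
    }
};

// Hypothetical two-argument macro (the real Diaspora macro takes three arguments).
#define SKETCH_REGISTER_VALIDATOR(name, Class) \
    static Registrar sketch_registrar_##name{#name, [] { return std::make_unique<Class>(); }}

// Looks up a validator by name; returns nullptr if nothing was registered under it.
std::unique_ptr<ValidatorBase> createValidator(const std::string& name) {
    auto it = validatorRegistry().find(name);
    if (it == validatorRegistry().end()) return nullptr;
    return it->second();
}

// Example registration, analogous to registering EnergyValidator.
struct DummyValidator : ValidatorBase {};
SKETCH_REGISTER_VALIDATOR(dummy_validator, DummyValidator);
```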
#include <diaspora/PartitionSelector.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <memory>
#include <optional>
#include <vector>
class EnergyPartitionSelector final : public diaspora::PartitionSelectorInterface {
const size_t energy_max;
std::vector<diaspora::PartitionInfo> m_targets;
public:
EnergyPartitionSelector(size_t _energy_max)
: energy_max(_energy_max) {}
void setPartitions(const std::vector<diaspora::PartitionInfo>& targets) override {
m_targets = targets;
}
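size_t selectPartitionFor(const diaspora::Metadata& metadata, std::optional<size_t> requested) override {
if(requested.has_value()) {
// an explicitly requested partition must be a valid index
if(*requested >= m_targets.size())
throw diaspora::Exception("Invalid requested partition number");
return *requested;
}
if(m_targets.empty())
throw diaspora::Exception("No partition available");
// EnergyValidator guarantees energy < energy_max, hence i < m_targets.size()
auto energy = metadata.json()["energy"].get<size_t>();
auto i = energy*m_targets.size()/energy_max;
return i;
}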
diaspora::Metadata metadata() const override {
return diaspora::Metadata{
{{"energy_max", energy_max}}
};
}
static std::unique_ptr<diaspora::PartitionSelectorInterface> create(
const diaspora::Metadata& metadata) {
if(!metadata.json().is_object())
throw diaspora::InvalidMetadata{
"EnergyPartitionSelector configuration should be a JSON object"};
if(!metadata.json().contains("energy_max"))
throw diaspora::InvalidMetadata{
"EnergyPartitionSelector configuration should contain an \"energy_max\" field"};
if(!metadata.json()["energy_max"].is_number_integer())
throw diaspora::InvalidMetadata{
"EnergyPartitionSelector configuration's energy_max field should be an integer"};
return std::make_unique<EnergyPartitionSelector>(metadata.json()["energy_max"].get<size_t>());
}
};
DIASPORA_REGISTER_PARTITION_SELECTOR(_, energy_partition_selector, EnergyPartitionSelector);
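The EnergyPartitionSelector is also initialized with an energy_max
value, which it uses to group events into uniform “bins” of similar energy:
setPartitions gives it the list of the topic’s partitions, the range [0, energy_max)
is split into as many equal-width bins as there are partitions, and an event whose
energy falls into the i-th bin is sent to partition i. If a specific partition is
requested explicitly, that partition is used instead.
It inherits from diaspora::PartitionSelectorInterface, and we call
DIASPORA_REGISTER_PARTITION_SELECTOR to make it available.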
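The binning rule can be checked in isolation with the following sketch. selectPartition is a hypothetical free function that mirrors the logic of selectPartitionFor, using the number of partitions instead of a list of diaspora::PartitionInfo and standard exceptions instead of diaspora::Exception.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <stdexcept>

// Energies in [0, energy_max) are split into num_partitions equal-width bins.
std::size_t selectPartition(std::size_t energy, std::size_t energy_max,
                            std::size_t num_partitions,
                            std::optional<std::size_t> requested = std::nullopt) {
    if (num_partitions == 0)
        throw std::logic_error("topic has no partitions");
    if (requested) {
        // An explicitly requested partition must be a valid index.
        if (*requested >= num_partitions)
            throw std::out_of_range("invalid requested partition number");
        return *requested;
    }
    // EnergyValidator should already guarantee energy < energy_max;
    // checking again keeps the result below num_partitions.
    if (energy >= energy_max)
        throw std::out_of_range("energy must be lower than energy_max");
    return energy * num_partitions / energy_max;
}
```

With energy_max = 100 and 3 partitions, energies 0 to 33 go to partition 0, 34 to 66 to partition 1, and 67 to 99 to partition 2.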
#include <diaspora/Serializer.hpp>
#include <diaspora/InvalidMetadata.hpp>
#include <cstdint>
#include <limits>
#include <memory>
class EnergySerializer final : public diaspora::SerializerInterface {
size_t energy_max;
public:
EnergySerializer(size_t _energy_max)
: energy_max(_energy_max) {}
void serialize(diaspora::Archive& archive, const diaspora::Metadata& metadata) const override {
size_t energy = metadata.json()["energy"].get<size_t>();
if(energy_max <= std::numeric_limits<uint8_t>::max()) {
uint8_t val = static_cast<uint8_t>(energy);
archive.write(&val, sizeof(val));
} else if(energy_max <= std::numeric_limits<uint16_t>::max()) {
uint16_t val = static_cast<uint16_t>(energy);
archive.write(&val, sizeof(val));
} else if(energy_max <= std::numeric_limits<uint32_t>::max()) {
uint32_t val = static_cast<uint32_t>(energy);
archive.write(&val, sizeof(val));
} else {
uint64_t val = static_cast<uint64_t>(energy);
archive.write(&val, sizeof(val));
}
}
void deserialize(diaspora::Archive& archive, diaspora::Metadata& metadata) const override {
metadata = diaspora::Metadata{}; // ensure we have an empty JSON object
if(energy_max <= std::numeric_limits<uint8_t>::max()) {
uint8_t val;
archive.read(&val, sizeof(val));
metadata.json()["energy"] = val;
} else if(energy_max <= std::numeric_limits<uint16_t>::max()) {
uint16_t val;
archive.read(&val, sizeof(val));
metadata.json()["energy"] = val;
} else if(energy_max <= std::numeric_limits<uint32_t>::max()) {
uint32_t val;
archive.read(&val, sizeof(val));
metadata.json()["energy"] = val;
} else {
uint64_t val;
archive.read(&val, sizeof(val));
metadata.json()["energy"] = val;
}
}
diaspora::Metadata metadata() const override {
return diaspora::Metadata{
{{"energy_max", energy_max}}
};
}
static std::unique_ptr<diaspora::SerializerInterface> create(const diaspora::Metadata& metadata) {
if(!metadata.json().is_object())
throw diaspora::InvalidMetadata{
"EnergySerializer configuration should be a JSON object"};
if(!metadata.json().contains("energy_max"))
throw diaspora::InvalidMetadata{
"EnergySerializer configuration should contain an \"energy_max\" field"};
if(!metadata.json()["energy_max"].is_number_integer())
throw diaspora::InvalidMetadata{
"EnergySerializer configuration's energy_max field should be an integer"};
return std::make_unique<EnergySerializer>(metadata.json()["energy_max"].get<size_t>());
}
};
DIASPORA_REGISTER_SERIALIZER(_, energy_serializer, EnergySerializer);
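The EnergySerializer is also initialized with an energy_max value.
It uses this value to pick the smallest unsigned integer type (1, 2, 4, or 8 bytes)
able to hold energy_max and writes the energy using that many bytes; deserialization
reads back the same number of bytes. Note that only the energy field is serialized:
any other field present in the metadata is lost. EnergySerializer inherits from
diaspora::SerializerInterface and is registered using
DIASPORA_REGISTER_SERIALIZER.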
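The width selection and round trip can be exercised without Diaspora with the sketch below. A std::vector<unsigned char> stands in for diaspora::Archive, and bytesFor, writeEnergy, and readEnergy are hypothetical helpers that follow the same branches as EnergySerializer.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <limits>
#include <vector>

// Number of bytes of the smallest unsigned type able to hold energy_max
// (same branches as EnergySerializer).
std::size_t bytesFor(std::uint64_t energy_max) {
    if (energy_max <= std::numeric_limits<std::uint8_t>::max()) return 1;
    if (energy_max <= std::numeric_limits<std::uint16_t>::max()) return 2;
    if (energy_max <= std::numeric_limits<std::uint32_t>::max()) return 4;
    return 8;
}

// Appends the value as a T in native byte order (like archive.write(&val, sizeof(val))).
template <typename T>
void appendAs(std::vector<unsigned char>& out, std::uint64_t energy) {
    const T val = static_cast<T>(energy);
    unsigned char bytes[sizeof(T)];
    std::memcpy(bytes, &val, sizeof(T));
    out.insert(out.end(), bytes, bytes + sizeof(T));
}

// Reads a T from the start of the buffer (like archive.read(&val, sizeof(val))).
template <typename T>
std::uint64_t readAs(const std::vector<unsigned char>& in) {
    assert(in.size() >= sizeof(T));
    T val;
    std::memcpy(&val, in.data(), sizeof(T));
    return val;
}

void writeEnergy(std::vector<unsigned char>& out, std::uint64_t energy, std::uint64_t energy_max) {
    switch (bytesFor(energy_max)) {
        case 1: appendAs<std::uint8_t>(out, energy); break;
        case 2: appendAs<std::uint16_t>(out, energy); break;
        case 4: appendAs<std::uint32_t>(out, energy); break;
        default: appendAs<std::uint64_t>(out, energy); break;
    }
}

std::uint64_t readEnergy(const std::vector<unsigned char>& in, std::uint64_t energy_max) {
    switch (bytesFor(energy_max)) {
        case 1: return readAs<std::uint8_t>(in);
        case 2: return readAs<std::uint16_t>(in);
        case 4: return readAs<std::uint32_t>(in);
        default: return readAs<std::uint64_t>(in);
    }
}
```

Like EnergySerializer, the sketch copies the integer's bytes in the machine's native byte order, so it assumes that producers and consumers share the same endianness.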