Getting started
Without going into details that will be covered later, the Diaspora Stream API essentially borrows the terminology of event-driven systems like Kafka: events are produced into and consumed from topics, which are themselves split into a number of partitions. Each partition is an append-only log.
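To make this vocabulary concrete, the sketch below models a topic as a list of partitions, each an append-only list of events addressed by its offset. It is a conceptual toy with hypothetical names (ToyTopic, produce, consume). It assumes that an event is identified by its position in a partition, and it says nothing about how the Diaspora Stream API or its drivers actually store events.

```python
# Conceptual sketch only: a toy in-memory model of topics, partitions and
# append-only logs. ToyTopic and its methods are hypothetical and are NOT
# part of the Diaspora Stream API.
from dataclasses import dataclass, field
from typing import List


@dataclass
class ToyTopic:
    name: str
    num_partitions: int
    # One append-only list of events per partition.
    partitions: List[list] = field(init=False)

    def __post_init__(self) -> None:
        self.partitions = [[] for _ in range(self.num_partitions)]

    def produce(self, event: dict, partition: int = 0) -> int:
        """Append an event to a partition and return its offset there."""
        log = self.partitions[partition]
        log.append(event)  # events are only ever appended, never modified
        return len(log) - 1

    def consume(self, partition: int, offset: int) -> dict:
        """Read the event stored at a given offset of a partition."""
        return self.partitions[partition][offset]


topic = ToyTopic("my_topic", num_partitions=2)
first = topic.produce({"x": 1})
second = topic.produce({"x": 2})
other = topic.produce({"x": 3}, partition=1)
print(first, second, other)   # 0 1 0
print(topic.consume(0, 1))    # {'x': 2}
```

Offsets are per partition, which is why the first event of partition 1 also gets offset 0.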
In this documentation, we will use the files driver that ships with the Diaspora Stream API. Other implementations are discussed in a later section.
Creating a topic and a partition
Short of writing C++ or Python code, the easiest way to create a new topic
is to use the diaspora-ctl command line tool as follows.
diaspora-ctl topic create --name my_topic \
    --driver files \
    --driver.root_path /tmp/diaspora-data \
    --topic.num_partitions 1
This command takes --name and --driver as mandatory arguments. Driver options
can be supplied using the --driver.<option-name> syntax (here, the root_path
option, which specifies where to store the data in the file system). Similarly, topic
options can be supplied using --topic.<option-name>.
Note
If many options must be passed to the driver, they can be provided in a JSON
file using --driver-config <filename.json> (and likewise --topic-config
<filename.json> for topic options). diaspora-ctl merges options coming from a
JSON file with options coming from the command line, with the latter overriding
the former.
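The precedence rule can be illustrated with a small merge function. This is a hypothetical sketch, not diaspora-ctl's implementation: merge_options is an invented name, and the shallow merge and the attempt to parse each command-line value as JSON (so that "1" becomes an integer) are assumptions made for illustration.

```python
# Hypothetical sketch of the precedence rule described above: options from a
# JSON file are loaded first, then "--<prefix>.<name> <value>" pairs from the
# command line override them. The shallow merge and value parsing are
# assumptions, not diaspora-ctl's actual behavior.
import json
from typing import List


def merge_options(json_text: str, cli_args: List[str], prefix: str) -> dict:
    options = json.loads(json_text) if json_text else {}
    flag_prefix = f"--{prefix}."
    i = 0
    while i < len(cli_args):
        arg = cli_args[i]
        if arg.startswith(flag_prefix) and i + 1 < len(cli_args):
            name = arg[len(flag_prefix):]
            raw = cli_args[i + 1]
            try:
                value = json.loads(raw)   # "1" -> 1, "true" -> True
            except json.JSONDecodeError:
                value = raw               # plain strings such as paths
            options[name] = value         # the command line wins over the file
            i += 2
        else:
            i += 1
    return options


file_config = '{"root_path": "/data/old", "other_option": 3}'
argv = ["--driver.root_path", "/tmp/diaspora-data", "--topic.num_partitions", "1"]
print(merge_options(file_config, argv, "driver"))
# {'root_path': '/tmp/diaspora-data', 'other_option': 3}
```

Options present only in the file (here the hypothetical other_option) are kept, while options given on both sides take their command-line value.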
We can check that the topic has been created successfully by listing the available topics as follows.
diaspora-ctl topic list --driver files --driver.root_path /tmp/diaspora-data
Using the Diaspora Stream API library
The Diaspora Stream API can be used in C++ or in Python (if built with Python support). The following CMakeLists.txt file shows how to link a C++ application against the Diaspora Stream API library in CMake.
cmake_minimum_required (VERSION 3.21)
project (diaspora-stream-api-example C CXX)
set (CMAKE_CXX_STANDARD 17)
set (CMAKE_CXX_STANDARD_REQUIRED ON)
if (PROJECT_IS_TOP_LEVEL) # allows building from inside the diaspora-stream-api source tree
    find_package (diaspora-stream-api REQUIRED)
endif ()
add_executable (myproducer producer.cpp)
target_link_libraries (myproducer diaspora-stream-api)
add_executable (myconsumer consumer.cpp)
target_link_libraries (myconsumer diaspora-stream-api)
In Python, the API is located in the diaspora_stream.api module.
The sections hereafter show how to use both the C++ and Python interfaces to produce
and consume events.
Simple producer application
The C++ and Python programs at the end of this section show a simple producer.
We first create a Driver object, passing it some options, including the root_path
option required by the “files” driver. The first argument passed to Driver::New (or the
backend argument of the Driver constructor in Python), “files”, tells it to load the files
driver implementation. This implementation is built into the library, so it does not need
to be dynamically loaded. In general, passing “<name>” as this argument makes the API
search LD_LIBRARY_PATH for a library named lib<name>.so. If the implementation lives in
a library with a different name, the caller can use the syntax “<name>:lib<the-library-name>.so”.
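The lookup rules above can be summarized by a small resolver. This is a hypothetical sketch: resolve_driver_spec and the BUILTIN_DRIVERS set are assumptions, not library functions, and the sketch only maps a specification string to the library that would be loaded, without loading anything. The driver name "mydriver" is likewise made up.

```python
# Hypothetical sketch of the driver-name rules described above. Neither the
# function nor BUILTIN_DRIVERS is part of the Diaspora Stream API.
from typing import Optional, Tuple

BUILTIN_DRIVERS = {"files"}  # assumption: drivers compiled into the library


def resolve_driver_spec(spec: str) -> Tuple[str, Optional[str]]:
    """Return (driver_name, library_to_load); the library is None for built-ins."""
    if ":" in spec:
        # "<name>:lib<the-library-name>.so": the library is given explicitly
        name, library = spec.split(":", 1)
        return name, library
    if spec in BUILTIN_DRIVERS:
        return spec, None  # built into the library, nothing to load
    # "<name>": lib<name>.so would be searched for in LD_LIBRARY_PATH
    return spec, f"lib{spec}.so"


print(resolve_driver_spec("files"))                  # ('files', None)
print(resolve_driver_spec("mydriver"))               # ('mydriver', 'libmydriver.so')
print(resolve_driver_spec("mydriver:libcustom.so"))  # ('mydriver', 'libcustom.so')
```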
We then open the topic we have created, using driver.openTopic()
(driver.open_topic() in Python), which gives us a TopicHandle
to interact with the topic.
We create a Producer using topic.producer() and use it in a loop to push 100 events,
each made of a Metadata part and a DataView part (the C++ version sends the same metadata
for every event, and neither version provides any data).
In Python, the metadata part can be either a str representing a valid JSON document,
or a dict convertible to JSON, and the data part can be anything that satisfies
the buffer protocol, or a list of objects that each satisfy the buffer protocol.
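The accepted argument types can be checked with plain Python. The helper below is hypothetical (normalize_event is not part of diaspora_stream.api); it is a sketch of what "valid JSON", "convertible to JSON" and "satisfies the buffer protocol" mean in practice, not the library's own conversion code.

```python
# Hypothetical helper mirroring the argument types accepted by push() in
# Python: metadata as a JSON str or a JSON-convertible dict, data as a
# buffer-protocol object or a list of such objects. Illustration only.
import json
from typing import List, Tuple, Union


def normalize_event(metadata: Union[str, dict], data) -> Tuple[str, List[memoryview]]:
    if isinstance(metadata, str):
        json.loads(metadata)                  # raises if not valid JSON
        metadata_json = metadata
    else:
        metadata_json = json.dumps(metadata)  # raises if not JSON-convertible
    segments = data if isinstance(data, list) else [data]
    # memoryview() raises TypeError for objects without the buffer protocol
    views = [memoryview(s) for s in segments]
    return metadata_json, views


meta, views = normalize_event({"x": 42, "name": "bob"}, [b"abc", bytearray(2)])
print(meta)                        # {"x": 42, "name": "bob"}
print([v.nbytes for v in views])   # [3, 2]
```

Note that bytes(), as used in the producer example, is a valid (empty) buffer.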
The push() function is non-blocking. It returns a future that callers can wait on
to obtain the event ID once the event has been stored. The call to wait() in the code
below is, however, commented out: a better practice is to periodically flush the producer
by calling producer.flush(), or at least to wait on futures as late as possible, so that
sending events overlaps with the application's actual work.
producer.flush() is also a non-blocking function. It returns a future that can be awaited.
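The pattern of pushing without waiting and then flushing once can be illustrated with the standard library's concurrent.futures. ToyProducer is hypothetical and only mimics the behavior described above (non-blocking push() returning a future, flush() returning a future that completes once earlier pushes are done); it does not use the Diaspora Stream API, and its result() plays the role of wait(-1).

```python
# Toy illustration only: ToyProducer is hypothetical and uses the standard
# library's concurrent.futures, not the Diaspora Stream API.
from concurrent.futures import Future, ThreadPoolExecutor


class ToyProducer:
    def __init__(self) -> None:
        # A single worker thread processes requests in submission (FIFO) order.
        self._pool = ThreadPoolExecutor(max_workers=1)
        self.log = []

    def _store(self, metadata: dict) -> int:
        self.log.append(metadata)
        return len(self.log) - 1  # the toy "event ID" is the position in the log

    def push(self, metadata: dict) -> Future:
        # Non-blocking: returns a future while the event is stored in the background.
        return self._pool.submit(self._store, metadata)

    def flush(self) -> Future:
        # With a single FIFO worker, this no-op completes only after every
        # previously pushed event has been stored.
        return self._pool.submit(lambda: None)

    def close(self) -> None:
        self._pool.shutdown(wait=True)


producer = ToyProducer()
futures = [producer.push({"x": 42, "name": "bob"}) for _ in range(100)]
# ... useful application work can happen here while events are stored ...
producer.flush().result()      # a single wait for the whole batch
print(len(producer.log))       # 100
print(futures[-1].result())    # 99
producer.close()
```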
All the Future objects used in the Diaspora Stream API have a wait() function taking
a timeout value in milliseconds. This timeout is not the timeout of the underlying operation
itself; it is the duration after which the wait() call unblocks even if the operation
has not completed. In C++, wait() returns a std::optional, which is empty if the wait
timed out. In Python, wait() returns None if it timed out. A wait() call that has timed
out can be retried. Finally, passing a negative value to wait() makes it block until the
result is available (no timeout).
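These semantics can be reproduced with threading.Event from the standard library. SketchFuture is a hypothetical class, not the library's Future; it only illustrates that the timeout bounds the wait() call rather than the operation, that a timed-out wait() returns None and can be retried, and that a negative timeout blocks until the result is available.

```python
# Hypothetical sketch of the wait() semantics described above.
# SketchFuture is NOT the Diaspora Stream API's Future class.
import threading
from typing import Any, Optional


class SketchFuture:
    def __init__(self) -> None:
        self._done = threading.Event()
        self._value: Any = None

    def set_result(self, value: Any) -> None:
        self._value = value
        self._done.set()

    def wait(self, timeout_ms: int) -> Optional[Any]:
        # Negative timeout: block with no limit (Event.wait(None)).
        timeout = None if timeout_ms < 0 else timeout_ms / 1000.0
        if not self._done.wait(timeout):
            return None  # timed out; the operation itself keeps going
        return self._value


future = SketchFuture()
print(future.wait(10))    # None: nothing has completed yet
threading.Timer(0.05, future.set_result, args=[7]).start()
print(future.wait(-1))    # 7: blocks until set_result runs
print(future.wait(0))     # 7: the result is already available
```

The same future is waited on three times, which shows that a timed-out wait() can simply be retried.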
Important
While the producer makes a copy of the metadata part of an event,
it does NOT make a copy of its data part. It is the caller’s responsibility
to ensure that the data part remains alive and unmodified until the event
has been sent, that is, until a call to wait() on the future returned by
push() or by flush() has completed. Forgetting this is the most common
cause of data corruption and crashes.
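The danger can be demonstrated without the library. ZeroCopyQueue below is a hypothetical toy that, like the producer described above, copies the metadata when an event is pushed but keeps only a reference (a memoryview) to the data, which it reads later when flushing. Modifying the buffer in between silently changes what gets stored.

```python
# Toy demonstration only: ZeroCopyQueue is hypothetical. It copies metadata
# at push time but keeps only a memoryview of the data, read at flush time.
import json


class ZeroCopyQueue:
    def __init__(self) -> None:
        self.pending = []
        self.stored = []

    def push(self, metadata: dict, data) -> None:
        # Metadata is copied now (serialized); data is NOT copied.
        self.pending.append((json.dumps(metadata), memoryview(data)))

    def flush(self) -> None:
        # The data bytes are read only here, when the events are "sent".
        for meta, view in self.pending:
            self.stored.append((meta, bytes(view)))
            view.release()
        self.pending.clear()


metadata = {"x": 42}
buffer = bytearray(b"hello")
queue = ZeroCopyQueue()
queue.push(metadata, buffer)
metadata["x"] = 0        # harmless: the metadata was already copied
buffer[0] = ord("J")     # NOT harmless: the pending event now refers to b"Jello"
queue.flush()
print(queue.stored[0])   # ('{"x": 42}', b'Jello')
```

In a real producer the consequence can be worse than wrong bytes: if the buffer is freed before the send completes, the program may crash.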
#include <diaspora/Driver.hpp>
#include <diaspora/TopicHandle.hpp>
#include <iostream>
int main(int argc, char** argv) {
    if(argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <root_path> <topic_name>" << std::endl;
        return -1;
    }
    auto root_path = argv[1];
    auto topic_name = argv[2];
    try {
        diaspora::Metadata driver_options;
        driver_options.json()["root_path"] = root_path;
        diaspora::Driver driver = diaspora::Driver::New("files", driver_options);
        diaspora::TopicHandle topic = driver.openTopic(topic_name);
        diaspora::Producer producer = topic.producer();
        for(size_t i = 0; i < 100; ++i) {
            auto future = producer.push(
                diaspora::Metadata{R"({"x":42,"name":"bob"})"},
                diaspora::DataView{}
            );
            // auto event_id = future.wait(-1).value();
        }
        // wait for all the pushed events to be stored
        producer.flush().wait(-1);
    } catch(const diaspora::Exception& ex) {
        std::cerr << ex.what() << std::endl;
        return 1;
    }
    return 0;
}
import sys
from diaspora_stream.api import Driver
def produce(root_path: str, topic_name: str):
    driver_options = {
        "root_path": root_path,
    }
    driver = Driver(backend="files", options=driver_options)
    topic = driver.open_topic(topic_name)
    producer = topic.producer()
    for i in range(0, 100):
        future = producer.push(
            metadata={"x": i*42, "name": "bob"},
            data=bytes())
        # event_id = future.wait(-1)
    # wait for all the pushed events to be stored
    producer.flush().wait(-1)
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"Usage: {sys.argv[0]} <root_path> <topic_name>", file=sys.stderr)
        sys.exit(1)
    root_path = sys.argv[1]
    topic_name = sys.argv[2]
    produce(root_path, topic_name)
Simple consumer application
The C++ and Python programs at the end of this section show how to create a consumer and use it to consume the events.
The consumer is created with a name, which lets the underlying backend
keep track of the last event acknowledged by each application.
If the application crashes, it can then restart from the last
acknowledged event. Events are acknowledged using the
Event’s acknowledge() function, which the example below
calls every 10 events.
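The role of the consumer name can be sketched with a toy model. ToyBackend and ToyConsumer are hypothetical, and the sketch assumes, for illustration only, that the backend records per consumer name the offset of the last acknowledged event and that a restarted consumer resumes right after it, so events consumed but not yet acknowledged before a crash are delivered again.

```python
# Toy model of named consumers and acknowledgement. ToyBackend and
# ToyConsumer are hypothetical; the resume-after-last-ack rule is an
# assumption made for illustration.
from typing import Dict


class ToyBackend:
    def __init__(self, num_events: int) -> None:
        self.num_events = num_events
        self.acked: Dict[str, int] = {}  # consumer name -> last acknowledged offset


class ToyConsumer:
    def __init__(self, backend: ToyBackend, name: str) -> None:
        self.backend = backend
        self.name = name
        # Resume right after the last acknowledged event, or from the start.
        self.position = backend.acked.get(name, -1) + 1

    def pull(self) -> int:
        offset = self.position
        self.position += 1
        return offset

    def acknowledge(self, offset: int) -> None:
        self.backend.acked[self.name] = offset


backend = ToyBackend(num_events=100)
consumer = ToyConsumer(backend, "my_consumer")
for i in range(25):
    offset = consumer.pull()
    if (i + 1) % 10 == 0:           # acknowledge every 10 events
        consumer.acknowledge(offset)
# Simulated crash: events 20 to 24 were pulled but never acknowledged.
restarted = ToyConsumer(backend, "my_consumer")
print(restarted.pull())             # 20
print(ToyConsumer(backend, "another_name").pull())  # 0
```

Acknowledging less often reduces bookkeeping but increases the number of events that may be delivered again after a restart.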
consumer.pull() is a non-blocking function returning a Future.
Waiting for this future with .wait() returns an optional Event
object from which we can retrieve an event ID as well as the event’s metadata
and data.
As configured here, the consumer does not automatically pull the data associated with an event, unlike its metadata. A later section of this documentation explains how to pull this data, or part of it.
#include <diaspora/Driver.hpp>
#include <diaspora/TopicHandle.hpp>
#include <iostream>
int main(int argc, char** argv) {
    if(argc != 3) {
        std::cerr << "Usage: " << argv[0] << " <root_path> <topic_name>" << std::endl;
        return -1;
    }
    auto root_path = argv[1];
    auto topic_name = argv[2];
    try {
        diaspora::Metadata options;
        options.json()["root_path"] = root_path;
        diaspora::Driver driver = diaspora::Driver::New("files", options);
        diaspora::TopicHandle topic = driver.openTopic(topic_name);
        diaspora::Consumer consumer = topic.consumer("my_consumer");
        for(size_t i = 0; i < 100; ++i) {
            diaspora::Event event = consumer.pull().wait(-1).value();
            std::cout << event.id() << ": " << event.metadata().json().dump() << std::endl;
            // acknowledge every 10 events
            if((i+1) % 10 == 0) event.acknowledge();
        }
    } catch(const diaspora::Exception& ex) {
        std::cerr << ex.what() << std::endl;
        return 1;
    }
    return 0;
}
import sys
from diaspora_stream.api import Driver
def consume(root_path: str, topic_name: str):
    driver_options = {
        "root_path": root_path,
    }
    driver = Driver(backend="files", options=driver_options)
    topic = driver.open_topic(topic_name)
    consumer = topic.consumer(name="my_consumer")
    for i in range(0, 100):
        event = consumer.pull().wait(-1)
        print(event.metadata)
        # acknowledge every 10 events, as in the C++ version
        if (i + 1) % 10 == 0:
            event.acknowledge()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print(f"Usage: {sys.argv[0]} <root_path> <topic_name>", file=sys.stderr)
        sys.exit(1)
    root_path = sys.argv[1]
    topic_name = sys.argv[2]
    consume(root_path, topic_name)