Forwarding
The diaspora-ctl forward command runs a long-lived daemon that reads events
from one or more source topics and forwards them to destination topics. This enables
cross-driver event forwarding (e.g., from a local files-based topic to a remote
Kafka topic) without writing any application code.
Each forwarding policy runs in its own thread with a dedicated consumer and producer, isolating failures and avoiding shared-state issues.
Basic usage
The forwarding daemon is configured with a YAML file specifying the drivers to use and the forwarding policies to apply.
diaspora-ctl forward --config <config.yaml>
Or with additional options:
diaspora-ctl forward --config <config.yaml> --logging <level>
The --config argument is mandatory and points to a YAML configuration file.
The --logging argument is optional and can be one of trace, debug,
info, warn, error, critical, or off
(default: info).
The daemon runs continuously until it receives a SIGINT or SIGTERM signal
(e.g., via Ctrl+C or kill). On shutdown, each worker thread flushes its
producer (with a 5-second timeout) before terminating, so that events already
pushed to the destination are delivered rather than dropped.
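Shutdown on SIGINT or SIGTERM is commonly implemented by having the signal handler do nothing except set a flag, which worker loops then poll. The following is a minimal sketch of that pattern in standard C++. It is an illustration under that assumption, not diaspora-ctl's actual code. The names install_shutdown_handlers, stop_requested and g_stop_requested are hypothetical.

```cpp
#include <csignal>

// Hypothetical stop flag (not diaspora-ctl's real mechanism).
// Writing a volatile std::sig_atomic_t is what the C++ standard
// guarantees to be safe from inside a signal handler.
static volatile std::sig_atomic_t g_stop_requested = 0;

// Signal handler: only records that a shutdown was requested.
extern "C" void on_shutdown_signal(int /*signum*/) {
    g_stop_requested = 1;
}

// Install the same handler for Ctrl+C (SIGINT) and kill (SIGTERM).
void install_shutdown_handlers() {
    std::signal(SIGINT, on_shutdown_signal);
    std::signal(SIGTERM, on_shutdown_signal);
}

// Polled by worker loops between pulls.
bool stop_requested() {
    return g_stop_requested != 0;
}
```

A worker loop would call stop_requested() between pulls. The handler itself never flushes or joins anything, so all real cleanup happens on the normal thread path.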
Configuration file format
The configuration file uses YAML format and consists of two sections:
a drivers mapping defining the streaming backends, and a forward list
defining the forwarding policies.
Defining drivers
Each driver is defined as a sub-mapping under drivers. The mapping key becomes the
driver’s name (used in forwarding policies), and the type field specifies which
Diaspora driver implementation to load. All other fields are passed as metadata to the driver.
drivers:
  local:
    type: files
    options:
      root_path: /data/local
  remote:
    type: kafka
    options:
      bootstrap: "kafka:9092"
This defines two drivers: local (using the files driver) and remote
(using a Kafka driver). The options mapping is converted to JSON and passed
as metadata when creating the driver.
Defining forwarding policies
Each entry in the forward list defines a policy that copies events from a source
topic to a destination topic. The from and to fields use the format
driver_name/topic_name.
forward:
  - from: local/source-topic
    to: remote/dest-topic
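A driver_name/topic_name string can be split into its two parts with a few lines of code. The following is a minimal sketch that assumes the split happens at the first '/'. Whether topic names may themselves contain '/' is not stated here. Endpoint and parse_endpoint are hypothetical names.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical representation of one side of a forwarding policy.
struct Endpoint {
    std::string driver;  // a key of the `drivers` mapping
    std::string topic;   // a topic name within that driver
};

// Split "driver_name/topic_name" at the first '/' (an assumption).
// Returns std::nullopt if the '/' or either part is missing.
std::optional<Endpoint> parse_endpoint(std::string_view spec) {
    const auto slash = spec.find('/');
    if (slash == std::string_view::npos || slash == 0 || slash + 1 == spec.size()) {
        return std::nullopt;
    }
    return Endpoint{std::string(spec.substr(0, slash)),
                    std::string(spec.substr(slash + 1))};
}
```

For the policy above, parse_endpoint("remote/dest-topic") yields the driver remote and the topic dest-topic. The driver name would then be looked up among the keys of the drivers mapping.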
Multiple policies can be defined to forward from multiple source topics simultaneously. Each policy runs in its own thread.
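The thread-per-policy model can be sketched with std::thread. One thread is started for each policy, and the caller joins them all. Policy, run_policies and the worker callback are hypothetical stand-ins. In the daemon, each worker would create its own consumer and producer inside its thread.

```cpp
#include <functional>
#include <string>
#include <thread>
#include <vector>

// Hypothetical policy record holding the raw "driver/topic" strings.
struct Policy {
    std::string from;
    std::string to;
};

// Start one thread per policy, each running its own copy of `worker`
// on exactly one policy, then wait for all of them to finish.
void run_policies(const std::vector<Policy>& policies,
                  const std::function<void(const Policy&)>& worker) {
    std::vector<std::thread> threads;
    threads.reserve(policies.size());
    for (const auto& policy : policies) {
        // std::thread copies `worker`; the policy is passed by reference.
        threads.emplace_back(worker, std::cref(policy));
    }
    for (auto& t : threads) {
        t.join();
    }
}
```

Each thread receives only its own policy and builds its own consumer and producer. As a result, a failure in one policy does not need to touch the state of the others.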
Example configurations
Local forwarding
The simplest use case forwards events between two topics on the same driver. This can be useful for replicating data or for feeding derived topics.
drivers:
  local:
    type: files
    options:
      root_path: /tmp/my-data

forward:
  - from: local/source-topic
    to: local/dest-topic
diaspora-ctl forward --config forward_local.yaml
Cross-driver forwarding
A more common use case is forwarding events between different drivers, for instance from a local files-based staging area to a remote Kafka cluster.
drivers:
  local:
    type: files
    options:
      root_path: /data/local
  remote:
    type: octopus
    options:
      kafka:
        bootstrap.servers: "kafka:9092"

forward:
  - from: local/source-topic
    to: remote/dest-topic
  - from: local/topic-a
    to: remote/topic-b
This configuration defines two forwarding policies that run concurrently, each in its own thread.
Custom data selection and allocation
For advanced use cases, you can specify custom DataSelector and
DataAllocator plugins that control which parts of an event’s data are read
and how memory is allocated for the data on the consumer side. These are loaded
dynamically via dlopen and specified using the format
library.so:factory_function.
drivers:
  local:
    type: files
    options:
      root_path: /data/local
  remote:
    type: kafka
    options:
      bootstrap: "kafka:9092"

forward:
  - from: local/detector-events
    to: remote/detector-events
    data_selector: "libmyplugin.so:my_selector"
    data_allocator: "libmyplugin.so:my_allocator"
The factory functions must be extern "C" functions returning a
diaspora::DataSelector or diaspora::DataAllocator respectively:
#include <diaspora/DataSelector.hpp>
#include <diaspora/DataAllocator.hpp>

extern "C" diaspora::DataSelector my_selector() {
    return [](const diaspora::Metadata& metadata,
              const diaspora::DataDescriptor& descriptor) {
        // Return the full descriptor to select all data,
        // or DataDescriptor{} to skip data entirely
        return descriptor;
    };
}

extern "C" diaspora::DataAllocator my_allocator() {
    return [](const diaspora::Metadata& metadata,
              const diaspora::DataDescriptor& descriptor) {
        // Allocate memory for the incoming data
        auto* buffer = new char[descriptor.size()];
        // The final callback receives `buffer` as its context and frees it
        return diaspora::DataView{
            buffer, descriptor.size(), buffer,
            [](void* ctx) { delete[] static_cast<char*>(ctx); }
        };
    };
}
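Before dlopen and dlsym can be called, a value such as "libmyplugin.so:my_selector" has to be split into a library path and a symbol name. The following is a minimal sketch that assumes the split happens at the last ':'. PluginSpec and parse_plugin_spec are hypothetical names, not part of Diaspora's API.

```cpp
#include <optional>
#include <string>
#include <string_view>

// Hypothetical parsed form of a "library.so:factory_function" value.
struct PluginSpec {
    std::string library;  // would be passed to dlopen()
    std::string factory;  // would be passed to dlsym()
};

// Split at the last ':' (an assumption), keeping the library path intact.
// Returns std::nullopt if the ':' or either part is missing.
std::optional<PluginSpec> parse_plugin_spec(std::string_view spec) {
    const auto colon = spec.rfind(':');
    if (colon == std::string_view::npos || colon == 0 || colon + 1 == spec.size()) {
        return std::nullopt;
    }
    return PluginSpec{std::string(spec.substr(0, colon)),
                      std::string(spec.substr(colon + 1))};
}
```

Declaring the factories extern "C" matters at the dlsym step. It disables C++ name mangling, so the symbol can be found under exactly the name written in the configuration.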
How it works
The forwarding daemon operates as follows:
1. Startup: The YAML configuration is parsed and validated. All drivers are instantiated and all source and destination topics are opened. If any driver or topic cannot be created, the daemon exits immediately with an error.
2. Worker threads: For each forward policy, a dedicated thread is spawned with its own consumer (reading from the source topic) and producer (writing to the destination topic).
3. Event loop: Each worker thread continuously pulls events from its consumer with a 200 ms timeout, so that it checks for shutdown requests regularly. Each successfully pulled event is pushed to the destination producer and then acknowledged on the source.
4. Graceful shutdown: Upon receiving SIGINT or SIGTERM, each worker thread exits its loop, flushes its producer (with a 5-second timeout), and terminates. The daemon waits for all threads to join before exiting.
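The event loop and shutdown steps above can be sketched as a single function. Source, Destination, Event and forward_loop are hypothetical interfaces invented for this illustration; they are not Diaspora's consumer and producer API. Only three details come from the description: the 200 ms pull timeout, the push-then-acknowledge order, and the 5-second flush.

```cpp
#include <atomic>
#include <chrono>
#include <optional>
#include <string>

// Hypothetical event; real Diaspora events also carry metadata and data.
struct Event {
    unsigned long id;
    std::string payload;
};

// Hypothetical stand-in for the worker's consumer.
struct Source {
    virtual ~Source() = default;
    // Wait up to `timeout` for the next event; std::nullopt on timeout.
    virtual std::optional<Event> pull(std::chrono::milliseconds timeout) = 0;
    // Mark an event as processed on the source side.
    virtual void ack(const Event& event) = 0;
};

// Hypothetical stand-in for the worker's producer.
struct Destination {
    virtual ~Destination() = default;
    virtual void push(const Event& event) = 0;
    // Deliver buffered events, waiting at most `timeout`.
    virtual void flush(std::chrono::milliseconds timeout) = 0;
};

// One worker's loop: the short pull timeout keeps the stop check responsive.
void forward_loop(Source& source, Destination& dest, const std::atomic<bool>& stop) {
    using namespace std::chrono_literals;
    while (!stop.load()) {
        auto event = source.pull(200ms);
        if (!event) continue;  // timed out: go back and re-check the stop flag
        dest.push(*event);
        source.ack(*event);    // acknowledge only after the push
    }
    dest.flush(std::chrono::seconds(5));  // drain the producer before exiting
}
```

Because the acknowledgement comes after the push, an event is only marked as consumed once it has been handed to the destination producer.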
Important
The forwarding daemon requires that source and destination topics already exist
before it is started. Use diaspora-ctl topic create to create them first.
Important
If a transient error occurs during event pulling or pushing (e.g., a temporary network issue), the worker thread logs the error and retries after a short delay. The daemon does not exit on transient errors.
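The retry behaviour can be sketched as a small helper that keeps retrying until the operation succeeds or shutdown is requested. The daemon's actual error classification and delay length are not documented here. This sketch therefore assumes that any std::exception counts as transient and that the caller chooses the delay. retry_transient is a hypothetical name.

```cpp
#include <chrono>
#include <functional>
#include <iostream>
#include <stdexcept>
#include <thread>

// Run `op` until it succeeds or `should_stop()` returns true.
// Assumption: every std::exception is treated as transient, logged,
// and retried after `delay`. Returns true if `op` eventually succeeded.
bool retry_transient(const std::function<void()>& op,
                     const std::function<bool()>& should_stop,
                     std::chrono::milliseconds delay) {
    while (!should_stop()) {
        try {
            op();
            return true;
        } catch (const std::exception& e) {
            std::cerr << "transient error: " << e.what() << ", retrying\n";
            std::this_thread::sleep_for(delay);
        }
    }
    return false;  // shutdown requested before the operation succeeded
}
```

Checking should_stop() before each attempt keeps a persistently failing destination from blocking a graceful shutdown.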