Druid Egress
Overview
Nstream provides a Druid Adapter library that facilitates publishing to Apache Druid. This guide:
- Outlines how to use Apache Druid for egress, focusing on Druid's JSON-based batch ingestion: data is inserted into Druid by issuing HTTP POST requests to the Overlord service's Tasks API endpoint.
- Demonstrates how to open a connection pool and publish messages from Web Agents with minimal boilerplate.
This approach handles large data sets efficiently as a batch process, making it well suited to scenarios that require robust bulk writes to Druid.
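Under the hood, a batch write is simply an HTTP POST of a JSON ingestion spec to the Overlord's Tasks API. As a rough illustration, independent of the adapter and using only the JDK's built-in HTTP client (the Overlord URL and the inline spec are placeholders), such a request could be constructed like this:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TasksApiSketch {

  // Build the POST request that submits an ingestion spec to the Overlord.
  static HttpRequest buildTaskRequest(String overlordUrl, String specJson) {
    return HttpRequest.newBuilder()
        .uri(URI.create(overlordUrl))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(specJson))
        .build();
  }

  public static void main(String[] args) throws Exception {
    // Placeholder spec; in practice, read the JSON spec file referenced by your configuration.
    final String spec = "{\"type\": \"index_parallel\"}";
    final HttpRequest request =
        buildTaskRequest("http://localhost:8081/druid/indexer/v1/task", spec);
    // Actually sending it requires a running Druid cluster:
    // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(request.method() + " " + request.uri());
  }
}
```

The DruidPublishingAgent described below packages this request construction for you, so agent code only needs to execute the prepared POST.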
Prerequisites
- JDK 11+
- Apache Druid installation
- Start a local run of Apache Druid
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-druid:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-druid</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the abstract DruidPublishingAgent, which, with configuration and extension, can publish state to Druid.
Configuration
Here we give a full example of the configuration required to prepare a DruidPublishingAgent; we will discuss the implementation of the MyDruidPublishingAgent in the next section.
# server.recon
# Note: Replace the path for 'druidEgressSpecFilePath' below with the actual file
# path to Druid's JSON-based ingestion spec.
provisions: {
@provision("druid-egress-provision") {
class: "nstream.adapter.druid.DruidEgressConnectionProvision"
def: {
"druidEgressUrl": "http://localhost:8081/druid/indexer/v1/task",
"druidEgressSpecFilePath": "path/to/write-to-druid-spec.json"
}
}
}
"druid-adapter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/publisher"
@agent(class: "nstream.adapter.druid.MyDruidPublishingAgent") {
druidEgressConf: @druidEgressSettings {
connectionProvisionName: "druid-egress-provision"
}
}
}
}
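For reference, the file pointed to by druidEgressSpecFilePath is an ordinary Druid native batch ingestion spec. A minimal index_parallel sketch is shown below; the data source name, input paths, and schema fields are placeholders to be replaced with your own:

```json
{
  "type": "index_parallel",
  "spec": {
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": { "type": "local", "baseDir": "path/to/data", "filter": "*.json" },
      "inputFormat": { "type": "json" }
    },
    "dataSchema": {
      "dataSource": "example-datasource",
      "timestampSpec": { "column": "time", "format": "iso" },
      "dimensionsSpec": { "dimensions": [] },
      "granularitySpec": { "segmentGranularity": "day" }
    },
    "tuningConfig": { "type": "index_parallel" }
  }
}
```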
Extension
Using configuration to handle the connection removes the need for most boilerplate in the implementation of the agent.
Extending the DruidPublishingAgent requires implementing one method:
@Override
protected void publish(Value value) throws DeferrableException {
// Implement Druid's Tasks API call (for writing to Druid) to publish the passed value
}
Within this method, an HTTP POST object is made available (via getHttpPostObject()) so that an HTTP client can execute the POST request once Druid's JSON-based ingestion spec has been prepared. Below we give full example implementations that publish under different conditions.
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need do is call the publish(Value value) method from the appropriate callback of the lane.
private static class MyDruidPublishingAgent extends DruidPublishingAgent {
@SwimLane("insert")
CommandLane<Value> insert = this.<Value>commandLane()
.onCommand(this::publish);
@Override
protected void publish(Value value) {
try (CloseableHttpClient client = HttpClients.createDefault()) {
final String httpResponse = client.execute(this.getHttpPostObject(), response -> {
assert response.getCode() == HttpStatus.SC_OK;
return EntityUtils.toString(response.getEntity(), "UTF-8");
});
System.out.println("Response from Druid: " + httpResponse);
} catch (Exception e) {
e.printStackTrace();
}
}
}
In the above example we use an insert command lane to insert or update values in a Druid segment.
Data updates in Druid work seamlessly with the Nstream Druid Adapter. In the example below, initial data is loaded using one JSON ingestion spec and file filter, and the same “intervals” are then overwritten with different input data and a different file filter using a second ingestion spec.
server.recon details for loading initial data using updates-init-index.json, Druid's JSON-based ingestion spec:
# Note: Replace the path for 'druidEgressSpecFilePath' below with the actual file
# path to the JSON spec.
provisions: {
@provision("druid-egress-provision") {
class: "nstream.adapter.druid.DruidEgressConnectionProvision"
def: {
"druidEgressUrl": "http://localhost:8081/druid/indexer/v1/task",
"druidEgressSpecFilePath": "path/to/updates-init-index.json",
"druidUpdatedEgressSpecFilePath": "path/to/updates-overwrite-index.json"
}
}
}
"druid-adapter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/publisher"
@agent(class: "nstream.adapter.druid.MyDruidPublishingAgent") {
druidEgressConf: @druidEgressSettings {
connectionProvisionName: "druid-egress-provision"
}
}
}
}
In the above server.recon configuration's provision, two egress spec file paths are present: druidEgressSpecFilePath is the initial spec for egress, and druidUpdatedEgressSpecFilePath is the updated spec for egress.
Once the updated spec file path provision property is loaded and the publish task is executed, the “updates-tutorial” data source is updated; the change can be observed by querying that data source.
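One way to observe the result is through Druid's SQL API, by POSTing a query to the Router (assumed here at the quickstart address localhost:8888; adjust for your deployment). A sketch using the JDK HTTP client:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DruidSqlQuerySketch {

  // Build a POST to Druid's SQL endpoint; the body wraps the query in JSON.
  static HttpRequest buildSqlRequest(String sqlEndpoint, String query) {
    final String body = "{\"query\": \"" + query + "\"}";
    return HttpRequest.newBuilder()
        .uri(URI.create(sqlEndpoint))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) throws Exception {
    // Query the data source written by the publishing agent.
    final HttpRequest request = buildSqlRequest(
        "http://localhost:8888/druid/v2/sql",
        "SELECT * FROM \\\"updates-tutorial\\\"");
    // Actually sending it requires a running Druid cluster:
    // HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
  }
}
```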
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.
public class MyDruidPublishingAgent extends DruidPublishingAgent {
private TimerRef timerRef;
@SwimLane("state")
ValueLane<Value> state = this.<Value>valueLane();
private void publishState() {
publish(this.state.get());
}
@Override
protected void publish(Value value) {
try (CloseableHttpClient client = HttpClients.createDefault()) {
final String httpResponse = client.execute(this.getHttpPostObject(), response -> {
assert response.getCode() == HttpStatus.SC_OK;
return EntityUtils.toString(response.getEntity(), "UTF-8");
});
System.out.println("Response from Druid: " + httpResponse);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void stagePublication() {
super.stagePublication();
// Fire first after 10 s (initial delay), then every 60 s (period), in milliseconds
this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
}
}
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).