Druid Ingress
Overview
This guide demonstrates how to set up Apache Druid as a data source for Nstream, using the Druid SQL Avatica JDBC Driver to query Druid and ingest the results into Nstream. This integration suits applications that need to pull Druid query results into Nstream for further processing and analytics.
Prerequisites
- JDK 11+
- Apache Druid installation: start a local run of Apache Druid
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-druid:4.12.20'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-druid</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages used by Web Agents
- DruidIngestingAgent: An abstract Web Agent class with built-in methods relevant to a Druid SQL query/result cycle using the Avatica JDBC Driver
- DruidIngressSettings: A plain old Java object (POJO) that configures a single DruidIngestingAgent
- DruidIngestingPatch: A concrete (but extendable) DruidIngestingAgent subclass that collects ResultSets into swim.structure.Values
DruidIngestingPatch
Conditions
Ideal If:
- Your query is static
- Query results (ResultSets) are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate

Then you only need to extend the DruidIngestingPatch class and override at most one method.
Consider the Druid sample dataset wikiticker-2015-09-12-sampled.json.gz, loaded into a datasource named wikipedia with the following dimensions spec:
{
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      }
    }
  }
}
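Druid SQL allows datasource and column names to be wrapped in double quotes, and a double quote inside a name is written as two double quotes. The query configured in the server.recon that follows uses this quoting, escaped as \" because the SQL sits inside a Recon string. If you assemble such queries in Java, a small helper along these lines keeps the quoting consistent. This is a sketch: DruidSql, quoteIdentifier, and selectNonNull are hypothetical names and are not part of the adapter.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of nstream-adapter-druid) for building Druid SQL text.
final class DruidSql {

  private DruidSql() {
  }

  // Wraps an identifier in double quotes, doubling any embedded double quote.
  static String quoteIdentifier(String name) {
    return "\"" + name.replace("\"", "\"\"") + "\"";
  }

  // Builds: SELECT <columns> FROM <table> WHERE <filterColumn> IS NOT NULL LIMIT <limit>
  static String selectNonNull(List<String> columns, String table, String filterColumn, int limit) {
    String cols = columns.stream()
        .map(DruidSql::quoteIdentifier)
        .collect(Collectors.joining(", "));
    return "SELECT " + cols
        + " FROM " + quoteIdentifier(table)
        + " WHERE " + quoteIdentifier(filterColumn) + " IS NOT NULL"
        + " LIMIT " + limit;
  }

  public static void main(String[] args) {
    // Same query as the server.recon example, before Recon string escaping.
    System.out.println(selectNonNull(List.of("__time", "cityName", "comment"), "wikipedia", "cityName", 9));
  }
}
```

When pasting the result into a Recon string, each " must then be escaped as \".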
The following server.recon fully configures a DruidIngestingPatch that queries this datasource:
# server.recon
provisions: {
  @provision("druid-provision") {
    class: "nstream.adapter.druid.ConnectionProvision"
    def: {
      "druidUrl": "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true"
    }
  }
}

"druid-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.druid.DruidIngestingPatch") {
      druidIngressConf: @druidIngressSettings {
        connectionProvisionName: "druid-provision"
        firstFetchDelayMillis: 20000
        query: "SELECT \"__time\", \"cityName\", \"comment\" FROM \"wikipedia\" WHERE \"cityName\" IS NOT NULL LIMIT 9"
        molderSchema: @resultSetAssembler {
          __time,
          cityName,
          comment
        },
        relaySchema: @foreach {
          @command {
            nodeUri: "/dynamic/$cityName",
            laneUri: "info"
            value: $comment
          }
        }
      }
    }
  }
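  @node {
    # A parameterized route is declared with pattern rather than uri
    pattern: "/dynamic/:id"
    # The agent class here must declare an "info" command lane to receive the relayed values
    @agent(class: "nstream.adapter.druid.DruidIngestingPatch")
  }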
}
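To make the molderSchema/relaySchema pair concrete, the following plain-Java model mirrors what the configuration describes: every assembled row (with __time, cityName, and comment fields) yields one command to lane info of node /dynamic/$cityName, carrying that row's comment, so each distinct cityName addresses a different agent under /dynamic/. This is a conceptual sketch only, not the adapter's implementation: RelayModel and Command are hypothetical names, rows are modeled as string maps rather than swim.structure.Values, and any URI escaping of cityName is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Conceptual model of the relaySchema above; NOT the adapter's implementation.
final class RelayModel {

  // One outgoing command: target node URI, target lane URI, and payload.
  static final class Command {
    final String nodeUri;
    final String laneUri;
    final String value;

    Command(String nodeUri, String laneUri, String value) {
      this.nodeUri = nodeUri;
      this.laneUri = laneUri;
      this.value = value;
    }

    @Override
    public String toString() {
      return "command " + nodeUri + "#" + laneUri + " <- " + value;
    }
  }

  // Each row is a column-name -> value map, standing in for the Value that
  // molderSchema (__time, cityName, comment) assembles. @foreach: one command per row.
  static List<Command> relay(List<Map<String, String>> rows) {
    List<Command> commands = new ArrayList<>();
    for (Map<String, String> row : rows) {
      commands.add(new Command(
          "/dynamic/" + row.get("cityName"), // nodeUri: "/dynamic/$cityName" (no escaping modeled)
          "info",                            // laneUri: "info"
          row.get("comment")));              // value: $comment
    }
    return commands;
  }

  public static void main(String[] args) {
    // Hypothetical row for illustration only; not taken from the wikipedia dataset.
    Map<String, String> row = Map.of(
        "__time", "2015-09-12T00:00:00Z",
        "cityName", "ExampleCity",
        "comment", "example edit");
    relay(List.of(row)).forEach(System.out::println);
  }
}
```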
Note that you’ll need to include any JDBC driver dependencies (in this case, avatica-core) on your classpath.
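To confirm that the driver is actually present on the runtime classpath before deploying, one option is to check for Avatica's remote driver class, org.apache.calcite.avatica.remote.Driver, which handles jdbc:avatica:remote: URLs such as the druidUrl above. DriverCheck is a hypothetical helper, not part of the adapter; it relies only on Class.forName.

```java
// Hypothetical startup check: fails fast if the Avatica JDBC driver is missing.
final class DriverCheck {

  // Avatica's remote JDBC driver, which handles "jdbc:avatica:remote:" URLs.
  static final String AVATICA_DRIVER = "org.apache.calcite.avatica.remote.Driver";

  // True if the named class can be loaded from the current classpath.
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    if (!isOnClasspath(AVATICA_DRIVER)) {
      throw new IllegalStateException(
          AVATICA_DRIVER + " not found: add avatica-core as a runtime dependency");
    }
    System.out.println("Avatica JDBC driver found");
  }
}
```

avatica-core is published under the Maven group org.apache.calcite.avatica; choose a version compatible with your Druid deployment.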
Common Variations
Timing Strategy
The default timing strategy fires a Druid SQL query task at a fixed period between fires (regardless of task duration), and the class’s Agent#didStart() callback initiates the process.
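The fixed-period behavior described here resembles the JDK's ScheduledExecutorService#scheduleAtFixedRate, where starts are spaced by the period no matter how long each run takes. By contrast, scheduleWithFixedDelay waits for the delay after each run finishes. The following standalone sketch uses the plain JDK and no Nstream classes to contrast the two; FixedRateDemo and simulatedQuery are hypothetical names, and the 200 ms period and 150 ms simulated query duration are arbitrary assumptions.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration (no Nstream classes): fixed-rate vs fixed-delay scheduling.
final class FixedRateDemo {

  // Stand-in for one Druid SQL query task; the 150 ms duration is an arbitrary assumption.
  static Runnable simulatedQuery(AtomicInteger starts) {
    return () -> {
      starts.incrementAndGet();
      sleepQuietly(150);
    };
  }

  static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve interrupt status (e.g. on shutdownNow)
    }
  }

  // Runs both policies with a 200 ms period/delay for windowMillis.
  // Returns {fixedRateStarts, fixedDelayStarts}.
  static int[] compare(long windowMillis) {
    AtomicInteger rateStarts = new AtomicInteger();
    AtomicInteger delayStarts = new AtomicInteger();
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
    // Fixed rate: starts are 200 ms apart, measured start-to-start, regardless of task duration.
    executor.scheduleAtFixedRate(simulatedQuery(rateStarts), 0, 200, TimeUnit.MILLISECONDS);
    // Fixed delay: 200 ms measured from the end of one run to the next start (~350 ms apart here).
    executor.scheduleWithFixedDelay(simulatedQuery(delayStarts), 0, 200, TimeUnit.MILLISECONDS);
    sleepQuietly(windowMillis);
    executor.shutdownNow();
    try {
      executor.awaitTermination(1, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return new int[] {rateStarts.get(), delayStarts.get()};
  }

  public static void main(String[] args) {
    int[] counts = compare(1000);
    System.out.println("fixed rate starts: " + counts[0] + ", fixed delay starts: " + counts[1]);
  }
}
```

Per the ScheduledExecutorService documentation, if a fixed-rate execution takes longer than its period, subsequent executions may start late but will not run concurrently.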
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override DruidIngestingAgent#stageReception so that it does not invoke NstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() so that it does not call stageReception()
A common alternative for the latter is to invoke stageReception() from the onCommand() callback of a CommandLane instead.
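The command-triggered variation can be modeled in plain Java as follows. The class is hypothetical: its method names echo didStart(), onCommand(), and stageReception() only to show where each piece of logic would live, and it neither uses nor reproduces the Nstream API. It also adds an assumption of its own, a guard so that repeated commands do not schedule a second periodic task; whether your stageReception override needs such a guard depends on how the adapter handles repeated calls, which this page does not specify.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical plain-JDK model of "start polling on command"; not the Nstream API.
final class CommandTriggeredPoller {

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> task; // null until the first command arrives
  private int scheduleCount;       // how many times polling was actually scheduled

  // Analogue of an overridden didStart(): intentionally does not start polling.
  void didStart() {
  }

  // Analogue of a CommandLane's onCommand callback: any command starts polling.
  void onCommand(String command) {
    stageReception();
  }

  // Analogue of stageReception(): schedules the periodic query, at most once.
  private synchronized void stageReception() {
    if (task != null) {
      return; // already polling; a repeated command must not stack a second schedule
    }
    scheduleCount++;
    // 60-second period is an arbitrary assumption for this sketch.
    task = executor.scheduleAtFixedRate(this::runQuery, 0, 60, TimeUnit.SECONDS);
  }

  private void runQuery() {
    // Placeholder: issue the Druid SQL query and ingest the result here.
  }

  synchronized boolean isPolling() {
    return task != null;
  }

  synchronized int scheduleCount() {
    return scheduleCount;
  }

  void close() {
    executor.shutdownNow();
  }
}
```

Keeping didStart() empty means an agent instance can exist without issuing any Druid queries until something explicitly asks it to begin.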
Variable Query
If the parameters for each request are not known statically, the simple DruidIngressSettings POJO cannot express the desired behavior.
The general-purpose alternative strategy is as follows:
- Within your DruidIngestingPatch extension, store the state required to build each request dynamically in one or more instance fields, updating those fields correctly during the Web Agent’s lifecycle
- Override DruidIngestingAgent#prepareRequest within your concrete class to read that state while building each request
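As one example of such state, the sketch below keeps a watermark on __time: each ingested row may advance it, and each query is built to fetch only rows newer than the watermark. WatermarkQuery, prepareQuery, and observe are hypothetical names. The real prepareRequest signature is not shown on this page, so this sketch simply models it as a method returning the SQL text, and the watermark strategy is an illustrative assumption rather than built-in adapter behavior.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical per-agent state for a variable query: a watermark on "__time".
final class WatermarkQuery {

  // Druid SQL timestamp literal format, e.g. TIMESTAMP '2015-09-12 00:00:00' (UTC assumed).
  // Second precision: truncation can re-fetch rows from the watermark's own second
  // (duplicates), but never skips newer rows.
  private static final DateTimeFormatter LITERAL =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss").withZone(ZoneOffset.UTC);

  private Instant watermark; // latest __time seen so far

  WatermarkQuery(Instant start) {
    this.watermark = start;
  }

  // Analogue of prepareRequest: reads the current state to build the next query.
  synchronized String prepareQuery() {
    return "SELECT \"__time\", \"cityName\", \"comment\" FROM \"wikipedia\""
        + " WHERE \"__time\" > TIMESTAMP '" + LITERAL.format(watermark) + "'"
        + " ORDER BY \"__time\" LIMIT 100";
  }

  // Called once per ingested row so that the next query only fetches newer rows.
  synchronized void observe(Instant rowTime) {
    if (rowTime.isAfter(watermark)) {
      watermark = rowTime;
    }
  }
}
```

The timestamp literal here is formatted from agent-owned state, not user input; for externally supplied values, JDBC dynamic parameters (?) are a safer choice than string concatenation.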
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).