Druid Ingress

Overview

This guide demonstrates how to set up Apache Druid as a data source for Nstream, using the Druid SQL Avatica JDBC driver to query Druid and ingest the results into Nstream. This integration suits environments that need to stream queried Druid data into Nstream for further processing and analytics.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-druid:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-druid</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Glossary

Ideal DruidIngestingPatch Conditions

If:

Then you simply need to extend the DruidIngestingPatch class and override at most one method.

Consider the Druid dataset wikiticker-2015-09-12-sampled.json.gz, whose dimensions are declared in the following JSON ingestion spec:

{
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      }
    }
  }
}

The following server.recon wholly configures a DruidIngestingPatch that can query this table:

# server.recon
provisions: {
  @provision("druid-provision") {
    class: "nstream.adapter.druid.ConnectionProvision"
    def: {
      "druidUrl": "jdbc:avatica:remote:url=http://localhost:8888/druid/v2/sql/avatica/;transparent_reconnection=true"
    }
  }
}

"druid-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.druid.DruidIngestingPatch") {
      druidIngressConf: @druidIngressSettings {
        connectionProvisionName: "druid-provision"
        firstFetchDelayMillis: 20000
        query: "SELECT \"__time\", \"cityName\", \"comment\" FROM \"wikipedia\" WHERE \"cityName\" IS NOT NULL LIMIT 9"
        molderSchema: @resultSetAssembler {
          __time,
          cityName,
          comment
        },
        relaySchema: @foreach {
          @command {
            nodeUri: "/dynamic/$cityName",
            laneUri: "info"
            value: $comment
          }
        }
      }
    }
  }

  @node {
    uri: "/dynamic/:id"
    @agent(class: "nstream.adapter.druid.DruidIngestingPatch")
  }
}
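
To make the relaySchema above more concrete, here is a minimal plain-Java sketch of what it appears to express: for each row returned by the query, send the row's comment to the info lane of the agent at /dynamic/<cityName>. This is an illustration only and uses no Nstream API; the names RelaySketch, Command, substitute, and relay are hypothetical, the sample row is made up, and the sketch performs no URI encoding.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RelaySketch {

  /** Hypothetical stand-in for one command sent to one lane of one agent. */
  static final class Command {
    final String nodeUri;
    final String laneUri;
    final Object value;

    Command(String nodeUri, String laneUri, Object value) {
      this.nodeUri = nodeUri;
      this.laneUri = laneUri;
      this.value = value;
    }
  }

  /** Replaces each "$column" placeholder in the template with that row's value. */
  static String substitute(String template, Map<String, Object> row) {
    String result = template;
    for (Map.Entry<String, Object> column : row.entrySet()) {
      result = result.replace("$" + column.getKey(), String.valueOf(column.getValue()));
    }
    return result;
  }

  /** Mirrors the @foreach/@command pair: one command per row. */
  static List<Command> relay(List<Map<String, Object>> rows) {
    List<Command> commands = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      commands.add(new Command(
          substitute("/dynamic/$cityName", row), // nodeUri
          "info",                                // laneUri
          row.get("comment")));                  // value: $comment
    }
    return commands;
  }

  public static void main(String[] args) {
    // Made-up sample row with the three selected columns.
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("__time", "2015-09-12T00:00:00.000Z");
    row.put("cityName", "Sydney");
    row.put("comment", "fix typo");

    for (Command c : relay(List.of(row))) {
      System.out.println(c.nodeUri + " " + c.laneUri + " " + c.value);
    }
  }
}
```

In the real adapter the molderSchema and relaySchema do this work declaratively, so no such code is required; the sketch is only meant to help when reading the configuration.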

Note that you’ll need to include any driver dependencies (in this case avatica-core) in your classpath.
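
A quick way to confirm that the driver is actually present is to probe for its class at startup. The sketch below assumes the Avatica remote driver class is org.apache.calcite.avatica.remote.Driver (the class name Avatica documents for its remote JDBC driver); AvaticaDriverCheck and isOnClasspath are hypothetical helper names, not part of Nstream or Druid.

```java
final class AvaticaDriverCheck {

  // Assumed driver class name for the Avatica remote JDBC driver.
  static final String AVATICA_REMOTE_DRIVER = "org.apache.calcite.avatica.remote.Driver";

  /** Returns true if the named class can be located on the classpath, without initializing it. */
  static boolean isOnClasspath(String className) {
    try {
      Class.forName(className, false, AvaticaDriverCheck.class.getClassLoader());
      return true;
    } catch (ClassNotFoundException | LinkageError e) {
      return false;
    }
  }

  public static void main(String[] args) {
    if (isOnClasspath(AVATICA_REMOTE_DRIVER)) {
      System.out.println("Avatica JDBC driver found");
    } else {
      System.out.println("Avatica JDBC driver missing: add avatica-core to the classpath");
    }
  }
}
```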

Common Variations

Timing Strategy

The default timing strategy fires a Druid SQL query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process. There are two aspects that you may wish to change: the schedule on which the task fires, and when that schedule is kicked off.

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.
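
As a plain-JDK analogy for "a fixed period between fires (regardless of task duration)", the sketch below uses ScheduledExecutorService.scheduleAtFixedRate, which spaces task starts evenly even when each task takes some time to run. This is not how Nstream implements its timer; FixedPeriodSketch, fireAtFixedRate, and the parameter names are hypothetical, with firstDelayMillis playing a role similar to firstFetchDelayMillis in the configuration above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class FixedPeriodSketch {

  /**
   * Runs the task the given number of times at a fixed rate and returns
   * each start time in milliseconds, measured from the moment of scheduling.
   * The task is assumed not to throw; an exception would cancel later fires.
   */
  static List<Long> fireAtFixedRate(long firstDelayMillis, long periodMillis,
                                    int fires, Runnable task) {
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    List<Long> startTimes = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(fires);
    long t0 = System.nanoTime();
    try {
      timer.scheduleAtFixedRate(() -> {
        if (done.getCount() == 0) {
          return; // enough fires recorded; ignore any extra tick before shutdown
        }
        startTimes.add(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0));
        task.run();
        done.countDown();
      }, firstDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting for fires", e);
    } finally {
      timer.shutdownNow();
    }
    return startTimes;
  }

  static void sleepQuietly(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    // Each task takes about 30 ms, yet starts are scheduled 100 ms apart.
    List<Long> starts = fireAtFixedRate(200, 100, 4, () -> sleepQuietly(30));
    System.out.println("start times (ms): " + starts);
  }
}
```

By contrast, scheduleWithFixedDelay would measure the gap from the end of one task to the start of the next, so slow queries would stretch the effective period.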

Variable Query

If the parameters for each request are not statically known, then the very simple DruidIngressSettings POJO cannot express the desired functionality. The general-purpose alternative strategy is as follows:

