
Druid Egress

Overview

Nstream provides a Druid Adapter library that facilitates publishing to Apache Druid. This guide:

  1. Outlines the process of using Apache Druid for egress operations, focusing on Druid's JSON-based native batch ingestion. Data is inserted into Druid by submitting an ingestion spec to the Overlord service via HTTP POST requests to the Tasks API endpoint.
  2. Demonstrates how to open a connection pool and publish messages from Web Agents using minimal boilerplate.

This approach handles large data sets efficiently as a batch process, making it well suited to scenarios that require robust bulk writes to Druid.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-druid:4.9.16'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-druid</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the abstract DruidPublishingAgent, which, with configuration and extension, can publish state to Druid.

Configuration

Here we give a full example of the configuration required to prepare a DruidPublishingAgent. We will discuss the implementation of MyDruidPublishingAgent in the next section.

# server.recon
# Note: Replace the path for 'druidEgressSpecFilePath' below with the actual file
#  path to Druid's JSON-based ingestion spec.
  
provisions: {
  @provision("druid-egress-provision") {
    class: "nstream.adapter.druid.DruidEgressConnectionProvision"
    def: {
      "druidEgressUrl": "http://localhost:8081/druid/indexer/v1/task",
      "druidEgressSpecFilePath": "path/to/write-to-druid-spec.json"
    }
  }
}

"druid-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")

  @node {
    uri: "/publisher"
    @agent(class: "nstream.adapter.druid.MyDruidPublishingAgent") {
      druidEgressConf: @druidEgressSettings {
        connectionProvisionName: "druid-egress-provision"
      }
    }
  }
}
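
The file referenced by druidEgressSpecFilePath holds a standard Druid native batch ingestion spec. As a minimal sketch (the data source name, input location, filter, and intervals below are illustrative assumptions, not adapter requirements), it might look like:

```json
{
  "type": "index_parallel",
  "spec": {
    "dataSchema": {
      "dataSource": "example-datasource",
      "timestampSpec": { "column": "timestamp", "format": "iso" },
      "dimensionsSpec": { "dimensions": ["name", "location"] },
      "granularitySpec": {
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "intervals": ["2024-01-01/2024-01-02"]
      }
    },
    "ioConfig": {
      "type": "index_parallel",
      "inputSource": {
        "type": "local",
        "baseDir": "path/to/input",
        "filter": "*.json"
      },
      "inputFormat": { "type": "json" }
    },
    "tuningConfig": { "type": "index_parallel" }
  }
}
```

The adapter reads this file and submits it unchanged to the Tasks API endpoint named in druidEgressUrl.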

Extension

Using configuration to handle the connection removes the need for most boilerplate in the implementation of the agent. Extending the DruidPublishingAgent requires implementation of one method:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Implement Druid's Tasks API call (for writing to Druid) to publish the passed value
  }

While implementing this method, an HTTP POST object is made available via getHttpPostObject(); an HTTP client can use it to execute the POST request once the JSON-based ingestion spec has been prepared. Below we give full example implementations that publish on different conditions.
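
To make the shape of that Tasks API call concrete, here is a minimal sketch of an equivalent request built with the JDK's java.net.http client. The helper name, URL, and spec string are assumptions for illustration only; inside the agent you would use the provided HTTP POST object instead:

```java
import java.net.URI;
import java.net.http.HttpRequest;

public final class DruidTaskRequests {

  // Builds a POST request that submits an ingestion spec to the Overlord's
  // Tasks API. Hypothetical helper for illustration; DruidPublishingAgent
  // prepares the equivalent POST object for you.
  static HttpRequest buildTaskRequest(String tasksApiUrl, String ingestionSpecJson) {
    return HttpRequest.newBuilder(URI.create(tasksApiUrl))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(ingestionSpecJson))
        .build();
  }

  public static void main(String[] args) {
    HttpRequest req = buildTaskRequest(
        "http://localhost:8081/druid/indexer/v1/task",
        "{\"type\": \"index_parallel\"}");
    System.out.println(req.method() + " " + req.uri());
  }
}
```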

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value value) method from the appropriate callback of the lane.

import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public class MyDruidPublishingAgent extends DruidPublishingAgent {

  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
          .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    try (CloseableHttpClient client = HttpClients.createDefault()) {
      final String httpResponse = client.execute(this.getHttpPostObject(), response -> {
        assert response.getCode() == HttpStatus.SC_OK;
        return EntityUtils.toString(response.getEntity(), "UTF-8");
      });
      System.out.println("Response from Druid: " + httpResponse);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

}

In the above example we use an insert command lane to insert or update values in a Druid segment.

Updating Druid data works seamlessly with the Nstream Druid Adapter. Below, we take an example where initial data is loaded using one ingestion spec and file filter, and the data is then overwritten for the same "intervals" using a second spec with different input data and file filter.

server.recon details for loading the initial data using updates-init-index.json, the JSON-based ingestion spec:

# Note: Replace the path for 'druidEgressSpecFilePath' below with the actual file
#  path to the JSON spec.

provisions: {
  @provision("druid-egress-provision") {
    class: "nstream.adapter.druid.DruidEgressConnectionProvision"
    def: {
      "druidEgressUrl": "http://localhost:8081/druid/indexer/v1/task",
      "druidEgressSpecFilePath": "path/to/updates-init-index.json",
      "druidUpdatedEgressSpecFilePath": "path/to/updates-overwrite-index.json"
    }
  }
}

"druid-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")

  @node {
    uri: "/publisher"
    @agent(class: "nstream.adapter.druid.MyDruidPublishingAgent") {
      druidEgressConf: @druidEgressSettings {
        connectionProvisionName: "druid-egress-provision"
      }
    }
  }
}

The provision in the above server.recon configuration contains two egress spec file paths: druidEgressSpecFilePath is the initial spec for egress, and druidUpdatedEgressSpecFilePath is the updated spec for egress.

Once the updated spec is loaded from the provision property and the publish task is executed, the "updates-tutorial" data source is updated; the change can be observed by querying that Druid data source.
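
For context, updates-overwrite-index.json typically differs from the initial spec mainly in its input source, while keeping the same data source and "intervals" so that the existing segments are overwritten. A sketch of the relevant ioConfig portion (file names are illustrative assumptions, not adapter requirements):

```json
"ioConfig": {
  "type": "index_parallel",
  "inputSource": {
    "type": "local",
    "baseDir": "path/to/input",
    "filter": "updates-data2.json"
  },
  "inputFormat": { "type": "json" },
  "appendToExisting": false
}
```

With appendToExisting set to false, the task replaces the segments for the covered intervals rather than adding to them.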

Periodic

Another common approach to publication is to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. To schedule a timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.

import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.structure.Value;

public class MyDruidPublishingAgent extends DruidPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    try (CloseableHttpClient client = HttpClients.createDefault()) {
      final String httpResponse = client.execute(this.getHttpPostObject(), response -> {
        assert response.getCode() == HttpStatus.SC_OK;
        return EntityUtils.toString(response.getEntity(), "UTF-8");
      });
      System.out.println("Response from Druid: " + httpResponse);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).