JDBC Ingress

Nstream provides a Java Database Connectivity (JDBC) Adapter library that greatly facilitates ingestion from relational databases. This guide demonstrates how to repeatedly poll databases over JDBC and process responses in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-jdbc:4.9.16'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jdbc</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>

Glossary

Ideal JdbcIngestingPatch Conditions

If:

Then you simply need to extend the JdbcIngestingPatch class and override at most one method.

Consider a MySQL table with the following schema:

CREATE TABLE sites (
  id INTEGER PRIMARY KEY,
  lat FLOAT,
  lon FLOAT
);

The following server.recon wholly configures a JdbcIngestingPatch that can query this table:

# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:mysql://database:3306/mydb"
      "dataSource.driver": "com.mysql.cj.jdbc.Driver"
      "dataSource.user": public,
      "dataSource.password": password,
      "dataSource.databaseName": mydb
    }
  }
}

"jdbc-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.jdbc.JdbcIngestingPatch") {
      jdbcIngressConf: @jdbcIngressSettings {
        connectionPoolProvisionName: "hikari"
        query: "SELECT * FROM sites"
        molderSchema: @resultSetMolder {
          @int(id),
          @float(lat),
          @float(lon)
        },
        relaySchema: @foreach {
          @command {
            nodeUri: {
              "/site/",
              $id
            },
            laneUri: "info"
          }
        }
      }
    }
  }
}

Note that you’ll need to include any driver dependencies (in this case, mysql-connector-j) on your classpath.
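
One way to read the relaySchema in the server.recon above: for every row the query returns, a command goes to the info lane of the Web Agent at /site/ followed by that row's id. The plain-Java sketch below mimics only that routing rule. It does not use any Nstream or Swim classes; Site, Command, and relay are hypothetical names introduced for illustration, and it assumes the whole molded row travels as the command payload.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SiteRelaySketch {

  /** One row of the sites table, typed the way the molderSchema declares it. */
  static final class Site {
    final int id;
    final float lat;
    final float lon;

    Site(int id, float lat, float lon) {
      this.id = id;
      this.lat = lat;
      this.lon = lon;
    }
  }

  /** Hypothetical stand-in for a command addressed to one lane of one Web Agent. */
  static final class Command {
    final String nodeUri;
    final String laneUri;
    final Site payload;

    Command(String nodeUri, String laneUri, Site payload) {
      this.nodeUri = nodeUri;
      this.laneUri = laneUri;
      this.payload = payload;
    }
  }

  /** Mirrors the @foreach/@command rule: one command per row, keyed by the row's id. */
  static List<Command> relay(List<Site> rows) {
    List<Command> commands = new ArrayList<>();
    for (Site row : rows) {
      // nodeUri is the literal "/site/" joined with the row's id column.
      commands.add(new Command("/site/" + row.id, "info", row));
    }
    return commands;
  }

  public static void main(String[] args) {
    List<Site> rows = Arrays.asList(
        new Site(1, 37.77f, -122.42f),
        new Site(2, 40.71f, -74.01f));
    for (Command c : relay(rows)) {
      System.out.println(c.nodeUri + " -> lane " + c.laneUri + " (id=" + c.payload.id + ")");
    }
  }
}
```

In a real deployment the adapter performs this fan-out itself; the sketch only makes the addressing visible, so it is easy to see which Web Agent each row ends up at.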

Common Variations

Timing Strategy

The default timing strategy fires a JDBC query task with a fixed period between fires (regardless of how long each task takes), with the class’s Agent#didStart() callback initiating the process. There are two aspects that you may wish to change: the schedule on which the task fires, and the callback that initiates the process.

A common alternative to initiating the process from didStart() is to instead invoke stageReception() from the onCommand() callback of some CommandLane, so that polling begins only once a command arrives.
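
As a rough, library-independent illustration of the two ideas in this section (fixed-period firing, and deferring the start until a command arrives), here is a plain-JDK sketch. It does not use Nstream or Swim classes. The onCommand and stageReception methods below are hypothetical stand-ins named after the callbacks mentioned above, a ScheduledExecutorService substitutes for the agent's timer, and the guard against starting twice is an assumption of this sketch rather than documented adapter behavior.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class CommandTriggeredPollSketch {

  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean started = new AtomicBoolean(false);
  private final AtomicInteger fires = new AtomicInteger();
  private final CountDownLatch target;
  private final long periodMillis;

  CommandTriggeredPollSketch(long periodMillis, int targetFires) {
    this.periodMillis = periodMillis;
    this.target = new CountDownLatch(targetFires);
  }

  /** Stand-in for stageReception(): starts the fixed-period task at most once. */
  void stageReception() {
    if (!started.compareAndSet(false, true)) {
      return; // already polling; ignore repeated start requests
    }
    // scheduleAtFixedRate spaces the start of each run by periodMillis,
    // independent of run duration as long as a run finishes within the period.
    timer.scheduleAtFixedRate(this::poll, 0L, periodMillis, TimeUnit.MILLISECONDS);
  }

  /** Stand-in for a command lane's onCommand() callback. */
  void onCommand(String command) {
    if ("start".equals(command)) {
      stageReception();
    }
  }

  /** Placeholder for "run the query and relay the rows"; stops counting at the target. */
  private void poll() {
    if (target.getCount() > 0) {
      fires.incrementAndGet();
      target.countDown();
    }
  }

  int fireCount() {
    return fires.get();
  }

  boolean awaitTarget(long timeoutMillis) throws InterruptedException {
    return target.await(timeoutMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    timer.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    CommandTriggeredPollSketch sketch = new CommandTriggeredPollSketch(50L, 3);
    // Nothing is scheduled at construction time; polling waits for a command.
    sketch.onCommand("start");
    sketch.awaitTarget(5_000L);
    System.out.println("fires: " + sketch.fireCount());
    sketch.stop();
  }
}
```

The design point is that construction (the analogue of agent startup) schedules nothing. An external command decides when polling begins, which is useful when the database or downstream agents are not ready at startup.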

Variable Query

If the parameters for each request are not statically known, then the very simple JdbcIngressSettings POJO cannot express the desired functionality. The general-purpose alternative strategy is as follows:


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).