JDBC Ingress

Nstream provides a Java Database Connectivity (JDBC) Adapter library that greatly facilitates ingestion from relational databases. This guide demonstrates how to repeatedly poll databases over JDBC and process responses in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-jdbc:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jdbc</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Glossary

Ideal JdbcIngestingPatch Conditions

If:

Then you simply need to extend the JdbcIngestingPatch class and override at most one method.

Consider a MySQL table with the following schema:

CREATE TABLE sites (
  object_id INTEGER PRIMARY KEY,
  name VARCHAR(255),
  latitude FLOAT,
  longitude FLOAT
);

The following server.recon wholly configures a JdbcIngestingPatch that can query this table:

# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:mysql://database:5432/mydb"
      "dataSource.driver": "com.mysql.jdbc.Driver"
      "dataSource.user": public,
      "dataSource.password": password,
      "dataSource.databaseName": mydb
    }
  }
}

"jdbc-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.jdbc.JdbcIngestingPatch") {
      jdbcIngressConf: @jdbcIngressSettings {
        connectionPoolProvisionName: "hikari"
        query: "SELECT object_id, name, latitude, longitude FROM sites"
        # Transforms a ResultSet row into the structure:
        #   {"id": ..., "name": ..., "lat": ..., "lon":, ...}
        # Note the parenthesized names, NOT the ResultSet column names,
        #   configure the resulting Value!
        molderSchema: @resultSetAssembler {
          @int(id), # first column
          @string(name), # second column, etc.
          @float(lat),
          @float(lon)
        },
        relaySchema: @foreach {
          @command {
            nodeUri: "/site/$id"
            laneUri: "info"
          }
        }
      }
    }
  }
}

Driver Dependencies

Make sure to include any dependencies required by your desired drivers. In this example, we require mysql-connector-j in order to be able to configure the driver listed under the Hikari provision.

Let’s examine these components in a bit more detail.

ConnectionPoolProvision

By deferring connection management to the Hikari Connection Pool library, the JDBC adapter enables efficient and low-code ingestion capability even in environments that contain concurrent JDBC reads.

See here for further details of and the configuration options for this provision type.

JdbcIngressSettings#molderSchema

Querying a database over JDBC yields a java.sql.ResultSet, an interface that provides few details about the returned value’s structure and instead expects the end-user to choose how to interpret it.

To perform this interpretation without custom Java code, your JdbcIngressSettings must configure a molderSchema that defines the desired transformation from ResultSet to swim.structure.Value. The overall syntax for the molderSchema field is as follows:

molderSchema: @resultSetAssembler {
  @$TYPE($TARGET_NAME),
  ...
}

Note: multiple returned rows in a ResultSet are handled transparently without additional effort. The ingress bridge will process rows in the order in which they are returned.

JdbcIngressSettings#relaySchema

See the Relay DSL docs for a general overview.

When you configure your relaySchema, ensure that any selectors correspond to $TARGET_NAMEs in the molderSchema, which are not the column names in your database unless you explicitly configured the molderSchema this way.

In the example configuration, the $id selector correctly selects the first column of a ResultSet row only because of the molderSchema configuration.

Common Variations

Timing Strategy

The default timing strategy fires a JDBC query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane:

Variable Query

If the parameters for each request are not statically known, then the very simple JdbcIngressSettings POJO cannot express the desired functionality. The general-purpose alternative strategy is as follows:


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).