JDBC Ingress
Nstream provides a Java Database Connectivity (JDBC) Adapter library that greatly facilitates ingestion from relational databases. This guide demonstrates how to repeatedly poll databases over JDBC and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jdbc:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-jdbc</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
-
JdbcIngestingAgent:
An abstract Web Agent class with built-in methods relevant to an JDBC query/result cycle -
JdbcIngressSettings:
A plain old Java object (POJO) that configures a singleJdbcIngestingAgent
-
JdbcIngestingPatch:
A concrete (but extendable)JdbcIngestingAgent
subclass that collectsResultSets
into aswim.structure.Values
JdbcIngestingPatch
Conditions
Ideal If:
- Your query is static
- Response bodies are converted into
Values
during ingestion - The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to extend the JdbcIngestingPatch
class and override at most one method.
Consider a MySQL table with the following schema:
CREATE TABLE sites (
object_id INTEGER PRIMARY KEY,
name VARCHAR(255),
latitude FLOAT,
longitude FLOAT
);
The following server.recon
wholly configures a JdbcIngestingPatch
that can query this table:
# server.recon
provisions: {
@provision("hikari") {
class: "nstream.adapter.jdbc.ConnectionPoolProvision"
def: {
"jdbcUrl": "jdbc:mysql://database:5432/mydb"
"dataSource.driver": "com.mysql.jdbc.Driver"
"dataSource.user": public,
"dataSource.password": password,
"dataSource.databaseName": mydb
}
}
}
"jdbc-adapter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/bridge/foo"
@agent(class: "nstream.adapter.jdbc.JdbcIngestingPatch") {
jdbcIngressConf: @jdbcIngressSettings {
connectionPoolProvisionName: "hikari"
query: "SELECT object_id, name, latitude, longitude FROM sites"
# Transforms a ResultSet row into the structure:
# {"id": ..., "name": ..., "lat": ..., "lon":, ...}
# Note the parenthesized names, NOT the ResultSet column names,
# configure the resulting Value!
molderSchema: @resultSetAssembler {
@int(id), # first column
@string(name), # second column, etc.
@float(lat),
@float(lon)
},
relaySchema: @foreach {
@command {
nodeUri: "/site/$id"
laneUri: "info"
}
}
}
}
}
}
Driver Dependencies
Make sure to include any dependencies required by your desired drivers. In this example, we require mysql-connector-j in order to be able to configure the driver listed under the Hikari provision.
Let’s examine these components in a bit more detail.
ConnectionPoolProvision
By deferring connection management to the Hikari Connection Pool library, the JDBC adapter enables efficient and low-code ingestion capability even in environments that contain concurrent JDBC reads.
See here for further details of and the configuration options for this provision type.
JdbcIngressSettings#molderSchema
Querying a database over JDBC yields a java.sql.ResultSet
, an interface that provides few details about the returned value’s structure and instead expects the end-user to choose how to interpret it.
To perform this interpretation without custom Java code, your JdbcIngressSettings
must configure a molderSchema
that defines the desired transformation from ResultSet
to swim.structure.Value
.
The overall syntax for the molderSchema
field is as follows:
molderSchema: @resultSetAssembler {
@$TYPE($TARGET_NAME),
...
}
- Each provided row corresponds to the respective column of the returned query (e.g. first row corresponds to first column).
-
$TYPE
may be one ofint
,float
,double
, or (default and fallback)string
. -
$TARGET_NAME
identifies the desiredSlot
key for the column; see the comment in the above snippet for further clarification.
Note: multiple returned rows in a ResultSet
are handled transparently without additional effort.
The ingress bridge will process rows in the order in which they are returned.
JdbcIngressSettings#relaySchema
See the Relay DSL docs for a general overview.
When you configure your relaySchema
, ensure that any selectors correspond to $TARGET_NAME
s in the molderSchema
, which are not the column names in your database unless you explicitly configured the molderSchema
this way.
In the example configuration, the $id
selector correctly selects the first column of a ResultSet
row only because of the molderSchema
configuration.
Common Variations
Timing Strategy
The default timing strategy fires a JDBC query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart()
callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override
JdbcIngestingAgent#stageReception
to not invokeNstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override
didStart()
to not callstageReception()
A rather common alternative for the latter is to instead invoke stageReception()
from the onCommand()
callback of some CommandLane
:
Variable Query
If the parameters for each request are not statically known, then the very simple JdbcIngressSettings
POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your
JdbcIngestingPatch
extension, store the state required to dynamically build each request in one or more class-local variables, updating these variables correctly during the Web Agent’s lifecycle - Override
JdbcIngestingAgent#prepareRequest
within your concrete class to read that state while building each request
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).