JDBC Ingress
Nstream provides a Java Database Connectivity (JDBC) Adapter library that greatly facilitates ingestion from relational databases. This guide demonstrates how to repeatedly poll databases over JDBC and process responses in Web Agents using minimal boilerplate.
Prerequisites
- JDK 11+: The library takes advantage of java.net.http.HttpClient under the hood, which is only available starting with Java 11.
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jdbc:4.2.0.5'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jdbc</artifactId>
  <version>4.2.0.5</version>
  <type>module</type>
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- JdbcIngestingAgent: An abstract Web Agent class with built-in methods relevant to a JDBC query/result cycle
- JdbcIngressSettings: A plain old Java object (POJO) that configures a single JdbcIngestingAgent
- JdbcIngestingPatch: A concrete (but extendable) JdbcIngestingAgent subclass that collects ResultSets into swim.structure.Values
JdbcIngestingPatch
Conditions
Ideal If:
- Your query is static
- Response bodies are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to extend the JdbcIngestingPatch class and override at most one method.
Consider a MySQL table with the following schema:
CREATE TABLE sites (
  id INTEGER PRIMARY KEY,
  lat FLOAT,
  lon FLOAT
);
The following server.recon wholly configures a JdbcIngestingPatch that can query this table:
# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:mysql://database:5432/mydb"
      "dataSource.driver": "com.mysql.jdbc.Driver"
      "dataSource.user": public,
      "dataSource.password": password,
      "dataSource.databaseName": mydb
    }
  }
}
"jdbc-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "nstream.adapter.jdbc.JdbcIngestingPatch") {
      jdbcIngressConf: @jdbcIngressSettings {
        connectionPoolProvisionName: "hikari"
        query: "SELECT * FROM sites"
        molderSchema: @resultSetMolder {
          @int(id),
          @float(lat),
          @float(lon)
        },
        relaySchema: @foreach {
          @command {
            nodeUri: {
              "/site/",
              $id
            },
            laneUri: "info"
          }
        }
      }
    }
  }
}
Note that you’ll need to include any driver dependencies (in this case mysql-connector-j) in your classpath.
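As a rough mental model of the relaySchema in the configuration above, the following plain-Java sketch maps each row of the sites table to a command target: the node URI is the literal "/site/" followed by the row's id, and the lane URI is "info". This is an interpretation of the configuration, not the adapter's implementation. RelaySketch and its methods are hypothetical names used only for illustration and do not belong to the Nstream API; the real adapter performs the molding and relaying for you.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; nothing here belongs to the Nstream API.
final class RelaySketch {

  // Mirrors nodeUri: {"/site/", $id}: the literal prefix followed by the row's id.
  static String nodeUriFor(Map<String, Object> row) {
    return "/site/" + row.get("id");
  }

  // Mirrors @foreach { @command { ... } }: one {nodeUri, laneUri} pair per row.
  static List<String[]> targetsFor(List<Map<String, Object>> rows) {
    List<String[]> targets = new ArrayList<>();
    for (Map<String, Object> row : rows) {
      targets.add(new String[] {nodeUriFor(row), "info"});
    }
    return targets;
  }

  // Builds a row shaped like the sites table: id INTEGER, lat FLOAT, lon FLOAT.
  static Map<String, Object> row(int id, float lat, float lon) {
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("id", id);
    row.put("lat", lat);
    row.put("lon", lon);
    return row;
  }

  public static void main(String[] args) {
    List<Map<String, Object>> rows =
        List.of(row(1, 37.77f, -122.42f), row(2, 40.71f, -74.01f));
    for (String[] target : targetsFor(rows)) {
      System.out.println(target[0] + " -> " + target[1]);
    }
  }
}
```

Under this reading, every row returned by the query results in one command to the "info" lane of the Web Agent addressed by that row's id.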
Common Variations
Timing Strategy
The default timing strategy fires a JDBC query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process.
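To make fixed-rate timing concrete, here is a minimal plain-Java sketch that uses ScheduledExecutorService#scheduleAtFixedRate directly. It does not use the Nstream classes: FixedRatePoller and the task passed to it are hypothetical stand-ins for the adapter's query task, and the period and fire count are arbitrary values chosen for the demonstration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a recurring query task; not part of the Nstream API.
final class FixedRatePoller {

  private final AtomicInteger fires = new AtomicInteger();

  // Runs `task` at a fixed rate until it has fired `count` times, then shuts
  // the scheduler down and returns the number of fires.
  int runFixedRate(Runnable task, long periodMillis, int count) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch done = new CountDownLatch(count);
    try {
      // Fires are due at start, start + period, start + 2 * period, and so on,
      // measured from the initial start rather than from the end of each task.
      scheduler.scheduleAtFixedRate(() -> {
        if (done.getCount() > 0) {
          task.run();
          fires.incrementAndGet();
          done.countDown();
        }
      }, 0L, periodMillis, TimeUnit.MILLISECONDS);
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt status
    } finally {
      scheduler.shutdownNow();
    }
    return fires.get();
  }

  public static void main(String[] args) {
    int fired = new FixedRatePoller().runFixedRate(
        () -> System.out.println("polling at " + System.currentTimeMillis()), 50L, 3);
    System.out.println("fired " + fired + " times");
  }
}
```

For comparison, ScheduledExecutorService#scheduleWithFixedDelay measures the delay from the end of one execution to the start of the next, which may suit queries whose duration varies widely.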
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override JdbcIngestingAgent#stageReception to not invoke ExecutorAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() to not call stageReception()
A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.
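The command-triggered startup described above can be modeled in plain Java as follows. This is a sketch of the pattern only: PollingAgentSketch stands in for JdbcIngestingAgent, the onCommand consumer stands in for a CommandLane callback, and the "start" command value is an assumption. None of these names are part of the Nstream or Swim APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the adapter base class. The real JdbcIngestingAgent
// schedules recurring JDBC queries inside stageReception().
abstract class PollingAgentSketch {

  final List<String> events = new ArrayList<>();

  // Default lifecycle: begin polling as soon as the agent starts.
  void didStart() {
    stageReception();
  }

  void stageReception() {
    events.add("polling staged");
  }
}

// Defers polling until a command arrives instead of starting automatically.
final class OnDemandAgentSketch extends PollingAgentSketch {

  // Stand-in for the onCommand() callback of a CommandLane.
  final Consumer<String> onCommand = command -> {
    if ("start".equals(command)) {
      stageReception();
    }
  };

  @Override
  void didStart() {
    // Intentionally does not call stageReception().
    events.add("started idle");
  }

  public static void main(String[] args) {
    OnDemandAgentSketch agent = new OnDemandAgentSketch();
    agent.didStart();              // agent is up, but no polling yet
    agent.onCommand.accept("start"); // polling begins on demand
    System.out.println(agent.events);
  }
}
```

In a real Web Agent, the consumer would be replaced by the callback registered on a CommandLane, and the overridden didStart() would simply omit the stageReception() call.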
Variable Query
If the parameters for each request are not statically known, then the very simple JdbcIngressSettings POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your JdbcIngestingPatch extension, store the state required to dynamically build each request in one or more class-local variables, updating these variables correctly during the Web Agent’s lifecycle
- Override JdbcIngestingAgent#prepareRequest within your concrete class to read that state while building each request
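The two steps above can be sketched in plain Java as a class that keeps mutable state and reads it each time a request is built. The exact signature of JdbcIngestingAgent#prepareRequest is not shown in this guide, so the sketch below does not attempt to override it. VariableQuerySketch, lastSeenId, sql(), and parameters() are hypothetical names, and the WHERE clause is an assumed example against the sites table.

```java
import java.util.List;

// Hypothetical model of the pattern; the real prepareRequest signature may differ.
final class VariableQuerySketch {

  // State that changes during the agent's lifecycle, for example from a lane callback.
  private volatile int lastSeenId = 0;

  void updateLastSeenId(int id) {
    this.lastSeenId = id;
  }

  // A parameterized statement keeps the SQL text fixed while the bound value varies.
  String sql() {
    return "SELECT * FROM sites WHERE id > ?";
  }

  // Values to bind, in order, reflecting the state at the time of the request.
  List<Object> parameters() {
    return List.of(Integer.valueOf(this.lastSeenId));
  }

  public static void main(String[] args) {
    VariableQuerySketch sketch = new VariableQuerySketch();
    System.out.println(sketch.sql() + " with " + sketch.parameters());
    sketch.updateLastSeenId(42);
    System.out.println(sketch.sql() + " with " + sketch.parameters());
  }
}
```

With plain JDBC, such a statement and its values would typically be applied through Connection#prepareStatement and PreparedStatement#setObject, which also avoids concatenating untrusted values into the SQL text.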