JDBC Egress
Nstream provides a JDBC Adapter library that greatly facilitates publishing to databases. This guide demonstrates how to open a connection pool and publish messages from Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jdbc:4.14.22'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-jdbc</artifactId>
<version>4.14.22</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the abstract JdbcPublishingAgent
which with configuration and extension can publish state to databases using JDBC.
Configuration
Here we give a full example of the configuration required to prepare a JdbcPublishingAgent
- we will discuss the implementation of the MyJdbcPublishingAgent
in the next section.
# server.recon
provisions: {
@provision("hikariPool") {
class: "nstream.adapter.jdbc.ConnectionPoolProvision"
def: {
"jdbcUrl": "jdbc:postgresql://database:5432/mydb"
"dataSource.driver": "org.postgresql.Driver"
"dataSource.user": postgres,
"dataSource.password": postgres,
"dataSource.databaseName": mydb
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/publisher"
@agent(class: "vehicle.jdbc.MyJdbcPublishingAgent") {
jdbcEgressConf: @jdbcEgressSettings {
connectionPoolProvisionName: "hikariPool"
}
}
}
}
Extension
Using configuration to handle the connection pool removes the need for most boilerplate in the implementation of the agent.
Extending the JdbcPublishingAgent
requires implementation of one method:
@Override
protected void publish(Value value) throws DeferrableException {
// Implement database call to publish the passed value
}
While implementing the method, the pool
variable is made available for getting connections and preparing statements.
Here we give two examples of full implementations that publish on different conditions.
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need do is call the publish(Value value)
method from the appropriate callback of the lane.
public class MyJdbcPublishingAgent extends JdbcPublishingAgent {
@SwimLane("insert")
CommandLane<Value> insert = this.<Value>commandLane()
.onCommand(this::publish);
@Override
protected void publish(Value value) {
try (Connection conn = this.pool.getConnection();
PreparedStatement statement = conn.prepareStatement("INSERT INTO vehicle (id) VALUES (?)")) {
statement.setString(1, value.get("id").stringValue());
statement.execute();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
}
In this example we use a command lane to insert
values into a database table.
The didSet
callback on a ValueLane
could also be used but an update
or upsert
may be more appropriate in this case.
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup we place the code in the stagePublication
lifecycle callback - making sure to call the super.stagePublication()
first.
public class MyJdbcPublishingAgent extends JdbcPublishingAgent {
private TimerRef timerRef;
@SwimLane("state")
ValueLane<Value> state = this.<Value>valueLane();
private void publishState() {
publish(this.state.get());
}
@Override
protected void publish(Value value) {
try (Connection conn = this.pool.getConnection();
PreparedStatement statement = conn.prepareStatement("UPDATE vehicle SET status = ? WHERE id = ?")) {
statement.setString(1, value.get("status").stringValue());
statement.setString(2, value.get("id").stringValue());
statement.execute();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
@Override
public void stagePublication() {
super.stagePublication();
this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
}
}
In this example, every minute the agent will update the vehicle’s status
in the table.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).