JDBC Egress

Nstream provides a JDBC Adapter library that simplifies publishing to databases. This guide demonstrates how to open a connection pool and publish messages from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-jdbc:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jdbc</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Publishing

Nstream provides the abstract JdbcPublishingAgent, which, once configured and extended, can publish state to databases over JDBC.

Configuration

The following is a full example of the configuration required to prepare a JdbcPublishingAgent. The implementation of MyJdbcPublishingAgent is discussed in the next section.

# server.recon
provisions: {
  @provision("hikariPool") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:postgresql://database:5432/mydb"
      "dataSource.driver": "org.postgresql.Driver"
      "dataSource.user": postgres,
      "dataSource.password": postgres,
      "dataSource.databaseName": mydb
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/publisher"
    @agent(class: "vehicle.jdbc.MyJdbcPublishingAgent") {
      jdbcEgressConf: @jdbcEgressSettings {
        connectionPoolProvisionName: "hikariPool"
      }
    }
  }

}

Extension

Using configuration to handle the connection pool removes the need for most boilerplate in the implementation of the agent. Extending the JdbcPublishingAgent requires implementation of one method:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Implement database call to publish the passed value
  }

Within the method, the inherited pool variable is available for getting connections and preparing statements. Here we give two full implementations that publish under different conditions.

Publish On Event

The simplest method of publication is to publish every event or update received by a lane. All we need to do is call the publish(Value value) method from the appropriate lane callback.

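package vehicle.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import nstream.adapter.jdbc.JdbcPublishingAgent; // package assumed from the provision class in server.recon
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public class MyJdbcPublishingAgent extends JdbcPublishingAgent {

  // Every command sent to the "insert" lane is published to the database
  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    // try-with-resources closes the statement and releases the connection
    try (Connection conn = this.pool.getConnection();
         PreparedStatement statement = conn.prepareStatement("INSERT INTO vehicle (id) VALUES (?)")) {
      statement.setString(1, value.get("id").stringValue());
      statement.execute();
    } catch (SQLException ex) {
      ex.printStackTrace();
    }
  }

}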

In this example, a command lane inserts values into a database table. The didSet callback of a ValueLane could be used instead, but an update or upsert may be more appropriate in that case.
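For the upsert case mentioned above, the statement text is the main thing that changes. The following is a minimal sketch, assuming a PostgreSQL database (as in the example configuration) and a vehicle table with a unique id column and a status column. The UpsertSql helper is hypothetical and not part of the Nstream adapter; it only builds the SQL string.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the Nstream JDBC adapter.
final class UpsertSql {

  private UpsertSql() {
  }

  // Builds a PostgreSQL "INSERT ... ON CONFLICT ... DO UPDATE" statement with
  // one "?" placeholder per column, key column first.
  // Assumes keyColumn carries a unique constraint and that all identifiers are
  // trusted constants (they are concatenated into the SQL, not bound).
  static String build(String table, String keyColumn, List<String> valueColumns) {
    if (valueColumns.isEmpty()) {
      throw new IllegalArgumentException("at least one non-key column is required");
    }
    List<String> all = new ArrayList<>();
    all.add(keyColumn);
    all.addAll(valueColumns);
    String columns = String.join(", ", all);
    String placeholders = String.join(", ", Collections.nCopies(all.size(), "?"));
    // On a key conflict, overwrite each non-key column with the proposed value
    String updates = valueColumns.stream()
        .map(c -> c + " = EXCLUDED." + c)
        .collect(Collectors.joining(", "));
    return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")"
        + " ON CONFLICT (" + keyColumn + ") DO UPDATE SET " + updates;
  }

  public static void main(String[] args) {
    System.out.println(build("vehicle", "id", List.of("status")));
  }

}
```

Inside publish(Value value), the resulting string would be passed to conn.prepareStatement, with the id bound to the first placeholder and the status to the second. Other databases use different upsert syntax, so the statement would need adjusting outside PostgreSQL.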

Periodic

Another common approach is to publish the state of the agent periodically. In this case we must manage a timer using the scheduling methods provided. To schedule the timer after setup, place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.

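package vehicle.jdbc;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import nstream.adapter.jdbc.JdbcPublishingAgent; // package assumed from the provision class in server.recon
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.structure.Value;

public class MyJdbcPublishingAgent extends JdbcPublishingAgent {

  // Handle to the publication timer
  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Timer callback: publish whatever the state lane currently holds
  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    // try-with-resources closes the statement and releases the connection
    try (Connection conn = this.pool.getConnection();
         PreparedStatement statement = conn.prepareStatement("UPDATE vehicle SET status = ? WHERE id = ?")) {
      statement.setString(1, value.get("status").stringValue());
      statement.setString(2, value.get("id").stringValue());
      statement.execute();
    } catch (SQLException ex) {
      ex.printStackTrace();
    }
  }

  @Override
  public void stagePublication() {
    super.stagePublication();
    // Publish every 60000 ms (one minute); 10000 is presumably the initial delay in ms
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}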

In this example, the agent updates the vehicle’s status in the table every minute.
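To make the timing pattern concrete, here is a minimal standalone sketch of fixed-rate scheduling with an initial delay followed by a repeating period. It uses the JDK's ScheduledExecutorService, not the agent's scheduling methods, and the PeriodicPublishSketch class is hypothetical. The delays are scaled down to milliseconds so that it finishes quickly, and the task stands in for publishState.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration using only the JDK; not the Nstream scheduling API.
final class PeriodicPublishSketch {

  private PeriodicPublishSketch() {
  }

  // Runs task once after initialDelayMs and then every periodMs until it has
  // run `runs` times, then stops the scheduler and returns the number of runs.
  static int runPeriodically(long initialDelayMs, long periodMs, int runs, Runnable task) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger completed = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(runs);
    try {
      scheduler.scheduleAtFixedRate(() -> {
        // Ignore any extra tick that fires after the requested number of runs
        if (done.getCount() > 0) {
          task.run();
          completed.incrementAndGet();
          done.countDown();
        }
      }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
      // Bounded wait so a failing task cannot block the caller forever
      done.await(initialDelayMs + periodMs * runs + 5000, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return completed.get();
  }

  public static void main(String[] args) {
    int runs = runPeriodically(10, 20, 3, () -> System.out.println("publish"));
    System.out.println("runs: " + runs);
  }

}
```

With fixed-rate scheduling, a slow database call does not push later runs back by its own duration, so a publish that regularly takes longer than the period will cause runs to queue up back to back. Choosing a period comfortably longer than the expected write time avoids this.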


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).