PostgreSQL Notify Egress

Nstream provides a PostgreSQL Adapter library that simplifies notifying PostgreSQL channels. (Publishing to tables is covered by the JDBC adapter.) This guide demonstrates how to notify a channel from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-postgresql:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-postgresql</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Notify

Nstream provides the abstract PostgresNotifyAgent, which can notify a channel once it has been configured and extended.
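
For orientation, notifying a channel corresponds at the SQL level to PostgreSQL's NOTIFY command, or to its function form pg_notify(channel, payload). The following is a minimal sketch of that operation over plain JDBC. It is not the adapter's actual implementation, and the class PgNotifySketch and method notifyChannel are hypothetical names introduced only for illustration.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

// Hypothetical helper, not part of the Nstream adapter.
final class PgNotifySketch {

  // pg_notify is PostgreSQL's function form of NOTIFY. Unlike the NOTIFY
  // command, it accepts the channel and payload as bind parameters.
  static final String NOTIFY_SQL = "SELECT pg_notify(?, ?)";

  // Sends a single notification over an already-open connection.
  static void notifyChannel(Connection connection, String channel, String payload)
      throws SQLException {
    try (PreparedStatement statement = connection.prepareStatement(NOTIFY_SQL)) {
      statement.setString(1, channel);
      statement.setString(2, payload);
      statement.execute();
    }
  }

}
```

In this sketch the channel name is passed as text, so it is not folded to lowercase the way an unquoted identifier is. A listener on a mixed-case channel such as myChannel would therefore need to quote the name, as in LISTEN "myChannel".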

Configuration

The following is a full example of the configuration required to prepare a PostgresNotifyAgent. The implementation of MyPostgresNotifyAgent is discussed in the next section.

# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:postgresql://database:5432/mydb"
      "dataSource.driver": "org.postgresql.Driver"
      "dataSource.user": postgres
      "dataSource.password": postgres
      "dataSource.databaseName": mydb
    }
  }
}

"postgresql-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/postgresql/notify"
    @agent(class: "example.MyPostgresNotifyAgent") {
      postgresNotifyConf: @postgresNotifySettings {
        connectionPoolProvisionName: "hikari"
        channel: myChannel
      }
    }
  }
}

Extension

Because the connection is handled by configuration, the agent implementation needs very little boilerplate. Extending PostgresNotifyAgent requires implementing a publish method:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Convert the value to be published into a String object and call the String publish method
  }

A general-purpose implementation serializes the value to a JSON string with Json.toString:

  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure));
  }
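
The JSON string produced here becomes the notification payload. PostgreSQL's documentation states that, in the default configuration, a NOTIFY payload must be shorter than 8000 bytes. A guard along the following lines could check a serialized value before it is published. This is only a sketch: NotifyPayloadGuard is a hypothetical helper, not part of the adapter, and whether the adapter performs a similar check itself is not covered in this guide.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Nstream adapter.
final class NotifyPayloadGuard {

  // PostgreSQL's default configuration requires payloads shorter than 8000 bytes.
  static final int MAX_PAYLOAD_BYTES = 7999;

  // Returns true if the payload, encoded as UTF-8, is within the default limit.
  // The limit is in bytes, so multi-byte characters count more than once.
  static boolean fits(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_PAYLOAD_BYTES;
  }

}
```

A publish(Value) override could call NotifyPayloadGuard.fits on the serialized string and log or drop oversized values rather than attempt the notification.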

The remaining implementation depends on how often you would like to notify. The following sections cover the common patterns.

Publish On Event

The simplest approach is to publish every event or update received by a lane. All that is needed is a call to the publish(Value value) method from the appropriate lane callback.

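package example;

import nstream.adapter.postgresql.PostgresNotifyAgent; // package name assumed; check the adapter's API docs
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.json.Json;
import swim.structure.Value;

public class MyPostgresNotifyAgent extends PostgresNotifyAgent {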

  @SwimLane("notify")
  CommandLane<Value> notify = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure));
  }

}

In this example, every value commanded to the notify lane is published to the PostgreSQL channel.

Periodic

Another common approach is to publish the state of the agent periodically. This requires managing a timer, using the scheduling methods provided. To schedule the timer after setup, place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.

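package example;

import nstream.adapter.postgresql.PostgresNotifyAgent; // package name assumed; check the adapter's API docs
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.json.Json;
import swim.structure.Value;

public class MyPostgresNotifyAgent extends PostgresNotifyAgent {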

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure));
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    // Arguments read as: timer supplier, initial delay (ms), period (ms), task to run
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

In this example, the agent notifies the channel with the current value of the state lane once a minute. The 10000 and 60000 arguments read as an initial delay and a period, both in milliseconds.
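
To make the fixed-rate timing concrete, here is a small sketch using the JDK's ScheduledExecutorService, whose scheduleAtFixedRate method takes an initial delay and a period. It assumes the agent's scheduling method follows the same convention. It does not use the adapter's timer API, and FixedRateSketch and runFixedRate are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of fixed-rate scheduling with the JDK only.
final class FixedRateSketch {

  // Runs the task at a fixed rate until it has completed `runs` times, then
  // stops the scheduler. Returns false if the runs did not finish in time.
  static boolean runFixedRate(long initialDelayMillis, long periodMillis,
                              int runs, Runnable task) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch remaining = new CountDownLatch(runs);
    try {
      // First run happens after initialDelayMillis, later runs every periodMillis.
      scheduler.scheduleAtFixedRate(() -> {
        task.run();
        remaining.countDown();
      }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
      long timeoutMillis = initialDelayMillis + periodMillis * runs + 5000;
      return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } finally {
      scheduler.shutdownNow();
    }
  }

}
```

With an initial delay of 10000 and a period of 60000, as in the agent above, the first run would occur about 10 seconds after scheduling and subsequent runs about 60 seconds apart.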


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).