PostgreSQL Listen Ingress

Nstream provides a PostgreSQL Adapter library that greatly facilitates ingestion from PostgreSQL notification channels - the JDBC adapter covers ingestion from tables. This guide demonstrates how to listen to a channel and process notifications in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-postgresql:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-postgresql</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Listen Patch

The following server.recon wholly configures a PostgresListenPatch that can listen to a channel:

# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:postgresql://database:5432/mydb"
      "dataSource.driver": "org.postgresql.Driver"
      "dataSource.user": postgres,
      "dataSource.password": postgres,
      "dataSource.databaseName": mydb
    }
  }
}

"postgresql-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/postgresql/listen"
    @agent(class: "nstream.adapter.postgresql.PostgresListenPatch") {
      postgresListenConf: @postgresListenSettings {
        connectionPoolProvisionName: "hikari"
        channel: "myChannel"
        contentTypeOverride: json
        relaySchema: @command {
          nodeUri: "/notification/processor"
          laneUri: "notify"
        }
      }
    }
  }
}

Common Variations

If relaying notifications to another agent is not sufficient, or you would like to add custom logic on ingestion of notifications, the PostgresListenPatch can be easily overridden. Create a new class that extends the PostgresListenPatch and override one of the ingest methods, depending on whether you would like to ingest a PGNotification or Value type.

  @Override
  protected void ingest(PGNotification unstructured) throws DeferrableException {
    // Custom ingestion of a PGNotification
  }

  @Override
  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of notification as a Swim Value
  }

Remember to update your server.recon file with the new agent you just created, instead of the PostgresListenPatch.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).