PostgreSQL Listen Ingress
Nstream provides a PostgreSQL Adapter library that simplifies ingestion from PostgreSQL notification channels (LISTEN/NOTIFY); ingestion from tables is covered by the JDBC adapter. This guide demonstrates how to listen to a channel and process its notifications in Web Agents with minimal boilerplate.
Prerequisites
- JDK 11+: The library uses java.net.http.HttpClient under the hood, which is only available starting with Java 11.
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-postgresql:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-postgresql</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Listen Patch
The following server.recon fully configures a PostgresListenPatch that listens to a channel and relays each notification to another agent:
# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:postgresql://database:5432/mydb"
      "dataSource.driver": "org.postgresql.Driver"
      "dataSource.user": postgres,
      "dataSource.password": postgres,
      "dataSource.databaseName": mydb
    }
  }
}
"postgresql-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/postgresql/listen"
    @agent(class: "nstream.adapter.postgresql.PostgresListenPatch") {
      postgresListenConf: @postgresListenSettings {
        connectionPoolProvisionName: "hikari"
        channel: "myChannel"
        contentTypeOverride: json
        relaySchema: @command {
          nodeUri: "/notification/processor"
          laneUri: "notify"
        }
      }
    }
  }
}
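To exercise a configuration like the one above, something has to publish to the channel. The following is a minimal sketch that uses only java.sql and assumes a reachable database plus the PostgreSQL JDBC driver on the classpath. The class NotifyPublisher, its method names, and the JSON field names are hypothetical and not part of the Nstream library. It calls PostgreSQL's pg_notify function, which takes the channel name as text and preserves its case; a bare NOTIFY myChannel statement would instead fold the unquoted identifier to mychannel. How the adapter issues LISTEN for the configured channel is not covered here, so confirm which spelling your deployment expects.

```java
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper for publishing test notifications; not part of Nstream. */
final class NotifyPublisher {

  // In PostgreSQL's default configuration, a NOTIFY payload must be shorter than 8000 bytes.
  static final int MAX_PAYLOAD_BYTES = 7999;

  private NotifyPublisher() {
  }

  /** Builds a small JSON payload; the field names are illustrative assumptions. */
  static String jsonPayload(long id, String status) {
    // Minimal escaping of backslashes and double quotes; control characters are not handled.
    final String escaped = status.replace("\\", "\\\\").replace("\"", "\\\"");
    return "{\"id\":" + id + ",\"status\":\"" + escaped + "\"}";
  }

  /** Returns true if the UTF-8 encoded payload fits within the default NOTIFY limit. */
  static boolean fitsInNotify(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8).length <= MAX_PAYLOAD_BYTES;
  }

  /** Publishes the payload by calling pg_notify(channel, payload). */
  static void publish(Connection conn, String channel, String payload) throws SQLException {
    if (!fitsInNotify(payload)) {
      throw new IllegalArgumentException("payload too large for NOTIFY");
    }
    try (PreparedStatement stmt = conn.prepareStatement("SELECT pg_notify(?, ?)")) {
      stmt.setString(1, channel);
      stmt.setString(2, payload);
      stmt.execute();
    }
  }
}
```

With a java.sql.Connection in hand, a call such as NotifyPublisher.publish(conn, "myChannel", NotifyPublisher.jsonPayload(1, "ok")) would send one JSON notification to the channel named in the configuration.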
Common Variations
If relaying notifications to another agent is not sufficient, or you would like to add custom logic when notifications are ingested, you can extend PostgresListenPatch.
Create a new class that extends PostgresListenPatch and override one of the ingest methods, depending on whether you would like to ingest the raw PGNotification or the structured Value.
@Override
protected void ingest(PGNotification unstructured) throws DeferrableException {
  // Custom ingestion of a PGNotification
}

@Override
protected void ingest(Value structured) throws DeferrableException {
  // Custom ingestion of the notification as a Swim Value
}
Remember to update your server.recon file so that it references the new agent class instead of PostgresListenPatch.
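As an illustration of the kind of custom logic such an override might hold, here is a minimal sketch of a plain Java helper that an overridden ingest(PGNotification) could delegate to. The class PayloadTriage and its method are hypothetical. The sketch assumes payloads are expected to be JSON objects, matching the contentTypeOverride: json setting used earlier, and only performs a cheap shape check rather than real JSON parsing.

```java
import java.util.Optional;

/** Hypothetical helper that an overridden ingest(PGNotification) could delegate to. */
final class PayloadTriage {

  private PayloadTriage() {
  }

  /**
   * Returns the trimmed payload if it looks like a JSON object, or empty otherwise.
   * A NOTIFY sent without a payload arrives as an empty string and is skipped.
   */
  static Optional<String> jsonObjectPayload(String payload) {
    if (payload == null) {
      return Optional.empty();
    }
    final String trimmed = payload.trim();
    if (trimmed.isEmpty()) {
      return Optional.empty();
    }
    // Cheap shape check only; this does not validate the JSON itself.
    if (!trimmed.startsWith("{") || !trimmed.endsWith("}")) {
      return Optional.empty();
    }
    return Optional.of(trimmed);
  }
}
```

Inside the subclass, the PGNotification override could pass unstructured.getParameter() (the pgjdbc accessor for the payload) to PayloadTriage.jsonObjectPayload and act only when a value is present. Keeping the check in a separate class makes it testable without a running database or Swim server.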
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).