PostgreSQL Notify Egress
Nstream provides a PostgreSQL Adapter library that simplifies notifying PostgreSQL channels; publishing to tables is covered by the JDBC adapter. This guide demonstrates how to notify a channel from Web Agents with minimal boilerplate.
Prerequisites
- JDK 11+: The library takes advantage of java.net.http.HttpClient under the hood, which is only available starting with Java 11.
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-postgresql:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-postgresql</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Notify
Nstream provides the abstract PostgresNotifyAgent which, with configuration and extension, can notify a channel.
Configuration
Here we give a full example of the configuration required to prepare a PostgresNotifyAgent. We will discuss the implementation of MyPostgresNotifyAgent in the next section.
# server.recon
provisions: {
  @provision("hikari") {
    class: "nstream.adapter.jdbc.ConnectionPoolProvision"
    def: {
      "jdbcUrl": "jdbc:postgresql://database:5432/mydb"
      "dataSource.driver": "org.postgresql.Driver"
      "dataSource.user": postgres,
      "dataSource.password": postgres,
      "dataSource.databaseName": mydb
    }
  }
}
"postgresql-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/postgresql/notify"
    @agent(class: "example.MyPostgresNotifyAgent") {
      postgresNotifyConf: @postgresNotifySettings {
        connectionPoolProvisionName: "hikari"
        channel: myChannel
      }
    }
  }
}
Extension
Using configuration to handle the connection removes the need for most boilerplate in the implementation of the agent.
Extending the PostgresNotifyAgent requires implementing a publish method:
@Override
protected void publish(Value value) throws DeferrableException {
  // Convert the value to be published into a String object and call the String publish method
}
This can be implemented generally using Json:
@Override
protected void publish(Value structure) {
  publish(Json.toString(structure));
}
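One practical detail when serializing whole structures to JSON: in its default configuration, PostgreSQL requires a NOTIFY payload to be shorter than 8000 bytes. The following is a minimal sketch of a size check that could run before the String publish call. NotifyPayloads and its methods are hypothetical helpers, not part of the Nstream library, and the check assumes the database encoding is UTF-8.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of the Nstream library. */
final class NotifyPayloads {

  // PostgreSQL's default configuration requires payloads shorter than 8000 bytes.
  static final int MAX_PAYLOAD_BYTES = 7999;

  private NotifyPayloads() { }

  /** Returns the size of the payload in bytes when encoded as UTF-8 (assumed encoding). */
  static int utf8Length(String payload) {
    return payload.getBytes(StandardCharsets.UTF_8).length;
  }

  /** Returns true if the payload is small enough to be sent in a single notification. */
  static boolean fits(String payload) {
    return utf8Length(payload) <= MAX_PAYLOAD_BYTES;
  }
}
```

Inside publish(Value), the JSON string could be passed to NotifyPayloads.fits first, and an oversized payload logged or reduced to a key that listeners use to look up the full record.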
The remaining implementation depends on how often you would like to notify. The following sections go over the common patterns.
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need to do is call the publish(Value value) method from the appropriate callback of the lane.
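import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.json.Json;
import swim.structure.Value;
// PostgresNotifyAgent comes from the nstream-adapter-postgresql dependency.

public class MyPostgresNotifyAgent extends PostgresNotifyAgent {

  // Every command received on this lane is forwarded to publish(Value)
  @SwimLane("notify")
  CommandLane<Value> notify = this.<Value>commandLane()
      .onCommand(this::publish);

  // Serialize the value to JSON and hand it to the String publish method
  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure));
  }

}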
In this example we use a command lane to publish values to a PostgreSQL channel.
Periodic
Another common approach is to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.
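import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.json.Json;
import swim.structure.Value;
// PostgresNotifyAgent comes from the nstream-adapter-postgresql dependency.

public class MyPostgresNotifyAgent extends PostgresNotifyAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Publish whatever the state lane currently holds
  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure));
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    // Fires every 60000 ms (one minute); the 10000 ms argument is presumably the initial delay
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}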
In this example, the agent notifies the channel every minute with the current value of its state lane.
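The timing of the periodic pattern can be tried outside a Swim server. The sketch below uses the JDK's ScheduledExecutorService rather than the agent's own scheduleAtFixedRate, so it only illustrates the assumed semantics: a first run after an initial delay, then one run per period. FixedRateSketch and runFixedRate are hypothetical names introduced for this illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the agent's timer, built on the JDK scheduler. */
final class FixedRateSketch {

  private FixedRateSketch() { }

  /**
   * Runs the task after initialDelayMillis and then once per periodMillis,
   * stopping after the requested number of runs. Returns the runs completed.
   */
  static int runFixedRate(long initialDelayMillis, long periodMillis, int runs, Runnable task) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch remaining = new CountDownLatch(runs);
    AtomicInteger completed = new AtomicInteger();
    try {
      scheduler.scheduleAtFixedRate(() -> {
        // Guard so that a tick arriving after the last requested run does nothing
        if (remaining.getCount() > 0) {
          task.run();
          completed.incrementAndGet();
          remaining.countDown();
        }
      }, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
      // Wait for the requested runs, with a generous safety margin
      long timeoutMillis = initialDelayMillis + periodMillis * runs + 5000;
      remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return completed.get();
  }
}
```

Calling FixedRateSketch.runFixedRate(10, 20, 3, task) runs the task three times, with 10 ms and 20 ms standing in for the 10000 ms and 60000 ms used by the agent.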
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).