Redis Egress
Nstream provides a Redis Adapter library that greatly facilitates publishing to Redis databases. This guide demonstrates how to start a Redis connection pool and publish state from Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-redis:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-redis</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Configure a Client
Firstly, we configure a Redis client in the form of a connection pool provision, to be used by all Redis agents:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
This a very simple example but many settings are available to configure the pool:
Name | Description | Default |
---|---|---|
host |
Hostname of Redis instance to connect to | localhost |
port |
Port number of Redis instance to connect to | 6379 |
user |
Redis user | |
password |
Redis password | |
database |
Redis database to connect to | 0 |
clientName |
Client name to use for connection | |
ssl |
Whether to use SSL, true or false
|
false |
caCertPath |
Path to truststore when using SSL | |
caCertPassword |
Password to truststore when using SSL | |
userCertPath |
Path to user certificate when using SSL | |
userCertPassword |
Password to user certificate when using SSL | |
timeoutMillis |
Connection and socket timeout in ms | 2000 |
connectionTimeoutMillis |
Connection timeout in ms | 2000 |
socketTimeoutMillis |
Socket timeout in ms | 2000 |
blockingSocketTimeoutMillis |
Blocking socket timeout in ms | 0 |
maxTotal |
Maximum active connections in pool | 8 |
maxIdle |
Maximum idle connections in pool | 8 |
minIdle |
Minimum idle connections in pool | 0 |
maxWait |
Maximum number of ms to wait for a connection to become available | -1000 |
timeBetweenEvictionRuns |
Time in ms between checking for idle connections in pool | -1000 |
blockWhenExhausted |
Block while waiting for connection to become available, true or false
|
true |
testWhileIdle |
Enables sending a PING command periodically while the connection is idle, true or false
|
false |
Publishing
Nstream provides the abstract RedisPublishingAgent
which with configuration and extension can publish state to a Redis database.
Here we give a full example of the configuration required to prepare a RedisPublishingAgent
- we will discuss the implementation of the MyRedisPublishingAgent
in the next section.
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/redis/publisher"
@agent(class: "vehicle.redis.MyRedisPublishingAgent") {
redisEgressConf: @redisEgressSettings {
poolProvisionName: redisPool
}
}
}
# ... other agents
}
Extension
Using configuration to handle the connection pool removes the need for most boilerplate in the implementation of the agent.
The RedisPublishingAgent
has a type parameter <P>
, this is the value type to be published, we will use the Redis hash value type in our example:
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {
To complete extension, we have to implement two methods:
@Override
protected void publish(Value value) throws DeferrableException {
// Convert the value to be published into the parameterized type and call the below
}
@Override
protected void publish(P value) throws DeferrableException {
// Publish the Redis value
}
While implementing these methods, the pool
variable is available, this is the configured connection pool object.
The first method can be implemented generally using a Swim Form
to cast the Value
to an accepted Redis type.
The RedisForms
class provides static methods (such as forHash()
and forSet()
) to help with most cases.
@Override
protected void publish(Value value) throws DeferrableException {
publish(RedisForms.forHash().cast(value));
}
The second method is where the publishing call is implemented. This will vary depending on the desired behaviour and type but here we continue our example and show how to set a hash:
@Override
protected void publish(Map<String, String> value) {
this.pool.hset(value.get("id"), value);
}
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need do is call the publish(Value value)
method from the appropriate callback of the lane.
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {
@SwimLane("insert")
CommandLane<Value> insert = this.<Value>commandLane()
.onCommand(this::publish);
@Override
protected void publish(Value value) {
publish(RedisForms.forHash().cast(value));
}
@Override
protected void publish(Map<String, String> vehicle) {
this.pool.hset(
"vehicle:" + vehicle.get("id"),
vehicle
);
}
}
In this example we use a command lane to insert
values into a Redis database.
The didSet
callback on a ValueLane
could also be used.
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup we place the code in the stagePublication
lifecycle callback - making sure to call the super.stagePublication()
first.
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {
private TimerRef timerRef;
@SwimLane("state")
ValueLane<Value> state = this.<Value>valueLane();
private void publishState() {
publish(this.state.get());
}
@Override
protected void publish(Value value) {
publish(RedisForms.forHash().cast(value));
}
@Override
protected void publish(Map<String, String> vehicle) {
this.pool.hset(
"vehicle:" + vehicle.get("id"),
vehicle
);
}
@Override
protected void stagePublication() {
super.stagePublication();
this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
}
}
In this example, every minute the agent will set its current state to the database.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).