Redis Egress
Nstream provides a Redis Adapter library that greatly facilitates publishing to Redis databases. This guide demonstrates how to start a Redis connection pool and publish state from Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-redis:4.13.21'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.13.21</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the abstract RedisPublishingAgent which, with configuration and extension, can publish state to a Redis database.
Configuration
Here we give a full example of the configuration required to prepare a RedisPublishingAgent. We will discuss the implementation of MyRedisPublishingAgent in the next section.
# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/redis/publisher"
    @agent(class: "vehicle.redis.MyRedisPublishingAgent") {
      redisEgressConf: @redisEgressSettings {
        poolProvisionName: redisPool
      }
    }
  }
}
Extension
Using configuration to handle the connection pool removes the need for most boilerplate in the implementation of the agent.
The RedisPublishingAgent has a type parameter <P>, which is the value type to be published. We will use the Redis hash value type in our example:
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {
To complete the extension, we have to implement two methods:
@Override
protected void publish(Value value) throws DeferrableException {
  // Convert the value to be published into the parameterized type and call the below
}

@Override
protected void publish(P value) throws DeferrableException {
  // Publish the Redis value
}
While implementing these methods, the pool variable is available; this is the configured connection pool object.
The first method can generally be implemented using a Swim Form to cast the Value to an accepted Redis type.
The RedisForms class provides static methods (such as forHash() and forSet()) to help with most cases.
@Override
protected void publish(Value value) throws DeferrableException {
  publish(RedisForms.forHash().cast(value));
}
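To make the target shape concrete: a Redis hash is a flat set of string fields and string values, which is why the agent is parameterized with Map<String, String>. The sketch below uses only the JDK and a hypothetical helper named HashShapeSketch.toHash; it is not how RedisForms is implemented, and the choice to skip null fields is an assumption made for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, JDK only: illustrates the flat string-to-string
// shape that a Redis hash expects. Not part of the Nstream or Swim APIs.
final class HashShapeSketch {

  private HashShapeSketch() {
  }

  // Copies every non-null field into a new map, rendering each value as a string.
  // Field order is preserved so the result is easy to inspect.
  static Map<String, String> toHash(Map<String, ?> fields) {
    Map<String, String> hash = new LinkedHashMap<>();
    for (Map.Entry<String, ?> entry : fields.entrySet()) {
      if (entry.getValue() != null) {
        hash.put(entry.getKey(), String.valueOf(entry.getValue()));
      }
    }
    return hash;
  }
}
```

Passing a record with an integer id and a floating-point speed would yield a map whose values are all strings, which is the form the second publish method receives.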
The second method is where the publishing call is implemented. This will vary depending on the desired behaviour and type but here we continue our example and show how to set a hash:
@Override
protected void publish(Map<String, String> value) {
  this.pool.hset(value.get("id"), value);
}
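The call above takes its key from the hash's id field, and Map.get returns null when that field is absent. One possible guard, sketched here with a hypothetical JDK-only helper named RedisKeySketch.keyFor, is to validate the field before building the key. The prefix parameter and the decision to throw IllegalArgumentException are assumptions for illustration, not part of the Nstream API.

```java
import java.util.Map;

// Hypothetical helper, JDK only: derives a Redis key from the "id" field
// of a hash and fails fast when the field is missing or empty.
final class RedisKeySketch {

  private RedisKeySketch() {
  }

  // Returns prefix + id, for example "vehicle:" + "42".
  static String keyFor(String prefix, Map<String, String> hash) {
    String id = hash.get("id");
    if (id == null || id.isEmpty()) {
      throw new IllegalArgumentException("hash has no \"id\" field; cannot derive a Redis key");
    }
    return prefix + id;
  }
}
```

Inside the agent, the result of such a helper could be passed as the first argument to this.pool.hset in place of the raw value.get("id") lookup.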
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need to do is call the publish(Value value) method from the appropriate callback of the lane.
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

}
In this example we use a command lane to insert values into a Redis database.
The didSet callback on a ValueLane could also be used.
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.
public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}
In this example, the agent writes its current state to the database every minute.
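For readers who want to experiment with fixed-rate timing outside an agent, the JDK's ScheduledExecutorService offers a comparable pattern. The sketch below is an analogy only: it assumes the two numeric arguments in the agent example are an initial delay and a period in milliseconds, and it does not use or reproduce the Nstream scheduling methods. The class name PeriodicPublishSketch and its parameters are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, JDK-only analogue of periodic publication.
final class PeriodicPublishSketch {

  private PeriodicPublishSketch() {
  }

  // Runs publishState after initialDelayMs, then every periodMs, until it has
  // run `runs` times (or a generous timeout passes). Returns the completed count.
  static int publishPeriodically(Runnable publishState, long initialDelayMs,
                                 long periodMs, int runs) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger completed = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(runs);
    try {
      ScheduledFuture<?> timer = scheduler.scheduleAtFixedRate(() -> {
        if (done.getCount() == 0) {
          return; // target already reached; ignore any late tick
        }
        publishState.run();
        completed.incrementAndGet();
        done.countDown();
      }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
      // Wait for the requested number of runs, with a bounded timeout.
      done.await(initialDelayMs + periodMs * runs + 1000, TimeUnit.MILLISECONDS);
      timer.cancel(false);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // preserve the interrupt for the caller
    } finally {
      scheduler.shutdownNow();
    }
    return completed.get();
  }
}
```

Calling publishPeriodically(task, 10000, 60000, 3) would mirror the timing in the agent example, while much shorter delays are convenient for trying the pattern locally.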
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).