Redis Egress

Nstream provides a Redis Adapter library that simplifies publishing to Redis databases. This guide shows how to configure a Redis connection pool and publish state from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-redis:4.15.23'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.15.23</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Configure a Client

First, we configure a Redis client as a connection pool provision that all Redis agents share:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

This is a very simple example, but many settings are available to configure the pool:

| Name | Description | Default |
| --- | --- | --- |
| host | Hostname of Redis instance to connect to | localhost |
| port | Port number of Redis instance to connect to | 6379 |
| user | Redis user | |
| password | Redis password | |
| database | Redis database to connect to | 0 |
| clientName | Client name to use for connection | |
| ssl | Whether to use SSL, true or false | false |
| caCertPath | Path to truststore when using SSL | |
| caCertPassword | Password to truststore when using SSL | |
| userCertPath | Path to user certificate when using SSL | |
| userCertPassword | Password to user certificate when using SSL | |
| timeoutMillis | Connection and socket timeout in ms | 2000 |
| connectionTimeoutMillis | Connection timeout in ms | 2000 |
| socketTimeoutMillis | Socket timeout in ms | 2000 |
| blockingSocketTimeoutMillis | Blocking socket timeout in ms | 0 |
| maxTotal | Maximum active connections in pool | 8 |
| maxIdle | Maximum idle connections in pool | 8 |
| minIdle | Minimum idle connections in pool | 0 |
| maxWait | Maximum number of ms to wait for a connection to become available | -1000 |
| timeBetweenEvictionRuns | Time in ms between checking for idle connections in pool | -1000 |
| blockWhenExhausted | Block while waiting for connection to become available, true or false | true |
| testWhileIdle | Enables sending a PING command periodically while the connection is idle, true or false | false |
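
As an illustration of how several of these settings combine, the sketch below configures a password-protected pool over SSL with a larger connection limit and longer timeout. The host name, password, and numeric values are placeholders, and writing ssl as an unquoted boolean is an assumption rather than something the table confirms; adjust to whatever your deployment and the adapter accept.

```recon
# server.recon
# Hypothetical values throughout; only the setting names come from the table above.
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "redis.example.com"
      "port": 6380
      "password": "changeme"
      "ssl": true
      "maxTotal": 16
      "timeoutMillis": 5000
    }
  }
}
```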

Publishing

Nstream provides the abstract RedisPublishingAgent, which, once configured and extended, can publish state to a Redis database.

Here we give a full example of the configuration required to prepare a RedisPublishingAgent. The implementation of MyRedisPublishingAgent is discussed in the next section.

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/publisher"
    @agent(class: "vehicle.redis.MyRedisPublishingAgent") {
      redisEgressConf: @redisEgressSettings {
        poolProvisionName: redisPool
      }
    }
  }
  
  # ... other agents

}

Extension

Handling the connection pool through configuration removes most of the boilerplate from the agent implementation. RedisPublishingAgent has a type parameter <P>, which is the type of the value to be published. Our example uses the Redis hash value type:

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

To complete extension, we have to implement two methods:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Convert the value to be published into the parameterized type and call the below
  }

  @Override
  protected void publish(P value) throws DeferrableException {
    // Publish the Redis value
  }

While implementing these methods, the pool variable is available; it holds the configured connection pool object.

The first method can be implemented generally using a Swim Form to cast the Value to an accepted Redis type. The RedisForms class provides static methods (such as forHash() and forSet()) to help with most cases.

  @Override
  protected void publish(Value value) throws DeferrableException {
    publish(RedisForms.forHash().cast(value));
  }
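
Redis hashes map string fields to string values, which is why the example agent is parameterized with Map<String, String>. To illustrate the kind of flattening such a cast implies, here is a sketch that uses only the standard library. HashFlattener and toHash are hypothetical names, and this is not how RedisForms.forHash() is implemented.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for illustration only; NOT the RedisForms implementation.
final class HashFlattener {

  private HashFlattener() {
  }

  // Copies each entry into a string-to-string map, skipping null values
  // because a Redis hash field cannot hold null.
  static Map<String, String> toHash(Map<String, ?> record) {
    final Map<String, String> hash = new LinkedHashMap<>();
    for (Map.Entry<String, ?> entry : record.entrySet()) {
      if (entry.getValue() != null) {
        hash.put(entry.getKey(), String.valueOf(entry.getValue()));
      }
    }
    return hash;
  }

  public static void main(String[] args) {
    final Map<String, Object> record = new LinkedHashMap<>();
    record.put("id", 42);
    record.put("speed", 61.5);
    record.put("driver", null);
    System.out.println(toHash(record)); // {id=42, speed=61.5}
  }
}
```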

The second method is where the publishing call is implemented. This will vary depending on the desired behaviour and type, but here we continue our example and show how to set a hash:

  @Override
  protected void publish(Map<String, String> value) {
    this.pool.hset(value.get("id"), value);
  }

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value value) method from the appropriate callback of the lane.

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

}

In this example we use a command lane to insert values into a Redis database. The didSet callback on a ValueLane could also be used.
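
Because the Redis key is derived from the hash's id field, a command that arrives without an id would produce the key "vehicle:null". A minimal sketch of a guard, using only the standard library, might look like the following. RedisKeys and keyFor are hypothetical names and are not part of the Nstream adapter.

```java
import java.util.Map;

// Hypothetical helper for illustration; not part of the Nstream adapter.
final class RedisKeys {

  private RedisKeys() {
  }

  // Builds "<prefix><id>" from a hash, failing fast when the "id" field
  // is absent or empty instead of silently producing a key like "vehicle:null".
  static String keyFor(String prefix, Map<String, String> hash) {
    final String id = hash.get("id");
    if (id == null || id.isEmpty()) {
      throw new IllegalArgumentException("hash has no usable \"id\" field: " + hash);
    }
    return prefix + id;
  }
}
```

Inside publish(Map<String, String> vehicle), the key argument to hset could then be RedisKeys.keyFor("vehicle:", vehicle). Whether to throw, log, or drop the record on a missing id is a design choice that depends on the application.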

Periodic

Another common approach is to publish the state of the agent periodically. In this case we must manage a timer, using the scheduling methods provided. To schedule the timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

In this example, the agent writes its current state to the database every minute.
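
To make the timing concrete, the sketch below computes when the first few publications would fire. It assumes the two numeric arguments to scheduleAtFixedRate are an initial delay and a period in milliseconds, in the same order as java.util.concurrent.ScheduledExecutorService.scheduleAtFixedRate; this guide does not state that explicitly. FixedRateSchedule is a hypothetical, standard-library-only illustration and does not call the Nstream API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; does not use the Nstream or Swim timer API.
final class FixedRateSchedule {

  private FixedRateSchedule() {
  }

  // Returns the first `count` firing offsets (ms after scheduling) of a
  // fixed-rate timer with the given initial delay and period.
  static List<Long> firstRuns(long initialDelayMillis, long periodMillis, int count) {
    final List<Long> offsets = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      offsets.add(initialDelayMillis + i * periodMillis);
    }
    return offsets;
  }

  public static void main(String[] args) {
    // Same numbers as the agent above: 10000 and 60000.
    System.out.println(firstRuns(10_000L, 60_000L, 3)); // [10000, 70000, 130000]
  }
}
```

Under that assumption, the first write happens 10 seconds after stagePublication runs, and subsequent writes follow at one-minute intervals.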


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).