March 7 - June 27, 2024 - Nstream is hitting the road with Confluent on the #DataInMotionTour! / Learn More

Redis Egress

Nstream provides a Redis Adapter library that greatly facilitates publishing to Redis databases. This guide demonstrates how to start a Redis connection pool and publish state from Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-redis:4.7.10'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.7.10</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the abstract RedisPublishingAgent which with configuration and extension can publish state to a Redis database.

Configuration

Here we give a full example of the configuration required to prepare a RedisPublishingAgent - we will discuss the implementation of the MyRedisPublishingAgent in the next section.

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/publisher"
    @agent(class: "vehicle.redis.MyRedisPublishingAgent") {
      redisEgressConf: @redisEgressSettings {
        poolProvisionName: redisPool
      }
    }
  }

}

Extension

Using configuration to handle the connection pool removes the need for most boilerplate in the implementation of the agent. The RedisPublishingAgent has a type parameter <P>, this is the value type to be published, we will use the Redis hash value type in our example:

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

To complete extension, we have to implement two methods:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Convert the value to be published into the parameterized type and call the below
  }

  @Override
  protected void publish(P value) throws DeferrableException {
    // Publish the Redis value
  }

While implementing these methods, the pool variable is available, this is the configured connection pool object.

The first method can be implemented generally using a Swim Form to cast the Value to an accepted Redis type. The RedisForms class provides static methods (such as forHash() and forSet()) to help with most cases.

  @Override
  protected void publish(Value value) throws DeferrableException {
    publish(RedisForms.forHash().cast(value));
  }

The second method is where the publishing call is implemented. This will vary depending on the desired behaviour and type but here we continue our example and show how to set a hash:

@Override
  protected void publish(Map<String, String> value) {
    this.pool.hset(value.get("id"), value);
  }

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need do is call the publish(Value value) method from the appropriate callback of the lane.

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

}

In this example we use a command lane to insert values into a Redis database. The didSet callback on a ValueLane could also be used.

Periodic

Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. To schedule a timer after setup we place the code in the stagePublication lifecycle callback - making sure to call the super.stagePublication() first.

public class MyRedisPublishingAgent extends RedisPublishingAgent<Map<String, String>> {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    publish(RedisForms.forHash().cast(value));
  }

  @Override
  protected void publish(Map<String, String> vehicle) {
    this.pool.hset(
        "vehicle:" + vehicle.get("id"),
        vehicle
    );
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

In this example, every minute the agent will set its current state to the database.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).