Redis Ingress

Nstream provides a Redis Adapter library that greatly facilitates ingestion from Redis databases. This guide demonstrates how to poll or stream from Redis and process records in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-redis:4.15.23'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.15.23</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Configure a Client

Firstly, we configure a Redis client in the form of a connection pool provision, to be used by all Redis agents:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

This a very simple example but many settings are available to configure the pool:

Name Description Default
host Hostname of Redis instance to connect to localhost
port Port number of Redis instance to connect to 6379
user Redis user  
password Redis password  
database Redis database to connect to 0
clientName Client name to use for connection  
ssl Whether to use SSL, true or false false
caCertPath Path to truststore when using SSL  
caCertPassword Password to truststore when using SSL  
userCertPath Path to user certificate when using SSL  
userCertPassword Password to user certificate when using SSL  
timeoutMillis Connection and socket timeout in ms 2000
connectionTimeoutMillis Connection timeout in ms 2000
socketTimeoutMillis Socket timeout in ms 2000
blockingSocketTimeoutMillis Blocking socket timeout in ms 0
maxTotal Maximum active connections in pool 8
maxIdle Maximum idle connections in pool 8
minIdle Minimum idle connections in pool 0
maxWait Maximum number of ms to wait for a connection to become available -1000
timeBetweenEvictionRuns Time in ms between checking for idle connections in pool -1000
blockWhenExhausted Block while waiting for connection to become available, true or false true
testWhileIdle Enables sending a PING command periodically while the connection is idle, true or false false

Polling

If:

Then you simply need to configure a RedisIngestingPatch in the server.receon file:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/poller"
    @agent(class: "nstream.adapter.redis.RedisIngestingPatch") {
      redisIngressConf: @redisIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        poolProvisionName: redisPool
        type: "hash"
        match: "vehicle:*"
        relaySchema: @command {
          nodeUri: "/vehicle/$value.id
          laneUri: "update"
        }
      }
    }
  }
  
  # ... other agents

}

Configuration

The presence or absence of three properties (key, type, match) will configure the patch in one of two possible ways:

Multiple Key Scan

If you would like to scan multiple key-value pairs with a single poll then the above configuration is correct for your case. The available properties are:

Name Description Required Default Example
match The pattern of keys to include in scan False Will scan all keys user:*
type The value type to scan for and return. Supported types: [string, hash, set, zset, list] False Will infer on each scan hash

It is recommended to use the type property to save extra calls inferring the value type.

Note: If all properties are omitted, the whole database will be scanned.

Single Key Query

It may be the case that you just want to retrieve the value of a single key every poll, for this use the key property. The available properties are:

Name Description Required Default Example
key The key True   properties
type The value type. Supported types: [string, hash, set, zset, list] False Will infer on first scan hash

Common Variations

The RedisForms helper class provides standard Swim Forms for most common Redis return types, such as forHash() and forZset(), which may prove useful when implementing custom behaviour.

Timing Strategy

The default timing strategy fires a Redis scan task with a fixed period between execution, with the class’s Agent#stageReception() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.

Custom Ingest Logic

If relaying records to another agent is not sufficient, or you would like to add custom logic on ingestion, the RedisIngestingPatch can be easily overridden. Create a new class that extends the RedisIngestingPatch and override the ingest method.

  @Override
  protected void ingest(Value value) throws DeferrableException {
    // Custom ingestion of record as a Swim Value
  }

Remember to update your server.recon file with the new agent you just created, instead of the RedisIngestingPatch.

Streams

The Nstream Redis adapter also provides an ingesting patch that can be configured to read from Redis streams and ingest new entries.

To ingest Redis streams simply configure a RedisStreamIngestingPatch in the server.recon file:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/stream"
    @agent(class: "nstream.adapter.redis.RedisStreamIngestingPatch") {
      redisIngressConf: @redisIngressSettings {
        poolProvisionName: redisPool
        streams: "trucks cars 0 $"
        relaySchema: @command {
          nodeUri: "/vehicle/$value.id"
          laneUri: "update"
        }
      }
    }
  }
  
  # ... other agents

}

The settings available are similar to those provided with the Redis CLI:

Name Description Required
streams Space separated list of stream names followed by starting IDs - as would be used with Redis CLI True
block The time in milliseconds to block for False
count The amount of entries to retrieve in a single read False
groupName The name of the consumer group False
consumer The name of the consumer within the group False

As with the Redis XREAD operation, the majority of starting IDs will either be 0, to read all entries, or $, to read only new entries.

To use consumer groups ensure the groupName and consumer settings are populated. As with the Redis XREADGROUP operation, the majority of starting IDs will either be 0, to read received but not acknowledged entries, or >, to read only new entries.

Common Variations

The type of an item to be ingested from a Redis stream is Map.Entry<String, StreamEntry>, where the key of the map entry is the stream name. This allows the origin stream to be identified in cases where multiple streams are being ingested. The RedisForms.forStreamEntry() helper method provides a Swim Form for easy conversion to and from a Value.

Custom Ingest Logic

If relaying stream entries to another agent is not sufficient, or you would like to add custom logic on ingestion of entries, the RedisStreamIngestingPatch can be easily overridden. Create a new class that extends the RedisStreamIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Map.Entry<String, StreamEntry> or Value type.

  @Override
  protected void ingest(Map.Entry<String, StreamEntry> unstructured) throws DeferrableException {
    // Custom ingestion of a (Stream Name, Stream Entry) map entry
  }

  @Override
  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of stream entry as a Swim Value
  }

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).