Redis Ingress

Nstream provides a Redis Adapter library that greatly facilitates ingestion from Redis databases. This guide demonstrates how to start polling Redis and process records in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-redis:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Glossary

Polling

If:

Then you simply need to configure a RedisIngestingPatch in the server.receon file:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/poller"
    @agent(class: "nstream.adapter.redis.RedisIngestingPatch") {
      redisIngressConf: @redisIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        poolProvisionName: redisPool
        type: "hash"
        match: "vehicle:*"
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $value.id
          }
          laneUri: "update"
        }
      }
    }
  }

}

Configuration

The presence or absence of three properties (key, type, match) will configure the patch in one of two possible ways:

Multiple Key Scan

If you would like to scan multiple key-value pairs with a single poll then the above configuration is correct for your case. The available properties are:

Name Description Required Default Example
match The pattern of keys to include in scan False Will scan all keys user:*
type The key-value type to scan for and return False Will infer on each scan hash

It is recommended to use the type property to save extra calls inferring the value type.

Note: If all properties are omitted, the whole database will be scanned.

Single Key Query

It may be the case that you just want to retrieve the value of a single key every poll, for this use the key property. The available properties are:

Name Description Required Default Example
key The key True   properties
type The value type of the key False Will infer on first scan hash

Common Variations

Timing Strategy

The default timing strategy fires a Redis scan task with a fixed period between execution, with the class’s Agent#stageReception() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.

Custom Ingest Logic

If relaying records to another agent is not sufficient, or you would like to add custom logic on ingestion, the RedisIngestingPatch can be easily overridden. Create a new class that extends the RedisIngestingPatch and override the ingest method.

  @Override
  protected void ingest(Value value) throws DeferrableException {
    // Custom ingestion of record as a Swim Value
  }

Remember to update your server.recon file with the new agent you just created, instead of the RedisIngestingPatch.

Streams

The Nstream Redis adapter also provides an ingesting patch that can be configured to read from a Redis stream and ingest new entries.

To ingest a Redis stream simply configure a RedisStreamIngestingPatch in the server.recon file:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/stream"
    @agent(class: "nstream.adapter.redis.RedisStreamIngestingPatch") {
      redisIngressConf: @redisIngressSettings {
        poolProvisionName: redisPool
        streams: "trucks cars 0 $"
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $value.id
          }
          laneUri: "update"
        }
      }
    }
  }

}

The settings available are similar to those provided with the Redis CLI:

Name Description Required
streams Space separated list of stream names followed by starting IDs - as would be used with Redis CLI True
block The time in milliseconds to block for False
count The amount of entries to retrieve in a single read False
groupName The name of the consumer group False
consumer The name of the consumer within the group False

As with the Redis XREAD operation, the majority of starting IDs will either be 0, to read all entries, or $, to read only new entries.

To use consumer groups ensure the groupName and consumer settings are populated. As with the Redis XREADGROUP operation, the majority of starting IDs will either be 0, to read received but not acknowledged entries, or >, to read only new entries.

Common Variations

Custom Ingest Logic

If relaying stream entries to another agent is not sufficient, or you would like to add custom logic on ingestion of entries, the RedisStreamIngestingPatch can be easily overridden. Create a new class that extends the RedisStreamIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Map.Entry<String, StreamEntry> or Value type.

  @Override
  protected void ingest(Map.Entry<String, StreamEntry> unstructured) throws DeferrableException {
    // Custom ingestion of a (Stream Name, Stream Entry) map entry
  }

  @Override
  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of stream entry as a Swim Value
  }

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).