Redis Ingress

Nstream provides a Redis Adapter library that simplifies ingestion from Redis databases. This guide demonstrates how to poll Redis and process the resulting records in Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-redis:4.7.10'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-redis</artifactId>
  <version>4.7.10</version>
  <type>module</type>
</dependency>

Glossary

Polling

If:

Then you simply need to configure a RedisIngestingPatch in the server.recon file:

# server.recon
provisions: {
  @provision("redisPool") {
    class: "nstream.adapter.redis.PoolProvision"
    def: {
      "host": "localhost"
      "port": 6379
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/redis/poller"
    @agent(class: "nstream.adapter.redis.RedisIngestingPatch") {
      redisIngressConf: @redisDbIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        poolProvisionName: redisPool
        type: "hash"
        match: "vehicle:*"
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $value.id
          }
          laneUri: "update"
        }
      }
    }
  }

}
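
To make the relaySchema above more concrete: the nodeUri block joins the literal "/vehicle/" with the id field of each scanned value, and the record is then relayed to the update lane of that node. The plain-Java sketch below only mimics that string-building step for illustration. RelayTarget and forHash are hypothetical names, not part of the Nstream API, and the sketch assumes each Redis hash is available as a Map that carries an id field.

```java
import java.util.Map;

// Hypothetical illustration only; not part of the Nstream API.
final class RelayTarget {
  final String nodeUri;
  final String laneUri;

  RelayTarget(String nodeUri, String laneUri) {
    this.nodeUri = nodeUri;
    this.laneUri = laneUri;
  }

  // Mirrors nodeUri: {"/vehicle/", $value.id} and laneUri: "update",
  // assuming the scanned hash carries an "id" field.
  static RelayTarget forHash(Map<String, String> hash) {
    String id = hash.get("id");
    if (id == null) {
      throw new IllegalArgumentException("hash has no id field");
    }
    return new RelayTarget("/vehicle/" + id, "update");
  }

  public static void main(String[] args) {
    RelayTarget target = forHash(Map.of("id", "42", "speed", "17"));
    System.out.println(target.nodeUri + " -> " + target.laneUri); // prints "/vehicle/42 -> update"
  }
}
```

Under this reading, a hash stored at vehicle:42 with id set to 42 would be relayed to the update lane of the agent at /vehicle/42.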

Configuration

The presence or absence of three properties (key, type, match) will configure the patch in one of two possible ways:

Multiple Key Scan

If you would like to scan multiple key-value pairs with a single poll then the above configuration is correct for your case. The available properties are:

Name  | Description                               | Required | Default                 | Example
match | The pattern of keys to include in scan    | False    | Will scan all keys      | user:*
type  | The key-value type to scan for and return | False    | Will infer on each scan | hash

Setting the type property is recommended, since it avoids the extra calls otherwise needed to infer the value type.

Note: If all properties are omitted, the whole database will be scanned.
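
As a rough illustration of which keys a pattern such as vehicle:* selects, the sketch below filters a list of key names with a simplified glob matcher. It is plain Java and supports only the * and ? wildcards, whereas Redis glob patterns also allow character classes and escapes. KeyPatternSketch is a hypothetical name, and the adapter presumably leaves the matching to Redis itself rather than filtering client-side.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration only; not part of the Nstream API.
final class KeyPatternSketch {
  // Translates a glob containing * and ? into an equivalent regular expression.
  static Pattern toRegex(String glob) {
    StringBuilder sb = new StringBuilder();
    for (char c : glob.toCharArray()) {
      if (c == '*') {
        sb.append(".*");            // any run of characters
      } else if (c == '?') {
        sb.append('.');             // exactly one character
      } else {
        sb.append(Pattern.quote(String.valueOf(c))); // literal character
      }
    }
    return Pattern.compile(sb.toString(), Pattern.DOTALL);
  }

  // Returns the keys that match the glob, preserving input order.
  static List<String> filter(List<String> keys, String glob) {
    Pattern p = toRegex(glob);
    return keys.stream().filter(k -> p.matcher(k).matches()).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> keys = List.of("vehicle:1", "vehicle:22", "user:1", "vehicles");
    System.out.println(filter(keys, "vehicle:*")); // prints [vehicle:1, vehicle:22]
  }
}
```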

Single Key Query

If you only want to retrieve the value of a single key on every poll, use the key property instead. The available properties are:

Name | Description               | Required | Default                  | Example
key  | The key                   | True     |                          | properties
type | The value type of the key | False    | Will infer on first scan | hash
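
The choice between the two modes can be read as a simple decision on whether key is set. The sketch below is one illustrative reading of the rules above, not the adapter's actual code. IngressModeSketch, Mode, and modeFor are hypothetical names, and letting key take precedence when match is also supplied is an assumption, since this guide does not specify that case.

```java
import java.util.Map;

// Hypothetical illustration only; not part of the Nstream API.
final class IngressModeSketch {
  enum Mode { SINGLE_KEY_QUERY, MULTIPLE_KEY_SCAN }

  static Mode modeFor(Map<String, String> settings) {
    // Assumption: a present key property selects the single-key mode,
    // even if match is also set; the guide does not specify this case.
    return settings.containsKey("key") ? Mode.SINGLE_KEY_QUERY : Mode.MULTIPLE_KEY_SCAN;
  }

  public static void main(String[] args) {
    System.out.println(modeFor(Map.of("key", "properties", "type", "hash"))); // prints SINGLE_KEY_QUERY
    System.out.println(modeFor(Map.of("match", "vehicle:*", "type", "hash"))); // prints MULTIPLE_KEY_SCAN
    System.out.println(modeFor(Map.of()));                                     // prints MULTIPLE_KEY_SCAN
  }
}
```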

Common Variations

Timing Strategy

The default timing strategy fires a Redis scan task with a fixed period between executions, with the class’s Agent#stageReception() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.
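
As a point of reference for the default behaviour, the sketch below reproduces a fixed-period schedule with the JDK's ScheduledExecutorService, using the same two settings as the configuration above (firstFetchDelayMillis and fetchIntervalMillis). It is an analogy in plain Java, not the adapter's scheduling code. PollingSketch and its methods are hypothetical names, and because this guide does not say whether the adapter uses fixed-rate or fixed-delay semantics, fixed delay is assumed here.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not part of the Nstream API.
final class PollingSketch {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  final AtomicInteger polls = new AtomicInteger();

  // First poll after firstFetchDelayMillis, then one poll every
  // fetchIntervalMillis after the previous poll completes (assumed semantics).
  ScheduledFuture<?> start(long firstFetchDelayMillis, long fetchIntervalMillis) {
    return executor.scheduleWithFixedDelay(
        this::poll, firstFetchDelayMillis, fetchIntervalMillis, TimeUnit.MILLISECONDS);
  }

  // Stand-in for one Redis scan; a real task would read from Redis here.
  void poll() {
    polls.incrementAndGet();
  }

  void stop() {
    executor.shutdownNow();
  }

  // Runs the schedule for durationMillis and returns how many polls fired.
  static int runFor(long firstFetchDelayMillis, long fetchIntervalMillis, long durationMillis) {
    PollingSketch sketch = new PollingSketch();
    sketch.start(firstFetchDelayMillis, fetchIntervalMillis);
    try {
      Thread.sleep(durationMillis);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      sketch.stop();
    }
    return sketch.polls.get();
  }

  public static void main(String[] args) {
    System.out.println("polls fired: " + runFor(0, 50, 300));
  }
}
```

In this analogy, the on-demand variant corresponds to skipping start() and calling poll() directly from whatever handler receives the command.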

Custom Ingest Logic

If relaying records to another agent is not sufficient, or you would like to add custom logic on ingestion, RedisIngestingPatch can be easily extended. Create a new class that extends RedisIngestingPatch and override its ingest method.

  @Override
  protected void ingest(Value value) throws DeferrableException {
    // Custom ingestion of record as a Swim Value
  }

Remember to update your server.recon file so that it references the new agent class you just created in place of RedisIngestingPatch.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).