Redis Ingress
Nstream provides a Redis Adapter library that greatly facilitates ingestion from Redis databases. This guide demonstrates how to poll or stream from Redis and process records in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-redis:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-redis</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Configure a Client
Firstly, we configure a Redis client in the form of a connection pool provision, to be used by all Redis agents:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
This a very simple example but many settings are available to configure the pool:
Name | Description | Default |
---|---|---|
host |
Hostname of Redis instance to connect to | localhost |
port |
Port number of Redis instance to connect to | 6379 |
user |
Redis user | |
password |
Redis password | |
database |
Redis database to connect to | 0 |
clientName |
Client name to use for connection | |
ssl |
Whether to use SSL, true or false
|
false |
caCertPath |
Path to truststore when using SSL | |
caCertPassword |
Password to truststore when using SSL | |
userCertPath |
Path to user certificate when using SSL | |
userCertPassword |
Password to user certificate when using SSL | |
timeoutMillis |
Connection and socket timeout in ms | 2000 |
connectionTimeoutMillis |
Connection timeout in ms | 2000 |
socketTimeoutMillis |
Socket timeout in ms | 2000 |
blockingSocketTimeoutMillis |
Blocking socket timeout in ms | 0 |
maxTotal |
Maximum active connections in pool | 8 |
maxIdle |
Maximum idle connections in pool | 8 |
minIdle |
Minimum idle connections in pool | 0 |
maxWait |
Maximum number of ms to wait for a connection to become available | -1000 |
timeBetweenEvictionRuns |
Time in ms between checking for idle connections in pool | -1000 |
blockWhenExhausted |
Block while waiting for connection to become available, true or false
|
true |
testWhileIdle |
Enables sending a PING command periodically while the connection is idle, true or false
|
false |
Polling
If:
- Your query is static
- Response bodies are converted into
Values
during ingestion - The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to configure a RedisIngestingPatch
in the server.receon
file:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/redis/poller"
@agent(class: "nstream.adapter.redis.RedisIngestingPatch") {
redisIngressConf: @redisIngressSettings {
firstFetchDelayMillis: 0
fetchIntervalMillis: 120000
poolProvisionName: redisPool
type: "hash"
match: "vehicle:*"
relaySchema: @command {
nodeUri: "/vehicle/$value.id
laneUri: "update"
}
}
}
}
# ... other agents
}
Configuration
The presence or absence of three properties (key
, type
, match
) will configure the patch in one of two possible ways:
Multiple Key Scan
If you would like to scan multiple key-value pairs with a single poll then the above configuration is correct for your case. The available properties are:
Name | Description | Required | Default | Example |
---|---|---|---|---|
match |
The pattern of keys to include in scan | False | Will scan all keys | user:* |
type |
The value type to scan for and return. Supported types: [string , hash , set , zset , list ] |
False | Will infer on each scan | hash |
It is recommended to use the type
property to save extra calls inferring the value type.
Note: If all properties are omitted, the whole database will be scanned.
Single Key Query
It may be the case that you just want to retrieve the value of a single key every poll, for this use the key
property.
The available properties are:
Name | Description | Required | Default | Example |
---|---|---|---|---|
key |
The key | True | properties |
|
type |
The value type. Supported types: [string , hash , set , zset , list ] |
False | Will infer on first scan | hash |
Common Variations
The RedisForms
helper class provides standard Swim Forms for most common Redis return types, such as forHash()
and forZset()
, which may prove useful when implementing custom behaviour.
Timing Strategy
The default timing strategy fires a Redis scan task with a fixed period between execution, with the class’s Agent#stageReception()
callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override
RedisIngestingAgent#stageReception
to not invokeNstreamAgent#scheduleWithFixedDelay
- To disable the automatic task startup, override
didStart()
to not callstageReception()
A rather common alternative for the latter is to instead invoke stageReception()
from the onCommand()
callback of some CommandLane
.
Custom Ingest Logic
If relaying records to another agent is not sufficient, or you would like to add custom logic on ingestion, the RedisIngestingPatch
can be easily overridden.
Create a new class that extends the RedisIngestingPatch
and override the ingest
method.
@Override
protected void ingest(Value value) throws DeferrableException {
// Custom ingestion of record as a Swim Value
}
Remember to update your server.recon
file with the new agent you just created, instead of the RedisIngestingPatch
.
Streams
The Nstream Redis adapter also provides an ingesting patch that can be configured to read from Redis streams and ingest new entries.
To ingest Redis streams simply configure a RedisStreamIngestingPatch
in the server.recon
file:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/redis/stream"
@agent(class: "nstream.adapter.redis.RedisStreamIngestingPatch") {
redisIngressConf: @redisIngressSettings {
poolProvisionName: redisPool
streams: "trucks cars 0 $"
relaySchema: @command {
nodeUri: "/vehicle/$value.id"
laneUri: "update"
}
}
}
}
# ... other agents
}
The settings available are similar to those provided with the Redis CLI:
Name | Description | Required |
---|---|---|
streams |
Space separated list of stream names followed by starting IDs - as would be used with Redis CLI | True |
block |
The time in milliseconds to block for | False |
count |
The amount of entries to retrieve in a single read | False |
groupName |
The name of the consumer group | False |
consumer |
The name of the consumer within the group | False |
As with the Redis XREAD
operation, the majority of starting IDs will either be 0
, to read all entries, or $
, to read only new entries.
To use consumer groups ensure the groupName
and consumer
settings are populated.
As with the Redis XREADGROUP
operation, the majority of starting IDs will either be 0
, to read received but not acknowledged entries, or >
, to read only new entries.
Common Variations
The type of an item to be ingested from a Redis stream is Map.Entry<String, StreamEntry>
, where the key of the map entry is the stream name.
This allows the origin stream to be identified in cases where multiple streams are being ingested.
The RedisForms.forStreamEntry()
helper method provides a Swim Form for easy conversion to and from a Value
.
Custom Ingest Logic
If relaying stream entries to another agent is not sufficient, or you would like to add custom logic on ingestion of entries, the RedisStreamIngestingPatch
can be easily overridden.
Create a new class that extends the RedisStreamIngestingPatch
and override one of the ingest
methods, depending on whether you would like to ingest a Map.Entry<String, StreamEntry>
or Value
type.
@Override
protected void ingest(Map.Entry<String, StreamEntry> unstructured) throws DeferrableException {
// Custom ingestion of a (Stream Name, Stream Entry) map entry
}
@Override
protected void ingest(Value structured) throws DeferrableException {
// Custom ingestion of stream entry as a Swim Value
}
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).