Redis Ingress
Nstream provides a Redis Adapter library that greatly facilitates ingestion from Redis databases. This guide demonstrates how to start polling Redis and process records in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-redis:4.11.19'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-redis</artifactId>
<version>4.11.19</version>
<type>module</type>
</dependency>
Glossary
- ingestion: The act of converting external messages into events utilized by Web Agents
-
RedisIngestingAgent:
An abstract Web Agent class with built-in methods relevant to Redis querying -
RedisIngestingPatch:
A concrete (but extendable) agent that collects records intoswim.structure.Values
-
RedisIngressSettings:
A plain old Java object (POJO) that configures a single Redis agent or patch above
Polling
If:
- Your query is static
- Response bodies are converted into
Values
during ingestion - The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to configure a RedisIngestingPatch
in the server.receon
file:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/redis/poller"
@agent(class: "nstream.adapter.redis.RedisIngestingPatch") {
redisIngressConf: @redisIngressSettings {
firstFetchDelayMillis: 0
fetchIntervalMillis: 120000
poolProvisionName: redisPool
type: "hash"
match: "vehicle:*"
relaySchema: @command {
nodeUri: {
"/vehicle/",
$value.id
}
laneUri: "update"
}
}
}
}
}
Configuration
The presence or absence of three properties (key
, type
, match
) will configure the patch in one of two possible ways:
Multiple Key Scan
If you would like to scan multiple key-value pairs with a single poll then the above configuration is correct for your case. The available properties are:
Name | Description | Required | Default | Example |
---|---|---|---|---|
match |
The pattern of keys to include in scan | False | Will scan all keys | user:* |
type |
The key-value type to scan for and return | False | Will infer on each scan | hash |
It is recommended to use the type
property to save extra calls inferring the value type.
Note: If all properties are omitted, the whole database will be scanned.
Single Key Query
It may be the case that you just want to retrieve the value of a single key every poll, for this use the key
property.
The available properties are:
Name | Description | Required | Default | Example |
---|---|---|---|---|
key |
The key | True | properties |
|
type |
The value type of the key | False | Will infer on first scan | hash |
Common Variations
Timing Strategy
The default timing strategy fires a Redis scan task with a fixed period between execution, with the class’s Agent#stageReception()
callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override
RedisIngestingAgent#stageReception
to not invokeNstreamAgent#scheduleWithFixedDelay
- To disable the automatic task startup, override
didStart()
to not callstageReception()
A rather common alternative for the latter is to instead invoke stageReception()
from the onCommand()
callback of some CommandLane
.
Custom Ingest Logic
If relaying records to another agent is not sufficient, or you would like to add custom logic on ingestion, the RedisIngestingPatch
can be easily overridden.
Create a new class that extends the RedisIngestingPatch
and override the ingest
method.
@Override
protected void ingest(Value value) throws DeferrableException {
// Custom ingestion of record as a Swim Value
}
Remember to update your server.recon
file with the new agent you just created, instead of the RedisIngestingPatch
.
Streams
The Nstream Redis adapter also provides an ingesting patch that can be configured to read from a Redis stream and ingest new entries.
To ingest a Redis stream simply configure a RedisStreamIngestingPatch
in the server.recon
file:
# server.recon
provisions: {
@provision("redisPool") {
class: "nstream.adapter.redis.PoolProvision"
def: {
"host": "localhost"
"port": 6379
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/redis/stream"
@agent(class: "nstream.adapter.redis.RedisStreamIngestingPatch") {
redisIngressConf: @redisIngressSettings {
poolProvisionName: redisPool
streams: "trucks cars 0 $"
relaySchema: @command {
nodeUri: {
"/vehicle/",
$value.id
}
laneUri: "update"
}
}
}
}
}
The settings available are similar to those provided with the Redis CLI:
Name | Description | Required |
---|---|---|
streams |
Space separated list of stream names followed by starting IDs - as would be used with Redis CLI | True |
block |
The time in milliseconds to block for | False |
count |
The amount of entries to retrieve in a single read | False |
groupName |
The name of the consumer group | False |
consumer |
The name of the consumer within the group | False |
As with the Redis XREAD
operation, the majority of starting IDs will either be 0
, to read all entries, or $
, to read only new entries.
To use consumer groups ensure the groupName
and consumer
settings are populated.
As with the Redis XREADGROUP
operation, the majority of starting IDs will either be 0
, to read received but not acknowledged entries, or >
, to read only new entries.
Common Variations
Custom Ingest Logic
If relaying stream entries to another agent is not sufficient, or you would like to add custom logic on ingestion of entries, the RedisStreamIngestingPatch
can be easily overridden.
Create a new class that extends the RedisStreamIngestingPatch
and override one of the ingest
methods, depending on whether you would like to ingest a Map.Entry<String, StreamEntry>
or Value
type.
@Override
protected void ingest(Map.Entry<String, StreamEntry> unstructured) throws DeferrableException {
// Custom ingestion of a (Stream Name, Stream Entry) map entry
}
@Override
protected void ingest(Value structured) throws DeferrableException {
// Custom ingestion of stream entry as a Swim Value
}
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).