DynamoDB Ingress
Nstream provides a DynamoDB Adapter library that simplifies ingestion from DynamoDB databases and tables. This guide demonstrates how to scan DynamoDB and process the resulting items in Web Agents with minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-dynamodb:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-dynamodb</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting external messages into events utilized by Web Agents
- DynamoDbIngestingAgent: An abstract Web Agent class with built-in methods relevant to DynamoDB scanning
- DynamoDbIngestingPatch: A concrete (but extendable) agent that collects items into swim.structure.Values
- DynamoDbIngressSettings: A plain old Java object (POJO) that configures a single DynamoDB agent or patch above
Polling
If:
- Your scan is static
- Response bodies are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to configure a DynamoDbIngestingPatch in the server.recon file:
# server.recon
provisions: {
  @provision("dynamoClient") {
    class: "nstream.adapter.dynamodb.ClientProvision"
    def: {
      "region": "us-west-1"
      "endpointOverride": "http://dynamodb:8000"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamo/poller"
    @agent(class: "nstream.adapter.dynamodb.DynamoDbIngestingPatch") {
      dynamoDbIngressConf: @dynamoDbIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        clientProvisionName: dynamoClient
        table: vehicles
        relaySchema: @command {
          nodeUri: "/vehicle/$id"
          laneUri: "update"
        }
      }
    }
  }
}
Here we configure a client for a local DynamoDB instance.
Note that you’ll need to include any driver dependencies (in this case dynamodb) in your classpath.
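The fixed-rate timing referenced above can be illustrated with the JDK alone. The following is a minimal sketch, not the adapter's implementation: it assumes that firstFetchDelayMillis and fetchIntervalMillis play roles analogous to the initialDelay and period arguments of ScheduledExecutorService#scheduleAtFixedRate, and it uses a counter as a stand-in for the scan task. The class and method names (FixedRateSketch, runFixedRate) are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateSketch {

  // Schedules a stand-in task at a fixed rate and waits until it has fired
  // `fires` times. Returns true if every fire happened before the timeout.
  static boolean runFixedRate(long firstDelayMillis, long intervalMillis, int fires) {
    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch latch = new CountDownLatch(fires);
    try {
      // Assumed analogy: firstFetchDelayMillis -> initialDelay,
      // fetchIntervalMillis -> period. The period is measured between
      // scheduled start times, independent of how long each run takes.
      executor.scheduleAtFixedRate(latch::countDown,
          firstDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
      long timeoutMillis = firstDelayMillis + intervalMillis * fires + 5000L;
      return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      // Stop the recurring task so the JVM can exit
      executor.shutdownNow();
    }
  }

  public static void main(String[] args) {
    // Short interval for demonstration; the configuration above uses 120000 ms
    System.out.println(runFixedRate(0L, 50L, 3));
  }
}
```

With a fixed rate, a slow task does not push later start times back; if delay-after-completion semantics are preferred, the JDK offers scheduleWithFixedDelay instead.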
Common Variations
Timing Strategy
The default timing strategy fires a DynamoDB scan task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override DynamoDbIngestingAgent#stageReception to not invoke NstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() to not call stageReception()
A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.
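The deferred-startup pattern described above can be sketched without the library. In the following, StandInIngestingAgent is a hypothetical stand-in, not the Nstream class: it only models the documented behavior that didStart() calls stageReception(). The onCommand method likewise stands in for the onCommand() callback of a CommandLane, and the "start" command string is an assumption made for illustration.

```java
public class DeferredStartSketch {

  // Hypothetical stand-in for the library's ingesting agent (NOT the Nstream
  // class). It models only that didStart() triggers stageReception().
  static class StandInIngestingAgent {
    int stagedCount = 0;

    protected void stageReception() {
      // The real agent would schedule its recurring scan here
      stagedCount++;
    }

    public void didStart() {
      stageReception();
    }
  }

  // Subclass that disables automatic startup and waits for an explicit command
  static class OnDemandAgent extends StandInIngestingAgent {
    @Override
    public void didStart() {
      // Intentionally empty: do not call stageReception() on startup
    }

    // Stand-in for the onCommand() callback of a CommandLane
    public void onCommand(String command) {
      if ("start".equals(command)) {
        stageReception();
      }
    }
  }

  // Starts the agent, optionally sends the command, and reports how many
  // times reception was staged.
  static int stagedAfter(boolean sendCommand) {
    OnDemandAgent agent = new OnDemandAgent();
    agent.didStart();
    if (sendCommand) {
      agent.onCommand("start");
    }
    return agent.stagedCount;
  }

  public static void main(String[] args) {
    System.out.println(stagedAfter(false)); // prints 0: nothing staged on startup
    System.out.println(stagedAfter(true));  // prints 1: staged only after the command
  }
}
```

In a real agent the same shape would apply to a class extending DynamoDbIngestingPatch, with the command handler registered on an actual lane.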
Custom Ingest Logic
If relaying items to another agent is not sufficient, or you would like to add custom logic on ingestion of items, you can extend the DynamoDbIngestingPatch.
Create a new class that extends DynamoDbIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Map<String, AttributeValue> or a Value.
@Override
protected void ingest(Map<String, AttributeValue> unstructured) throws DeferrableException {
  // Custom ingestion of an item as an attribute map
}

@Override
protected void ingest(Value structured) throws DeferrableException {
  // Custom ingestion of an item as a Swim Value
}
Remember to update your server.recon file to reference the new agent class you just created instead of DynamoDbIngestingPatch.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).