DynamoDB Ingress

Nstream provides a DynamoDB Adapter library that greatly facilitates ingestion from DynamoDB databases and tables. This guide demonstrates how to start scanning Dynamo and process items in Web Agents using minimal boilerplate.




implementation 'io.nstream:nstream-adapter-dynamodb:4.6.9'






Then you simply need to configure a DynamoDbIngestingPatch in the server.receon file:

# server.recon
provisions: {
  @provision("dynamoClient") {
    class: "nstream.adapter.dynamodb.ClientProvision"
    def: {
      "region": "us-west-1"
      "endpointOverride": "http://dynamodb:8000"

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamo/poller"
    @agent(class: "nstream.adapter.dynamodb.DynamoDbIngestingPatch") {
      dynamoDbIngressConf: @dynamoDbIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        clientProvisionName: dynamoClient
        table: vehicles
        relaySchema: @command {
          nodeUri: {
          laneUri: "update"


Here we configure a client for a local DynamoDB instance.

Note that you’ll need to include any driver dependencies (in this case dynamodb) in your classpath.

Common Variations

Timing Strategy

The default timing strategy fires a DynamoDB scan task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.

Custom Ingest Logic

If relaying items to another agent is not sufficient, or you would like to add custom logic on ingestion of items, the DynamoDbIngestingPatch can be easily overridden. Create a new class that extends the DynamoDbIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Map<String, AtrributeValue> or Value type.

  protected void ingest(Map<String, AtrributeValue> unstructured) throws DeferrableException {
    // Custom ingestion of an item as an attribute map

  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of an item as a Swim Value

Remember to update your server.recon file with the new agent you just created, instead of the DynamoDbIngestingPatch.

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).