DynamoDB Egress

Nstream provides a DynamoDB Adapter library that greatly facilitates publishing to DynamoDB tables. This guide demonstrates how to start a DynamoDB client and publish state from Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-dynamodb:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-dynamodb</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the abstract DynamoDbPublishingAgent which with configuration and extension can publish state to DynamoDB tables.

Configuration

Here we give a full example of the configuration required to prepare a DynamoDbPublishingAgent - we will discuss the implementation of the MyDynamoDbPublishingAgent in the next section.

# server.recon
provisions: {
  @provision("dynamoClient") {
    class: "nstream.adapter.dynamodb.ClientProvision"
    def: {
      "region": "us-west-1"
      "endpointOverride": "http://dynamodb:8000"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/dynamo/publisher"
    @agent(class: "vehicle.dynamodb.MyDynamoDbPublishingAgent") {
      dynamoDbEgressConf: @dynamoDbEgressSettings {
        clientProvisionName: dynamoClient
        table: vehicles
      }
    }
  }

}

Note: The table property is optional but will be useful in the next section.

Extension

Using configuration to handle the client removes the need for most boilerplate in the implementation of the agent. Extending the DynamoDbPublishingAgent requires implementation of two publish methods:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Convert the value to be published into an attribute map and call the below
  }

  @Override
  protected void publish(Map<String, AttributeValue> publishable) throws DeferrableException {
    // Publish the DynamoDB item
  }

While implementing these methods, egressSettings and client variables are available, as defined in the configuration shown in the previous section.

The first method, which converts a Value to a publishable AttributeValue map, will vary greatly depending on the format of the data and table schema. Here we give a simple example where we extract some key fields to build a map using the DynamoDB client libraries:

  @Override
  protected void publish(Value value) {
    final Map<String, AttributeValue> item = new HashMap<>();
    item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
    item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
    item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
    publish(item);
  }

The second method is where the publishing command is implemented, this will vary depending on the desired behaviour (put, update, etc). Here we give an implementation that will insert (or replace if the key already exists) the item into the table:

  @Override
  protected void publish(Map<String, AttributeValue> publishable) {
    PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
    this.client.putItem(request);
  }

Finally, we must now decide when to publish state to the DynamoDB table. There are two common approaches which we will explore in the following sections (with full agent examples for each).

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need do is call the publish(Value value) method from the appropriate callback of the lane.

public class MyDynamoDbPublishingAgent extends DynamoDbPublishingAgent {

  @SwimLane("putItem")
  CommandLane<Value> putItem = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value value) {
    final Map<String, AttributeValue> item = new HashMap<>();
    item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
    item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
    item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
    publish(item);
  }

  @Override
  protected void publish(Map<String, AttributeValue> publishable) {
    PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
    this.client.putItem(request);
  }

}

In this example we use a command lane putItem to put items into a DynamoDB table. The didSet callback on a ValueLane would be just as valid.

Periodic

Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. To schedule a timer after setup we place the code in the stagePublication lifecycle callback - making sure to call the super.stagePublication() first.

public class MyDynamoDbPublishingAgent extends DynamoDbPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value value) {
    final Map<String, AttributeValue> item = new HashMap<>();
    item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
    item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
    item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
    publish(item);
  }

  @Override
  protected void publish(Map<String, AttributeValue> publishable) {
    PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
    this.client.putItem(request);
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

In this example, every minute the agent will put the state of the agent into the DynamoDB table.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).