DynamoDB Egress
Nstream provides a DynamoDB Adapter library that greatly facilitates publishing to DynamoDB tables. This guide demonstrates how to start a DynamoDB client and publish state from Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-dynamodb:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-dynamodb</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the abstract DynamoDbPublishingAgent
which with configuration and extension can publish state to DynamoDB tables.
Configuration
Here we give a full example of the configuration required to prepare a DynamoDbPublishingAgent
- we will discuss the implementation of the MyDynamoDbPublishingAgent
in the next section.
# server.recon
provisions: {
@provision("dynamoClient") {
class: "nstream.adapter.dynamodb.ClientProvision"
def: {
"region": "us-west-1"
"endpointOverride": "http://dynamodb:8000"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/dynamo/publisher"
@agent(class: "vehicle.dynamodb.MyDynamoDbPublishingAgent") {
dynamoDbEgressConf: @dynamoDbEgressSettings {
clientProvisionName: dynamoClient
table: vehicles
}
}
}
}
Note: The table
property is optional but will be useful in the next section.
Extension
Using configuration to handle the client removes the need for most boilerplate in the implementation of the agent.
Extending the DynamoDbPublishingAgent
requires implementation of two publish methods:
@Override
protected void publish(Value value) throws DeferrableException {
// Convert the value to be published into an attribute map and call the below
}
@Override
protected void publish(Map<String, AttributeValue> publishable) throws DeferrableException {
// Publish the DynamoDB item
}
While implementing these methods, egressSettings
and client
variables are available, as defined in the configuration shown in the previous section.
The first method, which converts a Value
to a publishable AttributeValue
map, will vary greatly depending on the format of the data and table schema.
Here we give a simple example where we extract some key fields to build a map using the DynamoDB client libraries:
@Override
protected void publish(Value value) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
publish(item);
}
The second method is where the publishing command is implemented, this will vary depending on the desired behaviour (put, update, etc). Here we give an implementation that will insert (or replace if the key already exists) the item into the table:
@Override
protected void publish(Map<String, AttributeValue> publishable) {
PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
this.client.putItem(request);
}
Finally, we must now decide when to publish state to the DynamoDB table. There are two common approaches which we will explore in the following sections (with full agent examples for each).
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need do is call the publish(Value value)
method from the appropriate callback of the lane.
public class MyDynamoDbPublishingAgent extends DynamoDbPublishingAgent {
@SwimLane("putItem")
CommandLane<Value> putItem = this.<Value>commandLane()
.onCommand(this::publish);
@Override
protected void publish(Value value) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
publish(item);
}
@Override
protected void publish(Map<String, AttributeValue> publishable) {
PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
this.client.putItem(request);
}
}
In this example we use a command lane putItem
to put items into a DynamoDB table.
The didSet
callback on a ValueLane
would be just as valid.
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup we place the code in the stagePublication
lifecycle callback - making sure to call the super.stagePublication()
first.
public class MyDynamoDbPublishingAgent extends DynamoDbPublishingAgent {
private TimerRef timerRef;
@SwimLane("state")
ValueLane<Value> state = this.<Value>valueLane();
private void publishState() {
publish(this.state.get());
}
@Override
protected void publish(Value value) {
final Map<String, AttributeValue> item = new HashMap<>();
item.put("id", AttributeValue.fromN(value.get("id").stringValue()));
item.put("longitude", AttributeValue.fromN(value.get("longitude").stringValue()));
item.put("latitude", AttributeValue.fromN(value.get("latitude").stringValue()));
publish(item);
}
@Override
protected void publish(Map<String, AttributeValue> publishable) {
PutItemRequest request = PutItemRequest.builder().tableName(this.egressSettings.table()).item(publishable).build();
this.client.putItem(request);
}
@Override
protected void stagePublication() {
super.stagePublication();
this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
}
}
In this example, every minute the agent will put
the state of the agent into the DynamoDB table.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).