MongoDB Egress
Nstream provides a MongoDB Adapter library that simplifies publishing to MongoDB collections. This guide demonstrates how to start a MongoDB client and publish state from Web Agents with minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-mongodb:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-mongodb</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the abstract MongoDbPublishingAgent which, with configuration and extension, can publish state to MongoDB collections.
Configuration
The following is a full example of the configuration required to prepare a MongoDbPublishingAgent. The implementation of MyMongoDbPublishingAgent is discussed in the next section.
# server.recon
provisions: {
  @provision("mongoClient") {
    class: "nstream.adapter.mongodb.ClientProvision"
    def: {
      "connectionString": "mongodb://mongo/myMongoDatabase"
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/mongo/publisher"
    @agent(class: "vehicle.mongodb.MyMongoDbPublishingAgent") {
      mongoDbEgressConf: @mongoDbEgressSettings {
        clientProvisionName: mongoClient
        database: myDatabase
        collection: vehicles
      }
    }
  }
}
Note: The database and collection properties are optional but will be useful in the next section.
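To make the parts of the connection string in the provision above easier to see, the following is a minimal sketch that splits a single-host mongodb:// string using only the Java standard library. The ConnectionStringParts class is hypothetical and is not part of the Nstream or MongoDB APIs; it ignores credentials, options, and multi-host strings. In MongoDB's connection string format the path component names the default authentication database, and 27017 is the default port. The database property in mongoDbEgressSettings is configured separately, so the two names need not match.

```java
import java.net.URI;

// Hypothetical helper for illustration only; not an Nstream or MongoDB class.
class ConnectionStringParts {
    static final int DEFAULT_MONGO_PORT = 27017;

    final String host;
    final int port;
    final String database; // empty when the string has no path component

    ConnectionStringParts(String connectionString) {
        URI uri = URI.create(connectionString);
        if (!"mongodb".equals(uri.getScheme())) {
            throw new IllegalArgumentException("expected a mongodb:// connection string");
        }
        this.host = uri.getHost();
        // URI reports -1 when no port is given; fall back to MongoDB's default.
        this.port = uri.getPort() == -1 ? DEFAULT_MONGO_PORT : uri.getPort();
        String path = uri.getPath() == null ? "" : uri.getPath();
        this.database = path.startsWith("/") ? path.substring(1) : path;
    }

    public static void main(String[] args) {
        ConnectionStringParts parts =
            new ConnectionStringParts("mongodb://mongo/myMongoDatabase");
        System.out.println(parts.host + ":" + parts.port + " db=" + parts.database);
    }
}
```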
Extension
Using configuration to handle the client removes the need for most boilerplate in the implementation of the agent.
Extending MongoDbPublishingAgent requires implementing two publish methods:
@Override
protected void publish(Value value) throws DeferrableException {
  // Convert the value to be published into a MongoDB Document and call publish(Document)
}
@Override
protected void publish(Document document) throws DeferrableException {
  // Publish the MongoDB Document
}
While implementing these methods, the egressSettings and client variables are available, as defined in the configuration shown in the previous section.
The first method can be implemented generally by using Json as an intermediary:
@Override
protected void publish(Value value) throws DeferrableException {
  publish(Document.parse(Json.toString(value)));
}
Note: While this is an easy-to-implement general solution, it may be more performant to convert directly from a Value to a Document.
The second method is where the publishing query is implemented; it will vary depending on the desired behaviour (insert, update, replace, etc.). The following sections give examples of two common patterns.
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need to do is call the publish(Value value) method from the appropriate callback of the lane.
public class MyMongoDbPublishingAgent extends MongoDbPublishingAgent {
  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);
  @Override
  protected void publish(Value structure) {
    publish(Document.parse(Json.toString(structure)));
  }
  @Override
  protected void publish(Document publishable) {
    this.client.getDatabase(this.egressSettings.database())
        .getCollection(this.egressSettings.collection())
        .insertOne(publishable);
  }
}
In this example we use a command lane to insert values into a MongoDB collection.
The didSet callback on a ValueLane could also be used, but an update or replace may be more appropriate in this case.
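To illustrate why a replace can suit repeated state updates better than an insert, here is a minimal sketch that uses an in-memory list as a stand-in for a collection. FakeCollection and its methods are hypothetical and only mimic the general insert and replace semantics; they are not the MongoDB driver API. The upsert flag is an assumption added for the illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class InsertVersusReplace {
    // Builds a small document-like map for the demonstration.
    static Map<String, Object> vehicle(String id, int speed) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("vehicleId", id);
        doc.put("speed", speed);
        return doc;
    }

    public static void main(String[] args) {
        FakeCollection inserted = new FakeCollection();
        FakeCollection replaced = new FakeCollection();
        // Three successive updates for the same vehicle.
        for (int speed = 10; speed <= 30; speed += 10) {
            Map<String, Object> update = vehicle("v1", speed);
            inserted.insertOne(update);
            replaced.replaceOne("vehicleId", "v1", update, true);
        }
        System.out.println("insert:  " + inserted.docs.size() + " documents");
        System.out.println("replace: " + replaced.docs.size() + " document");
    }
}

// Hypothetical in-memory stand-in for a collection; illustration only.
class FakeCollection {
    final List<Map<String, Object>> docs = new ArrayList<>();

    // Insert semantics: every call adds a new document.
    void insertOne(Map<String, Object> doc) {
        docs.add(new HashMap<>(doc));
    }

    // Replace semantics: swap the first document whose field equals value.
    // Returns true if a document was replaced, or added because upsert is set.
    boolean replaceOne(String field, Object value, Map<String, Object> doc, boolean upsert) {
        for (int i = 0; i < docs.size(); i++) {
            if (value.equals(docs.get(i).get(field))) {
                docs.set(i, new HashMap<>(doc));
                return true;
            }
        }
        if (upsert) {
            docs.add(new HashMap<>(doc));
            return true;
        }
        return false;
    }
}
```

Inserting on every update leaves one document per update, while replacing keeps a single document per vehicleId. With the MongoDB Java driver, the counterpart of the upsert flag is passing new ReplaceOptions().upsert(true) as a third argument to replaceOne; without it, a replace that matches no document writes nothing.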
Periodic
Another common approach to publication would be to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
To schedule a timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.
public class MyMongoDbPublishingAgent extends MongoDbPublishingAgent {
  private TimerRef timerRef;
  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();
  private void publishState() {
    publish(this.state.get());
  }
  @Override
  protected void publish(Value structure) {
    publish(Document.parse(Json.toString(structure)));
  }
  @Override
  protected void publish(Document publishable) {
    final Document filter = new Document().append("vehicleId", this.state.get().get("vehicleId").stringValue());
    this.client.getDatabase(this.egressSettings.database())
        .getCollection(this.egressSettings.collection())
        .replaceOne(filter, publishable);
  }
  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }
}
In this example, every minute the agent replaces (notice the replaceOne call) the document in the collection that has the same vehicleId with its current state.
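The fixed-rate pattern itself can be tried outside an agent. The sketch below swaps in the JDK's ScheduledExecutorService for the agent's own scheduling method, so it is a stand-in and not the Nstream API. It assumes that the two numeric arguments in the example above are an initial delay and a period in milliseconds. PeriodicPublisher and countPublishes are hypothetical names, and short intervals are used so the demonstration finishes quickly.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an agent that publishes its state on a timer.
class PeriodicPublisher {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> timer;

    // Runs publishState after initialDelayMs, then every periodMs.
    void start(long initialDelayMs, long periodMs, Runnable publishState) {
        this.timer = scheduler.scheduleAtFixedRate(
            publishState, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    // Cancels the timer and releases the scheduler thread.
    void stop() {
        if (timer != null) {
            timer.cancel(false);
        }
        scheduler.shutdown();
    }

    // Waits until the publish callback has run `expected` times or the timeout
    // elapses, then returns how many runs were observed.
    static int countPublishes(int expected, long initialDelayMs, long periodMs, long timeoutMs) {
        AtomicInteger published = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expected);
        PeriodicPublisher publisher = new PeriodicPublisher();
        publisher.start(initialDelayMs, periodMs, () -> {
            published.incrementAndGet();
            done.countDown();
        });
        try {
            done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            publisher.stop();
        }
        return published.get();
    }

    public static void main(String[] args) {
        int runs = countPublishes(3, 10, 20, 5000);
        System.out.println("publish callback ran at least 3 times: " + (runs >= 3));
    }
}
```

Keeping a reference to the scheduled task, as the agent does with timerRef, is what allows the timer to be cancelled later.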
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).