MongoDB Egress

Nstream provides a MongoDB Adapter library that simplifies publishing to MongoDB collections. This guide demonstrates how to start a MongoDB client and publish state from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-mongodb:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-mongodb</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the abstract MongoDbPublishingAgent, which, once configured and extended, can publish state to MongoDB collections.

Configuration

Here we give a full example of the configuration required to prepare a MongoDbPublishingAgent. The implementation of MyMongoDbPublishingAgent is discussed in the next section.

# server.recon
provisions: {
  @provision("mongoClient") {
    class: "nstream.adapter.mongodb.ClientProvision"
    def: {
      "connectionString": "mongodb://mongo/myMongoDatabase"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/mongo/publisher"
    @agent(class: "vehicle.mongodb.MyMongoDbPublishingAgent") {
      mongoDbEgressConf: @mongoDbEgressSettings {
        clientProvisionName: mongoClient
        database: myDatabase
        collection: vehicles
      }
    }
  }

}

Note: The database and collection properties are optional, but the implementations in the next section read them.

Extension

Using configuration to handle the client removes the need for most boilerplate in the implementation of the agent. Extending the MongoDbPublishingAgent requires implementation of two publish methods:

  @Override
  protected void publish(Value value) throws DeferrableException {
    // Convert the value to be published into a MongoDB Document and pass it to publish(Document)
  }

  @Override
  protected void publish(Document document) throws DeferrableException {
    // Publish the MongoDB object
  }

While implementing these methods, the egressSettings and client variables are available, populated from the configuration shown in the previous section.

The first method can be implemented generally by using Json as an intermediary:

  @Override
  protected void publish(Value value) throws DeferrableException {
    publish(Document.parse(Json.toString(value)));
  }

Note: While this is an easy-to-implement general solution, it may be more performant to convert directly from a Value to a Document.

The second method is where the publishing query is implemented. It will vary depending on the desired behaviour (insert, update, replace, etc). In the following sections we give examples of two common patterns.
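The difference between these behaviours can be made concrete with a minimal in-memory sketch. It does not use the MongoDB driver: InMemoryCollection and its methods are hypothetical stand-ins that only mimic the matching semantics. An insert always adds a document, a replace overwrites the first match and otherwise changes nothing, and a replace with upsert enabled inserts when nothing matches.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a collection; NOT the MongoDB driver API.
class InMemoryCollection {

  private final List<Map<String, Object>> documents = new ArrayList<>();

  // Insert: always adds a new document, even if an equal one already exists.
  void insertOne(Map<String, Object> document) {
    this.documents.add(new LinkedHashMap<>(document));
  }

  // Replace: overwrites the first document whose `field` equals `value`.
  // When nothing matches, the replacement is added only if `upsert` is true.
  // Returns true if the collection was modified.
  boolean replaceOne(String field, Object value,
                     Map<String, Object> replacement, boolean upsert) {
    for (int i = 0; i < this.documents.size(); i++) {
      if (value.equals(this.documents.get(i).get(field))) {
        this.documents.set(i, new LinkedHashMap<>(replacement));
        return true;
      }
    }
    if (upsert) {
      this.documents.add(new LinkedHashMap<>(replacement));
      return true;
    }
    return false;
  }

  int size() {
    return this.documents.size();
  }
}
```

With the actual driver, replaceOne likewise leaves the collection unchanged when the filter matches nothing, unless an upsert is requested, for example by passing new ReplaceOptions().upsert(true) as a third argument. This matters for agents that replace a document that may not have been inserted yet.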

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value value) method from the appropriate callback of the lane.

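// Package of MongoDbPublishingAgent assumed to match ClientProvision in server.recon
import nstream.adapter.mongodb.MongoDbPublishingAgent;
import org.bson.Document;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.json.Json;
import swim.structure.Value;

public class MyMongoDbPublishingAgent extends MongoDbPublishingAgent {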

  @SwimLane("insert")
  CommandLane<Value> insert = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value structure) {
    publish(Document.parse(Json.toString(structure)));
  }

  @Override
  protected void publish(Document publishable) {
    this.client.getDatabase(this.egressSettings.database())
        .getCollection(this.egressSettings.collection())
        .insertOne(publishable);
  }

}

In this example we use a command lane to insert values into a MongoDB collection. The didSet callback on a ValueLane could also be used, but an update or replace may be more appropriate in that case.

Periodic

Another common approach to publication is to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. To schedule a timer after setup, we place the code in the stagePublication lifecycle callback, making sure to call super.stagePublication() first.

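// Package of MongoDbPublishingAgent assumed to match ClientProvision in server.recon
import nstream.adapter.mongodb.MongoDbPublishingAgent;
import org.bson.Document;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.json.Json;
import swim.structure.Value;

public class MyMongoDbPublishingAgent extends MongoDbPublishingAgent {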

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value structure) {
    publish(Document.parse(Json.toString(structure)));
  }

  @Override
  protected void publish(Document publishable) {
    final Document filter = new Document().append("vehicleId", this.state.get().get("vehicleId").stringValue());
    this.client.getDatabase(this.egressSettings.database())
        .getCollection(this.egressSettings.collection())
        .replaceOne(filter, publishable);
  }

  @Override
  protected void stagePublication() {
    super.stagePublication();
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 10000, 60000, this::publishState);
  }

}

In this example, every minute the agent replaces the document in the collection that has the same vehicleId with its current state (notice the replaceOne call).
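The timing in the example above can be worked through with a small helper. This is a sketch under an assumption: the two numeric arguments to scheduleAtFixedRate are taken to be an initial delay and a period, both in milliseconds. The one-minute period matches the description above; the reading of 10000 as an initial delay is not confirmed here. PublishSchedule and fireOffsets are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; not part of the Nstream or Swim APIs.
class PublishSchedule {

  // Returns the offsets, in milliseconds after scheduling, of the first
  // `count` firings of a fixed-rate timer (assumed: delay first, then period).
  static List<Long> fireOffsets(long initialDelayMs, long periodMs, int count) {
    final List<Long> offsets = new ArrayList<>();
    for (int i = 0; i < count; i++) {
      offsets.add(initialDelayMs + i * periodMs);
    }
    return offsets;
  }
}
```

Under that assumption, PublishSchedule.fireOffsets(10000, 60000, 3) yields 10000, 70000 and 130000, so the first publication would occur ten seconds after stagePublication runs and then once per minute.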


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).