March 7 - June 27, 2024 - Nstream is hitting the road with Confluent on the #DataInMotionTour! / Learn More

MongoDB Ingress

Nstream provides a MongoDB Adapter library that greatly facilitates ingestion from MongoDB databases and collections. This guide demonstrates how to start polling Mongo and process documents in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-mongodb:4.9.16'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-mongodb</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>

Glossary

Polling

If:

Then you simply need to configure a MongoDbIngestingPatch in the server.receon file:

# server.recon
provisions: {
  @provision("mongoClient") {
    class: "nstream.adapter.mongodb.ClientProvision"
    def: {
      "connectionString": "mongodb://mongo/myMongoDatabase"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/mongo/poller"
    @agent(class: "nstream.adapter.mongodb.MongoDbIngestingPatch") {
      mongoDbIngressConf: @mongoDbIngressSettings {
        firstFetchDelayMillis: 0
        fetchIntervalMillis: 120000
        clientProvisionName: mongoClient
        database: myMongoDatabase
        collection: vehicles
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $id
          }
          laneUri: "update"
        }
      }
    }
  }

}

Note that you’ll need to include any driver dependencies (in this case mongodb-driver-sync) in your classpath.

Configuration

Query and Projection

Two properties, query and projection, are also configurable from MongoDbIngressSettings, these properties accept Json strings and can be used to filter results and the fields returned for each document. See the MongoDB find documentation for more details on these properties and their use.

Common Variations

Timing Strategy

The default timing strategy fires a MongoDB query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process. There are two aspects that you may wish to change:

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane.

Variable Query

If the parameters for each request are not statically known, then the very simple MongoDbIngressSettings POJO cannot express the desired functionality. The general-purpose alternative strategy is as follows:

Custom Ingest Logic

If relaying documents to another agent is not sufficient, or you would like to add custom logic on ingestion of documents, the MongoDbIngestingPatch can be easily overridden. Create a new class that extends the MongoDbIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Document or Value type.

  @Override
  protected void ingest(Document unstructured) throws DeferrableException {
    // Custom ingestion of a Document
  }

  @Override
  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of document as a Swim Value
  }

Remember to update your server.recon file with the new agent you just created, instead of the MongoDbIngestingPatch.

Change Streams

MongoDB change streams offer a method of subscribing to changes to a database or collection, without the need for polling. The Nstream MongoDB adapter also provides an ingesting patch that can be configured to subscribe to a collection’s change stream and ingest change documents.

To ingest the change stream of a collection simply configure a MongoDbChangeStreamIngestingPatch in the server.recon file:

# server.recon
provisions: {
  @provision("mongoClient") {
    class: "nstream.adapter.mongodb.ClientProvision"
    def: {
      "connectionString": "mongodb://mongo/myMongoDatabase"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/mongo/changestream"
    @agent(class: "nstream.adapter.mongodb.MongoDbChangeStreamIngestingPatch") {
      mongoDbIngressConf: @mongoDbIngressSettings {
        clientProvisionName: mongoClient
        database: myMongoDatabase
        collection: vehicles
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $id
          }
          laneUri: "update"
        }
      }
    }
  }

}

Note that you’ll need to include any driver dependencies (in this case mongodb-driver-sync) in your classpath.

Common Variations

Custom Ingest Logic

If relaying change stream documents to another agent is not sufficient, or you would like to add custom logic on ingestion of change stream documents, the MongoDbChangeStreamIngestingPatch can be easily overridden. Create a new class that extends the MongoDbChangeStreamIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a ChangeStreamDocument<Document> or Value type.

  @Override
  protected void ingest(ChangeStreamDocument<Document> unstructured) throws DeferrableException {
    // Custom ingestion of a ChangeStreamDocument
  }

  @Override
  protected void ingest(Value structured) throws DeferrableException {
    // Custom ingestion of change stream document as a Swim Value
  }

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).