MongoDB Ingress
Nstream provides a MongoDB Adapter library that greatly facilitates ingestion from MongoDB databases and collections. This guide demonstrates how to start polling Mongo and process documents in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-mongodb:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-mongodb</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting external messages into events utilized by Web Agents
-
MongoDbIngestingAgent
/MongoDbChangeStreamIngestingAgent:
An abstract Web Agent class with built-in methods relevant to MongoDB querying -
MongoDbIngestingPatch
/MongoDbChangeStreamIngestingPatch:
A concrete (but extendable) agent that collectsDocuments
intoswim.structure.Values
-
MongoDbIngressSettings:
A plain old Java object (POJO) that configures a single MongoDB agent or patch above
Polling
If:
- Your query is static
- Response bodies are converted into
Values
during ingestion - The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to configure a MongoDbIngestingPatch
in the server.receon
file:
# server.recon
provisions: {
@provision("mongoClient") {
class: "nstream.adapter.mongodb.ClientProvision"
def: {
"connectionString": "mongodb://mongo/myMongoDatabase"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/mongo/poller"
@agent(class: "nstream.adapter.mongodb.MongoDbIngestingPatch") {
mongoDbIngressConf: @mongoDbIngressSettings {
firstFetchDelayMillis: 0
fetchIntervalMillis: 120000
clientProvisionName: mongoClient
database: myMongoDatabase
collection: vehicles
relaySchema: @command {
nodeUri: "/vehicle/$id"
laneUri: "update"
}
}
}
}
}
Note that you’ll need to include any driver dependencies (in this case mongodb-driver-sync
) in your classpath.
Configuration
Query and Projection
Two properties, query
and projection
, are also configurable from MongoDbIngressSettings
, these properties accept Json strings and can be used to filter results and the fields returned for each document.
See the MongoDB find documentation for more details on these properties and their use.
Common Variations
Timing Strategy
The default timing strategy fires a MongoDB query task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart()
callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override
MongoDbIngestingAgent#stageReception
to not invokeNstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override
didStart()
to not callstageReception()
A rather common alternative for the latter is to instead invoke stageReception()
from the onCommand()
callback of some CommandLane
.
Variable Query
If the parameters for each request are not statically known, then the very simple MongoDbIngressSettings
POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your
MongoDbIngestingPatch
extension, store the state required to dynamically build each request in one or more class-local variables, updating these variables correctly during the Web Agent’s lifecycle - Override
MongoDbIngestingPatch#find
within your concrete class to read that state while building each request
Custom Ingest Logic
If relaying documents to another agent is not sufficient, or you would like to add custom logic on ingestion of documents, the MongoDbIngestingPatch
can be easily overridden.
Create a new class that extends the MongoDbIngestingPatch
and override one of the ingest
methods, depending on whether you would like to ingest a Document
or Value
type.
@Override
protected void ingest(Document unstructured) throws DeferrableException {
// Custom ingestion of a Document
}
@Override
protected void ingest(Value structured) throws DeferrableException {
// Custom ingestion of document as a Swim Value
}
Remember to update your server.recon
file with the new agent you just created, instead of the MongoDbIngestingPatch
.
Change Streams
MongoDB change streams offer a method of subscribing to changes to a database or collection, without the need for polling. The Nstream MongoDB adapter also provides an ingesting patch that can be configured to subscribe to a collection’s change stream and ingest change documents.
To ingest the change stream of a collection simply configure a MongoDbChangeStreamIngestingPatch
in the server.recon
file:
# server.recon
provisions: {
@provision("mongoClient") {
class: "nstream.adapter.mongodb.ClientProvision"
def: {
"connectionString": "mongodb://mongo/myMongoDatabase"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/mongo/changestream"
@agent(class: "nstream.adapter.mongodb.MongoDbChangeStreamIngestingPatch") {
mongoDbIngressConf: @mongoDbIngressSettings {
clientProvisionName: mongoClient
database: myMongoDatabase
collection: vehicles
relaySchema: @command {
nodeUri: "/vehicle/$id"
laneUri: "update"
}
}
}
}
}
Note that you’ll need to include any driver dependencies (in this case mongodb-driver-sync
) in your classpath.
Common Variations
Custom Ingest Logic
If relaying change stream documents to another agent is not sufficient, or you would like to add custom logic on ingestion of change stream documents, the MongoDbChangeStreamIngestingPatch
can be easily overridden.
Create a new class that extends the MongoDbChangeStreamIngestingPatch
and override one of the ingest
methods, depending on whether you would like to ingest a ChangeStreamDocument<Document>
or Value
type.
@Override
protected void ingest(ChangeStreamDocument<Document> unstructured) throws DeferrableException {
// Custom ingestion of a ChangeStreamDocument
}
@Override
protected void ingest(Value structured) throws DeferrableException {
// Custom ingestion of change stream document as a Swim Value
}
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).