NATS Ingress
Overview
NATS is a lightweight, high performance messaging system that makes it easy to communicate between distributed systems. By integrating NATS with Nstream, users can efficiently stream data into Nstream for real-time processing and analytics.
Agenda of this guide:
- Provides instructions on setting up NATS as an ingress data source for Nstream.
- Provides a NATS Adapter library that greatly facilitates ingestion from NATS Streams and corresponding Subjects.
- Demonstrates how to consume from NATS Subjects and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-nats:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-nats</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
-
NatsIngestingAgent:
An abstract Web Agent class with built-in methods relevant to aNatsConsumer
poll cycle -
NatsIngressSettings:
A plain old Java object (POJO) that configures a singleNatsIngestingAgent
-
NatsIngestingPatch:
A concrete (but extendable)NatsIngestingAgent
subclass that collectsNATS Message
intoswim.structure.Values
NatsIngestingPatch
Conditions
Ideal If:
-
Message
bodies are converted intoValues
during ingestion
Then you simply need to use the NatsIngestingPatch
class, often without overriding any methods.
Let’s demonstrate this by configuring an agent to receive messages from a NATS connection, wholly in server.recon
:
# server.recon
"provisions": {
@provision("natsConnectionProvision") {
class: "nstream.adapter.nats.NatsConnectionProvision",
def: {
"servers": "nats-adapter-broker:4222",
}
}
}
"nats-adapter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
# NatsIngestingAgent replacement
@node {
uri: "/ingesting"
@agent(class: "nstream.adapter.nats.NatsIngestingPatch") {
natsIngressConf: @natsIngressSettings {
firstFetchDelayMillis: 5000,
fetchIntervalMillis: 20000,
natsIngressStreamName: "NatsStream",
natsIngressSubjectName: "NatsSubject"
contentTypeOverride: json
relaySchema: @command {
nodeUri: "/dynamic/$val+1"
laneUri: "unused"
value:
}
}
}
}
}
# Configure desired web settings (e.g. port)
# ...
Configuration
Connection and Consumer
As a first step, a singleton NatsConnectionProvision
must be configured so a consumer may be instantiated against it.
The above snippet is minimal, and an exhaustive set of config options may be found here.
The consumer configuration is handled the same way.
NatsIngressSettings
The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for both:
- Linking the NATS Connection provision and consumer together
- Dictating what to do given a message
Common Variations
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages,
the NatsIngestingPatch
can be easily overridden.
Create a new class that extends the NatsIngestingPatch
and override the ingest
method.
// CustomIngestingAgent.java
public class CustomIngestingAgent extends NatsIngestingPatch {
@Override
protected void ingest(Message unstructured) throws DeferrableException {
// Custom ingestion of an individual message
}
}
Remember to update your server.recon
file with the new agent you just created, instead of the NatsIngestingPatch
.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).