NATS Egress
Nstream provides a NATS Adapter library that greatly facilitates publishing to NATS Streams and corresponding Subjects. This guide demonstrates how to start a message producer and publish messages from Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-nats:4.12.20'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-nats</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the NatsPublishingPatch, which may be extended to publish messages to NATS Subjects.
Extension is best accomplished via a mix of configuration and direct subclassing.
Configuration
Here we give a full example of the configuration that prepares a NatsPublishingPatch.
# server.recon
"provisions": {
  @provision("natsConnectionProvision") {
    class: "nstream.adapter.nats.NatsConnectionProvision",
    def: {
      "servers": "nats-adapter-broker:4222",
    }
  }
}

"nats-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamic/:id"
    @agent(class: "nats.adapter.nats.CustomNatsPublishingAgent") {
      natsEgressConf: @natsEgressSettings {
        connectionProvisionName: "natsConnectionProvision"
        natsEgressStreamName: "NatsStream",
        natsEgressSubjectName: "NatsSubject"
      }
    }
  }
}

# Configure desired web settings (e.g. port)
# ...
Connection and Publisher
As a first step, a singleton NatsConnectionProvision must be configured in order to create external connections.
The above snippet is minimal, and an exhaustive set of config options may be found here.
The publisher configuration is handled the same way.
NatsEgressSettings
The previous configurations can be seen as “connection-oriented”. This part is “publishing-oriented”, responsible for both:
- Linking the NATS Connection provision and publisher together
- Dictating which NATS Stream and Subject each message is published to
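To make the role of these three fields concrete, the sketch below models them in plain Java. EgressSettingsSketch and its validation are hypothetical and are not part of the Nstream API; the adapter reads @natsEgressSettings itself, so this only illustrates which values the block must supply.

```java
import java.util.Objects;

// Hypothetical stand-in for the values carried by @natsEgressSettings.
// This is NOT the Nstream settings class; it only mirrors the three fields
// that appear in the server.recon example.
final class EgressSettingsSketch {

  final String connectionProvisionName; // which provisioned connection to use
  final String streamName;              // value of natsEgressStreamName
  final String subjectName;             // value of natsEgressSubjectName

  EgressSettingsSketch(String connectionProvisionName, String streamName, String subjectName) {
    this.connectionProvisionName = requireNonBlank(connectionProvisionName, "connectionProvisionName");
    this.streamName = requireNonBlank(streamName, "natsEgressStreamName");
    this.subjectName = requireNonBlank(subjectName, "natsEgressSubjectName");
  }

  // Rejects null or whitespace-only values so a misconfiguration fails early.
  private static String requireNonBlank(String value, String field) {
    Objects.requireNonNull(value, field);
    if (value.trim().isEmpty()) {
      throw new IllegalArgumentException(field + " must not be blank");
    }
    return value;
  }

  @Override
  public String toString() {
    return "publish via '" + connectionProvisionName + "' to stream '" + streamName
        + "', subject '" + subjectName + "'";
  }
}
```

Constructing it with the values from the example, new EgressSettingsSketch("natsConnectionProvision", "NatsStream", "NatsSubject"), shows the link between the two halves of the configuration: the first argument must match the name given to the provision.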
Extension
Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent.
Extending the NatsPublishingAgent exposes the publish(Value value) method, which will publish the given value to the configured NATS Stream and the corresponding Subject. The only two implementation decisions now are what and when to publish; we give two examples in the following sections.
Publish On Event
The simplest method of publication is to publish any event or update received by a lane.
All we need to do is call the publish(Value value) method from the appropriate callback of the lane.
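import java.nio.charset.StandardCharsets;
import nstream.adapter.nats.NatsPublishingAgent; // package assumed from NatsConnectionProvision
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.json.Json;
import swim.structure.Value;

public class CustomNatsPublishingAgent extends NatsPublishingAgent {

  // Every command received on this lane is forwarded to publish(Value)
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void publish(Value structure) {
    // Serialize to JSON text and publish its UTF-8 bytes
    publish(Json.toString(structure).getBytes(StandardCharsets.UTF_8));
  }

  @Override
  protected void stagePublication() {
    loadSettings("natsEgressConf");
    // Setup NATS Connection and command publish lane with the desired value
  }

}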
Note: the didSet callback on a ValueLane, the didUpdate callback on a MapLane, and many others are equally valid alternatives.
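The publish(Value) override in this section sends each message as the UTF-8 bytes of a JSON string. The plain-Java sketch below isolates that encoding step so it can be checked without a Swim runtime or a NATS broker. PayloadEncodingSketch and the sample JSON are hypothetical, and the decode side is only an assumption about how a consumer might read the payload back.

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of the payload encoding: JSON text is converted to
// UTF-8 bytes before publication, and a consumer reverses the step.
final class PayloadEncodingSketch {

  // Producer side: JSON text -> bytes.
  static byte[] encode(String json) {
    return json.getBytes(StandardCharsets.UTF_8);
  }

  // Consumer side: bytes -> JSON text, using the same charset.
  static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Hypothetical message; \u00b0 is the degree sign, a non-ASCII character.
    String json = "{\"id\":\"sensor-1\",\"temp\":21.5,\"unit\":\"\u00b0C\"}";
    byte[] payload = encode(json);
    // Non-ASCII characters take more than one byte in UTF-8, so the byte
    // count can exceed the character count.
    System.out.println(json.length() + " chars, " + payload.length + " bytes");
    System.out.println("round trip ok: " + decode(payload).equals(json));
  }
}
```

Naming the charset explicitly on both sides matters: relying on the platform default could produce different bytes on differently configured hosts.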
Periodic
Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided.
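import java.nio.charset.StandardCharsets;
import nstream.adapter.nats.NatsPublishingAgent; // package assumed from NatsConnectionProvision
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.json.Json;
import swim.structure.Value;

public class CustomNatsPublishingAgent extends NatsPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure).getBytes(StandardCharsets.UTF_8));
  }

  @Override
  protected void stagePublication() {
    loadSettings("natsEgressConf");
    // Setup NATS Connection, then start the timer once here rather than in
    // publish(Value), which would schedule a new timer on every publication
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }

}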
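The fixed-rate pattern can be tried outside a Swim runtime. The sketch below uses the JDK's ScheduledExecutorService as a stand-in for the agent's scheduleAtFixedRate; it is not the Nstream or Swim API. PeriodicPublishSketch is a hypothetical name, the counter stands in for publishState(), and reading the agent's 1000 and 60000 arguments as an initial delay and a period in milliseconds is an assumption based on their order.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PeriodicPublishSketch {

  // Schedules a single repeating task, waits until it has run `times` times,
  // cancels it, and returns the number of simulated publications.
  static int runPeriodically(long initialDelayMs, long periodMs, int times) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger published = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(times);

    // The timer is created exactly once; each tick stands in for publishState().
    ScheduledFuture<?> timer = scheduler.scheduleAtFixedRate(() -> {
      if (done.getCount() > 0) { // ignore any tick that fires after the target is reached
        published.incrementAndGet();
        done.countDown();
      }
    }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);

    try {
      done.await(10, TimeUnit.SECONDS); // generous upper bound for the demo
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      timer.cancel(false);     // stop the repeating task
      scheduler.shutdownNow(); // release the scheduler thread
    }
    return published.get();
  }

  public static void main(String[] args) {
    // Short delays keep the demo fast; the agent example uses 1000 and 60000.
    System.out.println("publications: " + runPeriodically(10, 20, 3));
  }
}
```

Keeping a handle to the timer, as the agent does with timerRef, is what makes it possible to cancel the schedule when publication should stop.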
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).