NATS Egress

Nstream provides a NATS Adapter library that simplifies publishing to NATS Streams and their corresponding Subjects. This guide demonstrates how to start a message producer and publish messages from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-nats:4.9.16'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-nats</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the NatsPublishingPatch which may be extended to publish messages to NATS Subjects. Extension is best accomplished via a mix of configuration and direct subclassing.

Configuration

Here we give a full example of the configuration that prepares a NatsPublishingPatch.

# server.recon
"provisions": {
  @provision("natsConnectionProvision") {
    class: "nstream.adapter.nats.NatsConnectionProvision",
    def: {
      "servers": "nats-adapter-broker:4222",
    }
  }
}

"nats-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamic/:id"
    @agent(class: "nats.adapter.nats.CustomNatsPublishingAgent") {
      natsEgressConf: @natsEgressSettings {
        connectionProvisionName: "natsConnectionProvision"
        natsEgressStreamName: "NatsStream",
        natsEgressSubjectName: "NatsSubject"
      }
    }
  }
}

# Configure desired web settings (e.g. port)
# ...

Connection and Consumer

As a first step, configure a singleton NatsConnectionProvision, which is used to create the external connections. The snippet above is minimal; an exhaustive set of config options may be found here.

The publisher configuration is handled the same way.

NatsEgressSettings

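The previous configurations can be seen as “connection-oriented”. This part is “publishing-oriented”: as the natsEgressSettings block in the example above shows, it names the connection provision to use (connectionProvisionName) and the NATS Stream and Subject to publish to (natsEgressStreamName and natsEgressSubjectName).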

Extension

Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent. Extending the NatsPublishingAgent exposes the publish(Value value) method, which publishes the given value to the configured NATS Stream and the corresponding Subject. The only two implementation decisions left are what to publish and when to publish it; the following sections give an example of each.

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value value) method from the appropriate callback of the lane.

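import java.nio.charset.StandardCharsets;
// Package assumed to match NatsConnectionProvision in server.recon
import nstream.adapter.nats.NatsPublishingAgent;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.json.Json;
import swim.structure.Value;

public class CustomNatsPublishingAgent extends NatsPublishingAgent {

  // Every command sent to the "publish" lane is forwarded to publish(Value)
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  // Serialize the value to JSON and publish it as UTF-8 bytes
  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure).getBytes(StandardCharsets.UTF_8));
  }

  @Override
  protected void stagePublication() {
    // Load the natsEgressConf settings declared on this agent in server.recon
    loadSettings("natsEgressConf");
    // Add any further NATS connection setup here
  }
}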

Note: the didSet callback on a ValueLane, the didUpdate callback on a MapLane, and many others are equally valid alternatives.
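The publish(Value) override above sends each value as JSON text encoded to UTF-8 bytes, so whatever reads the Subject has to decode the message body the same way. The following is a minimal sketch of that encode/decode pair using only the JDK. The class and method names are hypothetical and not part of the Nstream or NATS APIs, and the JSON literal stands in for the output of Json.toString.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the Nstream or NATS APIs.
final class Utf8Payloads {

  private Utf8Payloads() {
  }

  // Mirrors the getBytes(StandardCharsets.UTF_8) step in the agent's publish override.
  static byte[] encode(String json) {
    return json.getBytes(StandardCharsets.UTF_8);
  }

  // What a subscriber would do with the raw message body before parsing the JSON.
  static String decode(byte[] payload) {
    return new String(payload, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Stand-in for Json.toString(structure)
    String json = "{\"id\":\"a1\",\"temp\":21.5}";
    byte[] payload = encode(json);
    System.out.println(payload.length + " bytes on the wire");
    System.out.println(decode(payload));
  }
}
```

Naming the charset explicitly on both sides avoids depending on the platform default, which can differ between the publishing and subscribing hosts.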

Periodic

Another common approach is to publish the state of the agent periodically. In this case we must manage a timer, using the scheduling methods provided.

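import java.nio.charset.StandardCharsets;
// Package assumed to match NatsConnectionProvision in server.recon
import nstream.adapter.nats.NatsPublishingAgent;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.json.Json;
import swim.structure.Value;

public class CustomNatsPublishingAgent extends NatsPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Timer callback: publish whatever the state lane holds at this moment
  private void publishState() {
    publish(this.state.get());
  }

  // Serialize the value to JSON and publish it as UTF-8 bytes
  @Override
  protected void publish(Value structure) {
    publish(Json.toString(structure).getBytes(StandardCharsets.UTF_8));
  }

  @Override
  protected void stagePublication() {
    loadSettings("natsEgressConf");
    // Schedule the timer once, here, rather than inside publish(Value),
    // where every publication would start another timer.
    // Initial delay 1000, period 60000.
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }
}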
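The timing pattern behind scheduleAtFixedRate can be sketched with the JDK's ScheduledExecutorService. This is a minimal illustration and not the Nstream implementation: PeriodicPublisherSketch, its sink callback (a stand-in for publish), and the string-valued state are all hypothetical. It also assumes that the two numeric arguments in the agent above are an initial delay and a period in milliseconds. The point it shows is that the timer is started once and each tick reads the current state.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for an agent that publishes its latest state on a timer.
final class PeriodicPublisherSketch {

  private final AtomicReference<String> state = new AtomicReference<>("{}");
  private final Consumer<String> sink; // stand-in for publish(...)
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> timer;

  PeriodicPublisherSketch(Consumer<String> sink) {
    this.sink = sink;
  }

  // Analogous to setting the "state" lane.
  void setState(String json) {
    this.state.set(json);
  }

  // Start the timer once; later calls are ignored so ticks are never duplicated.
  synchronized void start(long initialDelayMs, long periodMs) {
    if (this.timer != null) {
      return;
    }
    this.timer = this.scheduler.scheduleAtFixedRate(
        () -> this.sink.accept(this.state.get()),
        initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
  }

  // Cancel the timer and release the scheduler thread.
  synchronized void stop() {
    if (this.timer != null) {
      this.timer.cancel(false);
    }
    this.scheduler.shutdownNow();
  }

  public static void main(String[] args) throws InterruptedException {
    PeriodicPublisherSketch publisher =
        new PeriodicPublisherSketch(json -> System.out.println("publish " + json));
    publisher.setState("{\"count\":1}");
    publisher.start(100, 200); // short intervals so the demo finishes quickly
    Thread.sleep(350);
    publisher.setState("{\"count\":2}"); // later ticks pick up the new state
    Thread.sleep(400);
    publisher.stop();
  }
}
```

Because each tick reads the state at firing time, intermediate updates between ticks are never published; if every change must reach NATS, the publish-on-event approach is the better fit.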

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).