JMS Egress

Nstream provides a Java Message Service (JMS) Adapter library that simplifies publishing to JMS topics and queues. This guide demonstrates how to start a message producer and publish messages from Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-jms:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jms</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the JmsPublishingPatch, an agent base class that, once configured and extended, publishes state to JMS topics and queues.

Configuration

The following is a full example of the configuration required to prepare a JmsPublishingPatch. The implementation of MyJmsPublishingAgent is discussed in the next section.

# server.recon
provisions: {
  @provision("activeMqConnectionFactory") {
    class: "nstream.adapter.jms.ConnectionFactoryProvision"
    def: {
      "connectionFactoryClass": "org.apache.activemq.ActiveMQConnectionFactory"
      "brokerURL": "tcp://localhost:61616"
    }
  }
  @provision("activeMqConnection") {
    class: "nstream.adapter.jms.ConnectionProvision"
    def: {
      connectionFactoryProvisionName: "activeMqConnectionFactory"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/jms"
    @agent(class: "vehicle.jms.MyJmsPublishingAgent") {
      jmsEgressConf: @jmsEgressSettings {
        connectionProvisionName: "activeMqConnection",
        destination: "topic://vehicles",
        contentTypeOverride: "json"
      }
    }
  }

}

Connection

A ConnectionFactory provision must be configured in order to create connections; see above for a simple example. connectionFactoryClass is a required property of the provision and must be the fully qualified class name of the ConnectionFactory for the JMS provider you are using. Provided the ConnectionFactory class follows the JavaBeans naming convention, its properties can also be set here, such as brokerURL in the above example.
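How the provision applies these properties internally is not shown in this guide. The following is a minimal sketch of the naming convention itself, assuming String-valued properties: a property key such as brokerURL maps to a setter named setBrokerURL, which can be found and invoked reflectively. DemoConnectionFactory, setterName and configure are hypothetical names used only for illustration; they are not part of Nstream or JMS.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanPropertySketch {

  /** Hypothetical stand-in for a provider's ConnectionFactory; not a real JMS class. */
  public static class DemoConnectionFactory {
    private String brokerURL;

    public void setBrokerURL(String brokerURL) {
      this.brokerURL = brokerURL;
    }

    public String getBrokerURL() {
      return this.brokerURL;
    }
  }

  /** Maps a property name such as "brokerURL" to its setter name, "setBrokerURL". */
  static String setterName(String property) {
    return "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
  }

  /** Instantiates className reflectively and applies each String property through its setter. */
  static Object configure(String className, Map<String, String> properties) {
    try {
      Object bean = Class.forName(className).getDeclaredConstructor().newInstance();
      for (Map.Entry<String, String> entry : properties.entrySet()) {
        Method setter = bean.getClass().getMethod(setterName(entry.getKey()), String.class);
        setter.invoke(bean, entry.getValue());
      }
      return bean;
    } catch (ReflectiveOperationException e) {
      // Unknown class, missing no-arg constructor, or no matching public setter.
      throw new IllegalArgumentException("Cannot configure " + className, e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put("brokerURL", "tcp://localhost:61616");
    Object factory = configure(DemoConnectionFactory.class.getName(), properties);
    System.out.println(((DemoConnectionFactory) factory).getBrokerURL());
  }
}
```

In this sketch a key without a matching public setter is rejected with an IllegalArgumentException. That is a design choice of the illustration, not documented adapter behavior.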

Connection provisions can then be created by referencing the ConnectionFactory provision, and are in turn referenced from agent configurations. A single connection per server is often sufficient.

Destination

The destination JMS topic or queue is configured in the jmsEgressConf, as seen above. Topic names are prefixed with topic:// and queue names with queue://.
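To make the prefix convention concrete, here is a small sketch that splits a destination setting into its kind and name. It is an illustration only, not the adapter's parsing code: DestinationSketch, Kind, ParsedDestination and parse are hypothetical names, the queue name "orders" is invented for the example, and rejecting a value without a prefix is an assumption of the sketch.

```java
public class DestinationSketch {

  enum Kind { TOPIC, QUEUE }

  /** Hypothetical holder for a parsed destination setting. */
  static final class ParsedDestination {
    final Kind kind;
    final String name;

    ParsedDestination(Kind kind, String name) {
      this.kind = kind;
      this.name = name;
    }
  }

  private static final String TOPIC_PREFIX = "topic://";
  private static final String QUEUE_PREFIX = "queue://";

  /** Splits a value such as "topic://vehicles" into its kind (TOPIC) and name ("vehicles"). */
  static ParsedDestination parse(String destination) {
    if (destination.startsWith(TOPIC_PREFIX)) {
      return new ParsedDestination(Kind.TOPIC, destination.substring(TOPIC_PREFIX.length()));
    }
    if (destination.startsWith(QUEUE_PREFIX)) {
      return new ParsedDestination(Kind.QUEUE, destination.substring(QUEUE_PREFIX.length()));
    }
    // Assumption of this sketch: values without a recognized prefix are rejected.
    throw new IllegalArgumentException("Expected a topic:// or queue:// prefix: " + destination);
  }

  public static void main(String[] args) {
    ParsedDestination topic = parse("topic://vehicles");
    System.out.println(topic.kind + " " + topic.name);
    ParsedDestination queue = parse("queue://orders");
    System.out.println(queue.kind + " " + queue.name);
  }
}
```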

Extension

Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent. Extending the JmsPublishingPatch exposes the publish(Value value) method, which publishes the given value to the configured JMS destination. The only two implementation decisions that remain are what to publish and when to publish it. The following sections give two examples.

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need do is call the publish(Value value) method from the appropriate callback of the lane.

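// The nstream import assumes the patch shares a package with the provisions in server.recon.
import nstream.adapter.jms.JmsPublishingPatch;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public class MyJmsPublishingAgent extends JmsPublishingPatch {

  // Forward every command received on the "publish" lane to the JMS destination.
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

}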

Note: The didSet callback on a ValueLane is just as valid.

Periodic

Another common approach is to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. The example overrides stagePublication and adds the scheduling code in the assignDestination callback, so that no publication is attempted before the connection has been created.

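// Import packages below are assumptions; verify them against your Nstream and JMS versions.
import javax.jms.Connection; // assumed javax.jms namespace
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.jms.JmsPublishingPatch;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.structure.Value;

public class MyJmsPublishingAgent extends JmsPublishingPatch {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Publish the current value of the state lane.
  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    loadSettings("jmsEgressConf");
    assignConnection(ProvisionLoader.<Connection>getProvision(this.egressSettings.connectionProvisionName())
        .value());
    // The callback runs once the producer is staged, so the timer starts only then.
    assignDestination(this.egressSettings.destination(), () -> {
      info(nodeUri() + ": successfully staged producer for publication");
      // Assuming millisecond arguments: first publication after 1 s, then one every 60 s.
      this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
    });
  }

}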
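The ordering guarantee described above (no publication before staging completes) can be seen in isolation in the following sketch. It deliberately swaps the agent's timer for the JDK's ScheduledExecutorService and replaces JMS publication with an event log, so it runs without Nstream or a broker. PeriodicPublishSketch and run are hypothetical names, and the short delays are chosen only to keep the demonstration fast.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicPublishSketch {

  /** Runs the staged-then-periodic flow and returns the ordered event log. */
  static List<String> run(int publications, long initialDelayMillis, long periodMillis) {
    List<String> events = new CopyOnWriteArrayList<>();
    CountDownLatch done = new CountDownLatch(publications);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Stand-in for publishState(): record the call instead of sending a JMS message.
    Runnable publishState = () -> {
      events.add("publish");
      done.countDown();
    };

    // Stand-in for the assignDestination callback: the timer is created only here,
    // so no publication can be attempted before staging has finished.
    Runnable onStaged = () -> {
      events.add("staged");
      scheduler.scheduleAtFixedRate(
          publishState, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
    };

    try {
      onStaged.run();
      // Wait until the requested number of publications has happened (or time out).
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return events;
  }

  public static void main(String[] args) {
    // The log starts with "staged", followed by at least three "publish" entries.
    System.out.println(run(3, 10, 20));
  }
}
```

Because the timer is only ever created inside the staging callback, the "staged" entry always precedes the first "publish" entry, which is the same reason the agent above schedules from within assignDestination.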

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).