JMS Ingress

Nstream provides a Java Message Service (JMS) Adapter library that simplifies ingestion from JMS topics and queues. This guide demonstrates how to start a message consumer and process messages in Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-jms:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-jms</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Glossary

Ideal JmsIngestingPatch Conditions

If:

Then you simply need to use the JmsIngestingPatch class, often without overriding any methods. Let’s demonstrate this by configuring an agent to receive messages from a local ActiveMQ installation, wholly in server.recon:

# server.recon
provisions: {
  @provision("activeMqConnectionFactory") {
    class: "nstream.adapter.jms.ConnectionFactoryProvision"
    def: {
      "connectionFactoryClass": "org.apache.activemq.ActiveMQConnectionFactory"
      "brokerURL": "tcp://localhost:61616"
    }
  }
  @provision("activeMqConnection") {
    class: "nstream.adapter.jms.ConnectionProvision"
    def: {
      connectionFactoryProvisionName: "activeMqConnectionFactory"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/jms"
    @agent(class: "nstream.adapter.jms.JmsIngestingPatch") {
      jmsIngressConf: @jmsIngressSettings {
        connectionProvisionName: "activeMqConnection",
        destination: "topic://vehicles",
        contentTypeOverride: "json",
        relaySchema: @command {
          nodeUri: "/vehicle/$value.id"
          laneUri: "addMessage",
          value: $value
        }
      }
    }
  }

}

Note that you’ll need to include any driver dependencies (in this case, ActiveMQ) on your classpath.

Configuration

Connection

A ConnectionFactory provision must be configured in order to create connections (see above for a simple example). connectionFactoryClass is a required property of the provision and must be the class name of the ConnectionFactory for the JMS provider you are using. Provided that the ConnectionFactory class follows the JavaBeans naming convention, properties of the ConnectionFactory can also be set here, such as brokerURL in the above example.
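To illustrate what the JavaBeans naming convention implies, the sketch below maps a property key such as brokerURL to a setBrokerURL setter using reflection. It is only an illustration under stated assumptions, not the adapter's actual implementation: DemoConnectionFactory is a hypothetical stand-in for a provider class, and the sketch assumes every property is set through a single String-argument setter.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

public class BeanPropertySketch {

  // Hypothetical stand-in for a provider's ConnectionFactory; not a real JMS class.
  public static class DemoConnectionFactory {
    private String brokerURL;

    public void setBrokerURL(String brokerURL) {
      this.brokerURL = brokerURL;
    }

    public String getBrokerURL() {
      return this.brokerURL;
    }
  }

  // Maps a property key such as "brokerURL" to the setter name "setBrokerURL".
  public static String setterName(String property) {
    return "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
  }

  // Instantiates className with its no-arg constructor, then applies each
  // property through a public setter that takes a single String (assumption).
  public static Object instantiate(String className, Map<String, String> properties)
      throws Exception {
    Object target = Class.forName(className).getDeclaredConstructor().newInstance();
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      Method setter = target.getClass().getMethod(setterName(entry.getKey()), String.class);
      setter.invoke(target, entry.getValue());
    }
    return target;
  }

  public static void main(String[] args) throws Exception {
    Map<String, String> properties = new LinkedHashMap<>();
    properties.put("brokerURL", "tcp://localhost:61616");
    Object factory = instantiate(DemoConnectionFactory.class.getName(), properties);
    // Prints tcp://localhost:61616
    System.out.println(((DemoConnectionFactory) factory).getBrokerURL());
  }
}
```

A property key with no matching setter makes getMethod throw NoSuchMethodException in this sketch, which is why the class needs to follow the naming convention for its properties to be configurable this way.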

Connection provisions can then be created by referencing the ConnectionFactory provision, and agent configurations in turn reference a Connection provision by name. A single connection per server is often sufficient.

Destination

The destination JMS topic or queue can be configured in the jmsIngressConf as seen above. Topic and queue names are prefixed with topic:// and queue:// respectively.
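The prefix convention can be sketched as a small parser that splits a destination string into its kind and name. This is an illustrative sketch rather than the adapter's own parsing code: the DestinationSketch class and its members are hypothetical, and rejecting a string without either prefix is an assumption, since this guide does not say how such a value is handled.

```java
public class DestinationSketch {

  public enum Kind { TOPIC, QUEUE }

  // Simple holder for the parsed destination kind and name.
  public static final class Destination {
    public final Kind kind;
    public final String name;

    Destination(Kind kind, String name) {
      this.kind = kind;
      this.name = name;
    }
  }

  private static final String TOPIC_PREFIX = "topic://";
  private static final String QUEUE_PREFIX = "queue://";

  // Splits "topic://vehicles" into (TOPIC, "vehicles"), and likewise for queues.
  public static Destination parse(String destination) {
    if (destination.startsWith(TOPIC_PREFIX)) {
      return new Destination(Kind.TOPIC, destination.substring(TOPIC_PREFIX.length()));
    }
    if (destination.startsWith(QUEUE_PREFIX)) {
      return new Destination(Kind.QUEUE, destination.substring(QUEUE_PREFIX.length()));
    }
    // Assumption: an unprefixed destination is treated as a configuration error.
    throw new IllegalArgumentException("Expected topic:// or queue:// prefix: " + destination);
  }

  public static void main(String[] args) {
    Destination d = parse("topic://vehicles");
    // Prints TOPIC vehicles
    System.out.println(d.kind + " " + d.name);
  }
}
```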

JNDI

JNDI can be used to configure JMS-specific properties by adding your jndi.properties file to the jar. An example jndi.properties file for ActiveMQ:

java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://activemq:61616
topic.topicName = myTopic

Now, in the server.recon file, configure the provisions and connectors to use JNDI to look up resources. This is done by creating a JNDI provision and referencing it where needed:

# server.recon
provisions: {
  @provision("jndi") {
    class: "nstream.adapter.jms.JndiProvision"
  }
  @provision("activeMqConnectionFactory") {
    class: "nstream.adapter.jms.ConnectionFactoryProvision"
    def: {
      "jndiProvisionName": "jndi"
    }
  }
  @provision("activeMqConnection") {
    class: "nstream.adapter.jms.ConnectionProvision"
    def: {
      connectionFactoryProvisionName: "activeMqConnectionFactory"
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  
  @node {
    uri: "/jms"
    @agent(class: "nstream.adapter.jms.JmsIngestingPatch") {
      jmsIngressConf: @jmsIngressSettings {
        connectionProvisionName: "activeMqConnection",
        jndiProvisionName: "jndi",
        jndiDestination: "topicName",
        contentTypeOverride: "json",
        relaySchema: @command {
          nodeUri: "/vehicle/$value.id"
          laneUri: "addMessage",
          value: $value
        }
      }
    }
  }

}
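The jndiDestination value "topicName" is a JNDI name, not the physical topic name. In ActiveMQ's JNDI support, an entry of the form topic.<jndiName> = <physicalName> registers a topic under that JNDI name. The sketch below only reads the properties text shown earlier to make the mapping visible. It does not perform a real JNDI lookup, and the JndiNameSketch class and its methods are hypothetical helpers.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class JndiNameSketch {

  // Parses jndi.properties-style text into a Properties object.
  public static Properties load(String text) throws IOException {
    Properties properties = new Properties();
    properties.load(new StringReader(text));
    return properties;
  }

  // Returns the physical topic name registered under the given JNDI name,
  // or null if there is no "topic.<jndiName>" entry.
  public static String physicalTopic(Properties jndi, String jndiName) {
    return jndi.getProperty("topic." + jndiName);
  }

  public static void main(String[] args) throws IOException {
    String text = String.join("\n",
        "java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory",
        "java.naming.provider.url = tcp://activemq:61616",
        "topic.topicName = myTopic");
    Properties jndi = load(text);
    // Prints myTopic
    System.out.println(physicalTopic(jndi, "topicName"));
  }
}
```

With this file, an agent configured with jndiDestination: "topicName" would be expected to consume from the physical topic myTopic.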

Common Variations

Custom Ingest Logic

If relaying messages to another agent is not sufficient, or you would like to add custom logic when messages are ingested, you can extend JmsIngestingPatch. Create a new class that extends JmsIngestingPatch and override one of the ingest methods, depending on whether you would like to ingest a Message or a Value.

  @Override
  protected void ingest(Message unstructured) throws DeferrableException {
    // Custom ingestion of a Message
  }

  @Override
  protected void ingest(Value messageStructure) throws DeferrableException {
    // Custom ingestion of message as a Swim Value
  }

Remember to update your server.recon file so that the @agent class references your new class instead of JmsIngestingPatch.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).