JMS Ingress
Nstream provides a Java Message Service (JMS) Adapter library that greatly facilitates ingestion from JMS topics and queues. This guide demonstrates how to start a message consumer and process messages in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jms:4.14.22'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-jms</artifactId>
<version>4.14.22</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting external messages into events utilized by Web Agents
-
JmsIngestingAgent:
An abstract Web Agent class with built-in methods relevant to JMS message consumption -
JmsIngressSettings:
A plain old Java object (POJO) that configures a singleJmsIngestingAgent
orJmsIngestingPatch
-
JmsIngestingPatch:
A concrete (but extendable)JmsIngestingAgent
subclass that collectsMessages
intoswim.structure.Values
JmsIngestingPatch
Conditions
Ideal If:
- The JMS consumer session’s acknowledgement mode is set to
AUTO_ACKNOWLEDGE
-
Message
bodies are converted intoValues
during ingestion
Then you simply need to use the JmsIngestingPatch
class, often without overriding any methods.
Let’s demonstrate this by configuring an agent to receive messages from a local activeMQ installation, wholly in server.recon
:
# server.recon
provisions: {
@provision("activeMqConnectionFactory") {
class: "nstream.adapter.jms.ConnectionFactoryProvision"
def: {
"connectionFactoryClass": "org.apache.activemq.ActiveMQConnectionFactory"
"brokerURL": "tcp://localhost:61616"
}
}
@provision("activeMqConnection") {
class: "nstream.adapter.jms.ConnectionProvision"
def: {
connectionFactoryProvisionName: "activeMqConnectionFactory"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/jms"
@agent(class: "nstream.adapter.jms.JmsIngestingPatch") {
jmsIngressConf: @jmsIngressSettings {
connectionProvisionName: "activeMqConnection",
destination: "topic://vehicles",
contentTypeOverride: "json",
relaySchema: @command {
nodeUri: "/vehicle/$value.id"
laneUri: "addMessage",
value: $value
}
}
}
}
}
Note that you’ll need to include any driver dependencies (in this case activemq
) in your classpath.
Configuration
Connection
A ConnectionFactory
provision must be configured in order to create connections - see above for a simple example.
connectionFactoryClass
is a required property of the provision and must be the class name of the ConnectionFactory
for the JMS provider you are using.
Providing the ConnectionFactory
class follows the Java Bean naming convention then properties of the ConnectionFactory
can also be set here - such as brokerURL
in the above example.
By referencing the ConnectionFactory
provision, Connection
provisions can now be created, although a single connection will often be sufficient per server, for use in agent configurations.
Destination
The destination JMS topic or queue can be configured in the jmsIngressConf
as seen above.
Topic and queue names are prepended with topic://
and queue://
respectively.
JNDI
JNDI can be used to configure JMS specific properties by adding your jndi.properties
file to the jar.
Example jndi.properties
file for ActiveMQ:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://activemq:61616
topic.topicName = myTopic
Now in the server.recon
file, configure the provisions and connectors to use JNDI to lookup resources.
This is done by creating a JNDI provision and referencing it where needed:
# server.recon
provisions: {
@provision("jndi") {
class: "nstream.adapter.jms.JndiProvision"
}
@provision("activeMqConnectionFactory") {
class: "nstream.adapter.jms.ConnectionFactoryProvision"
def: {
"jndiProvisionName": "jndi"
}
}
@provision("activeMqConnection") {
class: "nstream.adapter.jms.ConnectionProvision"
def: {
connectionFactoryProvisionName: "activeMqConnectionFactory"
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/jms"
@agent(class: "nstream.adapter.jms.JmsIngestingPatch") {
jmsIngressConf: @jmsIngressSettings {
connectionProvisionName: "activeMqConnection",
jndiProvisionName: "jndi",
jndiDestination: "topicName",
contentTypeOverride: "json",
relaySchema: @command {
nodeUri: "/vehicle/$value.id"
laneUri: "addMessage",
value: $value
}
}
}
}
}
Common Variations
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the JmsIngestingPatch
can be easily overridden.
Create a new class that extends the JmsIngestingPatch
and override one of the ingest
methods, depending on whether you would like to ingest a Message
or Value
type.
@Override
protected void ingest(Message unstructured) throws DeferrableException {
// Custom ingestion of a Message
}
@Override
protected void ingest(Value messageStructure) throws DeferrableException {
// Custom ingestion of message as a Swim Value
}
Remember to update your server.recon
file with the new agent you just created, instead of the JmsIngestingPatch
.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).