JMS Egress
Nstream provides a Java Message Service (JMS) Adapter library that simplifies publishing to JMS topics and queues. This guide demonstrates how to start a message producer and publish messages from Web Agents with minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-jms:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-jms</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream provides the JmsPublishingPatch, which, once configured and extended, can publish state to JMS topics and queues.
Configuration
The following is a full example of the configuration required to prepare a JmsPublishingPatch. The implementation of MyJmsPublishingAgent is discussed in the Extension section below.
# server.recon
provisions: {
  @provision("activeMqConnectionFactory") {
    class: "nstream.adapter.jms.ConnectionFactoryProvision"
    def: {
      "connectionFactoryClass": "org.apache.activemq.ActiveMQConnectionFactory"
      "brokerURL": "tcp://localhost:61616"
    }
  }
  @provision("activeMqConnection") {
    class: "nstream.adapter.jms.ConnectionProvision"
    def: {
      connectionFactoryProvisionName: "activeMqConnectionFactory"
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/jms"
    @agent(class: "vehicle.jms.MyJmsPublishingAgent") {
      jmsEgressConf: @jmsEgressSettings {
        connectionProvisionName: "activeMqConnection",
        destination: "topic://vehicles",
        contentTypeOverride: "json"
      }
    }
  }
}
Connection
A ConnectionFactory provision must be configured in order to create connections; see above for a simple example.
connectionFactoryClass is a required property of the provision and must be the fully qualified class name of the ConnectionFactory for the JMS provider you are using.
Provided the ConnectionFactory class follows the JavaBeans naming convention, properties of the ConnectionFactory can also be set here, such as brokerURL in the above example.
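The JavaBeans convention mentioned above pairs a property name such as brokerURL with a setter named setBrokerURL. The following is a minimal sketch of how configured properties could be applied through such setters using reflection. It is not the Nstream implementation: DemoConnectionFactory and BeanProperties are hypothetical names introduced purely for illustration, and only String-valued properties are handled.

```java
import java.lang.reflect.Method;
import java.util.Map;

// Hypothetical stand-in for a provider's ConnectionFactory (not a real JMS class).
final class DemoConnectionFactory {
  private String brokerURL;

  public void setBrokerURL(String brokerURL) {
    this.brokerURL = brokerURL;
  }

  public String getBrokerURL() {
    return this.brokerURL;
  }
}

// Hypothetical helper that applies configured properties through bean setters.
final class BeanProperties {
  private BeanProperties() {
  }

  // Derives the setter name for a property, e.g. "brokerURL" -> "setBrokerURL".
  static String setterName(String property) {
    return "set" + Character.toUpperCase(property.charAt(0)) + property.substring(1);
  }

  // Invokes the single-String-argument setter for each configured property.
  static void apply(Object bean, Map<String, String> properties) {
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      try {
        Method setter = bean.getClass().getMethod(setterName(entry.getKey()), String.class);
        setter.invoke(bean, entry.getValue());
      } catch (ReflectiveOperationException e) {
        throw new IllegalArgumentException("no usable setter for " + entry.getKey(), e);
      }
    }
  }
}
```

In this sketch a property without a matching setter is rejected with an exception. How the provision itself treats unknown properties is not covered by this guide.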
Connection provisions can now be created by referencing the ConnectionFactory provision, and agent configurations then refer to a Connection provision by name. A single connection per server is often sufficient.
Destination
The destination JMS topic or queue is configured in the jmsEgressConf, as seen above.
Topic and queue names are prefixed with topic:// and queue:// respectively.
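To make the prefix convention concrete, here is a small sketch that splits a destination string into its kind and bare name. DestinationSpec is a hypothetical type written for this example, not part of the adapter. Rejecting a string with neither prefix is an assumption of the sketch, since this guide does not say how such values are treated.

```java
// Hypothetical value type: the kind of destination plus its bare name.
final class DestinationSpec {
  enum Kind { TOPIC, QUEUE }

  private static final String TOPIC_PREFIX = "topic://";
  private static final String QUEUE_PREFIX = "queue://";

  final Kind kind;
  final String name;

  private DestinationSpec(Kind kind, String name) {
    this.kind = kind;
    this.name = name;
  }

  // Splits "topic://vehicles" into (TOPIC, "vehicles"), and likewise for queues.
  static DestinationSpec parse(String destination) {
    if (destination.startsWith(TOPIC_PREFIX)) {
      return new DestinationSpec(Kind.TOPIC, destination.substring(TOPIC_PREFIX.length()));
    }
    if (destination.startsWith(QUEUE_PREFIX)) {
      return new DestinationSpec(Kind.QUEUE, destination.substring(QUEUE_PREFIX.length()));
    }
    // Assumption of this sketch: an unprefixed destination is treated as an error.
    throw new IllegalArgumentException("expected a topic:// or queue:// prefix: " + destination);
  }
}
```

With this helper, the destination from the configuration above, topic://vehicles, parses to a topic named vehicles.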
JNDI
JNDI can be used to configure JMS-specific properties by adding your jndi.properties file to the jar.
Example jndi.properties file for ActiveMQ:
java.naming.factory.initial = org.apache.activemq.jndi.ActiveMQInitialContextFactory
java.naming.provider.url = tcp://activemq:61616
topic.topicName = myTopic
Now, in the server.recon file, configure the provisions and connectors to use JNDI to look up resources.
This is done by creating a JNDI provision and referencing it where needed:
# server.recon
provisions: {
  @provision("jndi") {
    class: "nstream.adapter.jms.JndiProvision"
  }
  @provision("activeMqConnectionFactory") {
    class: "nstream.adapter.jms.ConnectionFactoryProvision"
    def: {
      "jndiProvisionName": "jndi"
    }
  }
  @provision("activeMqConnection") {
    class: "nstream.adapter.jms.ConnectionProvision"
    def: {
      connectionFactoryProvisionName: "activeMqConnectionFactory"
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/jms"
    @agent(class: "vehicle.jms.MyJmsPublishingAgent") {
      jmsEgressConf: @jmsEgressSettings {
        connectionProvisionName: "activeMqConnection",
        jndiProvisionName: "jndi",
        jndiDestination: "topicName",
        contentTypeOverride: "json"
      }
    }
  }
}
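Note that jndiDestination holds the JNDI name (topicName), not the physical topic. In the ActiveMQ jndi.properties shown earlier, the entry topic.topicName = myTopic binds that JNDI name to the topic myTopic. The sketch below only illustrates this key-to-name mapping by reading the properties text with java.util.Properties. It performs no real JNDI lookup, and JndiTopicNames is a hypothetical helper introduced for this example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: resolves a JNDI topic name to the physical topic name
// by reading "topic.<jndiName> = <physicalName>" entries from jndi.properties text.
final class JndiTopicNames {
  private JndiTopicNames() {
  }

  // Returns the physical topic bound to jndiName, or null if there is no such entry.
  static String physicalTopic(String jndiProperties, String jndiName) {
    Properties properties = new Properties();
    try {
      properties.load(new StringReader(jndiProperties));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return properties.getProperty("topic." + jndiName);
  }
}
```

Given the example file above, looking up topicName yields myTopic, which is where messages from the agent would be expected to arrive.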
Extension
Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent.
Extending the JmsPublishingPatch exposes the publish(Value value) method, which publishes the given value to the configured JMS destination.
The only two implementation decisions left are what to publish and when to publish it; the following sections give two examples.
Publish On Event
The simplest approach is to publish every event or update received by a lane.
All that is needed is to call the publish(Value value) method from the appropriate callback of the lane.
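import nstream.adapter.jms.JmsPublishingPatch; // package assumed from the provision class names above
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;
public class MyJmsPublishingAgent extends JmsPublishingPatch {
  // Forward every command received on the "publish" lane to the JMS destination
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);
}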
Note: The didSet callback on a ValueLane is just as valid.
Periodic
Another common approach is to publish the state of the agent periodically.
In this case we must manage a timer, making use of the scheduling methods provided.
The scheduling code is placed in the assignDestination callback so that no publish is attempted before the connection has been created.
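import nstream.adapter.common.provision.ProvisionLoader; // nstream packages are assumed; verify against your adapter version
import nstream.adapter.jms.JmsPublishingPatch;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.structure.Value;
// Connection is the JMS type: import javax.jms.Connection or jakarta.jms.Connection to match your provider

public class MyJmsPublishingAgent extends JmsPublishingPatch {
  // Handle to the periodic publication timer
  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Publishes the current value of the state lane
  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    loadSettings("jmsEgressConf");
    assignConnection(ProvisionLoader.<Connection>getProvision(this.egressSettings.connectionProvisionName())
        .value());
    // Start the timer only once the destination is assigned, so nothing is published too early
    assignDestination(this.egressSettings.destination(), () -> {
      info(nodeUri() + ": successfully staged producer for publication");
      this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
    });
  }
}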
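The ordering used above, where the timer is started only inside the assignDestination callback, can be illustrated outside Nstream with plain JDK scheduling. The sketch below is an analogue, not the adapter's API: PeriodicPublisherSketch, onStaged, and awaitPublications are hypothetical names, and a counter stands in for publish(Value). It also assumes that the two numeric arguments in the agent code are an initial delay and a period in milliseconds.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical plain-JDK analogue of "schedule only once the producer is staged".
final class PeriodicPublisherSketch implements AutoCloseable {
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicInteger published = new AtomicInteger();
  private volatile boolean staged;
  private ScheduledFuture<?> timer;

  // Stand-in for publishing agent state: refuses to run before staging completes.
  private void publishState() {
    if (!this.staged) {
      throw new IllegalStateException("published before the producer was staged");
    }
    this.published.incrementAndGet();
  }

  // Stand-in for the destination callback: mark the producer ready, then start the timer.
  void onStaged(long initialDelayMillis, long periodMillis) {
    this.staged = true;
    this.timer = this.scheduler.scheduleAtFixedRate(
        this::publishState, initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  int publishedCount() {
    return this.published.get();
  }

  // Polls until at least count publications have happened or the timeout elapses.
  boolean awaitPublications(int count, long timeoutMillis) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (this.published.get() < count) {
      if (System.nanoTime() >= deadline) {
        return false;
      }
      try {
        Thread.sleep(5);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  // Cancels the timer and stops the scheduler thread.
  @Override
  public void close() {
    if (this.timer != null) {
      this.timer.cancel(false);
    }
    this.scheduler.shutdownNow();
  }
}
```

Because the timer is created inside onStaged, no publication can be attempted before the ready flag is set, which mirrors the reason the agent schedules its timer in the assignDestination callback.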
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).