MQTT Ingress
Nstream provides an MQTT Adapter library that greatly facilitates ingestion from MQTT topics. This guide demonstrates how to consume from MQTT topics and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-mqtt:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-mqtt</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
-
MqttIngestingAgent:
An abstract Web Agent class with built-in methods relevant to aMqttConsumer
poll cycle -
MqttIngressSettings:
A plain old Java object (POJO) that configures a singleMqttIngestingAgent
-
MqttIngestingPatch:
A concrete (but extendable)MqttIngestingAgent
subclass that collectsMessages
intoswim.structure.Values
MqttIngestingPatch
Conditions
Ideal If:
- The MQTT broker does not require authentication or TLS
-
ConsumerRecord
bodies are converted intoValues
during ingestion
Then you simply need to use the MqttIngestingPatch
class, often without overriding any methods.
Let’s demonstrate this by configuring an agent to receive messages from a local MQTT broker, wholly in server.recon
:
# server.recon
"provisions": {
@provision("mqtt-consumer") {
class: "nstream.adapter.common.provision.PropertiesProvision"
def: {
"server.uri": "tcp://localhost:1883"
"client.id": nstream-jet
topic: meters
}
}
}
vehicle: @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
# MqttIngestingAgent replacement
@node {
uri: "/mqtt"
@agent(class: "nstream.adapter.mqtt.MqttIngestingPatch") {
mqttIngressConf: @mqttIngressSettings {
consumerPropertiesProvisionName: "mqtt-consumer"
contentTypeOverride: "json"
relaySchema: @command {
# Handles JSON values of the form {"id":...,...}
nodeUri: "/vehicle/$id"
laneUri: "addMessage"
}
}
}
}
}
# Configure desired web settings (e.g. port)
# ...
Configuration
Consumer Properties
There are only three mandatory properties required within the Provision
definition:
-
server.uri
: The network-address of the running broker -
client.id
: A (usually unique) identifier of this process to the broker -
topic
: The MQTT topic of interest
MqttIngressSettings
The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for dictating what to do given a message
Common Variations
Consumer Authentication
Simply provide the optional properties user.name
and password
within your Provision
definition.
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the MqttIngestingPatch
can be easily overridden.
Create a new class that extends the MqttIngestingPatch
and override the ingest
method.
// CustomIngestingAgent.java
public class CustomIngestingAgent extends MqttIngestingPatch {
@Override
public void ingest(MqttReceipt unstructured) throws DeferrableException {
// Custom ingestion of an individual message
}
}
Remember to update your server.recon
file with the new agent you just created, instead of the MqttIngestingPatch
.
TLS-mandated Broker
This functionality will be available in a near-future Nstream release.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).