MQTT Ingress

Nstream provides an MQTT Adapter library that greatly facilitates ingestion from MQTT topics. This guide demonstrates how to consume from MQTT topics and process responses in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-mqtt:4.15.23'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-mqtt</artifactId>
  <version>4.15.23</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Glossary

Ideal MqttIngestingPatch Conditions

If:

Then you simply need to use the MqttIngestingPatch class, often without overriding any methods. Let’s demonstrate this by configuring an agent to receive messages from a local MQTT broker, wholly in server.recon:

# server.recon
"provisions": {
  @provision("mqtt-consumer") {
    class: "nstream.adapter.common.provision.PropertiesProvision"
    def: {
      "server.uri": "tcp://localhost:1883"
      "client.id": nstream-jet
      topic: meters
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # MqttIngestingAgent replacement
  @node {
    uri: "/mqtt"
    @agent(class: "nstream.adapter.mqtt.MqttIngestingPatch") {
      mqttIngressConf: @mqttIngressSettings {
        consumerPropertiesProvisionName: "mqtt-consumer"
        contentTypeOverride: "json"
        relaySchema: @command {
          # Handles JSON values of the form {"id":...,...}
          nodeUri: "/vehicle/$id"
          laneUri: "addMessage"
        }
      }
    }
  }
}

# Configure desired web settings (e.g. port)
# ...

Configuration

Consumer Properties

There are only three mandatory properties required within the Provision definition:

MqttIngressSettings

The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for dictating what to do given a message

Common Variations

Consumer Authentication

Simply provide the optional properties user.name and password within your Provision definition.

Custom Ingest Logic

If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the MqttIngestingPatch can be easily overridden. Create a new class that extends the MqttIngestingPatch and override the ingest method.

// CustomIngestingAgent.java
public class CustomIngestingAgent extends MqttIngestingPatch {

  @Override
  public void ingest(MqttReceipt unstructured) throws DeferrableException {
    // Custom ingestion of an individual message
  }

}

Remember to update your server.recon file with the new agent you just created, instead of the MqttIngestingPatch.

TLS-mandated Broker

This functionality will be available in a near-future Nstream release.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).