RabbitMQ Ingress

Nstream provides a RabbitMQ Adapter library that greatly facilitates ingestion from RabbitMQ queues. This guide demonstrates how to consume from RabbitMQ queues and process responses in Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-rabbitmq:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-rabbitmq</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Glossary

Ideal RabbitMqIngestingPatch Conditions

If:

Then you simply need to use the RabbitMqIngestingPatch class, often without overriding any methods.

Let’s demonstrate this by configuring an agent to receive messages from a local RabbitMq instance, managed through .properties files and a server.recon file which ties together the provisions and settings required to operate the adapter.

# connectionFactory.properties
# Configures the RabbitMQ connection factory settings.
rabbitmq.host=broker
# connection.properties
# Specifies the provision to use for creating connections.
connectionFactoryProvisionName=connectionFactory
# consumerProps.properties
# Defines properties for the consumer, such as the queue name.
queue=validationQueue
# server.recon
provisions: {
  @provision("connectionFactory") {
    class: "nstream.adapter.rabbitmq.ConnectionFactoryProvision"
    use: "connectionFactory.properties"
  }
  @provision("rabbitMqConnection") {
    class: "nstream.adapter.rabbitmq.ConnectionProvision"
    use: "connection.properties"
  }
  @provision("consumerProperties") {
    class: "nstream.adapter.common.provision.PropertiesProvision"
    use: "consumerProps.properties"
  }
}

"rabbitmq-adapter": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/bridge/foo"
    @agent(class: "rabbitmq.adapter.RabbitMqIngestingPatch") {
      rabbitMqIngressConf: @rabbitMqIngressSettings {
        contentTypeOverride: json
      }
    }
  }
}

# Configure desired web settings (e.g. port)
# ...

A Provision is a resource that is utilized by, but may need to be loaded independently of, a Swim server. A Provision must be fully configurable by some .properties file.

A ProvisionRegistry provides by-name lookup of Provisions to a Swim server. Load each ProvisionRegistry before its corresponding server starts.

# provisions.recon
provisions: {
  @provision("connectionFactory") {
    class: "nstream.adapter.rabbitmq.ConnectionFactoryProvision"
    config: "connectionFactory.properties"
  }
  @provision("rabbitMqConnection") {
    class: "nstream.adapter.rabbitmq.ConnectionProvision"
    config: "connection.properties"
  }
  @provision("consumerProperties") {
    class: "nstream.adapter.common.provision.PropertiesProvision"
    config: "consumerProps.properties"
  }
}

Configuration

ConnectionFactory, Connection, Channel and Consumer

As a first step, for a Connection to be instantiated, a singleton ConnectionFactoryProvision must be configured. The above snippet is minimal, and an exhaustive set of config options may be found here.

The consumer configuration is handled the same way.

RabbitMqIngressSettings

The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for both:

Common Variations

Custom Ingest Logic

If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the RabbitMqIngestingPatch can be easily overridden. Create a new class that extends the RabbitMqIngestingPatch and override the ingest method.

// CustomIngestingAgent.java
public class CustomIngestingAgent extends RabbitMqIngestingPatch {


  @Override
  protected void ingest(byte[] unstructured) throws DeferrableException {
    // Custom ingestion of an individual message
  }

}

Remember to update your server.recon file with the new agent you just created, instead of the RabbitMqIngestingPatch.

Manual Acknowledgements

By default, messages are acknowledged as soon as they are consumed, without waiting upon the results of any processing by downstream Web Agents. Future nstream-toolkit releases will introduce some convenient abstractions over the most common patterns that require finer control, so stay tuned! In the meantime, please feel free to share any of your ideas.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).