RabbitMQ Ingress
Nstream provides a RabbitMQ Adapter library that greatly facilitates ingestion from RabbitMQ queues. This guide demonstrates how to consume from RabbitMQ queues and process responses in Web Agents using minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-rabbitmq:4.13.21'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-rabbitmq</artifactId>
<version>4.13.21</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
-
RabbitMqIngestingAgent:
An abstract Web Agent class with built-in methods relevant to aRabbitMqConsumer
poll cycle -
RabbitMqIngressSettings:
A plain old Java object (POJO) that configures a singleRabbitMqIngestingAgent
-
RabbitMqIngestingPatch:
A concrete (but extendable)RabbitMqIngestingAgent
subclass that collects RabbitMq message body byte array intoswim.structure.Values
RabbitMqIngestingPatch
Conditions
Ideal If:
- A message may be acknowledged as soon as it is consumed.
- Message bodies are directly translated into Values during ingestion.
Then you simply need to use the RabbitMqIngestingPatch
class, often without overriding any methods.
Let’s demonstrate this by configuring an agent to receive messages from a local RabbitMq instance, managed through
.properties
files and a server.recon
file which ties together the provisions
and settings required to operate the
adapter.
# connectionFactory.properties
# Configures the RabbitMQ connection factory settings.
rabbitmq.host=broker
# connection.properties
# Specifies the provision to use for creating connections.
connectionFactoryProvisionName=connectionFactory
# consumerProps.properties
# Defines properties for the consumer, such as the queue name.
queue=validationQueue
# server.recon
provisions: {
@provision("connectionFactory") {
class: "nstream.adapter.rabbitmq.ConnectionFactoryProvision"
use: "connectionFactory.properties"
}
@provision("rabbitMqConnection") {
class: "nstream.adapter.rabbitmq.ConnectionProvision"
use: "connection.properties"
}
@provision("consumerProperties") {
class: "nstream.adapter.common.provision.PropertiesProvision"
use: "consumerProps.properties"
}
}
"rabbitmq-adapter": @fabric {
@plane(class: "swim.api.plane.AbstractPlane")
@node {
uri: "/bridge/foo"
@agent(class: "rabbitmq.adapter.RabbitMqIngestingPatch") {
rabbitMqIngressConf: @rabbitMqIngressSettings {
contentTypeOverride: json
}
}
}
}
# Configure desired web settings (e.g. port)
# ...
A Provision
is a resource that is utilized by, but may need to be loaded independently of, a Swim server. A Provision
must be fully configurable by some .properties
file.
A ProvisionRegistry
provides by-name lookup of Provisions
to a Swim server. Load each ProvisionRegistry
before its
corresponding server starts.
# provisions.recon
provisions: {
@provision("connectionFactory") {
class: "nstream.adapter.rabbitmq.ConnectionFactoryProvision"
config: "connectionFactory.properties"
}
@provision("rabbitMqConnection") {
class: "nstream.adapter.rabbitmq.ConnectionProvision"
config: "connection.properties"
}
@provision("consumerProperties") {
class: "nstream.adapter.common.provision.PropertiesProvision"
config: "consumerProps.properties"
}
}
Configuration
ConnectionFactory, Connection, Channel and Consumer
- ConnectionFactory is used to create a Connection.
- A Connection creates a Channel.
- A Channel is used to declare a queue and set up a Consumer.
- A Consumer handles message delivery, acknowledging messages as they are received.
As a first step, for a Connection
to be instantiated, a singleton ConnectionFactoryProvision
must be configured.
The above snippet is minimal, and an exhaustive set of config options may be found here.
The consumer configuration is handled the same way.
RabbitMqIngressSettings
The previous configurations can be seen as “connection-oriented”. This part is “ingestion-oriented”, responsible for both:
- Linking the RabbitMq client connection and the RabbitMq consumer together
- Dictating what to do when a message is set up for ingestion
Common Variations
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages,
the RabbitMqIngestingPatch
can be easily overridden. Create a new class that extends the RabbitMqIngestingPatch
and
override the ingest
method.
// CustomIngestingAgent.java
public class CustomIngestingAgent extends RabbitMqIngestingPatch {
@Override
protected void ingest(byte[] unstructured) throws DeferrableException {
// Custom ingestion of an individual message
}
}
Remember to update your server.recon
file with the new agent you just created, instead of the RabbitMqIngestingPatch
.
Manual Acknowledgements
By default, messages are acknowledged as soon as they are consumed, without waiting upon the results of any processing
by downstream Web Agents. Future nstream-toolkit
releases will introduce some convenient abstractions over the most
common patterns that require finer control, so stay tuned! In the meantime, please feel free to share any of your ideas.
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).