RabbitMQ Egress
Nstream provides a RabbitMQ Adapter library that simplifies the process of publishing messages to RabbitMQ queues. This guide demonstrates how to configure a message producer and publish messages from Web Agents with minimal setup.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-rabbitmq:4.15.23'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-rabbitmq</artifactId>
<version>4.15.23</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Publishing
Nstream supports RabbitMQ message publishing through the RabbitMqPublishingPatch, which may be extended to publish messages to RabbitMQ queues. This is achieved through a combination of configuration and direct subclassing.
Configuration
The following is a full example of a configuration that prepares a RabbitMqPublishingPatch.
# server.recon
provisions: {
  @provision("connectionFactory") {
    class: "nstream.adapter.rabbitmq.ConnectionFactoryProvision"
    use: "connectionFactory.properties"
  }
  @provision("rabbitMqConnection") {
    class: "nstream.adapter.rabbitmq.ConnectionProvision"
    use: "connection.properties"
  }
}

"rabbitmq-adapter": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/producer/odd"
    @agent(class: "rabbitmq.adapter.MyRabbitMqPublishingAgent")
  }
  @node {
    uri: "/producer/even"
    @agent(class: "rabbitmq.adapter.MyRabbitMqPublishingAgent")
  }
}

# Configure desired web settings (e.g. port)
# ...
ConnectionFactory, Connection, Channel and Producer
- ConnectionFactory is used to create a Connection.
- A Connection creates a Channel.
- A Channel is used to declare a queue.
As a first step, a singleton ConnectionFactoryProvision must be configured so that a Connection can be instantiated.
The above snippet is minimal, and an exhaustive set of config options may be found here.
The RabbitMqProducer provision/properties must be configured against the aforementioned connection in order to publish state. Key producer settings include the exchange, the routing key, and whether the message is mandatory, which determines how a message is handled if it cannot be routed.
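To make the effect of the mandatory setting concrete, here is a toy in-memory model of a direct exchange written with plain Java collections. It does not use the RabbitMQ client or the Nstream adapter, and the names `DirectExchangeModel`, `bind`, `publish`, `delivered`, and `returned` are hypothetical. It only illustrates standard broker behavior: an unroutable message is handed back to the publisher when mandatory is set, and is silently dropped otherwise.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a direct exchange; this is NOT the RabbitMQ client API.
class DirectExchangeModel {

  // Routing key -> messages delivered to the queue bound with that key.
  private final Map<String, List<String>> queues = new HashMap<>();
  // Messages handed back to the publisher (mandatory set, no matching binding).
  private final List<String> returned = new ArrayList<>();

  // Binds a queue to this exchange under the given routing key.
  void bind(String routingKey) {
    queues.putIfAbsent(routingKey, new ArrayList<>());
  }

  // Returns true if the message was routed to a bound queue.
  boolean publish(String routingKey, boolean mandatory, String body) {
    List<String> queue = queues.get(routingKey);
    if (queue != null) {
      queue.add(body);
      return true;
    }
    if (mandatory) {
      // A real broker would send the message back to the publisher here.
      returned.add(body);
    }
    // With mandatory unset, the unroutable message is dropped without notice.
    return false;
  }

  List<String> delivered(String routingKey) {
    return queues.getOrDefault(routingKey, List.of());
  }

  List<String> returned() {
    return returned;
  }

  public static void main(String[] args) {
    DirectExchangeModel exchange = new DirectExchangeModel();
    exchange.bind("odd");
    exchange.publish("odd", true, "1");   // routed to the bound queue
    exchange.publish("even", true, "2");  // unroutable, returned to the publisher
    exchange.publish("even", false, "4"); // unroutable, dropped
    System.out.println("delivered=" + exchange.delivered("odd"));
    System.out.println("returned=" + exchange.returned());
  }
}
```

In a real deployment, a publisher that sets mandatory would normally also register a handler for returned messages; otherwise the returned messages go unnoticed.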
Extension
Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent.
Extending the RabbitMqPublishingPatch exposes the publish(Value value) method, which publishes the given value to the configured RabbitMQ queue. The only two implementation decisions left are what to publish and when to publish it; the following sections give two examples.
Publish On Event
The simplest approach is to publish every event or update received by a lane. All that is needed is a call to the publish(Value value) method from the appropriate lane callback.
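// The Nstream import paths are assumed from the package used in server.recon; verify them against your adapter version.
import com.rabbitmq.client.Connection;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.rabbitmq.RabbitMqPublishingPatch;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public class MyRabbitMqPublishingAgent extends RabbitMqPublishingPatch {

  // Every command received on the "publish" lane is forwarded to publish(Value).
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void stagePublication() {
    // Reuse the connection provisioned as "rabbitMqConnection" in server.recon.
    assignConnection(ProvisionLoader.<Connection>getProvision("rabbitMqConnection").value());
    assignChannelAndProducer(null, () -> { });
  }
}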
Note: the didSet callback on a ValueLane, the didUpdate callback on a MapLane, and many others are equally valid alternatives.
Periodic
Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided.
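// The Nstream import paths are assumed from the package used in server.recon; verify them against your adapter version.
import com.rabbitmq.client.Connection;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.rabbitmq.RabbitMqPublishingPatch;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.concurrent.TimerRef;
import swim.structure.Value;

public class MyRabbitMqPublishingAgent extends RabbitMqPublishingPatch {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  // Publishes the current value of the "state" lane.
  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    assignConnection(ProvisionLoader.<Connection>getProvision("rabbitMqConnection").value());
    assignChannelAndProducer(null, () -> { });
    // Timing arguments are assumed to be milliseconds: first run after 1000, then every 60000.
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }
}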
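The fixed-rate pattern can also be tried in isolation, without a running agent or broker. The sketch below swaps the agent's scheduleAtFixedRate for the JDK's ScheduledExecutorService and replaces publish with a hypothetical stand-in that records each snapshot in a list. The class name `PeriodicPublisherSketch` and its methods are illustrative only. It assumes that the two numeric arguments in the agent example are an initial delay and a period; much shorter values are used here so the example finishes quickly.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Stand-alone sketch of periodic state publication using only the JDK.
class PeriodicPublisherSketch {

  // Plays the role of the agent's "state" lane.
  private final AtomicReference<String> state = new AtomicReference<>("initial");
  // Plays the role of the RabbitMQ queue: every "published" snapshot lands here.
  private final List<String> published = new CopyOnWriteArrayList<>();

  void setState(String value) {
    state.set(value);
  }

  // Hypothetical stand-in for publish(Value): records the current state.
  private void publishState() {
    published.add(state.get());
  }

  // Publishes `count` snapshots at a fixed rate, then cancels the timer.
  List<String> run(long initialDelayMs, long periodMs, int count) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    CountDownLatch remaining = new CountDownLatch(count);
    ScheduledFuture<?> timer = scheduler.scheduleAtFixedRate(() -> {
      // Guard so that a tick racing with cancellation cannot publish an extra snapshot.
      if (remaining.getCount() > 0) {
        publishState();
        remaining.countDown();
      }
    }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    try {
      remaining.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      timer.cancel(false);
      scheduler.shutdownNow();
    }
    return List.copyOf(published);
  }

  public static void main(String[] args) {
    PeriodicPublisherSketch sketch = new PeriodicPublisherSketch();
    sketch.setState("temperature=21");
    System.out.println(sketch.run(100, 200, 3));
  }
}
```

As in the agent example, the timer handle is kept so the schedule can be cancelled; an agent that publishes periodically would typically cancel its timer when it no longer needs to publish.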
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).