Google Pub/Sub Egress
Nstream provides a Google Pub/Sub Adapter library that greatly facilitates producing messages to Google Pub/Sub topics. This guide demonstrates how to publish state within Web Agents to external Google Pub/Sub topics with minimal boilerplate.
Prerequisites
To use the Google Pub/Sub Adapter, include the following dependency in your project:
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-pubsub</artifactId>
  <version>4.13.21</version>
</dependency>
Alternatively, you can manage dependencies using your preferred build tool, such as Maven or Gradle.
Glossary
- Publishing: The act of writing state found within some Web Agent to an external sink.
- PubSubPublishingAgent: An abstract Web Agent class with built-in methods to facilitate producing messages to Google Pub/Sub topics.
- PubSubEgressSettings: A Plain Old Java Object (POJO) that holds configuration details for a PubSubPublishingAgent, facilitating customization of message publishing.
- PublisherProvision: A provision class that handles creating and managing a Publisher instance for Google Pub/Sub.
Publishing to Google Pub/Sub
Nstream provides the PubSubPublishingAgent, which can be extended to publish messages to Google Pub/Sub topics. This is achieved with minimal boilerplate by utilizing provisions and configurations.
Configuration
Here we provide an example of the configuration for publishing.
# server.recon
provisions: {
  @provision("pubsub-publisher-provision") {
    class: "nstream.adapter.pubsub.PublisherProvision",
    def: {
      "projectId": "your-google-cloud-project-id",
      "topicName": "your-topic-name",
      "publisherProvisionName": "pubsub-publisher-provision"
    }
  }
}

"pubsub-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamic/:id"
    @agent(class: "pubsub.adapter.CustomPublishingAgent") {
      pubsubEgressConf: @pubsubEgressSettings {
        publisherProvisionName: "pubsub-publisher-provision"
      }
    }
  }
}
Provision
A PublisherProvision must be configured to create a Publisher for Google Pub/Sub. The provision configuration includes the following properties:
- projectId: Your Google Cloud project ID.
- topicName: The name of the Google Pub/Sub topic to publish messages to.
- publisherProvisionName: A unique name for the publisher provision.
Egress Settings
The pubsubEgressConf configuration defines the settings for the PubSubPublishingAgent:
- publisherProvisionName: Links to the publisher provision defined in server.recon. The value must match the name given to that provision.
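The name-based link between the egress settings and the provision can be pictured as a lookup in a registry keyed by provision name. The following is a JDK-only sketch of that idea under stated assumptions, not Nstream's implementation: ProvisionRegistry, register, and lookup are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a provision registry; not part of the Nstream API.
final class ProvisionRegistry {

  private final Map<String, Object> provisions = new HashMap<>();

  // Registers a provisioned object under a unique name,
  // much as server.recon declares a named provision.
  void register(String name, Object provisioned) {
    if (provisions.putIfAbsent(name, provisioned) != null) {
      throw new IllegalArgumentException("Duplicate provision name: " + name);
    }
  }

  // Resolves the name referenced by the egress settings;
  // a name that was never registered fails loudly.
  Object lookup(String name) {
    Object provisioned = provisions.get(name);
    if (provisioned == null) {
      throw new IllegalStateException("No provision named: " + name);
    }
    return provisioned;
  }
}
```

In this model, a typo in publisherProvisionName surfaces as a failed lookup, which is why the two names in the configuration need to agree exactly.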
Publishing Messages
- To publish messages, call the publish(Value msg) method with the message you want to send.
- The PubSubPublishingAgent handles the conversion of the Value into a PubsubMessage and uses the Publisher to send it to the configured topic.
Extending Functionality
By extending the PubSubPublishingAgent, you can implement custom logic for publishing messages to Google Pub/Sub topics.
Here’s an example of a custom publishing agent:
public class CustomPublishingAgent extends PubSubPublishingAgent {

  @SwimLane("toPublish")
  ValueLane<Value> toPublish = this.<Value>valueLane()
      .didSet((n, o) -> {
        // Publish only when the new value is non-null, distinct, and changed
        if (n != null && n.isDistinct() && !n.equals(o)) {
          publish(n);
        }
      });

  @Override
  protected void stagePublication() {
    // Load the egress settings, then assign the Publisher from the named provision
    loadSettings("pubsubEgressConf");
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
  }

}
In this example:
- The CustomPublishingAgent extends PubSubPublishingAgent.
- A ValueLane named toPublish is created. When this lane is updated, the didSet callback is triggered, which publishes the new value to the configured Pub/Sub topic, provided the value is non-null, distinct, and different from the previous value.
- The stagePublication() method loads the egress settings and assigns a Publisher using the provision.
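The guard inside didSet is what keeps repeated or empty updates from reaching the topic. A JDK-only sketch of the same publish-on-change logic follows. It makes simplifying assumptions: a plain String stands in for Value, a non-empty check stands in for isDistinct(), and a list stands in for the Pub/Sub topic. ChangeGuardedPublisher is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in that mirrors the didSet guard; not part of the Nstream API.
final class ChangeGuardedPublisher {

  private final List<String> published = new ArrayList<>();
  private String current;

  // Mirrors a lane update: stores the new value, then runs the callback with (new, old).
  void set(String newValue) {
    String oldValue = this.current;
    this.current = newValue;
    didSet(newValue, oldValue);
  }

  // Publishes only non-null, non-empty values that differ from the previous value.
  private void didSet(String newValue, String oldValue) {
    if (newValue != null && !newValue.isEmpty() && !Objects.equals(newValue, oldValue)) {
      published.add(newValue);
    }
  }

  // Returns an immutable snapshot of everything published so far.
  List<String> published() {
    return List.copyOf(published);
  }
}
```

Setting the same value twice in a row results in a single publication, which avoids sending duplicate messages when upstream sources resend unchanged state.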
Publish on Event
The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value msg) method from the appropriate callback of the lane.
Here’s an example of publication on event:
public class CustomPublishingAgent extends PubSubPublishingAgent {

  // Every command received on this lane is passed straight to publish(Value)
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void stagePublication() {
    // Load the egress settings, then assign the Publisher from the named provision
    loadSettings("pubsubEgressConf");
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
  }

}
In this example:
- CommandLane publish: Receives commands (messages) and publishes them directly to the Pub/Sub topic.
- stagePublication() method: Loads the egress settings and assigns the Publisher from the provision.
Periodic Publishing
Another common approach is to publish the state of the agent periodically. In this case, we manage a timer, making use of the scheduling methods provided. Here’s an example of periodic publication:
public class CustomPublishingAgent extends PubSubPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    // Load the egress settings, then assign the Publisher from the named provision
    loadSettings("pubsubEgressConf");
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
    // Initial delay of 1000 ms, then publishState() every 60000 ms
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }

}
In this example:
- ValueLane state: Holds the current state of the agent.
- publishState() method: Publishes the current state to the Pub/Sub topic.
- TimerRef timerRef: Schedules publishState() to run periodically (every 60 seconds after an initial delay of 1 second).
- stagePublication() method: Loads the egress settings, assigns the Publisher, and sets up the periodic publishing.
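The two numeric arguments in the timer call are an initial delay and a repeat period, both in milliseconds. The same fixed-rate pattern can be sketched with the JDK's ScheduledExecutorService. This is an illustration of the timing semantics only, not of Nstream's scheduler: PeriodicStatePublisher is a hypothetical class, a String stands in for the agent state, and a Consumer stands in for the publish call.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical stand-in for an agent that publishes its state on a timer.
final class PeriodicStatePublisher implements AutoCloseable {

  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<String> state = new AtomicReference<>();
  private final Consumer<String> sink;
  private ScheduledFuture<?> timer;

  PeriodicStatePublisher(Consumer<String> sink) {
    this.sink = sink;
  }

  // Updates the state that the next timer tick will publish.
  void setState(String newState) {
    this.state.set(newState);
  }

  // Starts the timer: first run after initialDelayMillis, then every periodMillis.
  void start(long initialDelayMillis, long periodMillis) {
    this.timer = scheduler.scheduleAtFixedRate(
        () -> sink.accept(state.get()),
        initialDelayMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  // Cancels the timer and stops the scheduler so no further publications occur.
  @Override
  public void close() {
    if (timer != null) {
      timer.cancel(false);
    }
    scheduler.shutdownNow();
  }
}
```

Calling start(1000, 60000) on this stand-in reproduces the schedule described above. Each tick publishes whatever the state is at that moment, so intermediate updates between ticks are not sent.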
Advanced Configuration
You can customize the Publisher settings for advanced use cases by modifying the PublisherProvision or by configuring the Publisher directly in your agent. The example below creates a Publisher with custom batching settings directly in the agent and assigns it in place of a provisioned one.
public class CustomPublishingAgent extends PubSubPublishingAgent {

  @SwimLane("toPublish")
  ValueLane<Value> toPublish = this.<Value>valueLane()
      .didSet((n, o) -> {
        if (n != null && n.isDistinct() && !n.equals(o)) {
          publish(n);
        }
      });

  @Override
  protected void stagePublication() {
    loadSettings("pubsubEgressConf");
    try {
      // Placeholder project and topic; replace with your own values
      TopicName topicName = TopicName.of("your-project-id", "your-topic-name");
      Publisher publisher = Publisher.newBuilder(topicName)
          // Customize the publisher settings here
          .setBatchingSettings(
              Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                  .setElementCountThreshold(100L)
                  .setRequestByteThreshold(1024L)
                  .build())
          .build();
      assignPublisher(publisher);
    } catch (IOException e) {
      throw new RuntimeException("Failed to create publisher", e);
    }
  }

}
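Batching thresholds decide when buffered messages are sent: a batch goes out once any configured threshold is reached. The sketch below models only the element-count and byte thresholds with JDK classes, to show how the two settings interact. ThresholdBatcher is hypothetical and is not how the Google client is implemented; the real client also applies a delay threshold, which this model omits.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of count- and byte-based batching; not the Google client.
final class ThresholdBatcher {

  private final long elementCountThreshold;
  private final long requestByteThreshold;
  private final List<String> buffer = new ArrayList<>();
  private final List<List<String>> flushed = new ArrayList<>();
  private long bufferedBytes;

  ThresholdBatcher(long elementCountThreshold, long requestByteThreshold) {
    this.elementCountThreshold = elementCountThreshold;
    this.requestByteThreshold = requestByteThreshold;
  }

  // Buffers a message and flushes as soon as either threshold is reached.
  void add(String message) {
    buffer.add(message);
    bufferedBytes += message.getBytes(StandardCharsets.UTF_8).length;
    if (buffer.size() >= elementCountThreshold || bufferedBytes >= requestByteThreshold) {
      flush();
    }
  }

  // Emits the buffered messages as one batch and resets the counters.
  void flush() {
    if (!buffer.isEmpty()) {
      flushed.add(List.copyOf(buffer));
      buffer.clear();
      bufferedBytes = 0;
    }
  }

  // Returns an immutable snapshot of the batches emitted so far.
  List<List<String>> batches() {
    return List.copyOf(flushed);
  }
}
```

In this model, lower thresholds produce smaller, more frequent batches (lower latency), while higher thresholds produce fewer, larger batches (higher throughput).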
Troubleshooting Tips
- Connection Errors: Ensure that your Google Cloud credentials are correctly configured and that your application has the necessary permissions to publish to the Pub/Sub topic.
- Message Publishing Issues: Check that your PublisherProvision is correctly set up and that the topic exists.
- Latency Issues: Adjust batching and flow control settings in the Publisher to optimize for latency or throughput based on your application’s needs.
- Logging: Enable logging in your application to monitor the publishing process and catch any errors.
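For connection errors, a quick first check is whether credentials can be found at all. One common setup points the GOOGLE_APPLICATION_CREDENTIALS environment variable at a service account key file. The sketch below checks only that case and assumes nothing about Nstream; other credential sources, such as gcloud user credentials or an attached service account, are not covered. CredentialsCheck and its methods are hypothetical names.

```java
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical diagnostic helper; covers only the key-file credential setup.
final class CredentialsCheck {

  // Describes the state of a credentials path, such as the value of
  // the GOOGLE_APPLICATION_CREDENTIALS environment variable.
  static String describe(String credentialsPath) {
    if (credentialsPath == null || credentialsPath.isBlank()) {
      return "GOOGLE_APPLICATION_CREDENTIALS is not set";
    }
    Path path = Path.of(credentialsPath);
    if (!Files.isRegularFile(path) || !Files.isReadable(path)) {
      return "Credentials file is missing or unreadable: " + path;
    }
    return "Credentials file found: " + path;
  }

  // Convenience wrapper that reads the environment variable.
  static String describeFromEnvironment() {
    return describe(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
  }
}
```

Logging the result of describeFromEnvironment() at startup can help separate credential problems from permission or topic configuration problems. A readable key file does not by itself guarantee that the account is allowed to publish to the topic.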
Additional Resources
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).