Google Pub/Sub Egress

Nstream provides a Google Pub/Sub Adapter library for producing messages to Google Pub/Sub topics. This guide shows how to publish Web Agent state to external Google Pub/Sub topics with minimal boilerplate.

Prerequisites

To use the Google Pub/Sub Adapter, include the following dependency in your project:

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-pubsub</artifactId>
  <version>4.13.21</version>
</dependency>

The snippet above uses Maven syntax. If you use another build tool, such as Gradle, declare the same coordinates (io.nstream:nstream-adapter-pubsub:4.13.21) in that tool's format.

Publishing to Google Pub/Sub

Nstream provides the PubSubPublishingAgent class, which you extend to publish messages to Google Pub/Sub topics. Provisions and agent configuration supply the Publisher, so the agent itself needs little code.

Configuration

The following server.recon example declares a publisher provision and an agent that publishes through it.

# server.recon
provisions: {
  @provision("pubsub-publisher-provision") {
    class: "nstream.adapter.pubsub.PublisherProvision",
    def: {
      "projectId": "your-google-cloud-project-id",
      "topicName": "your-topic-name",
      "publisherProvisionName": "pubsub-publisher-provision"
    }
  }
}

"pubsub-adapter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/dynamic/:id"
    @agent(class: "pubsub.adapter.CustomPublishingAgent") {
      pubsubEgressConf: @pubsubEgressSettings {
        publisherProvisionName: "pubsub-publisher-provision"
      }
    }
  }
}

Provision

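A PublisherProvision must be configured to create a Publisher for Google Pub/Sub. In the example above, the provision's def block sets projectId (the Google Cloud project ID), topicName (the destination topic), and publisherProvisionName (which in the example matches the name given in @provision).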

Egress Settings

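The pubsubEgressConf configuration defines the settings for the PubSubPublishingAgent. In the example above, its only setting is publisherProvisionName, which names the provision that supplies the agent's Publisher.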
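The link between the agent and the provision is a plain name lookup. The following is a minimal sketch of that idea in ordinary Java, not the Nstream implementation: ProvisionRegistry and FakePublisher are hypothetical stand-ins invented for illustration, and only the string "pubsub-publisher-provision" comes from the configuration above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a publisher client; not the Google Pub/Sub Publisher. */
final class FakePublisher {
  final String topic;

  FakePublisher(String topic) {
    this.topic = topic;
  }
}

/** Hypothetical name-to-instance registry; not the Nstream ProvisionLoader. */
final class ProvisionRegistry {
  private final Map<String, Object> provisions = new HashMap<>();

  /** Stores an instance under the name that agents will later refer to. */
  void register(String name, Object instance) {
    provisions.put(name, instance);
  }

  /** Returns the instance registered under the given name, or fails loudly. */
  <T> T get(String name, Class<T> type) {
    Object instance = provisions.get(name);
    if (instance == null) {
      throw new IllegalArgumentException("No provision named \"" + name + "\"");
    }
    return type.cast(instance);
  }
}

final class ProvisionLookupDemo {
  public static void main(String[] args) {
    ProvisionRegistry registry = new ProvisionRegistry();
    // Server startup: the provision is created once and registered by name
    registry.register("pubsub-publisher-provision", new FakePublisher("your-topic-name"));

    // Agent startup: the egress settings only carry the name
    String publisherProvisionName = "pubsub-publisher-provision";
    FakePublisher publisher = registry.get(publisherProvisionName, FakePublisher.class);
    System.out.println(publisher.topic); // prints "your-topic-name"
  }
}
```

Because the agent configuration holds only a name, several agents can share one provision, and a misspelled name shows up as a failed lookup rather than as a second, misconfigured client.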

Publishing Messages

Extending Functionality

By extending the PubSubPublishingAgent, you can implement custom logic for publishing messages to Google Pub/Sub topics.

Here’s an example of a custom publishing agent:

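import com.google.cloud.pubsub.v1.Publisher;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.structure.Value;
// Also import PubSubPublishingAgent and ProvisionLoader from the Nstream adapter libraries.

public class CustomPublishingAgent extends PubSubPublishingAgent {

  // Publishes each new value set on this lane
  @SwimLane("toPublish")
  ValueLane<Value> toPublish = this.<Value>valueLane()
      .didSet((n, o) -> {
        // n is the new value, o the previous one; skip null, non-distinct, or unchanged values
        if (n != null && n.isDistinct() && !n.equals(o)) {
          publish(n);
        }
      });

  @Override
  protected void stagePublication() {
    // Read the egress settings from the agent's pubsubEgressConf property
    loadSettings("pubsubEgressConf");
    // Look up the Publisher supplied by the named provision and publish through it
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
  }
}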

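In this example, the toPublish lane calls publish(n) whenever it is set to a value that is non-null, distinct, and different from the previous value. The stagePublication() method loads the pubsubEgressConf settings and assigns the Publisher obtained from the provision those settings name.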
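The guard inside didSet can be studied without a running server. The following is a rough sketch in plain Java, assuming a hypothetical ChangeGuard class and a Consumer standing in for publish; it mirrors only the null and equality checks, since isDistinct() is specific to Swim's Value type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/** Hypothetical stand-in for the didSet guard; not part of the Nstream API. */
final class ChangeGuard<T> {
  private final Consumer<T> sink; // receives values that pass the guard
  private T current;              // last value that was set

  ChangeGuard(Consumer<T> sink) {
    this.sink = sink;
  }

  /** Forwards only non-null values that differ from the previous value. */
  void set(T newValue) {
    T oldValue = this.current;
    this.current = newValue;
    if (newValue != null && !Objects.equals(newValue, oldValue)) {
      sink.accept(newValue);
    }
  }
}

final class ChangeGuardDemo {
  public static void main(String[] args) {
    List<String> published = new ArrayList<>();
    ChangeGuard<String> guard = new ChangeGuard<>(published::add);
    guard.set("a");  // forwarded
    guard.set("a");  // skipped: unchanged
    guard.set(null); // skipped: null
    guard.set("b");  // forwarded
    System.out.println(published); // prints [a, b]
  }
}
```

The comparison is always against the immediately preceding value, so a value that reappears after a different one is forwarded again.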

Publish on Event

The simplest method of publication is to publish any event or update received by a lane. All we need to do is call the publish(Value msg) method from the appropriate callback of the lane. Here’s an example of publication on event:

public class CustomPublishingAgent extends PubSubPublishingAgent {

  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void stagePublication() {
    loadSettings("pubsubEgressConf");
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
  }
}

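In this example, every command received by the publish lane is passed directly to publish(Value msg), so each event is published as it arrives, including repeated values. The stagePublication() method is the same as in the previous example.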
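To make the "forward everything" behavior concrete, here is a small sketch in plain Java. EventForwarder is a hypothetical class, the Consumer of byte arrays stands in for a publisher, and the UTF-8 encoding is an assumption made for illustration; how PubSubPublishingAgent actually serializes a Value is not shown in this guide.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for an onCommand callback wired straight to a publisher. */
final class EventForwarder {
  private final Consumer<byte[]> publisher; // stand-in for the real publisher
  private int forwarded;                    // number of events passed on so far

  EventForwarder(Consumer<byte[]> publisher) {
    this.publisher = publisher;
  }

  /** Forwards every event unconditionally; no deduplication is applied. */
  void onCommand(String event) {
    publisher.accept(event.getBytes(StandardCharsets.UTF_8));
    forwarded++;
  }

  int forwardedCount() {
    return forwarded;
  }
}

final class EventForwarderDemo {
  public static void main(String[] args) {
    List<String> received = new ArrayList<>();
    EventForwarder forwarder = new EventForwarder(
        payload -> received.add(new String(payload, StandardCharsets.UTF_8)));
    forwarder.onCommand("door-open");
    forwarder.onCommand("door-open"); // repeated events are forwarded too
    forwarder.onCommand("door-closed");
    System.out.println(received); // prints [door-open, door-open, door-closed]
  }
}
```

This pattern suits event streams where each occurrence matters. When only changes matter, a guard like the one in the toPublish example avoids publishing repeats.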

Periodic Publishing

Another common approach is to publish the state of the agent periodically. In this case, the agent manages a timer using the scheduling methods it inherits, such as scheduleAtFixedRate. Here’s an example of periodic publication:

public class CustomPublishingAgent extends PubSubPublishingAgent {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    loadSettings("pubsubEgressConf");
    assignPublisher(ProvisionLoader.<Publisher>getProvision(
        this.egressSettings.getPublisherProvisionName()).value());
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }
}

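In this example, stagePublication() assigns the Publisher and then starts a timer with scheduleAtFixedRate. The timer calls publishState(), which publishes the current value of the state lane whether or not it has changed; the arguments 1000 and 60000 set the timer's initial delay and period in milliseconds.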
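The same timer pattern can be sketched with the JDK's ScheduledExecutorService. This is an analogy under stated assumptions, not the Nstream scheduler: PeriodicStatePublisher is a hypothetical class, a Consumer stands in for publish, and the sketch skips publication while no state has been set, which the agent above does not do.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

/** Hypothetical stand-in for an agent that publishes its state on a timer. */
final class PeriodicStatePublisher {
  private final AtomicReference<String> state = new AtomicReference<>();
  private final Consumer<String> sink; // stand-in for publish(...)
  private ScheduledFuture<?> timer;    // kept so the timer can be cancelled

  PeriodicStatePublisher(Consumer<String> sink) {
    this.sink = sink;
  }

  void setState(String newState) {
    state.set(newState);
  }

  /** Publishes a snapshot of the current state, changed or not. */
  void publishState() {
    String snapshot = state.get();
    if (snapshot != null) { // sketch-only choice: nothing to publish yet
      sink.accept(snapshot);
    }
  }

  /** Runs publishState after initialDelayMs, then every periodMs. */
  void start(ScheduledExecutorService scheduler, long initialDelayMs, long periodMs) {
    timer = scheduler.scheduleAtFixedRate(
        this::publishState, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
  }

  void stop() {
    if (timer != null) {
      timer.cancel(false);
    }
  }
}

final class PeriodicDemo {
  public static void main(String[] args) throws InterruptedException {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    List<String> published = new CopyOnWriteArrayList<>();
    PeriodicStatePublisher publisher = new PeriodicStatePublisher(published::add);
    publisher.setState("temperature=21");
    publisher.start(scheduler, 10, 50); // short intervals so the demo finishes quickly
    Thread.sleep(200);
    publisher.stop();
    scheduler.shutdownNow();
    System.out.println(published.size() + " publications of " + published.get(0));
  }
}
```

Periodic publication trades timeliness for a predictable message rate: intermediate values set between two ticks are never published, and an unchanged state is published again on every tick.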

Advanced Configuration

For advanced use cases, you can customize the Publisher by modifying the PublisherProvision or by building the Publisher directly in your agent. The example below takes the second approach: it builds a Publisher with custom batching settings in stagePublication() and assigns it to the agent, instead of loading one from a provision.

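import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
// Swim and Nstream imports omitted for brevity.

public class CustomPublishingAgent extends PubSubPublishingAgent {

  // Publishes each new value set on this lane
  @SwimLane("toPublish")
  ValueLane<Value> toPublish = this.<Value>valueLane()
      .didSet((n, o) -> {
        if (n != null && n.isDistinct() && !n.equals(o)) {
          publish(n);
        }
      });

  @Override
  protected void stagePublication() {
    loadSettings("pubsubEgressConf");
    try {
      // The project and topic are set here in code rather than read from a provision
      TopicName topicName = TopicName.of("your-project-id", "your-topic-name");
      Publisher publisher = Publisher.newBuilder(topicName)
          // Customize the publisher settings here
          .setBatchingSettings(
              Publisher.Builder.DEFAULT_BATCHING_SETTINGS.toBuilder()
                  .setElementCountThreshold(100L)  // batch size trigger, in messages
                  .setRequestByteThreshold(1024L)  // batch size trigger, in bytes
                  .build())
          .build();
      assignPublisher(publisher);
    } catch (IOException e) {
      // Publisher.Builder.build() can fail while creating the underlying client
      throw new RuntimeException("Failed to create publisher", e);
    }
  }
}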
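The two thresholds set above control when buffered messages are sent as one request. As a rough illustration of that trigger logic only, here is a sketch in plain Java; ThresholdBatcher is a hypothetical class, not the Google client's batching implementation, and it ignores the delay threshold and flow control that the real client also applies.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical batcher that flushes on a message-count or byte-size threshold. */
final class ThresholdBatcher {
  private final long elementCountThreshold;
  private final long requestByteThreshold;
  private final Consumer<List<String>> sender; // stand-in for one publish request
  private final List<String> pending = new ArrayList<>();
  private long pendingBytes;

  ThresholdBatcher(long elementCountThreshold, long requestByteThreshold,
                   Consumer<List<String>> sender) {
    this.elementCountThreshold = elementCountThreshold;
    this.requestByteThreshold = requestByteThreshold;
    this.sender = sender;
  }

  /** Buffers a message and flushes as soon as either threshold is reached. */
  void add(String message) {
    pending.add(message);
    pendingBytes += message.getBytes(StandardCharsets.UTF_8).length;
    if (pending.size() >= elementCountThreshold || pendingBytes >= requestByteThreshold) {
      flush();
    }
  }

  /** Sends whatever is buffered as one batch and resets the counters. */
  void flush() {
    if (pending.isEmpty()) {
      return;
    }
    sender.accept(new ArrayList<>(pending));
    pending.clear();
    pendingBytes = 0;
  }
}

final class ThresholdBatcherDemo {
  public static void main(String[] args) {
    List<Integer> batchSizes = new ArrayList<>();
    ThresholdBatcher batcher = new ThresholdBatcher(3, 10, b -> batchSizes.add(b.size()));
    batcher.add("aa");
    batcher.add("bb");
    batcher.add("cc");           // third message reaches the count threshold
    batcher.add("0123456789ab"); // 12 bytes reaches the byte threshold alone
    batcher.add("x");
    batcher.flush();             // send the remainder explicitly
    System.out.println(batchSizes); // prints [3, 1, 1]
  }
}
```

Higher thresholds mean fewer, larger requests at the cost of added latency for the first message in each batch; lower thresholds do the opposite.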


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).