Pulsar Egress

Nstream provides a Pulsar Adapter library that greatly facilitates publishing to Pulsar topics. This guide demonstrates how to start a message producer and publish messages from Web Agents using minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-pulsar:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-pulsar</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the PulsarPublishingPatch which may be extended to publish state to Pulsar topics. Extension is best accomplished via a mix of configuration and direct subclassing.

Configuration

Here we give a full example of the configuration that prepares a PulsarPublishingPatch.

# server.recon
provisions: {
  @provision("pulsarClient") {
    class: "nstream.adapter.pulsar.PulsarClientProvision",
    def: {
      # Mandatory
      "serviceUrl": "pulsar://localhost:6650"
      # Additional options; link below has full configuration listing:
      # https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java
      "connectionsPerBroker": 1
    }
  }
  @provision("producer") {
    class: "nstream.adapter.pulsar.PulsarProducerProvision$Bytes"
    def: {
      # Mandatory (note: not included in ProducerConfigurationData)
      "clientProvisionName": "pulsarClient",
      # Mandatory (note: part of ProducerConfigurationData)
      "topicName": "myTopic"
      # Additional options; link below has full configuration listing:
      # https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ProducerConfigurationData.java
    }
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/pulsar"
    @agent(class: "vehicle.pulsar.MyPulsarPublishingAgent")
  }
}

Client

One PulsarClient provision must be configured in order to create external connections. The possible configuration values are defined by the Pulsar’s ClientConfigurationData class.

Producer

One PulsarProducer provision must be configured against the aforementioned client in order to publish state. The possible configuration values are defined by the Pulsar’s ClientConfigurationData class, and an additional clientProvisionName property that identifies the client.

Extension

Note: unlike with the *IngestingPatch classes and many *PublishingPatch classes, there is no prop-level configuration object in PulsarPublishingPatch; users must instead invoke stagePublication() manually.

Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent. Extending the PulsarPublishingPatch exposes the publish(Value value) method which will publish the given value to the configured Pulsar topic. The only two implementation decisions now are what and when to publish - we give two examples in the following sections.

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need do is call the publish(Value value) method from the appropriate callback of the lane.

public class MyPulsarPublishingAgent extends PulsarPublishingPatch<byte[]> {
  
  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void stagePublication() {
    assignProducer(ProvisionLoader.<Producer<byte[]>>getProvision("producer").value());
  }
  
}

Note: the didSet callback on a ValueLane, the didUpdate callback on a MapLane, and many others are equally valid alternatives.

Periodic

Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided. The scheduling code is added in the callback of assignDestination to avoid attempting to publish before the connection has been created.

public class MyPulsarPublishingAgent extends PulsarPublishingPatch<byte[]> {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    assignProducer(ProvisionLoader.<Producer<byte[]>>getProvision("producer").value());
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }
  
}

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).