Confluent Egress

Nstream provides a Confluent Adapter library that simplifies producing messages to Kafka topics hosted in Confluent Cloud. This guide demonstrates how to publish state within Web Agents to such external topics with minimal boilerplate.

The example here recreates the Kafka Egress guide, but swaps the dev topic for one hosted in Confluent Cloud.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-confluent:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-confluent</artifactId>
  <version>4.11.19</version>
  <type>module</type>
</dependency>

Publishing

Nstream provides the ConfluentPublishingPatch, which may be extended to publish state to Kafka topics. Extension is best accomplished through a mix of configuration and direct subclassing.

Configuration

Here we give a full example of the configuration required to prepare a ConfluentPublishingPatch. Note that the SchemaVehiclePublishingAgent will be defined in Java code to subclass ConfluentPublishingPatch.

# sim.recon
provisions: {
  @provision("vehicles-kafka-producer") {
    class: "nstream.adapter.confluent.KafkaProducerProvision"
    def: {
      # Mandatory
      "key.serializer": "org.apache.kafka.common.serialization.IntegerSerializer"
      "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
      "bootstrap.servers": "localhost:29092"
      "ccloud.api.key": "..."
      "ccloud.api.secret": "..."
      "ccloud.schema.registry.key": "..."
      "ccloud.schema.registry.secret": "..."
      # Additional options; link below has full configuration listing:
      # https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html
      "request.timeout.ms": 5000,
      "max.block.ms": 5000
    }
  }
}

"sim": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    pattern: "/vehicle/:id"
    @agent(class: "nstream.starter.sim.SchemaVehiclePublishingAgent")
  }
}

Producer

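A Kafka Producer provision must be configured in order to publish messages. The possible configuration values are listed in the Confluent producer configuration reference: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html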

Note: in this example, a single Producer provision is shared among the multiple Web Agents that correspond to /vehicle/:id; this is the recommended pattern whenever the provision is thread-safe.
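The sharing pattern in the note above can be illustrated without any Kafka or Nstream dependency. The following is a minimal sketch, not the Nstream implementation: Registry and CountingProducer are hypothetical stand-ins invented for this illustration, and only the provision name "vehicles-kafka-producer" comes from the configuration above. It shows several concurrent callers resolving one name to a single instance, which is only safe because that instance is itself thread-safe.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class SharedProvisionSketch {

  // Hypothetical stand-in for a provision registry (NOT the Nstream ProvisionLoader).
  static final class Registry {
    private final Map<String, Object> provisions = new ConcurrentHashMap<>();

    // Creates the named provision on first request; later requests get the same instance.
    @SuppressWarnings("unchecked")
    <T> T getOrCreate(String name, Supplier<T> factory) {
      return (T) provisions.computeIfAbsent(name, k -> factory.get());
    }
  }

  // Hypothetical stand-in for a thread-safe producer; it only counts sends.
  static final class CountingProducer {
    private final AtomicInteger sent = new AtomicInteger();

    void send(String message) {
      sent.incrementAndGet();
    }

    int sentCount() {
      return sent.get();
    }
  }

  // Runs one thread per simulated agent; each sends one message through the
  // shared producer. Returns {producers created, messages sent}.
  static int[] simulate(int agents) {
    Registry registry = new Registry();
    AtomicInteger created = new AtomicInteger();
    Thread[] threads = new Thread[agents];
    for (int i = 0; i < agents; i++) {
      final int id = i;
      threads[i] = new Thread(() -> {
        CountingProducer producer = registry.getOrCreate("vehicles-kafka-producer", () -> {
          created.incrementAndGet();
          return new CountingProducer();
        });
        producer.send("vehicle " + id);
      });
      threads[i].start();
    }
    try {
      for (Thread t : threads) {
        t.join();
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    CountingProducer shared = registry.getOrCreate("vehicles-kafka-producer", CountingProducer::new);
    return new int[] {created.get(), shared.sentCount()};
  }

  public static void main(String[] args) {
    int[] result = simulate(8);
    System.out.println("created=" + result[0] + " sent=" + result[1]); // prints created=1 sent=8
  }
}
```

If the shared object were not thread-safe, each agent would need its own instance or external synchronization, which is why the note ties the recommendation to thread safety.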

Extension

Using configuration to handle connections removes the need for most boilerplate in the implementation of the agent. Extending the ConfluentPublishingPatch exposes the publish(Value value) method, which publishes the given value to the configured Kafka topic. The only two implementation decisions left are what to publish and when; the following sections give two examples.

Note: unlike with the *IngestingPatch classes and many *PublishingPatch classes, there is no prop-level configuration object in ConfluentPublishingPatch; users must instead invoke stagePublication() manually.

Publish On Event

The simplest method of publication is to publish any event or update received by a lane. All we need do is call the publish(Value value) method from the appropriate callback of the lane.

public class SchemaVehiclePublishingAgent extends ConfluentPublishingPatch<Integer, String> {

  @SwimLane("publish")
  CommandLane<Value> publish = this.<Value>commandLane()
      .onCommand(this::publish);

  @Override
  protected void stagePublication() {
    assignProducer(ProvisionLoader.<Producer<Integer, String>>getProvision("vehicles-kafka-producer").value());
  }

}

Note: the didSet callback on a ValueLane, the didUpdate callback on a MapLane, and many others are equally valid alternatives.
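To make the callback alternative concrete, here is a dependency-free sketch of forwarding every state change to a publish method. It deliberately does not use the Swim or Nstream classes: ValueCell and Agent are hypothetical stand-ins, with the callback modeled on the (newValue, oldValue) shape of a value lane's didSet. In a real agent the same wiring would sit on the lane declaration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class PublishOnSetSketch {

  // Hypothetical stand-in for a value lane: stores a value and fires a
  // callback after every set.
  static final class ValueCell<T> {
    private T value;
    private BiConsumer<T, T> didSet = (newValue, oldValue) -> { };

    ValueCell<T> didSet(BiConsumer<T, T> callback) {
      this.didSet = callback;
      return this;
    }

    void set(T newValue) {
      T oldValue = this.value;
      this.value = newValue;
      this.didSet.accept(newValue, oldValue);
    }
  }

  // Hypothetical stand-in for the publishing agent: every state change is
  // forwarded to publish(), which hands the value to the given sink.
  static final class Agent {
    private final Consumer<String> sink;
    final ValueCell<String> state;

    Agent(Consumer<String> sink) {
      this.sink = sink;
      this.state = new ValueCell<String>()
          .didSet((newValue, oldValue) -> publish(newValue));
    }

    void publish(String value) {
      sink.accept(value);
    }
  }

  // Sets the state twice and returns everything that was published.
  static List<String> demo() {
    List<String> published = new ArrayList<>();
    Agent agent = new Agent(published::add);
    agent.state.set("speed=40");
    agent.state.set("speed=55");
    return published;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints [speed=40, speed=55]
  }
}
```

The trade-off of this approach is volume: every update produces a message, so a rapidly changing lane publishes just as rapidly.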

Periodic

Another common approach to publication would be to publish the state of the agent periodically. In this case we must manage a timer, making use of the scheduling methods provided.

public class SchemaVehiclePublishingAgent extends ConfluentPublishingPatch<Integer, String> {

  private TimerRef timerRef;

  @SwimLane("state")
  ValueLane<Value> state = this.<Value>valueLane();

  private void publishState() {
    publish(this.state.get());
  }

  @Override
  protected void stagePublication() {
    assignProducer(ProvisionLoader.<Producer<Integer, String>>getProvision("vehicles-kafka-producer").value());
    this.timerRef = scheduleAtFixedRate(() -> this.timerRef, 1000, 60000, this::publishState);
  }

}
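The timer behavior can be tried out in isolation with the JDK scheduler. The sketch below is an analog, not the Nstream scheduling API: it assumes that the two numeric arguments in the agent above are an initial delay and a period in milliseconds (an assumption, not confirmed here), and it uses much shorter intervals so that it finishes quickly. The state holder and the published list are hypothetical stand-ins for the lane and the Kafka topic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class PeriodicPublishSketch {

  // Blocks until the latch opens; fails loudly instead of hanging forever.
  private static void await(CountDownLatch latch) {
    try {
      if (!latch.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timer did not fire in time");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  // Publishes whatever the state holds on every tick and returns the
  // sequence of published values.
  static List<String> demo() {
    AtomicReference<String> state = new AtomicReference<>("speed=40");
    List<String> published = new CopyOnWriteArrayList<>();
    CountDownLatch firstTick = new CountDownLatch(1);
    CountDownLatch sawUpdate = new CountDownLatch(1);
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    try {
      // Initial delay 10 ms, then one tick every 20 ms.
      timer.scheduleAtFixedRate(() -> {
        String current = state.get();
        published.add(current);
        firstTick.countDown();
        if (current.equals("speed=55")) {
          sawUpdate.countDown();
        }
      }, 10, 20, TimeUnit.MILLISECONDS);

      await(firstTick);        // the first tick has published the initial state
      state.set("speed=55");   // state changes between ticks
      await(sawUpdate);        // a later tick has published the new state
    } finally {
      timer.shutdownNow();
      try {
        timer.awaitTermination(5, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    return new ArrayList<>(published);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Unlike publishing on every event, the periodic approach sends the current state at each tick regardless of how many updates arrived in between, so intermediate values may never be published.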

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).