Confluent Ingress


Nstream provides a Confluent Adapter library that simplifies connectivity to Confluent Cloud. This guide demonstrates how to consume from Confluent topics and process the resulting messages in Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-confluent:4.14.22'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-confluent</artifactId>
  <version>4.14.22</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>


Ideal ConfluentIngestingPatch Conditions

In the ideal case, you simply need to use the ConfluentIngestingPatch class, often without overriding any methods. Let's demonstrate this by recreating the ConfluentConsumerAgent example from the open source documentation wholly in server.recon.

The Confluent Adapter library is designed to simplify connectivity to Confluent Cloud. To connect, you need your Confluent Cloud bootstrap.servers, api.key, and api.secret values. The underlying adapter is very similar to the Kafka Adapter and behaves identically in most respects, because the two share common code. The Confluent Adapter library automatically adds the additional properties that the Kafka client needs to connect to Confluent Cloud.

Protect your secrets

Make sure that you protect your secrets; we suggest storing your credentials in a Kubernetes Secret.
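For example, a minimal sketch, assuming a secret named confluent-credentials (the name is illustrative):

kubectl create secret generic confluent-credentials \
  --from-file=confluent.properties

Then mount the secret as a read-only volume at /secrets in your pod spec, so that the adapter can read /secrets/confluent.properties (the path referenced by the provision config below).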

# confluent.properties
# required properties
ccloud.bootstrap.servers=<ccloud bootstrap.servers>
ccloud.api.key=<ccloud api key>
ccloud.api.secret=<ccloud api secret>

# The ccloud.* settings above are used to generate the configuration
# needed to start a Java Kafka client.

# Any setting prefixed with kafka. overrides the generated configuration
# before the underlying Java Kafka client is created.
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    # Confluent consumer properties go here
    config: "/secrets/confluent.properties"
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # ConfluentConsumerAgent replacement
  @node {
    uri: "/confluent"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      # ConfluentIngressSettings values go here
      confluentIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        name: "test-consumer"
        topics: {"schema-topic"}
        valueContentTypeOverride: "json"
        relaySchema: @command {
          nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
          laneUri: "addMessage"
          value: $value
        }
      }
    }
  }
  # VehicleAgent config can be copied directly, but you'll of course
  # need to implement VehicleAgent in Java.
}

# Configure desired web settings (e.g. port)
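For example, a minimal @web block (a sketch; the port is illustrative, and space must name the fabric declared above):

@web(port: 9001) {
  space: "vehicle"
}

The relaySchema above commands each received message to the addMessage lane of a VehicleAgent. A minimal sketch of that Java class (the latest lane and its Value type are illustrative):

// VehicleAgent.java
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.ValueLane;
import swim.structure.Value;

public class VehicleAgent extends AbstractAgent {

  // Stores the most recent message relayed from the Confluent topic.
  @SwimLane("latest")
  ValueLane<Value> latest = this.<Value>valueLane();

  // Target of the relaySchema's @command above.
  @SwimLane("addMessage")
  CommandLane<Value> addMessage = this.<Value>commandLane()
      .onCommand(v -> this.latest.set(v));

}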

Parallelized Consumption from Multi-Partitioned Topics

An n-partitioned Confluent topic can be consumed by a single KafkaConsumer, with no changes from what we showed above. However, Kafka libraries also support parallel consumption of partitioned topics (across threads or processes) as a means to improve throughput.

To do this in server.recon, simply declare another ConfluentIngestingPatch that refers to the same PropertiesProvision in its KafkaIngressSettings#consumerPropertiesProvisionName. For example, if the above topic had two partitions, then the optimal server.recon would look like this:

# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    ... # copied directly from above
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/confluent/0"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      confluentIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        ... # copied directly from above
      }
    }
  }
  @node {
    uri: "/confluent/1"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      confluentIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        ... # copied directly from above
      }
    }
  }
  # ...
}

# ...

Manage Consumer Group IDs

Ensure that your group.id parameter within the Provision definition is unique to your Nstream application. Otherwise, the broker will balance partitions across every consumer that shares the group.id, meaning the Nstream process(es) may not receive all (or any) messages.
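For example, using the kafka. pass-through convention shown earlier (the group name itself is arbitrary, so long as no other application shares it):

# confluent.properties
kafka.group.id=my-nstream-vehicle-app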

Common Variations

Receiving Complex Data Types

The Confluent Adapter shares its deserialization machinery with the Kafka Adapter, so the Kafka Adapter documentation's guidance on receiving complex data types applies here as well.
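For instance, consuming Avro-encoded values typically means swapping in Confluent's Avro deserializer and pointing it at your Schema Registry. A sketch in confluent.properties, assuming the kafka. pass-through convention shown earlier (the Schema Registry properties are read by the deserializer itself, not by the adapter):

# confluent.properties (Avro values)
kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.schema.registry.url=<schema registry url>
kafka.basic.auth.credentials.source=USER_INFO
kafka.basic.auth.user.info=<schema registry api key>:<schema registry api secret>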

Overriding Broker-Assigned Partitions

Some use cases require complete control over which KafkaConsumers are assigned which partitions. In Kafka libraries, this is done by calling KafkaConsumer#assign with explicit TopicPartitions instead of KafkaConsumer#subscribe with a settings-configured group.id:

# confluent.properties
... # mostly the same as before, but don't define a group.id

# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    config: ... # same as before
  }
}

vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # ConfluentConsumerAgent replacement
  @node {
    uri: "/confluent/0"
    @agent(class: "nstream.adapter.kafka.ConfluentIngestingPatch") {
      confluentIngressConf: @kafkaIngressSettings {
        # Mostly the same as before, but replace topics with an
        #   assignmentSchema construct
        consumerPropertiesProvisionName: "consumer-properties"
        valueContentTypeOverride: "json"
        assignmentSchema: @assignment {
          topic: "schema-topic",
          partition: 0
        }
        relaySchema: @command {
          nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
          laneUri: "addMessage"
          value: $value
        }
      }
    }
  }
  @node {
    uri: "/confluent/1"
    @agent(class: "nstream.adapter.kafka.ConfluentIngestingPatch") {
      confluentIngressConf: @kafkaIngressSettings {
        # Copy everything from previous agent, except partition
        # ...
        assignmentSchema: @assignment {
          topic: "schema-topic",
          partition: 1
        }
        # ...
      }
    }
  }
  # ...
}

# ...

Manual Consumption Triggering

It is often desirable to begin Confluent consumption via an explicit command message instead of on Web Agent startup:

// ConfluentConsumerAgent.java
import nstream.adapter.confluent.ConfluentIngestingPatch;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;

public class ConfluentConsumerAgent extends ConfluentIngestingPatch {

  @SwimLane("triggerReception")
  CommandLane<String> triggerReception = this.<String>commandLane()
      .onCommand(s -> {
        if ("start".equals(s)) {
          stageReception();
        }
      });

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // No call to stageReception() unlike in superclass
  }

}
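You can then kick off consumption by commanding the lane. For example, from any other Web Agent in the same application (a sketch; Text is swim.structure.Text):

// e.g. inside another Web Agent's didStart() or a lane callback
command("/confluent", "triggerReception", Text.from("start"));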

Backoff Strategy

While the Confluent-recommended pattern is to continuously invoke poll, you may wish to be less aggressive. This is a perfect use case for the inherited NstreamAgent#scheduleWithInformedBackoff method. Below we utilize this method to trigger backoff only when a returned result is empty (i.e. resume the continuous poll as long as results are nonempty):

// ConfluentConsumerAgent.java

public class ConfluentConsumerAgent extends ConfluentIngestingPatch {

  @Override
  protected void stageReception() {
    loadSettings("confluentIngressConf");
    this.confluentConsumer = ConfluentAdapterUtils.createConsumer(this.ingressSettings);
    this.confluentConsumer.subscribe(this.ingressSettings.topics());
    this.keyMolder = ContentMolder.cast(this.ingressSettings.keyMolder());
    this.valueMolder = ContentMolder.cast(this.ingressSettings.valueMolder());
    this.pollTimer = scheduleWithInformedBackoff(this.pollTimer,
        this.ingressSettings.firstFetchDelayMillis(),
        // An inherited method with a reasonable backoff implementation: exponential
        // for small values, then linear, to a max of 15 seconds (overridable)
        this::nextBackoff,
        i -> !i.isEmpty(),
        500L,
        () -> this.poll(this.ingressSettings.pollTimeoutMillis()),
        this::ingest);
  }

}

Manual Commits

The consumption strategy outlined here dedicates a thread to consuming messages, but processes those messages asynchronously (by delegating to parallel-running Web Agents). While this provides extremely high throughput, combining it with automatic commits may result in a situation where processing a message throws an exception after that message's offset has already been committed by the consumer. This is potentially problematic when at-least-once delivery of every message is required.

There is no catch-all solution to problems of this (rather advanced) nature. At one extreme, you may avoid the headache altogether and process everything within the consumer thread, though this significantly reduces throughput. A more common pattern is to commit in batches: every time a VehicleAgent (or equivalent) successfully or unsuccessfully processes a message, it notifies the ConfluentConsumerAgent (most intuitively via a command() to a new CommandLane), and the ConfluentConsumerAgent periodically issues a commit() based on the information at hand.
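To make this concrete, here is a minimal sketch of the batch-commit pattern. The ackMessage lane, BATCH_SIZE, and maybeCommit() hook are all illustrative names, not adapter API; the confluentConsumer field is the one assigned in the previous section's example.

// ConfluentConsumerAgent.java (batch-commit sketch)
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import nstream.adapter.confluent.ConfluentIngestingPatch;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;

public class ConfluentConsumerAgent extends ConfluentIngestingPatch {

  private static final long BATCH_SIZE = 100L;
  private final AtomicLong acked = new AtomicLong();
  private final AtomicBoolean commitRequested = new AtomicBoolean();

  // VehicleAgents command() this lane after processing each message.
  @SwimLane("ackMessage")
  CommandLane<String> ackMessage = this.<String>commandLane()
      .onCommand(id -> {
        if (this.acked.incrementAndGet() % BATCH_SIZE == 0L) {
          // KafkaConsumer is not thread-safe, so don't commit from a lane
          // callback; just flag the consumer thread to commit soon.
          this.commitRequested.set(true);
        }
      });

  // Invoke from the consumer thread, e.g. once per poll cycle.
  protected void maybeCommit() {
    if (this.commitRequested.compareAndSet(true, false)) {
      this.confluentConsumer.commitAsync();
    }
  }

}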

Future nstream-toolkit releases will introduce some convenient abstractions over the most common such patterns seen in practice, so stay tuned!

