Kafka Egress

Nstream provides a Kafka Adapter library that greatly facilitates producing messages to Kafka topics. This guide demonstrates how to publish state within Web Agents to external Kafka topics with minimal boilerplate.

The example here uses the same data format as the Kafka Ingress guide and the Backend Tutorial. The only difference is that we produce to brokers instead of consuming from them.

Unlike our other how-to guides, whose code snippets are collected in a standalone repository, this guide’s code does something more interesting: it builds a data simulator for the Backend Tutorial. The relevant Java files are in the nstream.starter.sim package, and the relevant server configuration is in src/main/resources/sim.recon.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-kafka:4.12.20'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-kafka</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Glossary

KafkaProducer Delegation

Under the hood, the Kafka Adapter delegates egress operations to KafkaProducer instances. The primary value of this library is that it wraps Apache Kafka client-specific behavior and runtime management in patterns that play nicely with Swim.

Assignment

Simply invoke assignProducer(Producer) to assign a KafkaProducer to a KafkaPublishingAgent.

Instantiating a KafkaProducer in Java code is easy enough, but we’ll demonstrate a no-code option that uses KafkaProducerProvision:

# sim.recon
provisions: {
  @provision("vehicles-kafka-producer") {
    class: "nstream.adapter.kafka.KafkaProducerProvision"
    def: {
      "key.serializer": "org.apache.kafka.common.serialization.IntegerSerializer"
      "value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
      "bootstrap.servers": "localhost:29092"
      "request.timeout.ms": 5000,
      "max.block.ms": 5000
    }
  }
}

The assignProducer call would then look like:

// VehiclePublishingAgent.java
import org.apache.kafka.clients.producer.Producer;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.kafka.KafkaPublishingAgent;
import swim.structure.Value;

public class VehiclePublishingAgent extends KafkaPublishingAgent<Value, Integer, String> {

  @Override
  public void didStart() {
    super.didStart();
    assignProducer(ProvisionLoader.<Producer<Integer, String>>
        getProvision("vehicles-kafka-producer").value());
  }

}
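For comparison, the Java-code route needs the same settings that sim.recon declares. Below is a minimal sketch, assuming a hypothetical helper class VehicleProducerConfig (not part of the Kafka Adapter), that builds those settings as a java.util.Properties. The KafkaProducer(Properties) constructor mentioned in the comments is the standard kafka-clients one; it is left commented out so the snippet compiles without that dependency.

```java
import java.util.Properties;

// Hypothetical helper (not an adapter API): mirrors the "def" block of the
// "vehicles-kafka-producer" provision in sim.recon.
final class VehicleProducerConfig {

  private VehicleProducerConfig() { }

  static Properties producerProperties() {
    final Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("bootstrap.servers", "localhost:29092");
    // Numeric settings given as strings; Kafka's config parser converts them
    props.put("request.timeout.ms", "5000");
    props.put("max.block.ms", "5000");
    return props;
  }

  public static void main(String[] args) {
    final Properties props = producerProperties();
    // With kafka-clients on the classpath, this would become something like:
    //   Producer<Integer, String> producer = new KafkaProducer<>(props);
    //   assignProducer(producer);  // e.g. inside didStart()
    props.forEach((k, v) -> System.out.println(k + " = " + v));
  }
}
```

If you take this route, create the producer once and share it rather than constructing one per agent.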

Note: it is recommended to reuse KafkaProducer instances whenever possible; in practice, this usually means sharing one instance among all agents that target the same bootstrap servers. KafkaProducer instances are thread-safe, so such sharing is safe.
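When you create producers in Java code rather than through a provision, this reuse takes a little bookkeeping. A minimal sketch, assuming a hypothetical SharedProducers class (not part of the Kafka Adapter) that keeps one instance per bootstrap-servers string; the type parameter P stands in for Producer<K, V>:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry (not an adapter API): one shared client per
// bootstrap-servers string. P stands in for a thread-safe Producer<K, V>.
final class SharedProducers<P> {

  private final Map<String, P> byBootstrapServers = new ConcurrentHashMap<>();
  private final Function<String, P> factory;

  SharedProducers(Function<String, P> factory) {
    this.factory = factory;
  }

  // Returns the existing instance for these servers, creating it at most
  // once even when several agents call this concurrently.
  P get(String bootstrapServers) {
    return byBootstrapServers.computeIfAbsent(bootstrapServers, factory);
  }

  int size() {
    return byBootstrapServers.size();
  }

  public static void main(String[] args) {
    // Object stands in for a KafkaProducer in this demo
    SharedProducers<Object> registry = new SharedProducers<>(servers -> new Object());
    Object a = registry.get("localhost:29092");
    Object b = registry.get("localhost:29092");
    System.out.println(a == b); // prints "true": same servers, same instance
  }
}
```

ConcurrentHashMap.computeIfAbsent applies the factory at most once per key, so agents racing through didStart() still end up sharing a single producer. A provision, as in sim.recon, already gives you this sharing without extra code.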

Producing

Once you’ve created a Kafka ProducerRecord, simply pass it to publishAsync() to send it to the record’s target topic without blocking any Web Agent threads.
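Not blocking matters because KafkaProducer.send() can itself block, for up to max.block.ms, while it fetches topic metadata or waits for buffer space. The sketch below illustrates the general principle of offloading such a call using only the JDK. OffloadingPublisher and sendInBackground are hypothetical names, and this is not how the Kafka Adapter is implemented internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical illustration (not adapter code): the calling thread, e.g. a
// Web Agent callback, only enqueues work; a dedicated worker thread runs the
// potentially blocking send.
final class OffloadingPublisher<R> implements AutoCloseable {

  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final Consumer<R> blockingSend;

  OffloadingPublisher(Consumer<R> blockingSend) {
    this.blockingSend = blockingSend;
  }

  // Returns immediately; the future completes after blockingSend has run.
  CompletableFuture<Void> sendInBackground(R record) {
    return CompletableFuture.runAsync(() -> blockingSend.accept(record), worker);
  }

  @Override
  public void close() {
    worker.shutdown(); // lets queued sends finish, then stops the worker
  }

  public static void main(String[] args) {
    try (OffloadingPublisher<String> pub =
             new OffloadingPublisher<>(r -> System.out.println("sent " + r))) {
      pub.sendInBackground("record-1").join(); // join only for the demo
    }
  }
}
```

A single-thread executor also keeps sends in submission order, which is why the demo records arrive in the order they were enqueued.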

Optional: createPublishable()

For a given KafkaPublishingAgent, the transformation from the Web Agent state you want to publish into a ProducerRecord is often highly predictable. The first type parameter on KafkaPublishingAgent is the type of that state (and the input type of the createPublishable convenience method).

You may skip createPublishable and call publishAsync() directly, but a common pattern is to publish the state of a ValueLane every time it receives an update, and overriding createPublishable keeps that code clean:

// ...the imports from the previous snippet, plus:
import org.apache.kafka.clients.producer.ProducerRecord;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.json.Json;
import swim.structure.Value;

public class VehiclePublishingAgent extends KafkaPublishingAgent<Value, Integer, String> {

  @SwimLane("toPublish")
  ValueLane<Value> toPublish = this.<Value>valueLane()
      .didSet((n, o) -> {
        if (n != null && n.isDistinct() && !n.equals(o)) {
          final ProducerRecord<Integer, String> result = createPublishable(n);
          // Consider logging before publishing if things don't seem to be working
          publishAsync(result);
        }
      });

  @Override
  protected ProducerRecord<Integer, String> createPublishable(Value state) {
    return new ProducerRecord<>("vehicles-integer-json",
        getProp("id").intValue(),
        Json.toString(state));
  }

  // ...
}
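The didSet guard above is what keeps unchanged updates from being republished. Here is a dependency-free sketch of the same gating, using a hypothetical ChangeGatedValue class in place of a Swim ValueLane. Swim’s Value.isDistinct() check has no direct JDK equivalent, so the sketch only filters out null and repeated values.

```java
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical stand-in (not a Swim API) for a ValueLane with the didSet
// guard shown above: only non-null values that differ from the previous
// value are forwarded to the publisher.
final class ChangeGatedValue<T> {

  private T current;
  private final Consumer<T> publisher;

  ChangeGatedValue(Consumer<T> publisher) {
    this.publisher = publisher;
  }

  void set(T newValue) {
    final T oldValue = current;
    current = newValue;
    // Same shape as: n != null && n.isDistinct() && !n.equals(o),
    // minus the Swim-specific isDistinct() check
    if (newValue != null && !Objects.equals(newValue, oldValue)) {
      publisher.accept(newValue);
    }
  }

  public static void main(String[] args) {
    ChangeGatedValue<String> lane =
        new ChangeGatedValue<>(v -> System.out.println("publish " + v));
    lane.set("{\"id\":1}");
    lane.set("{\"id\":1}"); // unchanged: nothing is published
  }
}
```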

Trying it Out

That’s really all you need to know! The remainder of the simulation logic is not at all relevant to Kafka Egress behavior, but we’ll explain it quickly for completeness:

Notice that a single KafkaProducer instance is used concurrently by multiple VehiclePublishingAgents; the KafkaProducer documentation declares this a safe operation.


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).