Kafka Egress
Nstream provides a Kafka Adapter library that greatly facilitates producing messages to Kafka topics. This guide demonstrates how to publish state within Web Agents to external Kafka topics with minimal boilerplate.
The illustrative example here will share the same data format as that outlined in the Kafka Ingress guide and the Backend Tutorial. The only difference here is that we will demonstrate producing to brokers instead of consuming from them.
Unlike our other how-to guides, this article's code snippets are not collected in a standalone repository. Instead, they are part of something more interesting: the data simulator for the Backend Tutorial.
The relevant Java files are in the nstream.starter.sim package, and the relevant server configuration is in src/main/resources/sim.recon.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-kafka:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-kafka</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- publishing: The act of writing state found within some Web Agent to an external sink
- KafkaPublishingAgent: An abstract Web Agent class with built-in methods to facilitate producing Kafka messages to a prescribed broker (i.e. one with statically known bootstrap servers).
KafkaProducer Delegation
Under the hood, the Kafka Adapter delegates egress operations to KafkaProducer instances.
The primary value of this adapter library comes from abstracting the Apache Kafka client's behaviors and runtime management into patterns that play nicely with Swim.
Assignment
Simply invoke assignProducer(Producer) to assign a KafkaProducer to a KafkaPublishingAgent.
Instantiating the KafkaProducer directly in Java code is easy enough, but we'll demonstrate a no-code option that uses KafkaProducerProvision:
# sim.recon
provisions: {
@provision("vehicles-kafka-producer") {
class: "nstream.adapter.kafka.KafkaProducerProvision"
def: {
"key.serializer": "org.apache.kafka.common.serialization.IntegerSerializer"
"value.serializer": "org.apache.kafka.common.serialization.StringSerializer"
"bootstrap.servers": "localhost:29092"
"request.timeout.ms": 5000,
"max.block.ms": 5000
}
}
}
The assignProducer call would then look like:
// VehiclePublishingAgent.java
import org.apache.kafka.clients.producer.Producer;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.kafka.KafkaPublishingAgent;
import swim.structure.Value;
public class VehiclePublishingAgent extends KafkaPublishingAgent<Value, Integer, String> {
@Override
public void didStart() {
super.didStart();
// Look up the producer declared by the "vehicles-kafka-producer" provision in sim.recon
assignProducer(ProvisionLoader.<Producer<Integer, String>>
getProvision("vehicles-kafka-producer").value());
}
}
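The Java-code route to instantiating the producer can also be sketched. The code below collects the settings that the "vehicles-kafka-producer" provision declares into a java.util.Properties object. VehicleProducerProps is a hypothetical helper and is not part of the adapter. The config keys and values are copied from sim.recon.

```java
import java.util.Properties;

// Hypothetical helper (not part of the adapter): the settings from the
// "vehicles-kafka-producer" provision in sim.recon, as plain Kafka client properties.
final class VehicleProducerProps {

  private VehicleProducerProps() { }

  static Properties create(String bootstrapServers) {
    final Properties props = new Properties();
    // Integer keys (vehicle IDs) and String values (JSON), matching Producer<Integer, String>.
    props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("bootstrap.servers", bootstrapServers);
    // Wait at most 5 s for broker responses, and let send() block for at most 5 s.
    props.put("request.timeout.ms", "5000");
    props.put("max.block.ms", "5000");
    return props;
  }
}
```

Assume the Kafka client library is on the classpath. Then new KafkaProducer<Integer, String>(VehicleProducerProps.create("localhost:29092")) would build a producer that you could pass to assignProducer().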
Note: reuse KafkaProducer instances whenever possible. In practice, agents that target the same bootstrap servers can usually share one. KafkaProducer instances are thread-safe.
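Suppose producers are built in Java code rather than through a provision. One way to follow this advice is to cache one producer per bootstrap-servers string. The SharedProducers class below is a hypothetical, JDK-only sketch. Its type parameter P stands in for Producer<K, V>. The factory function would typically construct a KafkaProducer from properties like those in sim.recon.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical cache: one shared producer per bootstrap-servers string.
// P stands in for Producer<K, V>; the factory would build the real producer.
final class SharedProducers<P> {

  private final Map<String, P> byServers = new ConcurrentHashMap<>();
  private final Function<String, P> factory;

  SharedProducers(Function<String, P> factory) {
    this.factory = factory;
  }

  // computeIfAbsent is atomic, so concurrent callers asking for the same servers
  // get the same instance, and the factory runs at most once per servers string.
  P forServers(String bootstrapServers) {
    return byServers.computeIfAbsent(bootstrapServers, factory);
  }
}
```

Relying on ConcurrentHashMap.computeIfAbsent means two agents that start at the same moment still end up sharing one producer. Without it, they could race to create two.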
Producing
Once you've created a Kafka ProducerRecord, pass it to publishAsync() to send it to its Kafka topic without blocking any Web Agent threads.
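KafkaProducer.send() is itself asynchronous. However, it can still block the caller for up to max.block.ms while it fetches topic metadata or waits for buffer space, so it can be worth keeping sends off Web Agent threads. The sketch below shows a generic way to do that hand-off using only JDK types. It illustrates the idea and is not the adapter's implementation of publishAsync(). AsyncHandoff and its Consumer-based blockingSend are hypothetical stand-ins.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Hypothetical sketch: run a possibly blocking send on a dedicated thread
// so that the calling (e.g. Web Agent) thread returns immediately.
final class AsyncHandoff<R> implements AutoCloseable {

  private final ExecutorService sendThread = Executors.newSingleThreadExecutor();
  private final Consumer<R> blockingSend;

  AsyncHandoff(Consumer<R> blockingSend) {
    this.blockingSend = blockingSend;
  }

  // Returns at once; the future completes (or fails) when the send finishes.
  CompletableFuture<Void> publishAsync(R record) {
    return CompletableFuture.runAsync(() -> blockingSend.accept(record), sendThread);
  }

  @Override
  public void close() {
    sendThread.shutdown();
  }
}
```

A single-threaded executor also preserves submission order. Records handed to one AsyncHandoff are therefore sent in the order they were published.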
Optional: createPublishable()
For a given KafkaPublishingAgent instance, the transformation from the Web Agent state you want to publish into a ProducerRecord is often highly predictable.
The first type parameter on KafkaPublishingAgent represents the type of that Web Agent state (and the input type of the createPublishable convenience method).
You may bypass this and invoke publishAsync() directly, but a common pattern is to publish the state of a ValueLane every time it receives an update; using createPublishable leads to clean code in this situation:
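import org.apache.kafka.clients.producer.Producer;
import nstream.adapter.common.provision.ProvisionLoader;
import nstream.adapter.kafka.KafkaPublishingAgent;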
import org.apache.kafka.clients.producer.ProducerRecord;
import swim.api.SwimLane;
import swim.api.lane.ValueLane;
import swim.json.Json;
import swim.structure.Value;
public class VehiclePublishingAgent extends KafkaPublishingAgent<Value, Integer, String> {
@SwimLane("toPublish")
ValueLane<Value> toPublish = this.<Value>valueLane()
.didSet((n, o) -> {
if (n != null && n.isDistinct() && !n.equals(o)) {
final ProducerRecord<Integer, String> result = createPublishable(n);
// Consider logging before publishing if things don't seem to be working
publishAsync(result);
}
});
@Override
protected ProducerRecord<Integer, String> createPublishable(Value state) {
return new ProducerRecord<>("vehicles-integer-json",
getProp("id").intValue(),
Json.toString(state));
}
// ... (including the didStart() override from the previous snippet)
}
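The sketch below mirrors the shape of the createPublishable pattern in plain Java, so it runs without a Swim server or a Kafka broker. StatePublisher, OutRecord, and VehicleJsonPublisher are hypothetical stand-ins for KafkaPublishingAgent, ProducerRecord, and the agent above. The differences from the real code are:

- The state is a JSON String instead of a swim.structure.Value.
- The Swim-specific isDistinct() check is dropped.
- didSet() records its output instead of calling publishAsync().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for ProducerRecord<K, V>.
final class OutRecord<K, V> {
  final String topic;
  final K key;
  final V value;

  OutRecord(String topic, K key, V value) {
    this.topic = topic;
    this.key = key;
    this.value = value;
  }
}

// Hypothetical mirror of the KafkaPublishingAgent<S, K, V> shape:
// S is the agent state type; K and V are the Kafka key and value types.
abstract class StatePublisher<S, K, V> {

  final List<OutRecord<K, V>> published = new ArrayList<>();

  protected abstract OutRecord<K, V> createPublishable(S state);

  // Mirrors the didSet guard above: skip null and unchanged values.
  void didSet(S newValue, S oldValue) {
    if (newValue != null && !Objects.equals(newValue, oldValue)) {
      published.add(createPublishable(newValue)); // stands in for publishAsync(...)
    }
  }
}

final class VehicleJsonPublisher extends StatePublisher<String, Integer, String> {

  private final int vehicleId;

  VehicleJsonPublisher(int vehicleId) {
    this.vehicleId = vehicleId;
  }

  @Override
  protected OutRecord<Integer, String> createPublishable(String json) {
    // With Kafka's default partitioner, a fixed key keeps this vehicle's records on one partition.
    return new OutRecord<>("vehicles-integer-json", vehicleId, json);
  }
}
```

The real createPublishable keys every record by the vehicle ID via getProp("id"). As a result, Kafka's default partitioner routes all of one vehicle's updates to the same partition, as long as the partition count does not change.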
Trying it Out
That’s really all you need to know! The remainder of the simulation logic is not at all relevant to Kafka Egress behavior, but we’ll explain it quickly for completeness:
- Each vehicle-corresponding Web Agent has two traits: the VehiclePublishingAgent that we've implemented, and a VehicleSimAgent
- The VehicleSimAgent periodically generates a value and posts it to its toPublish lane
- toPublish matches the name of the lane we implemented above, so VehiclePublishingAgent.toPublish#didSet() fires, triggering the publish flow
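The simulator side can be pictured as a timer that generates a value and sets the lane, which in turn fires didSet(). The VehicleSim class below is a hypothetical, JDK-only sketch of that loop and is not the actual VehicleSimAgent. The speed reading, its 0 to 119 range, and the BiConsumer standing in for toPublish#didSet() are all assumptions made for illustration.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical simulator: each tick produces a new reading and passes it,
// together with the previous reading, to a didSet-style callback.
final class VehicleSim implements AutoCloseable {

  private final Random random;
  private final BiConsumer<Integer, Integer> toPublishDidSet;
  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
  private Integer lastSpeed; // tick() is meant to run on one thread at a time

  VehicleSim(long seed, BiConsumer<Integer, Integer> toPublishDidSet) {
    this.random = new Random(seed);
    this.toPublishDidSet = toPublishDidSet;
  }

  // One simulation step: generate a speed in [0, 120) and "set" the lane.
  void tick() {
    final Integer next = random.nextInt(120);
    final Integer previous = lastSpeed;
    lastSpeed = next;
    toPublishDidSet.accept(next, previous);
  }

  // Runs tick() on the timer thread every periodMillis milliseconds.
  void start(long periodMillis) {
    timer.scheduleAtFixedRate(this::tick, 0, periodMillis, TimeUnit.MILLISECONDS);
  }

  @Override
  public void close() {
    timer.shutdownNow();
  }
}
```

Calling tick() directly, as a test might, steps the simulation deterministically for a given seed. Calling start() runs it on a schedule instead.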
Notice that a single KafkaProducer instance is used concurrently by multiple VehiclePublishingAgent instances; the KafkaProducer documentation declares this safe.
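The sketch below makes the concurrency claim concrete. Several tasks, each standing in for one vehicle's agent, publish through a single shared object. SharedStubProducer is a hypothetical thread-safe stand-in for KafkaProducer built on ConcurrentLinkedQueue. The real producer's thread safety comes from the Kafka client, not from this code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical thread-safe stand-in for a shared KafkaProducer<Integer, String>.
final class SharedStubProducer {
  final Queue<String> sent = new ConcurrentLinkedQueue<>();

  void send(int key, String value) {
    sent.add(key + ":" + value);
  }
}

final class SharedProducerDemo {

  // Each task plays one vehicle's agent; all of them publish through the same instance,
  // on a pool with fewer threads than agents.
  static SharedStubProducer run(int agents, int messagesPerAgent) throws InterruptedException {
    final SharedStubProducer shared = new SharedStubProducer();
    final ExecutorService pool = Executors.newFixedThreadPool(4);
    for (int id = 0; id < agents; id++) {
      final int vehicleId = id;
      pool.execute(() -> {
        for (int m = 0; m < messagesPerAgent; m++) {
          shared.send(vehicleId, "msg" + m);
        }
      });
    }
    pool.shutdown();
    if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("publishers did not finish in time");
    }
    return shared;
  }
}
```

Running run(8, 100) yields 800 sent entries. Because each task sends sequentially, every vehicle's messages still appear in their original relative order.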
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).