Kafka Ingress
Nstream provides a Kafka Adapter library that greatly facilitates ingestion from Kafka topics. This guide demonstrates how to consume from Kafka topics and process the resulting messages in Web Agents with minimal boilerplate.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-kafka:4.15.23'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-kafka</artifactId>
  <version>4.15.23</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- KafkaIngestingAgent: An abstract Web Agent class with built-in methods relevant to a KafkaConsumer poll cycle
- KafkaIngressSettings: A plain old Java object (POJO) that configures a single KafkaIngestingAgent
- KafkaIngestingPatch: A concrete (but extendable) KafkaIngestingAgent subclass that collects ConsumerRecords into swim.structure.Values
KafkaIngestingPatch
Conditions
Ideal If:
- enable.auto.commit is true
- ConsumerRecord bodies are converted into Values during ingestion

Then you simply need to use the KafkaIngestingPatch class, often without overriding any methods.
Let’s demonstrate this by recreating the KafkaConsumerAgent example from the open source documentation wholly in server.recon:
# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    # Kafka consumer properties go here
    def: {
      "bootstrap.servers": "your-bootstrap-host:9092",
      "group.id": "your-group",
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # KafkaConsumingAgent replacement
  @node {
    uri: "/kafka"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      # KafkaIngressSettings values go here
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        topics: {"schema-topic"}
        valueContentTypeOverride: "json"
        relaySchema: @command {
          nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
          laneUri: "addMessage"
          value: $value
        }
      }
    }
  }
  # VehicleAgent config can be copied directly, but you'll of course
  # need to implement VehicleAgent in Java (a minimal sketch follows below).
}
# Configure desired web settings (e.g. port)
# ...
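For reference, here is a minimal VehicleAgent sketch. Only the addMessage lane (the relaySchema target above) is required; the history MapLane is an illustrative assumption about what you might do with each relayed message:

// VehicleAgent.java -- minimal sketch; the history lane is illustrative
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.MapLane;
import swim.structure.Value;

public class VehicleAgent extends AbstractAgent {

  // Retain each relayed message, keyed by arrival time
  @SwimLane("history")
  MapLane<Long, Value> history = this.<Long, Value>mapLane();

  // Target of the relaySchema's laneUri above
  @SwimLane("addMessage")
  CommandLane<Value> addMessage = this.<Value>commandLane()
      .onCommand(msg -> this.history.put(System.currentTimeMillis(), msg));
}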
Parallelized Consumption from Multi-Partitioned Topics
An n-partitioned Kafka topic can be consumed by a single KafkaConsumer, with no changes from what we showed above. However, Kafka libraries also support parallel (threads or processes) consumption of partitioned topics as a means to improve throughput. To do this in server.recon, simply declare another KafkaIngestingPatch that refers to the same PropertiesProvision in its KafkaIngressSettings#consumerPropertiesProvisionName.
For example, if the above topic had two partitions, then the optimal server.recon would look like:
# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    # Kafka consumer properties go here
    def: {
      ... # copied directly from above
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/kafka/0"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        ... # copied directly from above
      }
    }
  }
  @node {
    uri: "/kafka/1"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        ... # copied directly from above
      }
    }
  }
  # ...
}
# ...
Manage Consumer Group IDs
Ensure that your group.id parameter within the Provision definition is unique to your Nstream application. Otherwise, the broker will distribute the topic's partitions among all consumers sharing that group, and the Nstream process(es) may not receive all (or any) messages.
Common Variations
Receiving Complex Data Types
See the Nstream documentation for more information about receiving complex data types.
Overriding Broker-Assigned Partitions
Some use cases require complete control over which KafkaConsumers are assigned which partitions. In Kafka libraries, this is done using KafkaConsumer#assign (specifying explicit topic/partition pairs) instead of KafkaConsumer#subscribe with a settings-configured group.id:
# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    # Kafka consumer properties go here
    def: {
      # Mostly the same as before, but don't specify a group.id
      "bootstrap.servers": "your-bootstrap-host:9092",
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    }
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # KafkaConsumingAgent replacement
  @node {
    uri: "/kafka/0"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        # Mostly the same as before, but replace topics with an
        # assignmentSchema construct
        consumerPropertiesProvisionName: "consumer-properties"
        valueContentTypeOverride: "json"
        assignmentSchema: @assignment {
          topic: "schema-topic",
          partition: 0
        }
        relaySchema: @command {
          nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
          laneUri: "addMessage"
          value: $value
        }
      }
    }
  }
  @node {
    uri: "/kafka/1"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        # Copy everything from previous agent, except partition
        # ...
        assignmentSchema: @assignment {
          topic: "schema-topic",
          partition: 1
        }
        # ...
      }
    }
  }
  # ...
}
# ...
Custom Ingest Logic
If relaying messages to another agent is not sufficient, or you would like to add custom logic on ingestion of messages, the KafkaIngestingPatch can be easily overridden. Create a new class that extends the KafkaIngestingPatch and override either ingest or ingestBatch (the latter is higher in the pipeline, and by default defers to ingest).
// CustomIngestingAgent.java
// import ConsumerRecord, ConsumerRecords, KafkaIngestingPatch, DeferrableException...
public class CustomIngestingAgent<K, V> extends KafkaIngestingPatch<K, V> {

  @Override
  protected void ingest(ConsumerRecord<K, V> unstructured)
      throws DeferrableException {
    // Custom ingestion of an individual record
  }

  @Override
  protected void ingestBatch(ConsumerRecords<K, V> batch) {
    // Custom ingestion of a poll() invocation's result.
    // Overriding this makes ingest() unused unless specifically
    // invoked in the implementation.
  }
}
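For illustration, a concrete override might parse JSON record values and relay them to per-vehicle Web Agents. The following is a minimal sketch only; the String record types, the "id" field, and the /vehicle/$id and addMessage URIs are assumptions carried over from the earlier example, not adapter APIs:

// JsonVehicleIngestingAgent.java -- illustrative sketch only
// (nstream DeferrableException import elided)
import org.apache.kafka.clients.consumer.ConsumerRecord;
import nstream.adapter.kafka.KafkaIngestingPatch;
import swim.json.Json;
import swim.structure.Value;

public class JsonVehicleIngestingAgent extends KafkaIngestingPatch<String, String> {

  @Override
  protected void ingest(ConsumerRecord<String, String> record)
      throws DeferrableException {
    // Parse the JSON record body into a swim.structure.Value
    final Value payload = Json.parse(record.value());
    final String id = payload.get("id").stringValue(null);
    if (id != null) {
      // Relay to the corresponding vehicle Web Agent
      command("/vehicle/" + id, "addMessage", payload);
    }
  }
}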
Remember to update your server.recon file with the new agent you just created, instead of the KafkaIngestingPatch.
Manual Consumption Triggering
It is often desirable to begin Kafka consumption via an explicit command message instead of on Web Agent startup:
// KafkaConsumerAgent.java
// import...
import swim.api.SwimLane;
import swim.api.lane.CommandLane;

public class KafkaConsumerAgent extends KafkaIngestingPatch {

  @SwimLane("triggerReception")
  CommandLane<String> triggerReception = this.<String>commandLane()
      .onCommand(s -> {
        if ("start".equals(s)) {
          stageReception();
        }
      });

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // No call to stageReception(), unlike in the superclass
  }
}
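You can then kick off consumption by commanding the triggerReception lane, for example from a Swim client application. The following is a minimal sketch; the warp://localhost:9001 host URI is an assumption about where your server listens:

// TriggerClient.java -- illustrative sketch; adjust the host URI to your deployment
import swim.client.ClientRuntime;
import swim.structure.Text;

public class TriggerClient {

  public static void main(String[] args) {
    final ClientRuntime swimClient = new ClientRuntime();
    swimClient.start();
    // Command the /kafka agent's triggerReception lane to begin polling
    swimClient.command("warp://localhost:9001", "/kafka", "triggerReception", Text.from("start"));
  }
}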
Backoff Strategy
While the Kafka-recommended pattern is to continuously invoke poll, you may wish to be less aggressive. This is a perfect use case for the inherited NstreamAgent#scheduleWithInformedBackoff method. Below we utilize this method to trigger backoff only when a returned result is empty (i.e. resume the continuous poll as long as results are nonempty):
// KafkaConsumerAgent.java
public class KafkaConsumerAgent extends KafkaIngestingPatch {

  @Override
  protected void stageReception() {
    prepareConsumer();
    this.pollTimer = scheduleWithInformedBackoff(this::pollTimer,
        this.ingressSettings.firstFetchDelayMillis(),
        this::nextBackoff,
        i -> !i.isEmpty(),
        500L,
        this::poll,
        this::ingestBatch);
  }

  private long nextBackoff(ConsumerRecords<Integer, String> records, long oldBackoff) {
    if (!records.isEmpty()) {
      return 0L;
    } else if (oldBackoff < 0) {
      return 500L;
    } else if (oldBackoff < 4000) {
      // Exponential backoff until 4 seconds
      return Math.min(oldBackoff * 2, 4000L);
    } else {
      // Linear backoff subsequently, to a max of 8 seconds
      return Math.min(oldBackoff + 1000L, 8000L);
    }
  }
}
Manual Commits
The consumption strategy outlined here dedicates a thread to consuming messages, but processes those messages asynchronously (by delegating to parallel-running Web Agents). While this provides extremely high throughput, combining this with automatic commits may result in a situation where processing a message throws an exception after that message’s offset has been committed by the consumer. This is potentially problematic when at-least once delivery of every message is required.
There is no catch-all solution to problems of this (rather advanced) nature.
At one extreme, you may avoid this headache altogether and process everything within the consumer thread, though this introduces significant loss of throughput.
A more common pattern is to commit in batches: every time a VehicleAgent (or equivalent) successfully or unsuccessfully processes a message, it lets the KafkaIngestingAgent know (most intuitively via a command() to a new CommandLane), and the KafkaIngestingAgent periodically issues a commit() based on the information at hand.
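As a rough sketch of that pattern: downstream agents command an acknowledgement lane with the topic, partition, and offset they finished processing, and the consumer thread commits the acknowledged offsets on its next pass. Note that the ack lane, its payload shape, and the consumer() accessor used to reach the underlying KafkaConsumer are all illustrative assumptions here, not adapter APIs:

// AckCommittingIngestingAgent.java -- illustrative sketch only
// (nstream DeferrableException import elided)
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import nstream.adapter.kafka.KafkaIngestingPatch;
import swim.api.SwimLane;
import swim.api.lane.CommandLane;
import swim.structure.Value;

public class AckCommittingIngestingAgent extends KafkaIngestingPatch<String, String> {

  // Highest acknowledged position per partition, reported by downstream Web Agents
  private final Map<TopicPartition, OffsetAndMetadata> acked = new ConcurrentHashMap<>();

  @SwimLane("ack")
  CommandLane<Value> ack = this.<Value>commandLane()
      .onCommand(v -> this.acked.merge(
          new TopicPartition(v.get("topic").stringValue(), v.get("partition").intValue()),
          new OffsetAndMetadata(v.get("offset").longValue() + 1L),
          (a, b) -> a.offset() >= b.offset() ? a : b));

  @Override
  protected void ingestBatch(ConsumerRecords<String, String> batch) {
    // Runs on the consumer thread: commit acknowledged work before ingesting the new batch.
    // consumer() is a hypothetical accessor to the underlying KafkaConsumer.
    if (!this.acked.isEmpty()) {
      consumer().commitAsync(new HashMap<>(this.acked), null);
    }
    for (ConsumerRecord<String, String> record : batch) {
      try {
        ingest(record);
      } catch (DeferrableException e) {
        // Decide how to retry or drop the failed record
      }
    }
  }
}

A VehicleAgent (or equivalent) would then issue a command to the ingesting agent's ack lane, carrying the record's topic, partition, and offset, once it finishes handling each message.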
Future nstream-toolkit releases will introduce some convenient abstractions over the most common such patterns seen in practice, so stay tuned!