Confluent Ingress
Nstream provides a Confluent Adapter library that simplifies connectivity to Confluent Cloud. This guide demonstrates how to consume from Confluent topics and process responses in Web Agents using minimal boilerplate.
Prerequisites
- JDK 11+
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-confluent:4.12.20'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-confluent</artifactId>
  <version>4.12.20</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- ConfluentIngestingAgent: An abstract Web Agent class with built-in methods relevant to a KafkaConsumer poll cycle
- ConfluentIngressSettings: A plain old Java object (POJO) that configures a single ConfluentIngestingAgent
- ConfluentIngestingPatch: A concrete (but extendable) ConfluentIngestingAgent subclass that collects ConsumerRecords into swim.structure.Values
ConfluentIngestingPatch
Conditions
Ideal If:
- enable.auto.commit is true
- ConsumerRecord bodies are converted into Values during ingestion
Then you simply need to use the ConfluentIngestingPatch class, often without overriding any methods.
Let's demonstrate this by recreating the ConfluentConsumerAgent example from the open source documentation wholly in server.recon:
The Confluent Adapter library is designed to simplify connectivity to Confluent Cloud. To connect, you need your Confluent Cloud bootstrap.servers, api.key, and api.secret. The underlying adapter is very similar to the Kafka Adapter and behaves identically, because the two share common code. The Confluent Adapter library automatically adds the additional properties that the Kafka client needs to connect to Confluent Cloud.
Protect your secrets
Make sure that you protect your secrets. We suggest storing your credentials in a Kubernetes Secret.
# confluent.properties
# required properties
ccloud.bootstrap.servers=<ccloud bootstrap.servers>
ccloud.api.key=<ccloud api key>
ccloud.api.secret=<ccloud api secret>
# The above settings are used to generate the configuration needed
# to start a Java Kafka client.
# Any setting prefixed with kafka. overrides the generated configuration
# before the underlying Java Kafka client is created.
kafka.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
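To make the relationship between the ccloud.* entries and the resulting Kafka client configuration concrete, here is a minimal sketch in plain Java. It is not the adapter's actual implementation: the class name ConfluentPropsSketch and the method toKafkaProperties are hypothetical, and the sketch assumes that kafka.-prefixed entries are applied last with the prefix stripped. The generated settings shown (SASL_SSL, the PLAIN mechanism, and a JAAS login line built from the API key and secret) are the ones a Java Kafka client typically needs for Confluent Cloud.

```java
import java.util.Properties;
import java.util.TreeSet;

public class ConfluentPropsSketch {

  // Hypothetical helper, for illustration only: derives Kafka client settings
  // from the ccloud.* entries, then layers kafka.-prefixed entries on top.
  static Properties toKafkaProperties(Properties in) {
    Properties out = new Properties();
    out.setProperty("bootstrap.servers", in.getProperty("ccloud.bootstrap.servers"));
    // Confluent Cloud clients typically authenticate with SASL/PLAIN over TLS.
    out.setProperty("security.protocol", "SASL_SSL");
    out.setProperty("sasl.mechanism", "PLAIN");
    out.setProperty("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + in.getProperty("ccloud.api.key") + "\" "
            + "password=\"" + in.getProperty("ccloud.api.secret") + "\";");
    // Assumption: kafka.-prefixed entries are copied last (prefix removed),
    // so they take precedence over the generated values above.
    for (String name : in.stringPropertyNames()) {
      if (name.startsWith("kafka.")) {
        out.setProperty(name.substring("kafka.".length()), in.getProperty(name));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Properties in = new Properties();
    in.setProperty("ccloud.bootstrap.servers", "<ccloud bootstrap.servers>");
    in.setProperty("ccloud.api.key", "<ccloud api key>");
    in.setProperty("ccloud.api.secret", "<ccloud api secret>");
    in.setProperty("kafka.key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    in.setProperty("kafka.value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    // Print only the resulting property names, never the secret values.
    for (String name : new TreeSet<>(toKafkaProperties(in).stringPropertyNames())) {
      System.out.println(name);
    }
  }
}
```

With the adapter in place you do not write this mapping yourself; the sketch only shows why three ccloud.* values plus optional kafka.* entries are enough to describe a working client configuration.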
# server.recon
provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    # Confluent consumer properties go here
    config: "/secrets/confluent.properties"
  }
}
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # ConfluentConsumingAgent replacement
  @node {
    uri: "/confluent"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      # ConfluentIngressSettings values go here
      confluentIngressConf: @confluentIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        name: test-consumer
        topics: {"schema-topic"}
        valueContentTypeOverride: "json"
        relaySchema: @command {
          nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
          laneUri: "addMessage"
          value: $value
        }
      }
    }
  }
  # VehicleAgent config can be copied directly, but you'll of course
  # need to implement VehicleAgent in Java.
}
# Configure desired web settings (e.g. port)
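The relaySchema in server.recon routes each ingested record to a node whose URI is built from the record itself: "/vehicle/$key" uses the record key, and "/vehicle/$value.id" uses the id field of the record value. The sketch below illustrates that substitution in plain Java. It is a hypothetical stand-in, not the adapter's code: the class RelayRoutingSketch and the method resolveNodeUri are invented for illustration, and the record value is modeled as a flat Map<String, String> rather than a swim.structure.Value.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class RelayRoutingSketch {

  // Matches "$key" or "$value.<field>" inside a nodeUri template.
  private static final Pattern PLACEHOLDER =
      Pattern.compile("\\$(key\\b|value\\.(\\w+))");

  // Hypothetical illustration: replaces "$key" with the record key and
  // "$value.<field>" with the matching top-level field of the record value.
  // Unknown fields resolve to the empty string.
  static String resolveNodeUri(String template, String key, Map<String, String> value) {
    return PLACEHOLDER.matcher(template).replaceAll(m -> {
      String field = m.group(2); // null when the placeholder is "$key"
      String replacement = (field == null) ? key : value.getOrDefault(field, "");
      return Matcher.quoteReplacement(replacement);
    });
  }

  public static void main(String[] args) {
    Map<String, String> value = Map.of("id", "42", "speed", "61");
    System.out.println(resolveNodeUri("/vehicle/$key", "bus-7", value));      // prints /vehicle/bus-7
    System.out.println(resolveNodeUri("/vehicle/$value.id", "bus-7", value)); // prints /vehicle/42
  }
}
```

In the configuration above, the resolved node then receives the record value as a command on its addMessage lane, so a record keyed bus-7 would be delivered to /vehicle/bus-7.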
Common Variations
Receiving Complex Data Types
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).