Kafka Ingress Configuration

Kafka

Kafka data ingestion is available using ingress of type kafka.

Examples Repo

Example Kafka configuration files showcasing all features we will discuss can be found in our Examples Repo .

Settings

The Kafka settings in the configuration file are used to configure data ingestion from Kafka topics. Below is a detailed description of each field within the Kafka settings:

Field Required Configurable Description
bootstrapServers true true A comma-separated list of Kafka brokers to connect to.
topic true true The name of the Kafka topic to subscribe to.
groupId true true The consumer group ID for coordinating consumption across multiple clients.
keyDeserializer true true The type of the deserializer for the message key.
keyContentType false true The content type of the key, accepted types: avro, csv, json, recon, xml (default plaintext).
keyAvroSchema false* true The Avro schema for the key, required iff keyContentType=avro AND keyDeserializerGenericAvroDe...
valueDeserializer true true The type of the deserializer for the message value.
valueContentType false true The content type of the value, accepted types: avro, csv, json, recon, xml (default plaintext).
valueAvroSchema false true The Avro schema for the value, required iff valueContentType=avro AND valueDeserializerGenericAvro...
autoOffset false true The offset reset policy, latest or earliest.
pollTimeoutMillis false true The timeout for polling messages from Kafka, in milliseconds.
kafkaConsumerProps false false Additional Kafka consumer properties as key-value pairs.

Message Structure

The message structure of received Kafka records is:

{ "key": ..., "value": {...} }

Example

The configuration below defines a Kafka ingress where messages are consumed from the specified Kafka topic and relayed to the appropriate agents for processing.

{
  "type": "kafka",
  "settings": {
    "bootstrapServers": "my-kafka-server:9092",
    "topic": "my-vehicle-topic",
    "groupId": "myGroupId",
    "keyDeserializer": "integer",
    "valueDeserializer": "string",
    "valueContentType": "json",
    "autoOffset": "latest",
    "pollTimeoutMillis": 100
  },
  "relay": {
    "agent": "vehicle",
    "idExtractor": "$key",
    "valueExtractor": "$value"
  }
}

Confluent

Confluent data ingestion is available using ingress of type confluent. The Confluent ingress is an extension of the Kafka ingress and so all of the above applies with a few additional available settings fields:

Field Required Configurable Description
clusterApiKey false true API key for authenticating with the Confluent Kafka cluster.
clusterApiSecret false true API secret for authenticating with the Confluent Kafka cluster.
schemaRegistryUrl false true URL of the Confluent Schema Registry.
schemaRegistryApiKey false true API key for authenticating with the Confluent Schema Registry.
schemaRegistryApiSecret false true API secret for authenticating with the Confluent Schema Registry.

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).