Kafka Ingress Configuration
Kafka
Kafka data ingestion is available using an ingress of type `kafka`.
Examples Repo
Example Kafka configuration files showcasing all the features discussed here can be found in our Examples Repo.
Settings
The Kafka `settings` in the configuration file are used to configure data ingestion from Kafka topics. Below is a detailed description of each field within the Kafka `settings`:
| Field | Required | Configurable | Description |
|---|---|---|---|
| `bootstrapServers` | true | true | A comma-separated list of Kafka brokers to connect to. |
| `topic` | true | true | The name of the Kafka topic to subscribe to. |
| `partitionCount` | false | false | Number of partitions of the topic to consume (recommended for enhanced performance; an ingress agent will be started per partition; default `1`). |
| `groupId` | true | true | The consumer group ID for coordinating consumption across multiple clients. |
| `keyDeserializer` | true | true | The type of the deserializer for the message key. For generic Avro deserialization use `avro`; the following two fields can then be omitted. |
| `keyContentType` | false | true | The content type of the key; accepted types: `avro`, `csv`, `json`, `recon`, `xml` (default `plaintext`). |
| `keyAvroSchema` | false | true | The Avro schema for the key; required iff `keyContentType` = `avro` AND `keyDeserializer` ≠ `GenericAvroDe...` |
| `valueDeserializer` | true | true | The type of the deserializer for the message value. For generic Avro deserialization use `avro`; the following two fields can then be omitted. |
| `valueContentType` | false | true | The content type of the value; accepted types: `avro`, `csv`, `json`, `recon`, `xml` (default `plaintext`). |
| `valueAvroSchema` | false | true | The Avro schema for the value; required iff `valueContentType` = `avro` AND `valueDeserializer` ≠ `GenericAvro...` |
| `autoOffset` | false | true | The offset reset policy: `latest` or `earliest`. |
| `pollTimeoutMillis` | false | true | The timeout for polling messages from Kafka, in milliseconds. |
| `kafkaConsumerProps` | false | false | Additional Kafka consumer properties as key-value pairs. |
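As noted in the table, selecting the generic `avro` deserializer lets you omit the corresponding content-type and schema fields. A minimal `settings` sketch under that assumption (the server, topic, and group names are placeholders):

```json
{
  "bootstrapServers": "my-kafka-server:9092",
  "topic": "my-avro-topic",
  "groupId": "myGroupId",
  "keyDeserializer": "avro",
  "valueDeserializer": "avro"
}
```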
Message Structure

The message structure of received Kafka records is:

```json
{ "key": ..., "value": {...} }
```
Example
The configuration below defines a Kafka ingress where messages are consumed from the specified Kafka topic and relayed to the appropriate agents for processing.
```json
{
  "type": "kafka",
  "settings": {
    "bootstrapServers": "my-kafka-server:9092",
    "topic": "my-vehicle-topic",
    "partitionCount": 5,
    "groupId": "myGroupId",
    "keyDeserializer": "integer",
    "valueDeserializer": "string",
    "valueContentType": "json",
    "autoOffset": "latest",
    "pollTimeoutMillis": 100
  },
  "relay": {
    "agent": "vehicle",
    "idExtractor": "$key",
    "valueExtractor": "$value"
  }
}
```
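Additional consumer tuning can be passed through `kafkaConsumerProps`. A sketch under the same example setup (the property names are standard Kafka consumer configs; the values shown are illustrative placeholders):

```json
{
  "type": "kafka",
  "settings": {
    "bootstrapServers": "my-kafka-server:9092",
    "topic": "my-vehicle-topic",
    "groupId": "myGroupId",
    "keyDeserializer": "integer",
    "valueDeserializer": "string",
    "kafkaConsumerProps": {
      "max.poll.records": "250",
      "session.timeout.ms": "30000"
    }
  },
  "relay": {
    "agent": "vehicle",
    "idExtractor": "$key",
    "valueExtractor": "$value"
  }
}
```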
Confluent

Confluent data ingestion is available using an ingress of type `confluent`.

The Confluent ingress is an extension of the Kafka ingress, so all of the above applies, with a few additional available `settings` fields:
| Field | Required | Configurable | Description |
|---|---|---|---|
| `clusterApiKey` | false | true | API key for authenticating with the Confluent Kafka cluster. |
| `clusterApiSecret` | false | true | API secret for authenticating with the Confluent Kafka cluster. |
| `schemaRegistryUrl` | false | true | URL of the Confluent Schema Registry. |
| `schemaRegistryApiKey` | false | true | API key for authenticating with the Confluent Schema Registry. |
| `schemaRegistryApiSecret` | false | true | API secret for authenticating with the Confluent Schema Registry. |
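Putting the Confluent-specific fields together, a sketch of a `confluent` ingress configuration (the broker address, registry URL, keys, and secrets are all placeholders to be replaced with your own Confluent Cloud credentials):

```json
{
  "type": "confluent",
  "settings": {
    "bootstrapServers": "my-cluster.confluent.cloud:9092",
    "topic": "my-vehicle-topic",
    "groupId": "myGroupId",
    "keyDeserializer": "integer",
    "valueDeserializer": "avro",
    "clusterApiKey": "<CLUSTER_API_KEY>",
    "clusterApiSecret": "<CLUSTER_API_SECRET>",
    "schemaRegistryUrl": "https://my-registry.confluent.cloud",
    "schemaRegistryApiKey": "<SR_API_KEY>",
    "schemaRegistryApiSecret": "<SR_API_SECRET>"
  },
  "relay": {
    "agent": "vehicle",
    "idExtractor": "$key",
    "valueExtractor": "$value"
  }
}
```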
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).