Avro Handling
Nstream provides an Avro Adapter library that can be coupled with connectors to facilitate ingestion of Avro-serialized data.
Consuming Avro data from any source looks like the following:
- Source data gets consumed by…
- …an Ingress Bridge, which receives either
  - org.apache.avro.generic.GenericRecords, or
  - Avro binary payloads

  and then…
- …transforms the received objects into either:
  - Custom POJOs, which may be processed and relayed to downstream “business logic” Web Agents via Java code.
  - swim.structure.Values, which may be processed and relayed to downstream “business logic” Web Agents by Java code or the Relay DSL (see the sketch after this list).
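To illustrate the Value branch, here is a minimal sketch of a hypothetical downstream business-logic Web Agent; the class name and lane URI are illustrative, not part of the adapter library:

import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.ValueLane;
import swim.recon.Recon;
import swim.structure.Value;

// Hypothetical "business logic" Web Agent; names are illustrative
public class BusinessLogicAgent extends AbstractAgent {

  // Receives the swim.structure.Values relayed by the Ingress Bridge
  @SwimLane("latest")
  ValueLane<Value> latest = this.<Value>valueLane()
      .didSet((newValue, oldValue) -> {
        // Per-record business logic goes here
        System.out.println(nodeUri() + " ingested: " + Recon.toString(newValue));
      });

}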
The utilities in this guide demonstrate the swim.structure.Value-terminating branch of this flow.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-avro:4.14.22'
Maven
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-avro</artifactId>
  <version>4.14.22</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
GenericRecord to Value
Note: this is typically the most convenient strategy, but it is limited in availability to the following environments:
- Kafka/Confluent topics that only contain messages published via KafkaAvroSerializer
- Pulsar topics that only contain messages published via Schema.AVRO(...)

Use an nstream.adapter.avro.GenericRecordAssembler instance to transform GenericRecord messages into Values.
In pure Java, this looks like:
final GenericRecordAssembler assembler = new GenericRecordAssembler();
final GenericRecord msg = ... ; // org.apache.avro.generic.GenericRecord
// Note: assembler is reusable, and assemble() is thread-safe
final Value result = assembler.assemble(msg); // swim.structure.Value
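To make the snippet above self-contained, the following sketch builds a GenericRecord by hand against an illustrative schema and assembles it; in practice the record would arrive from a connector:

import nstream.adapter.avro.GenericRecordAssembler;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import swim.structure.Value;

// Illustrative record schema; any Avro record schema works the same way
final Schema schema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"Complex\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
final GenericRecord msg = new GenericData.Record(schema);
msg.put("id", 42);
final GenericRecordAssembler assembler = new GenericRecordAssembler();
final Value result = assembler.assemble(msg); // a Value equivalent to {id:42}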
No-code (config-only) declarations are also available, though the rules are somewhat connector-dependent.
Kafka
- Within the Provision that corresponds to the consumer configuration, use KafkaAvroDeserializer for each applicable deserializer (typically only value.deserializer).
- Within the kafkaIngressSettings configuration, set the applicable valueMolder and/or keyMolder (rare) as shown below. The toolkit can sometimes infer these correctly if omitted, but it never hurts to be explicit.
# server.recon
provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      # Using this deserializer yields a KafkaIngestingPatch<$KeyType, GenericRecord>
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        ...
      }
    }
  }
}
...
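For reference, a fleshed-out consumerConf might look like the sketch below; the bootstrap.servers, group.id, and schema.registry.url values are placeholders, and the registry URL is needed because KafkaAvroDeserializer fetches writer schemas from a Confluent Schema Registry:

# Sketch of a fuller consumerConf; connection values are placeholders
@provision("consumerConf") {
  class: "nstream.adapter.common.provision.PropertiesProvision",
  def: {
    "bootstrap.servers": "localhost:9092",
    "group.id": "avro-demo",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    # Required by KafkaAvroDeserializer to fetch writer schemas
    "schema.registry.url": "http://localhost:8081"
  }
}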
Confluent
The rules for Confluent are identical to those for Kafka, other than the two name changes required in any nstream.adapter.confluent setup (see the sketch after this list):
- kafka.KafkaIngestingPatch -> confluent.ConfluentIngestingPatch
- kafkaIngressConf -> confluentIngressConf
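Applying those two renames to the Kafka example above yields the following sketch; note that the @confluentIngressSettings annotation name here is an assumption mirroring the kafkaIngressConf rename:

# server.recon (Confluent variant; @confluentIngressSettings is assumed)
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.confluent.ConfluentIngestingPatch") {
      confluentIngressConf: @confluentIngressSettings {
        ...
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        ...
      }
    }
  }
}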
Pulsar
nstream.adapter.pulsar operates slightly differently from the previous examples. Users do not manually work with GenericRecordAssembler; instead, simply declare pulsarIngressSettings#schemaType as auto, and any GenericRecordAssembler utilization is handled under the hood.
# server.recon
"provisions": {
  ...
}
"demo-app": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
      pulsarIngressConf: @pulsarIngressSettings {
        ...
        schemaType: "auto"
        ...
      }
    }
  }
}
...
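For context, a topic qualifies for the auto strategy when its producers publish through Pulsar's built-in Avro schema support. A minimal producer-side sketch, in which the service URL, topic name, and MyEvent POJO are all illustrative:

import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;

public final class AvroProducerSketch {

  // Illustrative POJO; Pulsar derives the Avro schema from it
  public static class MyEvent {
    public int id;
  }

  public static void main(String[] args) throws Exception {
    try (PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650") // placeholder service URL
        .build()) {
      final Producer<MyEvent> producer = client
          .newProducer(Schema.AVRO(MyEvent.class)) // publishes with an Avro schema
          .topic("avro-demo-topic") // placeholder topic
          .create();
      final MyEvent event = new MyEvent();
      event.id = 42;
      producer.send(event);
    }
  }

}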
Avro Binary to Value
If your use case falls outside the aforementioned environments, you will need to transform Avro binaries instead of GenericRecords, and your code (or configuration) must statically know the desired schema.
This approach creates one nstream.adapter.avro.SwimAvroAssembler instance per schema. Avro payloads are then passed to these instances' assemble methods to yield swim.structure.Values.
Doing this in pure Java looks like:
final String schema = "{\"type\":\"record\","
    + "\"name\":\"Complex\","
    + "\"fields\":"
    + "[{\"name\":\"id\",\"type\":\"int\"}]}";
final Value inlineConstructorArgs = // swim.structure.Value
    Record.create(2) // swim.structure.Record
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schema", schema);
final Value fileConstructorArgs =
    Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schemaConfig", "/schema-location.json");
final SwimAvroAssembler assembler =
    new SwimAvroAssembler(inlineConstructorArgs);
// OR: new SwimAvroAssembler(fileConstructorArgs);
final String msg = ...; // UTF-8 encoded, Avro-serialized, schema-compliant String
// Note: assembler is reusable; and
// assemble/assembleBytes/assembleString are thread-safe.
final Value result = assembler.assemble(msg); // swim.structure.Value
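Continuing the snippet above, the following sketch serializes a schema-compliant record with Avro's standard binary encoder and hands the raw bytes to the assembler; checked IOExceptions are ignored for brevity, and assembleBytes accepting a plain byte[] is an assumption:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Build a schema-compliant record for the "Complex" schema defined above
final Schema avroSchema = new Schema.Parser().parse(schema);
final GenericRecord record = new GenericData.Record(avroSchema);
record.put("id", 42);
// Serialize it with Avro's standard binary encoder
final ByteArrayOutputStream out = new ByteArrayOutputStream();
final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
new GenericDatumWriter<GenericRecord>(avroSchema).write(record, encoder);
encoder.flush();
// Assumption: assembleBytes accepts the raw byte[] payload
final Value value = assembler.assembleBytes(out.toByteArray());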
And config-only looks like:
@valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
  # Pick just one of the two strategies below
  schema: "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"
  schemaConfig: "/schema-location.json"
}
Kafka and Confluent
Assignable KafkaIngressSettings fields:
- keyMolder
- valueMolder

Compatible deserializers (specified in the Provision), each in the org.apache.kafka.common.serialization package:
- ByteArrayDeserializer
- BytesDeserializer
- ByteBufferDeserializer
- StringDeserializer
KafkaAvroSerializer-Published Incompatibility
The KafkaAvro(Des|S)erializer classes use internal magic bytes to frame each payload. Thus, a topic that was published using KafkaAvroSerializer is not compatible with the aforementioned deserializers.
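Concretely, the framing looks like this; the snippet is an illustrative check, not adapter API:

// Wire format written by KafkaAvroSerializer:
//   byte 0    : magic byte (0x0)
//   bytes 1-4 : 4-byte big-endian Schema Registry schema ID
//   bytes 5+  : the bare Avro binary payload
final byte[] raw = ...; // record value as seen by a ByteArrayDeserializer
final boolean confluentFramed = raw.length >= 5 && raw[0] == 0x0;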
Example:
# server.recon
provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler(...) {...}
        ...
      }
    }
  }
}
Pulsar
Assignable PulsarIngressSettings field:
- valueMolder

Compatible schemaType values (specified in PulsarIngressSettings; see the sketch after this list):
- bytearray
- bytebuffer
- string
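Putting these together, a sketch of the Pulsar Avro-binary path might combine a bytearray schemaType with the config-only @valueAssembler declaration from above:

# server.recon (sketch; mirrors the Kafka binary example above)
"demo-app": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
      pulsarIngressConf: @pulsarIngressSettings {
        ...
        schemaType: "bytearray"
        valueMolder: @valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
          schemaConfig: "/schema-location.json"
        }
        ...
      }
    }
  }
}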