Avro Handling
Nstream provides an Avro Adapter library that can be coupled with connectors to facilitate ingestion of Avro-serialized data.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-avro:4.11.19'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-avro</artifactId>
<version>4.11.19</version>
<type>module</type>
</dependency>
Avro Binary Deserialization
Use this strategy if:
- You’re consuming “raw” Avro-serialized bytes (with no magic/indicator bytes mixed in), AND
- You know the schema(s) for your payloads.
This approach creates one nstream.adapter.avro.SwimAvroAssembler instance per schema. Avro payloads are assembled against these instances to yield swim.structure.Value instances.
Doing this in pure Java looks like:
// AvroAssembly.java
import nstream.adapter.avro.SwimAvroAssembler;
import swim.structure.Record;
import swim.structure.Value;

public class AvroAssembly {

  private static final String SCHEMA = "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";

  public static void main(String[] args) {
    final Value inlineConstructorArgs = Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schema", SCHEMA);
    final Value fileConstructorArgs = Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schemaConfig", "/schema-location.json");
    // Call the constructor with either of the above
    final SwimAvroAssembler assembler = new SwimAvroAssembler(inlineConstructorArgs);
    final byte[] msg = new byte[]{ ... };
    // Note alternative String/InputStream overloads
    assembler.assemble(msg);
  }

}
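For local testing, you can produce "raw" Avro bytes for the Complex schema above using the standard Apache Avro library. The following is a sketch, assuming org.apache.avro:avro is on the classpath; it is independent of the Nstream classes and writes plain binary Avro with no magic bytes or schema-registry framing:

```java
// RawAvroBytes.java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class RawAvroBytes {

  public static void main(String[] args) throws IOException {
    final Schema schema = new Schema.Parser()
        .parse("{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    final GenericRecord record = new GenericData.Record(schema);
    record.put("id", 1);
    // Encode the record as raw binary Avro (no indicator bytes mixed in)
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    final byte[] msg = out.toByteArray();
    // msg can now be passed to SwimAvroAssembler#assemble
  }

}
```

Bytes produced this way are exactly the "raw Avro-serialized bytes" this strategy expects.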
Alternatively, many of Nstream's client libraries have hooks to perform this instantiation via the server.recon configuration. The name of the configuration field may differ from library to library, but the value always follows the same structure, i.e.:
@valueAssembler("nstream.adapter.avro.SwimAvroAssembler") { ... }
For example, a Kafka-consuming configuration might look like this:
# server.recon
provisions: {
  @provision("consumer") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      # StringDeserializer also works seamlessly below
      "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      "bootstrap.servers": "abc-a12bc.us-east-1.aws.confluent.cloud:9092"
      ...
    }
  }
}
"nstream-cflt-bike": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer"
        topics: {"your-topic"}
        valueMolder: @valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
          # Use only one of the two below
          schema: "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}"
          schemaConfig: "/schema-location.json"
        }
        relaySchema: @command($value) {
          ...
        }
      }
    }
  }
}
...
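In both the Java and recon variants, schemaConfig points to a resource containing the Avro schema as JSON rather than inlining it as a string. As an illustration (the path /schema-location.json here is a placeholder, not a required location), such a file for the Complex schema used on this page might contain:

```json
{
  "type": "record",
  "name": "Complex",
  "fields": [
    {"name": "id", "type": "int"}
  ]
}
```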
GenericRecord Processing
Use this strategy if you’re consuming org.apache.avro.generic.GenericRecord objects. This approach creates a single nstream.adapter.avro.GenericRecordAssembler instance, which can assemble any GenericRecord.
In pure Java:
// GenericRecordAssembly.java
import nstream.adapter.avro.GenericRecordAssembler;
import org.apache.avro.generic.GenericRecord;
import swim.structure.Record;
import swim.structure.Value;

public class GenericRecordAssembly {

  public static void main(String[] args) {
    final Value inlineConstructorArgs = Record.create(1)
        .attr("valueAssembler", "nstream.adapter.avro.GenericRecordAssembler");
    final GenericRecordAssembler assembler = new GenericRecordAssembler(inlineConstructorArgs);
    final GenericRecord msg = ... ;
    assembler.assemble(msg);
  }

}
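In practice the elided GenericRecord comes from a deserializer such as Confluent's KafkaAvroDeserializer (shown in the configuration below). For a quick local test, you could build one by hand with the Apache Avro API; this is a sketch assuming org.apache.avro:avro is on the classpath:

```java
// BuildGenericRecord.java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class BuildGenericRecord {

  public static void main(String[] args) {
    final Schema schema = new Schema.Parser()
        .parse("{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    // Populate a generic record against the parsed schema
    final GenericRecord msg = new GenericData.Record(schema);
    msg.put("id", 1);
    // msg can now be passed to GenericRecordAssembler#assemble
  }

}
```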
And in the configuration alone:
# server.recon
provisions: {
  @provision("consumer") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      "bootstrap.servers": "abc-a12bc.us-east-1.aws.confluent.cloud:9092"
      ...
    }
  }
}
"nstream-cflt-bike": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer"
        topics: {"your-topic"}
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        relaySchema: @command($value) {
          ...
        }
      }
    }
  }
}
...
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).