Avro Handling

Nstream provides an Avro Adapter library that can be coupled with connectors to facilitate ingestion of Avro-serialized data.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-avro:4.11.19'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-avro</artifactId>
  <version>4.11.19</version>
</dependency>

Avro Binary Deserialization

Use this strategy if you’re consuming raw Avro-serialized binary payloads (e.g. byte arrays) for which you have the corresponding Avro schemas.

This approach creates one nstream.adapter.avro.SwimAvroAssembler instance per schema. Avro payloads are passed to these instances’ assemble method to yield swim.structure.Values.

Doing this in pure Java looks like:

// AvroAssembly.java
import nstream.adapter.avro.SwimAvroAssembler;
import swim.structure.Record;
import swim.structure.Value;

public class AvroAssembly {

  private static final String SCHEMA = "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}";

  public static void main(String[] args) {
    final Value inlineConstructorArgs = Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schema", SCHEMA);
    final Value fileConstructorArgs = Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schemaConfig", "/schema-location.json");
    // Call the constructor with either of the above
    final SwimAvroAssembler assembler = new SwimAvroAssembler(inlineConstructorArgs);
    final byte[] msg = new byte[]{ ... };
    // Note alternative String/InputStream overloads
    assembler.assemble(msg);
  }

}
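
To make the elided byte[] concrete, here is a minimal, illustrative sketch that serializes a record matching the Complex schema into the binary form assemble expects. It uses only standard org.apache.avro APIs; the comment about the expected output Value is our assumption, not documented behavior.

// AvroRoundTrip.java (illustrative only)
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class AvroRoundTrip {

  public static void main(String[] args) throws IOException {
    final Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    // Build a record against the schema, then serialize it to Avro binary
    final GenericRecord record = new GenericData.Record(schema);
    record.put("id", 5);
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    // Passing this payload to SwimAvroAssembler#assemble should yield a Value
    // structurally equivalent to Record.create(1).slot("id", 5) (our assumption)
    final byte[] msg = out.toByteArray();
  }

}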

Alternatively, many of Nstream’s client libraries have hooks to perform this instantiation via the server.recon configuration. The name of the configuration field may differ from library to library, but the value always follows the same structure, i.e.:

@valueAssembler("nstream.adapter.avro.SwimAvroAssembler") { ... }

For example, a Kafka-consuming configuration might look like this:

# server.recon

provisions: {
  @provision("consumer") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      # StringDeserializer also works seamlessly below
      "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
      "bootstrap.servers": "abc-a12bc.us-east-1.aws.confluent.cloud:9092"
      ...
    }
  }
}

"nstream-cflt-bike": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer"
        topics: {"your-topic"}
        valueMolder: @valueAssembler("nstream.adapter.avro.SwimAvroAssembler") {
          # Use only one of the two below
          schema: "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"value\":\"int\"}]}"
          schemaConfig: "/schema-location.json"
        }
        relaySchema: @command($value) {
          ...
        }
      }
    }
  }
}
...
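
The relaySchema value (elided above) determines where each assembled Value is forwarded. A minimal sketch, assuming a hypothetical target Web Agent addressable at /vehicle/{key} with a command lane named addMessage (both names are illustrative, not part of the adapter API):

# Hypothetical relay: send each assembled value to the "addMessage"
# command lane of the Web Agent keyed by the Kafka record's key
relaySchema: @command {
  nodeUri: {
    "/vehicle/",
    $key
  }
  laneUri: "addMessage"
  value: $value
}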

GenericRecord Processing

Use this strategy if you’re consuming org.apache.avro.generic.GenericRecord objects.

This approach creates a single nstream.adapter.avro.GenericRecordAssembler instance (which can assemble any GenericRecord). In pure Java:

// GenericRecordAssembly.java
import nstream.adapter.avro.GenericRecordAssembler;
import org.apache.avro.generic.GenericRecord;
import swim.structure.Record;
import swim.structure.Value;

public class GenericRecordAssembly {

  public static void main(String[] args) {
    final Value inlineConstructorArgs = Record.create(1)
        .attr("valueAssembler", "nstream.adapter.avro.GenericRecordAssembler");
    final GenericRecordAssembler assembler = new GenericRecordAssembler(inlineConstructorArgs);
    final GenericRecord msg = ... ;
    assembler.assemble(msg);
  }

}
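
For testing, the elided msg could be built with standard org.apache.avro APIs; a minimal sketch, reusing the Complex schema from the previous section:

// GenericRecordExample.java (illustrative only)
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordExample {

  public static void main(String[] args) {
    final Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    // Populate a GenericRecord suitable for GenericRecordAssembler#assemble
    final GenericRecord msg = new GenericData.Record(schema);
    msg.put("id", 5);
  }

}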

And via the server.recon configuration alone:

# server.recon

provisions: {
  @provision("consumer") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      "bootstrap.servers": "abc-a12bc.us-east-1.aws.confluent.cloud:9092"
      ...
    }
  }
}

"nstream-cflt-bike": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer"
        topics: {"your-topic"}
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        relaySchema: @command($value) {
          ...
        }
      }
    }
  }
}
...
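
Note that no schema appears in the @valueAssembler directive here: Confluent’s KafkaAvroDeserializer resolves schemas against Schema Registry (configured via the schema.registry.url consumer property, among those elided above) and hands fully-deserialized GenericRecords to the assembler.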

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).