Avro Handling

Nstream provides an Avro Adapter library that can be coupled with connectors to facilitate ingestion of Avro-serialized data.

Consuming Avro data from any source looks like the following:

  1. Source data gets consumed by…
  2. …an Ingress Bridge, which receives either
    • org.apache.avro.generic.GenericRecords, or
    • Avro binary payloads

    and then…

  3. …transforms the received objects into either:
    1. Custom POJOs, which may be processed and relayed to downstream “business logic” Web Agents via Java code.
    2. swim.structure.Values, which may be processed and relayed to downstream “business logic” Web Agents by Java code or the Relay DSL.

The utilities in this guide demonstrate the branch of this flow that terminates at step 3.2 (swim.structure.Values).
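For a concrete, if simplified, picture of that branch, the sketch below shows an ingress-side Web Agent relaying an already-assembled swim.structure.Value to a downstream business-logic agent. The agent class, node URI pattern, lane name, and the "id" field are hypothetical placeholders, not part of the Nstream adapter API; only the Web Agent command() relay is standard swim.api usage.

import swim.api.agent.AbstractAgent;
import swim.structure.Value;

// Hypothetical ingress-side agent; only the command() relay is standard Web Agent API.
public class AvroIngressAgent extends AbstractAgent {

  void relay(final Value structured) {
    // Route on a field of the assembled Value ("id" is an assumed field name).
    final String nodeUri = "/vehicle/" + structured.get("id").stringValue("unknown");
    command(nodeUri, "addEvent", structured); // relay to a downstream business-logic lane
  }

}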

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-avro:4.14.22'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-avro</artifactId>
  <version>4.14.22</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

GenericRecord to Value

Note: this is typically the most convenient strategy, but it is only available in the environments covered in the connector subsections below (Kafka, Confluent, and Pulsar).

Use an nstream.adapter.avro.GenericRecordAssembler instance to transform GenericRecord messages into Values.

In pure Java, this looks like:

final GenericRecordAssembler assembler = new GenericRecordAssembler();
final GenericRecord msg = ... ; // org.apache.avro.generic.GenericRecord
// Note: assembler is reusable, and assemble() is thread-safe
final Value result = assembler.assemble(msg); // swim.structure.Value
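Once assembled, the result can be inspected with the standard swim.structure accessors. A minimal follow-up sketch, assuming the Avro schema declares an int field named id (a hypothetical field name):

// "id" is a hypothetical field; substitute one defined by your schema.
final int id = result.get("id").intValue(-1); // -1 if the field is absent or non-numeric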

No-code (config-only) declarations are also available, though the rules are somewhat connector-dependent.

Kafka

# server.recon

provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      # Using this deserializer yields a KafkaIngestingPatch<$KeyType, GenericRecord>
      "value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler("nstream.adapter.avro.GenericRecordAssembler")
        ...
      }
    }
  }
}
...

Confluent

The rules for Confluent are identical to those for Kafka, aside from the two name changes required in any nstream.adapter.confluent setup.

Pulsar

nstream.adapter.pulsar operates slightly differently from the previous examples.

Users do not work with GenericRecordAssembler manually; instead, simply declare pulsarIngressSettings#schemaType as auto, and any GenericRecordAssembler use is handled under the hood.

# server.recon

"provisions": {
  ...
}
"demo-app": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.pulsar.PulsarIngestingPatch") {
      pulsarIngressConf: @pulsarIngressSettings {
        ...
        schemaType: "auto"
        ...
      }
    }
  }
}
...

Avro Binary to Value

If your use case falls outside the aforementioned environments, you will need to transform Avro binaries instead of GenericRecords, and your code (or configuration) must statically know the desired schema.

This approach uses one nstream.adapter.avro.SwimAvroAssembler instance per schema. Avro payloads are then assembled against these instances to yield swim.structure.Values.

Doing this in pure Java looks like:

final String schema = "{\"type\":\"record\","
  + "\"name\":\"Complex\","
  + "\"fields\":"
    + "[{\"name\":\"id\",\"value\":\"int\"}]}";
final Value inlineConstructorArgs = // swim.structure.Value
    Record.create(2) // swim.structure.Record
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schema", schema);
final Value fileConstructorArgs =
    Record.create(2)
        .attr("valueAssembler", "nstream.adapter.avro.SwimAvroAssembler")
        .slot("schemaConfig", "/schema-location.json");
final SwimAvroAssembler assembler =
  new SwimAvroAssembler(inlineConstructorArgs);
// OR: new SwimAvroAssembler(fileConstructorArgs);
final String msg = ...; // UTF-8 encoded, Avro-serialized, schema-compliant String
// Note: assembler is reusable, and
//  assemble/assembleBytes/assembleString are thread-safe.
final Value result = assembler.assemble(msg); // swim.structure.Value

And config-only looks like:

@valueAssembler("nstream.adapter.avro.SwimAvroAssembler"){
  # Pick just one of the two strategies below
  schema: "{\"type\":\"record\",\"name\":\"Complex\",\"fields\":[{\"name\":\"id\",\"value\":\"int\"}]}"
  schemaConfig: "/schema-location.json"
}

Kafka and Confluent

Assignable KafkaIngressSettings fields include valueMolder, as shown in the example below.

Compatible deserializers (specified in the Provision) include org.apache.kafka.common.serialization.StringDeserializer, also shown in the example below.

KafkaAvroSerializer-Published Incompatibility

The KafkaAvro(Des|S)erializer classes prepend internal magic bytes to each payload. A topic that was published using KafkaAvroSerializer is therefore not compatible with the aforementioned deserializers.
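For reference, plain Avro binary encoding, which carries no such prefix, can be produced with nothing but the org.apache.avro library. The sketch below is illustrative only: it reuses the Complex schema from this guide, and the choice of producer serializer for publishing the resulting bytes (matching whatever deserializer you configure in the Provision) is left open.

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Illustrative producer-side encoding: yields raw Avro binary with no
// Confluent wire-format prefix (no magic byte, no schema id).
public final class PlainAvroEncoding {

  public static byte[] encode() throws IOException {
    final Schema schema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Complex\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"int\"}]}");
    final GenericRecord record = new GenericData.Record(schema);
    record.put("id", 42);
    final ByteArrayOutputStream out = new ByteArrayOutputStream();
    final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
    encoder.flush();
    return out.toByteArray();
  }

}

Bytes produced this way can then be handed to a SwimAvroAssembler configured with the same schema.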

Consumer-side configuration example:

# server.recon

provisions: {
  @provision("consumerConf") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      ...
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      ...
    }
  }
}
"demo-app": @fabric {
  @plane(class: "swim.api.plane.AbstractPlane")
  @node {
    uri: "/consumer"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        ...
        valueMolder: @valueAssembler(...) {...}
        ...
      }
    }
  }
}

Pulsar

Assignable PulsarIngressSettings field:

Compatible schemaType values (specified in PulsarIngressSettings)

