March 7 - June 27, 2024 - Nstream is hitting the road with Confluent on the #DataInMotionTour! / Learn More

Kafka Vehicle Tutorial

This tutorial walks through how to build the backend for a web application that conveys real-time city transit information using the Nstream platform.

More specifically, we demonstrate how to:

If this seems daunting, don’t worry! The Nstream platform makes transforming real-time data into insightful, first class citizens of the World Wide Web a very approachable task.

A standalone project containing all the discussed code can be found and cloned from here.

Nstream Library Dependencies

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-common:4.9.16'
implementation 'io.nstream:nstream-adapter-kafka:4.9.16'
implementation 'io.nstream:nstream-adapter-runtime:4.9.16'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-common</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-kafka</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>
<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-runtime</artifactId>
  <version>4.9.16</version>
  <type>module</type>
</dependency>

Source Data

Let’s envision a situation where vehicles continuously report their state to a Kafka topic. Messages in the topic take the following structure:

{
  "id": (string (same as key)),
  "timestamp": (number (Unix timestamp))
  "latitude": (number),
  "longitude": (number),
  "speed": (number),
  "bearing": (number)
  ... (possibly more fields)
}

Note: If you run the broker and populator using these instructions, your machine will have an equivalent topic available to any local Kafka consumer via bootstrap server localhost:29092 for experimentation purposes.

Nstream Entities

The Swim server we build will:

The runtime for each one of these operations will simply be a specialized Web Agent.

VehiclesIngestingAgent

Per the Kafka Ingress guide, we can use a KafkaIngestingPatch instance to consume Kafka-hosted messages. Simple-to-process source data and zero need for tricky consumer lifecycle management mean that we can implement this in the server.recon configuration alone, without requiring a concrete VehiclesIngestingAgent class:

# server.recon

provisions: {
  @provision("consumer-properties") {
    class: "nstream.adapter.common.provision.PropertiesProvision",
    def: {
      "bootstrap.servers": "localhost:29092", 
      "group.id": "bespoke-group",
      "key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
      "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
      "auto.offset.reset": "latest"
    }
  }
}

"starter": @fabric { # Point 2
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node {
    uri: "/kafka" # singleton instance, note lack of dynamic components
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        consumerPropertiesProvisionName: "consumer-properties"
        topics: {"vehicles-integer-json"}
        valueContentTypeOverride: "json"
        # Let's discuss VehicleAgent before implementing this
        relaySchema: ??? 
      }
    }
  }
}

VehicleAgent

Upon consuming data, the Swim server must process and collect it into addressable endpoints.

Because there is so much flexibility in how this may play out (just like with designing any API), a good way to focus the scope is to start by asking:

To this, you may answer:

It then becomes clear that the entities that we wish to track are vehicles, and that the desired vehicle-specific properties are both current and historical states. This maps nicely to a design involving one VehicleAgent per vehicle that contains a latest ValueLane and a history MapLane:

// VehicleAgent.java
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.MapLane;
import swim.api.lane.ValueLane;
import swim.structure.Value;

public class VehicleAgent extends AbstractAgent {

  @SwimLane("latest")
  ValueLane<Value> latest = this.<Value>valueLane();

  @SwimLane("history")
  MapLane<Long, Value> history = this.<Long, Value>mapLane();

  // Required so each VehicleAgent has a way to receive data
  @SwimLane("addEvent")
  CommandLane<Value> addEvent = this.<Value>commandLane()
      .onCommand(v -> {
        this.latest.set(v);
        this.history.put(v.get("timestamp").longValue(), v);
      });
}

Tying this into server.recon yields:

# server.recon

provisions: { ... } # from before

"starter": @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  @node { uri: "/kafka", ... } # from before
  @node {
    # Multiple instantiations possible (note dynamic component)
    pattern: "/vehicle/:id"
    @agent(class: "nstream.starter.VehicleAgent")
  }
}

Revisiting “VehiclesIngestingAgent

Since we’ve established the existence of distributed VehicleAgents, we should now update our KafkaIngestingPatch to relay messages to them:

# server.recon
...
        relaySchema: @command {
          nodeUri: {
            "/vehicle/",
            $key # $value$id will also work here
          },
          laneUri: "addEvent"
          value: $value
        }
...

Progress Check

If you’ve made it this far, take a moment to congratulate yourself! We’ve built a server that serves granular, real-time streams of the historical states of every vehicle present within some Kafka topic. This is already an impressive application on its own; the remainder of this article just exercises additional functionalities.

To verify our progress, you may either:

PolarityAgent” and PolarityMemberAgent

Nstream also offers valuable real-time interactions across Web Agents on top of the benefits that we’ve already seen. Once such interaction is dynamically assigning categories to certain Web Agents based on some of their properties. Because each vehicle’s status comes with GPS coordinates, let’s utilize agent grouping to categorize every VehicleAgent as “north” or “south” of lat=34.0 based on their latest update.

Per the linked documentation, this requires two additional Web Agent types:

The first can be trivially configured in server.recon alone:

# server.recon
...

"starter": @fabric {
  ...
  @node {
    uri: "/lat34/north"
    @agent(class: "nstream.adapter.common.patches.GroupPatch")
  }
  @node {
    uri: "/lat34/south"
    @agent(class: "nstream.adapter.common.patches.GroupPatch")
  }
}

The second requires a bit of custom Java logic to transform a coordinate into a polarity, but this isn’t very difficult either:

import nstream.adapter.common.patches.MemberPatch;
import swim.structure.Value;

public class PolarityMemberAgent extends MemberPatch {

  @Override
  protected String extractGroupFromEvent(Value event) {
    final float indicator = event.get("latitude").floatValue();
    return indicator < 34.f ? "south" : "north";
  }
}

And neither is its remaining server.recon configuration:

# server.recon 
...


"starter": @fabric {
  ...
  @node {
    pattern: "/vehicle/:id"
    @agent(class: "nstream.starter.VehicleAgent")
    # Note "mixin"-style, additive Web Agent functionality, 
    @agent(class: "nstream.starter.PolarityMemberAgent") {
      "groupUriPattern": "/lat34/:group"
    }
  }
  ...
}

Final Notes

After running at least the main server component from the standalone project instructions, you may open localhost:9001 in a browser window for a meta view of the web agents. Congratulations on building the backend for an end-to-end streaming application!

To experiment with code-specific streaming APIs, you may run the following swim-cli commands while the process runs (any number in the first column of src/main/resources/locations.csv is a valid $VID; $POLARITY is either north or south):


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).