Kafka Vehicle Tutorial
This tutorial walks through how to build the backend for a web application that conveys real-time city transit information using the Nstream platform.
More specifically, we demonstrate how to:
- Efficiently consume messages from a Kafka topic (though the principles can easily be generalized beyond Kafka)
- Transform those messages into useful insights
- Serve those insights using Nstream Web Agents as real-time, webpage-subscribable streams addressable by granular, sensible URIs
If this seems daunting, don’t worry! The Nstream platform makes transforming real-time data into insightful, first-class citizens of the World Wide Web an approachable task.
A standalone project containing all the discussed code can be found and cloned from here.
Nstream Library Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-common:4.12.20'
implementation 'io.nstream:nstream-adapter-kafka:4.12.20'
implementation 'io.nstream:nstream-adapter-runtime:4.12.20'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-common</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-kafka</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-runtime</artifactId>
<version>4.12.20</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Source Data
Let’s envision a situation where vehicles continuously report their state to a Kafka topic. Messages in the topic take the following structure:
- key: a unique identifier for this vehicle (integers in the sample topic described below)
- value: a JSON string of the form:

{
  "id": (same as key),
  "timestamp": (number (Unix timestamp)),
  "latitude": (number),
  "longitude": (number),
  "speed": (number),
  "bearing": (number),
  ... (possibly more fields)
}
Note: If you run the broker and populator using these instructions, your machine will have an equivalent topic available to any local Kafka consumer via bootstrap server localhost:29092 for experimentation purposes.
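To make the message shape concrete, here is a dependency-free sketch of how one such vehicle report could be rendered as a JSON string on the producer side. This is purely illustrative (a real producer would use a JSON library); the field values are taken from the sample output later in this article.

```java
public class VehicleMessage {

    /**
     * Renders one vehicle report as a JSON string matching the schema above.
     * Illustrative only; not part of the Nstream APIs discussed here.
     */
    static String toJson(String id, long timestamp, double latitude,
                         double longitude, double speed, double bearing) {
        return "{"
            + "\"id\":\"" + id + "\","
            + "\"timestamp\":" + timestamp + ","
            + "\"latitude\":" + latitude + ","
            + "\"longitude\":" + longitude + ","
            + "\"speed\":" + speed + ","
            + "\"bearing\":" + bearing
            + "}";
    }

    public static void main(String[] args) {
        // One report, roughly matching the sample data seen later in this article
        System.out.println(toJson("9542", 1695084501247L, 34.01912, -118.30902, 6, 245));
    }
}
```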
Nstream Entities
The Swim server we build will:
- Consume data from the Kafka topic
- Process and collect the data into endpoints addressable at per-vehicle granularity
- Perform dynamic, addressable groupings against these vehicle-corresponding endpoints
The runtime for each one of these operations will simply be a specialized Web Agent.
VehiclesIngestingAgent
Per the Kafka Ingress guide, we can use a KafkaIngestingPatch instance to consume Kafka-hosted messages.
The source data is simple to process and there is no need for tricky consumer lifecycle management, so we can implement this in the server.recon configuration alone, without writing a concrete VehiclesIngestingAgent class:
# server.recon
provisions: {
@provision("consumer-properties") {
class: "nstream.adapter.common.provision.PropertiesProvision",
def: {
"bootstrap.servers": "localhost:29092",
"group.id": "bespoke-group",
"key.deserializer": "org.apache.kafka.common.serialization.IntegerDeserializer",
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
"auto.offset.reset": "latest"
}
}
}
"starter": @fabric { # Point 2
@plane(class: "nstream.adapter.runtime.AppPlane")
@node {
uri: "/kafka" # singleton instance, note lack of dynamic components
@agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
kafkaIngressConf: @kafkaIngressSettings {
consumerPropertiesProvisionName: "consumer-properties"
topics: {"vehicles-integer-json"}
valueContentTypeOverride: "json"
# Let's discuss VehicleAgent before implementing this
relaySchema: ???
}
}
}
}
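The "consumer-properties" provision above corresponds one-to-one with the java.util.Properties object that a Kafka consumer is ultimately configured with. A plain-Java sketch of that equivalence (illustrative; the Nstream runtime builds this for you from the provision):

```java
import java.util.Properties;

public class ConsumerProps {

    // Mirrors the "consumer-properties" provision from server.recon.
    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:29092");
        props.setProperty("group.id", "bespoke-group");
        props.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("auto.offset.reset", "latest");
        return props;
    }
}
```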
VehicleAgent
Upon consuming data, the Swim server must process and collect it into addressable endpoints.
Because there is so much flexibility in how this may play out (just like with designing any API), a good way to focus the scope is to start by asking:
- “What questions do we want real-time answers to?”
To this, you may answer:
- The latest state of specified vehicles
- A historical time-series of specified vehicles
It then becomes clear that the entities that we wish to track are vehicles, and that the desired vehicle-specific properties are both current and historical states.
This maps nicely to a design involving one VehicleAgent per vehicle that contains a latest ValueLane and a history MapLane:
// VehicleAgent.java
import swim.api.SwimLane;
import swim.api.agent.AbstractAgent;
import swim.api.lane.CommandLane;
import swim.api.lane.MapLane;
import swim.api.lane.ValueLane;
import swim.structure.Value;
public class VehicleAgent extends AbstractAgent {
@SwimLane("latest")
ValueLane<Value> latest = this.<Value>valueLane();
@SwimLane("history")
MapLane<Long, Value> history = this.<Long, Value>mapLane();
// Required so each VehicleAgent has a way to receive data
@SwimLane("addEvent")
CommandLane<Value> addEvent = this.<Value>commandLane()
.onCommand(v -> {
this.latest.set(v);
this.history.put(v.get("timestamp").longValue(), v);
});
}
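To build intuition for what the two lanes store, here is a dependency-free sketch of the same state model in plain Java (no Swim types): latest holds the most recent event, and history is a timestamp-ordered map, analogous to the MapLane above.

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class VehicleState {

    private String latest; // analogous to the latest ValueLane
    private final SortedMap<Long, String> history = new TreeMap<>(); // analogous to the history MapLane

    // Analogous to the addEvent CommandLane callback
    void addEvent(long timestamp, String event) {
        this.latest = event;
        this.history.put(timestamp, event);
    }

    String latest() {
        return this.latest;
    }

    SortedMap<Long, String> history() {
        return this.history;
    }
}
```

Unlike this sketch, the real lanes are streaming-capable: every set/put is pushed to all subscribed downlinks in real time.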
Tying this into server.recon yields:
# server.recon
provisions: { ... } # from before
"starter": @fabric {
@plane(class: "nstream.adapter.runtime.AppPlane")
@node { uri: "/kafka", ... } # from before
@node {
# Multiple instantiations possible (note dynamic component)
pattern: "/vehicle/:id"
@agent(class: "nstream.starter.VehicleAgent")
}
}
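The pattern "/vehicle/:id" means one VehicleAgent is lazily instantiated per distinct id that appears in the dynamic component of a node URI. As a rough illustration of that routing idea (not Nstream's actual router code), the dynamic :id component can be thought of as a capture group:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class NodePattern {

    // Illustrative stand-in for the "/vehicle/:id" node pattern:
    // extracts the dynamic :id component from a concrete node URI.
    private static final Pattern VEHICLE = Pattern.compile("/vehicle/([^/]+)");

    /** Returns the :id component of a matching node URI, or null if no match. */
    static String extractId(String nodeUri) {
        Matcher m = VEHICLE.matcher(nodeUri);
        return m.matches() ? m.group(1) : null;
    }
}
```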
Revisiting VehiclesIngestingAgent
Since we’ve established the existence of distributed VehicleAgents, we should now update our KafkaIngestingPatch to relay messages to them:
# server.recon
...
relaySchema: @command {
nodeUri: "/vehicle/$key" # or "/vehicle/$value.id"
laneUri: "addEvent"
value: $value
}
...
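In the relaySchema, $key and $value are placeholders that resolve per consumed Kafka record. A minimal sketch of how the command target resolves for one record (illustrative only; not Nstream's actual interpolation code):

```java
public class RelayTarget {

    // How "/vehicle/$key" would resolve for a given Kafka record key,
    // e.g. key 9542 is relayed to the agent at "/vehicle/9542".
    static String nodeUri(Object recordKey) {
        return "/vehicle/" + recordKey;
    }

    // The target lane is fixed by the relaySchema
    static String laneUri() {
        return "addEvent";
    }
}
```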
Progress Check
If you’ve made it this far, take a moment to congratulate yourself! We’ve built a server that serves granular, real-time streams of the historical states of every vehicle present within some Kafka topic. This is already an impressive application on its own; the remainder of this article just exercises additional functionalities.
To verify our progress, you may either:
- Open localhost:9001 in your browser for a general-purpose (i.e. not repository-specific) meta view of your Web Agents in action
- Run the first two swim-cli commands in this article’s final section to exercise code-specific streaming APIs (the last command will only work once we implement everything in the “PolarityAgent” section)
PolarityAgent and PolarityMemberAgent
Nstream also offers valuable real-time interactions across Web Agents on top of the benefits that we’ve already seen.
One such interaction is dynamically assigning categories to certain Web Agents based on some of their properties.
Because each vehicle’s status comes with GPS coordinates, let’s utilize agent grouping to categorize every VehicleAgent as “north” or “south” of lat=34.0 based on its latest update.
Per the linked documentation, this requires two additional Web Agent types:
- Two GroupPatch instances (one for “north”, one for “south”)
- A MemberPatch instance for every VehicleAgent
The first can be trivially configured in server.recon alone:
# server.recon
...
"starter": @fabric {
...
@node {
uri: "/lat34/north"
@agent(class: "nstream.adapter.common.patches.GroupPatch")
}
@node {
uri: "/lat34/south"
@agent(class: "nstream.adapter.common.patches.GroupPatch")
}
}
The second requires a bit of custom Java logic to transform a coordinate into a polarity, but this isn’t very difficult either:
import nstream.adapter.common.patches.MemberPatch;
import swim.structure.Value;
public class PolarityMemberAgent extends MemberPatch {
@Override
protected String extractGroupFromEvent(Value event) {
final float indicator = event.get("latitude").floatValue();
return indicator < 34.f ? "south" : "north";
}
}
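The threshold comparison can be checked in isolation before wiring it into the server. The sketch below duplicates extractGroupFromEvent's logic without any Swim dependencies, purely so its edge behavior is easy to see:

```java
public class PolarityCheck {

    // Same comparison as PolarityMemberAgent.extractGroupFromEvent,
    // isolated from Swim types. Note that exactly lat=34.0 maps to "north".
    static String polarity(float latitude) {
        return latitude < 34.f ? "south" : "north";
    }
}
```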
And neither is its remaining server.recon configuration:
# server.recon
...
"starter": @fabric {
...
@node {
pattern: "/vehicle/:id"
@agent(class: "nstream.starter.VehicleAgent")
# Note the "mixin"-style, additive Web Agent functionality
@agent(class: "nstream.starter.PolarityMemberAgent") {
"groupUriPattern": "/lat34/:group"
}
}
...
}
Final Notes
After running at least the main server component from the standalone project instructions, you may open localhost:9001 in a browser window for a meta view of the Web Agents.
Congratulations on building the backend for an end-to-end streaming application!
To experiment with code-specific streaming APIs, you may run the following swim-cli commands while the process runs (any number in the first column of src/main/resources/locations.csv is a valid $VID; $POLARITY is either north or south):
- Subscribing to a VehicleAgent instance’s latest lane:

% swim-cli sync -h warp://localhost:9001 -n /vehicle/$VID -l latest
{"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084501247}

- Subscribing to a VehicleAgent instance’s history lane:

% swim-cli sync -h warp://localhost:9001 -n /vehicle/$VID -l history
@update(key:1695084501247){"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084501247}
@update(key:1695084628525){"id":9542,"routeId":207,"dir":"inbound","latitude":34.01912,"longitude":-118.30902,"speed":6,"bearing":"W","routeName":"207 Hollywood - Crenshaw Sta Via Western","timestamp":1695084628525}
...

- Subscribing to a “PolarityAgent” (really a GroupPatch) instance’s agents lane:

% swim-cli sync -h warp://localhost:9001 -n /lat34/$POLARITY -l agents
@update(key:"/vehicle/1564"){id:1564,routeId:90,dir:outbound,latitude:34.056484,longitude:-118.25013,speed:9,bearing:NW,routeName:"90 Dtla - Noho Sta Via Vineland-Foothil",timestamp:1695085068135}
@update(key:"/vehicle/1570"){id:1570,routeId:78,dir:outbound,latitude:34.03334,longitude:-118.26485,speed:0,bearing:NE,routeName:"78 Downtown LA - South Arcadia Via Main",timestamp:1695085082336}
...
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).