March 7 - June 27, 2024 - Nstream is hitting the road with Confluent on the #DataInMotionTour! / Learn More

Toolkit Overview

The SwimOS platform provides a good framework for building real time applications that model business entities using distributed objects, the objects being Web Agents.

Nstream provides a toolkit that builds upon SwimOS aiming to provide pre-packaged configurable solutions for some common patterns, removing complexity and boilerplate, allowing for greater focus on business logic.

Toolkit Flow

The general flow of most SwimOS applications is very similar and involves some key steps.

We will demonstrate how implementing this whole flow using the Nstream Toolkit is not only easy but can be achieved with a single configuration file.

The next few pages will go into more detail about patches and lanes but for now, know that patches are pre-built configurable agents - the building blocks of the Nstream Toolkit.

Ingress

The first stage of any Swim application is getting the data or entities from some datasource into the application.

The Nstream Toolkit provides several ingress patches that can be used to pull or receive from a datasource, including message brokers such as Kafka, or databases using the JDBC ingress patch. Ingress patches are singleton agents that can be entirely configuration driven (no Java, just properties in the server.recon file).

  @node {
    uri: "/kafka"
    @agent(class: "nstream.adapter.kafka.KafkaIngestingPatch") {
      kafkaIngressConf: @kafkaIngressSettings {
        topics: {"vehicle-topic"}
      }
    }
  }

For exact properties and implementation instructions see the relevant ingress documentation. Here we demonstrate the general pattern using Kafka as an example, with some properties omitted for clarity.

Ingress to Entities

The ingress patch, once configured with connection details, will now start pulling or receiving events. Regardless of the source, the next stage of the process is to route the ingested events to the appropriate entity nodes.

To route the message, we are going to configure a relay, essentially a configurable way of constructing a destination and payload from an event. The relay defines how to extract unique identifiers of the entity from an event and apply them to the URI pattern of the entity. It also specifies the lane to command and the payload - which might just be the whole event. The relay is a property of the ingress patch.

  relaySchema: @command {
    nodeUri: {
      "/vehicle/",   # Our vehicle nodes have URI pattern: '/vehicle/:id'
       $key          # $value$id will also work here
    },
    laneUri: "addEvent"
    value: $value
  }
Sample Event:
{ id: 10023, agency: SanFrancisco, active: true, latitude: 37.7, longitude: 122.4 } 

The ingress patch is now pulling/receiving events from a datasource and routing them to the entity node the event is concerning. The entity node’s behaviour can now be composed with agents, defining how to handle the events. This can be done with the patch system or custom business logic can be run in bespoke agents.

  @node {
    uri: "/vehicle/:id"
    @agent(class: "nstream.adapter.common.patches.LatestValuePatch")
  }

Here we define a very simple vehicle agent that uses the LatestValuePatch to store the most recent event.

Entities to Aggregations

The final stage is to model the relationships and groupings of agents. Grouping agents allows for aggregate statistics to be calculated and provides a single streaming API for a set of agents. The patch system provides a configurable method of doing this, by extracting a group identifier from an event then joining that group.

  @node {
    uri: "/agency/:name"
    @agent(class: "nstream.adapter.common.patches.GroupPatch")
  }

  @node {
    pattern: "/vehicle/:id"
    @agent(class: "nstream.adapter.common.patches.LatestValuePatch")
    @agent(class: "nstream.adapter.common.patches.MemberPatch") {
      groupUriPattern: "/agency/:name"
      extractGroup: $agency                # Selector to extract the 'agency' from an event
    }

We use a member patch to extract the ‘agency’ value from an event and use that to automatically join the agencies group patch. For more on group and member patches see the agent relationship guide.

We now have a set of streaming aggregate agents that keep an up-to-date map of all the lower level entities current status.

Egress

An optional step is egress which is to write the state of agents to some external process. The Nstream toolkit provides some egress patches for common data sinks that greatly reduce boilerplate.

Configuration alone is not sufficient to define what and how to publish to the external process and so an egress patch must be extended. The subclass is responsible for:

Each egress guide will go over different implementations but a good example is an egress agent that publishes on every new value of a value lane.

  @SwimLane("summary")
  ValueLane<Value> summary = this.<Value>valueLane()
      .didSet((nv, ov) -> submit(() -> publish(nv)));

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).