
HTTP Ingress

Nstream provides an HTTP Adapter library that simplifies ingestion from HTTP (REST) endpoints. This guide demonstrates how to repeatedly poll HTTP endpoints and process the responses in Web Agents with minimal boilerplate.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-http:4.7.10'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-http</artifactId>
  <version>4.7.10</version>
  <type>module</type>
</dependency>

Glossary

Ideal HttpIngestingPatch Conditions

If:

Then you simply need to extend the HttpIngestingPatch class and override at most one method. Let’s demonstrate this by recreating the AgencyAgent example from the open source documentation.

First, stub out the AgencyAgent class. By extending HttpIngestingPatch, the only logic left to implement is converting a received HttpResponse into a Value, and we’ll copy this part exactly from the open source implementation:

import java.util.ArrayList;
import java.util.List;
import nstream.adapter.http.HttpIngestingPatch;
import swim.structure.Attr;
import swim.structure.Item;
import swim.structure.Value;

public class AgencyAgent extends HttpIngestingPatch {

  private String agencyId() {
    final String nodeUri = nodeUri().toString();
    return nodeUri.substring(nodeUri.lastIndexOf("/") + 1);
  }

  // The only method that requires overriding (and less complicated
  // payloads may not even need to override this).
  @Override
  protected void ingestStructure(Value payload) {
    final String aid = agencyId();
    long lastTime = System.currentTimeMillis();
    // Extract information for all vehicles and the payload's timestamp
    final List<Value> vehicleInfos = new ArrayList<>(payload.length());
    for (Item i : payload) {
      if (i.head() instanceof Attr) {
        final String label = i.head().key().stringValue(null);
        if ("vehicle".equals(label)) {
          vehicleInfos.add(i.head().toValue());
        } else if ("lastTime".equals(label)) {
          lastTime = i.head().toValue().get("time").longValue();
        }
      }
    }
    // Relay each vehicleInfo to the appropriate VehicleAgent
    int i = 0;
    for (Value vehicleInfo : vehicleInfos) {
      command("/vehicle/" + aid + "/" + vehicleInfo.get("id").stringValue(),
          "addMessage",
          // lastTime came separately, manually add it to each vehicleInfo
          vehicleInfo.updatedSlot("timestamp", lastTime));
      i++;
    }
    System.out.println(nodeUri() + ": relayed info for " + i + " vehicles");
  }

}
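The URI handling in AgencyAgent can be tried in isolation. The following is a minimal sketch in plain Java with no Swim or Nstream dependencies: agencyIdOf mirrors the substring logic of agencyId() but takes the node URI as a String, and vehicleUri is a hypothetical helper that assembles the same target URI passed to command(...). The class name, helper names, and the vehicle ID "S101" are illustrative assumptions, not part of the library.

```java
// Standalone sketch: plain Java, no Swim or Nstream classes involved.
final class AgencyIdSketch {

  // Same substring logic as AgencyAgent#agencyId(), applied to a String
  // argument instead of the agent's nodeUri().
  static String agencyIdOf(String nodeUri) {
    return nodeUri.substring(nodeUri.lastIndexOf("/") + 1);
  }

  // Hypothetical helper: builds the VehicleAgent URI the same way the
  // command(...) call in ingestStructure concatenates it.
  static String vehicleUri(String agencyId, String vehicleId) {
    return "/vehicle/" + agencyId + "/" + vehicleId;
  }

  public static void main(String[] args) {
    final String aid = agencyIdOf("/agency/portland-sc");
    System.out.println(aid);                      // portland-sc
    System.out.println(vehicleUri(aid, "S101"));  // /vehicle/portland-sc/S101
  }
}
```

Because the agency ID comes from the last path segment of the node URI, the same AgencyAgent class can serve any number of agencies without per-agency code.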

Every instance of an HttpIngestingPatch subclass must correctly configure this.ingressSettings. While this is easy enough to do in Java code, a config-only option is also available because the request structure never changes. Within server.recon, simply define an httpIngressConf prop that contains the serialized representation of your HttpIngressSettings object:

# server.recon
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # Ingestion Agents
  @node {
    uri: "/agency/portland-sc"
    @agent(class: "http.adapter.AgencyAgent") {
      httpIngressConf: @httpIngressSettings {
        firstPollDelayMillis: 1000, # Wait 1s after Web Agent starts to begin polling
        pollIntervalMillis: 15000, # Subsequent 15s delay between timer ticks
        headers: {
          "User-Agent": "NstreamHttpAdapterDemo/4.0.0",
          "Accept-Encoding": "gzip"
        },
        endpointUrl: "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=portland-sc&t=0"
      }
    }
  }
  @node {
    uri: "/agency/reno"
    @agent(class: "http.adapter.AgencyAgent") {
      httpIngressConf: @httpIngressSettings {
        firstPollDelayMillis: 2000, # Wait 2s after Web Agent starts to begin polling
        pollIntervalMillis: 15000, # Subsequent 15s delay between timer ticks
        headers: {
          "User-Agent": "NstreamHttpAdapterDemo/4.0.0",
          "Accept-Encoding": "gzip"
        },
        endpointUrl: "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=reno&t=0"
      }
    }
  }
  # Domain Agents (i.e. VehicleAgent)
  ...
}

We’re actually done – these two files alone form a fully-runnable Swim server (using nstream.adapter.runtime.AppPlane#main as the launcher), and copy-pasting the open source guide’s VehicleAgent implementation achieves full parity. The TimerRef management and HttpResponse translation all happen under the hood.

Common Variations

Timing Strategy

The default timing strategy fires an HTTP-exchange-executing task at a fixed rate, meaning a fixed period between fires regardless of task duration, and the class’s Agent#didStart() callback initiates the process. There are two aspects that you may wish to change: the scheduling strategy itself, and the callback that initiates polling.

A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane:

// AgencyAgent.java
// import...
import swim.api.SwimLane;
import swim.api.lane.CommandLane;

public class AgencyAgent extends HttpIngestingPatch {

  // ...

  // Changes the scheduling strategy (here, to a fixed delay between polls);
  // note that complicated timing strategies may require more involved,
  // manual timer management
  @Override
  protected void stageReception() {
    loadSettings("httpIngressConf");
    // exchangeTimer is inherited
    this.exchangeTimer = scheduleWithFixedDelay(this::exchangeTimer,
        this.ingressSettings.firstPollDelayMillis(), this.ingressSettings.pollIntervalMillis(),
        this::fetchAndRelay);
  }

  // Together with didStart(), changes what initiates polling: an explicit
  // "start" command instead of agent startup
  @SwimLane("triggerReception")
  CommandLane<String> triggerReception = this.<String>commandLane()
      .onCommand(s -> {
        if ("start".equals(s)) {
          stageReception();
        }
      });

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // No call to stageReception() unlike in superclass
  }

}
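To make the difference between the two scheduling strategies concrete, the sketch below computes when each poll would start under fixed-rate and fixed-delay scheduling. It is plain arithmetic rather than real timers, and it assumes the conventional semantics of java.util.concurrent.ScheduledExecutorService (fixed rate measures from start to start, fixed delay measures from the end of one run to the start of the next) and a task that finishes within one period. The 2000 ms task duration is a made-up figure; the delay and interval match the first node in server.recon.

```java
import java.util.Arrays;

// Standalone sketch: models poll start times without scheduling anything.
final class TimingSketch {

  // Fixed rate: each start is a whole number of periods after the first,
  // independent of how long the task takes (assuming it fits in one period).
  static long[] fixedRateStarts(long initialDelay, long period, int count) {
    final long[] starts = new long[count];
    for (int n = 0; n < count; n++) {
      starts[n] = initialDelay + n * period;
    }
    return starts;
  }

  // Fixed delay: the next start is measured from the end of the previous
  // run, so the task duration pushes every later start back.
  static long[] fixedDelayStarts(long initialDelay, long delay,
                                 long taskMillis, int count) {
    final long[] starts = new long[count];
    long next = initialDelay;
    for (int n = 0; n < count; n++) {
      starts[n] = next;
      next += taskMillis + delay;
    }
    return starts;
  }

  public static void main(String[] args) {
    // firstPollDelayMillis = 1000, pollIntervalMillis = 15000,
    // hypothetical task duration = 2000
    System.out.println(Arrays.toString(fixedRateStarts(1000L, 15000L, 3)));
    // [1000, 16000, 31000]
    System.out.println(Arrays.toString(fixedDelayStarts(1000L, 15000L, 2000L, 3)));
    // [1000, 18000, 35000]
  }
}
```

Fixed delay guarantees a quiet gap between the end of one exchange and the start of the next, which can be the safer choice when response times vary or the endpoint is rate limited.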

Variable HTTP Request Parameters

If the parameters for each request are not statically known, then the very simple HttpIngressSettings POJO cannot express the desired functionality. The general-purpose alternative strategy is as follows:

You may have noticed that the endpointUrl property in this guide so far uses a URL parameter t=0, but t was a variable in the open source example. We got away with it because these happen to produce the same result, but now we’ll address this properly. First modify the URLs in server.recon to use t=%d instead of t=0, then add the following to AgencyAgent:

// AgencyAgent.java
// import...
import java.net.URISyntaxException;
import java.net.http.HttpRequest;

public class AgencyAgent extends HttpIngestingPatch {

  // ...

  private volatile long lastTime = -1L;

  @Override
  public HttpRequest prepareRequest(HttpIngressSettings ingressSettings) {
    final HttpRequest request;
    try {
      request = HttpAdapterUtils.buildHttpRequest("GET",
          String.format(ingressSettings.endpointUrl(), this.lastTime),
          ingressSettings.headers(),
          HttpRequest.BodyPublishers.noBody(),
          ingressSettings.timeoutMillis());
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
    return request;
  }

  @Override
  protected void ingestStructure(Value payload) {
    // ...
    // exact same as earlier implementation, plus the following line
    // for state management (value determined by the received response)
    this.lastTime = lastTime;
    System.out.println(nodeUri() + ": relayed info for " + i + " vehicles");
  }

}
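The URL-templating step can also be reproduced with the JDK alone. The sketch below is an approximation of what prepareRequest does, not the adapter's implementation: it uses java.net.http.HttpRequest (Java 11+) directly in place of HttpAdapterUtils.buildHttpRequest, and URI.create in place of the checked URISyntaxException path. The class name, the build method, the 5000 ms timeout, and the sample timestamp are all assumptions made for illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.Map;

// Standalone sketch: JDK-only stand-in for the request-building step.
final class RequestTemplateSketch {

  // Substitutes lastTime into the %d placeholder of the URL template, then
  // builds a GET request carrying the given headers and timeout.
  static HttpRequest build(String urlTemplate, long lastTime,
                           Map<String, String> headers, long timeoutMillis) {
    final HttpRequest.Builder builder = HttpRequest
        .newBuilder(URI.create(String.format(urlTemplate, lastTime)))
        .timeout(Duration.ofMillis(timeoutMillis))
        .GET();
    headers.forEach(builder::header);
    return builder.build();
  }

  public static void main(String[] args) {
    final String template = "https://retro.umoiq.com/service/publicXMLFeed"
        + "?command=vehicleLocations&a=reno&t=%d";
    // Hypothetical lastTime and timeout values
    final HttpRequest request = build(template, 1700000000000L,
        Map.of("User-Agent", "NstreamHttpAdapterDemo/4.0.0"), 5000L);
    // Building the request performs no network I/O; it is only sent when
    // handed to an HttpClient.
    System.out.println(request.method() + " " + request.uri());
  }
}
```

Keeping the placeholder in the configured URL means the request shape stays in server.recon, while only the changing value lives in agent state.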

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).