HTTP Ingress
Nstream provides an HTTP Adapter library that simplifies ingestion from HTTP (REST) endpoints. This guide demonstrates how to repeatedly poll HTTP endpoints and process the responses in Web Agents with minimal boilerplate.
Prerequisites
- JDK 11+: The library uses java.net.http.HttpClient under the hood, which is only available starting with Java 11.
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-http:4.13.21'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-http</artifactId>
<version>4.13.21</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Glossary
- ingestion: The act of converting an external response into messages utilized by Web Agents
- HttpIngestingAgent: An abstract Web Agent class with built-in methods relevant to an HTTP request-response cycle
- HttpIngressSettings: A plain old Java object (POJO) that configures a single HttpIngestingAgent
- HttpIngestingPatch: A concrete (but extendable) HttpIngestingAgent subclass that collects HttpResponses into swim.structure.Values
HttpIngestingPatch
Conditions
Ideal If:
- Every outgoing HTTP request is a GET-type request with unchanging parameters
- Response bodies are converted into Values during ingestion
- The recurring timing can resemble ScheduledExecutorService#scheduleAtFixedRate
Then you simply need to extend the HttpIngestingPatch class and override at most one method.
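To make the third condition concrete, here is a minimal sketch of ScheduledExecutorService#scheduleAtFixedRate using only the JDK. It is not the adapter's implementation; the class FixedRatePollSketch, the method countTicks, and the short delays are hypothetical values chosen for illustration, and the lambda stands in for one HTTP exchange.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration class; not part of the Nstream adapter library
final class FixedRatePollSketch {

  // Fires a stand-in "poll" task at a fixed rate until it has run `ticks` times,
  // then returns the number of times it ran
  static int countTicks(int ticks, long firstDelayMillis, long intervalMillis) {
    final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    final CountDownLatch latch = new CountDownLatch(ticks);
    final AtomicInteger fired = new AtomicInteger();
    try {
      scheduler.scheduleAtFixedRate(() -> {
        // Stand-in for one HTTP exchange; stop counting once enough ticks have fired
        if (latch.getCount() > 0) {
          fired.incrementAndGet();
          latch.countDown();
        }
      }, firstDelayMillis, intervalMillis, TimeUnit.MILLISECONDS);
      // Block the caller until the requested number of ticks has been observed
      latch.await(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      scheduler.shutdownNow();
    }
    return fired.get();
  }

  public static void main(String[] args) {
    // First tick after 100 ms, subsequent ticks every 50 ms
    System.out.println("ticks fired: " + countTicks(3, 100L, 50L));
  }
}
```

The two numeric arguments play the same roles as an initial delay and a polling interval: the first tick waits for the initial delay, and every later tick is scheduled one interval after the previous tick's scheduled start.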
Let’s demonstrate this by recreating the AgencyAgent example from the open source documentation.
First, create the AgencyAgent class.
Because it extends HttpIngestingPatch, the only logic left to implement is handling the Value produced from each received HttpResponse, and we’ll copy this part exactly from the open source implementation:
import java.util.ArrayList;
import java.util.List;
import nstream.adapter.http.HttpIngestingPatch;
import swim.structure.Attr;
import swim.structure.Item;
import swim.structure.Value;

public class AgencyAgent extends HttpIngestingPatch {

  private String agencyId() {
    final String nodeUri = nodeUri().toString();
    return nodeUri.substring(nodeUri.lastIndexOf("/") + 1);
  }

  // The only method that requires overriding (and less complicated
  // payloads may not even need to override this).
  @Override
  protected void ingestStructure(Value payload) {
    final String aid = agencyId();
    long lastTime = System.currentTimeMillis();
    // Extract information for all vehicles and the payload's timestamp
    final List<Value> vehicleInfos = new ArrayList<>(payload.length());
    for (Item i : payload) {
      if (i.head() instanceof Attr) {
        final String label = i.head().key().stringValue(null);
        if ("vehicle".equals(label)) {
          vehicleInfos.add(i.head().toValue());
        } else if ("lastTime".equals(label)) {
          lastTime = i.head().toValue().get("time").longValue();
        }
      }
    }
    // Relay each vehicleInfo to the appropriate VehicleAgent
    int i = 0;
    for (Value vehicleInfo : vehicleInfos) {
      command("/vehicle/" + aid + "/" + vehicleInfo.get("id").stringValue(),
          "addMessage",
          // lastTime came separately, manually add it to each vehicleInfo
          vehicleInfo.updatedSlot("timestamp", lastTime));
      i++;
    }
    System.out.println(nodeUri() + ": relayed info for " + i + " vehicles");
  }

}
Every HttpIngestingPatch instance must correctly configure this.ingressSettings.
While this is easy enough to do in Java code, a config-only option is also available because the request structure never changes.
Within server.recon, simply define an httpIngressConf prop that contains the serialized representation of your HttpIngressSettings object:
# server.recon
vehicle: @fabric {
  @plane(class: "nstream.adapter.runtime.AppPlane")
  # Ingestion Agents
  @node {
    uri: "/agency/portland-sc"
    @agent(class: "http.adapter.AgencyAgent") {
      httpIngressConf: @httpIngressSettings {
        firstPollDelayMillis: 1000, # Wait 1s after Web Agent starts to begin polling
        pollIntervalMillis: 15000, # Subsequent 15s delay between timer ticks
        headers: {
          "User-Agent": "NstreamHttpAdapterDemo/4.0.0",
          "Accept-Encoding": "gzip"
        },
        endpointUrl: "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=portland-sc&t=0"
      }
    }
  }
  @node {
    uri: "/agency/reno"
    @agent(class: "http.adapter.AgencyAgent") {
      httpIngressConf: @httpIngressSettings {
        firstPollDelayMillis: 2000, # Wait 2s after Web Agent starts to begin polling
        pollIntervalMillis: 15000, # Subsequent 15s delay between timer ticks
        headers: {
          "User-Agent": "NstreamHttpAdapterDemo/4.0.0",
          "Accept-Encoding": "gzip"
        },
        endpointUrl: "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=reno&t=0"
      }
    }
  }
  # Domain Agents (i.e. VehicleAgent)
  ...
}
We’re actually done: these two files alone form a fully runnable Swim server (using nstream.adapter.runtime.AppPlane#main as the launcher), and copy-pasting the open source guide’s VehicleAgent implementation achieves full parity.
The TimerRef management and HttpResponse translation all happen under the hood.
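For readers curious about what a statically configured GET request looks like at the JDK level, the following is a minimal sketch built directly on java.net.http.HttpRequest. It is not the adapter's internal code: the class StaticGetRequestSketch and the method buildGet are hypothetical, and the 5000 ms timeout is an assumed value that does not appear in the configuration above.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration class; not part of the Nstream adapter library
final class StaticGetRequestSketch {

  // Builds a GET request from values shaped like the settings in server.recon
  static HttpRequest buildGet(String endpointUrl, Map<String, String> headers,
                              long timeoutMillis) {
    final HttpRequest.Builder builder = HttpRequest.newBuilder()
        .uri(URI.create(endpointUrl))
        .timeout(Duration.ofMillis(timeoutMillis))
        .GET();
    // Copy every configured header onto the request
    headers.forEach(builder::header);
    return builder.build();
  }

  public static void main(String[] args) {
    final Map<String, String> headers = new LinkedHashMap<>();
    headers.put("User-Agent", "NstreamHttpAdapterDemo/4.0.0");
    headers.put("Accept-Encoding", "gzip");
    final HttpRequest request = buildGet(
        "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=portland-sc&t=0",
        headers,
        5000L); // assumed timeout, for illustration only
    System.out.println(request.method() + " " + request.uri());
    System.out.println(request.headers().map());
  }
}
```

Because nothing about this request changes between polls, it can be described entirely by configuration, which is what makes the config-only approach possible.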
Common Variations
Timing Strategy
The default timing strategy fires an HTTP-exchange-executing task with a fixed period between fires (regardless of task duration), with the class’s Agent#didStart() callback initiating the process.
There are two aspects that you may wish to change:
- To alter the periodicity strategy, override HttpIngestingAgent#stageReception to not invoke NstreamAgent#scheduleAtFixedRate
- To disable the automatic task startup, override didStart() to not call stageReception()
A rather common alternative for the latter is to instead invoke stageReception() from the onCommand() callback of some CommandLane:
// AgencyAgent.java
// import...
import swim.api.SwimLane;
import swim.api.lane.CommandLane;

public class AgencyAgent extends HttpIngestingPatch {

  // ...

  // Addressing the first bullet point; note that complicated timing strategies
  // may require more involved, manual timer management
  @Override
  protected void stageReception() {
    loadSettings("httpIngressConf");
    // exchangeTimer is inherited
    this.exchangeTimer = scheduleWithFixedDelay(this::exchangeTimer,
        this.ingressSettings.firstPollDelayMillis(), this.ingressSettings.pollIntervalMillis(),
        this::fetchAndRelay);
  }

  // Together with didStart(), addressing the second bullet point
  @SwimLane("triggerReception")
  CommandLane<String> triggerReception = this.<String>commandLane()
      .onCommand(s -> {
        if ("start".equals(s)) {
          stageReception();
        }
      });

  @Override
  public void didStart() {
    System.out.println(nodeUri() + ": didStart");
    // No call to stageReception() unlike in superclass
  }

}
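The practical difference between a fixed-rate and a fixed-delay strategy is easiest to see as arithmetic on task start times. The sketch below is a simplified model, not adapter code: the class TimingStrategySketch and its methods are hypothetical, every task is assumed to take the same duration, and the fixed-rate case assumes each task finishes before the next period elapses.

```java
import java.util.Arrays;

// Hypothetical illustration class; a simplified model of the two strategies
final class TimingStrategySketch {

  // Fixed rate: each run starts one period after the previous run's scheduled START
  static long[] fixedRateStarts(long firstDelay, long period, int runs) {
    final long[] starts = new long[runs];
    for (int n = 0; n < runs; n++) {
      starts[n] = firstDelay + n * period;
    }
    return starts;
  }

  // Fixed delay: each run starts one delay after the previous run's END
  static long[] fixedDelayStarts(long firstDelay, long delay, long taskDuration, int runs) {
    final long[] starts = new long[runs];
    for (int n = 0; n < runs; n++) {
      starts[n] = firstDelay + n * (taskDuration + delay);
    }
    return starts;
  }

  public static void main(String[] args) {
    // 1s initial delay, 15s interval, and an assumed 2s per HTTP exchange
    System.out.println(Arrays.toString(fixedRateStarts(1000L, 15000L, 3)));
    // prints [1000, 16000, 31000]
    System.out.println(Arrays.toString(fixedDelayStarts(1000L, 15000L, 2000L, 3)));
    // prints [1000, 18000, 35000]
  }
}
```

With a fixed rate the schedule does not drift regardless of how long each exchange takes, while with a fixed delay every slow response pushes all later polls back by the same amount.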
Variable HTTP Request Parameters
If the parameters for each request are not statically known, then the very simple HttpIngressSettings POJO cannot express the desired functionality.
The general-purpose alternative strategy is as follows:
- Within your HttpIngestingPatch extension, store the state required to dynamically build each request in one or more class-local variables, updating these variables correctly during the Web Agent’s lifecycle
- Override HttpIngestingAgent#prepareRequest within your concrete class to read that state while building each request
You may have noticed that the endpointUrl property in this guide so far uses a URL parameter t=0, but t was a variable in the open source example.
We got away with it because these happen to produce the same result, but now we’ll address this properly.
First modify the URLs in server.recon to use t=%d instead of t=0, then add the following to AgencyAgent:
// AgencyAgent.java
// import...
import java.net.URISyntaxException;
import java.net.http.HttpRequest;

public class AgencyAgent extends HttpIngestingPatch {

  // ...

  // Timestamp reported by the most recent response; -1 until the first response arrives
  private volatile long lastTime = -1L;

  @Override
  public HttpRequest prepareRequest(HttpIngressSettings ingressSettings) {
    final HttpRequest request;
    try {
      request = HttpAdapterUtils.buildHttpRequest("GET",
          // Substitute the stored timestamp into the t=%d placeholder
          String.format(ingressSettings.endpointUrl(), this.lastTime),
          ingressSettings.headers(),
          HttpRequest.BodyPublishers.noBody(),
          ingressSettings.timeoutMillis());
    } catch (URISyntaxException e) {
      throw new RuntimeException(e);
    }
    return request;
  }

  @Override
  protected void ingestStructure(Value payload) {
    // ...
    // exact same as earlier implementation, plus the following line
    // for state management (value determined by the received response)
    this.lastTime = lastTime;
    System.out.println(nodeUri() + ": relayed info for " + i + " vehicles");
  }

}
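The t=%d substitution can be tried in isolation with the JDK alone. The following is a minimal sketch under stated assumptions: the class UrlTemplateSketch and its methods are hypothetical, it uses HttpRequest.newBuilder rather than the adapter's own helper, and the example.com URL is a placeholder. One detail worth knowing is that String.format treats every % as the start of a format specifier, so a template containing percent-encoded characters must escape them as %%.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.Locale;

// Hypothetical illustration class; not part of the Nstream adapter library
final class UrlTemplateSketch {

  static final String TEMPLATE =
      "https://retro.umoiq.com/service/publicXMLFeed?command=vehicleLocations&a=portland-sc&t=%d";

  // Substitutes the last seen timestamp into the template's %d placeholder;
  // Locale.ROOT keeps the digits ASCII regardless of the JVM's default locale
  static String fillTemplate(String urlTemplate, long lastTime) {
    return String.format(Locale.ROOT, urlTemplate, lastTime);
  }

  // Builds a GET request whose URL depends on state that changes between polls
  static HttpRequest requestSince(String urlTemplate, long lastTime) {
    return HttpRequest.newBuilder()
        .uri(URI.create(fillTemplate(urlTemplate, lastTime)))
        .GET()
        .build();
  }

  public static void main(String[] args) {
    // Before any response has arrived, the stored timestamp is -1
    System.out.println(requestSince(TEMPLATE, -1L).uri());
    // After a response, the stored timestamp is reused in the next request
    System.out.println(requestSince(TEMPLATE, 1700000000000L).uri());
    // A literal percent sign in the template must be doubled
    System.out.println(fillTemplate("https://example.com/feed?q=a%%20b&t=%d", 5L));
  }
}
```

Keeping the substitution in one small method makes it straightforward to check the generated URLs without sending any requests.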
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).