Relay DSL
Nstream provides a Relay domain-specific language (DSL) that can be configured within connector settings to enable no-code ingestion of source data.
This is an opt-in convenience feature that is designed to enable reasonably powerful customizability from configurations alone. If you prefer to work directly within your Web Agents’ Java code, you are perfectly welcome to do so instead.
Note: the content here is not connector-specific.
For an adapter library nstream.adapter.foo, each relayDef demonstrated in the following sections configures the relaySchema of a FooIngressSettings.
Prerequisites
Dependencies
Gradle
implementation 'io.nstream:nstream-adapter-common:4.14.22'
Maven
<dependency>
<groupId>io.nstream</groupId>
<artifactId>nstream-adapter-common</artifactId>
<version>4.14.22</version>
<type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>
Overview
The Relay DSL expresses the following: given an incoming message of type swim.structure.Value, perform these declared actions.
This corresponds to the Ingress to Entities section from the Toolkit Overview.
For ingestor-type processes, the desired actions are nearly always command messages that other Web Agents will utilize.
Recall that a command message is uniquely identified by its target nodeUri, laneUri, and payload-type value.
This article demonstrates how to configure these properties in a variety of use cases.
Static Target nodeUris and laneUris
Single Target, Single Pass-Through Value
If each incoming message should be relayed to statically known WARP endpoints, then simply configure the appropriate String values in the nodeUri (no default, so this is mandatory) and laneUri (default=addEvent) properties.
# input
{bar:3,baz:4}
# relayDef
@command {
nodeUri: "/vehicle/foo"
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value={bar:3,baz:4}
Multiple Targets, Single Pass-Through Value
Your snippet may also be a Record that contains multiple @command blocks, each of which will execute in declared order.
# input
{bar:3,baz:4}
# relayDef
{
@command {
nodeUri: "/vehicle/foo qux"
},
@command {
nodeUri: "/site/foo"
laneUri: "addMessage"
}
}
# resulting commands (2)
(1)
nodeUri=/vehicle/foo%20qux # Note automatic URI encoding
laneUri=addEvent
value={bar:3,baz:4}
(2)
nodeUri=/site/foo
laneUri=addMessage
value={bar:3,baz:4}
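To make the declared-order and default-lane behavior concrete, here is a minimal Python model of the example above. It is not Nstream code: the Command tuple, the relay function, and the list-of-dicts relayDef are hypothetical stand-ins, and urllib.parse.quote only approximates the automatic URI encoding.

```python
from typing import Any, NamedTuple
from urllib.parse import quote


class Command(NamedTuple):
    """Hypothetical stand-in for a relayed command message."""
    node_uri: str
    lane_uri: str
    value: Any


DEFAULT_LANE_URI = "addEvent"  # the default stated in this article


def relay(message, relay_def):
    """Model each @command block, in declared order, as one Command."""
    commands = []
    for block in relay_def:
        # Assumption: automatic encoding leaves "/" separators untouched.
        node_uri = quote(block["nodeUri"], safe="/")
        lane_uri = block.get("laneUri", DEFAULT_LANE_URI)
        # No value declared, so the whole message passes through.
        commands.append(Command(node_uri, lane_uri, message))
    return commands


relay_def = [
    {"nodeUri": "/vehicle/foo qux"},
    {"nodeUri": "/site/foo", "laneUri": "addMessage"},
]
commands = relay({"bar": 3, "baz": 4}, relay_def)
for command in commands:
    print(command)
```

The first command falls back to addEvent and has its space encoded as %20, while the second uses its explicitly declared lane.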
value Subselection
Leaving out any @command#value declarations tells the DSL to relay the entire received message.
Providing a swim.structure.Selector tells it to relay only the selected component(s) of each received message instead.
# input
{bar:3,baz:4}
# relayDef
@command {
nodeUri: "/vehicle/foo",
value: $bar # READ AS: Selector.get(bar)
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=3
value Computations
In fact, the value configuration accepts general swim.structure.Exprs, not just Selectors.
# input
{nested:{id:foo,bar:3,baz:4}}
# relayDef0
@command {
nodeUri: "/vehicle/foo",
value: $nested.bar * $nested.baz
}
# relayDef1 (functionally equivalent to relayDef0,
# but offers "eager truncation")
@command($nested) {
nodeUri: "/vehicle/foo",
value: $bar * $baz
}
# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=12
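The equivalence of relayDef0 and relayDef1 can be pictured with plain Python dictionaries. This is only an analogy, assuming that the argument to @command narrows the scope that later selectors start from; the function names are hypothetical.

```python
message = {"nested": {"id": "foo", "bar": 3, "baz": 4}}


def relay_def0(msg):
    # Selectors start from the whole message: $nested.bar * $nested.baz
    return msg["nested"]["bar"] * msg["nested"]["baz"]


def relay_def1(msg):
    # @command($nested): narrow the scope first ("eager truncation"),
    # then evaluate $bar * $baz against the narrowed scope.
    scope = msg["nested"]
    return scope["bar"] * scope["baz"]


value0 = relay_def0(message)
value1 = relay_def1(message)
print(value0, value1)  # 12 12
```

In this model, anything outside the narrowed scope is no longer reachable from relay_def1's selectors, which is the trade-off that eager truncation implies.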
Dynamically Evaluated nodeUris and laneUris
Most use cases will require message-dependent routing logic, i.e. dynamically evaluable nodeUri and laneUri properties (the latter is rarely non-static in practice, but the option is available).
Cleanly Delimited URIs
Most target URIs can be expressed as path components (delimited by the / character), where every component is either:
- A static String, or
- An expression that yields a String when evaluated against the input.
In such cases, continue to use the nodeUri and laneUri configurations; just include Expr components within the Strings as needed.
# input
{
info:{id:"4JWH921",state:CA,year:2000}
stats:{mileage:131201,speed:45}
}
# relayDef
@command {
nodeUri: "/vehicle/$info.state/$info.id",
laneUri: "addStats",
value: $stats
}
# resulting command
nodeUri=/vehicle/CA/4JWH921
laneUri=addStats
value={mileage:131201,speed:45}
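One way to picture how selector components inside a URI String are resolved is the following Python sketch. It handles only dotted $a.b selectors, not general Exprs, and it skips URI encoding entirely; the SELECTOR pattern and the select and expand helpers are hypothetical names, not part of the Relay DSL.

```python
import re

# Hypothetical pattern for "$name" or "$name.child" selectors in a template.
SELECTOR = re.compile(r"\$([A-Za-z_]\w*(?:\.[A-Za-z_]\w*)*)")


def select(message, path):
    """Walk a dotted path such as "info.state" through nested dicts."""
    value = message
    for key in path.split("."):
        value = value[key]
    return value


def expand(template, message):
    """Replace every selector in the template with its selected value."""
    return SELECTOR.sub(lambda m: str(select(message, m.group(1))), template)


message = {
    "info": {"id": "4JWH921", "state": "CA", "year": 2000},
    "stats": {"mileage": 131201, "speed": 45},
}
node_uri = expand("/vehicle/$info.state/$info.id", message)
value = select(message, "stats")
print(node_uri)  # /vehicle/CA/4JWH921
print(value)
```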
Just like with value, each URI component may be a general Expr instead of just a Selector.
# input
{
hero: {
name: "Foo/Bar", score: 22, penalty: 0
}
sidekick: {
name: "Bar/Baz", score: 21, penalty: 3
}
}
# relayDef
{
@command {
nodeUri: "/team/$hero.name"
laneUri: "addScore"
value: $hero.score - $hero.penalty
}
@command {
nodeUri: "/team/$hero.name + \"+\" + $sidekick.name"
laneUri: "addScore"
value: $hero.score - $hero.penalty + $sidekick.score - $sidekick.penalty
}
}
# resulting commands (2)
# Note that / within components are transparently encoded,
# while path component separators are left untouched
(1)
nodeUri=/team/Foo%2fBar
laneUri=addScore
value=22
(2)
nodeUri=/team/Foo%2fBar%2bBar%2fBaz
laneUri=addScore
value=40
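The encoded results above can be reproduced with a short Python sketch, assuming the implicit encoding behaves like percent-encoding applied to each evaluated component on its own. The encode_component helper is a hypothetical name; it lowercases the hex digits only to match the output shown here (percent-encoded hex digits are case-insensitive).

```python
import re
from urllib.parse import quote


def encode_component(text):
    """Percent-encode one path component, including any "/" or "+" in it."""
    encoded = quote(text, safe="")
    # Lowercase the escapes (%2F -> %2f) to match this article's output.
    return re.sub(r"%[0-9A-F]{2}", lambda m: m.group(0).lower(), encoded)


hero = {"name": "Foo/Bar", "score": 22, "penalty": 0}
sidekick = {"name": "Bar/Baz", "score": 21, "penalty": 3}

# The static "/team/" prefix keeps its separators; only components are encoded.
uri1 = "/team/" + encode_component(hero["name"])
uri2 = "/team/" + encode_component(hero["name"] + "+" + sidekick["name"])

value1 = hero["score"] - hero["penalty"]
value2 = value1 + sidekick["score"] - sidekick["penalty"]

print(uri1, value1)  # /team/Foo%2fBar 22
print(uri2, value2)  # /team/Foo%2fBar%2bBar%2fBaz 40
```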
Complex URIs
The syntax in the previous section is limited by the power of Exprs.
More complicated URIs can be constructed by using the complex...Uri identifiers, which have access to the full library of Directives within the Relay DSL.
These configurations:
- Accept singular Records
- Treat each Item in the Record as a DSL snippet to be evaluated
- Concatenate the evaluations in-order to form the final result.
Complex Properties Are Explicit
There are zero transparent convenience features (e.g. the implicit URI encoding demonstrated above) in this flow. Ensure that your configuration does exactly what you want, perhaps even on malformed inputs.
# input
{
"code": 200,
"payload": {
"USD/EUR": {
"rate": 0.967715,
"timestamp": 1660235884
}
}
}
# relayDef0
@command {
complexNodeUri: {
"/currency/",
@uriPathEncode($payload.*:) # READ AS Selector.get(payload).key()
}
value: $payload.:* # READ AS Selector.get(payload).value()
}
# relayDef1 (functionally equivalent to relayDef0)
@command($payload) {
complexNodeUri: {
"/currency/",
@uriPathEncode($*:) # READ AS Selector.key()
}
value: $:* # READ AS Selector.value()
}
# resulting command
nodeUri=/currency/USD%2fEUR
laneUri=addEvent
value={rate:0.967715,timestamp:1660235884}
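Because nothing is encoded implicitly in this flow, it can help to see what the concatenation would produce with and without an explicit encoding step. The Python below is only a model of that idea: the list of parts stands in for the complexNodeUri Record, and urllib.parse.quote stands in for @uriPathEncode (it emits uppercase hex, which is equivalent to the lowercase form shown above).

```python
from urllib.parse import quote

message = {
    "code": 200,
    "payload": {"USD/EUR": {"rate": 0.967715, "timestamp": 1660235884}},
}

# Model of $payload.*: and $payload.:* for a single-field payload.
key, value = next(iter(message["payload"].items()))

# Each part is evaluated, then the results are concatenated in order.
encoded_uri = "".join(["/currency/", quote(key, safe="")])
raw_uri = "".join(["/currency/", key])  # what skipping the encoding yields

print(encoded_uri)  # /currency/USD%2FEUR
print(raw_uri)      # /currency/USD/EUR
```

Without the explicit encoding step, the / inside the key reads as a path separator, so the target gains an extra path segment.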
One Message, Many Targets (@foreach)
An input may contain data for many targets whose natures themselves depend on possibly highly variable input.
A @foreach declaration that encapsulates one or more @command blocks may help to address such situations.
Everything else that has been discussed in the article remains applicable.
Basic Usage
# input
{
"code": 200,
"vehicles": {
@vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
@vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}
}
}
# relayDef
@foreach($vehicles) {
@command {
nodeUri: "/vehicle/$id",
laneUri: "addLocation"
}
}
# resulting commands (2, because the @foreach iterates)
(1)
nodeUri=/vehicle/SC021
laneUri=addLocation
value=@vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
(2)
nodeUri=/vehicle/SC010
laneUri=addLocation
value=@vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}
Note: the 200 under code is unavailable within the @foreach due to “eager truncation”, though perfectly usable outside of it.
Future nstream-adapter-common releases will introduce a means to circumvent this and similar situations.
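The iteration and its scoping can be sketched in Python as an ordinary loop. This is an analogy rather than Nstream code: the @vehicle attribute is dropped and each item is modeled as a plain dictionary, and commands are modeled as (nodeUri, laneUri, value) tuples.

```python
message = {
    "code": 200,
    "vehicles": [
        {"id": "SC021", "lat": 45.502948, "lon": -122.6702635},
        {"id": "SC010", "lat": 45.5228468, "lon": -122.6811123},
    ],
}

commands = []
for vehicle in message["vehicles"]:  # @foreach($vehicles)
    # Inside the loop the scope is one vehicle, so $id resolves against it
    # and the undeclared value defaults to the whole item.
    commands.append((f"/vehicle/{vehicle['id']}", "addLocation", vehicle))

# The outer "code" field is not part of any item, hence not in scope inside.
code_in_scope = any("code" in vehicle for vehicle in message["vehicles"])

for command in commands:
    print(command)
print(code_in_scope)  # False
```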
With value Transformations
# input (JSON)
{
"code": 200,
"rates": {
"USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
"USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
}
}
# relayDef
@foreach($rates) {
@command {
nodeUri: "/currency/$*:" # READ AS Selector.key()
laneUri: "addEvent"
value: $:* # READ AS Selector.value()
}
}
# resulting commands (2)
(1)
nodeUri=/currency/USDEUR
laneUri=addEvent
value={rate:0.967715,timestamp:1660235884}
(2)
nodeUri=/currency/USDJPY
laneUri=addEvent
value={rate:132.716501,timestamp:1660235884}
With complexNodeUri
# input (JSON, same as before)
{
"code": 200,
"rates": {
"USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
"USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
}
}
# relayDef
@foreach($rates) {
@command {
complexNodeUri: {
"/currency/",
@substring($*:){ # READ AS Selector.key()
lower:3 # READ AS str[3:] (in Python syntax)
}
}
laneUri: "addCurrentRate"
value: $:*.rate # READ AS Selector.value().get(rate)
}
}
# resulting commands (2)
(1)
nodeUri=/currency/EUR
laneUri=addCurrentRate
value=0.967715
(2)
nodeUri=/currency/JPY
laneUri=addCurrentRate
value=132.716501
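As a cross-check of the results above, the same transformation can be written directly in Python, where lower:3 corresponds to the slice [3:]. The rates dictionary models the input's rates field as key/value pairs, and the (nodeUri, laneUri, value) tuples are an illustrative stand-in for commands.

```python
rates = {
    "USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
    "USDJPY": {"rate": 132.716501, "timestamp": 1660235884},
}

commands = [
    # "/currency/" + @substring($*:){lower:3}, then value $:*.rate
    ("/currency/" + pair[3:], "addCurrentRate", body["rate"])
    for pair, body in rates.items()
]

for command in commands:
    print(command)
# ('/currency/EUR', 'addCurrentRate', 0.967715)
# ('/currency/JPY', 'addCurrentRate', 132.716501)
```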
Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).