Relay DSL

Nstream provides a Relay domain-specific language (DSL) that can be configured within connector settings to enable no-code ingestion of source data.

This is an opt-in convenience feature that is designed to enable reasonably powerful customizability from configurations alone. If you prefer to work directly within your Web Agents’ Java code, you are perfectly welcome to do so instead.

Note: the content here is not connector-specific. For an adapter library nstream.adapter.foo, each relayDef demonstrated in the following sections configures the relaySchema of a FooIngressSettings.

Prerequisites

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-common:4.14.22'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-common</artifactId>
  <version>4.14.22</version>
  <type>module</type> <!-- Remove or comment this line for non-modular projects -->
</dependency>

Overview

The Relay DSL expresses the following: given an incoming message of type swim.structure.Value, perform these declared actions. This corresponds to the Ingress to Entities section from the Toolkit Overview.

For ingestor-type processes, the desired actions are nearly always command messages that other Web Agents will utilize. Recall that a command message is uniquely identified by its target nodeUri, laneUri, and payload-type value. This article demonstrates how to configure these properties in a variety of use cases.

Static Target nodeUris and laneUris

Single Target, Single Pass-Through Value

If each incoming message should be relayed to statically known WARP endpoints, then simply configure the approriate String values in the nodeUri (no defaults present, so this is mandatory) and laneUri (default=addEvent) properties.

# input
{bar:3,baz:4}

# relayDef
@command {
  nodeUri: "/vehicle/foo"
}

# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value={bar:3,baz:4}

Multiple Targets, Single Pass-Through Value

Your snippet may also be a Record that contains multiple @command blocks, each of which will execute in declared order.

# input
{bar:3,baz:4}

# relayDef
{
  @command {
    nodeUri: "/vehicle/foo qux"
  },
  @command {
    nodeUri: "/site/foo"
    laneUri: "addMessage"
  }
}

# resulting commands (2)
(1)
nodeUri=/vehicle/foo%20qux # Note automatic URI encoding
laneUri=addEvent
value={bar:3,baz:4}
(2)
nodeUri=/site/foo
laneUri=addMessage
value={bar:3,baz:4}

value Subselection

Leaving out any @command#value declarations tells the DSL to relay the entire received message. Providing swim.structure.Selectors indicates to relay component(s) of each received message instead.

# input
{bar:3,baz:4}

# relayDef
@command {
  nodeUri: "/vehicle/foo",
  value: $bar # READ AS: Selector.get(bar)
}

# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=3

value Computations

In fact, the value configuration accepts general swim.structure.Exprs, not just Selectors.

# input
{nested:{id:foo,bar:3,baz:4}}

# relayDef0
@command {
  nodeUri: "/vehicle/foo",
  value: $nested.bar * $nested.baz
}
# relayDef1 (functionally equivalent to relayDef0,
#  but offers "eager truncation")
@command($nested) {
  nodeUri: "/vehicle/foo",
  value: $bar * $baz
}

# resulting command
nodeUri=/vehicle/foo
laneUri=addEvent
value=12

Dynamically Evaluated nodeUris and laneUris

Most use cases will require message-dependent routing logic, i.e. dynamically evaluable nodeUri and laneUri properties (the latter is rarely non-static in practice, but the option is available).

Cleanly Delimited URIs

Most target URIs can be expressed as path components (delimited by the / character), where every component is either:

In such cases, continue use the nodeUri and laneUri configurations; just include Expr components within the Strings as needed.

# input
{
  info:{id:"4JWH921",state:CA,year:2000}
  stats:{mileage:131201,speed:45}
}

# relayDef
@command {
  nodeUri: "/vehicle/$info.state/$info.id",
  laneUri: "addStats",
  value: $stats
}

# resulting command
nodeUri=/vehicle/CA/4JWH921
laneUri=addStats
value={mileage:131201,speed:45}

Just like with value, each URI component may be a general Expr instead of just a Selector.

# input
{
  hero: {
    name: "Foo/Bar", score: 22, penalty: 0
  }
  sidekick: {
    name: "Bar/Baz", score: 21, penalty: 3
  }
}

# relayDef
{
  @command {
    nodeUri: "/team/$hero.name"
    laneUri: "addScore"
    value: $hero.score - $hero.penalty
  }
  @command {
    nodeUri: "/team/$hero.name + \"+\" + $sidekick.name"
    laneUri: "addScore"
    value: $hero.score - $hero.penalty + $sidekick.score - $sidekick.penalty
  }
}

# resulting commands (2)
# Note that / within components are transparently encoded,
#  while path component separators are left untouched
(1)
nodeUri=/team/Foo%2fBar
laneUri=addScore
value=22
(2)
nodeUri=/team/Foo%2fBar%2bBar%2fBaz
laneUri=addScore
value=40

Complex URIs

The syntax in the previous section is limited by the power of Exprs. More complicated URIs can be constructed by using the complex...Uri identifiers, which have access to the full library of Directives within the Relay DSL. These configurations:

Complex Properties Are Explicit

There are zero transparent convenience features (e.g. the implicit URI encoding demonstrated above) in this flow. Ensure that your configuration does exactly what you want, perhaps even on malformed inputs.

# input
{
  "code": 200,
  "payload": {
    "USD/EUR": {
      "rate": 0.967715,
      "timestamp": 1660235884
    }    
  }
}

# relayDef0
@command {
  complexNodeUri: {
    "/currency/",
    @uriPathEncode($payload.*:) # READ AS Selector.get(payload).key()
  }
  value: $payload.:* # READ AS Selector.get(payload).value()
}

# relayDef1 (functionally equivalent to relayDef0)
@command($payload) {
  complexNodeUri: {
    "/currency/",
    @uriPathEncode($*:) # READ AS Selector.key()
  }
  value: $:* # READ AS Selector.value()
}

# resulting command
nodeUri=/currency/USD%2fEUR
laneUri=addEvent
value={rate:0.967715,timestamp:1660235884}

One Message, Many Targets (@foreach)

An input may contain data for many targets whose natures themselves depend on possibly highly variable input. A @foreach declaration that encapsulates one or more @command blocks may help to address such situations. Everything else that has been discussed in the article remains applicable.

Basic Usage

# input
{
  "code": 200,
  "vehicles": {
    @vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
    @vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}
  }
}

# relayDef
@foreach($vehicles) {
  @command {
    nodeUri: "/vehicle/$id",
    laneUri: "addLocation"
  }
}

# resulting commands (2, because the @foreach iterates)
(1)
nodeUri=/vehicle/SC021
laneUri=addLocation
value=@vehicle{id:SC021,lat:45.502948,lon:-122.6702635}
(2)
nodeUri=/vehicle/SC010
laneUri=addLocation
value=@vehicle{id:SC010,lat:45.5228468,lon:-122.6811123}

Note: the 200 under code is unavailable within the @foreach due to “eager truncation”, though perfectly usable outside of it. Future nstream-adapter-common releases will introduce a means to circumvent this and similar situations.

With value Transformations

# input (JSON)
{
  "code": 200,
  "rates": [
    "USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
    "USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
  ]
}

# relayDef
@foreach($rates) {
  @command {
    nodeUri: "/currency/$*:" # READ AS Selector.key()
    laneUri: "addEvent"
    value: $:* # READ AS Selector.value()
  }
}

# resulting commands (2)
(1)
nodeUri=/currency/USDEUR
laneUri=addEvent
value: {rate:0.967715,timestamp:1660235884}
(2)
nodeUri=/currency/USDJPY
laneUri=addEvent
value:{rate:132.716501,timestamp:1660235884}

With complexNodeUri

# input (JSON, same as before)
{
  "code": 200,
  "rates": [
    "USDEUR": {"rate": 0.967715, "timestamp": 1660235884},
    "USDJPY": {"rate": 132.716501, "timestamp": 1660235884}
  ]
}

# relayDef
@foreach($rates) {
  @command {
    complexNodeUri: {
      "/currency/",
      @substring($*:){ # READ AS Selector.key()
        lower:3 # READ AS str[3:] (in Python syntax)
      }
    }
    laneUri: "addCurrentRate"
    value: $:*.rate # READ AS Selector.value().get(rate)
  }
}

# resulting commands (2)
(1)
nodeUri=/currency/EUR
laneUri=addCurrentRate
value: 0.967715
(2)
nodeUri=/currency/JPY
laneUri=addCurrentRate
value: 132.716501

Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).