March 7 - June 27, 2024 - Nstream is hitting the road with Confluent on the #DataInMotionTour! / Learn More

Summary Stats Patch

Nstream provides a ‘Summary Stats’ patch that greatly facilitates calculating rolling statistics from events. This guide demonstrates how to create and maintain a ValueLane that stores statistics calculated on receipt of an event.

Dependencies

Gradle

implementation 'io.nstream:nstream-adapter-aggr:4.7.10'

Maven

<dependency>
  <groupId>io.nstream</groupId>
  <artifactId>nstream-adapter-aggr</artifactId>
  <version>4.7.10</version>
  <type>module</type>
</dependency>

Config

The SummaryStatsPatch agent contains a ValueLane named summaryStats that will store rolling statistics of events received. In most cases, summary stats lanes can be implemented wholly with configuration in the server.recon file. This involves adding SummaryStatsPatch to the relevant nodes and defining some desired stats. Simply define a node that includes the SummaryStatsPatch agent:

# server.recon
...
space: @fabric {
  @plane(class:"nstream.adapter.runtime.AppPlane")

  # Domain Agents
  
  @node {
    pattern: "/game/server/:name"
    @agent(class: "nstream.adapter.aggr.online.SummaryStatsPatch") {
      statsComputerDef: {
        @peakPlayerCount(max: $onlinePlayers) 
        @averagePlayerCount(avg: $onlinePlayers) 
      }
    }
  }

}

Example: This agent receives events with a current player count field (onlinePlayers), the maximum player count (peakPlayerCount) and mean player count (averagePlayerCount) are then calculated and stored in the summaryStats lane.

Notice that the SummaryStatsPatch has one property, namely statsComputerDef, this is where we define stats to be computed and the format of the output summaryStats lane. The property itself is just a list of aggregation field definitions, each of which have the structure:

@alias(function: selector)

Aggregation functions

Function Key
Minimum min
Maximum max
Range range
Average avg
Count count
Variance variance
Sample Variance sampleVariance
Standard Deviation stdev
Sample Standard Deviation sampleStdev

Common Variations

For most variations, extending the SummaryStatsPatch and overriding the loadComputer method is required. The method must return a custom OnlineComputer<Value> which can be created directly using the constructor and methods or if preferred use the builder.

Conditional Aggregations

It might be desirable to conditionally update the aggregation. Here we override the loadComputer method and return a custom OnlineComputer. We can add an onlyIf predicate function that will only perform the calculation if the condition is met.

Example: Taking the example from above we are only going to recalculate the average player count if the server is online.

// PlayerStatsPatch.java
// import ...

public class PlayerStatsPatch extends SummaryStatsPatch {
  
  @Override
  protected OnlineComputer<Value> loadComputer() {
    return statsComputerBuilder()
        .<Double, Double>withAggr("peakPlayerCount", Aggrs::max,
            e -> e.get("onlinePlayers").doubleValue())
        .<Double, Double>withAggr("avgPlayerCount", Aggrs::avg,
            e -> e.get("onlinePlayers").doubleValue(),
            e -> e.get("serverOnline").booleanValue())
        .build();
  } 
}

Remember to update the server.recon to use the new agent just created (instead of the base SummaryStatsPatch).


Nstream is licensed under the Redis Source Available License 2.0 (RSALv2).