Hazelcast Jet 0.6.1 - 多个领域的聚合

Hazelcast Jet 0.6.1- Aggregation on multiple fields

当前的 Hazelcast Jet 0.6.1 代码示例演示了基于单个字段(例如代码)的聚合。

这是一个参考。

\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java

如何将其扩展到多个,例如 ticker、traderId 等

这是来自 StockExchange.java

的当前示例代码
 private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());

    return p;
}

对于代码和交易者 ID,您可以使用:

.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))

一般来说,密钥可以是任何正确实现 equalshashCode 的东西。 Tuple2 是两个值的通用容器。

我们还可以提供逗号分隔键进行分组。

.aggregate(AggregateOperations.groupingBy(data -> {
    StringBuilder stringBuilder = new StringBuilder();
    stringBuilder.append(StringUtils.defaultString(data.getSource1().get(dataValue) + "", "")).append(",");
    return stringBuilder.substring(0, stringBuilder.toString().length() - 1); 
}));