Hazelcast Jet 0.6.1 - 多个领域的聚合
Hazelcast Jet 0.6.1- Aggregation on multiple fields
当前的 Hazelcast Jet 0.6.1 代码示例演示了基于单个字段(例如代码)的聚合。
这是一个参考。
\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java
如何将其扩展到多个,例如 ticker、traderId 等
这是来自 StockExchange.java
的当前示例代码
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}
对于代码和交易者 ID,您可以使用:
.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))
一般来说,密钥可以是任何正确实现 equals
和 hashCode
的东西。 Tuple2
是两个值的通用容器。
我们还可以提供逗号分隔键进行分组。
.aggregate(AggregateOperations.groupingBy(data -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(StringUtils.defaultString(data.getSource1().get(dataValue) + "", "")).append(",");
return stringBuilder.substring(0, stringBuilder.toString().length() - 1);
}));
当前的 Hazelcast Jet 0.6.1 代码示例演示了基于单个字段(例如代码)的聚合。
这是一个参考。
\code-samples\streaming\stock-exchange\src\main\java\StockExchange.java
如何将其扩展到多个,例如 ticker、traderId 等
这是来自 StockExchange.java
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}
对于代码和交易者 ID,您可以使用:
.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))
一般来说,密钥可以是任何正确实现 equals
和 hashCode
的东西。 Tuple2
是两个值的通用容器。
我们还可以提供逗号分隔键进行分组。
.aggregate(AggregateOperations.groupingBy(data -> {
StringBuilder stringBuilder = new StringBuilder();
stringBuilder.append(StringUtils.defaultString(data.getSource1().get(dataValue) + "", "")).append(",");
return stringBuilder.substring(0, stringBuilder.toString().length() - 1);
}));