Kafka:有效地将窗口聚合加入事件

Kafka: Efficiently join windowed aggregates to events

我正在制作欺诈应用程序的原型。我们经常会有像 "total amount of cash transactions in the last 5 days" 这样的指标,我们需要将其与某个阈值进行比较,以确定我们是否发出警报。

我们希望使用 Kafka Streams 来创建和维护聚合,然后创建传入事务的增强版本,其中包含原始事务字段和聚合。此增强记录由下游规则系统处理。

我想知道解决这个问题的最佳方法。我已经使用如下代码创建聚合原型:

TimeWindows twoDayHopping TimeWindows.of(TimeUnit.DAYS.toMillis(2))
               .advanceBy(TimeUnit.DAYS.toMillis(1));
KStream<String, AdditiveStatistics> aggrStream = transactions
    .filter((key,value)->{
        return value.getAccountTypeDesc().equals("P") &&
               value.getPrimaryMediumDesc().equals("CASH");

    })
    .groupByKey()
    .aggregate(AdditiveStatistics::new,
               (key,value,accumulator)-> {                 
                   return AdditiveStatsUtil
                     .advance(value.getCurrencyAmount(),accumulator),
                              twoDayHopping,
                              metricsSerde,
                              "sas10005_store")
                } 
     .toStream()
     .map((key,value)-> {
                value.setTransDate(key.window().start());
                return new KeyValue<String, AdditiveStatistics>(key.key(),value);
            })
     .through(Serdes.String(),metricsSerde,datedAggrTopic);;

这将创建一个存储支持的流,每个 window 每个键都有一个记录。然后我将原始交易流加入这个 window 以产生主题的最终输出:

  JoinWindows joinWindow = JoinWindows.of(TimeUnit.DAYS.toMillis(1))
                                        .before(TimeUnit.DAYS.toMillis(1))
                                        .after(-1)
                                        .until(TimeUnit.DAYS.toMillis(2)+1);
    KStream<String,Transactions10KEnhanced> enhancedTrans = transactions.join(aggrStream,
                      (left,right)->{
                            Transactions10KEnhanced out = new Transactions10KEnhanced();
                            out.setAccountNumber(left.getAccountNumber());
                            out.setAccountTypeDesc(left.getAccountTypeDesc());
                            out.setPartyNumber(left.getPartyNumber());
                            out.setPrimaryMediumDesc(left.getPrimaryMediumDesc());
                            out.setSecondaryMediumDesc(left.getSecondaryMediumDesc());
                            out.setTransactionKey(left.getTransactionKey());
                            out.setCurrencyAmount(left.getCurrencyAmount());
                            out.setTransDate(left.getTransDate());
                            if(right != null) {
                                out.setSum2d(right.getSum());

                            }
                            return out;
                       },
                       joinWindow);

这产生了正确的结果,但它似乎 运行 了很长一段时间,即使记录数量很少。我想知道是否有更有效的方法来实现相同的结果。

这是一个配置问题:cf http://docs.confluent.io/current/streams/developer-guide.html#memory-management

通过将缓存大小设置为零来禁用缓存(StreamsConfig 中的参数 cache.max.bytes.buffering)将解决 "delayed" 传递到输出主题的问题。

您还可以阅读此博客 post 以获取有关 Streams 设计的一些背景信息:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/