如何针对特定用例在 reduceByKeyAndWindow() 中实现 invFunc

How to implement invFunc in reduceByKeyAndWindow() for specific use case

我正在使用 spark streaming 来处理文件流。多个文件成批到达,并从所有文件中激发处理数据。 我的用途是获取进入后续批次的文件中每条记录的总和。例如:

我需要如下输出:

我使用 reduceByKeyAndWindow() 的 spark 代码如下:

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception {
        try {

            Summary s = new Summary();

            long grpCnt = s1.getDelta() + s2.getDelta();
            s.setDeltaSum(grpCnt);

            return s;
        } catch (Exception e) {
            logger.error(" ==== error in CKT_GRP_SUM ==== :"+e);
            return new Summary();
        }
    }

};

我从上面的实现中得到的输出如下:

从reduceByKeyAndWindow()的输出来看,它似乎是在计算前一批数据和当前批数据的聚合。 但我的要求是对前一批的聚合数据和当前批数据进行聚合。所以按照上面的例子 它应该在第 4 批和第 5 批结束时输出为 [(((15)+19)+11)+10 = 55]。

我读到 reduceByKeyAndWindow()invFunc 可以实现以获得预期的输出。我试图实现它类似于 GET_GRP_SUM 但它没有给我预期的结果。任何有关正确实施以获得所需输出的帮助将不胜感激。

我正在使用 java 1.8.45 和 spark 1.4.1 版以及 hadoop 2.7.1 版。

我使用 reduceByKeyAndWindow() 在 invFunc 上实现

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception {
        try {

            Summary s = new Summary();

            long grpCnt = s1.getDelta() + s2.getDelta();
            s.setDeltaSum(grpCnt);

            return s;

        } catch (Exception e) {
            logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
            return new Summary();
        }
    }
};

我已经像上面那样实现了我的 invFunc,这没有给我预期的输出。我这里分析的是s1和s2给了我前几批的合计值,我想我不是很确定。

我尝试更改我的 invFunc 实现,如下所示:

private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
    private static final long serialVersionUID = 1L;

    public Summary call(Summary s1, Summary s2) throws Exception {
        try {

            return s1;

        } catch (Exception e) {
            logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
            return new Summary();
        }
    }
};

此实现为我提供了预期的输出。但我面临的问题是带有 invFunc 的 reduceByKeyAndWindow() 不会自动删除旧密钥。我浏览了更多的帖子,发现我需要编写自己的过滤器函数,该函数将删除具有 0 值(无值)的旧键。

同样,我不确定如何编写过滤函数来删除具有 0 值(无值)的旧键,因为我没有具体了解 s1 和 s2 返回 INV_GET_GRP_SUM 的内容。

使用 UpdateStateByKey

您是否已从流媒体 API 中查看 updateStateByKey()?它允许您在批次间隔之间维护键值对的状态,不断用与其关联的新信息(值)更新每个键。这对您的用例很有效,因为以前的数据状态将包含每个键的聚合总和,直到最新状态。有关此函数的更多信息,请参见其用法 here and in an example here.

关于该函数的一个注意事项是它需要启用检查点,以便可以在每次迭代时保存状态。

(编辑:)

使用 ReduceByKeyAndWindow

关于使用 reduceKeyAndWindow()call() 方法的第二个参数用于普通 funcinvFunc分别是新加的元素和减去的旧元素。本质上,您是通过从新的时间片中添加元素(您正在使用 GET_GRP_SUM)和从旧时间片中减去元素(您没有使用INV_GET_GRP_SUM)。请注意,在您的第一次尝试中,您将旧值重新添加回当前的 window 值,而在您的第二次尝试中,您忽略了移出 window 的值。

要从移出 window 的元素中减去旧值,您可能希望 INV_GET_GRP_SUM 具有类似于下面的逻辑(并且可以找到类似的正确实现 here):

public Summary call(Summary s1, Summary s2) throws Exception {
    try {

        long grpCnt = s1.getDelta() - s2.getDelta();
        s.setDeltaSum(grpCnt);

    } catch (Exception e) {
        logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
        return new Summary();
    }
}

对于你的另一个问题,似乎确实有一种方法可以过滤掉过期的密钥,而且正如你提到的,它确实涉及编写一个过滤函数。正如您从 API 中看到的那样,此过滤器函数接受您的键值对和 returns 一个布尔值,该布尔值将被设置为 true(如果您想保留该对)或 false(如果您想要删除这对)。在这种情况下,由于您希望在值达到零后删除您的货币对,因此您可以执行以下操作:

private static final Function<scala.Tuple2<String, Summary>, Boolean> FILTER_EXPIRED = new Function<scala.Tuple2<String, Summary>, Boolean>() {
    public Boolean call(scala.Tuple2<String, Summary> s) { 
        return s.productElement(1) > 0; 
    }
}

然后你可以将它传递给你的reduceByKeyAndWindow()函数(注意你应该在这里传递分区参数来确定你的DStream中的RDD将使用多少分区):

JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval), partitions, FILTER_EXPIRED);