如何针对特定用例在 reduceByKeyAndWindow() 中实现 invFunc
How to implement invFunc in reduceByKeyAndWindow() for specific use case
我正在使用 spark streaming 来处理文件流。多个文件成批到达,并从所有文件中激发处理数据。
我的用途是获取进入后续批次的文件中每条记录的总和。例如:
- 键:key_1 值:10 --> batch1
- 键:key_1 值:05 --> batch1
- 键:key_1 值:19 --> batch2
- 键:key_1 值:11 --> batch3
- 键:key_1 值:10 --> batch4
我需要如下输出:
- 处理第一批后我需要输出 => key: key_1 val: 15
- 处理第二批后我需要输出 => key: key_1 val: 34
- 处理完第 3 批后我需要输出 => key: key_1 val: 45
- 处理第 4 批后,我需要输出为 => key: key_1 val: 55
- 处理第 5 批后,我需要输出为 => key: key_1 val: 55
我使用 reduceByKeyAndWindow() 的 spark 代码如下:
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));
private static final Function2<Summary, Summary, Summary> GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
Summary s = new Summary();
long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);
return s;
} catch (Exception e) {
logger.error(" ==== error in CKT_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
我从上面的实现中得到的输出如下:
- 处理第一批后我得到输出=>键:key_1值:15
- 处理第二批后我得到输出=>键:key_1值:34
- 处理第 3 批后我得到输出 => 键:key_1 值:30
- 处理第 4 批后我得到输出 => 键:key_1 值:21
- 处理第 5 批后我得到输出 => 键:key_1 值:10
从reduceByKeyAndWindow()的输出来看,它似乎是在计算前一批数据和当前批数据的聚合。
但我的要求是对前一批的聚合数据和当前批数据进行聚合。所以按照上面的例子
它应该在第 4 批和第 5 批结束时输出为 [(((15)+19)+11)+10 = 55]。
我读到 reduceByKeyAndWindow() 和 invFunc 可以实现以获得预期的输出。我试图实现它类似于 GET_GRP_SUM 但它没有给我预期的结果。任何有关正确实施以获得所需输出的帮助将不胜感激。
我正在使用 java 1.8.45 和 spark 1.4.1 版以及 hadoop 2.7.1 版。
我使用 reduceByKeyAndWindow() 在 invFunc 上实现
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));
private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
Summary s = new Summary();
long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);
return s;
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
我已经像上面那样实现了我的 invFunc,这没有给我预期的输出。我这里分析的是s1和s2给了我前几批的合计值,我想我不是很确定。
我尝试更改我的 invFunc 实现,如下所示:
private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
return s1;
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
此实现为我提供了预期的输出。但我面临的问题是带有 invFunc 的 reduceByKeyAndWindow() 不会自动删除旧密钥。我浏览了更多的帖子,发现我需要编写自己的过滤器函数,该函数将删除具有 0 值(无值)的旧键。
同样,我不确定如何编写过滤函数来删除具有 0 值(无值)的旧键,因为我没有具体了解 s1 和 s2 返回 INV_GET_GRP_SUM 的内容。
使用 UpdateStateByKey
您是否已从流媒体 API 中查看 updateStateByKey()
?它允许您在批次间隔之间维护键值对的状态,不断用与其关联的新信息(值)更新每个键。这对您的用例很有效,因为以前的数据状态将包含每个键的聚合总和,直到最新状态。有关此函数的更多信息,请参见其用法 here and in an example here.
关于该函数的一个注意事项是它需要启用检查点,以便可以在每次迭代时保存状态。
(编辑:)
使用 ReduceByKeyAndWindow
关于使用 reduceKeyAndWindow()
,call()
方法的第二个参数用于普通 func 和 invFunc分别是新加的元素和减去的旧元素。本质上,您是通过从新的时间片中添加元素(您正在使用 GET_GRP_SUM
)和从旧时间片中减去元素(您没有使用INV_GET_GRP_SUM
)。请注意,在您的第一次尝试中,您将旧值重新添加回当前的 window 值,而在您的第二次尝试中,您忽略了移出 window 的值。
要从移出 window 的元素中减去旧值,您可能希望 INV_GET_GRP_SUM
具有类似于下面的逻辑(并且可以找到类似的正确实现 here):
public Summary call(Summary s1, Summary s2) throws Exception {
try {
long grpCnt = s1.getDelta() - s2.getDelta();
s.setDeltaSum(grpCnt);
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
对于你的另一个问题,似乎确实有一种方法可以过滤掉过期的密钥,而且正如你提到的,它确实涉及编写一个过滤函数。正如您从 API 中看到的那样,此过滤器函数接受您的键值对和 returns 一个布尔值,该布尔值将被设置为 true(如果您想保留该对)或 false(如果您想要删除这对)。在这种情况下,由于您希望在值达到零后删除您的货币对,因此您可以执行以下操作:
private static final Function<scala.Tuple2<String, Summary>, Boolean> FILTER_EXPIRED = new Function<scala.Tuple2<String, Summary>, Boolean>() {
public Boolean call(scala.Tuple2<String, Summary> s) {
return s.productElement(1) > 0;
}
}
然后你可以将它传递给你的reduceByKeyAndWindow()
函数(注意你应该在这里传递分区参数来确定你的DStream
中的RDD将使用多少分区):
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval), partitions, FILTER_EXPIRED);
我正在使用 spark streaming 来处理文件流。多个文件成批到达,并从所有文件中激发处理数据。 我的用途是获取进入后续批次的文件中每条记录的总和。例如:
- 键:key_1 值:10 --> batch1
- 键:key_1 值:05 --> batch1
- 键:key_1 值:19 --> batch2
- 键:key_1 值:11 --> batch3
- 键:key_1 值:10 --> batch4
我需要如下输出:
- 处理第一批后我需要输出 => key: key_1 val: 15
- 处理第二批后我需要输出 => key: key_1 val: 34
- 处理完第 3 批后我需要输出 => key: key_1 val: 45
- 处理第 4 批后,我需要输出为 => key: key_1 val: 55
- 处理第 5 批后,我需要输出为 => key: key_1 val: 55
我使用 reduceByKeyAndWindow() 的 spark 代码如下:
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));
private static final Function2<Summary, Summary, Summary> GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
Summary s = new Summary();
long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);
return s;
} catch (Exception e) {
logger.error(" ==== error in CKT_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
我从上面的实现中得到的输出如下:
- 处理第一批后我得到输出=>键:key_1值:15
- 处理第二批后我得到输出=>键:key_1值:34
- 处理第 3 批后我得到输出 => 键:key_1 值:30
- 处理第 4 批后我得到输出 => 键:key_1 值:21
- 处理第 5 批后我得到输出 => 键:key_1 值:10
从reduceByKeyAndWindow()的输出来看,它似乎是在计算前一批数据和当前批数据的聚合。 但我的要求是对前一批的聚合数据和当前批数据进行聚合。所以按照上面的例子 它应该在第 4 批和第 5 批结束时输出为 [(((15)+19)+11)+10 = 55]。
我读到 reduceByKeyAndWindow() 和 invFunc 可以实现以获得预期的输出。我试图实现它类似于 GET_GRP_SUM 但它没有给我预期的结果。任何有关正确实施以获得所需输出的帮助将不胜感激。
我正在使用 java 1.8.45 和 spark 1.4.1 版以及 hadoop 2.7.1 版。
我使用 reduceByKeyAndWindow() 在 invFunc 上实现
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval));
private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
Summary s = new Summary();
long grpCnt = s1.getDelta() + s2.getDelta();
s.setDeltaSum(grpCnt);
return s;
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
我已经像上面那样实现了我的 invFunc,这没有给我预期的输出。我这里分析的是s1和s2给了我前几批的合计值,我想我不是很确定。
我尝试更改我的 invFunc 实现,如下所示:
private static final Function2<Summary, Summary, Summary> INV_GET_GRP_SUM = new Function2<Summary, Summary, Summary>() {
private static final long serialVersionUID = 1L;
public Summary call(Summary s1, Summary s2) throws Exception {
try {
return s1;
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
};
此实现为我提供了预期的输出。但我面临的问题是带有 invFunc 的 reduceByKeyAndWindow() 不会自动删除旧密钥。我浏览了更多的帖子,发现我需要编写自己的过滤器函数,该函数将删除具有 0 值(无值)的旧键。
同样,我不确定如何编写过滤函数来删除具有 0 值(无值)的旧键,因为我没有具体了解 s1 和 s2 返回 INV_GET_GRP_SUM 的内容。
使用 UpdateStateByKey
您是否已从流媒体 API 中查看 updateStateByKey()
?它允许您在批次间隔之间维护键值对的状态,不断用与其关联的新信息(值)更新每个键。这对您的用例很有效,因为以前的数据状态将包含每个键的聚合总和,直到最新状态。有关此函数的更多信息,请参见其用法 here and in an example here.
关于该函数的一个注意事项是它需要启用检查点,以便可以在每次迭代时保存状态。
(编辑:)
使用 ReduceByKeyAndWindow
关于使用 reduceKeyAndWindow()
,call()
方法的第二个参数用于普通 func 和 invFunc分别是新加的元素和减去的旧元素。本质上,您是通过从新的时间片中添加元素(您正在使用 GET_GRP_SUM
)和从旧时间片中减去元素(您没有使用INV_GET_GRP_SUM
)。请注意,在您的第一次尝试中,您将旧值重新添加回当前的 window 值,而在您的第二次尝试中,您忽略了移出 window 的值。
要从移出 window 的元素中减去旧值,您可能希望 INV_GET_GRP_SUM
具有类似于下面的逻辑(并且可以找到类似的正确实现 here):
public Summary call(Summary s1, Summary s2) throws Exception {
try {
long grpCnt = s1.getDelta() - s2.getDelta();
s.setDeltaSum(grpCnt);
} catch (Exception e) {
logger.error(" ==== error in INV_GET_GRP_SUM ==== :"+e);
return new Summary();
}
}
对于你的另一个问题,似乎确实有一种方法可以过滤掉过期的密钥,而且正如你提到的,它确实涉及编写一个过滤函数。正如您从 API 中看到的那样,此过滤器函数接受您的键值对和 returns 一个布尔值,该布尔值将被设置为 true(如果您想保留该对)或 false(如果您想要删除这对)。在这种情况下,由于您希望在值达到零后删除您的货币对,因此您可以执行以下操作:
private static final Function<scala.Tuple2<String, Summary>, Boolean> FILTER_EXPIRED = new Function<scala.Tuple2<String, Summary>, Boolean>() {
public Boolean call(scala.Tuple2<String, Summary> s) {
return s.productElement(1) > 0;
}
}
然后你可以将它传递给你的reduceByKeyAndWindow()
函数(注意你应该在这里传递分区参数来确定你的DStream
中的RDD将使用多少分区):
JavaPairDStream<String, Summary> grpSumRDD = sumRDD.reduceByKeyAndWindow(GET_GRP_SUM, INV_GET_GRP_SUM, Durations.minutes(2*batchInterval), Durations.minutes(batchInterval), partitions, FILTER_EXPIRED);