Spark mapPartitions 迭代器 returns 重复记录
Spark mapPartitions iterator returns duplicate records
我正在为 FlatMapFunction>>、String> 实现 class。为每个分区初始化一些不可序列化的连接。但是当我在迭代器上调用 next() 时,它会为多个分区提供相同的记录。以下是代码:
@Override
public Iterator < String > call(Iterator < Tuple2 < String, Iterable < String >>> tuple2Iterator)
throws Exception {
BitLambdaService lambda = buildClient();
List <String> resultList = new ArrayList < > ();
while (tuple2Iterator.hasNext()) {
Tuple2 < String, Iterable < String >> tpl = tuple2Iterator.next();
// do something
}
return resultList.iterator();
}
有没有人之前遇到过这个问题?或者知道如何修复它?
通过在转换后调用 rdd.cache()
解决了这个问题。问题的发生是因为转换是以惰性方式执行的,当对 RDD 应用操作时,转换实际上是在对 RDD 应用操作时执行的。所以 mapPartitions 没有等待调用方法完成,而是将相同的记录分配给另一个执行者。
我正在为 FlatMapFunction>>、String> 实现 class。为每个分区初始化一些不可序列化的连接。但是当我在迭代器上调用 next() 时,它会为多个分区提供相同的记录。以下是代码:
@Override
public Iterator < String > call(Iterator < Tuple2 < String, Iterable < String >>> tuple2Iterator)
throws Exception {
BitLambdaService lambda = buildClient();
List <String> resultList = new ArrayList < > ();
while (tuple2Iterator.hasNext()) {
Tuple2 < String, Iterable < String >> tpl = tuple2Iterator.next();
// do something
}
return resultList.iterator();
}
有没有人之前遇到过这个问题?或者知道如何修复它?
通过在转换后调用 rdd.cache()
解决了这个问题。问题的发生是因为转换是以惰性方式执行的,当对 RDD 应用操作时,转换实际上是在对 RDD 应用操作时执行的。所以 mapPartitions 没有等待调用方法完成,而是将相同的记录分配给另一个执行者。