Spark mapPartitions 迭代器 returns 重复记录

Spark mapPartitions iterator returns duplicate records

我正在为 FlatMapFunction>>、String> 实现 class。为每个分区初始化一些不可序列化的连接。但是当我在迭代器上调用 next() 时,它会为多个分区提供相同的记录。以下是代码:

@Override
    public Iterator < String > call(Iterator < Tuple2 < String, Iterable < String >>> tuple2Iterator)
    throws Exception {
        BitLambdaService lambda = buildClient();
        List <String> resultList = new ArrayList < > ();
        while (tuple2Iterator.hasNext()) {
            Tuple2 < String, Iterable < String >> tpl = tuple2Iterator.next();
            // do something
        }
        return resultList.iterator();
    }

有没有人之前遇到过这个问题?或者知道如何修复它?

通过在转换后调用 rdd.cache() 解决了这个问题。问题的发生是因为转换是以惰性方式执行的,当对 RDD 应用操作时,转换实际上是在对 RDD 应用操作时执行的。所以 mapPartitions 没有等待调用方法完成,而是将相同的记录分配给另一个执行者。