使用 Foreach 收集数据

Collect data with Foreach

我正在尝试 foreach 一个 RDD 并将数据收集到一个字符串生成器中。但这并没有发生,因为 foreach 在执行程序节点上是 运行,而字符串生成器在驱动程序节点上是 运行。有什么方法可以实现我的 objective 吗?我不想使用 collect 操作,因为它很昂贵。

private static StringBuilder collect(JavaRDD<String> javaRDD) {
        StringBuilder builder = new StringBuilder();
        javaRDD.foreach(x -> builder.append(x));
        System.out.println(builder.toString() + " ****");
         return builder;
    }

非常感谢任何帮助。

您可以使用 rdd.aggregate 将 rdd 中的所有字符串合并到一个 StringBuffer 中,如下所示:

val rdd = sc.parallelize( List( "h" , "a" , "b" ) )
val res = rdd.aggregate( new StringBuffer )( ( sb:StringBuffer , str : String ) => sb.append( str ) , ( sb1 : StringBuffer , sb2 : StringBuffer ) => sb1.append( sb2 ) )
println( res ) // "abh"

显然,你的 rdd 字符串的全部内容将被连接在字符串缓冲区中,这可能会很大,但这正是你想要的...... 这比收集更好,因为字符串数据不会 "raw" 发送到驱动程序(仅以连接形式)。

另请注意,无法保证字符串的顺序...

我认为可能有另一种方法:累加器。这是关于如何使用字符串累加器的重定向:

感谢该回复的作者

您可以使用 foreachPartition。这样只会收集执行器上的数据,不会收集驱动器上的数据。

javaRDD.foreachPartition(partition -> {
  StringBuilder builder = new StringBuilder();
  while (partition.hasNext()) {
    builder.append(partition.next());
  }
  System.out.println(builder.toString() + " ****");
});