如何在 Spark 中使用 foreachPartition?

How to use foreachPartition in Spark?

如何在 Spark 中使用下面的函数 Java?在互联网上找遍了,但找不到合适的例子。

public void foreachPartition(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f)

我唯一知道的是它对进程batch of data有好处,所以叫BoxedUnit

如何让datasetbatch IDBoxedUnit批量处理数据?

谁能告诉我如何实现这个方法?

我认为您对 BoxedUnit 的印象有误,因此坚持使用 Java 中的 Scala 界面暴露于 Java。 scala.Function1<scala.collection.Iterator<T>, scala.runtime.BoxedUnit>(Iterator[T]) => Unit 的实现——一个接受 Iterator[T] 和 return 类型 Unit 的 Scala 函数。 Unit 在 Scala 中相当于 Java 的 voidBoxedUnitUnit 的盒装版本 - 它是一个堆对象,在其 UNIT 成员中保存单例单元值,并且是一个几乎从未出现在 Scala 程序中的实现细节。如果数据集是 DataFrame,那么 T 将是 org.apache.spark.sql.Row,您需要在 Row 个对象的集合上处理 Scala 迭代器。

要在 Java 中定义 scala.Function1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> 的内容,您需要创建 AbstractFunction1<scala.collection.Iterator<Row>, scala.runtime.BoxedUnit> 的实例并覆盖其 apply() 方法,您必须 return BoxedUnit.UNIT。您还需要使其可序列化,因此您通常声明自己的 class 继承自 AbstractFunction1 并实现 Serializable。您也可以 Java-fy 通过公开一个不同的、更Java-友好的抽象方法来稍后被覆盖:

import org.apache.spark.sql.Row;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
import scala.collection.JavaConverters;
import java.util.Iterator;

class MyPartitionFunction<T> extends AbstractFunction1<scala.collection.Iterator<T>, BoxedUnit>
   implements Serializable {
   @Override
   public BoxedUnit apply(scala.collection.Iterator<T> iterator) {
      call(JavaConverters.asJavaIteratorConverter(iterator).asJava());
      return BoxedUnit.UNIT;
   }

   public abstract void call(Iterator<T> iterator);
}

df.foreachPartition(new MyPartitionFunction<Row>() {
   @Override
   public void call(Iterator<Row> iterator) {
      for (Row row : iterator) {
         // do something with the row
      }
   }
});

这是相当多的实现复杂性,这就是为什么有 Java 特定版本采用 ForeachPartitionFunction<T> 而上面的代码变为:

import org.apache.spark.sql.Row;
import org.apache.spark.api.java.function.ForeachPartitionFunction;
import java.util.Iterator;

df.foreachPartition(new ForeachPartitionFunction<Row>() {
   public void call(Iterator<Row> iterator) throws Exception {
      for (Row row : iterator) {
         // do something with the row
      }
   }
}

功能与 Scala 接口提供的功能完全相同,只是 Apache Spark 为您完成了迭代器转换,它还为您提供了一个友好的 Java class 不需要您导入和实现 Scala 类型。

也就是说,我认为您对 Spark 的工作原理有些误解。批量处理流式数据不需要使用foreachPartition。这是由 Spark 的流引擎自动完成的。您编写指定 transformationsaggregations 的流式查询,然后随着新数据从流中到达而逐步应用。

foreachPartitionforeach的一种形式,为一些特殊的批处理情况保留,例如,当你需要在处理函数中做一些昂贵的对象实例化并且对每一行进行实例化时巨大的开销。使用 foreachPartition,您的处理函数每个分区只被调用一次,因此您可以实例化昂贵的对象一次,然后遍历分区的数据。这减少了处理时间,因为您只需做一次昂贵的事情。

但是,您甚至不能在流媒体源上调用 foreach()foreachPartition(),因为这会导致 AnalysisException。相反,您必须使用 DataStreamWriter. DataStreamWriter.foreach() takes an instance of ForeachWriterforeach()foreachBatch() 方法,而 DataStreamWriter.foreachBatch() 采用接收数据集和批次 ID 的无效函数。 ForeachWriter 在其 open() 方法中接收一个纪元 ID。同样,foreachBatch() 具有功能相同的 Scala 和 Java 两种风格,因此如果您要在 Java 中编写,请使用特定于 Java 的版本。 =51=]