Apache Spark - Simple Word Count gets: SparkException: Task not serializable

I am trying to run some tests on Apache Spark (v1.3.0), and I have this simple Java 8 class:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    private JavaSparkContext ctx;
    private String inputFile, outputFile;
    public WordCount(String inputFile, String outputFile) {
        this.inputFile = inputFile;
        this.outputFile = outputFile;
        // Initialize Spark Conf
        ctx = new JavaSparkContext("local", "WordCount",
                System.getenv("SPARK_HOME"), System.getenv("JARS"));

    }

    public static void main(String... args) {
        String inputFile = "/home/workspace/spark/src/main/resources/inferno.txt";//args[0];
        String outputFile = "/home/workspace/spark/src/main/resources/dv";//args[1];
        WordCount wc = new WordCount(inputFile, outputFile);
        wc.doWordCount();
        wc.close();
    }

    public void doWordCount() {
        long start = System.currentTimeMillis();
        JavaRDD<String> inputRdd = ctx.textFile(inputFile);
        JavaPairRDD<String, Integer> count = inputRdd.flatMapToPair((String s) -> {
            List<Tuple2<String, Integer>> list = new ArrayList<>();
            Arrays.asList(s.split(" ")).forEach(s1 -> list.add(new Tuple2<String, Integer>(s1, 1)));
            return list;
        }).reduceByKey((x, y) -> x + y);
        List<Tuple2<String, Integer>> list = count.takeOrdered(10, 
                (o1, o2) -> o2._2() - o1._2());
        list.forEach(t2 -> System.out.println(t2._1()));
//      count.saveAsTextFile(outputFile);
        long end = System.currentTimeMillis();
        System.out.println(String.format("Time in ms is: %d", end - start));
    }

    public void close() {
        ctx.stop();
    }

}

When I run it, I get the following exception:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1622)
    at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:635)
    at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1231)
    at org.apache.spark.api.java.JavaRDDLike$class.takeOrdered(JavaRDDLike.scala:578)
    at org.apache.spark.api.java.JavaPairRDD.takeOrdered(JavaPairRDD.scala:45)
    at it.conker.spark.base.WordCount.doWordCount2(WordCount.java:65)
    at it.conker.spark.base.WordCount.main(WordCount.java:37)
Caused by: java.io.NotSerializableException: it.conker.spark.base.WordCount$$Lambda/1541232265
Serialization stack:
    - object not serializable (class: it.conker.spark.base.WordCount$$Lambda/1541232265, value: it.conker.spark.base.WordCount$$Lambda/1541232265@213860b8)
    - field (class: scala.math.LowPriorityOrderingImplicits$$anon, name: cmp, type: interface java.util.Comparator)
    - object (class scala.math.LowPriorityOrderingImplicits$$anon, scala.math.LowPriorityOrderingImplicits$$anon@511505e7)
    - field (class: org.apache.spark.rdd.RDD$$anonfun, name: ord, type: interface scala.math.Ordering)
    - object (class org.apache.spark.rdd.RDD$$anonfun, <function1>)
    - field (class: org.apache.spark.rdd.RDD$$anonfun, name: f, type: interface scala.Function1)
    - object (class org.apache.spark.rdd.RDD$$anonfun, <function3>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 8 more

Here is my pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <groupId>it.conker.spark</groupId>
    <artifactId>learning-spark-by-example</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Learning Spark by example</name>
    <packaging>jar</packaging>
    <version>0.0.1</version>
    <dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>${java.version}</source>
                        <target>${java.version}</target>
                    </configuration>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>

I am running the class from within Eclipse. Can anyone tell me where I am going wrong?

Edit: As asked by mark91, replacing the line:

List<Tuple2<String, Integer>> list = count.takeOrdered(10, 
            (o1, o2) -> o2._2() - o1._2());

with:

List<Tuple2<String, Integer>> list = count.takeOrdered(10);

I get this exception:

    java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.Utils$$anon.compare(Utils.scala:35)
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1234)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1231)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/27 17:58:55 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.Utils$$anon.compare(Utils.scala:35)
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1234)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1231)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

15/03/27 17:58:55 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
15/03/27 17:58:55 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 
15/03/27 17:58:55 INFO TaskSchedulerImpl: Cancelling stage 1
15/03/27 17:58:56 INFO DAGScheduler: Job 0 failed: takeOrdered at WordCount.java:66, took 14.117721 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: scala.Tuple2 cannot be cast to java.lang.Comparable
    at org.spark-project.guava.collect.NaturalOrdering.compare(NaturalOrdering.java:28)
    at scala.math.LowPriorityOrderingImplicits$$anon.compare(Ordering.scala:153)
    at org.apache.spark.util.collection.Utils$$anon.compare(Utils.scala:35)
    at org.spark-project.guava.collect.Ordering.leastOf(Ordering.java:672)
    at org.apache.spark.util.collection.Utils$.takeOrdered(Utils.scala:37)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1234)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:1231)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.RDD$$anonfun.apply(RDD.scala:634)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1192)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage.apply(DAGScheduler.scala:1191)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed.apply(DAGScheduler.scala:693)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:48)

OK, the reason is that all the classes you use in your processing (i.e., the objects stored in your RDD and the classes that are Functions to be passed to Spark) need to be Serializable. This means that they must either implement the Serializable interface, or you have to provide another way to serialize them, for example with Kryo. Actually I don't know why the lambda you defined is not considered serializable, but I believe it is because it is passed as a Java Comparator argument.
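As an aside, if you wanted to try the Kryo route for your RDD data, a minimal sketch of enabling it on the SparkConf (assuming Spark 1.x's standard configuration keys) would look roughly like this; note that Kryo applies to RDD/shuffle data, while task closures are still serialized with Java serialization:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

SparkConf conf = new SparkConf()
        .setMaster("local")
        .setAppName("WordCount")
        // Serialize RDD/shuffle data with Kryo instead of Java serialization
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        // Registering frequently serialized classes avoids writing full class names
        .registerKryoClasses(new Class<?>[]{ scala.Tuple2.class });
JavaSparkContext ctx = new JavaSparkContext(conf);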

However, a way to make it work is to define a Serializable comparator, for example:

import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

public class WordCountComparator implements Comparator<Tuple2<String, Integer>>, Serializable {

    @Override
    public int compare(Tuple2<String, Integer> o1, Tuple2<String, Integer> o2) {
        // Sort by word count in descending order
        return o2._2() - o1._2();
    }
}

Then pass an instance of it as the second argument to your takeOrdered function.
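For example, the takeOrdered call in doWordCount would then become (a sketch assuming the WordCountComparator class above is on the classpath):

List<Tuple2<String, Integer>> list = count.takeOrdered(10, new WordCountComparator());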