在 Scala 中重写 Spark Java 应用程序

Rewrite Spark Java Application in Scala

我正在尝试 "convert" 我的 Spark 应用程序,它是用 Java 编写的 Scala。 因为我是Scala和Spark的Scala的新手API,不知道在Scala中这个"transformToPair"函数怎么写:

Java:

JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);

*** FUNCTION ***

private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
    public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {

        float avgOfAll;
        if(v1.count() > 0) {
            avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
                public Boolean call(Tuple2<String, Float> v1) throws Exception {
                    return v1._1().equals("all");
                }
            }).values().collect().get(0);
        } else {
            avgOfAll = 0.0f;
        }

        final float finalAvg = avgOfAll;

        JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
            public Boolean call(Float v1) throws Exception {
                return v1 > finalAvg;
            }
        });


        return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
            public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
                return !v1._1().equals("all");
            }
        });
    }
};

这是我对 Scala 的尝试:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}

我收到以下错误消息:

<console>:281: error: type mismatch;
 found   : Unit
 required: org.apache.spark.rdd.RDD[?]
       }
       ^

有人可以帮助我吗?我如何 return "rddNew" DStream?

如果我说

return rddNew

在 "transform" 函数的末尾,出现以下错误:

<console>:293: error: return outside method definition
       return rddNew
       ^

您实际上必须 return 最后一个值,例如像那样:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  val rddNew = rddBool.filter({case(k, v) => (k != "all")})

  rddNew
}

或者完全跳过定义变量:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 
  var avgOfAll = 0.0

  if(rdd.count() > 0) {
    avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  }

  val finalAvg = avgOfAll

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}

多一点 Scala-like 可能是:

val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd => 

  val finalAvg = if(rdd.count() > 0) {
    rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
  } else { 0.0 }

  val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})

  rddBool.filter({case(k, v) => (k != "all")})
}