在 Scala 中重写 Spark Java 应用程序
Rewrite Spark Java Application in Scala
我正在尝试把用 Java 编写的 Spark 应用程序"转换"(convert)为 Scala。
因为我是 Scala 以及 Spark 的 Scala API 的新手,不知道这个 "transformToPair" 函数在 Scala 中该怎么写:
Java:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);
*** FUNCTION ***
private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
float avgOfAll;
if(v1.count() > 0) {
avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
public Boolean call(Tuple2<String, Float> v1) throws Exception {
return v1._1().equals("all");
}
}).values().collect().get(0);
} else {
avgOfAll = 0.0f;
}
final float finalAvg = avgOfAll;
JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
public Boolean call(Float v1) throws Exception {
return v1 > finalAvg;
}
});
return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
return !v1._1().equals("all");
}
});
}
};
这是我对 Scala 的尝试:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}
我收到以下错误消息:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
有人可以帮助我吗?我该如何返回(return)"rddNew" 这个 DStream?
如果我说
return rddNew
在 "transform" 函数的末尾,出现以下错误:
<console>:293: error: return outside method definition
return rddNew
^
在 Scala 中,代码块的最后一个表达式就是该块的返回值(不需要写 return),您只需把要返回的值放在最后,例如像这样:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
val rddNew = rddBool.filter({case(k, v) => (k != "all")})
rddNew
}
或者完全跳过定义变量:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
rddBool.filter({case(k, v) => (k != "all")})
}
更符合 Scala 风格(Scala-like)的写法可能是:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
val finalAvg = if(rdd.count() > 0) {
rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
} else { 0.0 }
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
rddBool.filter({case(k, v) => (k != "all")})
}
我正在尝试把用 Java 编写的 Spark 应用程序"转换"(convert)为 Scala。因为我是 Scala 以及 Spark 的 Scala API 的新手,不知道这个 "transformToPair" 函数在 Scala 中该怎么写:
Java:
JavaPairDStream<String, Boolean> outlierPairDStream = avgAll1h.union(avgPerPlug1h).transformToPair(findOutliersPerComparisonFunction);
*** FUNCTION ***
private static Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>> findOutliersPerComparisonFunction = new Function<JavaPairRDD<String,Float>, JavaPairRDD<String,Boolean>>() {
public JavaPairRDD<String, Boolean> call(JavaPairRDD<String, Float> v1) throws Exception {
float avgOfAll;
if(v1.count() > 0) {
avgOfAll = v1.filter(new Function<Tuple2<String,Float>, Boolean>() {
public Boolean call(Tuple2<String, Float> v1) throws Exception {
return v1._1().equals("all");
}
}).values().collect().get(0);
} else {
avgOfAll = 0.0f;
}
final float finalAvg = avgOfAll;
JavaPairRDD<String, Boolean> rddBool = v1.mapValues(new Function<Float, Boolean>() {
public Boolean call(Float v1) throws Exception {
return v1 > finalAvg;
}
});
return rddBool.filter(new Function<Tuple2<String,Boolean>, Boolean>() {
public Boolean call(Tuple2<String, Boolean> v1) throws Exception {
return !v1._1().equals("all");
}
});
}
};
这是我对 Scala 的尝试:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
val rddNew = rddBool.filter({case(k, v) => (k != "all")})
}
我收到以下错误消息:
<console>:281: error: type mismatch;
found : Unit
required: org.apache.spark.rdd.RDD[?]
}
^
有人可以帮助我吗?我该如何返回(return)"rddNew" 这个 DStream?
如果我说
return rddNew
在 "transform" 函数的末尾,出现以下错误:
<console>:293: error: return outside method definition
return rddNew
^
在 Scala 中,代码块的最后一个表达式就是该块的返回值(不需要写 return),您只需把要返回的值放在最后,例如像这样:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
val rddNew = rddBool.filter({case(k, v) => (k != "all")})
rddNew
}
或者完全跳过定义变量:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
var avgOfAll = 0.0
if(rdd.count() > 0) {
avgOfAll = rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
}
val finalAvg = avgOfAll
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
rddBool.filter({case(k, v) => (k != "all")})
}
更符合 Scala 风格(Scala-like)的写法可能是:
val outlierPairDStream = avgAll1h.union(avgPerPlug1h).transform{rdd =>
val finalAvg = if(rdd.count() > 0) {
rdd.filter({case (k, v) => (k == "all")}).map({case (k, v) => v}).collect()(0)
} else { 0.0 }
val rddBool = rdd.map({case(k, v) => (k, v > finalAvg)})
rddBool.filter({case(k, v) => (k != "all")})
}