Spark - 带排序的字数统计(不是排序)
Spark - Word Count with Sorting (is not sorting)
我正在学习 Spark 并尝试通过按出现次数对单词进行排序来扩展 WordCount 示例。问题出在哪里,在 运行 代码之后我得到了未排序的结果:
(708,word1)
(46,word2)
(65,word3)
看来排序由于某种原因失败了。 wordSortedByCount.first() 命令和将执行限制为仅一个线程也有类似的效果。
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class JavaWordCount2 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCountAndSort");
int numOfKernels = 8;
sparkConf.setMaster("local[" + numOfKernels + "]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("data.csv", 1);
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line
.split("[,; :\.]")));
words = words.flatMap(line -> Arrays.asList(line.replaceAll("[\"\(\)]", "").toLowerCase()));
// sum words
JavaPairRDD<String, Integer> counts = words.mapToPair(
w -> new Tuple2<String, Integer>(w, 1)).reduceByKey(
(x, y) -> x + y);
// minimum 5 occurences
// counts = counts.filer(s -> s._2 > 5);
counts = counts.filter(new Function<Tuple2<String,Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> v1) throws Exception {
return v1._2 > 5;
}
});
// to enable sorting by value (count) and not key -> value-to-key conversion pattern
// setting value to null, since it won't be used anymore
JavaPairRDD<Tuple2<Integer, String>, Integer> countInKey = counts.mapToPair(a -> new Tuple2(new Tuple2<Integer, String>(a._2, a._1), null));
// sort by num of occurences
JavaPairRDD<Tuple2<Integer, String>, Integer> wordSortedByCount = countInKey.sortByKey(new TupleComparator(), true);
// print result
List<Tuple2<Tuple2<Integer, String>, Integer>> output = wordSortedByCount.take(10);
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1());
}
ctx.stop();
}
}
Class比较:
import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;
public class TupleComparator implements Comparator<Tuple2<Integer, String>>,
Serializable {
@Override
public int compare(Tuple2<Integer, String> tuple1,
Tuple2<Integer, String> tuple2) {
return tuple1._1 < tuple2._1 ? 0 : 1;
}
}
任何人都可以指出代码有什么问题吗?
您的代码的第一个问题是在比较器中。事实上,您 returning 0 或 1,而 compare
方法应该 return 一些负值,无论第一个元素出现在第二个元素之前。所以将其更改为:
@Override
public int compare(Tuple2<Integer, String> tuple1,
Tuple2<Integer, String> tuple2) {
return tuple1._1 - tuple2._1;
}
另外,sortByKey
的第二个参数应该放在false
,否则会得到一个升序,即从最低到最高,这与你想要的正好相反我觉得。
我正在学习 Spark 并尝试通过按出现次数对单词进行排序来扩展 WordCount 示例。问题出在哪里,在 运行 代码之后我得到了未排序的结果:
(708,word1)
(46,word2)
(65,word3)
看来排序由于某种原因失败了。 wordSortedByCount.first() 命令和将执行限制为仅一个线程也有类似的效果。
import java.io.Serializable;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class JavaWordCount2 {
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCountAndSort");
int numOfKernels = 8;
sparkConf.setMaster("local[" + numOfKernels + "]");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = ctx.textFile("data.csv", 1);
JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line
.split("[,; :\.]")));
words = words.flatMap(line -> Arrays.asList(line.replaceAll("[\"\(\)]", "").toLowerCase()));
// sum words
JavaPairRDD<String, Integer> counts = words.mapToPair(
w -> new Tuple2<String, Integer>(w, 1)).reduceByKey(
(x, y) -> x + y);
// minimum 5 occurences
// counts = counts.filer(s -> s._2 > 5);
counts = counts.filter(new Function<Tuple2<String,Integer>, Boolean>() {
@Override
public Boolean call(Tuple2<String, Integer> v1) throws Exception {
return v1._2 > 5;
}
});
// to enable sorting by value (count) and not key -> value-to-key conversion pattern
// setting value to null, since it won't be used anymore
JavaPairRDD<Tuple2<Integer, String>, Integer> countInKey = counts.mapToPair(a -> new Tuple2(new Tuple2<Integer, String>(a._2, a._1), null));
// sort by num of occurences
JavaPairRDD<Tuple2<Integer, String>, Integer> wordSortedByCount = countInKey.sortByKey(new TupleComparator(), true);
// print result
List<Tuple2<Tuple2<Integer, String>, Integer>> output = wordSortedByCount.take(10);
for (Tuple2<?, ?> tuple : output) {
System.out.println(tuple._1());
}
ctx.stop();
}
}
Class比较:
import java.io.Serializable;
import java.util.Comparator;
import scala.Tuple2;
public class TupleComparator implements Comparator<Tuple2<Integer, String>>,
Serializable {
@Override
public int compare(Tuple2<Integer, String> tuple1,
Tuple2<Integer, String> tuple2) {
return tuple1._1 < tuple2._1 ? 0 : 1;
}
}
任何人都可以指出代码有什么问题吗?
您的代码的第一个问题是在比较器中。事实上,您 returning 0 或 1,而 compare
方法应该 return 一些负值,无论第一个元素出现在第二个元素之前。所以将其更改为:
@Override
public int compare(Tuple2<Integer, String> tuple1,
Tuple2<Integer, String> tuple2) {
return tuple1._1 - tuple2._1;
}
另外,sortByKey
的第二个参数应该放在false
,否则会得到一个升序,即从最低到最高,这与你想要的正好相反我觉得。