在 Spark 中使用多个逗号分隔字段映射 RDD

Mapping RDD with several comma separated fields in Spark

我是 Spark 的新手,我正在学习一个教程,其中包含多个字段的行是用 Scala 解析的,scala 的代码是这样的:

val pass = lines.map(_.split(",")).
map(pass=>(pass(15),pass(7).toInt)).
reduceByKey(_+_)

其中 pass 是从 socketTextStream(它的 SparkStreams)接收到的数据。我是 Spark 的新手,想使用 Java 来获得相同的结果。我已经使用

声明了 JavaReceiverInputDStream
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

我想到了两个可能的解决方案:

  1. 使用平面地图:

    JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Arrays.asList(x.split(",")); } });

但这似乎不对,因为结果是将 CSV 分解为没有任何顺序的单词。

  1. 使用映射(编译错误),这看起来是合适的解决方案,但我无法使用以下方法提取字段 15 和 7:

    JavaDStream<List<String>> words = lines.map( new Function<String, List<String>>() { public List<String> call(String s) { return Arrays.asList(s.split(",")); } });

当我尝试映射 List<String> => Tuple2<String, Int>

时,这个想法失败了

映射代码为:

JavaPairDStream<String, Integer> pairs = words.map(
new PairFunction<List<String>, String, Integer>() {
  public Tuple2<String, Integer> call(List<String> s) throws Exception {
    return new Tuple2(s.get(15), 6);
  }
});

错误:

class

中的方法映射
org.apache.spark.streaming.api.java.AbstractJavaDStreamLike`<T,This,R>` cannot be applied to given types;
[ERROR] required: org.apache.spark.api.java.function.Function`<java.util.List`<java.lang.String>`,R>`
[ERROR] found: `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>`
[ERROR] reason: no instance(s) of type variable(s) R exist so that argument type `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>` conforms to formal parameter type org.apache.spark.api.java.

对此有什么建议吗?

使用此代码。它将从 String 中获取 require 字段。

JavaDStream<String> lines = { ..... };
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(String t) throws Exception {
        String[] words = t.split(",");
        return new Tuple2<String, Integer>(words[15],Integer.parseInt(words[7]));
    }
});