在 Spark 中使用多个逗号分隔字段映射 RDD
Mapping RDD with several comma separated fields in Spark
我是 Spark 的新手,我正在学习一个教程,其中包含多个字段的行是用 Scala 解析的,scala 的代码是这样的:
val pass = lines.map(_.split(",")).
map(pass=>(pass(15),pass(7).toInt)).
reduceByKey(_+_)
其中 pass
是从 socketTextStream(它的 SparkStreams)接收到的数据。我是 Spark 的新手,想使用 Java 来获得相同的结果。我已经使用
声明了 JavaReceiverInputDStream
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
我想到了两个可能的解决方案:
使用平面地图:
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override public Iterable<String> call(String x) {
return Arrays.asList(x.split(","));
}
});
但这似乎不对,因为结果是将 CSV 分解为没有任何顺序的单词。
使用映射(编译错误),这看起来是合适的解决方案,但我无法使用以下方法提取字段 15 和 7:
JavaDStream<List<String>> words = lines.map(
new Function<String, List<String>>() {
public List<String> call(String s) {
return Arrays.asList(s.split(","));
}
});
当我尝试映射 List<String>
=> Tuple2<String, Int>
时,这个想法失败了
映射代码为:
JavaPairDStream<String, Integer> pairs = words.map(
new PairFunction<List<String>, String, Integer>() {
public Tuple2<String, Integer> call(List<String> s) throws Exception {
return new Tuple2(s.get(15), 6);
}
});
错误:
class
中的方法映射
org.apache.spark.streaming.api.java.AbstractJavaDStreamLike`<T,This,R>` cannot be applied to given types;
[ERROR] required: org.apache.spark.api.java.function.Function`<java.util.List`<java.lang.String>`,R>`
[ERROR] found: `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>`
[ERROR] reason: no instance(s) of type variable(s) R exist so that argument type `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>` conforms to formal parameter type org.apache.spark.api.java.
对此有什么建议吗?
使用此代码。它将从 String 中获取 require 字段。
JavaDStream<String> lines = { ..... };
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] words = t.split(",");
return new Tuple2<String, Integer>(words[15],Integer.parseInt(words[7]));
}
});
我是 Spark 的新手,我正在学习一个教程,其中包含多个字段的行是用 Scala 解析的,scala 的代码是这样的:
val pass = lines.map(_.split(",")).
map(pass=>(pass(15),pass(7).toInt)).
reduceByKey(_+_)
其中 pass
是从 socketTextStream(它的 SparkStreams)接收到的数据。我是 Spark 的新手,想使用 Java 来获得相同的结果。我已经使用
JavaReceiverInputDStream
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
我想到了两个可能的解决方案:
使用平面地图:
JavaDStream<String> words = lines.flatMap( new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Arrays.asList(x.split(",")); } });
但这似乎不对,因为结果是将 CSV 分解为没有任何顺序的单词。
使用映射(编译错误),这看起来是合适的解决方案,但我无法使用以下方法提取字段 15 和 7:
JavaDStream<List<String>> words = lines.map( new Function<String, List<String>>() { public List<String> call(String s) { return Arrays.asList(s.split(",")); } });
当我尝试映射 List<String>
=> Tuple2<String, Int>
映射代码为:
JavaPairDStream<String, Integer> pairs = words.map(
new PairFunction<List<String>, String, Integer>() {
public Tuple2<String, Integer> call(List<String> s) throws Exception {
return new Tuple2(s.get(15), 6);
}
});
错误:
class
中的方法映射org.apache.spark.streaming.api.java.AbstractJavaDStreamLike`<T,This,R>` cannot be applied to given types;
[ERROR] required: org.apache.spark.api.java.function.Function`<java.util.List`<java.lang.String>`,R>`
[ERROR] found: `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>`
[ERROR] reason: no instance(s) of type variable(s) R exist so that argument type `<anonymous org.apache.spark.api.java.function.PairFunction`<java.util.List`<java.lang.String>`,java.lang.String,java.lang.Integer>`>` conforms to formal parameter type org.apache.spark.api.java.
对此有什么建议吗?
使用此代码。它将从 String 中获取 require 字段。
JavaDStream<String> lines = { ..... };
JavaPairDStream<String, Integer> pairs = lines.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String t) throws Exception {
String[] words = t.split(",");
return new Tuple2<String, Integer>(words[15],Integer.parseInt(words[7]));
}
});