Spark:遍历 JavaRDD 元组 - Java
Spark: Loop through JavaRDD Tuple - Java
我正在尝试使用元组循环遍历 JavaRDD,但是我遇到了一些关于如何正确循环遍历 JavaRDD 并将我的 rdd_value 设置为 rdd_array 中包含的元组的问题无论 b 迭代器计数器值是多少。
这是我的代码
//JavaRDD that contains Tuples
JavaRDD<Tuple5<Long, String, Float, Float, String>> rdd_array
//For loop to loop through rdd_array
for (int b=0;b<rdd_array.count();b++){
//Need help on this line, how do I set rdd_row equal to the tuple5 in rdd_array
Tuple5<Long, String, Float, Float, String> Value = rdd_array.;
String id=Value._1().toString();
String text=Value._2().toString();
String negative_tweets=Value._3().toString();
String positive_tweets=Value._4().toString();
String score_tweets=Value._5().toString();
System.out.println(id+text+negative_tweets+positive_tweets+score_tweets)
}
编辑:
伙计们,我真的在这里尝试,接受了 zero323 对 JavaRDD 上的 foreachloop 的建议,收到错误无法解析方法。
rdd_array.foreach(new Function<Tuple5<Long, String, Float, Float, String>, Void>(){
@Override
public Void call(Tuple5<Long, String, Float, Float, String> rdd){
String id=rdd._1().toString();
String text=rdd._2().toString();
String negative_tweets=rdd._3().toString();
String positive_tweets=rdd._4().toString();
String score_tweets=rdd._5().toString();
System.out.println(id+text+negative_tweets+positive_tweets+score_tweets);
return null;
}
});
使用以下函数解决
rdd_array.foreach(new VoidFunction<Tuple2<Long, Tuple7<String, String, String, String, String, String, String>>>() {
@Override
public void call(Tuple2<Long, Tuple7<String, String, String, String, String, String, String>> rdd_val) throws Exception {
//new Tuple7<String, String, String, String, String, String, String>(text,created_at,userlocation,name,username,lat,lon);
String id = rdd_val._1().toString();
String text = rdd_val._2()._1().toString();
String createdat = rdd_val._2()._2().toString();
String userlocation = rdd_val._2()._3().toString();
String name = rdd_val._2()._4().toString();
String username = rdd_val._2()._5().toString();
String lat = rdd_val._2()._6().toString();
String lon = rdd_val._2()._7().toString();
System.out.println("Printing Values EXTRA: "+id+text+createdat+userlocation+name+username+lat+lon);
}
});
请注意,在您自己尝试回答时,您实际上正在处理看起来像 <K,V>
的内容,其中 K
(键)是 Long,V
(值)一个 Tuple7。这与您在原始问题中提出的内容非常不同。使用 JavaPairRDD.
最有可能更有效地实现所有这些
从 RDD 开始,您可以使用
将 JavaRDD 转换为 JavaPairRDD
JavaPairRDD<Long,Tuple7<...>> prdd = rdd.mapToPair(...)
这将包括基于您的密钥的重新分区。
同时使用 .foreach
进行最终处理会将您的 rdd 结果序列化到您的驱动程序并执行包含的逻辑序列。您可能需要考虑将大部分逻辑推到 RDD 的上游,使用过滤器、缩减和其他范例。您还可以考虑使用 .foreachPartition
来实现某种程度的并行并在任务节点而不是驱动程序上进行计算。
请注意,使用 Java 8 lambda 语法,您可以将大部分逻辑编写得更紧凑:
prdd.foreach((k,v)->{
System.out.println("Printing: " + k + ", " + v._1() ...);
});
现在,要注意另一件事......使用更专用的 class 而不是通用的 Tuple7<>
会不会更容易。至少它看起来像这样:
public class UserLocation {
public long id;
public String text;
public String createdat;
public String userlocation;
public String name;
public String username;
public String lat;
public String lon;
@Override
public String toString() {
return Long.toString(id)+text+createdat+userlocation+name+username+lat+lon;
}
}
然后,您的处理过程可能如下所示:
JavaRDD<UserLocation> jrdd;
JavaPairRDD<Long,UserLocation> jprdd = jrdd.mapToPair((v)->new Tupple2<>(v.id,v));
...
jprdd
.foreach((k,v)->{
System.out.println(v.toString());
});
普通,在这个例子中,将您的数据映射到 JavaPairRDD<Long,UserLocation>
没有实际用途。但是,您的数据将根据长键重新分区,并可能具有更好的并行性。
我正在尝试使用元组循环遍历 JavaRDD,但是我遇到了一些关于如何正确循环遍历 JavaRDD 并将我的 rdd_value 设置为 rdd_array 中包含的元组的问题无论 b 迭代器计数器值是多少。
这是我的代码
//JavaRDD that contains Tuples
JavaRDD<Tuple5<Long, String, Float, Float, String>> rdd_array
//For loop to loop through rdd_array
for (int b=0;b<rdd_array.count();b++){
//Need help on this line, how do I set rdd_row equal to the tuple5 in rdd_array
Tuple5<Long, String, Float, Float, String> Value = rdd_array.;
String id=Value._1().toString();
String text=Value._2().toString();
String negative_tweets=Value._3().toString();
String positive_tweets=Value._4().toString();
String score_tweets=Value._5().toString();
System.out.println(id+text+negative_tweets+positive_tweets+score_tweets)
}
编辑: 伙计们,我真的在这里尝试,接受了 zero323 对 JavaRDD 上的 foreachloop 的建议,收到错误无法解析方法。
rdd_array.foreach(new Function<Tuple5<Long, String, Float, Float, String>, Void>(){
@Override
public Void call(Tuple5<Long, String, Float, Float, String> rdd){
String id=rdd._1().toString();
String text=rdd._2().toString();
String negative_tweets=rdd._3().toString();
String positive_tweets=rdd._4().toString();
String score_tweets=rdd._5().toString();
System.out.println(id+text+negative_tweets+positive_tweets+score_tweets);
return null;
}
});
使用以下函数解决
rdd_array.foreach(new VoidFunction<Tuple2<Long, Tuple7<String, String, String, String, String, String, String>>>() {
@Override
public void call(Tuple2<Long, Tuple7<String, String, String, String, String, String, String>> rdd_val) throws Exception {
//new Tuple7<String, String, String, String, String, String, String>(text,created_at,userlocation,name,username,lat,lon);
String id = rdd_val._1().toString();
String text = rdd_val._2()._1().toString();
String createdat = rdd_val._2()._2().toString();
String userlocation = rdd_val._2()._3().toString();
String name = rdd_val._2()._4().toString();
String username = rdd_val._2()._5().toString();
String lat = rdd_val._2()._6().toString();
String lon = rdd_val._2()._7().toString();
System.out.println("Printing Values EXTRA: "+id+text+createdat+userlocation+name+username+lat+lon);
}
});
请注意,在您自己尝试回答时,您实际上正在处理看起来像 <K,V>
的内容,其中 K
(键)是 Long,V
(值)一个 Tuple7。这与您在原始问题中提出的内容非常不同。使用 JavaPairRDD.
从 RDD 开始,您可以使用
将 JavaRDD 转换为 JavaPairRDDJavaPairRDD<Long,Tuple7<...>> prdd = rdd.mapToPair(...)
这将包括基于您的密钥的重新分区。
同时使用 .foreach
进行最终处理会将您的 rdd 结果序列化到您的驱动程序并执行包含的逻辑序列。您可能需要考虑将大部分逻辑推到 RDD 的上游,使用过滤器、缩减和其他范例。您还可以考虑使用 .foreachPartition
来实现某种程度的并行并在任务节点而不是驱动程序上进行计算。
请注意,使用 Java 8 lambda 语法,您可以将大部分逻辑编写得更紧凑:
prdd.foreach((k,v)->{
System.out.println("Printing: " + k + ", " + v._1() ...);
});
现在,要注意另一件事......使用更专用的 class 而不是通用的 Tuple7<>
会不会更容易。至少它看起来像这样:
public class UserLocation {
public long id;
public String text;
public String createdat;
public String userlocation;
public String name;
public String username;
public String lat;
public String lon;
@Override
public String toString() {
return Long.toString(id)+text+createdat+userlocation+name+username+lat+lon;
}
}
然后,您的处理过程可能如下所示:
JavaRDD<UserLocation> jrdd;
JavaPairRDD<Long,UserLocation> jprdd = jrdd.mapToPair((v)->new Tupple2<>(v.id,v));
...
jprdd
.foreach((k,v)->{
System.out.println(v.toString());
});
普通,在这个例子中,将您的数据映射到 JavaPairRDD<Long,UserLocation>
没有实际用途。但是,您的数据将根据长键重新分区,并可能具有更好的并行性。