按行迭代 Java RDD
Iterate through a Java RDD by row
我想遍历字符串的 RDD 并 "do something" 遍历每个字符串。输出应该是 double[][]
。这是一个带有 for 循环的示例。我知道我需要为 Java RDD 使用(我认为)foreach
函数。但是,我不知道如何理解语法。文档不是特别有用。我没有 Java 8.
这是一个示例,说明如果我可以使用常规 for
循环,我想做什么。
public class PCA {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);
// here is the "type" of code I would like to execute
// 30 because I have 30 variables
double[][] vals = new double[data.count()][30];
double[] temp;
for (int i = 0; i < data.count(); i++) {
temp = splitStringtoDoubles(data[i]);
vals[i] = temp;
}
}
private static double[] splitStringtoDoubles(String s) {
String[] splitVals = s.split("\t");
Double[] vals = new Double[splitVals.length];
for (int i = 0; i < splitVals.length; i++) {
vals[i] = Double.parseDouble(splitVals[i]);
}
}
}
我知道 foreach
似乎需要一个类型为 void return 的函数。不确定如何使用它。到目前为止,这是我尝试过的(显然语法错误):
double[][] matrix = new double[data.count()][30];
foreach(String s : data) {
String[] splitvals = s.split("\t");
double[] vals = Double.parseDouble(splitvals);
matrix[s] = vals;
}
正如 mattinbits 在评论中所说,你想要 map
而不是 foreach
,因为你想要 return 值。 map
所做的基本上是转换您的数据:对于 RDD 的每一行,您执行一个操作,并且 return 为每一行执行一个值。你需要的可以这样实现:
import org.apache.spark.api.java.function.Function;
...
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
@Override
public double[] call(String row) throws Exception {
return splitStringtoDoubles(row);
}
private double[] splitStringtoDoubles(String s) {
String[] splitVals = s.split("\t");
Double[] vals = new Double[splitVals.length];
for(int i=0; i < splitVals.length; i++) {
vals[i] = Double.parseDouble(splitVals[i]);
}
return vals;
}
});
List<double[]> whatYouWant = whatYouWantRdd.collect();
为了了解 Spark 的工作原理,您对 RDD 执行操作或转换。例如,这里我们使用 map
函数转换我们的 RDD。您需要自己创建此函数,这次使用匿名 org.apache.spark.api.java.function.Function
强制您重写方法 call
,在该方法中您会收到一行 RDD 和 return 一个值。
只是因为比较 Spark 的 Java 和 Scala API 的冗长程度很有趣,这里有一个 Scala 版本:
import org.apache.spark.{SparkContext, SparkConf}
class example extends App {
val conf = new SparkConf().setMaster("local").setAppName("Spark example")
val sc = new SparkContext(conf)
val inputData = List(
"1.2\t2.7\t3.8",
"4.3\t5.1\t6.3"
)
val inputRDD = sc.parallelize(inputData)
val arrayOfDoubleRDD = inputRDD.map(_.split("\t").map(_.toDouble))
}
我想遍历字符串的 RDD 并 "do something" 遍历每个字符串。输出应该是 double[][]
。这是一个带有 for 循环的示例。我知道我需要为 Java RDD 使用(我认为)foreach
函数。但是,我不知道如何理解语法。文档不是特别有用。我没有 Java 8.
这是一个示例,说明如果我可以使用常规 for
循环,我想做什么。
public class PCA {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
RDD<String> data = sc.textFile("my/directory/my/dataset.txt", 0);
// here is the "type" of code I would like to execute
// 30 because I have 30 variables
double[][] vals = new double[data.count()][30];
double[] temp;
for (int i = 0; i < data.count(); i++) {
temp = splitStringtoDoubles(data[i]);
vals[i] = temp;
}
}
private static double[] splitStringtoDoubles(String s) {
String[] splitVals = s.split("\t");
Double[] vals = new Double[splitVals.length];
for (int i = 0; i < splitVals.length; i++) {
vals[i] = Double.parseDouble(splitVals[i]);
}
}
}
我知道 foreach
似乎需要一个类型为 void return 的函数。不确定如何使用它。到目前为止,这是我尝试过的(显然语法错误):
double[][] matrix = new double[data.count()][30];
foreach(String s : data) {
String[] splitvals = s.split("\t");
double[] vals = Double.parseDouble(splitvals);
matrix[s] = vals;
}
正如 mattinbits 在评论中所说,你想要 map
而不是 foreach
,因为你想要 return 值。 map
所做的基本上是转换您的数据:对于 RDD 的每一行,您执行一个操作,并且 return 为每一行执行一个值。你需要的可以这样实现:
import org.apache.spark.api.java.function.Function;
...
SparkConf conf = new SparkConf().setAppName("PCA Example");
SparkContext sc = new SparkContext(conf);
JavaRDD<String> data = sc.textFile("clean-sl-mix-with-labels.txt",0).toJavaRDD();
JavaRDD<double[]> whatYouWantRdd = data.map(new Function<String, double[]>() {
@Override
public double[] call(String row) throws Exception {
return splitStringtoDoubles(row);
}
private double[] splitStringtoDoubles(String s) {
String[] splitVals = s.split("\t");
Double[] vals = new Double[splitVals.length];
for(int i=0; i < splitVals.length; i++) {
vals[i] = Double.parseDouble(splitVals[i]);
}
return vals;
}
});
List<double[]> whatYouWant = whatYouWantRdd.collect();
为了了解 Spark 的工作原理,您对 RDD 执行操作或转换。例如,这里我们使用 map
函数转换我们的 RDD。您需要自己创建此函数,这次使用匿名 org.apache.spark.api.java.function.Function
强制您重写方法 call
,在该方法中您会收到一行 RDD 和 return 一个值。
只是因为比较 Spark 的 Java 和 Scala API 的冗长程度很有趣,这里有一个 Scala 版本:
import org.apache.spark.{SparkContext, SparkConf}
class example extends App {
val conf = new SparkConf().setMaster("local").setAppName("Spark example")
val sc = new SparkContext(conf)
val inputData = List(
"1.2\t2.7\t3.8",
"4.3\t5.1\t6.3"
)
val inputRDD = sc.parallelize(inputData)
val arrayOfDoubleRDD = inputRDD.map(_.split("\t").map(_.toDouble))
}