Moving Average in Spark Java
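I have real-time streaming data coming into Spark, and I would like to do a moving-average forecast on that time-series data. Is there any way to implement this using Spark in Java?
I have already looked at https://gist.github.com/samklr/27411098f04fc46dcd05/revisions and at the question "Apache Spark Moving Average", but both of those are written in Scala. Since I am not familiar with Scala, I cannot judge whether they would be useful to me, let alone convert the code to Java.
Is there any direct implementation of forecasting in Spark Java?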
I took the question you referred to and struggled for a couple of hours to translate the Scala code into Java:
import java.util.Date;
import java.util.List;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.rdd.RDDFunctions;
import org.apache.spark.rdd.RDD;
import scala.Tuple2;
import scala.reflect.ClassTag;

// Read a file containing the stock quotations (sc is an existing JavaSparkContext)
// You can also parallelize a collection of objects to create an RDD
JavaRDD<String> linesRDD = sc.textFile("some sample file containing stock prices");
// Convert the lines into our business objects
JavaRDD<StockQuotation> quotationsRDD = linesRDD.flatMap(new ConvertLineToStockQuotation());
// We need these two objects in order to use the MLlib RDDFunctions object
ClassTag<StockQuotation> classTag = scala.reflect.ClassManifestFactory.fromClass(StockQuotation.class);
RDD<StockQuotation> rdd = JavaRDD.toRDD(quotationsRDD);
// Instantiate an RDDFunctions object to work with
RDDFunctions<StockQuotation> rddFs = RDDFunctions.fromRDD(rdd, classTag);
// Window size: the number of consecutive quotations averaged together (3 is only an example value)
int slidingWindow = 3;
// This applies the sliding function and returns the (date, SMA) tuples
JavaPairRDD<Date, Double> smaPerDate = rddFs.sliding(slidingWindow).toJavaRDD().mapToPair(new MovingAvgByDateFunction());
List<Tuple2<Date, Double>> smaPerDateList = smaPerDate.collect();
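The snippet above uses StockQuotation and ConvertLineToStockQuotation without showing them. As a rough sketch only, here is one way the business object and the line parsing could look in plain Java. Everything here is an assumption made for illustration: the "yyyy-MM-dd,price" line format, the field layout, and the StockLineParser helper name are hypothetical; only getTimestamp() and getValue() are implied by the answer. A Spark FlatMapFunction such as ConvertLineToStockQuotation could delegate to a helper like this, returning an empty result for lines that do not parse.

```java
import java.io.Serializable;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Collections;
import java.util.Date;
import java.util.List;

// Hypothetical business object; Serializable so Spark can ship it between nodes.
class StockQuotation implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Date timestamp;
    private final double value;

    StockQuotation(Date timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }

    public Date getTimestamp() { return timestamp; }
    public double getValue() { return value; }
}

// Hypothetical parser for lines of the assumed form "yyyy-MM-dd,price".
class StockLineParser {
    // Returns a one-element list for a valid line and an empty list otherwise,
    // which matches the zero-or-more contract of a flatMap step.
    static List<StockQuotation> parse(String line) {
        String[] fields = line.split(",");
        if (fields.length != 2) {
            return Collections.emptyList();
        }
        try {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            format.setLenient(false); // reject dates such as 2016-13-45
            Date date = format.parse(fields[0].trim());
            double price = Double.parseDouble(fields[1].trim());
            return Collections.singletonList(new StockQuotation(date, price));
        } catch (ParseException | NumberFormatException e) {
            return Collections.emptyList();
        }
    }
}
```

Returning a list rather than a single object is what makes flatMap a natural fit here: malformed lines simply drop out of the resulting RDD.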
Then you have to use a new Function class to do the actual calculation for each data window:
import java.util.Arrays;
import java.util.Date;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class MovingAvgByDateFunction implements PairFunction<Object, Date, Double> {
    private static final long serialVersionUID = 9220435667459839141L;

    @Override
    public Tuple2<Date, Double> call(Object t) throws Exception {
        // Each element produced by sliding() is one window: an array of StockQuotation
        StockQuotation[] stocks = (StockQuotation[]) t;
        // Sum the values in the window, then divide by the window length
        double sum = Arrays.stream(stocks).mapToDouble(StockQuotation::getValue).sum();
        double average = sum / stocks.length;
        // Key the average by the timestamp of the first quotation in the window
        return new Tuple2<>(stocks[0].getTimestamp(), average);
    }
}
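To sanity-check what the Spark job returns, the same calculation can be sketched in plain Java on a small array. This is only an illustration, not part of the Spark code: the SimpleMovingAverage class and its compute method are hypothetical names, and the sketch assumes that only full windows produce an average, which is my understanding of how sliding behaves.

```java
import java.util.ArrayList;
import java.util.List;

class SimpleMovingAverage {
    // Returns one average per full window of `window` consecutive values.
    // A running sum is updated as the window advances, so each step is O(1).
    static List<Double> compute(double[] values, int window) {
        if (window <= 0) {
            throw new IllegalArgumentException("window must be positive");
        }
        List<Double> averages = new ArrayList<>();
        double sum = 0.0;
        for (int i = 0; i < values.length; i++) {
            sum += values[i];
            if (i >= window) {
                sum -= values[i - window]; // drop the value that left the window
            }
            if (i >= window - 1) {
                averages.add(sum / window); // the window is full from here on
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        double[] prices = {10.0, 11.0, 12.0, 13.0, 14.0};
        System.out.println(compute(prices, 3)); // prints [11.0, 12.0, 13.0]
    }
}
```

With five prices and a window of 3 there are three full windows, so three averages come back. For a naive moving-average forecast, the last average in the list can be taken as the prediction for the next value.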
If you want more details, I wrote about Simple Moving Averages here:
https://t.co/gmWltdANd3