Spark ML streaming: how to save the results of predictOnValues?
I have the following code:
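import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;

StreamingLinearRegressionWithSGD regressionWithSGD =
new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(featuresNumber));
JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache();
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse);
regressionWithSGD.trainOn(trainingData);
// predictOnValues keeps the key (here the label) next to each prediction
regressionWithSGD.predictOnValues(testData.mapToPair(lp -> new Tuple2<>(lp.label(), lp.features()))).print();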
I'd like to send the results to a file, a database, a queue, etc. instead of just calling print().
Is that possible?
I figured it out:
StreamingLinearRegressionWithSGD regressionWithSGD =
new StreamingLinearRegressionWithSGD()
.setInitialWeights(Vectors.zeros(featuresNumber));
JavaDStream<LabeledPoint> trainingData = streamingContext.textFileStream(model.getTrainPath()).map(LabeledPoint::parse).cache();
JavaDStream<LabeledPoint> testData = streamingContext.textFileStream(model.getPredictPath()).map(LabeledPoint::parse);
regressionWithSGD.trainOn(trainingData);
JavaDStream<Double> doubleJavaDStream = regressionWithSGD.predictOn(testData.map(labeledPoint -> labeledPoint.features()));
// JavaDStream has no saveAsTextFiles of its own, so call it on the underlying Scala DStream
doubleJavaDStream.dstream().saveAsTextFiles("result", "out");
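As a result we get one result-{timestamp}.out folder per batch. Note that predictOn returns only the predictions. To keep each label next to its prediction, save the JavaPairDStream returned by predictOnValues the same way with .dstream().saveAsTextFiles("result", "out"). That writes each (label, prediction) pair as a line of text.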
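The folder names follow the pattern documented for DStream.saveAsTextFiles, "prefix-TIME_IN_MS[.suffix]", where TIME_IN_MS is the batch time in milliseconds. Below is a small stdlib-only sketch that reproduces that naming rule, so you can predict which folder a given batch lands in. `BatchOutputName` and `batchOutputName` are hypothetical names for illustration; this is not Spark's own code.

```java
public class BatchOutputName {

    /**
     * Hypothetical helper mirroring the documented saveAsTextFiles naming pattern
     * "prefix-TIME_IN_MS[.suffix]"; the suffix part is omitted when it is null or empty.
     */
    static String batchOutputName(String prefix, String suffix, long batchTimeMs) {
        String name = prefix + "-" + batchTimeMs;
        if (suffix != null && !suffix.isEmpty()) {
            name = name + "." + suffix;
        }
        return name;
    }

    public static void main(String[] args) {
        // Same arguments as saveAsTextFiles("result", "out") above, for one example batch time
        System.out.println(batchOutputName("result", "out", 1500000000000L)); // prints result-1500000000000.out
        // Without a suffix the trailing ".suffix" part is dropped
        System.out.println(batchOutputName("result", "", 1500000000000L));    // prints result-1500000000000
    }
}
```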
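saveAsTextFiles only covers the file case. For a database or message queue, the Spark Streaming programming guide recommends a different pattern: foreachRDD combined with rdd.foreachPartition, creating one connection per partition rather than one per record. Connections are usually not serializable and are expensive to open. In a real job the partition iterator would come from something like predictions.foreachRDD(rdd -> rdd.foreachPartition(it -> ...)) on the JavaPairDStream returned by predictOnValues.

The stdlib-only sketch below shows just the per-partition part, under these assumptions:
* `PredictionSink`, `InMemorySink` and `savePartition` are hypothetical names.
* The in-memory sink stands in for a JDBC connection or queue producer.
* Pairs are modeled as double[] {label, prediction} instead of scala.Tuple2, so the sketch compiles without Spark on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PartitionSinkSketch {

    /** Hypothetical sink abstraction standing in for a DB connection, queue producer, etc. */
    interface PredictionSink extends AutoCloseable {
        void write(double label, double prediction);

        @Override
        void close(); // narrowed: no checked exception, so try-with-resources needs no catch
    }

    /** In-memory sink used only to make the sketch runnable; counts how often a sink is opened. */
    static final class InMemorySink implements PredictionSink {
        static int opened = 0;
        static final List<String> rows = new ArrayList<>();

        InMemorySink() {
            opened++;
        }

        public void write(double label, double prediction) {
            rows.add(label + "," + prediction);
        }

        public void close() {
            // a real sink would flush and release its connection here
        }
    }

    /**
     * Writes all (label, prediction) pairs of one partition through a single sink instance.
     * In Spark, this body is what would run inside rdd.foreachPartition(...).
     */
    static void savePartition(Iterator<double[]> records, Supplier<PredictionSink> sinkFactory) {
        try (PredictionSink sink = sinkFactory.get()) { // one sink per partition, not per record
            while (records.hasNext()) {
                double[] r = records.next();
                sink.write(r[0], r[1]);
            }
        }
    }

    public static void main(String[] args) {
        List<double[]> partition = List.of(new double[]{3.0, 2.5}, new double[]{1.0, 1.25});
        savePartition(partition.iterator(), InMemorySink::new);
        System.out.println(InMemorySink.opened + " " + InMemorySink.rows); // prints: 1 [3.0,2.5, 1.0,1.25]
    }
}
```

The factory is passed in rather than a ready-made sink. This mirrors the guide's advice to create the connection on the executor, inside the partition function, instead of on the driver.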