Spark 文件流问题
Spark FileStreaming issue
我正在尝试使用 Sparkstreaming(spark-streaming_2.10,version:1.5.1)
的简单文件流示例
public class DStreamExample {
public static void main(final String[] args) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
lines.print();
ssc.start();
ssc.awaitTermination();
}
}
当我 运行 在单个文件或目录上使用此代码时,它不会从文件中打印任何内容,我在日志中看到它不断轮询,但没有打印任何内容。当此程序 运行ning.
时,我尝试将文件移动到目录
有什么我想念的吗?我尝试在行 RDD 上应用 map 函数也不起作用。
API textFileStream 不应该读取现有目录内容,相反,它的目的是监视给定的 Hadoop 兼容文件系统路径的更改,文件必须被 "moving" 它们从同一文件系统中的另一个位置写入监控位置。
简而言之,您正在订阅 目录更改 并将收到受监视位置内新出现的文件的内容 - 处于监视快照时文件出现的状态(在您的情况下持续时间为 2000 毫秒),任何进一步的文件更新都不会到达流,只有目录更新(新文件)会。
模拟更新的方法是在监控会话期间创建新文件:
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DStreamExample {
public static void main(final String[] args) throws IOException {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
// spawn the thread which will create new file within the monitored directory soon
Runnable r = () -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
FileUtils.write(new File("/opt/test/newfile1"), "whatever");
} catch (IOException e) {
e.printStackTrace();
}
};
new Thread(r).start();
lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
List<String> lines1 = rdd.collect();
lines1.stream().forEach(l -> System.out.println(l));
return null;
});
ssc.start();
ssc.awaitTermination();
}
}
我正在尝试使用 Sparkstreaming(spark-streaming_2.10,version:1.5.1)
的简单文件流示例public class DStreamExample {
public static void main(final String[] args) {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
lines.print();
ssc.start();
ssc.awaitTermination();
}
}
当我 运行 在单个文件或目录上使用此代码时,它不会从文件中打印任何内容,我在日志中看到它不断轮询,但没有打印任何内容。当此程序 运行ning.
时,我尝试将文件移动到目录有什么我想念的吗?我尝试在行 RDD 上应用 map 函数也不起作用。
API textFileStream 不应该读取现有目录内容,相反,它的目的是监视给定的 Hadoop 兼容文件系统路径的更改,文件必须被 "moving" 它们从同一文件系统中的另一个位置写入监控位置。 简而言之,您正在订阅 目录更改 并将收到受监视位置内新出现的文件的内容 - 处于监视快照时文件出现的状态(在您的情况下持续时间为 2000 毫秒),任何进一步的文件更新都不会到达流,只有目录更新(新文件)会。
模拟更新的方法是在监控会话期间创建新文件:
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.io.File;
import java.io.IOException;
import java.util.List;
public class DStreamExample {
public static void main(final String[] args) throws IOException {
final SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("SparkJob");
sparkConf.setMaster("local[4]"); // for local
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
final JavaStreamingContext ssc = new JavaStreamingContext(sc,
new Duration(2000));
final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
// spawn the thread which will create new file within the monitored directory soon
Runnable r = () -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
FileUtils.write(new File("/opt/test/newfile1"), "whatever");
} catch (IOException e) {
e.printStackTrace();
}
};
new Thread(r).start();
lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
List<String> lines1 = rdd.collect();
lines1.stream().forEach(l -> System.out.println(l));
return null;
});
ssc.start();
ssc.awaitTermination();
}
}