Spark 文件流问题

Spark FileStreaming issue

我正在尝试使用 Sparkstreaming(spark-streaming_2.10,version:1.5.1)

的简单文件流示例
public class DStreamExample {

    public static void main(final String[] args) {

        final SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("SparkJob");
        sparkConf.setMaster("local[4]"); // for local

        final JavaSparkContext sc = new JavaSparkContext(sparkConf);

        final JavaStreamingContext ssc = new JavaStreamingContext(sc,
            new Duration(2000));

        final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");
        lines.print();

        ssc.start();
        ssc.awaitTermination();
    }
}

当我 运行 在单个文件或目录上使用此代码时,它不会从文件中打印任何内容,我在日志中看到它不断轮询,但没有打印任何内容。当此程序 运行ning.

时,我尝试将文件移动到目录

有什么我想念的吗?我尝试在行 RDD 上应用 map 函数也不起作用。

API textFileStream 不应该读取现有目录内容,相反,它的目的是监视给定的 Hadoop 兼容文件系统路径的更改,文件必须被 "moving" 它们从同一文件系统中的另一个位置写入监控位置。 简而言之,您正在订阅 目录更改 并将收到受监视位置内新出现的文件的内容 - 处于监视快照时文件出现的状态(在您的情况下持续时间为 2000 毫秒),任何进一步的文件更新都不会到达流,只有目录更新(新文件)会。

模拟更新的方法是在监控会话期间创建新文件:

import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import java.io.File;
import java.io.IOException;
import java.util.List;

public class DStreamExample {

public static void main(final String[] args) throws IOException {

    final SparkConf sparkConf = new SparkConf();
    sparkConf.setAppName("SparkJob");
    sparkConf.setMaster("local[4]"); // for local

    final JavaSparkContext sc = new JavaSparkContext(sparkConf);

    final JavaStreamingContext ssc = new JavaStreamingContext(sc,
            new Duration(2000));

    final JavaDStream<String> lines = ssc.textFileStream("/opt/test/");

    // spawn the thread which will create new file within the monitored directory soon
    Runnable r = () -> {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        try {
            FileUtils.write(new File("/opt/test/newfile1"), "whatever");
        } catch (IOException e) {
            e.printStackTrace();
        }
    };

    new Thread(r).start();


    lines.foreachRDD((Function<JavaRDD<String>, Void>) rdd -> {
        List<String> lines1 = rdd.collect();
        lines1.stream().forEach(l -> System.out.println(l));
        return null;
    });

    ssc.start();
    ssc.awaitTermination();
}

}