Stop Spark Streaming

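I want to stop the streaming context in Java Spark after processing 100 records from a file. The problem is that the code inside the if statement is not executed once streaming starts. The code below illustrates what I am trying to do: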

    public static void main(String[] args) throws Exception {

        int ff = testSparkStreaming();

        System.out.println("wqwqwq");
        System.out.println(ff);

    }


    public static int testSparkStreaming() throws IOException, InterruptedException {

        int numberInst = 0;
        String savePath = "Path to Model";
        final NaiveBayesModel savedModel = NaiveBayesModel.load(jssc.sparkContext().sc(), savePath);

        BufferedReader br = new BufferedReader(new FileReader("C://testStream//copy.csv"));
        Queue<JavaRDD<String>> rddQueue = new LinkedList<JavaRDD<String>>();
        List<String> list = Lists.newArrayList();
        String line = "";
        while ((line = br.readLine()) != null) {
            list.add(line);
        }
        br.close();

        rddQueue.add(jssc.sparkContext().parallelize(list));
        numberInst+= list.size();
        JavaDStream<String> dataStream = jssc.queueStream(rddQueue);
        dataStream.print();

        if (numberInst == 100){
             System.out.println("should stop");
             jssc.wait();
        }
        jssc.start();
        jssc.awaitTermination();

        return numberInst;

}

My question is how to stop streaming when numberInst == 100 and return control to the main method so that the statements following the call are executed.

P.S.: In the code above, the if statement is not executed:

        if (numberInst == 100){
             System.out.println("should stop");
             jssc.wait();
        }

Have you tried stopping it the way you would stop a thread, that is, by interrupting it?

You can try this:

    jssc.start();

    while (numberInst < 100){
        jssc.awaitTerminationOrTimeout(1000); // poll once per second; adjust the interval to your use case
    }

    jssc.stop();
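
For the loop above to terminate, the counter has to be updated while the stream is running, not only once before `jssc.start()`. The following is a minimal sketch of the same start / poll / stop pattern in plain Java, without Spark, so it can be run on its own. The class name `PollingStopSketch`, the method `runUntil`, and the worker thread are hypothetical stand-ins: the worker plays the role of the streaming job, the `CountDownLatch` plays the role of the context's termination signal, and the `AtomicInteger` is an assumption about how the count could be shared between threads.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PollingStopSketch {

    // Starts a background worker, polls a shared counter, and stops the
    // worker once at least `limit` records have been processed.
    public static int runUntil(int limit) throws InterruptedException {
        // Shared counter updated by the worker (stand-in for numberInst).
        AtomicInteger processed = new AtomicInteger(0);
        // Stand-in for the streaming context's "terminated" signal.
        CountDownLatch terminated = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    processed.incrementAndGet(); // "process" one record
                    Thread.sleep(1);
                }
            } catch (InterruptedException e) {
                // Interrupted while sleeping: treat as a stop request.
            } finally {
                terminated.countDown();
            }
        });
        worker.start(); // analogous to jssc.start()

        // Analogous to: while (numberInst < 100) jssc.awaitTerminationOrTimeout(...)
        while (processed.get() < limit) {
            if (terminated.await(10, TimeUnit.MILLISECONDS)) {
                break; // the worker ended on its own
            }
        }

        worker.interrupt(); // analogous to jssc.stop()
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        int seen = runUntil(100);
        System.out.println("stopped after " + seen + " records");
    }
}
```

Because the counter is only checked at each poll, the final count can overshoot the limit. The same would hold for the Spark version, where the stop takes effect only after the current poll interval has elapsed.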