Spark 流 + Spark SQL

Spark Streaming + Spark SQL

我正在尝试通过 Spark Streaming 和 Spark SQL 处理日志。主要思想是根据查询需要将 "old" 数据的 Parquet 格式的 "compacted" 数据集转换为 DataFrame,压缩数据集加载完成:

    SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
    DataFrame compact = null;
    compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");

由于未压缩的数据集(我每天压缩数据集)由许多文件组成,我想在 DStream 中包含当天的数据,以便快速获得这些查询。

我已经尝试过 DataFrame 方法但没有结果....

    DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
    df.registerTempTable("lastData");
    JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
            ......drop old data from lastData table                                
            df.insertInto("lastData");

        }
    });

例如,如果我在不同的线程中查询临时 table,使用这种方法我不会得到任何结果。

我也尝试过使用 RDD 转换方法,更具体地说,我尝试按照 Spark 示例创建一个空的 RDD,然后将 DSStream RDD 内容与空的 RDD 合并:

  JavaRDD<Row> lastData = sc.emptyRDD();
  JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
        @Override
        public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
            lastData.union(v1).filter(let only recent data....);
        }
    });

这种方法也行不通,因为我在 lastData 中没有得到任何内容

我可以为此目的使用窗口计算或 updateStateBy 键吗?

有什么建议吗?

感谢您的帮助!

好吧,我终于明白了。

我使用 updateState 函数和 return 0 如果时间戳像这样早于 24 小时。

      final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
        = (List<Long> values, Optional<Long> state) -> {
            Long newSum = state.or(0L);
            for (Long value : values) {
                newSum += value;
            }
            //current milis uses UTC
            if (System.currentTimeMillis() - newSum > 86400000L) {
                return Optional.absent();
            } else {
                return Optional.of(newSum);
            }
        };

然后在每个批次上我将 DataFrame 注册为临时 table:

finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
        if (!rdd.isEmpty()) {
            HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
            if (sqlContext1.cacheManager().isCached("alarm_recent")) {
                sqlContext1.uncacheTable("alarm_recent");
            }
            DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
            wordsDataFrame.registerTempTable("alarm_recent");

            wordsDataFrame.cache();//    
            wordsDataFrame.first();
        }
        return null;
    });

您可以在 Spark1.6 中使用 mapwithState。 mapwithState函数更高效,更容易实现。

看看this link.

mapwithState 支持很酷的功能,例如 State time outinitialRDD,这在维护有状态的 Dstream 时会派上用场。

谢谢 玛纳斯