Spark 流 + Spark SQL
Spark Streaming + Spark SQL
我正在尝试通过 Spark Streaming 和 Spark SQL 处理日志。主要思想是根据查询需要将 "old" 数据的 Parquet 格式的 "compacted" 数据集转换为 DataFrame,压缩数据集加载完成:
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
DataFrame compact = null;
compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");
由于未压缩的数据集(我每天压缩数据集)由许多文件组成,我想在 DStream 中包含当天的数据,以便快速获得这些查询。
我已经尝试过 DataFrame 方法但没有结果....
DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
df.registerTempTable("lastData");
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
......drop old data from lastData table
df.insertInto("lastData");
}
});
例如,如果我在不同的线程中查询临时 table,使用这种方法我不会得到任何结果。
我也尝试过使用 RDD 转换方法,更具体地说,我尝试按照 Spark 示例创建一个空的 RDD,然后将 DSStream RDD 内容与空的 RDD 合并:
JavaRDD<Row> lastData = sc.emptyRDD();
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
lastData.union(v1).filter(let only recent data....);
}
});
这种方法也行不通,因为我在 lastData 中没有得到任何内容
我可以为此目的使用窗口计算或 updateStateBy 键吗?
有什么建议吗?
感谢您的帮助!
好吧,我终于明白了。
我使用 updateState 函数和 return 0 如果时间戳像这样早于 24 小时。
final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
= (List<Long> values, Optional<Long> state) -> {
Long newSum = state.or(0L);
for (Long value : values) {
newSum += value;
}
//current milis uses UTC
if (System.currentTimeMillis() - newSum > 86400000L) {
return Optional.absent();
} else {
return Optional.of(newSum);
}
};
然后在每个批次上我将 DataFrame 注册为临时 table:
finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
if (!rdd.isEmpty()) {
HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
if (sqlContext1.cacheManager().isCached("alarm_recent")) {
sqlContext1.uncacheTable("alarm_recent");
}
DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
wordsDataFrame.registerTempTable("alarm_recent");
wordsDataFrame.cache();//
wordsDataFrame.first();
}
return null;
});
您可以在 Spark1.6 中使用 mapwithState。
mapwithState函数更高效,更容易实现。
看看this link.
mapwithState 支持很酷的功能,例如 State time out
和 initialRDD
,这在维护有状态的 Dstream 时会派上用场。
谢谢
玛纳斯
我正在尝试通过 Spark Streaming 和 Spark SQL 处理日志。主要思想是根据查询需要将 "old" 数据的 Parquet 格式的 "compacted" 数据集转换为 DataFrame,压缩数据集加载完成:
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(sc.sc());
DataFrame compact = null;
compact = sqlContext.parquetFile("hdfs://auto-ha/tmp/data/logs");
由于未压缩的数据集(我每天压缩数据集)由许多文件组成,我想在 DStream 中包含当天的数据,以便快速获得这些查询。
我已经尝试过 DataFrame 方法但没有结果....
DataFrame df = JavaSQLContextSingleton.getInstance(sc.sc()).createDataFrame(lastData, schema);
df.registerTempTable("lastData");
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
DataFrame df = JavaSQLContextSingleton.getInstance(v1.context()).createDataFrame(v1, schema);
......drop old data from lastData table
df.insertInto("lastData");
}
});
例如,如果我在不同的线程中查询临时 table,使用这种方法我不会得到任何结果。
我也尝试过使用 RDD 转换方法,更具体地说,我尝试按照 Spark 示例创建一个空的 RDD,然后将 DSStream RDD 内容与空的 RDD 合并:
JavaRDD<Row> lastData = sc.emptyRDD();
JavaDStream SumStream = inputStream.transform(new Function<JavaRDD<Row>, JavaRDD<Object>>() {
@Override
public JavaRDD<Object> call(JavaRDD<Row> v1) throws Exception {
lastData.union(v1).filter(let only recent data....);
}
});
这种方法也行不通,因为我在 lastData 中没有得到任何内容
我可以为此目的使用窗口计算或 updateStateBy 键吗?
有什么建议吗?
感谢您的帮助!
好吧,我终于明白了。
我使用 updateState 函数和 return 0 如果时间戳像这样早于 24 小时。
final static Function2<List<Long>, Optional<Long>, Optional<Long>> RETAIN_RECENT_DATA
= (List<Long> values, Optional<Long> state) -> {
Long newSum = state.or(0L);
for (Long value : values) {
newSum += value;
}
//current milis uses UTC
if (System.currentTimeMillis() - newSum > 86400000L) {
return Optional.absent();
} else {
return Optional.of(newSum);
}
};
然后在每个批次上我将 DataFrame 注册为临时 table:
finalsum.foreachRDD((JavaRDD<Row> rdd, Time time) -> {
if (!rdd.isEmpty()) {
HiveContext sqlContext1 = JavaSQLContextSingleton.getInstance(rdd.context());
if (sqlContext1.cacheManager().isCached("alarm_recent")) {
sqlContext1.uncacheTable("alarm_recent");
}
DataFrame wordsDataFrame = sqlContext1.createDataFrame(rdd, schema);
wordsDataFrame.registerTempTable("alarm_recent");
wordsDataFrame.cache();//
wordsDataFrame.first();
}
return null;
});
您可以在 Spark1.6 中使用 mapwithState。 mapwithState函数更高效,更容易实现。
看看this link.
mapwithState 支持很酷的功能,例如 State time out
和 initialRDD
,这在维护有状态的 Dstream 时会派上用场。
谢谢 玛纳斯