如何从火花流数据延迟构建缓存
How to lazily build a cache from spark streaming data
我是 运行 一个流媒体工作,想逐步构建一个查找映射(例如跟踪唯一项,过滤重复的传入),最初我想在缓存中保留一个 DataFrame
,并将其与每个批次中创建的新 DataFrame
联合起来,像这样
items.foreachRDD((rdd: RDD[String]) => {
...
val uf = rdd.toDF
cached_df = cached_df.unionAll(uf)
cached_df.cache
cached_df.count // materialize the
...
})
我担心的是,cached_df 似乎记得从每个批处理迭代附加到先前 RDD
的所有谱系,在我的例子中,如果我不关心重新计算这个缓存 RDD
如果它崩溃了,那是维护不断增长的 DAG 的开销吗?
作为替代方案,在每批开始时,我从 parquet 文件加载查找,而不是将其保存在内存中,然后在每批结束时,我将新的 RDD
附加到相同的镶木地板文件:
noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
这按预期工作,但是否有直接的方法将查找保存在内存中?
谢谢
万春
附加到 Parquet 绝对是正确的方法。但是,您可以优化查找。如果您可以接受内存缓存稍微延迟(即没有最新的秒数据),那么您可以定期(例如,每 5 分钟)加载当前 "lookup" parquet table 在内存中(假设它适合)。所有查找查询都将查找最新的 5 分钟快照。
您还可以通过管道将加载到内存和在不同线程中提供查询服务。
我是 运行 一个流媒体工作,想逐步构建一个查找映射(例如跟踪唯一项,过滤重复的传入),最初我想在缓存中保留一个 DataFrame
,并将其与每个批次中创建的新 DataFrame
联合起来,像这样
items.foreachRDD((rdd: RDD[String]) => {
...
val uf = rdd.toDF
cached_df = cached_df.unionAll(uf)
cached_df.cache
cached_df.count // materialize the
...
})
我担心的是,cached_df 似乎记得从每个批处理迭代附加到先前 RDD
的所有谱系,在我的例子中,如果我不关心重新计算这个缓存 RDD
如果它崩溃了,那是维护不断增长的 DAG 的开销吗?
作为替代方案,在每批开始时,我从 parquet 文件加载查找,而不是将其保存在内存中,然后在每批结束时,我将新的 RDD
附加到相同的镶木地板文件:
noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")
这按预期工作,但是否有直接的方法将查找保存在内存中?
谢谢 万春
附加到 Parquet 绝对是正确的方法。但是,您可以优化查找。如果您可以接受内存缓存稍微延迟(即没有最新的秒数据),那么您可以定期(例如,每 5 分钟)加载当前 "lookup" parquet table 在内存中(假设它适合)。所有查找查询都将查找最新的 5 分钟快照。
您还可以通过管道将加载到内存和在不同线程中提供查询服务。