如何从火花流数据延迟构建缓存

How to lazily build a cache from spark streaming data

我是 运行 一个流媒体工作,想逐步构建一个查找映射(例如跟踪唯一项,过滤重复的传入),最初我想在缓存中保留一个 DataFrame,并将其与每个批次中创建的新 DataFrame 联合起来,像这样

 items.foreachRDD((rdd: RDD[String]) => { 
     ...   
        val uf =  rdd.toDF 
        cached_df = cached_df.unionAll(uf) 
        cached_df.cache 
        cached_df.count   // materialize the 
     ... 
    }) 

我担心的是,cached_df 似乎记得从每个批处理迭代附加到先前 RDD 的所有谱系,在我的例子中,如果我不关心重新计算这个缓存 RDD 如果它崩溃了,那是维护不断增长的 DAG 的开销吗?

作为替代方案,在每批开始时,我从 parquet 文件加载查找,而不是将其保存在内存中,然后在每批结束时,我将新的 RDD 附加到相同的镶木地板文件:

 noDuplicatedDF.write.mode(SaveMode.Append).parquet("lookup")

这按预期工作,但是否有直接的方法将查找保存在内存中?

谢谢 万春

附加到 Parquet 绝对是正确的方法。但是,您可以优化查找。如果您可以接受内存缓存稍微延迟(即没有最新的秒数据),那么您可以定期(例如,每 5 分钟)加载当前 "lookup" parquet table 在内存中(假设它适合)。所有查找查询都将查找最新的 5 分钟快照。

您还可以通过管道将加载到内存和在不同线程中提供查询服务。