SparkR 中的检查点数据帧

checkpointing DataFrames in SparkR

我正在使用 R/spark 遍历多个 csv 数据文件。每个文件的大约 1% 必须保留(根据某些条件过滤)并与下一个数据文件合并(我使用了 union/rbind)。然而,随着循环的运行,数据的谱系变得越来越长,因为 spark 记住了所有以前的数据集和 filter()-s。

有没有办法在 spark R API 中设置检查点?我了解到 spark 2.1 具有 DataFrame 的检查点,但这似乎无法从 R 获得。

我们在相当大的图(数十亿数据)和搜索连通分量时遇到了与 Scala/GraphX 相同的问题。

我不确定 R 中有哪些适用于您的特定版本,但通常的解决方法是通过 "saving" 数据打破谱系,然后重新加载它。在我们的例子中,我们每 15 次迭代就打破血统:

def refreshGraph[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED], checkpointDir: String, iterationCount: Int, numPartitions: Int): Graph[VD, ED] = {
    val path = checkpointDir + "/iter-" + iterationCount
    saveGraph(g, path)
    g.unpersist()
    loadGraph(path, numPartitions)
}

一个不完整的solution/workaround是将你的dataframecollect()变成一个R对象,然后通过createDataFrame()重新并行化。这适用于小数据,但对于较大的数据集,它变得太慢并且抱怨任务太大。