重用 pyspark 缓存并取消坚持 for 循环

Reusing pyspark cache and unpersist in for loop

我有很多数据要分块取出 - 假设是 3 个块 - 而不是一次将它们全部缓存在内存中。但是,我想在之后同时保存它(动作)。

这是当前的简化策略:

for query in [query1,query2,query3]:

    df = spark.sql(query)

    df.cache()

    df1 = df.filter('a')
    df2 = df.filter('b')

    final_output_1 = final_output_1.join(df1)
    final_output_2 = final_output_2.join(df2)

    df.unpersist()


final_output_1.write.saveAsTable()
final_output_2.write.saveAsTable()

因此 第一个问题: unpersist() 在这里不起作用,因为 df 上还没有任何操作?

第二个问题:当我在 for 循环中重用 df 变量时,df.cache() 在这里如何工作?我知道它是不可变的,所以它会制作一个副本,但 unpersist() 实际上会清除该内存吗?

当您想一次又一次地重新使用数据帧时,在 Spark 中使用缓存,

例如:映射表

一旦你缓存了 teh df,你就需要一个动作操作来物理地将数据移动到内存中,因为 spark 是基于惰性执行的。

你的情况

df.cache()

将无法按预期工作,因为在此之后您没有执行任何操作。

要使缓存正常工作,您需要 运行 df.count()df.show() 或任何其他将数据移动到内存的操作,否则您的数据将不会移动到内存,您将不会获得任何优势。所以 df.unpersist() 也是多余的。

第一个问题:

不,你的 df.cache()df.unpersist() 将无法工作,因为没有数据被缓存首先,他们没有什么可以不坚持的。

第二个问题:

是的,您可以使用相同的变量名,如果执行了某个操作,数据将被缓存,并且在您的操作之后 df.unpersist() 将取消保存每个循环中的数据。 所以前一个DF与下一个循环中的下一个DF没有联系。 正如您所说,它们是 immutable ,并且由于您在每个循环中将新查询分配给同一变量,因此它充当新的 DF(与以前的 DF 无关)。

根据您的代码,我认为您不需要进行缓存,因为您只执行一项操作。

参考 and If I cache a Spark Dataframe and then overwrite the reference, will the original data frame still be cached?