Pyspark - 不持久的父数据框也从缓存中删除子数据框
Pyspark - unpersisting parent dataframe removes also the child dataframe from the cache
我正在做这样的事情:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
child_df = parent_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
本质上,我想缓存 parent_df
因为在接下来的步骤中,我要对其进行一些重大的转换。一旦我完成这些并返回 child_df
,我不再需要 parent_df
,因此想从缓存中释放它。但是,这样做也不会持久化新缓存的 child_df
!
显然,问题是:
- 为什么会这样?
- 我怎样才能完成我想要的(从缓存中释放
parent_df
,同时将新的 child_df
保留在缓存中)?
有趣的是,相反的情况是有效的——即如果我在最后一行不坚持 child_df
而不是 parent_df
,parent_df
将按预期保持缓存,而 child_df
将被释放。
PS:我在这里发现了一个类似的问题。但是,在这种情况下,该答案似乎不起作用,因为我们已经在缓存后立即调用了一个动作 (.count()
)。
好的,我想我找到了解决方案:
首先,我猜测为什么会发生这种情况是因为 parent_df
缓存点是 child_df
谱系的一部分。 IE。即使 child_df
使用了较晚的缓存点,其 DAG 仍包含来自 parent_df
的较早位。因此,删除该缓存点会以某种方式影响以后的缓存点。
至于如何防止这种情况发生,做以下工作:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
# this is the relevant line
child_df = spark.createDataFrame(parent_df.rdd, schema=parent_df.schema)
child_df = child_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
相关行(标有注释)发生的情况是 child_df
的谱系被剪切,不包括对应于 parent_df
的部分,并以 "fresh RDD" 开头.取消坚持 parent_df
然后 child_df
的血统不受影响。
再说一次 - 尽管这似乎可行,但我欢迎更多 explanation/confirmation 这个理论作为公认的答案!
这是基于数据一致性的有意识的设计决策。取消保留父项的一个可能原因是您希望其源数据发生变化。父级使用新数据,而明显的子级使用旧数据,可能会导致意外和不一致的结果。因此,当父级缓存时,父级的任何缓存子级都将失效。
the PR that implemented this change and in this bug report after the change was introduced中有一点讨论。
如第二个 link 中所述,如果您确实需要持久化子对象,您可以使用 saveAsTable
.
将其具体化为 table 来实现
我正在做这样的事情:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
child_df = parent_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
本质上,我想缓存 parent_df
因为在接下来的步骤中,我要对其进行一些重大的转换。一旦我完成这些并返回 child_df
,我不再需要 parent_df
,因此想从缓存中释放它。但是,这样做也不会持久化新缓存的 child_df
!
显然,问题是:
- 为什么会这样?
- 我怎样才能完成我想要的(从缓存中释放
parent_df
,同时将新的child_df
保留在缓存中)?
有趣的是,相反的情况是有效的——即如果我在最后一行不坚持 child_df
而不是 parent_df
,parent_df
将按预期保持缓存,而 child_df
将被释放。
PS:我在这里发现了一个类似的问题.count()
)。
好的,我想我找到了解决方案:
首先,我猜测为什么会发生这种情况是因为
parent_df
缓存点是child_df
谱系的一部分。 IE。即使child_df
使用了较晚的缓存点,其 DAG 仍包含来自parent_df
的较早位。因此,删除该缓存点会以某种方式影响以后的缓存点。至于如何防止这种情况发生,做以下工作:
import pandas as pd
pdf = pd.DataFrame({
'a': [1, 2, 3],
'b': ['a', 'b', 'c']
})
parent_df = spark.createDataFrame(pdf)
parent_df.cache().count()
# this is the relevant line
child_df = spark.createDataFrame(parent_df.rdd, schema=parent_df.schema)
child_df = child_df.replace('c', 'x')
child_df.cache().count()
parent_df.unpersist()
相关行(标有注释)发生的情况是 child_df
的谱系被剪切,不包括对应于 parent_df
的部分,并以 "fresh RDD" 开头.取消坚持 parent_df
然后 child_df
的血统不受影响。
再说一次 - 尽管这似乎可行,但我欢迎更多 explanation/confirmation 这个理论作为公认的答案!
这是基于数据一致性的有意识的设计决策。取消保留父项的一个可能原因是您希望其源数据发生变化。父级使用新数据,而明显的子级使用旧数据,可能会导致意外和不一致的结果。因此,当父级缓存时,父级的任何缓存子级都将失效。
the PR that implemented this change and in this bug report after the change was introduced中有一点讨论。
如第二个 link 中所述,如果您确实需要持久化子对象,您可以使用 saveAsTable
.