pyspark 诱人的行为

pyspark temptable behaviour

我想实现一个功能,我会有一些BASE数据,我会得到增量数据。

我将结合两者并执行一些操作(SQL 查询),成功后我将有 BASE = BASE + 增量用于下一个 运行.

在例外情况下,我的 baseData 将是 BASE_Data(增量不应该是这里的一部分)。

我已经尝试通过下面的代码进行解释。

我对火花温度的行为感到困惑table...

# i am reading 2 files and persisting them in MEMORY_ONLY
df = spark.read.csv('BASE_data.csv', header=True)
df.persist()
print(df.count())  #o/p:4
df1 = spark.read.csv('data.csv', header=True)
df1.persist()
print(df1.count())  #o/p:4

# i will register temp tables
df.registerTempTable('BASE_data')
spark.sql('select count(1) from BASE_data').show()  # 4 which is fine
# i will append rows from df1 to df(BASE_data) and registered as combined_data
spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('combined_data')
spark.sql('select count(1) from combined_data').show()  # 8 which is fine too

# Now i am going to unpersist df1 from memory and also change the variable
df1.unpersist()
df1=[]
spark.sql('select count(1) from combined_data').show()
# o/p=8, i am confused here, it should be 4
# when i unpersisted, spark might try to rebuild df1, by reading that file,
# so to be double sure, i reassign df1 to some empty list.

我需要帮助来理解这个行为以及我如何实现这个功能。

我正在计划以下简单方法,如果有任何其他方法

-- 我不想保留 BASE_data 和 Combine_DATA 状态,我可以通过单个 temp_table 定义来实现吗? spark.sql("select * from {0}".format('BASE_data')).union(df1).registerTempTable('BASE_data')

-- 我不想创建一段时间后将不再使用的东西,并且执行会消耗内存。 BASE_data 出现异常时应回退到原始 BASE_data i:e 新的附加数据 (df1) 应从 BASE_data 中删除或通过取消坚持。

如果有什么不清楚的地方请告诉我,我会尽力解释,谢谢。

try:
    # create combine_data by union
    # do sql ops
    # BASE_DATA  = select * from cobine_data
except Exception:
    # BASE_data = BASE_Data # Basically do nothing

也在"what kind of clean up i can do in the exception-Block for whatever(is ever) junk i might have created in Try-block "中帮助我。我真的很关心内存管理。谢谢

您将 df1 与 df(在名为 'BASE_data' 的临时 table 中注册)合并,然后使用名为 combined_data 的数据创建 table。方法 registerTempTable() 是一个动作,因此当时 DAG(有向无环图)被评估(使用 df 和 temp_table 'BASE_data' 的当前值),因此数据被复制到不同的地方内存,现在独立于 df1 和 df。此时删除 df1 对 combined_data 中的值没有影响,因为它们已经被计算过。

我不明白为什么您期望第二次计数得到 4 而不是正确的 8。 table 是在您执行并集的行上创建的,并且从那时起不会更改,因此结果不会更改。