如何在 Spark 结构化流中使用流数据帧更新静态数据帧
How to update a Static Dataframe with Streaming Dataframe in Spark structured streaming
我有一个包含数百万行的静态 DataFrame
,如下所示。
静态 DataFrame
:
--------------
id|time_stamp|
--------------
|1|1540527851|
|2|1540525602|
|3|1530529187|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
现在在每个批次中,正在形成一个流 DataFrame
,其中包含 id 并在如下操作后更新 time_stamp。
第一批:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
--------------
现在在每个批次中,我想使用流数据帧的更新值更新静态数据帧,如下所示。 怎么做?
第一批后的静态 DF:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
我已经尝试过 except()、union() 或 'left_anti' join。但是好像structured streaming不支持这样的操作.
所以我通过 Spark 2.4.0 AddBatch 方法解决了这个问题,该方法将流式数据帧转换为迷你批处理数据帧。但是对于<2.4.0的版本还是很头疼的。
正如 Swarup 自己所解释的那样,如果您使用 Spark 2,则可以使用 forEachBatch 输出接收器。4.x。
接收器采用函数 (batchDF: DataFrame, batchId: Long) => Unit
,其中 batchDF 是当前处理的流数据帧批次,可以用作静态数据帧。
因此,在此函数中,您可以使用每个批次的值更新其他数据框。
参见下面的示例:
假设您有一个名为 frameToBeUpdated
的数据框,其模式与实例变量相同,并且您希望将状态保持在那里
df
.writeStream
.outputMode("append")
.foreachBatch((batch: DataFrame, batchId: Long) => {
//batch is a static dataframe
//take all rows from the original frames that aren't in batch and
//union them with the batch, then reassign to the
//dataframe you want to keep
frameToBeUpdated = batch.union(frameToBeUpdated.join(batch, Seq("id"), "left_anti"))
})
.start()
更新逻辑来自:spark: merge two dataframes, if ID duplicated in two dataframes, the row in df1 overwrites the row in df2
我有类似的问题。下面是我申请更新静态数据帧的 foreachBatch。我想知道如何 return 在 foreachBatch 中完成的更新 df。
def update_reference_df(df, static_df):
query: StreamingQuery = df \
.writeStream \
.outputMode("append") \
.format("memory") \
.foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
.start()
return query
def update_static_df(batch_df, static_df):
df1: DataFrame = static_df.union(batch_df.join(static_df,
(batch_df.SITE == static_df.SITE)
"left_anti"))
return df1
我有一个包含数百万行的静态 DataFrame
,如下所示。
静态 DataFrame
:
--------------
id|time_stamp|
--------------
|1|1540527851|
|2|1540525602|
|3|1530529187|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
现在在每个批次中,正在形成一个流 DataFrame
,其中包含 id 并在如下操作后更新 time_stamp。
第一批:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
--------------
现在在每个批次中,我想使用流数据帧的更新值更新静态数据帧,如下所示。 怎么做?
第一批后的静态 DF:
--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------
我已经尝试过 except()、union() 或 'left_anti' join。但是好像structured streaming不支持这样的操作.
所以我通过 Spark 2.4.0 AddBatch 方法解决了这个问题,该方法将流式数据帧转换为迷你批处理数据帧。但是对于<2.4.0的版本还是很头疼的。
正如 Swarup 自己所解释的那样,如果您使用 Spark 2,则可以使用 forEachBatch 输出接收器。4.x。
接收器采用函数 (batchDF: DataFrame, batchId: Long) => Unit
,其中 batchDF 是当前处理的流数据帧批次,可以用作静态数据帧。
因此,在此函数中,您可以使用每个批次的值更新其他数据框。
参见下面的示例:
假设您有一个名为 frameToBeUpdated
的数据框,其模式与实例变量相同,并且您希望将状态保持在那里
df
.writeStream
.outputMode("append")
.foreachBatch((batch: DataFrame, batchId: Long) => {
//batch is a static dataframe
//take all rows from the original frames that aren't in batch and
//union them with the batch, then reassign to the
//dataframe you want to keep
frameToBeUpdated = batch.union(frameToBeUpdated.join(batch, Seq("id"), "left_anti"))
})
.start()
更新逻辑来自:spark: merge two dataframes, if ID duplicated in two dataframes, the row in df1 overwrites the row in df2
我有类似的问题。下面是我申请更新静态数据帧的 foreachBatch。我想知道如何 return 在 foreachBatch 中完成的更新 df。
def update_reference_df(df, static_df):
query: StreamingQuery = df \
.writeStream \
.outputMode("append") \
.format("memory") \
.foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
.start()
return query
def update_static_df(batch_df, static_df):
df1: DataFrame = static_df.union(batch_df.join(static_df,
(batch_df.SITE == static_df.SITE)
"left_anti"))
return df1