如何在 Spark 结构化流中使用流数据帧更新静态数据帧

How to update a Static Dataframe with Streaming Dataframe in Spark structured streaming

我有一个包含数百万行的静态 DataFrame,如下所示。

静态 DataFrame :

--------------
id|time_stamp|
--------------
|1|1540527851|
|2|1540525602|
|3|1530529187|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------

现在在每个批次中,正在形成一个流 DataFrame,其中包含 id 并在如下操作后更新 time_stamp。

第一批:

--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
--------------

现在在每个批次中,我想使用流数据帧的更新值更新静态数据帧,如下所示。 怎么做?

第一批后的静态 DF:

--------------
id|time_stamp|
--------------
|1|1540527888|
|2|1540525999|
|3|1530529784|
|4|1520529185|
|5|1510529182|
|6|1578945709|
--------------

我已经尝试过 except()、union() 或 'left_anti' join。但是好像structured streaming不支持这样的操作.

所以我通过 Spark 2.4.0 AddBatch 方法解决了这个问题,该方法将流式数据帧转换为迷你批处理数据帧。但是对于<2.4.0的版本还是很头疼的。

正如 Swarup 自己所解释的那样,如果您使用 Spark 2,则可以使用 forEachBatch 输出接收器。4.x。

接收器采用函数 (batchDF: DataFrame, batchId: Long) => Unit,其中 batchDF 是当前处理的流数据帧批次,可以用作静态数据帧。 因此,在此函数中,您可以使用每个批次的值更新其他数据框。

参见下面的示例: 假设您有一个名为 frameToBeUpdated 的数据框,其模式与实例变量相同,并且您希望将状态保持在那里

df
  .writeStream
  .outputMode("append")
  .foreachBatch((batch: DataFrame, batchId: Long) => {
   //batch is a static dataframe

      //take all rows from the original frames that aren't in batch and 
      //union them with the batch, then reassign to the
      //dataframe you want to keep
      frameToBeUpdated = batch.union(frameToBeUpdated.join(batch, Seq("id"), "left_anti"))
    })
    .start()

更新逻辑来自:spark: merge two dataframes, if ID duplicated in two dataframes, the row in df1 overwrites the row in df2

我有类似的问题。下面是我申请更新静态数据帧的 foreachBatch。我想知道如何 return 在 foreachBatch 中完成的更新 df。

def update_reference_df(df, static_df):
    query: StreamingQuery = df \
        .writeStream \
        .outputMode("append") \
        .format("memory") \
        .foreachBatch(lambda batch_df, batchId: update_static_df(batch_df, static_df)) \
        .start()
    return query

def update_static_df(batch_df, static_df):
    df1: DataFrame = static_df.union(batch_df.join(static_df,
                                                 (batch_df.SITE == static_df.SITE)
                                                 "left_anti"))

    return df1