运行 没有 UDF 的每批 Pyspark 结构化流的函数

Running a function for every batch of Pyspark sctructred stream without UDF

我必须 运行 以下 Pyspark 代码。我正在从 eventhub 读取数据,使用多个函数(数据帧转换)转换数据并将数据帧写入目录。 update_session_id 函数必须为每个批次 运行,但它不适用于来自 eventhub 的数据。它只需要更新 transform_raw_data 函数中引用的查找 table,如果 current_timestamp 大于查找 table 中维护的时间戳的 2 小时。

我该如何实施?目前,update_session_id 函数只执行一次,然后不会在流的整个生命周期内执行。

df = spark.readStream.format("eventhubs").options(**conf).load() #Reading from eventhub

update_session_id(session_length, db_table) #function to update session value. Has to run for each batch or every hour

df = transform_raw_data(df, db_table) #tranforming the function

df = filter_countries(df=df, country_list=COUNTRY_CODE_ACCEPTLIST)

df = map_vehicle_type(df)

df = df_to_json(df, output_column=DATA_COLUMN)

df.writeStream  \
     .format("delta")  \
     .outputMode("append")  \
     .partitionBy("YYYYMMDD","hour") \
     .option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION")  \
     .start("BASE_PATH_RAW/REPORTING_RAW_LOCATION")

您可以使用将为每个微批次执行的 foreachBatch function 来实现此目的。在您的情况下,它可能如下所示:

def my_foreach_batch(df, epoch_id):
  update_session_id(session_length, db_table)
  df.write.format("delta").mode("append") \
    .partitionBy("YYYYMMDD","hour") \
    .save("BASE_PATH_RAW/REPORTING_RAW_LOCATION")

df.writeStream  \
  .outputMode("append")  \
  .foreachBatch(my_foreach_batch) \
  .option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION") \
  .start()

请注意,默认情况下 foreachBatch 不是幂等的,可以多次调用,例如,当流重新启动时,根据操作的复杂性,这可能会导致重复追加。在 Databricks Runtimes >= 8.4 上,您可以使用 idempotent writes into Delta tables.

来防止这种情况