运行 没有 UDF 的每批 Pyspark 结构化流的函数
Running a function for every batch of Pyspark sctructred stream without UDF
我必须 运行 以下 Pyspark 代码。我正在从 eventhub 读取数据,使用多个函数(数据帧转换)转换数据并将数据帧写入目录。 update_session_id 函数必须为每个批次 运行,但它不适用于来自 eventhub 的数据。它只需要更新 transform_raw_data 函数中引用的查找 table,如果 current_timestamp 大于查找 table 中维护的时间戳的 2 小时。
我该如何实施?目前,update_session_id 函数只执行一次,然后不会在流的整个生命周期内执行。
df = spark.readStream.format("eventhubs").options(**conf).load() #Reading from eventhub
update_session_id(session_length, db_table) #function to update session value. Has to run for each batch or every hour
df = transform_raw_data(df, db_table) #tranforming the function
df = filter_countries(df=df, country_list=COUNTRY_CODE_ACCEPTLIST)
df = map_vehicle_type(df)
df = df_to_json(df, output_column=DATA_COLUMN)
df.writeStream \
.format("delta") \
.outputMode("append") \
.partitionBy("YYYYMMDD","hour") \
.option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION") \
.start("BASE_PATH_RAW/REPORTING_RAW_LOCATION")
您可以使用将为每个微批次执行的 foreachBatch function 来实现此目的。在您的情况下,它可能如下所示:
def my_foreach_batch(df, epoch_id):
update_session_id(session_length, db_table)
df.write.format("delta").mode("append") \
.partitionBy("YYYYMMDD","hour") \
.save("BASE_PATH_RAW/REPORTING_RAW_LOCATION")
df.writeStream \
.outputMode("append") \
.foreachBatch(my_foreach_batch) \
.option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION") \
.start()
请注意,默认情况下 foreachBatch 不是幂等的,可以多次调用,例如,当流重新启动时,根据操作的复杂性,这可能会导致重复追加。在 Databricks Runtimes >= 8.4 上,您可以使用 idempotent writes into Delta tables.
来防止这种情况
我必须 运行 以下 Pyspark 代码。我正在从 eventhub 读取数据,使用多个函数(数据帧转换)转换数据并将数据帧写入目录。 update_session_id 函数必须为每个批次 运行,但它不适用于来自 eventhub 的数据。它只需要更新 transform_raw_data 函数中引用的查找 table,如果 current_timestamp 大于查找 table 中维护的时间戳的 2 小时。
我该如何实施?目前,update_session_id 函数只执行一次,然后不会在流的整个生命周期内执行。
df = spark.readStream.format("eventhubs").options(**conf).load() #Reading from eventhub
update_session_id(session_length, db_table) #function to update session value. Has to run for each batch or every hour
df = transform_raw_data(df, db_table) #tranforming the function
df = filter_countries(df=df, country_list=COUNTRY_CODE_ACCEPTLIST)
df = map_vehicle_type(df)
df = df_to_json(df, output_column=DATA_COLUMN)
df.writeStream \
.format("delta") \
.outputMode("append") \
.partitionBy("YYYYMMDD","hour") \
.option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION") \
.start("BASE_PATH_RAW/REPORTING_RAW_LOCATION")
您可以使用将为每个微批次执行的 foreachBatch function 来实现此目的。在您的情况下,它可能如下所示:
def my_foreach_batch(df, epoch_id):
update_session_id(session_length, db_table)
df.write.format("delta").mode("append") \
.partitionBy("YYYYMMDD","hour") \
.save("BASE_PATH_RAW/REPORTING_RAW_LOCATION")
df.writeStream \
.outputMode("append") \
.foreachBatch(my_foreach_batch) \
.option("checkpointLocation", "BASE_PATH_RAW/CHECKPOINT_REPORTING_RAW_LOCATION") \
.start()
请注意,默认情况下 foreachBatch 不是幂等的,可以多次调用,例如,当流重新启动时,根据操作的复杂性,这可能会导致重复追加。在 Databricks Runtimes >= 8.4 上,您可以使用 idempotent writes into Delta tables.
来防止这种情况