循环遍历数据框并同时更新查找 table:spark scala

Loop through dataframe and update the lookup table simultaneously: spark scala

我有一个 DataFrame 如下所示。

+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
|  1|      1500847|    6|
|  2|      1501199|    7|
|  3|      1119024|    3|
+---+-------------+-----+

我必须填充第二个 DataFrame,它最初是空的,如下所示。

id  AccountNumber   scale
1   1500847         6
2   1501199         6
3   1119024         3

输出说明

第一个 DataFrame 中的第一行的 scale 为 6。在结果中检查该值减 1(因此 scale 等于 5)。那里 none,所以只需将行 (1,1500847,6) 添加到输出。

输出中的第二行 scale 为 7。原来的 table 已经有一行 scale 7 - 1,所以添加这一行但使用该比例(2, 15001199, 6).

第三行与第一行相同。

使用广播列表

您可以收集 scale列中的所有尺度作为数组broadcast 它将在 udf 函数中使用。然后在when逻辑中使用udf函数与withColumn作为

import org.apache.spark.sql.functions._
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])

import org.apache.spark.sql.functions._
def newScale = udf((scale: Int)=> collectedList.value.contains(scale))

df.withColumn("scale", when(newScale(col("scale")-1), col("scale")-1).otherwise(col("scale")))
  .show(false)

你应该得到想要的输出

+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1  |1500847      |6    |
|2  |1501199      |6    |
|3  |1119024      |3    |
+---+-------------+-----+

使用Window函数

我要建议的解决方案需要您使用 Window 函数收集 one executor 中的所有数据以形成另一列 scaleCheck scale 列中的所有比例将填充为

import org.apache.spark.sql.expressions.Window
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))

这会给你 dataframe

+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1  |1500847      |6    |[6, 7, 3] |
|2  |1501199      |7    |[6, 7, 3] |
|3  |1119024      |3    |[6, 7, 3] |
+---+-------------+-----+----------+

然后你将不得不编写一个udf函数来检查行中的比例是否已经存在于收集的列表中。然后使用when函数并调用udf函数,可以生成scale

import org.apache.spark.sql.functions._
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int])=> scaleCheck.contains(scale))

tempdf.withColumn("scale", when(newScale(col("scale")-1, col("scaleCheck")), col("scale")-1).otherwise(col("scale")))
  .drop("scaleCheck")
  .show(false)

所以你的最终要求 dataframe 已经达到了上面给出的

希望回答对你有帮助