循环遍历数据框并同时更新查找 table:spark scala
Loop through dataframe and update the lookup table simultaneously: spark scala
我有一个 DataFrame
如下所示。
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
我必须填充第二个 DataFrame
,它最初是空的,如下所示。
id AccountNumber scale
1 1500847 6
2 1501199 6
3 1119024 3
输出说明
第一个 DataFrame
中的第一行的 scale
为 6。在结果中检查该值减 1(因此 scale
等于 5)。那里 none,所以只需将行 (1,1500847,6)
添加到输出。
输出中的第二行 scale
为 7。原来的 table 已经有一行 scale
7 - 1,所以添加这一行但使用该比例(2, 15001199, 6)
.
第三行与第一行相同。
使用广播列表
您可以收集 scale
列中的所有尺度作为数组 和 broadcast
它将在 udf
函数中使用。然后在when
逻辑中使用udf
函数与withColumn
作为
import org.apache.spark.sql.functions._
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int)=> collectedList.value.contains(scale))
df.withColumn("scale", when(newScale(col("scale")-1), col("scale")-1).otherwise(col("scale")))
.show(false)
你应该得到想要的输出
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
使用Window函数
我要建议的解决方案需要您使用 Window
函数收集 one executor 中的所有数据以形成另一列 scaleCheck
scale
列中的所有比例将填充为
import org.apache.spark.sql.expressions.Window
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
这会给你 dataframe
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
然后你将不得不编写一个udf
函数来检查行中的比例是否已经存在于收集的列表中。然后使用when
函数并调用udf
函数,可以生成scale
值
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int])=> scaleCheck.contains(scale))
tempdf.withColumn("scale", when(newScale(col("scale")-1, col("scaleCheck")), col("scale")-1).otherwise(col("scale")))
.drop("scaleCheck")
.show(false)
所以你的最终要求 dataframe
已经达到了上面给出的
希望回答对你有帮助
我有一个 DataFrame
如下所示。
+---+-------------+-----+
| id|AccountNumber|scale|
+---+-------------+-----+
| 1| 1500847| 6|
| 2| 1501199| 7|
| 3| 1119024| 3|
+---+-------------+-----+
我必须填充第二个 DataFrame
,它最初是空的,如下所示。
id AccountNumber scale
1 1500847 6
2 1501199 6
3 1119024 3
输出说明
第一个 DataFrame
中的第一行的 scale
为 6。在结果中检查该值减 1(因此 scale
等于 5)。那里 none,所以只需将行 (1,1500847,6)
添加到输出。
输出中的第二行 scale
为 7。原来的 table 已经有一行 scale
7 - 1,所以添加这一行但使用该比例(2, 15001199, 6)
.
第三行与第一行相同。
使用广播列表
您可以收集 scale
列中的所有尺度作为数组 和 broadcast
它将在 udf
函数中使用。然后在when
逻辑中使用udf
函数与withColumn
作为
import org.apache.spark.sql.functions._
val collectedList = sc.broadcast(df.select(collect_list("scale")).collect()(0)(0).asInstanceOf[collection.mutable.WrappedArray[Int]])
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int)=> collectedList.value.contains(scale))
df.withColumn("scale", when(newScale(col("scale")-1), col("scale")-1).otherwise(col("scale")))
.show(false)
你应该得到想要的输出
+---+-------------+-----+
|id |AccountNumber|scale|
+---+-------------+-----+
|1 |1500847 |6 |
|2 |1501199 |6 |
|3 |1119024 |3 |
+---+-------------+-----+
使用Window函数
我要建议的解决方案需要您使用 Window
函数收集 one executor 中的所有数据以形成另一列 scaleCheck
scale
列中的所有比例将填充为
import org.apache.spark.sql.expressions.Window
def windowSpec = Window.orderBy("id").rowsBetween(Long.MinValue, Long.MaxValue)
val tempdf = df.withColumn("scaleCheck", collect_list("scale").over(windowSpec))
这会给你 dataframe
+---+-------------+-----+----------+
|id |AccountNumber|scale|scaleCheck|
+---+-------------+-----+----------+
|1 |1500847 |6 |[6, 7, 3] |
|2 |1501199 |7 |[6, 7, 3] |
|3 |1119024 |3 |[6, 7, 3] |
+---+-------------+-----+----------+
然后你将不得不编写一个udf
函数来检查行中的比例是否已经存在于收集的列表中。然后使用when
函数并调用udf
函数,可以生成scale
值
import org.apache.spark.sql.functions._
def newScale = udf((scale: Int, scaleCheck: collection.mutable.WrappedArray[Int])=> scaleCheck.contains(scale))
tempdf.withColumn("scale", when(newScale(col("scale")-1, col("scaleCheck")), col("scale")-1).otherwise(col("scale")))
.drop("scaleCheck")
.show(false)
所以你的最终要求 dataframe
已经达到了上面给出的
希望回答对你有帮助