加入操作前如何转换DataFrame?
How to transform DataFrame before joining operation?
以下代码用于从列 products
中提取排名。排名是每对 [...]
中的第二个数字。例如,在给定的示例 [[222,66],[333,55]]
中,PK 为 222
和 333
的产品的排名分别为 66
和 55
。
但是当 df_products
大约为 800 Mb 时,Spark 2.2 中的代码运行非常缓慢:
df_products.createOrReplaceTempView("df_products")
val result = df.as("df2")
.join(spark.sql("SELECT * FROM df_products")
.select($"product_PK", explode($"products").as("products"))
.withColumnRenamed("product_PK","product_PK_temp").as("df1"),$"df2.product _PK" === $"df1.product_PK_temp" and $"df2.rec_product_PK" === $"df1.products.product_PK", "left")
.drop($"df1.product_PK_temp")
.select($"product_PK", $"rec_product_PK", coalesce($"df1.products.col2", lit(0.0)).as("rank_product"))
这是 df_products
和 df
的小样本:
df_products =
+----------+--------------------+
|product_PK| products|
+----------+--------------------+
| 111|[[222,66],[333,55...|
| 222|[[333,24],[444,77...|
...
+----------+--------------------+
df=
+----------+-----------------+
|product_PK| rec_product_PK|
+----------+-----------------+
| 111| 222|
| 222| 888|
+----------+-----------------+
当 products
的每一行中的数组包含少量元素时,上述代码运行良好。但是当每一行的数组元素很多的时候[[..],[..],...]
,那么代码就好像卡住了,不前进了
如何优化代码?非常感谢任何帮助。
是否可以在加入之前将df_products
转换成下面的DataFrame?
df_products =
+----------+--------------------+------+
|product_PK| rec_product_PK| rank|
+----------+--------------------+------+
| 111| 222| 66|
| 111| 333| 55|
| 222| 333| 24|
| 222| 444| 77|
...
+----------+--------------------+------+
根据我的回答 ,您可以使用如下方式转换 df_products:
import org.apache.spark.sql.functions.explode
df1 = df.withColumn("array_elem", explode(df("products"))
df2 = df1.select("product_PK", "array_elem.*")
这假设产品是一个结构数组。如果 products 是数组的数组,您可以使用以下代替:
df2 = df1.withColumn("rank", df2("products").getItem(1))
以下代码用于从列 products
中提取排名。排名是每对 [...]
中的第二个数字。例如,在给定的示例 [[222,66],[333,55]]
中,PK 为 222
和 333
的产品的排名分别为 66
和 55
。
但是当 df_products
大约为 800 Mb 时,Spark 2.2 中的代码运行非常缓慢:
df_products.createOrReplaceTempView("df_products")
val result = df.as("df2")
.join(spark.sql("SELECT * FROM df_products")
.select($"product_PK", explode($"products").as("products"))
.withColumnRenamed("product_PK","product_PK_temp").as("df1"),$"df2.product _PK" === $"df1.product_PK_temp" and $"df2.rec_product_PK" === $"df1.products.product_PK", "left")
.drop($"df1.product_PK_temp")
.select($"product_PK", $"rec_product_PK", coalesce($"df1.products.col2", lit(0.0)).as("rank_product"))
这是 df_products
和 df
的小样本:
df_products =
+----------+--------------------+
|product_PK| products|
+----------+--------------------+
| 111|[[222,66],[333,55...|
| 222|[[333,24],[444,77...|
...
+----------+--------------------+
df=
+----------+-----------------+
|product_PK| rec_product_PK|
+----------+-----------------+
| 111| 222|
| 222| 888|
+----------+-----------------+
当 products
的每一行中的数组包含少量元素时,上述代码运行良好。但是当每一行的数组元素很多的时候[[..],[..],...]
,那么代码就好像卡住了,不前进了
如何优化代码?非常感谢任何帮助。
是否可以在加入之前将df_products
转换成下面的DataFrame?
df_products =
+----------+--------------------+------+
|product_PK| rec_product_PK| rank|
+----------+--------------------+------+
| 111| 222| 66|
| 111| 333| 55|
| 222| 333| 24|
| 222| 444| 77|
...
+----------+--------------------+------+
根据我的回答
import org.apache.spark.sql.functions.explode
df1 = df.withColumn("array_elem", explode(df("products"))
df2 = df1.select("product_PK", "array_elem.*")
这假设产品是一个结构数组。如果 products 是数组的数组,您可以使用以下代替:
df2 = df1.withColumn("rank", df2("products").getItem(1))