根据列值是否在另一列中将列添加到 PySpark DataFrame
Adding column to PySpark DataFrame depending on whether column value is in another column
我有一个 PySpark DataFrame,其结构由
[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')
我需要根据 'item' 是否在 'fav_items' 中,添加 1 或 0 的另一列。
所以我想要
[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)]
我如何将第二列查找到第三列以确定值,然后如何添加它?
以下代码执行请求的任务。定义了一个用户定义的函数,它接收 DataFrame
的两列作为参数。因此,对于每一行,搜索项目是否在项目列表中。如果找到该项目,则 1 为 return,否则为 0。
# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# First we create a RDD in order to create a dataFrame:
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])
# Print dataFrame
df.show()
# We make an user define function that receives two columns and do operation
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()
这里是结果:
+----+----+---------+
|user|item|fav_items|
+----+----+---------+
| u1| 1|[1, 2, 3]|
| u1| 4|[1, 2, 3]|
+----+----+---------+
+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
| u1| 1|[1, 2, 3]| 1|
| u1| 4|[1, 2, 3]| 0|
+----+----+---------+------+
纯属娱乐的非UDF解决方案:
from pyspark.sql.functions import col, first, explode, max as max_
result = (
# Here we take exploded rows and for each row check if there
# is a match. We cast to integer (false -> 0, true -> 1)
# and take max (1 if there is any match)
max_((col("fav_item") == col("item")).cast("integer"))
).alias("result")
(df.repartition("user", "item")
# Explode array so we compare item and fav_item
.withColumn("fav_item", explode("fav_items"))
.groupBy("user", "item")
# Aggregate
# we add result and retain fav_items
.agg(result, first("fav_items").alias("fav_items")))
所以它只是:
展开fav_array
:
## +----+----+---------+--------+
## |user|item|fav_items|fav_item|
## +----+----+---------+--------+
## | u1| 1|[1, 2, 3]| 1|
## | u1| 1|[1, 2, 3]| 2|
## | u1| 1|[1, 2, 3]| 3|
## | u1| 4|[1, 2, 3]| 1|
## | u1| 4|[1, 2, 3]| 2|
## | u1| 4|[1, 2, 3]| 3|
## +----+----+---------+--------+
检查是否 fav_item
= item
(_1
是 (col("fav_item") == col("item")).cast("integer")
表达式的结果):
## +----+----+---------+--------+---+
## |user|item|fav_items|fav_item| _1|
## +----+----+---------+--------+---+
## | u1| 1|[1, 2, 3]| 1| 1|
## | u1| 1|[1, 2, 3]| 2| 0|
## | u1| 1|[1, 2, 3]| 3| 0|
## | u1| 4|[1, 2, 3]| 1| 0|
## | u1| 4|[1, 2, 3]| 2| 0|
## | u1| 4|[1, 2, 3]| 3| 0|
## +----+----+---------+--------+---+
并回滚保持 user
和 item
作为组列,任意 fav_items
(都相同)和临时列的最大值 _1
(0 或 1)。
不过我会选择 UDF。
我有一个 PySpark DataFrame,其结构由
[('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])].toDF('user', 'item', 'fav_items')
我需要根据 'item' 是否在 'fav_items' 中,添加 1 或 0 的另一列。
所以我想要
[('u1', 1, [1 ,2, 3], 1), ('u1', 4, [1, 2, 3], 0)]
我如何将第二列查找到第三列以确定值,然后如何添加它?
以下代码执行请求的任务。定义了一个用户定义的函数,它接收 DataFrame
的两列作为参数。因此,对于每一行,搜索项目是否在项目列表中。如果找到该项目,则 1 为 return,否则为 0。
# Imports
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
# First we create a RDD in order to create a dataFrame:
rdd = sc.parallelize([('u1', 1, [1 ,2, 3]), ('u1', 4, [1, 2, 3])])
df = rdd.toDF(['user', 'item', 'fav_items'])
# Print dataFrame
df.show()
# We make an user define function that receives two columns and do operation
function = udf(lambda item, items: 1 if item in items else 0, IntegerType())
df.select('user', 'item', 'fav_items', function(col('item'), col('fav_items')).alias('result')).show()
这里是结果:
+----+----+---------+
|user|item|fav_items|
+----+----+---------+
| u1| 1|[1, 2, 3]|
| u1| 4|[1, 2, 3]|
+----+----+---------+
+----+----+---------+------+
|user|item|fav_items|result|
+----+----+---------+------+
| u1| 1|[1, 2, 3]| 1|
| u1| 4|[1, 2, 3]| 0|
+----+----+---------+------+
纯属娱乐的非UDF解决方案:
from pyspark.sql.functions import col, first, explode, max as max_
result = (
# Here we take exploded rows and for each row check if there
# is a match. We cast to integer (false -> 0, true -> 1)
# and take max (1 if there is any match)
max_((col("fav_item") == col("item")).cast("integer"))
).alias("result")
(df.repartition("user", "item")
# Explode array so we compare item and fav_item
.withColumn("fav_item", explode("fav_items"))
.groupBy("user", "item")
# Aggregate
# we add result and retain fav_items
.agg(result, first("fav_items").alias("fav_items")))
所以它只是:
展开
fav_array
:## +----+----+---------+--------+ ## |user|item|fav_items|fav_item| ## +----+----+---------+--------+ ## | u1| 1|[1, 2, 3]| 1| ## | u1| 1|[1, 2, 3]| 2| ## | u1| 1|[1, 2, 3]| 3| ## | u1| 4|[1, 2, 3]| 1| ## | u1| 4|[1, 2, 3]| 2| ## | u1| 4|[1, 2, 3]| 3| ## +----+----+---------+--------+
检查是否
fav_item
=item
(_1
是(col("fav_item") == col("item")).cast("integer")
表达式的结果):## +----+----+---------+--------+---+ ## |user|item|fav_items|fav_item| _1| ## +----+----+---------+--------+---+ ## | u1| 1|[1, 2, 3]| 1| 1| ## | u1| 1|[1, 2, 3]| 2| 0| ## | u1| 1|[1, 2, 3]| 3| 0| ## | u1| 4|[1, 2, 3]| 1| 0| ## | u1| 4|[1, 2, 3]| 2| 0| ## | u1| 4|[1, 2, 3]| 3| 0| ## +----+----+---------+--------+---+
并回滚保持
user
和item
作为组列,任意fav_items
(都相同)和临时列的最大值_1
(0 或 1)。
不过我会选择 UDF。