Apply Window.partitionBy on two columns to get an n-core dataset in PySpark
I have a dataset of 2M entries with user, item, and rating information. I want to filter the data so that it only contains items rated by at least 2 users and users who rated at least 2 items. I can enforce one of the constraints with a window function, but I'm not sure how to enforce both at the same time.
Input:
user | product | rating |
---|---|---|
J | p1 | 3 |
J | p2 | 4 |
M | p1 | 4 |
M | p3 | 3 |
B | p2 | 3 |
B | p4 | 3 |
B | p3 | 3 |
N | p3 | 2 |
N | p5 | 4 |
Here is the sample data.
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.master("local[*]")\
    .config("spark.jars.packages", "org.apache.spark:spark-avro_2.12:3.1.2")\
    .getOrCreate()

sampleData = (("J", "p1", 3),
              ("J", "p2", 4),
              ("M", "p1", 4),
              ("M", "p3", 3),
              ("B", "p2", 3),
              ("B", "p4", 3),
              ("B", "p3", 3),
              ("N", "p3", 2),
              ("N", "p5", 4))
columns = ["user", "product", "rating"]
df = spark.createDataFrame(data=sampleData, schema=columns)
The desired output is:
user | product | rating |
---|---|---|
J | p1 | 3 |
J | p2 | 4 |
M | p1 | 4 |
M | p3 | 3 |
B | p2 | 3 |
B | p3 | 3 |
The window function I used to satisfy the "users who rated at least 2 items" constraint is:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window = Window.partitionBy("user")
df.withColumn("count", F.count("rating").over(window))\
  .filter(F.col("count") >= 2).drop("count")
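The product-side constraint ("products rated by at least 2 users") can be written the same way; a minimal sketch mirroring the snippet above (window_p is just an illustrative name, not from the question):
# Hypothetical mirror of the user-side filter: partition by product instead of user
window_p = Window.partitionBy("product")
df.withColumn("count", F.count("rating").over(window_p))\
  .filter(F.col("count") >= 2).drop("count")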
How about the following?
df = spark.createDataFrame(data=sampleData, schema=columns)
df_p = df.groupBy('product').count().filter('count >= 2').select('product')
df = df.join(df_p, ['product'], 'inner')
df_u = df.select('user').groupBy('user').count().filter('count >= 2').select('user')
df = df.join(df_u, ['user'], 'inner')
This gives the following output:
user | product | rating |
---|---|---|
B | p2 | 3 |
B | p3 | 3 |
M | p1 | 4 |
M | p3 | 3 |
J | p2 | 4 |
J | p1 | 3 |
You can do it with two window functions. I'm not that familiar with the DataFrame syntax, so here it is in SQL:
df.createOrReplaceTempView("ratings")
spark.sql("""
    SELECT user,
           product,
           rating,
           COUNT(*) OVER (PARTITION BY user)    AS num_ratings_for_user,
           COUNT(*) OVER (PARTITION BY product) AS num_raters_for_product
    FROM ratings
""")
You can filter on this.
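For example, wrapping the query in a subquery and filtering on both counts might look like the following (a sketch reusing the ratings view defined above; note that both filters are applied against counts computed on the unfiltered data):
spark.sql("""
    SELECT user, product, rating
    FROM (
        SELECT user,
               product,
               rating,
               COUNT(*) OVER (PARTITION BY user)    AS num_ratings_for_user,
               COUNT(*) OVER (PARTITION BY product) AS num_raters_for_product
        FROM ratings
    ) t
    WHERE num_ratings_for_user >= 2
      AND num_raters_for_product >= 2
""").show()
The snippet below expresses a similar filter with two window specs in the DataFrame API, applying the two conditions one after the other: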
from pyspark.sql import functions as F
from pyspark.sql.window import Window

window1 = Window.partitionBy("user")
window2 = Window.partitionBy("product")
df.withColumn("count_users", F.count("rating").over(window1))\
  .filter(F.col("count_users") >= 2)\
  .withColumn("count_prod", F.count("rating").over(window2))\
  .filter(F.col("count_prod") >= 2)\
  .drop("count_users", "count_prod")\
  .show()
User N rated more than one product, so the output should be:
+----+-------+------+
|user|product|rating|
+----+-------+------+
| J| p1| 3|
| M| p1| 4|
| B| p2| 3|
| J| p2| 4|
| B| p3| 3|
| M| p3| 3|
| N| p3| 2|
+----+-------+------+
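Note that this keeps (N, p3), whereas the desired output in the question drops user N entirely: once p5 is removed, N has only one remaining rating. For a strict n-core, where both conditions still hold in the final result, a single pass is not always enough; one option is to repeat the filtering until the row count stops changing. A minimal sketch, assuming the df built from sampleData above (the loop is an illustration, not code from the question or the answers):
core = df
while True:
    before = core.count()
    # keep products rated by at least 2 users
    good_p = core.groupBy("product").count().filter("count >= 2").select("product")
    core = core.join(good_p, ["product"], "inner")
    # keep users who rated at least 2 products
    good_u = core.groupBy("user").count().filter("count >= 2").select("user")
    core = core.join(good_u, ["user"], "inner")
    if core.count() == before:  # a full pass removed nothing, so both constraints hold
        break

core.select("user", "product", "rating").show()
On the sample data this stops after the second pass and yields the six rows listed as the desired output in the question.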