How to find distinct values for different groups on a dataframe in Pyspark and recode the dataframe
I have a large dataframe in which people are grouped, with the group label stored in a variable called "groups".
What I now need to do with this dataframe is present it in a more meaningful way.
For example, for group 148 below, the table is:
df.select('gender','postcode','age','groups','bought').filter(df.groups==148).show()
+------+--------+---+----------+----------+
|gender|postcode|age| groups|bought |
+------+--------+---+----------+----------+
| 0| 2189| 25| 148|car |
| 0| 2192| 34| 148|house |
| 1| 2193| 37| 148|car |
| 1| 2194| 38| 148|house |
| 1| 2196| 54| 148|laptop |
| 1| 2197| 27| 148|laptop |
| 0| 2198| 44| 148|laptop |
+------+--------+---+----------+----------+
Gender contains both 0 and 1, so all the people in this group will be changed to "person".
If the values were all 1 it would become "female", and all 0 would become "male"; that is the rule, but it does not apply to this group.
Now for postcode: across the whole dataframe the minimum is 2189 and the maximum 2211; within this group every case will become [2189 - 2198].
For age, the overall minimum is 18 and the maximum 62; here it will be [25-54].
For bought, I need to look at which items were bought; here these are [car, house, laptop].
So the recoding for this group would end up as:
+------+-------------+--------+----------+------------------+
|gender| postcode| age| groups| bought |
+------+-------------+--------+----------+------------------+
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
|person|[2189 - 2198]| [25-54]| 148|[car,house,laptop]|
+------+-------------+--------+----------+------------------+
This would be done for all the groups in the dataframe.
Any ideas?
Here I found something similar, but in Scala.
Thanks in advance!
Hope this helps!
import pyspark.sql.functions as f
from pyspark.sql.types import StringType

# Rebuild the example group as a dataframe.
df = sc.parallelize([
    [0, 2189, 25, 148, 'car'],
    [0, 2192, 34, 148, 'house'],
    [1, 2193, 37, 148, 'car'],
    [1, 2194, 38, 148, 'house'],
    [1, 2196, 54, 148, 'laptop'],
    [1, 2197, 27, 148, 'laptop'],
    [0, 2198, 44, 148, 'laptop']
]).toDF(('gender', 'postcode', 'age', 'groups', 'bought'))
df.show()

# Distinct items bought per group.
df1 = df.groupBy("groups").agg(f.collect_set("bought")) \
    .withColumnRenamed("collect_set(bought)", "bought")

# Age range per group, formatted as "min-max".
df2 = (df.groupBy("groups").agg(f.min("age"), f.max("age"))
       .withColumn("age", f.concat(f.col("min(age)"), f.lit("-"), f.col("max(age)")))
       .select("groups", "age"))

# Postcode range per group, formatted as "min-max".
df3 = (df.groupBy("groups").agg(f.min("postcode"), f.max("postcode"))
       .withColumn("postcode", f.concat(f.col("min(postcode)"), f.lit("-"), f.col("max(postcode)")))
       .select("groups", "postcode"))

# Map the distinct gender codes of a group to a label.
# Compare as sets: collect_set gives no ordering guarantee,
# so a literal [0, 1] comparison would be fragile.
def modify_values(l):
    if set(l) == {0, 1}:
        return "person"
    elif set(l) == {0}:
        return "male"
    else:
        return "female"

modified_val = f.udf(modify_values, StringType())

df4 = df.groupBy("groups").agg(f.collect_set("gender")) \
    .withColumn("gender", modified_val("collect_set(gender)")) \
    .select("groups", "gender")

merged_df = df1.join(df2, "groups").join(df3, "groups").join(df4, "groups")
merged_df.show()
The output is:
+------+--------------------+-----+---------+------+
|groups| bought| age| postcode|gender|
+------+--------------------+-----+---------+------+
| 148|[laptop, house, car]|25-54|2189-2198|person|
+------+--------------------+-----+---------+------+
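Note that merged_df has one row per group, while the desired output in the question keeps one row per person. A minimal sketch (continuing the same session, so df and merged_df are as defined above) that recodes every original row is to join the aggregated values back on "groups":

# One row per original record: attach the per-group recoded values.
recoded = df.select("groups").join(merged_df, "groups")
recoded.show()

This reproduces the per-row layout from the question, only with the columns in a different order.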
If it solves your problem, don't forget to let us know!
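As a side note, the four separate aggregations and three joins can likely be collapsed into a single groupBy().agg() pass. This is an untested sketch of that alternative, replacing the Python UDF with when/otherwise on the collected gender set:

# Single-pass version: all aggregates with aliases, gender label derived
# from the distinct gender codes without a Python UDF.
merged_df2 = df.groupBy("groups").agg(
    f.collect_set("bought").alias("bought"),
    f.concat_ws("-", f.min("age"), f.max("age")).alias("age"),
    f.concat_ws("-", f.min("postcode"), f.max("postcode")).alias("postcode"),
    f.collect_set("gender").alias("genders")
).withColumn(
    "gender",
    f.when(f.size("genders") > 1, "person")           # both 0 and 1 present
     .when(f.array_contains("genders", 1), "female")  # only 1s
     .otherwise("male")                               # only 0s
).drop("genders")
merged_df2.show()

Avoiding the joins keeps everything in one aggregation, and dropping the UDF keeps the computation inside the JVM.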