如何解决我的 pyspark 代码中的这个 reducebykey 转换问题?
How to fix this reducebykey transformation issue in my pyspark code?
我对如何使这个值正确感到困惑。以下是我的示例数据:
col_name,Category,SegmentID,total_cnt,PercentDistribution
city,ANTIOCH,1,1,15
city,ARROYO GRANDE,1,1,15
state,CA,1,3,15
state,NZ,1,4,15
我正在尝试获取输出数据帧:
我可以到这为止。这里需要你的帮助。
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
import json
join_df=spark.read.csv("/tmp/testreduce.csv",inferSchema=True, header=True)
jsonSchema = StructType([StructField("Name", StringType())
, StructField("Value", IntegerType())
, StructField("CatColName", StringType())
, StructField("CatColVal", StringType())
])
def reduceKeys(row1, row2):
row1[0].update(row2[0])
return row1
res_df=join_df.rdd.map(lambda row: ("Segment " + str(row[2]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema)
我当前的代码输出:
它没有根据段 ID 和 CatColName 正确分组数据。
问题是 reduceByKey 将您生成的字符串 Segment 1
考虑在内,这对于城市和州来说是相等的。如果您在开头添加 col_name
它会按预期工作,但您会在结果中收到不同的名称。这可以用正则表达式
改变
res_df=test_df.rdd.map(lambda row: ("Segment " + str(row[2]) +" " + str(row[0]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema).withColumn("name",regexp_extract(col("name"),"(\w+\s\d+)",1))
res_df.show(truncate=False)
输出:
+---------+-----+----------+----------------------------------+
|name |Value|CatColName|CatColVal |
+---------+-----+----------+----------------------------------+
|Segment 1|15 |city |{"ANTIOCH": 1, "ARROYO GRANDE": 1}|
|Segment 1|15 |state |{"CA": 3, "NZ": 4} |
+---------+-----+----------+----------------------------------+
最后的regexp_extract只需要恢复原名即可
我对如何使这个值正确感到困惑。以下是我的示例数据:
col_name,Category,SegmentID,total_cnt,PercentDistribution
city,ANTIOCH,1,1,15
city,ARROYO GRANDE,1,1,15
state,CA,1,3,15
state,NZ,1,4,15
我正在尝试获取输出数据帧:
我可以到这为止。这里需要你的帮助。
from pyspark.sql.types import StructType,StructField,StringType,IntegerType
import json
join_df=spark.read.csv("/tmp/testreduce.csv",inferSchema=True, header=True)
jsonSchema = StructType([StructField("Name", StringType())
, StructField("Value", IntegerType())
, StructField("CatColName", StringType())
, StructField("CatColVal", StringType())
])
def reduceKeys(row1, row2):
row1[0].update(row2[0])
return row1
res_df=join_df.rdd.map(lambda row: ("Segment " + str(row[2]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema)
我当前的代码输出:
它没有根据段 ID 和 CatColName 正确分组数据。
问题是 reduceByKey 将您生成的字符串 Segment 1
考虑在内,这对于城市和州来说是相等的。如果您在开头添加 col_name
它会按预期工作,但您会在结果中收到不同的名称。这可以用正则表达式
res_df=test_df.rdd.map(lambda row: ("Segment " + str(row[2]) +" " + str(row[0]), ({row[1]: row[3]},row[0],row[4])))\
.reduceByKey(lambda x, y: reduceKeys(x, y))\
.map(lambda row: (row[0], row[1][2],row[1][1], json.dumps(row[1][0]))).toDF(jsonSchema).withColumn("name",regexp_extract(col("name"),"(\w+\s\d+)",1))
res_df.show(truncate=False)
输出:
+---------+-----+----------+----------------------------------+
|name |Value|CatColName|CatColVal |
+---------+-----+----------+----------------------------------+
|Segment 1|15 |city |{"ANTIOCH": 1, "ARROYO GRANDE": 1}|
|Segment 1|15 |state |{"CA": 3, "NZ": 4} |
+---------+-----+----------+----------------------------------+
最后的regexp_extract只需要恢复原名即可