排序后生成每个键自动递增数字的最佳方法
best way to generate per key auto increment numerals after sorting
我想问一下实现按键自动递增的最好方法是什么
排序后的数字,例如。 :
原始文件:
1,a,b,c,1,1
1,a,b,d,0,0
1,a,b,e,1,0
2,a,e,c,0,0
2,a,f,d,1,0
post-输出(最后一列是分组后的位置编号
前三个字段和后两个值的反向排序)
1,a,b,c,1,1,1
1,a,b,d,0,0,3
1,a,b,e,1,0,2
2,a,e,c,0,0,2
2,a,f,d,1,0,1
我正在使用使用 groupbykey 的解决方案,但那是 运行
问题(可能是 pyspark/spark 的错误?),想知道是否有
实现此目标的更好方法。
我的解决方案:
A = sc.textFile("train.csv")
.filter(lambda x:not isHeader(x))
.map(split)
.map(parse_train)
.filter(lambda x: not x is None)
B = A.map(lambda k:((k.first_field,k.second_field,k.first_field,k.third_field),(k[0:5])))
.groupByKey()
B.map(sort_n_set_position)
.flatMap(lambda line: line)
其中 sort 和 set position 遍历迭代器并执行
排序并添加最后一列
因为你有大键(所有 3 个第一个值),我假设你不会有每个键的大量行。鉴于此,我将只使用 groupByKey([numTasks])
,然后使用普通代码对生成的可迭代对象的每一行进行排序和添加索引。
结合 spark-csv
、DataFrames 和 window 函数的方法略有不同。为简洁起见,我假设 header 行是 x1,x2,x4,x4,x5,x6
:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber, col
df = (sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("train.csv"))
w = (Window()
.partitionBy(col("x1"), col("x2"), col("x3"))
.orderBy(col("x5").desc(), col("x6").desc()))
df_with_rn = df.select(col("*"), rowNumber().over(w).alias("x7"))
df_with_rn.show()
## +---+---+---+---+---+---+---+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+---+---+---+---+---+
## | 2| a| e| c| 0| 0| 1|
## | 2| a| f| d| 1| 0| 1|
## | 1| a| b| c| 1| 1| 1|
## | 1| a| b| e| 1| 0| 2|
## | 1| a| b| d| 0| 0| 3|
## +---+---+---+---+---+---+---+
如果你想要一个普通的 RDD
作为输出,你可以简单地映射如下:
df_with_rn.map(lambda r: r.asDict())
我想问一下实现按键自动递增的最好方法是什么 排序后的数字,例如。 :
原始文件:
1,a,b,c,1,1
1,a,b,d,0,0
1,a,b,e,1,0
2,a,e,c,0,0
2,a,f,d,1,0
post-输出(最后一列是分组后的位置编号 前三个字段和后两个值的反向排序)
1,a,b,c,1,1,1
1,a,b,d,0,0,3
1,a,b,e,1,0,2
2,a,e,c,0,0,2
2,a,f,d,1,0,1
我正在使用使用 groupbykey 的解决方案,但那是 运行 问题(可能是 pyspark/spark 的错误?),想知道是否有 实现此目标的更好方法。
我的解决方案:
A = sc.textFile("train.csv")
.filter(lambda x:not isHeader(x))
.map(split)
.map(parse_train)
.filter(lambda x: not x is None)
B = A.map(lambda k:((k.first_field,k.second_field,k.first_field,k.third_field),(k[0:5])))
.groupByKey()
B.map(sort_n_set_position)
.flatMap(lambda line: line)
其中 sort 和 set position 遍历迭代器并执行 排序并添加最后一列
因为你有大键(所有 3 个第一个值),我假设你不会有每个键的大量行。鉴于此,我将只使用 groupByKey([numTasks])
,然后使用普通代码对生成的可迭代对象的每一行进行排序和添加索引。
结合 spark-csv
、DataFrames 和 window 函数的方法略有不同。为简洁起见,我假设 header 行是 x1,x2,x4,x4,x5,x6
:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber, col
df = (sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("train.csv"))
w = (Window()
.partitionBy(col("x1"), col("x2"), col("x3"))
.orderBy(col("x5").desc(), col("x6").desc()))
df_with_rn = df.select(col("*"), rowNumber().over(w).alias("x7"))
df_with_rn.show()
## +---+---+---+---+---+---+---+
## | x1| x2| x3| x4| x5| x6| x7|
## +---+---+---+---+---+---+---+
## | 2| a| e| c| 0| 0| 1|
## | 2| a| f| d| 1| 0| 1|
## | 1| a| b| c| 1| 1| 1|
## | 1| a| b| e| 1| 0| 2|
## | 1| a| b| d| 0| 0| 3|
## +---+---+---+---+---+---+---+
如果你想要一个普通的 RDD
作为输出,你可以简单地映射如下:
df_with_rn.map(lambda r: r.asDict())