通过其他键将列添加到具有 non-unique id 的 pyspark 数据框
Add column to pyspark dataframe with non-unique ids by other key
为标题道歉 - 不知道如何轻松总结我的问题。
我有一个包含 2 列代码和 emp 的 pyspark 数据框。每个唯一代码值都有多个 emp 值,如下所示。我想添加一个列,为每个唯一代码值应用一个递增的数字,例如下面的值列。我玩过 monotonicallyIncreasingId()
,并没有设法将其 id 创建限制为一个特定的代码键,事实上文档说索引不需要按顺序递增。
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
...
+----+---+-----+
如果对效率有任何影响,每个代码值最多有 4 个 emp 值。索引应随着 emp 值的大小递增 - 最低值应为 1,最高值应为 n,其中 n 是具有特定代码的记录数。
对于 Scala,您可以创建一个带有增量索引列的数据框,如下所示:
%scala
val rankedWordCount = sqlContext.sql("select row_number() over (order by some_numeric_value desc) as index_col,lower(info) as info, some_numeric_value from information_table")
您可以创建临时视图并为此使用 Spark SQL:
>>> df = spark.createDataFrame([('a', 14), ('a', 22), ('a', 35), ('a', 64), ('b', 12)], ['code', 'emp'])
>>> df.show()
+----+---+
|code|emp|
+----+---+
| a| 14|
| a| 22|
| a| 35|
| a| 64|
| b| 12|
+----+---+
>>> df.createOrReplaceTempView("df")
>>> df2 = spark.sql("select code, emp, row_number() over(partition by code order by emp) as value from df order by code")
>>> df2.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
+----+---+-----+
您可以将 row_number() 与 Windowing 函数一起使用。
首先导入Window和row_number,
from pyspark.sql import Window
from pyspark.sql.functions import row_number()
假设您的场景具有以下列和值
>>> cols1 = ['code', 'emp']
>>> vals1 = [
('a', 14),
('a', 22),
('a', 35),
('a', 64),
('b', 12),
('b', 35)
]
# Create a DataFrame
>>> df1 = spark.createDataFrame(vals1, cols1)
# Result of 'df1' table.
>>> df1.show()
+----+---+
|code|emp|
+----+---+
| a| 14|
| a| 22|
| a| 35|
| a| 64|
| b| 12|
| b| 35|
+----+---+
应用,row_number() 超过列 code
。
>>> val = df1.withColumn("value", row_number().over(Window.partitionBy("code").orderBy("emp")))
>>> val.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| b| 12| 1|
| b| 35| 2|
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
+----+---+-----+
最后,按列排序code
得到想要的结果。
>>> val.orderBy('code').show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
| b| 35| 2|
+----+---+-----+
- partitionBy:创建一个 WindowSpec 并定义分区。
更多信息请参考:
为标题道歉 - 不知道如何轻松总结我的问题。
我有一个包含 2 列代码和 emp 的 pyspark 数据框。每个唯一代码值都有多个 emp 值,如下所示。我想添加一个列,为每个唯一代码值应用一个递增的数字,例如下面的值列。我玩过 monotonicallyIncreasingId()
,并没有设法将其 id 创建限制为一个特定的代码键,事实上文档说索引不需要按顺序递增。
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
...
+----+---+-----+
如果对效率有任何影响,每个代码值最多有 4 个 emp 值。索引应随着 emp 值的大小递增 - 最低值应为 1,最高值应为 n,其中 n 是具有特定代码的记录数。
对于 Scala,您可以创建一个带有增量索引列的数据框,如下所示:
%scala
val rankedWordCount = sqlContext.sql("select row_number() over (order by some_numeric_value desc) as index_col,lower(info) as info, some_numeric_value from information_table")
您可以创建临时视图并为此使用 Spark SQL:
>>> df = spark.createDataFrame([('a', 14), ('a', 22), ('a', 35), ('a', 64), ('b', 12)], ['code', 'emp'])
>>> df.show()
+----+---+
|code|emp|
+----+---+
| a| 14|
| a| 22|
| a| 35|
| a| 64|
| b| 12|
+----+---+
>>> df.createOrReplaceTempView("df")
>>> df2 = spark.sql("select code, emp, row_number() over(partition by code order by emp) as value from df order by code")
>>> df2.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
+----+---+-----+
您可以将 row_number() 与 Windowing 函数一起使用。
首先导入Window和row_number,
from pyspark.sql import Window
from pyspark.sql.functions import row_number()
假设您的场景具有以下列和值
>>> cols1 = ['code', 'emp']
>>> vals1 = [
('a', 14),
('a', 22),
('a', 35),
('a', 64),
('b', 12),
('b', 35)
]
# Create a DataFrame
>>> df1 = spark.createDataFrame(vals1, cols1)
# Result of 'df1' table.
>>> df1.show()
+----+---+
|code|emp|
+----+---+
| a| 14|
| a| 22|
| a| 35|
| a| 64|
| b| 12|
| b| 35|
+----+---+
应用,row_number() 超过列 code
。
>>> val = df1.withColumn("value", row_number().over(Window.partitionBy("code").orderBy("emp")))
>>> val.show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| b| 12| 1|
| b| 35| 2|
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
+----+---+-----+
最后,按列排序code
得到想要的结果。
>>> val.orderBy('code').show()
+----+---+-----+
|code|emp|value|
+----+---+-----+
| a| 14| 1|
| a| 22| 2|
| a| 35| 3|
| a| 64| 4|
| b| 12| 1|
| b| 35| 2|
+----+---+-----+
- partitionBy:创建一个 WindowSpec 并定义分区。
更多信息请参考: