PySpark:在 DataFrame 中的小组内部迭代
PySpark: iterate inside small groups in DataFrame
我想了解如何在 PySpark DataFrame 中的小组内进行操作。假设我有具有以下架构的 DF:
root
|-- first_id: string (nullable = true)
|-- second_id_struct: struct (nullable = true)
| |-- s_id: string (nullable = true)
| |-- s_id_2: int (nullable = true)
|-- depth_from: float (nullable = true)
|-- depth_to: float (nullable = true)
|-- total_depth: float (nullable = true)
因此数据可能如下所示:
我愿意:
- 按first_id
分组数据
- 在每组中,按s_id_2升序排列
- 将额外的列
layer
附加到结构或根 DataFrame 以指示此 s_id_2
在组中的顺序。
例如:
first_id | second_id | second_id_order
---------| --------- | ---------------
A1 | [B, 10] | 1
---------| --------- | ---------------
A1 | [B, 14] | 2
---------| --------- | ---------------
A1 | [B, 22] | 3
---------| --------- | ---------------
A5 | [A, 1] | 1
---------| --------- | ---------------
A5 | [A, 7] | 2
---------| --------- | ---------------
A7 | null | 1
---------| --------- | ---------------
分组后每个 first_id
最多有 4 个 second_id_struct
。我该如何处理这类问题?
我特别感兴趣如何在一般情况下在 DataFrame 的小组(1-40 行)内进行迭代操作,其中组内列的顺序很重要。
谢谢!
创建数据框
d = [{'first_id': 'A1', 'second_id': ['B',10]}, {'first_id': 'A1', 'second_id': ['B',14]},{'first_id': 'A1', 'second_id': ['B',22]},{'first_id': 'A5', 'second_id': ['A',1]},{'first_id': 'A5', 'second_id': ['A',7]}]
df = sqlContext.createDataFrame(d)
而且你可以看到结构
df.printSchema()
|-- first_id: string (nullable = true)
|-- second_id: array (nullable = true)
|........|-- element: string (containsNull = true)
df.show()
+--------+----------+
|first_id|second_id |
+--------+----------+
| A1| [B, 10]|
| A1| [B, 14]|
| A1| [B, 22]|
| A5| [A, 1]|
| A5| [A, 7]|
+--------+----------+
然后你可以使用dense_rank和Window函数来显示子组中的顺序。与SQL.
中的over partition相同
window函数介绍:Introducing Window Functions in Spark SQL
代码在这里:
# setting a window spec
windowSpec = Window.partitionBy('first_id').orderBy(df.second_id[1])
# apply dense_rank to the window spec
df.select(df.first_id, df.second_id, dense_rank().over(windowSpec).alias("second_id_order")).show()
结果:
+--------+---------+---------------+
|first_id|second_id|second_id_order|
+--------+---------+---------------+
| A1| [B, 10]| 1|
| A1| [B, 14]| 2|
| A1| [B, 22]| 3|
| A5| [A, 1]| 1|
| A5| [A, 7]| 2|
+--------+---------+---------------+
我想了解如何在 PySpark DataFrame 中的小组内进行操作。假设我有具有以下架构的 DF:
root
|-- first_id: string (nullable = true)
|-- second_id_struct: struct (nullable = true)
| |-- s_id: string (nullable = true)
| |-- s_id_2: int (nullable = true)
|-- depth_from: float (nullable = true)
|-- depth_to: float (nullable = true)
|-- total_depth: float (nullable = true)
因此数据可能如下所示:
我愿意:
- 按first_id 分组数据
- 在每组中,按s_id_2升序排列
- 将额外的列
layer
附加到结构或根 DataFrame 以指示此s_id_2
在组中的顺序。
例如:
first_id | second_id | second_id_order
---------| --------- | ---------------
A1 | [B, 10] | 1
---------| --------- | ---------------
A1 | [B, 14] | 2
---------| --------- | ---------------
A1 | [B, 22] | 3
---------| --------- | ---------------
A5 | [A, 1] | 1
---------| --------- | ---------------
A5 | [A, 7] | 2
---------| --------- | ---------------
A7 | null | 1
---------| --------- | ---------------
分组后每个 first_id
最多有 4 个 second_id_struct
。我该如何处理这类问题?
我特别感兴趣如何在一般情况下在 DataFrame 的小组(1-40 行)内进行迭代操作,其中组内列的顺序很重要。
谢谢!
创建数据框
d = [{'first_id': 'A1', 'second_id': ['B',10]}, {'first_id': 'A1', 'second_id': ['B',14]},{'first_id': 'A1', 'second_id': ['B',22]},{'first_id': 'A5', 'second_id': ['A',1]},{'first_id': 'A5', 'second_id': ['A',7]}]
df = sqlContext.createDataFrame(d)
而且你可以看到结构
df.printSchema()
|-- first_id: string (nullable = true)
|-- second_id: array (nullable = true)
|........|-- element: string (containsNull = true)
df.show()
+--------+----------+
|first_id|second_id |
+--------+----------+
| A1| [B, 10]|
| A1| [B, 14]|
| A1| [B, 22]|
| A5| [A, 1]|
| A5| [A, 7]|
+--------+----------+
然后你可以使用dense_rank和Window函数来显示子组中的顺序。与SQL.
中的over partition相同window函数介绍:Introducing Window Functions in Spark SQL
代码在这里:
# setting a window spec
windowSpec = Window.partitionBy('first_id').orderBy(df.second_id[1])
# apply dense_rank to the window spec
df.select(df.first_id, df.second_id, dense_rank().over(windowSpec).alias("second_id_order")).show()
结果:
+--------+---------+---------------+
|first_id|second_id|second_id_order|
+--------+---------+---------------+
| A1| [B, 10]| 1|
| A1| [B, 14]| 2|
| A1| [B, 22]| 3|
| A5| [A, 1]| 1|
| A5| [A, 7]| 2|
+--------+---------+---------------+