Spark 数据集 withColumn 添加分区 id
Spark dataset withColumn add partition id
我正在尝试编写一个辅助函数,它接受任何类型的数据集Dataset[_]
,并且 returns 带有一个新列 "partitionId",这是单个分区的 ID数据单元属于。
例如,如果我下面有一个数据集,默认情况下它有两个分区。
+-----+------+
| colA| colB|
+-----+------+
| 1 | a|
| 2 | b|
| 3 | c|
+-----+------+
函数后应该是下面的结果,前两个数据单元属于同一个分区,第三个属于另一个分区。
+-----+------+------------+
| colA| colB| partitionId|
+-----+------+------------+
| 1 | a| 1|
| 2 | b| 1|
| 3 | c| 2|
+-----+------+------------+
我尝试使用 withColumn() 和 mapPartitions(),但其中 none 对我有用。
对于 withColumn(),我无法获取数据单元属于哪个分区的信息,例如 withColumn("partitionId", {What should be here to add the partitionId?})
对于 mapPartitions(),我试过:
dataset
.mapPartitions(iter => {
val partitionId = UUID.randomUUID().toString
iter.map(dataUnit => MyDataType.addPartitionId(partitionId))
})
但这仅适用于特定类型,如 Dataset[MyDataType]
,不适用于 Dataset[_]
如何为任何数据集添加 partitionId 列?
您是否需要每条记录的分区 ID?无论哪种方式,您都可以通过以下方式实现:
import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id)
我正在尝试编写一个辅助函数,它接受任何类型的数据集Dataset[_]
,并且 returns 带有一个新列 "partitionId",这是单个分区的 ID数据单元属于。
例如,如果我下面有一个数据集,默认情况下它有两个分区。
+-----+------+
| colA| colB|
+-----+------+
| 1 | a|
| 2 | b|
| 3 | c|
+-----+------+
函数后应该是下面的结果,前两个数据单元属于同一个分区,第三个属于另一个分区。
+-----+------+------------+
| colA| colB| partitionId|
+-----+------+------------+
| 1 | a| 1|
| 2 | b| 1|
| 3 | c| 2|
+-----+------+------------+
我尝试使用 withColumn() 和 mapPartitions(),但其中 none 对我有用。
对于 withColumn(),我无法获取数据单元属于哪个分区的信息,例如 withColumn("partitionId", {What should be here to add the partitionId?})
对于 mapPartitions(),我试过:
dataset
.mapPartitions(iter => {
val partitionId = UUID.randomUUID().toString
iter.map(dataUnit => MyDataType.addPartitionId(partitionId))
})
但这仅适用于特定类型,如 Dataset[MyDataType]
,不适用于 Dataset[_]
如何为任何数据集添加 partitionId 列?
您是否需要每条记录的分区 ID?无论哪种方式,您都可以通过以下方式实现:
import org.apache.spark.sql.functions.spark_partition_id
...
dataFrame.withColumn("partitionID", spark_partition_id)