如何将行 ID 的持久列添加到 Spark DataFrame?
How do I add an persistent column of row ids to Spark DataFrame?
这个问题并不新鲜,但我在 Spark 中发现了令人惊讶的行为。我需要向 DataFrame 添加一列行 ID。我使用了 DataFrame 方法 monotonically_increasing_id() 并且它确实给了我一个额外的唯一行 ID 列(顺便说一句,它们不是连续的,但是是唯一的)。
我遇到的问题是,当我过滤 DataFrame 时,结果 DataFrame 中的行 ID 被重新分配。这两个 DataFrame 如下所示。
第一个是初始DataFrame,添加行ID如下:
df.withColumn("rowId", monotonically_increasing_id())
第二个DataFrame是通过df.filter(col("P"))
对col P进行过滤后得到的。
问题由 custId 169 的 rowId 说明,它在初始 DataFrame 中为 5,但在过滤掉 custId 169 后,rowId (5) 被重新分配给 custmId 773!我不知道为什么这是默认行为。
我希望 rowIds
是 "sticky";如果我从 DataFrame 中删除行,我不想要它们的 ID "re-used",我希望它们连同它们的行一起消失。有可能这样做吗?我没有从 monotonically_increasing_id
方法中看到任何请求此行为的标志。
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Spark 2.0
此问题已在 Spark 2.0 中通过 SPARK-14241 解决。
Spark 2.1 中的另一个类似问题已通过 SPARK-14393
解决
火花1.x
您遇到的问题相当微妙,但可以简化为一个简单的事实 monotonically_increasing_id
是一个极其丑陋的函数。它显然不是纯粹的,它的价值取决于完全不受你控制的事物。
它不接受任何参数,因此从优化器的角度来看,它何时被调用并不重要,并且可以在所有其他操作之后推送。因此你看到的行为。
如果您查看代码,您会发现这是通过使用 Nondeterministic
.
扩展 MonotonicallyIncreasingID
表达式来明确标记的
我认为没有任何优雅的解决方案,但处理此问题的一种方法是添加对过滤值的人为依赖。例如像这样的 UDF:
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
bound = udf(lambda _, v: v, LongType())
(df
.withColumn("rn", monotonically_increasing_id())
# Due to nondeterministic behavior it has to be a separate step
.withColumn("rn", bound("P", "rn"))
.where("P"))
一般来说,在 RDD
上使用 zipWithIndex
添加索引然后将其转换回 DataFrame
.
可能更简洁
* 上面显示的解决方法不再是 Spark 2.x 中的有效解决方案(也不是必需的),其中 Python UDF 是执行计划优化的主题。
我无法重现这个。我使用的是 Spark 2.0,所以可能行为发生了变化,或者我做的事情和你不一样。
val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag")=== true)
df2.show
df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
| one| 1| true| 0|
| two| 2|false| 1|
|three| 3| true| 2|
| four| 4| true| 3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
| one| 1|true| 0|
|three| 3|true| 2|
| four| 4|true| 3|
+-----+-----+----+----+
为了绕过 monotonically_increasing_id() 的移位计算,您可以尝试将数据帧写入磁盘,然后重新读取。然后 id 列现在只是一个正在读取的数据字段,而不是在管道中的某个点动态计算。虽然这是一个非常丑陋的解决方案,但当我进行快速测试时它起作用了。
这对我有用。创建了另一个标识列并使用了 window 函数 row_number
import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
val df1: DataFrame = df.withColumn("Id",lit(1))
df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
我最近在处理类似的问题。 monotonically_increasing_id()
虽然很快,但是不靠谱,不会给你连续的行号,只会增加唯一的整数。
创建一个 windows 分区然后使用 row_number().over(some_windows_partition)
非常耗时。
目前最好的解决方案是使用带索引的压缩文件,然后将压缩文件转换回原始数据帧,新架构包括索引列。
试试这个:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
其中 original_dataframe
是您必须添加索引的 dataframe
,row_with_index
是具有列索引的新架构,您可以将其写为
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
这里,calendar_date
、year_week_number
、year_period_number
、realization
是我原来的dataframe
的栏目。您可以将名称替换为列的名称。索引是您必须为行号添加的新列名。
与 row_number().over(some_windows_partition)
方法相比,此过程更加高效和顺畅。
希望这对您有所帮助。
为了通过 Chris T 解决方案获得更好的性能,您可以尝试写入 apache ignite 共享数据帧而不是写入磁盘。
https://ignite.apache.org/use-cases/spark/shared-memory-layer.html
最好的方法是使用唯一键的连续散列。
例如:在 python:
from pyspark.sql.functions import concat, md5
unique_keys = ['event_datetime', 'ingesttime']
raw_df.withColumn('rowid', md5(concat(*unique_keys)))
原因:
- 新的 'rowid' 是确定性地从输入数据派生的(相对于 uuid,这是不确定的)
- 添加新数据很容易。 (与其他方式相比:像 monotonically_increasing_id() 或 row_number(),需要获取当前的最大数量..)
- 仅供参考 https://bzhangusc.wordpress.com/2016/03/23/create-unique-record-key-for-table-linking/
这个问题并不新鲜,但我在 Spark 中发现了令人惊讶的行为。我需要向 DataFrame 添加一列行 ID。我使用了 DataFrame 方法 monotonically_increasing_id() 并且它确实给了我一个额外的唯一行 ID 列(顺便说一句,它们不是连续的,但是是唯一的)。
我遇到的问题是,当我过滤 DataFrame 时,结果 DataFrame 中的行 ID 被重新分配。这两个 DataFrame 如下所示。
第一个是初始DataFrame,添加行ID如下:
df.withColumn("rowId", monotonically_increasing_id())
第二个DataFrame是通过
df.filter(col("P"))
对col P进行过滤后得到的。
问题由 custId 169 的 rowId 说明,它在初始 DataFrame 中为 5,但在过滤掉 custId 169 后,rowId (5) 被重新分配给 custmId 773!我不知道为什么这是默认行为。
我希望 rowIds
是 "sticky";如果我从 DataFrame 中删除行,我不想要它们的 ID "re-used",我希望它们连同它们的行一起消失。有可能这样做吗?我没有从 monotonically_increasing_id
方法中看到任何请求此行为的标志。
+---------+--------------------+-------+
| custId | features| P |rowId|
+---------+--------------------+-------+
|806 |[50,5074,...| true| 0|
|832 |[45,120,1...| true| 1|
|216 |[6691,272...| true| 2|
|926 |[120,1788...| true| 3|
|875 |[54,120,1...| true| 4|
|169 |[19406,21...| false| 5|
after filtering on P:
+---------+--------------------+-------+
| custId| features| P |rowId|
+---------+--------------------+-------+
| 806|[50,5074,...| true| 0|
| 832|[45,120,1...| true| 1|
| 216|[6691,272...| true| 2|
| 926|[120,1788...| true| 3|
| 875|[54,120,1...| true| 4|
| 773|[3136,317...| true| 5|
Spark 2.0
此问题已在 Spark 2.0 中通过 SPARK-14241 解决。
Spark 2.1 中的另一个类似问题已通过 SPARK-14393
解决
火花1.x
您遇到的问题相当微妙,但可以简化为一个简单的事实 monotonically_increasing_id
是一个极其丑陋的函数。它显然不是纯粹的,它的价值取决于完全不受你控制的事物。
它不接受任何参数,因此从优化器的角度来看,它何时被调用并不重要,并且可以在所有其他操作之后推送。因此你看到的行为。
如果您查看代码,您会发现这是通过使用 Nondeterministic
.
MonotonicallyIncreasingID
表达式来明确标记的
我认为没有任何优雅的解决方案,但处理此问题的一种方法是添加对过滤值的人为依赖。例如像这样的 UDF:
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf
bound = udf(lambda _, v: v, LongType())
(df
.withColumn("rn", monotonically_increasing_id())
# Due to nondeterministic behavior it has to be a separate step
.withColumn("rn", bound("P", "rn"))
.where("P"))
一般来说,在 RDD
上使用 zipWithIndex
添加索引然后将其转换回 DataFrame
.
* 上面显示的解决方法不再是 Spark 2.x 中的有效解决方案(也不是必需的),其中 Python UDF 是执行计划优化的主题。
我无法重现这个。我使用的是 Spark 2.0,所以可能行为发生了变化,或者我做的事情和你不一样。
val df = Seq(("one", 1,true),("two", 2,false),("three", 3,true),("four", 4,true))
.toDF("name", "value","flag")
.withColumn("rowd", monotonically_increasing_id())
df.show
val df2 = df.filter(col("flag")=== true)
df2.show
df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
| one| 1| true| 0|
| two| 2|false| 1|
|three| 3| true| 2|
| four| 4| true| 3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
| one| 1|true| 0|
|three| 3|true| 2|
| four| 4|true| 3|
+-----+-----+----+----+
为了绕过 monotonically_increasing_id() 的移位计算,您可以尝试将数据帧写入磁盘,然后重新读取。然后 id 列现在只是一个正在读取的数据字段,而不是在管道中的某个点动态计算。虽然这是一个非常丑陋的解决方案,但当我进行快速测试时它起作用了。
这对我有用。创建了另一个标识列并使用了 window 函数 row_number
import org.apache.spark.sql.functions.{row_number}
import org.apache.spark.sql.expressions.Window
val df1: DataFrame = df.withColumn("Id",lit(1))
df1
.select(
...,
row_number()
.over(Window
.partitionBy("Id"
.orderBy(col("...").desc))
)
.alias("Row_Nbr")
)
我最近在处理类似的问题。 monotonically_increasing_id()
虽然很快,但是不靠谱,不会给你连续的行号,只会增加唯一的整数。
创建一个 windows 分区然后使用 row_number().over(some_windows_partition)
非常耗时。
目前最好的解决方案是使用带索引的压缩文件,然后将压缩文件转换回原始数据帧,新架构包括索引列。
试试这个:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType
new_schema = StructType(**original_dataframe**.schema.fields[:] + [StructField("index", LongType(), False)])
zipped_rdd = **original_dataframe**.rdd.zipWithIndex()
indexed = (zipped_rdd.map(lambda ri: row_with_index(*list(ri[0]) + [ri[1]])).toDF(new_schema))
其中 original_dataframe
是您必须添加索引的 dataframe
,row_with_index
是具有列索引的新架构,您可以将其写为
row_with_index = Row(
"calendar_date"
,"year_week_number"
,"year_period_number"
,"realization"
,"index"
)
这里,calendar_date
、year_week_number
、year_period_number
、realization
是我原来的dataframe
的栏目。您可以将名称替换为列的名称。索引是您必须为行号添加的新列名。
与 row_number().over(some_windows_partition)
方法相比,此过程更加高效和顺畅。
希望这对您有所帮助。
为了通过 Chris T 解决方案获得更好的性能,您可以尝试写入 apache ignite 共享数据帧而不是写入磁盘。 https://ignite.apache.org/use-cases/spark/shared-memory-layer.html
最好的方法是使用唯一键的连续散列。
例如:在 python:
from pyspark.sql.functions import concat, md5
unique_keys = ['event_datetime', 'ingesttime']
raw_df.withColumn('rowid', md5(concat(*unique_keys)))
原因:
- 新的 'rowid' 是确定性地从输入数据派生的(相对于 uuid,这是不确定的)
- 添加新数据很容易。 (与其他方式相比:像 monotonically_increasing_id() 或 row_number(),需要获取当前的最大数量..)
- 仅供参考 https://bzhangusc.wordpress.com/2016/03/23/create-unique-record-key-for-table-linking/