Spark (pySpark) groupBy 对 collect_list 上的第一个元素进行了错误排序
Spark (pySpark) groupBy misordering first element on collect_list
我有以下数据框(df_parquet):
DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]
我打算使用 collect_list 获取排序的日期和消费列表,正如 post 中所述:
我采用最后一种方法 (),我认为这是一种更有效的方法。
因此,我不只是用默认的分区数 (200) 调用重新分区,而是用 500 调用它,并且我在分区内按 ID 和日期排序,而不仅仅是按日期排序(为了使 groupBy 更有效率,我希望如此)。问题是每个分区一次(每个分区只有一个 id,它似乎是一个随机 id)我在最后一个地方得到列表的第一项。
有什么线索吗?其余的 ID 在其数组中排序良好,所以我认为 groupBy 或 collect_list 在每个分区内的行为方式有所不同。
我验证了它不是分区上的第一个或最后一个 id,而是通过获取分区 id 并检查相同的 groupBy + collect_list 组合是否在这些值之一上失败而表现不同的那个,所以看起来是随机的。
如果你愿意,你可以检查我的代码,它很简单。
ordered_df = df_parquet.repartition(500,
'id').sortWithinPartitions(['id', 'date'])
grouped_df = ordered_df.groupby("id").agg(F.collect_list("date").alias('date'),
F.collect_list('consumption').alias('consumption'))
以及用于测试它的代码(比较第一个和最后一个值,第一个应该更旧,但在 500 个案例中不是):
test = grouped_df.filter(F.size('date') >
1).select('id', (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1)).alias('test'),
F.array([F.col('fecha').getItem(0),
F.col('date').getItem(F.size('date') -
1)]).alias('see')).filter(F.col('test'))
test.show(5, 100)
test.count()
结果:
+-----+----+------------------------------------------+
| id|test| see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows
500
虽然它应该是一个空数据框,因为所有数组都应该针对所有 ID 进行排序。
好的,问题仍未解决,但我找到了一个简单的解决方法,以防有人因同一问题而卡住:
关键是要颠倒数组的第一个和最后一个位置。在日期数组上,这可以通过使用 spark 2.4 中引入的 array_sort 函数进行排序来完成。要对消耗数组执行重新排序,我们需要使用 udf。
invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))
test = grouped_df.withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1))).withColumn('date', F.when(F.col('error'),
F.array_sort(F.col('date'))).otherwise(F.col('date'))).withColumn('consumption',
F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption'))).drop('error')
干杯。
我有以下数据框(df_parquet):
DataFrame[id: bigint, date: timestamp, consumption: decimal(38,18)]
我打算使用 collect_list 获取排序的日期和消费列表,正如 post 中所述:
我采用最后一种方法 (
因此,我不只是用默认的分区数 (200) 调用重新分区,而是用 500 调用它,并且我在分区内按 ID 和日期排序,而不仅仅是按日期排序(为了使 groupBy 更有效率,我希望如此)。问题是每个分区一次(每个分区只有一个 id,它似乎是一个随机 id)我在最后一个地方得到列表的第一项。
有什么线索吗?其余的 ID 在其数组中排序良好,所以我认为 groupBy 或 collect_list 在每个分区内的行为方式有所不同。
我验证了它不是分区上的第一个或最后一个 id,而是通过获取分区 id 并检查相同的 groupBy + collect_list 组合是否在这些值之一上失败而表现不同的那个,所以看起来是随机的。
如果你愿意,你可以检查我的代码,它很简单。
ordered_df = df_parquet.repartition(500,
'id').sortWithinPartitions(['id', 'date'])
grouped_df = ordered_df.groupby("id").agg(F.collect_list("date").alias('date'),
F.collect_list('consumption').alias('consumption'))
以及用于测试它的代码(比较第一个和最后一个值,第一个应该更旧,但在 500 个案例中不是):
test = grouped_df.filter(F.size('date') >
1).select('id', (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1)).alias('test'),
F.array([F.col('fecha').getItem(0),
F.col('date').getItem(F.size('date') -
1)]).alias('see')).filter(F.col('test'))
test.show(5, 100)
test.count()
结果:
+-----+----+------------------------------------------+
| id|test| see|
+-----+----+------------------------------------------+
|89727|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76325|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|80115|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|89781|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
|76411|true|[2017-02-10 00:00:00, 2017-02-09 00:00:00]|
+-----+----+------------------------------------------+
only showing top 5 rows
500
虽然它应该是一个空数据框,因为所有数组都应该针对所有 ID 进行排序。
好的,问题仍未解决,但我找到了一个简单的解决方法,以防有人因同一问题而卡住:
关键是要颠倒数组的第一个和最后一个位置。在日期数组上,这可以通过使用 spark 2.4 中引入的 array_sort 函数进行排序来完成。要对消耗数组执行重新排序,我们需要使用 udf。
invert_last = F.udf(lambda vector: [vector[-1]] + vector[:-1], ArrayType(DoubleType()))
test = grouped_df.withColumn('error', (F.size('date') > 1) & (F.col('date').getItem(0) >
F.col('date').getItem(F.size('date') - 1))).withColumn('date', F.when(F.col('error'),
F.array_sort(F.col('date'))).otherwise(F.col('date'))).withColumn('consumption',
F.when(F.col('error'), invert_last(F.col('consumption'))).otherwise(F.col('consumption'))).drop('error')
干杯。