Spark SQL - 在连接和 groupBy 后获取重复行

SparkSQL - got duplicate rows after join & groupBy

我有 2 个数据框,其列如下所示。
注意:第 uid 不是唯一键 ,并且数据框中存在具有相同 uid 的重复行。

val df1 = spark.read.parquet(args(0)).drop("sv")
val df2 = spark.read.parquet(args(1))

scala> df1.orderBy("uid").show

+----+----+---+
| uid| hid| sv|
+----+----+---+
|uid1|hid2| 10|
|uid1|hid1| 10|
|uid1|hid3| 10|
|uid2|hid1|  2|
|uid3|hid2| 10|
|uid4|hid2|  3|
|uid5|hid3|  5|
+----+----+---+

scala> df2.orderBy("uid").show

+----+----+---+
| uid| pid| sv|
+----+----+---+
|uid1|pid2|  2|
|uid1|pid1|  1|
|uid2|pid1|  2|
|uid3|pid1|  3|
|uid3|pidx|999|
|uid3|pid2|  4|
|uidx|pid1|  2|
+----+----+---+

scala> df1.drop("sv")
  .join(df2, "uid")
  .groupBy("hid", "pid")
  .agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
  .orderBy("hid").show

+----+----+----+----+-----+
| hid| pid|xcnt|xsum| xavg|
+----+----+----+----+-----+
|hid1|pid1|   2|   3|  1.5|
|hid1|pid2|   1|   2|  2.0|
|hid2|pid2|   2|   6|  3.0|
|hid2|pidx|   1| 999|999.0|
|hid2|pid1|   2|   4|  2.0|
|hid3|pid1|   1|   1|  1.0|
|hid3|pid2|   1|   2|  2.0|
+----+----+----+----+-----+

在这个演示案例中,一切看起来都不错。

但是当我对生产大数据应用相同的操作时,最终输出包含 许多重复行(相同的 (hid, pid) 对).
我虽然 groupBy 运算符会像 select distinct hid, pid from ...,但显然不是。

那我操作有什么问题吗?我应该按 hid, pid 对数据框进行重新分区吗?
谢谢!

-- 更新
如果我在加入数据帧后添加 .drop("uid"),那么最终输出中会遗漏一些行。

scala> df1.drop("sv")
  .join(df2, "uid").drop("uid")
  .groupBy("hid", "pid")
  .agg(count("*") as "xcnt", sum("sv") as "xsum", avg("sv") as "xavg")
  .orderBy("hid").show

老实说,我认为数据有问题,而不是代码。当然,如果 pidhid 确实不同(我之前在数据中看到过一些流氓西里尔符号),则不应有任何重复项。

要调试此问题,您可以尝试查看 'uid' 和 sv 值的哪些组合代表每个重复行。

df1.drop( "sv" )
  .join(df2, "uid")
  .groupBy( "hid", "pid" )
  .agg( collect_list( "uid" ), collect_list( "sv" ) )
  .orderBy( "hid" )
  .show

之后,您将有一些起点来评估您的数据。或者,如果 uid(和 'sv')的列表相同,请提交错误。

我想我可能找到了根本原因。

这可能是由 AWS S3 一致性模型引起的。

背景是,我提交了2个Spark作业来创建2个表,并提交了第三个任务来连接两个表(我将它们拆分以防万一其中任何一个失败并且我不需要重新运行他们)。
我把这 3 spark-submit 按顺序放在 shell 脚本 运行ning 中,得到的结果是重复的行。
当我重新运行刚才的最后一个工作时,结果似乎还不错。