Spark `DataFrame` 的 `unionAll` 出了什么问题?

What is going wrong with `unionAll` of Spark `DataFrame`?

使用 Spark 1.5.0 并给出以下代码,我希望 unionAll 根据列名联合 DataFrames。在代码中,我使用一些 FunSuite 来传递 SparkContext sc:

object Entities {

  case class A (a: Int, b: Int)
  case class B (b: Int, a: Int)

  val as = Seq(
    A(1,3),
    A(2,4)
  )

  val bs = Seq(
    B(5,3),
    B(6,4)
  )
}

class UnsortedTestSuite extends SparkFunSuite {

  configuredUnitTest("The truth test.") { sc =>
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val aDF = sc.parallelize(Entities.as, 4).toDF
    val bDF = sc.parallelize(Entities.bs, 4).toDF
    aDF.show()
    bDF.show()
    aDF.unionAll(bDF).show
  }
}

输出:

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
+---+---+

+---+---+
|  b|  a|
+---+---+
|  5|  3|
|  6|  4|
+---+---+

+---+---+
|  a|  b|
+---+---+
|  1|  3|
|  2|  4|
|  5|  3|
|  6|  4|
+---+---+

为什么结果包含 混合 "b" 和 "a" 列,而不是根据列名对齐列?听起来像是一个 严重的 错误!?

正如 SPARK-9813 中所讨论的,似乎只要数据类型和列数跨帧相同,unionAll 操作就应该有效。请参阅评论以进行更多讨论。

它看起来根本不像是一个错误。您看到的是标准的 SQL 行为,每个主要的 RDMBS,包括 PostgreSQL, MySQL, Oracle and MS SQL 的行为都完全相同。您会发现 SQL Fiddle 个与名称相关联的示例。

引用PostgreSQL manual:

In order to calculate the union, intersection, or difference of two queries, the two queries must be "union compatible", which means that they return the same number of columns and the corresponding columns have compatible data types

列名,不包括集合操作中的第一个 table,将被简单地忽略。

此行为直接来自关系代数,其中基本构建块是元组。由于元组是有序的,因此两组元组的联合等效于(忽略重复处理)您在此处获得的输出。

如果您想使用姓名进行匹配,您可以这样做

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def unionByName(a: DataFrame, b: DataFrame): DataFrame = {
  val columns = a.columns.toSet.intersect(b.columns.toSet).map(col).toSeq
  a.select(columns: _*).unionAll(b.select(columns: _*))
}

要同时检查名称和类型,只需将 columns 替换为:

a.dtypes.toSet.intersect(b.dtypes.toSet).map{case (c, _) => col(c)}.toSeq

此问题已在 spark2.3 中得到修复。他们正在数据集中添加对 unionByName 的支持。

https://issues.apache.org/jira/browse/SPARK-21043

没有issues/bugs - 如果你仔细观察你的情况class B 那么你就会清楚。 Case Class A --> 你提到了顺序 (a,b),并且 案例 Class B --> 你提到了顺序 (b,a) ---> 这是按照顺序

预期的

case classA (a: Int, b: Int) case class B (b: Int, a: Int)

谢谢, 苏布

使用 unionByName:

文档摘录:

def unionByName(other: Dataset[T]): Dataset[T]

The difference between this function and union is that this function resolves columns by name (not by position):

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+