Apache Spark: joining a DataFrame to itself via a relation DataFrame yields an empty result

I have run into a strange problem using Apache Spark (with the Scala API). There are two DataFrame objects, let's call them beans and relation.

  1. The beans DataFrame consists of two columns, named id and data. Consider all ids unique, and data to hold either an action or the target of an action.
  2. The relation DataFrame defines the relationships between actions and their targets. It consists of two columns: actionId and targetId.

(see the code snippet below for a tabular representation of the DataFrame objects)

Basically, I am trying to alias beans as two new DataFrame objects, actions and targets, and then join them via the relation DataFrame.

Here is some code to illustrate what is happening:

import org.apache.spark.sql.SQLContext

// define the SQL context, using an existing SparkContext
val sqlContext = new SQLContext(sparkContext)

// ...

// Produce the following DataFrame objects:
// beans:                   relation:
// +--------+--------+      +----------+----------+
// |   id   |  data  |      | actionId | targetId |
// +--------+--------+      +----------+----------+
// |   a    |  save  |      |    a     |     1    |
// +--------+--------+      +----------+----------+
// |   b    | delete |      |    b     |     2    |
// +--------+--------+      +----------+----------+
// |   c    |  read  |      |    c     |     3    |
// +--------+--------+      +----------+----------+
// |   1    |  file  |
// +--------+--------+
// |   2    |   os   |
// +--------+--------+
// |   3    |  book  |
// +--------+--------+
case class Bean(id: String, data: String)
case class Relation(actionId: String, targetId: String)
val beans = sqlContext.createDataFrame(
    Bean("a", "save") :: Bean("b", "delete") :: Bean("c", "read") ::
    Bean("1", "file") :: Bean("2", "os") :: Bean("3", "book") :: Nil
  )
val relation = sqlContext.createDataFrame(
    Relation("a", "1") :: Relation("b", "2") :: Relation("c", "3") :: Nil
  )


// alias beans as "actions" and "targets" to avoid ambiguity
val actions = beans as "actions"
val targets = beans as "targets"
// join actions and targets via relation
actions.join(relation, actions("id") === relation("actionId"))
        .join(targets, targets("id") === relation("targetId"))
        .select(actions("id") as "actionId", targets("id") as "targetId",
                 actions("data") as "action", targets("data") as "target")
        .show()

The desired output of this snippet is:

// desired output
// +----------+----------+--------+--------+
// | actionId | targetId | action | target |
// +----------+----------+--------+--------+
// |    a     |    1     |  save  |  file  |
// +----------+----------+--------+--------+
// |    b     |    2     | delete |   os   |
// +----------+----------+--------+--------+
// |    c     |    3     |  read  |  book  |
// +----------+----------+--------+--------+

However, the actual (and strange) output is an empty DataFrame:

+--------+--------+------+------+
|actionId|targetId|action|target|
+--------+--------+------+------+
+--------+--------+------+------+

I suspected that there is an issue with joining a DataFrame to itself, but the example in the linked question proves that suspicion wrong.

I am using Spark 1.4.1 with Scala 2.10.4, but I get the same result with Spark 1.5.1 and Scala 2.11.7.

Changing the schema of the DataFrame objects is not an option. Any suggestions?

Solution

For reference: if you get an error message like this

error: value $ is not a member of StringContext
              actions.join(relation, $"actions.id" === $"actionId")
                                     ^

be sure to add the following statement:

import sqlContext.implicits._
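
Note that the implicits object belongs to the SQLContext instance, so the import has to reference an actual instance rather than a class or package. A minimal sketch, assuming sparkContext already exists as in the question:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sparkContext)
// import from the instance to bring the $"..." interpolator
// (among other conversions) into scope
import sqlContext.implicits._

// the $ interpolator now compiles; it builds an (unresolved) Column
val condition = $"actions.id" === $"actionId"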

Solution

I would break it into two stages, like so:

val beans = sqlContext.createDataFrame(
  Bean("a", "save") :: 
  Bean("b", "delete") :: 
  Bean("c", "read") ::
  Bean("1", "file") :: 
  Bean("2", "os") :: 
  Bean("3", "book") :: 
  Nil
)
val relation = sqlContext.createDataFrame(
  Relation("a", "1") :: 
  Relation("b", "2") :: 
  Relation("c", "3") :: 
  Nil
)

// "add" action
val step1 = beans.join(relation, beans("id") === relation("actionId"))
  .select(
    relation("actionId"), 
    relation("targetId"), 
    beans("data").as("action")
  )
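
// at this point, step1 should contain one row per action:
// +----------+----------+--------+
// | actionId | targetId | action |
// +----------+----------+--------+
// |    a     |    1     |  save  |
// |    b     |    2     | delete |
// |    c     |    3     |  read  |
// +----------+----------+--------+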

// "add" target column
val result = step1.join(beans, beans("id") === step1("targetId"))
  .select(
    step1("actionId"), 
    step1("targetId"), 
    step1("action"), 
    beans("data").as("target")
)

result.show
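
Assuming I have not mixed anything up, result.show should then print the table the question asks for, along these lines:

+--------+--------+------+------+
|actionId|targetId|action|target|
+--------+--------+------+------+
|       a|       1|  save|  file|
|       b|       2|delete|    os|
|       c|       3|  read|  book|
+--------+--------+------+------+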

Remarks

I would think twice about putting different kinds of beans ("a", "b", "c") in the same table as their targets ("1", "2", "3"), though.

There is a subtle difference between what you do here and the example you linked. In the linked answer, I use Column objects directly; here, you use the apply method on a DataFrame. To see the difference, just type both into the REPL:

scala> actions("actions.id")
res59: org.apache.spark.sql.Column = id

scala> col("actions.id")
res60: org.apache.spark.sql.Column = actions.id

To have the aliases recognized correctly, you have to use Column objects directly; otherwise the alias is simply dropped. That means you need a query like this:

actions.join(relation, $"actions.id" === $"actionId")
  .join(targets, $"targets.id" === $"targetId")

or:

import org.apache.spark.sql.functions.col

actions.join(relation, col("actions.id") === col("actionId"))
  .join(targets, col("targets.id") === col("targetId"))

to make it work. Of course, using col on the RHS is entirely optional here; you could use apply as before.
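
Putting it together, here is a sketch of the full query from the question with the join conditions rewritten as above (the trailing select simply mirrors the one in the question):

import org.apache.spark.sql.functions.col

val actions = beans.as("actions")
val targets = beans.as("targets")

// qualified col("...") references keep the aliases intact
actions.join(relation, col("actions.id") === col("actionId"))
  .join(targets, col("targets.id") === col("targetId"))
  .select(col("actions.id").as("actionId"), col("targets.id").as("targetId"),
          col("actions.data").as("action"), col("targets.data").as("target"))
  .show()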

If you prefer to use apply, you can rename the join columns instead:

val targets = beans.withColumnRenamed("id", "_targetId")
val actions = beans.withColumnRenamed("id", "_actionId")

actions.join(relation, actions("_actionId") === relation("actionId"))
  .join(targets, targets("_targetId") === relation("targetId"))
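
One caveat on this variant (my own extension, not part of the snippet above): both renamed frames still share the column name data, so selecting both data values afterwards runs into the same ambiguity again. Renaming data on each side as well avoids it entirely:

val actions = beans
  .withColumnRenamed("id", "_actionId")
  .withColumnRenamed("data", "action")
val targets = beans
  .withColumnRenamed("id", "_targetId")
  .withColumnRenamed("data", "target")

// every column name is now unique, so plain string selects are unambiguous
actions.join(relation, actions("_actionId") === relation("actionId"))
  .join(targets, targets("_targetId") === relation("targetId"))
  .select("_actionId", "_targetId", "action", "target")
  .show()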