Apache spark joining a DataFrame to itself via relation DataFrame yields empty result
I have run into a strange problem with Apache Spark (using the Scala API). There are two DataFrame objects, call them beans and relation.
- The beans DataFrame consists of two columns, named id and data. All ids are unique, and data holds either an action or the target of an action.
- The relation DataFrame defines the relationship between actions and their targets. It consists of two columns: actionId and targetId.
(See the code snippet below for a table representation of the DataFrame objects.)
Basically, I am trying to alias beans as two new DataFrame objects, actions and targets, and then join them via the relation DataFrame.
Here is some code to illustrate what is going on:
// define the SQL context
val sqlContext = new SQLContext(sparkContext)
// ...
// Produce the following DataFrame objects:
// beans: relation:
// +--------+--------+ +----------+----------+
// | id | data | | actionId | targetId |
// +--------+--------+ +----------+----------+
// | a | save | | a | 1 |
// +--------+--------+ +----------+----------+
// | b | delete | | b | 2 |
// +--------+--------+ +----------+----------+
// | c | read | | c | 3 |
// +--------+--------+ +----------+----------+
// | 1 | file |
// +--------+--------+
// | 2 | os |
// +--------+--------+
// | 3 | book |
// +--------+--------+
case class Bean(id: String, data: String)
case class Relation(actionId: String, targetId: String)
val beans = sqlContext.createDataFrame(
Bean("a", "save") :: Bean("b", "delete") :: Bean("c", "read") ::
Bean("1", "file") :: Bean("2", "os") :: Bean("3", "book") :: Nil
)
val relation = sqlContext.createDataFrame(
Relation("a", "1") :: Relation("b", "2") :: Relation("c", "3") :: Nil
)
// alias beans as "actions" and "targets" to avoid ambiguity
val actions = beans as "actions"
val targets = beans as "targets"
// join actions and targets via relation
actions.join(relation, actions("id") === relation("actionId"))
.join(targets, targets("id") === relation("targetId"))
.select(actions("id") as "actionId", targets("id") as "targetId",
actions("data") as "action", targets("data") as "target")
.show()
The desired output of this snippet is
// desired output
// +----------+----------+--------+--------+
// | actionId | targetId | action | target |
// +----------+----------+--------+--------+
// | a | 1 | save | file |
// +----------+----------+--------+--------+
// | b | 2 | delete | os |
// +----------+----------+--------+--------+
// | c | 3 | read | book |
// +----------+----------+--------+--------+
However, the actual (strange) output is an empty DataFrame:
+--------+--------+------+------+
|actionId|targetId|action|target|
+--------+--------+------+------+
+--------+--------+------+------+
I suspected there was a problem with joining a DataFrame to itself, but the example in the linked answer proved that suspicion wrong.
I am using Spark 1.4.1 with Scala 2.10.4, but I get the same result on Spark 1.5.1 with Scala 2.11.7.
Changing the schema of the DataFrame objects is not an option. Any suggestions?
Solution
For reference: if you get an error message like this
error: value $ is not a member of StringContext
actions.join(relation, $"actions.id" === $"actionId")
^
be sure to add the following statement:
import sqlContext.implicits._
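The import is needed because the `$` syntax is not part of the Scala language itself: `sqlContext.implicits._` brings an implicit class into scope that adds a `$` method to `StringContext`, so that `$"name"` produces a Column. A minimal, Spark-free sketch of the same mechanism (the names and the String return type here are illustrative, not Spark's actual API):

```scala
object Demo {
  // Mimics Spark's StringToColumn implicit. Without it in scope,
  // $"..." fails to compile with:
  //   error: value $ is not a member of StringContext
  implicit class StringToColumn(val sc: StringContext) {
    // Spark's version returns a Column; a plain String stands in here.
    def $(args: Any*): String = sc.s(args: _*)
  }

  def main(args: Array[String]): Unit = {
    println($"actions.id") // prints: actions.id
  }
}
```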
Solution
I would break it into two stages, like so:
val beans = sqlContext.createDataFrame(
Bean("a", "save") ::
Bean("b", "delete") ::
Bean("c", "read") ::
Bean("1", "file") ::
Bean("2", "os") ::
Bean("3", "book") ::
Nil
)
val relation = sqlContext.createDataFrame(
Relation("a", "1") ::
Relation("b", "2") ::
Relation("c", "3") ::
Nil
)
// "add" action
val step1 = beans.join(relation, beans("id") === relation("actionId"))
  .select(
    relation("actionId"),
    relation("targetId"),
    beans("data").as("action")
  )
// "add" target column
val result = step1.join(beans, beans("id") === relation("targetId"))
  .select(
    step1("actionId"),
    step1("targetId"),
    step1("action"),
    beans("data").as("target")
  )
result.show()
Remarks
Putting distinct kinds of beans ("a", "b", "c") in the same table as ("1", "2", "3") is questionable design, though.
There is a subtle difference between what you do here and the example you've linked. In the linked answer I use Column objects directly; here you call the apply method on a DataFrame. To see the difference, just type both in the REPL:
scala> actions("actions.id")
res59: org.apache.spark.sql.Column = id
scala> col("actions.id")
res60: org.apache.spark.sql.Column = actions.id
For the alias to be recognized correctly, you have to use Column objects directly; otherwise the alias is simply dropped. That means you need a query like this:
actions.join(relation, $"actions.id" === $"actionId")
.join(targets, $"targets.id" === $"targetId")
or
import org.apache.spark.sql.functions.col
actions.join(relation, col("actions.id") === col("actionId"))
.join(targets, col("targets.id") === col("targetId"))
to make it work. Of course, using col on the RHS here is completely optional; you could use apply as before.
If you prefer to use apply, you can rename the join columns instead:
val targets = beans.withColumnRenamed("id", "_targetId")
val actions = beans.withColumnRenamed("id", "_actionId")
actions.join(relation, actions("_actionId") === relation("actionId"))
.join(targets, targets("_targetId") === relation("targetId"))
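Note that if you then select the data columns from both sides, the same ambiguity can bite again, since actions("data") and targets("data") still refer to the same underlying column of beans. One way around it is to rename the data columns as well (a sketch under the same setup, untested):

```scala
// Rename both columns so that neither join keys nor data columns collide
val actions = beans.withColumnRenamed("id", "_actionId")
                   .withColumnRenamed("data", "action")
val targets = beans.withColumnRenamed("id", "_targetId")
                   .withColumnRenamed("data", "target")

actions.join(relation, actions("_actionId") === relation("actionId"))
  .join(targets, targets("_targetId") === relation("targetId"))
  .select(relation("actionId"), relation("targetId"),
          actions("action"), targets("target"))
  .show()
```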