AnalysisException:解决连接中的冲突引用时失败:'Join Inner
AnalysisException: Failure when resolving conflicting references in Join: 'Join Inner
我有这个简单的代码
var count = event_stream
.groupBy("value").count()
event_stream.join(count,"value").printSchema() //get error on this line
event_stream和计数模式如下
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- value: binary (nullable = true)
|-- count: long (nullable = false)
两个问题
为什么会出现此错误以及如何解决?
为什么 groupby.count 删除所有其他列?
错误如下
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Failure when resolving conflicting references in Join:
'Join Inner
:- AnalysisBarrier
: +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- AnalysisBarrier
+- Aggregate [value#8], [value#8, count(1) AS count#46L]
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
Conflicting attributes: value#8
编辑:是的! 更改列的名称有效。
但是现在,如果我使用连接,我必须使用 OutputMode.Append 为此,我需要将 Watermarks 添加到流中.
我想要的是在 resultDF 中提取计数和主题(从上面打印的模式)并将其写入某个 Sink。
两个问题
- 还有其他way/better方法吗?
- 我可以像 count() 这样做多个聚合,然后再添加另一个字符串类型的列,即主题是这种情况吗?
为什么会出现此错误以及如何解决?
我认为您收到此错误是因为最终连接的架构包含两个值字段,连接的每一侧各有一个。要解决此问题,请重命名两个连接流之一的 "value" 字段,如下所示:
var count = event_stream.
groupBy("value").count().
withColumnRenamed("value", "join_id")
event_stream.join(count, $"value" === $"join_id").
drop("join_id").
printSchema()
为什么 groupby.count 删除所有其他列?
groupBy
操作基本上是将您的字段分成两个列表。用作键的字段列表,以及要聚合的字段列表。关键字段仅按原样显示在最终结果中,但不在列表中的任何字段都需要定义聚合操作才能显示在结果中。否则 spark 无法知道您想如何组合该字段的多个值!你想数一数吗?你想要最大值吗?您想查看所有不同的值吗?要指定如何汇总字段,您可以在 .agg(..) 调用中定义它。
示例:
val input = Seq(
(1, "Bob", 4),
(1, "John", 5)
).toDF("key", "name", "number")
input.groupBy("key").
agg(collect_set("name") as "names",
max("number") as "maxnum").
show
+---+-----------+------+
|key|name |maxnum|
+---+-----------+------+
| 1|[Bob, John]| 5|
+---+-----------+------+
错误原因是用于连接的列名。
您可以使用像 .
这样的操作
var count = event_stream
.groupBy("value").count()
event_stream.join(count,Seq("value"))
我有这个简单的代码
var count = event_stream
.groupBy("value").count()
event_stream.join(count,"value").printSchema() //get error on this line
event_stream和计数模式如下
root
|-- key: binary (nullable = true)
|-- value: binary (nullable = true)
|-- topic: string (nullable = true)
|-- partition: integer (nullable = true)
|-- offset: long (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- timestampType: integer (nullable = true)
root
|-- value: binary (nullable = true)
|-- count: long (nullable = false)
两个问题
为什么会出现此错误以及如何解决?
为什么 groupby.count 删除所有其他列?
错误如下
Exception in thread "main" org.apache.spark.sql.AnalysisException:
Failure when resolving conflicting references in Join:
'Join Inner
:- AnalysisBarrier
: +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- AnalysisBarrier
+- Aggregate [value#8], [value#8, count(1) AS count#46L]
+- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
Conflicting attributes: value#8
编辑:是的! 更改列的名称有效。 但是现在,如果我使用连接,我必须使用 OutputMode.Append 为此,我需要将 Watermarks 添加到流中.
我想要的是在 resultDF 中提取计数和主题(从上面打印的模式)并将其写入某个 Sink。
两个问题
- 还有其他way/better方法吗?
- 我可以像 count() 这样做多个聚合,然后再添加另一个字符串类型的列,即主题是这种情况吗?
为什么会出现此错误以及如何解决?
我认为您收到此错误是因为最终连接的架构包含两个值字段,连接的每一侧各有一个。要解决此问题,请重命名两个连接流之一的 "value" 字段,如下所示:
var count = event_stream.
groupBy("value").count().
withColumnRenamed("value", "join_id")
event_stream.join(count, $"value" === $"join_id").
drop("join_id").
printSchema()
为什么 groupby.count 删除所有其他列?
groupBy
操作基本上是将您的字段分成两个列表。用作键的字段列表,以及要聚合的字段列表。关键字段仅按原样显示在最终结果中,但不在列表中的任何字段都需要定义聚合操作才能显示在结果中。否则 spark 无法知道您想如何组合该字段的多个值!你想数一数吗?你想要最大值吗?您想查看所有不同的值吗?要指定如何汇总字段,您可以在 .agg(..) 调用中定义它。
示例:
val input = Seq(
(1, "Bob", 4),
(1, "John", 5)
).toDF("key", "name", "number")
input.groupBy("key").
agg(collect_set("name") as "names",
max("number") as "maxnum").
show
+---+-----------+------+
|key|name |maxnum|
+---+-----------+------+
| 1|[Bob, John]| 5|
+---+-----------+------+
错误原因是用于连接的列名。 您可以使用像 .
这样的操作var count = event_stream
.groupBy("value").count()
event_stream.join(count,Seq("value"))