AnalysisException:解决连接中的冲突引用时失败:'Join Inner

AnalysisException: Failure when resolving conflicting references in Join: 'Join Inner

我有这个简单的代码

var count = event_stream
      .groupBy("value").count()

event_stream.join(count,"value").printSchema() //get error on this line

event_stream和计数模式如下

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)

root
 |-- value: binary (nullable = true)
 |-- count: long (nullable = false)

两个问题

为什么会出现此错误以及如何解决?

为什么 groupby.count 删除所有其他列?

错误如下

Exception in thread "main" org.apache.spark.sql.AnalysisException: 
Failure when resolving conflicting references in Join:
'Join Inner
:- AnalysisBarrier
:     +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
+- AnalysisBarrier
      +- Aggregate [value#8], [value#8, count(1) AS count#46L]
         +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@7f2c57fe, kafka, Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092), [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3dbd7107,kafka,List(),None,List(),None,Map(startingOffsets -> latest, failOnDataLoss -> false, subscribe -> events-identification-carrier, kafka.bootstrap.servers -> svc-kafka-pre-c1-01.jamba.net:9092),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]

Conflicting attributes: value#8

编辑:是的! 更改列的名称有效。 但是现在,如果我使用连接,我必须使用 OutputMode.Append 为此,我需要将 Watermarks 添加到流中.

我想要的是在 resultDF 中提取计数和主题(从上面打印的模式)并将其写入某个 Sink。

两个问题

  1. 还有其他way/better方法吗?
  2. 我可以像 count() 这样做多个聚合,然后再添加另一个字符串类型的列,即主题是这种情况吗?

为什么会出现此错误以及如何解决?

我认为您收到此错误是因为最终连接的架构包含两个值字段,连接的每一侧各有一个。要解决此问题,请重命名两个连接流之一的 "value" 字段,如下所示:

var count = event_stream.
    groupBy("value").count().
    withColumnRenamed("value", "join_id")

event_stream.join(count, $"value" === $"join_id").
    drop("join_id").  
    printSchema()

为什么 groupby.count 删除所有其他列?

groupBy 操作基本上是将您的字段分成两个列表。用作键的字段列表,以及要聚合的字段列表。关键字段仅按原样显示在最终结果中,但不在列表中的任何字段都需要定义聚合操作才能显示在结果中。否则 spark 无法知道您想如何组合该字段的多个值!你想数一数吗?你想要最大值吗?您想查看所有不同的值吗?要指定如何汇总字段,您可以在 .agg(..) 调用中定义它。

示例:

val input = Seq(
    (1, "Bob", 4),
    (1, "John", 5)
).toDF("key", "name", "number")

input.groupBy("key").
    agg(collect_set("name") as "names",
        max("number") as "maxnum").
    show

+---+-----------+------+
|key|name       |maxnum|
+---+-----------+------+
|  1|[Bob, John]|     5|
+---+-----------+------+

错误原因是用于连接的列名。 您可以使用像 .

这样的操作
var count = event_stream
      .groupBy("value").count()

event_stream.join(count,Seq("value"))