spark structured streaming 将聚合数据框连接到数据框
spark structured streaming joining aggregate dataframe to dataframe
我有一个流式数据帧,它可以在某些时候看起来像:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
我执行了 groupBy,agg collect_list 来清理。
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
输出是每个所有者的一行和每个水果的数组。
我现在想将这个清理过的数组加入到原始流数据帧中,删除 fruits col 并只包含 fruitsA 列
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
这似乎在我脑海中起作用,但 spark 似乎并不同意。
我得到一个
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
当我把所有东西都变成静态数据帧时,它工作得很好。这在流媒体上下文中是不可能的吗?
您是否尝试过重命名列名?还有类似的问题https://issues.apache.org/jira/browse/SPARK-19860
我有一个流式数据帧,它可以在某些时候看起来像:
+--------------------+--------------------+
| owner| fruits|
+--------------------+--------------------+
|Brian | apple|
Brian | pear |
Brian | date|
Brian | avocado|
Bob | avocado|
Bob | apple|
........
+--------------------+--------------------+
我执行了 groupBy,agg collect_list 来清理。
val myFarmDF = farmDF.withWatermark("timeStamp", "1 seconds").groupBy("fruits").agg(collect_list(col("fruits")) as "fruitsA")
输出是每个所有者的一行和每个水果的数组。 我现在想将这个清理过的数组加入到原始流数据帧中,删除 fruits col 并只包含 fruitsA 列
val joinedDF = farmDF.join(myFarmDF, "owner").drop("fruits")
这似乎在我脑海中起作用,但 spark 似乎并不同意。
我得到一个
Failure when resolving conflicting references in Join:
'Join Inner
...
+- AnalysisBarrier
+- Aggregate [name#17], [name#17, collect_list(fruits#61, 0, 0) AS fruitA#142]
当我把所有东西都变成静态数据帧时,它工作得很好。这在流媒体上下文中是不可能的吗?
您是否尝试过重命名列名?还有类似的问题https://issues.apache.org/jira/browse/SPARK-19860