Outer join two Datasets (not DataFrames) in Spark Structured Streaming
I have some code that joins two streaming DataFrames and writes the result to the console.
val dataFrame1 =
  df1Input.withWatermark("timestamp", "40 seconds").as("A")
val dataFrame2 =
  df2Input.withWatermark("timestamp", "40 seconds").as("B")

val finalDF: DataFrame = dataFrame1.join(dataFrame2,
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp " +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter")

finalDF.writeStream.format("console").start().awaitTermination()
What I want to do now is refactor this to use Datasets, so that I get some compile-time checking. So what I tried was quite straightforward:
val finalDS: Dataset[(A, B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp " +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter")

finalDS.writeStream.format("console").start().awaitTermination()
However, this throws the following error:
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
As you can see, the join code is unchanged, so both sides still have a watermark and there is still a range condition. The only change is to use the Dataset API instead of the DataFrame one.
Also, everything works fine when I use an inner join:
val finalDS: Dataset[(A, B)] = dataFrame1.as[A].joinWith(dataFrame2.as[B],
  expr(
    "A.id = B.id" +
      " AND " +
      "B.timestamp >= A.timestamp " +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"))

finalDS.writeStream.format("console").start().awaitTermination()
Does anyone know what is going on here?
Well, when you use the joinWith method instead of join, you rely on a different implementation, and it seems this implementation does not support leftOuter joins for streaming Datasets.
You can check the outer joins with watermarking section of the official documentation. The join method is used there, not joinWith. Note that the result type will be a DataFrame, which means you will most likely have to map the fields manually:
val finalDS = dataFrame1.as[A].join(dataFrame2.as[B],
  expr(
    "A.key = B.key" +
      " AND " +
      "B.timestamp >= A.timestamp " +
      " AND " +
      "B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter").select(/* useful fields */).as[C]
In case you got here trying to understand why you get this exception
org.apache.spark.sql.AnalysisException: Stream-stream outer join between two streaming DataFrame/Datasets is not supported without a watermark in the join keys, or a watermark on the nullable side and an appropriate range condition;;
even though you have added watermarks to your join and Spark 3 already supports stream-stream joins: you have probably added the watermark after the join, but Spark expects you to watermark each stream before the join!
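As a minimal sketch of the difference, reusing df1Input and df2Input from the question (the "wrong" variant is only illustrative; it fails with the exception above once the query is started):

// Wrong: the watermark is applied to the join result, so the planner
// sees two un-watermarked streaming inputs and the query fails to start.
val wrong = df1Input.as("A")
  .join(df2Input.as("B"), expr("A.id = B.id"), "leftOuter")
  .withWatermark("A.timestamp", "40 seconds")

// Right: watermark each stream BEFORE the join, as in the question.
val left  = df1Input.withWatermark("timestamp", "40 seconds").as("A")
val right = df2Input.withWatermark("timestamp", "40 seconds").as("B")

val joined = left.join(right,
  expr(
    "A.id = B.id" +
      " AND B.timestamp >= A.timestamp" +
      " AND B.timestamp <= A.timestamp + interval 1 hour"),
  joinType = "leftOuter")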