为什么加入使用在水印 20 秒后发送的行?
Why does join use rows that were sent after watermark of 20 seconds?
我正在使用水印连接两个流,如下所示:
val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
.join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
我对上面代码的理解是,它将每个流保持 20 秒。之后,但是,当我现在提供一个流,20 秒后提供另一个流时,两者也会加入。似乎即使在水印完成之后,Spark 也会将数据保存在内存中。我什至在 45 秒后尝试过,它也加入了。
这让我对水印产生了困惑。
After it comes but, when I’m giving one stream now and the another after 20secs then also both are getting joined.
这是可能的,因为测量的时间不是事件到达的时间,而是水印字段内的时间,即 tstamp_trans
。您必须确保 tstamp_trans
中的最后一次是在将参与联接的行之后 20 秒。
换句话说,您必须在联接中执行以下附加步骤。
在两个输入上定义水印延迟,以便引擎知道输入可以延迟多长时间(类似于流聚合)
定义两个输入的事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即不满足时间约束)来匹配另一个输入.可以通过两种方式之一定义此约束。
时间范围加入条件(例如...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
在事件时间加入 windows(例如...加入 leftTimeWindow = rightTimeWindow)。
我正在使用水印连接两个流,如下所示:
val order_wm = order_details.withWatermark("tstamp_trans", "20 seconds")
val invoice_wm = invoice_details.withWatermark("tstamp_trans", "20 seconds")
val join_df = order_wm
.join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
我对上面代码的理解是,它将每个流保持 20 秒。之后,但是,当我现在提供一个流,20 秒后提供另一个流时,两者也会加入。似乎即使在水印完成之后,Spark 也会将数据保存在内存中。我什至在 45 秒后尝试过,它也加入了。
这让我对水印产生了困惑。
After it comes but, when I’m giving one stream now and the another after 20secs then also both are getting joined.
这是可能的,因为测量的时间不是事件到达的时间,而是水印字段内的时间,即 tstamp_trans
。您必须确保 tstamp_trans
中的最后一次是在将参与联接的行之后 20 秒。
换句话说,您必须在联接中执行以下附加步骤。
在两个输入上定义水印延迟,以便引擎知道输入可以延迟多长时间(类似于流聚合)
定义两个输入的事件时间约束,以便引擎可以确定何时不需要一个输入的旧行(即不满足时间约束)来匹配另一个输入.可以通过两种方式之一定义此约束。
时间范围加入条件(例如...JOIN ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
在事件时间加入 windows(例如...加入 leftTimeWindow = rightTimeWindow)。