What is the difference between afterwatermark Trigger and Default Trigger in Apache Beam?

According to the Apache Beam documentation:

The AfterWatermark trigger operates on event time. The AfterWatermark trigger emits the contents of a window after the watermark passes the end of the window, based on the timestamps attached to the data elements. The watermark is a global progress metric, and is Beam’s notion of input completeness within your pipeline at any given point. AfterWatermark only fires when the watermark passes the end of the window.

The default trigger for a PCollection is based on event time, and emits the results of the window when the Beam’s watermark passes the end of the window, and then fires each time late data arrives. However, if you are using both the default windowing configuration and the default trigger, the default trigger emits exactly once, and late data is discarded.

I tried implementing both, using fixed windows, and got similar output.

With the AfterWatermark trigger:

import json

import apache_beam as beam
from apache_beam.transforms import trigger, window

# lines, get_timestamp, PrintFn and known_args are defined elsewhere in the pipeline (not shown).
(lines
    | 'timestamp' >> beam.Map(get_timestamp)
    | 'window' >> beam.WindowInto(
        window.FixedWindows(20),
        trigger=trigger.AfterWatermark(),
        accumulation_mode=trigger.AccumulationMode.DISCARDING)
    | 'CountGlobally' >> beam.CombineGlobally(
        beam.combiners.CountCombineFn()).without_defaults()
    | 'printnbrarticles' >> beam.ParDo(PrintFn())
    | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
    | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
    | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
)

With the default trigger:

# Same pipeline, but no trigger argument, so the default trigger is used.
(lines
    | 'timestamp' >> beam.Map(get_timestamp)
    | 'window' >> beam.WindowInto(window.FixedWindows(20))
    | 'CountGlobally' >> beam.CombineGlobally(
        beam.combiners.CountCombineFn()).without_defaults()
    | 'printnbrarticles' >> beam.ParDo(PrintFn())
    | 'jsondumps' >> beam.Map(lambda x: json.dumps(x))
    | 'encode' >> beam.Map(lambda x: x.encode('utf-8'))
    | 'send_to_Pub/Sub' >> beam.io.WriteToPubSub(known_args.out_topic)
)

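You don't see a difference because you have no late data. As the documentation says, AfterWatermark fires only once, when the watermark passes the end of the window, whereas the default trigger fires at that point AND again each time late data arrives. Note also that late data is only kept if you set allowed_lateness on the window; its default is 0, so in your current configuration late elements would be dropped by both triggers anyway.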
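
To make the difference concrete, here is a toy, stdlib-only simulation of the two firing behaviours for fixed windows. It is not Beam code: the function simulate, the event format and the sample events are hypothetical, timestamps are plain integers, and the default trigger is simplified to emit one pane per late element. It only illustrates the rule described above: both triggers emit an on-time pane when the watermark passes the end of the window, and only the default trigger emits again for late data that is still within allowed_lateness.

```python
# Toy model of trigger firing for fixed windows (hypothetical, not Beam code).
# Each event is ("element", event_ts) or ("watermark", new_watermark).
def simulate(events, window_size, allowed_lateness, fire_on_late):
    """Return the (window_start, count) panes emitted in DISCARDING mode.

    fire_on_late=False models AfterWatermark() with no late trigger;
    fire_on_late=True models the default trigger (one pane per late element).
    """
    pending = {}   # window_start -> elements buffered for the on-time pane
    fired = set()  # windows whose on-time pane has already fired
    watermark = float("-inf")
    panes = []
    for kind, value in events:
        if kind == "watermark":
            watermark = value
            # On-time pane: fire every window whose end the watermark has reached.
            for start in sorted(pending):
                if start + window_size <= watermark:
                    panes.append((start, pending.pop(start)))
                    fired.add(start)
            continue
        start = value - value % window_size
        end = start + window_size
        if end + allowed_lateness <= watermark:
            continue  # later than allowed_lateness: dropped by both triggers
        if start in fired:
            if fire_on_late:
                panes.append((start, 1))  # default trigger: extra late pane
            # AfterWatermark() has already finished for this window: element dropped
            continue
        pending[start] = pending.get(start, 0) + 1
    return panes


EVENTS = [("element", 5), ("element", 12), ("watermark", 20),
          ("element", 15),  # late: its window [0, 20) has already fired
          ("element", 25), ("watermark", 40)]

print(simulate(EVENTS, 20, 0, fire_on_late=False))   # [(0, 2), (20, 1)]
print(simulate(EVENTS, 20, 0, fire_on_late=True))    # [(0, 2), (20, 1)]
print(simulate(EVENTS, 20, 30, fire_on_late=False))  # [(0, 2), (20, 1)]
print(simulate(EVENTS, 20, 30, fire_on_late=True))   # [(0, 2), (0, 1), (20, 1)]
```

With allowed_lateness=0 both models produce identical panes; only when late data is both present and allowed does the default trigger emit the extra pane for window [0, 20).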

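In addition, the AfterWatermark trigger lets you configure extra behavior (and extra firings) for early data (data that arrives before the watermark passes the end of the window) and for late data (data that arrives after the watermark has passed it).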

The default trigger cannot be customized in this way.