Apache Beam 中累积 windows 和丢弃 windows 之间的区别?
Difference between Accumulating windows and Discarding windows in Apache Beam?
我这里有一个示例管道:
def print_windows(element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
print(window)
print(pane_info)
print(timestamp)
print(element)
print('-----------------')
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
keyed_elements = [
('USA', {'score': 1, 'timestamp': 2}),
('USA', {'score': 2, 'timestamp': 4}),
('USA', {'score': 3, 'timestamp': 4}),
('USA', {'score': 4, 'timestamp': 5}),
('USA', {'score': 5, 'timestamp': 14}),
('USA', {'score': 6, 'timestamp': 17}),
]
elements = (
p
| beam.Create(keyed_elements)
| 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
| beam.Map(lambda e: (e[0], e[1]['score']))
)
results = (
elements
| "" >> beam.WindowInto(
beam.window.FixedWindows(10),
trigger=Repeatedly(AfterCount(2)),
accumulation_mode=beam.transforms.trigger.AccumulationMode.ACCUMULATING
)
| beam.CombinePerKey(beam.combiners.ToListCombineFn())
)
results | beam.ParDo(print_windows)
想法很简单——我想获取一些带时间戳的分数并将它们组合到一个列表中。我在看到 2 个元素后触发每个窗格。
如果我 运行 按原样,我得到:
[0.0, 10.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(9.999000)
('USA', [1, 2, 3, 4])
-----------------
[10.0, 20.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(19.999000)
('USA', [5, 6])
但是,如果我将累积模式更改为 DISCARDING,输出保持不变。我很困惑,因为根据我在高层次上的理解,ACCUMULATING 会输出如下窗格:
前 10 秒 [1, 2] ... [1, 2, 3, 4]
window,最后 10 秒 [5, 6]
window。
另一方面,DISCARDING 应该给出:
[1, 2] .. [3, 4]
然后 [5,6]
。为什么输出相同?
根据 Beam 概念,Window 可以包含 0 到 N 个窗格,这些窗格由应用程序代码中的触发器定义控制。
当触发器定义为 Accumulating
时,这意味着作为 Window 的一部分并根据触发器逻辑触发的任何值都将被保留并附加到新值作为和当新窗格被触发或 Window 关闭时。
当触发器定义为 Discarding
时,这意味着属于 Window 的一部分并根据触发器逻辑触发的任何值都将被丢弃,并且不可用于以下新窗格被解雇或 Window 关闭时。
在上面的示例中,如果将触发逻辑更改为以下,您可以观察到至少两个窗格:-
- 早
- ON_TIME
beam.transforms.trigger.AfterWatermark(early=AfterCount(2))
下方 ACCUMULATING
个窗格是行为
INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
下面的 DISCARDING
窗格是行为
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [])
INFO:root:-----------------
在 ACCUMULATING
的情况下,当达到水印并且 Window 关闭时,来自 EARLY
窗格的值将被保留,由 ON_TIME
窗格表示。
然而,在 DISCARDING
窗格的情况下,EARLY
窗格中的值被丢弃并且 ON_TIME
窗格为空。
在元素通过 Pub/Sub 流式传输的现实场景中,可以触发超过 1 个 EARLY 窗格。在模拟场景中,因为所有值都已经存在,所以它不能触发超过 1 个 EARLY
窗格。
我这里有一个示例管道:
def print_windows(element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
print(window)
print(pane_info)
print(timestamp)
print(element)
print('-----------------')
options = PipelineOptions()
with beam.Pipeline(options=options) as p:
keyed_elements = [
('USA', {'score': 1, 'timestamp': 2}),
('USA', {'score': 2, 'timestamp': 4}),
('USA', {'score': 3, 'timestamp': 4}),
('USA', {'score': 4, 'timestamp': 5}),
('USA', {'score': 5, 'timestamp': 14}),
('USA', {'score': 6, 'timestamp': 17}),
]
elements = (
p
| beam.Create(keyed_elements)
| 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
| beam.Map(lambda e: (e[0], e[1]['score']))
)
results = (
elements
| "" >> beam.WindowInto(
beam.window.FixedWindows(10),
trigger=Repeatedly(AfterCount(2)),
accumulation_mode=beam.transforms.trigger.AccumulationMode.ACCUMULATING
)
| beam.CombinePerKey(beam.combiners.ToListCombineFn())
)
results | beam.ParDo(print_windows)
想法很简单——我想获取一些带时间戳的分数并将它们组合到一个列表中。我在看到 2 个元素后触发每个窗格。
如果我 运行 按原样,我得到:
[0.0, 10.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(9.999000)
('USA', [1, 2, 3, 4])
-----------------
[10.0, 20.0)
PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
Timestamp(19.999000)
('USA', [5, 6])
但是,如果我将累积模式更改为 DISCARDING,输出保持不变。我很困惑,因为根据我在高层次上的理解,ACCUMULATING 会输出如下窗格:
前 10 秒[1, 2] ... [1, 2, 3, 4]
window,最后 10 秒 [5, 6]
window。
另一方面,DISCARDING 应该给出:
[1, 2] .. [3, 4]
然后 [5,6]
。为什么输出相同?
根据 Beam 概念,Window 可以包含 0 到 N 个窗格,这些窗格由应用程序代码中的触发器定义控制。
当触发器定义为 Accumulating
时,这意味着作为 Window 的一部分并根据触发器逻辑触发的任何值都将被保留并附加到新值作为和当新窗格被触发或 Window 关闭时。
当触发器定义为 Discarding
时,这意味着属于 Window 的一部分并根据触发器逻辑触发的任何值都将被丢弃,并且不可用于以下新窗格被解雇或 Window 关闭时。
在上面的示例中,如果将触发逻辑更改为以下,您可以观察到至少两个窗格:-
- 早
- ON_TIME
beam.transforms.trigger.AfterWatermark(early=AfterCount(2))
下方 ACCUMULATING
个窗格是行为
INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:10:00
INFO:root:2020-05-24 14:12:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329519.999000)
INFO:root:('USA', [{'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 4, 'ts': 60}, {'score': 6, 'ts': 105}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
下面的 DISCARDING
窗格是行为
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [{'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 3, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 14:12:00
INFO:root:2020-05-24 14:14:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329639.999000)
INFO:root:('USA', [])
INFO:root:-----------------
INFO:root:2020-05-24 14:14:00
INFO:root:2020-05-24 14:16:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590329759.999000)
INFO:root:('USA', [])
INFO:root:-----------------
在 ACCUMULATING
的情况下,当达到水印并且 Window 关闭时,来自 EARLY
窗格的值将被保留,由 ON_TIME
窗格表示。
然而,在 DISCARDING
窗格的情况下,EARLY
窗格中的值被丢弃并且 ON_TIME
窗格为空。
在元素通过 Pub/Sub 流式传输的现实场景中,可以触发超过 1 个 EARLY 窗格。在模拟场景中,因为所有值都已经存在,所以它不能触发超过 1 个 EARLY
窗格。