How does Apache Beam handle intermediate panes?

I have this simple code:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterCount


def print_windows(element, window=beam.DoFn.WindowParam, timestamp=beam.DoFn.TimestampParam):
    # Print the window, the event timestamp, and the element itself.
    print(window)
    print(timestamp)
    print(element)
    print('-----------------')


options = PipelineOptions()
with beam.Pipeline(options=options) as p:
    keyed_elements = [
        ('USA', {'user_id': 1, 'timestamp': 2}),
        ('USA', {'user_id': 1, 'timestamp': 14}),
        ('USA', {'user_id': 1, 'timestamp': 17}),
    ]

    sliding_windows = (
        p
        | beam.Create(keyed_elements)
        # Use the 'timestamp' field as the event time of each element.
        | 'ConvertIntoUserEvents' >> beam.Map(lambda e: beam.window.TimestampedValue(e, e[1]['timestamp']))
        | beam.WindowInto(
            beam.window.SlidingWindows(60, 10),
            trigger=beam.transforms.trigger.AfterWatermark(early=AfterCount(1)),
            accumulation_mode=beam.transforms.trigger.AccumulationMode.DISCARDING
        )
        | beam.ParDo(print_windows)
    )

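Basically, it takes some data and creates sliding windows over it. However, given how the trigger is defined, I expected multiple panes per window. For example, in the [0.0, 60.0) window I expected one pane for each item, plus a final pane containing all the elements.

I have included the actual output below. It seems I get the early firing for each element as described, but I never get a pane for the whole window. I have tried changing the AccumulationMode to ACCUMULATING, but I still don't get the desired output.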

[0.0, 60.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-10.0, 50.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-20.0, 40.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-30.0, 30.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-40.0, 20.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[-50.0, 10.0)
Timestamp(2)
('USA', {'user_id': 1, 'timestamp': 2})
-----------------
[10.0, 70.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[0.0, 60.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-10.0, 50.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-20.0, 40.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-30.0, 30.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[-40.0, 20.0)
Timestamp(14)
('USA', {'user_id': 1, 'timestamp': 14})
-----------------
[10.0, 70.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[0.0, 60.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-10.0, 50.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-20.0, 40.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-30.0, 30.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
[-40.0, 20.0)
Timestamp(17)
('USA', {'user_id': 1, 'timestamp': 17})
-----------------
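
The windows listed in the output above follow from how sliding windows of size 60 and period 10 overlap: each timestamp falls into size / period = 6 windows. A minimal plain-Python sketch, assuming a zero offset and integer timestamps, reproduces those assignments. The helper name sliding_windows is hypothetical, and this is an illustration of the rule rather than Beam's own implementation.

```python
def sliding_windows(timestamp, size=60, period=10, offset=0):
    """Return the [start, end) windows that contain `timestamp`.

    Hypothetical helper for illustration; assumes integer seconds.
    """
    # Start of the latest window that still contains the timestamp.
    last_start = timestamp - ((timestamp - offset) % period)
    # Walk backwards one period at a time while the window still covers it.
    return [(s, s + size) for s in range(last_start, timestamp - size, -period)]


if __name__ == "__main__":
    for ts in (2, 14, 17):
        print(ts, sliding_windows(ts))
```

For timestamp 2 the first window is (0, 60) and the last is (-50, 10), which matches the order of the printed windows.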

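The code snippet shared above performs no aggregation, such as beam.CombinePerKey. Triggers only take effect at a grouping step (a GroupByKey or a Combine), so in the Python SDK this step is required; without it, every pane is marked UNKNOWN. The documentation notes: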

PaneInfo: When triggers are used, Beam provides a DoFn.PaneInfoParam object that contains information about the current firing. Using DoFn.PaneInfoParam you can determine whether this is an early or a late firing, and how many times this window has already fired for this key. This feature implementation in python sdk is not fully completed, see more at BEAM-3759.

The details are tracked in JIRA issue BEAM-3759.

Because the PaneInfo is set to UNKNOWN, there are no repeated firings, as shown below:

INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.557941)
INFO:root:('USA', {'score': 3, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.558444)
INFO:root:('USA', {'score': 1, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314727.558758)
INFO:root:('USA', {'score': 4, 'ts': 15})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559044)
INFO:root:('USA', {'score': 6, 'ts': 45})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314717.559365)
INFO:root:('USA', {'score': 2, 'ts': 5})
INFO:root:-----------------
INFO:root:[1590314700.0, 1590314760.0)
INFO:root:PaneInfo(first: True, last: True, timing: UNKNOWN, index: 0, nonspeculative_index: 0)
INFO:root:Timestamp(1590314757.559638)
INFO:root:('USA', {'score': 5, 'ts': 45})
INFO:root:-----------------

If you change the code as shown below, you will see multiple firings:

import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.trigger import AfterCount


def print_output(element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam, timestamp=beam.DoFn.TimestampParam):
    # Log the window bounds, the pane metadata, and the grouped element.
    logging.info(window.start.to_utc_datetime())
    logging.info(window.end.to_utc_datetime())
    logging.info(pane_info)
    logging.info(timestamp)
    logging.info(element)
    logging.info('-----------------')


def run(argv=None):
    keyed_elements = [
        ('USA', {'score': 1, 'ts': 5}),
        ('USA', {'score': 2, 'ts': 5}),
        ('USA', {'score': 3, 'ts': 60}),
        ('USA', {'score': 4, 'ts': 60}),
        ('USA', {'score': 5, 'ts': 105}),
        ('USA', {'score': 6, 'ts': 105}),
    ]
    with beam.Pipeline(options=PipelineOptions(argv)) as p:
        # beam.io.ReadFromPubSub(subscription=subscription)
        data = (
            p
            | "read" >> beam.Create(keyed_elements)
            # | "JsonConvert" >> beam.Map(json.loads)
            # Event time = current time plus the element's 'ts' offset in seconds.
            | "ConvertIntoUserEvents" >> beam.Map(lambda e: beam.window.TimestampedValue(e, time.time() + e[1]['ts']))
        )

        results = (
            data
            | "Window" >> beam.WindowInto(
                beam.window.FixedWindows(120),
                trigger=beam.transforms.trigger.AfterWatermark(early=AfterCount(1)),
                accumulation_mode=beam.transforms.trigger.AccumulationMode.ACCUMULATING
            )
            # The aggregation step is what lets the trigger emit panes.
            | beam.CombinePerKey(beam.combiners.ToListCombineFn())
        )
        results | beam.ParDo(print_output)


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

Running the example above produces the following output:

INFO:apache_beam.runners.portability.fn_api_runner:Running (CombinePerKey(ToListCombineFn)/GroupByKey/Read)+((CombinePerKey(ToListCombineFn)/Combine)+(ref_AppliedPTransform_ParDo(CallableWrapperDoFn)_26))
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: True, last: False, timing: EARLY, index: 0, nonspeculative_index: -1)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
INFO:root:2020-05-24 11:02:00
INFO:root:2020-05-24 11:04:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318239.999000)
INFO:root:('USA', [{'score': 5, 'ts': 105}, {'score': 6, 'ts': 105}])
INFO:root:-----------------
INFO:root:2020-05-24 11:00:00
INFO:root:2020-05-24 11:02:00
INFO:root:PaneInfo(first: False, last: True, timing: ON_TIME, index: 1, nonspeculative_index: 0)
INFO:root:Timestamp(1590318119.999000)
INFO:root:('USA', [{'score': 3, 'ts': 60}, {'score': 1, 'ts': 5}, {'score': 2, 'ts': 5}, {'score': 4, 'ts': 60}])
INFO:root:-----------------
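
To make the difference between the two accumulation modes concrete, here is a plain-Python sketch of the pane contents one would expect for a single key and window under AfterWatermark(early=AfterCount(1)). It is an idealized model, not Beam code: the function name simulate_panes is hypothetical, it assumes the early trigger fires after every single element, and it does not model whether a runner actually emits an empty on-time pane. A real runner may fire less often, as in the output above, where each window produced one EARLY pane that already held all of its elements.

```python
def simulate_panes(elements, accumulating):
    """Idealized pane contents for one key and one window.

    Hypothetical model: one EARLY pane per element, then one ON_TIME pane
    when the watermark passes the end of the window.
    """
    panes = []
    buffer = []
    for element in elements:
        buffer.append(element)
        panes.append(("EARLY", list(buffer)))
        if not accumulating:
            # DISCARDING: drop what has already been emitted.
            buffer.clear()
    # ON_TIME pane: holds everything in ACCUMULATING mode, and only the
    # not-yet-emitted elements (possibly none) in DISCARDING mode.
    panes.append(("ON_TIME", list(buffer)))
    return panes


if __name__ == "__main__":
    timestamps = [2, 14, 17]  # the three elements of the [0, 60) window
    for mode in (True, False):
        print("ACCUMULATING" if mode else "DISCARDING")
        for timing, contents in simulate_panes(timestamps, accumulating=mode):
            print("  ", timing, contents)
```

In this model only ACCUMULATING mode yields a final pane containing all the elements, which is the behavior the question was looking for.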