apache beam (python SDK): Late (or early) events discarded and triggers. How to know how many discarded and why?

I have a streaming pipeline connected to a PubSub subscription (around 2 million elements per hour). I need to collect the elements into groups and then extract some information from each group.

def expand(self, pcoll):
    return (
            pcoll |

            beam.WindowInto(FixedWindows(10),
                            trigger=AfterAny(AfterCount(2000), AfterProcessingTime(30)),
                            allowed_lateness=10,
                            trigger=AfterAny(
                                AfterCount(8000),
                                AfterProcessingTime(30),
                                AfterWatermark(
                                    early=AfterProcessingTime(60),
                                    late=AfterProcessingTime(60)
                                )
                            ),
                            allowed_lateness=60 * 60 * 24,
                            accumulation_mode=AccumulationMode.DISCARDING)
| "Group by Key" >> beam.GroupByKey()

I try not to miss any data, but I've found that I'm losing around 4% of it. As you can see in the code, the trigger fires every time I hit 8k elements or every 30 seconds. Allowed lateness is 1 day, and the trigger should also fire for both early and late events.

Still, that 4% is missing. So, is there a way to know whether the pipeline is discarding data? How many elements? And for what reason?

Thank you very much.

First of all, I see there are two triggers in the sample code, but I assume that's a typo.

Since you are not using Repeatedly, it looks like you are dropping elements: once the trigger fires for a window, everything that arrives afterwards is lost. There's an official doc on this from Beam.
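
For reference, here is a rough sketch of the fix (just a sketch: it assumes the second trigger block in your code is the intended one, and the PTransform name is made up), wrapping the composite trigger in Repeatedly:

import apache_beam as beam
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterAny, AfterCount, AfterProcessingTime,
    AfterWatermark, Repeatedly)
from apache_beam.transforms.window import FixedWindows


class GroupMessages(beam.PTransform):  # hypothetical name for your transform
    def expand(self, pcoll):
        return (
            pcoll
            # Repeatedly keeps the trigger firing for the lifetime of the
            # window instead of firing once and dropping later arrivals.
            | beam.WindowInto(
                FixedWindows(10),
                trigger=Repeatedly(
                    AfterAny(
                        AfterCount(8000),
                        AfterProcessingTime(30),
                        AfterWatermark(
                            early=AfterProcessingTime(60),
                            late=AfterProcessingTime(60)))),
                allowed_lateness=60 * 60 * 24,
                accumulation_mode=AccumulationMode.DISCARDING)
            | "Group by Key" >> beam.GroupByKey())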

Let me post an example:


import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.transforms import trigger
from apache_beam.transforms.window import FixedWindows, TimestampedValue

test_stream = (TestStream()
               .add_elements([
                    TimestampedValue('in_time_1', 0),
                    TimestampedValue('in_time_2', 0)])
               .advance_watermark_to(9)
               .advance_processing_time(9)
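               # The watermark is already at 9, so timestamp 8 is behind the
               # watermark but still inside the first [0, 10) window.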
               .add_elements([TimestampedValue('late_but_in_window', 8)])
               .advance_watermark_to(10)
               .advance_processing_time(10)
               .add_elements([TimestampedValue('in_time_window2', 12)])
               .advance_watermark_to(20)  # Past window time
               .advance_processing_time(20)
               .add_elements([TimestampedValue('late_window_closed', 9),
                              TimestampedValue('in_time_window2_2', 12)])
               .advance_watermark_to_infinity())


# Key every element with a constant key and attach its event timestamp so we
# can see which pane it ends up in after the GroupByKey.
class RecordFn(beam.DoFn):
    def process(
        self,
        element=beam.DoFn.ElementParam,
        timestamp=beam.DoFn.TimestampParam):

        yield ("key", (element, timestamp))



options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with TestPipeline(options=options) as p:
    records = (p | test_stream
                 | beam.ParDo(RecordFn())
                 | beam.WindowInto(FixedWindows(10),
                                   allowed_lateness=0,
                                   # trigger=trigger.AfterCount(1),
                                   trigger=trigger.Repeatedly(trigger.AfterCount(1)),
                                   accumulation_mode=trigger.AccumulationMode.DISCARDING)
                 | beam.GroupByKey()
                 | beam.Map(print)
               )

If we use trigger.Repeatedly(trigger.AfterCount(1)), all elements are fired as they arrive and nothing gets discarded (except late_window_closed, which is expected since it arrived after its window had already closed):

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])  # these two come out together since they arrived at the same time
('key', [('late_but_in_window', Timestamp(8))])
('key', [('in_time_window2', Timestamp(12))])
('key', [('in_time_window2_2', Timestamp(12))])

If we use trigger.AfterCount(1) (without Repeatedly), we only get the first firing of each window, and everything that arrives afterwards is dropped:

('key', [('in_time_1', Timestamp(0)), ('in_time_2', Timestamp(0))])
('key', [('in_time_window2', Timestamp(12))])

Note that in_time_1 and in_time_2 both appear in the first fired pane because they arrived at the same time (timestamp 0); if one of them had arrived later, it would have been dropped.
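
If you also want to measure how many elements you are losing, one option (a rough sketch; the DoFn and counter names here are made up, not part of your pipeline) is to add Beam Metrics counters at different points of the pipeline and compare them, for example one right after reading from PubSub and one after the GroupByKey:

import apache_beam as beam
from apache_beam.metrics import Metrics


class CountFn(beam.DoFn):  # hypothetical pass-through counter
    def __init__(self, counter_name):
        # User counters show up in the runner's UI (e.g. the Dataflow job page).
        self.counter = Metrics.counter('pipeline_debug', counter_name)

    def process(self, element):
        self.counter.inc()
        yield element

You would apply beam.ParDo(CountFn('before_window')) before the WindowInto and, after the GroupByKey, increment a second counter by the size of each group; the difference between the two tells you how many elements never made it into a pane.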