Apache Beam :: 无法让 groupbykey 与会话 window 和 java 一起工作
Apache Beam :: can't get groupbykey work with session window with java
我有一个简单的问题。
假设我正在读取一个镶木地板文件,它每行生成一个 avro GenericRecord
对象,如下所示。
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j3"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j4"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p1"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p2"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p3"}
此文件是故意展平的,我想取消展平它们。
- 我们知道输入是有序的,我想处理它们直到下一个会话密钥,并传递给管道中的下一个应用程序,以保持内存需求最小,
所以中间阶段应该 return
KV<String, Iterable<GenericRecord>>
甚至更好结合 KV<String, GenericRecord>
.
<"john:doe:40", {"name":"john", "surename":"doe", "age":40, ["unique_attribute":"j1", ...]}>
<"paul:carl:28", {"name":"paul", "surename":"carl", "age":28, "user_pk":, ["unique_attribute":"p1", ...]}
这就是我目前所知道的;
pipeline.apply("FilePattern", FileIO.match().filepattern(PARQUET_FILE_PATTERN))
.apply("FileReadMatches", FileIO.readMatches())
.apply("ParquetReadFiles", ParquetIO.readFiles(schema))
.apply("SetKeyValuePK", WithKeys.of(input -> AvroSupport.of(input).extractString("user_pk").get())).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply(Window.into(Sessions.withGapDuration(Duration.standardSeconds(5L)))).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply("SetGroupByPK", GroupByKey.create()).setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(AvroCoder.of(schema))))
...
...
我不知道是否有更好的方法,但现在我使用了 Sessions.withGapDuration
窗口化策略。
我预计我会在每 ~5 秒内得到一个分组元素 KV<String, Iterable<GenericRecord>> element
,但在 GroupByKey
之后我什么也没得到,我什至不确定 GroupByKey
是否真的在做任何事情,但是我知道内存正在迅速增加,所以它必须等待所有项目。
所以问题是,你将如何设置一个窗口函数,让我可以进行 groupbykey。
我也试过 Combine.byKey
,因为它应该是 GroupByKey + Windowing Function
但无法实现?
我已经设法让 groupby 工作了,但不确定我是否完全理解。
我不得不添加两个想法。
Beam 中的第一个(任何?)IO 操作不添加时间戳。
.apply("WithTimestamp", WithTimestamps.of(input -> Instant.now()))
其次我添加了一个 Triger
所以 GroupByKey
实际上会被触发。不知道为什么它一开始没有触发。我确定有人对此有解释。
.apply("SessionWindow", Window.<KV<String, GenericRecord>>into(Sessions.withGapDuration(Duration.standardSeconds(5L))).triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
它并不完美,仍然需要等待几分钟才能看到 GroupByKey
被触发,即使 window 只是 5s
,但它被触发了结束,也就是进步。
编辑:
好的,看起来不需要时间戳,我假设是因为 window 是基于会话而不是基于时间的。
我还将设置更改为 streaming
options.as(StreamingOptions.class)
.setStreaming(true);
我希望这对遇到类似问题的人有所帮助。
我有一个简单的问题。
假设我正在读取一个镶木地板文件,它每行生成一个 avro GenericRecord
对象,如下所示。
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j1"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j2"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j3"}
{"name":"john", "surename":"doe", "age":40, "user_pk":"john:doe:40", "unique_attribute":"j4"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p1"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p2"}
{"name":"paul", "surename":"carl", "age":28, "user_pk":"paul:carl:28", "unique_attribute":"p3"}
此文件是故意展平的,我想取消展平它们。
- 我们知道输入是有序的,我想处理它们直到下一个会话密钥,并传递给管道中的下一个应用程序,以保持内存需求最小,
所以中间阶段应该 return
KV<String, Iterable<GenericRecord>>
甚至更好结合KV<String, GenericRecord>
.
<"john:doe:40", {"name":"john", "surename":"doe", "age":40, ["unique_attribute":"j1", ...]}>
<"paul:carl:28", {"name":"paul", "surename":"carl", "age":28, "user_pk":, ["unique_attribute":"p1", ...]}
这就是我目前所知道的;
pipeline.apply("FilePattern", FileIO.match().filepattern(PARQUET_FILE_PATTERN))
.apply("FileReadMatches", FileIO.readMatches())
.apply("ParquetReadFiles", ParquetIO.readFiles(schema))
.apply("SetKeyValuePK", WithKeys.of(input -> AvroSupport.of(input).extractString("user_pk").get())).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply(Window.into(Sessions.withGapDuration(Duration.standardSeconds(5L)))).setCoder(KvCoder.of(StringUtf8Coder.of(), AvroCoder.of(schema)))
.apply("SetGroupByPK", GroupByKey.create()).setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(AvroCoder.of(schema))))
...
...
我不知道是否有更好的方法,但现在我使用了 Sessions.withGapDuration
窗口化策略。
我预计我会在每 ~5 秒内得到一个分组元素 KV<String, Iterable<GenericRecord>> element
,但在 GroupByKey
之后我什么也没得到,我什至不确定 GroupByKey
是否真的在做任何事情,但是我知道内存正在迅速增加,所以它必须等待所有项目。
所以问题是,你将如何设置一个窗口函数,让我可以进行 groupbykey。
我也试过 Combine.byKey
,因为它应该是 GroupByKey + Windowing Function
但无法实现?
我已经设法让 groupby 工作了,但不确定我是否完全理解。 我不得不添加两个想法。 Beam 中的第一个(任何?)IO 操作不添加时间戳。
.apply("WithTimestamp", WithTimestamps.of(input -> Instant.now()))
其次我添加了一个 Triger
所以 GroupByKey
实际上会被触发。不知道为什么它一开始没有触发。我确定有人对此有解释。
.apply("SessionWindow", Window.<KV<String, GenericRecord>>into(Sessions.withGapDuration(Duration.standardSeconds(5L))).triggering(
AfterWatermark.pastEndOfWindow()
.withLateFirings(AfterProcessingTime
.pastFirstElementInPane().plusDelayOf(Duration.ZERO)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
它并不完美,仍然需要等待几分钟才能看到 GroupByKey
被触发,即使 window 只是 5s
,但它被触发了结束,也就是进步。
编辑: 好的,看起来不需要时间戳,我假设是因为 window 是基于会话而不是基于时间的。 我还将设置更改为 streaming
options.as(StreamingOptions.class)
.setStreaming(true);
我希望这对遇到类似问题的人有所帮助。