Beam java SDK 2.10.0 with Kafka source and Dataflow runner: windowed Count.perElement never fires data out
Beam java SDK 2.10.0 with Kafka source and Dataflow runner: windowed Count.perElement never fires data out
我在 Google DataFlow
上遇到 运行 Beam SDK 到 2.10.0 作业的问题
流程很简单:我使用 Kafka 作为源,然后应用 Fixed windows,然后按键计数元素。但看起来数据永远不会离开计数阶段,直到工作耗尽。 Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0
的输出集合始终为零。仅在耗尽数据流作业后才发布元素。
代码如下:
public KafkaProcessingJob(BaseOptions options) {
PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
.apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.updateConsumerProperties(configureConsumerProperties())
.withCreateTime(Duration.standardMinutes(1L))
.withTopics(inputTopics)
.withReadCommitted()
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))
.apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());
.apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))
.apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
.apply(Count.<String>perElement())
.apply(
new WriteWindowedToBigQuery<>(
project,
dataset,
table,
configureWindowedTableWrite()));
}
private Map<String, Object> configureConsumerProperties() {
Map<String, Object> configUpdates = Maps.newHashMap();
configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configUpdates;
}
private static String getKey(GenericRecord record) {
//extract key
}
看起来流量永远不会离开.apply(Count.<String>perElement())
的舞台
有人可以帮忙吗?
我找到原因了。
与这里使用的TimestampPolicy有关(.withCreateTime(Duration.standardMinutes(1L))
)。
由于我们的 Kafka 主题中存在空分区,因此从未使用默认的 TimestampPolicy 推进主题水印。
我需要实施自定义策略来解决问题。
我在 Google DataFlow
上遇到 运行 Beam SDK 到 2.10.0 作业的问题流程很简单:我使用 Kafka 作为源,然后应用 Fixed windows,然后按键计数元素。但看起来数据永远不会离开计数阶段,直到工作耗尽。 Count.PerElement/Combine.perKey(Count)/Combine.GroupedValues.out0
的输出集合始终为零。仅在耗尽数据流作业后才发布元素。
代码如下:
public KafkaProcessingJob(BaseOptions options) {
PCollection<GenericRecord> genericRecordPCollection = Pipeline.create(options)
.apply("Read binary Kafka messages", KafkaIO.<String, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.updateConsumerProperties(configureConsumerProperties())
.withCreateTime(Duration.standardMinutes(1L))
.withTopics(inputTopics)
.withReadCommitted()
.commitOffsetsInFinalize()
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class))
.apply("Map binary message to Avro GenericRecord", new DecodeBinaryKafkaMessage());
.apply("Apply windowing to records", Window.into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.standardMinutes(5)))
.apply("Write aggregated data to BigQuery", MapElements.into(TypeDescriptors.strings()).via(rec -> getKey(rec)))
.apply(Count.<String>perElement())
.apply(
new WriteWindowedToBigQuery<>(
project,
dataset,
table,
configureWindowedTableWrite()));
}
private Map<String, Object> configureConsumerProperties() {
Map<String, Object> configUpdates = Maps.newHashMap();
configUpdates.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return configUpdates;
}
private static String getKey(GenericRecord record) {
//extract key
}
看起来流量永远不会离开.apply(Count.<String>perElement())
有人可以帮忙吗?
我找到原因了。
与这里使用的TimestampPolicy有关(.withCreateTime(Duration.standardMinutes(1L))
)。
由于我们的 Kafka 主题中存在空分区,因此从未使用默认的 TimestampPolicy 推进主题水印。 我需要实施自定义策略来解决问题。