使用 apache beam Kafkaio(数据流)聚合主题
Aggregating Topics with apache beam Kafkaio (Dataflow)
我在压缩的 kafka 主题中有缓慢移动的数据,在另一个主题中也有快速移动的数据。
1) 快速移动的数据是来自 Kafka 的实时摄取的无界事件。
2) 慢速移动数据是用于丰富快速移动数据的元数据。这是一个紧凑的主题,数据不经常更新 (days/months)。
3) 每个快速移动的数据负载都应该有一个具有相同 customerId 的元数据负载,可以将它们聚合在一起。
我想根据 customerId 聚合 fast/slow 移动数据(在两个主题的数据中都很常见)。我想知道你会怎么做?到目前为止:
PTransform<PBegin, PCollection<KV<byte[], byte[]>>> kafka = KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(“url:port")
.withTopics([“fast-moving-data”, “slow-moving-data"])
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.updateConsumerProperties((Map) props)
.withoutMetadata();
我注意到我可以使用 .withTopics 并指定我想使用的不同主题,但在这一点之后我无法找到任何示例来帮助聚合。任何帮助,将不胜感激。
我建议分别阅读这些主题,为管道创建两个不同的输入。您可以稍后 cross/join 他们。跨越它们的方法是提供缓慢移动的流作为热路径的侧输入(快速移动的 PCollection 的转换)。
看这里:https://beam.apache.org/documentation/programming-guide/#side-inputs
本 中也讨论了以下模式,对于您的用例来说可能是一个很好的探索模式。可能成为问题的一项是压缩的缓慢移动流的大小。希望它有用。
对于此模式,我们可以使用 GenerateSequence 源转换定期发出一个值,例如每天一次。
通过在每个元素上激活的数据驱动触发器将此值传递到全局 window。
在 DoFn 中,使用此过程作为触发器从有界源中提取数据
创建用于下游转换的 SideInput。
请务必注意,由于此模式使用全局-window SideInput 在处理时间触发,因此与事件时间中正在处理的元素的匹配将是不确定的。例如,如果我们有一个在事件时间窗口化的主管道,那些 windows 将看到的 SideInput View 的版本将取决于在处理时间而不是事件时间触发的最新触发器。
同样重要的是要注意,通常 SideInput 应该适合内存。
Java(SDK 2.9.0):
在下面的示例中,sideinput 以非常短的间隔更新,这样可以很容易地看到效果。期望侧输入更新缓慢,例如每隔几个小时或每天一次。
在下面的示例代码中,我们使用了在 DoFn 中创建的地图,它变成了 View.asSingleton,这是针对此模式的推荐方法。
下面的示例说明了该模式,请注意 View.asSingleton 会在每次计数器更新时重建。
对于您的用例,您可以将 GenerateSequence
转换替换为 PubSubIO
转换。这有意义吗?
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);
// Create slowly updating sideinput
PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}
})).apply(View.asSingleton());
// ---- Consume slowly updating sideinput
// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
}
}).withSideInputs(map));
p.run();
}
public static class DummyExternalService {
public static Map<String, String> readDummyData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}
我在压缩的 kafka 主题中有缓慢移动的数据,在另一个主题中也有快速移动的数据。
1) 快速移动的数据是来自 Kafka 的实时摄取的无界事件。
2) 慢速移动数据是用于丰富快速移动数据的元数据。这是一个紧凑的主题,数据不经常更新 (days/months)。
3) 每个快速移动的数据负载都应该有一个具有相同 customerId 的元数据负载,可以将它们聚合在一起。
我想根据 customerId 聚合 fast/slow 移动数据(在两个主题的数据中都很常见)。我想知道你会怎么做?到目前为止:
PTransform<PBegin, PCollection<KV<byte[], byte[]>>> kafka = KafkaIO.<byte[], byte[]>read()
.withBootstrapServers(“url:port")
.withTopics([“fast-moving-data”, “slow-moving-data"])
.withKeyDeserializer(ByteArrayDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class)
.updateConsumerProperties((Map) props)
.withoutMetadata();
我注意到我可以使用 .withTopics 并指定我想使用的不同主题,但在这一点之后我无法找到任何示例来帮助聚合。任何帮助,将不胜感激。
我建议分别阅读这些主题,为管道创建两个不同的输入。您可以稍后 cross/join 他们。跨越它们的方法是提供缓慢移动的流作为热路径的侧输入(快速移动的 PCollection 的转换)。
看这里:https://beam.apache.org/documentation/programming-guide/#side-inputs
本
对于此模式,我们可以使用 GenerateSequence 源转换定期发出一个值,例如每天一次。 通过在每个元素上激活的数据驱动触发器将此值传递到全局 window。 在 DoFn 中,使用此过程作为触发器从有界源中提取数据 创建用于下游转换的 SideInput。
请务必注意,由于此模式使用全局-window SideInput 在处理时间触发,因此与事件时间中正在处理的元素的匹配将是不确定的。例如,如果我们有一个在事件时间窗口化的主管道,那些 windows 将看到的 SideInput View 的版本将取决于在处理时间而不是事件时间触发的最新触发器。
同样重要的是要注意,通常 SideInput 应该适合内存。
Java(SDK 2.9.0):
在下面的示例中,sideinput 以非常短的间隔更新,这样可以很容易地看到效果。期望侧输入更新缓慢,例如每隔几个小时或每天一次。
在下面的示例代码中,我们使用了在 DoFn 中创建的地图,它变成了 View.asSingleton,这是针对此模式的推荐方法。
下面的示例说明了该模式,请注意 View.asSingleton 会在每次计数器更新时重建。
对于您的用例,您可以将 GenerateSequence
转换替换为 PubSubIO
转换。这有意义吗?
public static void main(String[] args) {
// Create pipeline
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(PipelineOptions.class);
// Using View.asSingleton, this pipeline uses a dummy external service as illustration.
// Run in debug mode to see the output
Pipeline p = Pipeline.create(options);
// Create slowly updating sideinput
PCollectionView<Map<String, String>> map = p
.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
@ProcessElement public void process(@Element Long input,
OutputReceiver<Map<String, String>> o) {
// Do any external reads needed here...
// We will make use of our dummy external service.
// Every time this triggers, the complete map will be replaced with that read from
// the service.
o.output(DummyExternalService.readDummyData());
}
})).apply(View.asSingleton());
// ---- Consume slowly updating sideinput
// GenerateSequence is only used here to generate dummy data for this illustration.
// You would use your real source for example PubSubIO, KafkaIO etc...
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
@ProcessElement public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug("Value is {} key A is {} and key B is {}"
, c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
}
}).withSideInputs(map));
p.run();
}
public static class DummyExternalService {
public static Map<String, String> readDummyData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}