光束侧输入模式逻辑说明
Beam Side Input Pattern Logic Explaination
在官方的Beam编程指南中,介绍了当我们有侧输入时应该使用的模式。我很好奇为什么模式是这样的。请参阅下面的标准代码作为示例。
查看此处的代码片段:
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug(
"Value is {}, key A is {}, and key B is {}.",
c.element(),
keyMap.get("Key_A"),
keyMap.get("Key_B"));
}
})
.withSideInputs(map))
Beam 在代码末尾有 .withSideInputs(map)
表示此代码块中有侧输入。在块的中间,我们有 Map<String, String> keyMap = c.sideInput(map);
对我来说这很奇怪,因为我们在最后让代码知道我们有一个侧输入,但我们在块的中间使用它。这是为什么?为什么要在末尾标明,中间却可以用?
附件是 Beam 提供的使用此模式的完整示例。
public static void sideInputPatterns() {
// This pipeline uses View.asSingleton for a placeholder external service.
// Run in debug mode to see the output.
Pipeline p = Pipeline.create();
// Create a side input that updates each second.
PCollectionView<Map<String, String>> map =
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
@ProcessElement
public void process(
@Element Long input, OutputReceiver<Map<String, String>> o) {
// Replace map with test data from the placeholder external service.
// Add external reads here.
o.output(PlaceholderExternalService.readTestData());
}
}))
.apply(View.asSingleton());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug(
"Value is {}, key A is {}, and key B is {}.",
c.element(),
keyMap.get("Key_A"),
keyMap.get("Key_B"));
}
})
.withSideInputs(map));
}
/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {
public static Map<String, String> readTestData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}
Apache Beam 模型是一种有趣的混合 declarative/procedural 编程。当与 SDK 交互时,您正在幕后构建一个转换的有向无环图。有一些特殊的转换 (ParDos) 允许程序员添加无法使用 Apache Beam 原语捕获的特殊逻辑。
就 SideInputs 而言,图中所声明的有点像“Promise”或“Future”。您声明此 ParDo 将在将来 运行 时收到一个您可以使用的 SideInput。这就是为什么你可以“在中间使用它”,尽管它是在 ParDo 的末尾“声明”的;该代码仅在图中定义一个节点,但不执行该图本身。
如果您想查看 SideInputs 的技术规范,可以阅读 design document。
在官方的Beam编程指南中,介绍了当我们有侧输入时应该使用的模式。我很好奇为什么模式是这样的。请参阅下面的标准代码作为示例。
查看此处的代码片段:
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug(
"Value is {}, key A is {}, and key B is {}.",
c.element(),
keyMap.get("Key_A"),
keyMap.get("Key_B"));
}
})
.withSideInputs(map))
Beam 在代码末尾有 .withSideInputs(map)
表示此代码块中有侧输入。在块的中间,我们有 Map<String, String> keyMap = c.sideInput(map);
对我来说这很奇怪,因为我们在最后让代码知道我们有一个侧输入,但我们在块的中间使用它。这是为什么?为什么要在末尾标明,中间却可以用?
附件是 Beam 提供的使用此模式的完整示例。
public static void sideInputPatterns() {
// This pipeline uses View.asSingleton for a placeholder external service.
// Run in debug mode to see the output.
Pipeline p = Pipeline.create();
// Create a side input that updates each second.
PCollectionView<Map<String, String>> map =
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
.apply(
Window.<Long>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
.discardingFiredPanes())
.apply(
ParDo.of(
new DoFn<Long, Map<String, String>>() {
@ProcessElement
public void process(
@Element Long input, OutputReceiver<Map<String, String>> o) {
// Replace map with test data from the placeholder external service.
// Add external reads here.
o.output(PlaceholderExternalService.readTestData());
}
}))
.apply(View.asSingleton());
// Consume side input. GenerateSequence generates test data.
// Use a real source (like PubSubIO or KafkaIO) in production.
p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
.apply(Sum.longsGlobally().withoutDefaults())
.apply(
ParDo.of(
new DoFn<Long, KV<Long, Long>>() {
@ProcessElement
public void process(ProcessContext c) {
Map<String, String> keyMap = c.sideInput(map);
c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
LOG.debug(
"Value is {}, key A is {}, and key B is {}.",
c.element(),
keyMap.get("Key_A"),
keyMap.get("Key_B"));
}
})
.withSideInputs(map));
}
/** Placeholder class that represents an external service generating test data. */
public static class PlaceholderExternalService {
public static Map<String, String> readTestData() {
Map<String, String> map = new HashMap<>();
Instant now = Instant.now();
DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
return map;
}
}
Apache Beam 模型是一种有趣的混合 declarative/procedural 编程。当与 SDK 交互时,您正在幕后构建一个转换的有向无环图。有一些特殊的转换 (ParDos) 允许程序员添加无法使用 Apache Beam 原语捕获的特殊逻辑。
就 SideInputs 而言,图中所声明的有点像“Promise”或“Future”。您声明此 ParDo 将在将来 运行 时收到一个您可以使用的 SideInput。这就是为什么你可以“在中间使用它”,尽管它是在 ParDo 的末尾“声明”的;该代码仅在图中定义一个节点,但不执行该图本身。
如果您想查看 SideInputs 的技术规范,可以阅读 design document。