使用 Apache Beam 进行实时监控
Live monitoring using Apache Beam
我想使用 Apache Beam 完成以下任务:
calculate every 5 seconds the events that are read from pubsub in the last minute
目标是对传入的速率数据有半实时视图。之后可以将其扩展到更复杂的用例。
经过搜索,我没有找到解决这个看似简单问题的方法。不起作用的东西:
- 全局window + 重复触发器(没有输入时触发器不触发)
- 滑动 window + withoutDefaults(显然不允许空 windows 被发射)
关于如何解决这个问题有什么建议吗?
如前所述,Beam 不会为空 windows 发射数据。除了 Rui Wang 给出的原因之外,我们还可以增加后期如何处理这些空白窗格的挑战。
无论如何,您描述的特定用例 - 监视消息数量的滚动计数 - 应该可以通过一些工作实现,即使指标最终下降到零。一种可能性是发布稳定数量的虚拟消息,这些消息会推进水印并触发窗格,但稍后会在管道中被过滤掉。这种方法的问题是需要调整发布来源,而且可能并不总是 convenient/possible。另一个方法涉及生成此虚假数据作为另一个输入并将其与主流共同分组。优点是一切都可以在 Dataflow 中完成,而无需调整源或接收器。为了说明这一点,我提供了一个例子。
输入分为两个流。对于虚拟元素,我使用 GenerateSequence
每 5 秒创建一个新元素。然后我 window PCollection(windowing 策略需要与主流策略兼容,所以我将使用相同的策略)。然后我将该元素映射到值为 0 的键值对(我们可以使用其他值,因为我们知道该元素来自哪个流,但我想证明不计算虚拟记录)。
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
...
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 0));
}
}));
对于从 Pub/Sub 读取的主流,我将每条记录映射到值 1。稍后,我将使用 map-reduce 阶段将所有记录添加到典型的字数统计示例中。
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
...
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 1));
}
}));
然后我们需要使用 CoGroupByKey
加入它们(我使用相同的 num_messages
键来分组计数)。当两个输入之一有元素时,这个阶段将输出结果,因此解除这里的主要问题(空 windows 没有 Pub/Sub 消息)。
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
最后,我们将所有的相加以获得 window 的消息总数。如果没有来自 dataTag
的元素,则总和将默认为 0。
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
结果应该是这样的:
请注意,来自不同 windows 的结果可能是无序的(这在写入 BigQuery 时无论如何都可能发生)并且我没有使用 window 设置来优化示例。
完整代码:
public class EmptyWindows {
private static final Logger LOG = LoggerFactory.getLogger(EmptyWindows.class);
public static interface MyOptions extends PipelineOptions {
@Description("Input topic")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String topic = options.getInput();
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New data element in main output");
c.output(KV.of("num_messages", 1));
}
}));
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New dummy element in main output");
c.output(KV.of("num_messages", 0));
}
}));
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
coGbkResultCollection
.apply("Log results", ParDo.of(new DoFn<KV<String, CoGbkResult>, Void>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
}));
p.run();
}
}
解决此问题的另一种方法是使用有状态 DoFn
和循环 Timer
,每 5 秒滴答触发一次。此循环计时器生成实时监控所需的默认数据,并确保每个 window 至少有一个事件要处理。
描述的方法的一个问题是,在具有多个键的系统中,需要为每个键生成这些“虚拟”事件。
参见https://beam.apache.org/blog/looping-timers/。该文章中的选项 1 和 2 分别是外部心跳源和光束管道中的生成源。选项 3 是循环定时器。
我想使用 Apache Beam 完成以下任务:
calculate every 5 seconds the events that are read from pubsub in the last minute
目标是对传入的速率数据有半实时视图。之后可以将其扩展到更复杂的用例。
经过搜索,我没有找到解决这个看似简单问题的方法。不起作用的东西:
- 全局window + 重复触发器(没有输入时触发器不触发)
- 滑动 window + withoutDefaults(显然不允许空 windows 被发射)
关于如何解决这个问题有什么建议吗?
如前所述,Beam 不会为空 windows 发射数据。除了 Rui Wang 给出的原因之外,我们还可以增加后期如何处理这些空白窗格的挑战。
无论如何,您描述的特定用例 - 监视消息数量的滚动计数 - 应该可以通过一些工作实现,即使指标最终下降到零。一种可能性是发布稳定数量的虚拟消息,这些消息会推进水印并触发窗格,但稍后会在管道中被过滤掉。这种方法的问题是需要调整发布来源,而且可能并不总是 convenient/possible。另一个方法涉及生成此虚假数据作为另一个输入并将其与主流共同分组。优点是一切都可以在 Dataflow 中完成,而无需调整源或接收器。为了说明这一点,我提供了一个例子。
输入分为两个流。对于虚拟元素,我使用 GenerateSequence
每 5 秒创建一个新元素。然后我 window PCollection(windowing 策略需要与主流策略兼容,所以我将使用相同的策略)。然后我将该元素映射到值为 0 的键值对(我们可以使用其他值,因为我们知道该元素来自哪个流,但我想证明不计算虚拟记录)。
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
...
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 0));
}
}));
对于从 Pub/Sub 读取的主流,我将每条记录映射到值 1。稍后,我将使用 map-reduce 阶段将所有记录添加到典型的字数统计示例中。
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
...
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(KV.of("num_messages", 1));
}
}));
然后我们需要使用 CoGroupByKey
加入它们(我使用相同的 num_messages
键来分组计数)。当两个输入之一有元素时,这个阶段将输出结果,因此解除这里的主要问题(空 windows 没有 Pub/Sub 消息)。
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
最后,我们将所有的相加以获得 window 的消息总数。如果没有来自 dataTag
的元素,则总和将默认为 0。
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
结果应该是这样的:
请注意,来自不同 windows 的结果可能是无序的(这在写入 BigQuery 时无论如何都可能发生)并且我没有使用 window 设置来优化示例。
完整代码:
public class EmptyWindows {
private static final Logger LOG = LoggerFactory.getLogger(EmptyWindows.class);
public static interface MyOptions extends PipelineOptions {
@Description("Input topic")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String topic = options.getInput();
PCollection<KV<String,Integer>> mainStream = p
.apply("Get Messages - Data", PubsubIO.readStrings().fromTopic(topic))
.apply("Window Messages - Data", Window.<String>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Data", ParDo.of(new DoFn<String, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New data element in main output");
c.output(KV.of("num_messages", 1));
}
}));
PCollection<KV<String,Integer>> dummyStream = p
.apply("Generate Sequence", GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5)))
.apply("Window Messages - Dummy", Window.<Long>into(
SlidingWindows.of(Duration.standardMinutes(1))
.every(Duration.standardSeconds(5)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.accumulatingFiredPanes())
.apply("Count Messages - Dummy", ParDo.of(new DoFn<Long, KV<String, Integer>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
//LOG.info("New dummy element in main output");
c.output(KV.of("num_messages", 0));
}
}));
final TupleTag<Integer> dummyTag = new TupleTag<>();
final TupleTag<Integer> dataTag = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> coGbkResultCollection = KeyedPCollectionTuple.of(dummyTag, dummyStream)
.and(dataTag, mainStream).apply(CoGroupByKey.<String>create());
coGbkResultCollection
.apply("Log results", ParDo.of(new DoFn<KV<String, CoGbkResult>, Void>() {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) {
Integer total_sum = new Integer(0);
Iterable<Integer> dataTagVal = c.element().getValue().getAll(dataTag);
for (Integer val : dataTagVal) {
total_sum += val;
}
LOG.info("Window: " + window.toString() + ", Number of messages: " + total_sum.toString());
}
}));
p.run();
}
}
解决此问题的另一种方法是使用有状态 DoFn
和循环 Timer
,每 5 秒滴答触发一次。此循环计时器生成实时监控所需的默认数据,并确保每个 window 至少有一个事件要处理。
参见https://beam.apache.org/blog/looping-timers/。该文章中的选项 1 和 2 分别是外部心跳源和光束管道中的生成源。选项 3 是循环定时器。