Apache Flink:自定义触发器行为异常
Apache Flink: Custom trigger behaves unexpectedly
我有一个 DataStream,它由带有 属性 的事件组成,代表一批生产的元素。 属性,让我们称之为 'batchNumber',在我从同一个生产批次中摄取的每个事件中都是不变的。我每批收到多个事件。
我想在 'batchNumber' 变化时分析批次内的机器性能。我的方法是使用全局流并使用 'batchNumber' 作为键对其进行分区。我希望这会将全局流划分为 windows,其中每个事件都有 'batchNumber'。然后我定义了一个触发器,它应该在 'batchNumber' 更改时触发。然后我可以在 ProcessWindowFunction 中分析聚合数据。
我的问题是:
- 当 prodnr 改变时触发器并不总是触发
- 就算真的火了,也只是聚合了一个元素。我预计接近 200。
这是我正在使用的代码。
public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
private static final long serialVersionUID = 1L;
public batchnrTrigger() {}
private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);
@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);
if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
batchnrState.update(element.batchnr);
return TriggerResult.FIRE;
}
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
batchnrState.update(element.batchnr);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
}
}
我是这样称呼这个触发器的:
DataStream<String> imaginePaperDataStream = nifiStreamSource
.map(new ImaginePaperDataConverter())
.keyBy((ImaginePaperData event) -> event.lunum)
.window(GlobalWindows.create())
.trigger(new LunumTrigger())
.process(new ImaginePaperWindowReportFunction());
我知道这个问题类似于 问题。但我使用的是 ValueState,我认为我的触发条件根本不相似。
我怎样才能让它工作?
您确定要通过 event.lunum 对流进行加密吗?如果您期望每个不同的 lunum 值大约有 200 个事件,那么这是有道理的。但是,如果对于 lunum 的每个值,每批次只有一个事件,那将解释您所看到的行为。
另外,您确定您的活动正在按顺序处理吗?如果批处理由于并行进程之间的竞争条件而在处理管道中的某处交错,这也可能有助于解释您所看到的情况。
此外,你应该在Trigger的clear方法中清除状态。并且您将需要实现一个 Evictor 以在处理后从 window 中删除元素。
windowAPI的这一部分相当复杂。我认为这个特定的应用程序可以更直接地实现为 RichFlatMap,它在 ListState 中收集项目,直到批号发生变化(您将保留在 ValueState 中)。
我有一个 DataStream,它由带有 属性 的事件组成,代表一批生产的元素。 属性,让我们称之为 'batchNumber',在我从同一个生产批次中摄取的每个事件中都是不变的。我每批收到多个事件。
我想在 'batchNumber' 变化时分析批次内的机器性能。我的方法是使用全局流并使用 'batchNumber' 作为键对其进行分区。我希望这会将全局流划分为 windows,其中每个事件都有 'batchNumber'。然后我定义了一个触发器,它应该在 'batchNumber' 更改时触发。然后我可以在 ProcessWindowFunction 中分析聚合数据。
我的问题是:
- 当 prodnr 改变时触发器并不总是触发
- 就算真的火了,也只是聚合了一个元素。我预计接近 200。
这是我正在使用的代码。
public class batchnrTrigger extends Trigger<ImaginePaperData, GlobalWindow> {
private static final long serialVersionUID = 1L;
public batchnrTrigger() {}
private final ValueStateDescriptor<Integer> prevbatchnr = new ValueStateDescriptor<>("batchnr", Integer.class);
@Override
public TriggerResult onElement(ImaginePaperData element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
ValueState<Integer> batchnrState = ctx.getPartitionedState(prevbatchnr);
if (batchnrState == null || batchnrState.value() == null || !(element.batchnr == batchnrState.value())) {
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should fire and process elements from window!");
batchnrState.update(element.batchnr);
return TriggerResult.FIRE;
}
System.out.println("batchnr BEFORE: " + batchnrState.value() + " NEW batchnr: " + element.batchnr + " ==> should not fire and continue ingesting elements!");
batchnrState.update(element.batchnr);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
}
}
我是这样称呼这个触发器的:
DataStream<String> imaginePaperDataStream = nifiStreamSource
.map(new ImaginePaperDataConverter())
.keyBy((ImaginePaperData event) -> event.lunum)
.window(GlobalWindows.create())
.trigger(new LunumTrigger())
.process(new ImaginePaperWindowReportFunction());
我知道这个问题类似于
我怎样才能让它工作?
您确定要通过 event.lunum 对流进行加密吗?如果您期望每个不同的 lunum 值大约有 200 个事件,那么这是有道理的。但是,如果对于 lunum 的每个值,每批次只有一个事件,那将解释您所看到的行为。
另外,您确定您的活动正在按顺序处理吗?如果批处理由于并行进程之间的竞争条件而在处理管道中的某处交错,这也可能有助于解释您所看到的情况。
此外,你应该在Trigger的clear方法中清除状态。并且您将需要实现一个 Evictor 以在处理后从 window 中删除元素。
windowAPI的这一部分相当复杂。我认为这个特定的应用程序可以更直接地实现为 RichFlatMap,它在 ListState 中收集项目,直到批号发生变化(您将保留在 ValueState 中)。