Flink 如何处理迭代循环中的时间戳?

How does Flink treat timestamps within iterative loops?

如何在 Flink 的迭代 DataStream 循环中处理时间戳?

例如,这是 Flink 中一个简单的迭代循环的示例,其中反馈循环与输入流的类型不同:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }

    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});

iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

我的问题围绕着 Flink 如何在反馈循环中使用时间戳:

AFAICT,Flink 不对输入对象的顺序提供任何保证。当我尝试在 Flink 中对聚类算法使用迭代时,我已经 运行 进入了这个,其中质心更新没有得到及时处理。我找到的唯一解决方案是基本上创建传入事件和质心更新的单个(联合)流,而不是使用 co-stream.

仅供参考,this proposal 可以解决一些 short-comings 的迭代问题。