Flink 如何处理迭代循环中的时间戳?
How does Flink treat timestamps within iterative loops?
如何在 Flink 的迭代 DataStream 循环中处理时间戳?
例如,这是 Flink 中一个简单的迭代循环的示例,其中反馈循环与输入流的类型不同:
DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
@Override
public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
// do some processing of the stream of MyInput values
// emit MyOutput values downstream by calling out.collect()
out.collect(someInstanceOfMyOutput);
}
@Override
public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
// do some more processing on the feedback classes
// emit feedback items
ctx.output(outputTag, someInstanceOfMyFeedback);
}
});
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
我的问题围绕着 Flink 如何在反馈循环中使用时间戳:
- 在
ConnectedIterativeStreams
中,Flink 如何处理跨常规输入和反馈对象流的输入对象的排序?如果我将一个对象发射到反馈循环中,相对于常规输入对象流,循环头什么时候能看到它?
- 使用事件时间处理时行为有何变化?
AFAICT,Flink 不对输入对象的顺序提供任何保证。当我尝试在 Flink 中对聚类算法使用迭代时,我已经 运行 进入了这个,其中质心更新没有得到及时处理。我找到的唯一解决方案是基本上创建传入事件和质心更新的单个(联合)流,而不是使用 co-stream.
仅供参考,this proposal 可以解决一些 short-comings 的迭代问题。
如何在 Flink 的迭代 DataStream 循环中处理时间戳?
例如,这是 Flink 中一个简单的迭代循环的示例,其中反馈循环与输入流的类型不同:
DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
@Override
public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
// do some processing of the stream of MyInput values
// emit MyOutput values downstream by calling out.collect()
out.collect(someInstanceOfMyOutput);
}
@Override
public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
// do some more processing on the feedback classes
// emit feedback items
ctx.output(outputTag, someInstanceOfMyFeedback);
}
});
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));
我的问题围绕着 Flink 如何在反馈循环中使用时间戳:
- 在
ConnectedIterativeStreams
中,Flink 如何处理跨常规输入和反馈对象流的输入对象的排序?如果我将一个对象发射到反馈循环中,相对于常规输入对象流,循环头什么时候能看到它? - 使用事件时间处理时行为有何变化?
AFAICT,Flink 不对输入对象的顺序提供任何保证。当我尝试在 Flink 中对聚类算法使用迭代时,我已经 运行 进入了这个,其中质心更新没有得到及时处理。我找到的唯一解决方案是基本上创建传入事件和质心更新的单个(联合)流,而不是使用 co-stream.
仅供参考,this proposal 可以解决一些 short-comings 的迭代问题。