How to configure Flink jobs at runtime?
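Is it possible to configure a Flink application at runtime? For example, I have a streaming application that reads an input, applies some transformations, and then filters out all elements below a certain threshold. However, I want this threshold to be configurable at runtime, meaning I can change it without having to restart my Flink job. Example code: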
DataStream<MyModel> myModelDataStream = // get input ...
// do some stuff ...
.filter(new RichFilterFunction<MyModel>() {
@Override
public boolean filter(MyModel value) throws Exception {
return value.someValue() > someGlobalState.getThreshold();
}
})
// write to some sink ...
DataStream<MyConfig> myConfigDataStream = // get input ...
// ...
.process(new RichProcessFunction<MyConfig>() {
someGlobalState.setThreshold(MyConfig.getThreshold());
})
// ...
Is there a way to achieve this? For example, with some global state that can be changed through a configuration stream.
Yes, you can do this with a BroadcastProcessFunction. Roughly like this:
// Both type arguments must be TypeInformation (or both Class); mixing the two does not compile.
MapStateDescriptor<Void, Threshold> bcStateDescriptor =
    new MapStateDescriptor<>("thresholds", Types.VOID, TypeInformation.of(Threshold.class));
DataStream<MyModel> myModelDataStream = // get input ...
DataStream<Threshold> thresholds = // get input...
// Every parallel instance of the downstream operator receives each threshold update.
BroadcastStream<Threshold> controlStream = thresholds.broadcast(bcStateDescriptor);
DataStream<MyModel> result = myModelDataStream
    .connect(controlStream)
    .process(new MyFunction());
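import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class MyFunction extends BroadcastProcessFunction<MyModel, Threshold, MyModel> {
    // Must use the same name ("thresholds") and types as the descriptor passed to broadcast().
    private static final MapStateDescriptor<Void, Threshold> DESCRIPTOR =
        new MapStateDescriptor<>("thresholds", Types.VOID, TypeInformation.of(Threshold.class));

    @Override
    public void processBroadcastElement(Threshold newThreshold, Context ctx, Collector<MyModel> out) throws Exception {
        // Store the latest threshold under the single null key.
        ctx.getBroadcastState(DESCRIPTOR).put(null, newThreshold);
    }

    @Override
    public void processElement(MyModel model, ReadOnlyContext ctx, Collector<MyModel> out) throws Exception {
        Threshold threshold = ctx.getBroadcastState(DESCRIPTOR).get(null);
        // Let everything through until a threshold has arrived on the control stream.
        if (threshold == null || threshold.value() == null || model.getData() > threshold.value()) {
            out.collect(model);
        }
    }
}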
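The snippet above leaves `Threshold` undefined. Here is a minimal sketch of what it might look like, assuming it simply wraps a nullable `Double` (the original only shows that `value()` can return `null` and can be compared with `model.getData()`). The `passes` helper is hypothetical and not part of any Flink API; it just isolates the filter decision so it can be checked without a running job.

```java
// Hypothetical control message carried on the broadcast stream.
// Declared package-private so the sketch compiles in any file; make it public
// (in its own file) if you want Flink to treat it as a POJO rather than a generic type.
class Threshold {
    // null means "no limit configured"
    public Double value;

    // No-argument constructor, needed for POJO-style serialization.
    public Threshold() {}

    public Threshold(Double value) {
        this.value = value;
    }

    // Accessor named to match the threshold.value() calls in the answer.
    public Double value() {
        return value;
    }

    // Same decision as in processElement: keep the element while no threshold
    // is known, otherwise keep it only if it is strictly above the threshold.
    static boolean passes(Threshold current, double data) {
        return current == null || current.value() == null || data > current.value();
    }
}
```

With this in place, the condition in `processElement` could be reduced to `Threshold.passes(threshold, model.getData())`, assuming `getData()` returns a numeric value.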