Apache Flink - 在不使用广播状态的情况下更新操作员内的配置
Apache Flink - Update configuration within operator without using Broadcast state
我们正在使用 flink 为每个事件进行 http 调用,这需要存储在数据库中的某些数据。
该数据大约每周更新一次。此更新必须交给运营商。
有没有什么方法可以在不使用广播流的情况下在运算符中更新此数据,因为我们试图在我们的体系结构中保持较低的流数量,并且因为数据的变化不频繁?
可能的选项:
A) 您可以简单地使用带有计时器的 ProcessFunction
并每隔 X 分钟提取一次更改。
B) 如果您的状态很小并且重新启动不是太关键:如果您不更新数据(403?),您的服务器请求可能会失败。然后你可以只加载 open
中的数据,当你收到 403 并恢复时让你的操作员失败。
编辑:
A) 如何工作的示例。假设你有
Source(记录)->MyAsyncFunc(输出)->Sink(输出)
我会去添加另一个功能
Source(Record)->ConfFetcher(Tuple2(Record, Conf))->MyAsyncFunc(Output)->Sink(Output)
编辑2:
正如您在评论中指出的那样,Flink 计时器绑定到键控状态。但是,对于这个用例,我们根本不需要使用任何 Flink 定时器,只需使用 Java 个定时器。
private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private transient volatile Conf conf;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
}
void pullConfig() {
conf = ...
}
@Override
public Tuple2<T, Conf> map(T value) throws Exception {
return new Tuple2(value, conf);
}
...
}
我们正在使用 flink 为每个事件进行 http 调用,这需要存储在数据库中的某些数据。 该数据大约每周更新一次。此更新必须交给运营商。
有没有什么方法可以在不使用广播流的情况下在运算符中更新此数据,因为我们试图在我们的体系结构中保持较低的流数量,并且因为数据的变化不频繁?
可能的选项:
A) 您可以简单地使用带有计时器的 ProcessFunction
并每隔 X 分钟提取一次更改。
B) 如果您的状态很小并且重新启动不是太关键:如果您不更新数据(403?),您的服务器请求可能会失败。然后你可以只加载 open
中的数据,当你收到 403 并恢复时让你的操作员失败。
编辑:
A) 如何工作的示例。假设你有
Source(记录)->MyAsyncFunc(输出)->Sink(输出)
我会去添加另一个功能
Source(Record)->ConfFetcher(Tuple2(Record, Conf))->MyAsyncFunc(Output)->Sink(Output)
编辑2:
正如您在评论中指出的那样,Flink 计时器绑定到键控状态。但是,对于这个用例,我们根本不需要使用任何 Flink 定时器,只需使用 Java 个定时器。
private static class PullConfig<T> extends RichMapFunction<T, Tuple2<T, Conf>> {
private transient ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
private transient volatile Conf conf;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
service.scheduleWithFixedDelay(this::pullConfig, 0, 1, TimeUnit.HOURS);
}
void pullConfig() {
conf = ...
}
@Override
public Tuple2<T, Conf> map(T value) throws Exception {
return new Tuple2(value, conf);
}
...
}