Flink 维护配置状态
Flink maintaining configuration state
我有一个在 Flink 中维护配置的用例,我真的不知道如何处理。
假设我在某处存储了一些配置,我需要它来进行处理。在Flink作业初始化的时候,我想加载所有的配置。
这个配置也可以在Flink作业的运行期间被修改,所以我必须在内存中保留这个配置的状态,并在需要的时候更新它。可以从 KafkaSource 访问配置更新。
这就是我所拥有的:
我有一个函数可以加载整个配置,将其保持在一个状态并将其与我的数据流相关联:
public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
private transient MapState<String, MyConfObject> configuration;
@Override
public void open(MyConfiguration config) throws Exception{
MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
"configuration",
BasicTypeInfo.STRING_TYPE_INFO,
...
);
configuration = getRuntimeContext().getMapState(descriptor);
configuration.putAll(...); // Load configuration from somewhere
}
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
MyConfObject conf = configuration.get(...);
... // Associate conf with data
out.collect(value);
}
}
我的管道是这样的:
DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf =
env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
.map(...);
return dataStream
.assignTimestampsAndWatermarks(...)
.flatMap(new MyConfiguration())
... //Do some processing
.map(m -> {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(m);
return json.getBytes();
});
我想要的是使用配置更新流 streamConf
来更新 MyConfiguration
平面地图函数内的状态变量。我该怎么做?
我建议您编写一个从 Kafka 读取配置信息的源代码,然后通过广播流将对配置的更改广播到映射函数。映射函数会将完整的当前配置存储在其持久状态中,广播流意味着映射函数的所有实例都将获得所有配置更改。
我有一个在 Flink 中维护配置的用例,我真的不知道如何处理。
假设我在某处存储了一些配置,我需要它来进行处理。在Flink作业初始化的时候,我想加载所有的配置。
这个配置也可以在Flink作业的运行期间被修改,所以我必须在内存中保留这个配置的状态,并在需要的时候更新它。可以从 KafkaSource 访问配置更新。
这就是我所拥有的:
我有一个函数可以加载整个配置,将其保持在一个状态并将其与我的数据流相关联:
public class MyConfiguration extends RichFlatMapFunction<Row, Row>{
private transient MapState<String, MyConfObject> configuration;
@Override
public void open(MyConfiguration config) throws Exception{
MapStateDescriptor<String,MyConfObject> descriptor = new MapStateDescriptor<String,MyConfObject>(
"configuration",
BasicTypeInfo.STRING_TYPE_INFO,
...
);
configuration = getRuntimeContext().getMapState(descriptor);
configuration.putAll(...); // Load configuration from somewhere
}
@Override
public void flatMap(Row value, Collector<Row> out) throws Exception {
MyConfObject conf = configuration.get(...);
... // Associate conf with data
out.collect(value);
}
}
我的管道是这样的:
DataStream<Row> dataStream = ...; // My data stream
DataStream<Map<String, MyConfObject> streamConf =
env.addSource(new FlinkKafkaConsumer<Row>(..., ..., ...)) // The stream of configuration updates
.map(...);
return dataStream
.assignTimestampsAndWatermarks(...)
.flatMap(new MyConfiguration())
... //Do some processing
.map(m -> {
ObjectMapper objectMapper = new ObjectMapper();
String json = objectMapper.writeValueAsString(m);
return json.getBytes();
});
我想要的是使用配置更新流 streamConf
来更新 MyConfiguration
平面地图函数内的状态变量。我该怎么做?
我建议您编写一个从 Kafka 读取配置信息的源代码,然后通过广播流将对配置的更改广播到映射函数。映射函数会将完整的当前配置存储在其持久状态中,广播流意味着映射函数的所有实例都将获得所有配置更改。