初始化 MapState 的内容
Initialize the content of a MapState
我已经实现了具有以下结构的 Flink RichFunction
:
public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {
private MapState<String, MyState> myState;
@Override
public void open(Configuration conf)throws Exception{
myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));
}
@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
MyState state = myState.get(value.ID());
// Do things
}
@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
state.put(value.ID(), value.state()); // Update the mapState with value from broadcast
}
// retrieve all the state values and put them in the MapState
private void initialState() throws Exception{
Map<String, MyState> initialValues = ...;
this.cameras.putAll(initialValues);
}
}
mapState
变量存储通过 BroadcastedStream
更新的多个状态。更新在 processBroadcastElement()
函数中完成。
在作业开始时,我想使用 initialState()
函数初始化 mapState
。
问题是我无法在 open()
函数中使用它(请参阅 为什么)
在这种情况下初始化 mapState
的正确方法是什么? (在所有情况下都使用 RichFunctions)
您想实施org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
当你这样做时,你实现了两个方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when it's time to save state
myState.clear();
// Update myState with current application state
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called when things start up, possibly recovering from an error
descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));
myState = context.getKeyedStateStore().getMapState(descriptor);
if (context.isRestored()) {
// restore application state from myState
}
}
您在 initializeState() 方法而不是 open() 中初始化了 myState 变量。
我不相信您真的可以在 initializeState() 中初始化广播状态。修改广播状态的唯一方法是通过您在 processBroadcastElement 方法中获得的 read/write 上下文。
但是你可以做的是在initializeState中使用context.isRestored()来确定KeyedBroadcastProcessFunction是否是第一次初始化,并设置一个瞬态局部变量来记录这个信息。然后第一次调用 processBroadcastElement 方法时,您可以使用此信息来决定在广播状态中存储什么。但是你必须在广播流上发送一些东西才能开始。
我已经实现了具有以下结构的 Flink RichFunction
:
public class MyFunction extends KeyedBroadcastProcessFunction <String, InputType, BroadcastedStateType, OutputType> {
private MapState<String, MyState> myState;
@Override
public void open(Configuration conf)throws Exception{
myState = getRuntimeContext().getMapState(new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class)));
}
@Override
public void processElement(InputType value, ReadOnlyContext ctx, Collector<OutputType> out) throws Exception {
MyState state = myState.get(value.ID());
// Do things
}
@Override
public void processBroadcastElement(BroadcastedStateType value, Context ctx, Collector<OutputType> out) throws Exception {
state.put(value.ID(), value.state()); // Update the mapState with value from broadcast
}
// retrieve all the state values and put them in the MapState
private void initialState() throws Exception{
Map<String, MyState> initialValues = ...;
this.cameras.putAll(initialValues);
}
}
mapState
变量存储通过 BroadcastedStream
更新的多个状态。更新在 processBroadcastElement()
函数中完成。
在作业开始时,我想使用 initialState()
函数初始化 mapState
。
问题是我无法在 open()
函数中使用它(请参阅
在这种情况下初始化 mapState
的正确方法是什么? (在所有情况下都使用 RichFunctions)
您想实施org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
当你这样做时,你实现了两个方法:
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// called when it's time to save state
myState.clear();
// Update myState with current application state
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// called when things start up, possibly recovering from an error
descriptor = new MapStateDescriptor<>("state", Types.STRING, Types.POJO(BroadcastedStateType.class));
myState = context.getKeyedStateStore().getMapState(descriptor);
if (context.isRestored()) {
// restore application state from myState
}
}
您在 initializeState() 方法而不是 open() 中初始化了 myState 变量。
我不相信您真的可以在 initializeState() 中初始化广播状态。修改广播状态的唯一方法是通过您在 processBroadcastElement 方法中获得的 read/write 上下文。
但是你可以做的是在initializeState中使用context.isRestored()来确定KeyedBroadcastProcessFunction是否是第一次初始化,并设置一个瞬态局部变量来记录这个信息。然后第一次调用 processBroadcastElement 方法时,您可以使用此信息来决定在广播状态中存储什么。但是你必须在广播流上发送一些东西才能开始。