Q.Apache Flink:如何在 initializeState 期间获取当前密钥
Q.Apache Flink : how to get current key during initializeState
如果应用程序无法从后端恢复状态,我想从我自己的 db(es.eg) 初始化值状态,但是如何在 initializeState 期间获取当前密钥?
这里是示例代码:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
KeyedStateStore stateStore = context.getKeyedStateStore();
ValueStateDescriptor<PickUpState> pickUpStateConfig = new ValueStateDescriptor<>("pickUpState", PickUpState.class);
ValueState<PickUpState> state = stateStore.getState(pickUpStateConfig);
pickUpState = state;
if(!context.isRestored()){
//get the current key helpful
String key = ...
PickUpState upState = initStateFromEs(key);
state.update(upState);
}
}
任何回复都会有所帮助,谢谢!
这是不可能的,因为调用 initializeState
时没有当前密钥。
用户功能的每个实例都跨多个键进行复用——即分配给该任务槽的键组中的所有键。 initializeState
只被调用一次,它需要执行所有这些键所需的任何操作。 (并且无法确定哪些键与给定实例相关。)
假设状态后端始终可用。唯一不正确的情况是存储快照的远程文件系统不可用,在这种情况下您无能为力——除了从头开始重建状态。例如,您可以使用状态处理器 API 从数据库中的数据构建新的状态快照。
如果应用程序无法从后端恢复状态,我想从我自己的 db(es.eg) 初始化值状态,但是如何在 initializeState 期间获取当前密钥?
这里是示例代码:
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
KeyedStateStore stateStore = context.getKeyedStateStore();
ValueStateDescriptor<PickUpState> pickUpStateConfig = new ValueStateDescriptor<>("pickUpState", PickUpState.class);
ValueState<PickUpState> state = stateStore.getState(pickUpStateConfig);
pickUpState = state;
if(!context.isRestored()){
//get the current key helpful
String key = ...
PickUpState upState = initStateFromEs(key);
state.update(upState);
}
}
任何回复都会有所帮助,谢谢!
这是不可能的,因为调用 initializeState
时没有当前密钥。
用户功能的每个实例都跨多个键进行复用——即分配给该任务槽的键组中的所有键。 initializeState
只被调用一次,它需要执行所有这些键所需的任何操作。 (并且无法确定哪些键与给定实例相关。)
假设状态后端始终可用。唯一不正确的情况是存储快照的远程文件系统不可用,在这种情况下您无能为力——除了从头开始重建状态。例如,您可以使用状态处理器 API 从数据库中的数据构建新的状态快照。