在 processBroadcastElement 函数中访问 flink 状态

Access flink state inside processBroadcastElement function

打算在 processBroadcastElement() 函数中进行一些状态管理。

final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])

override def processBroadcastElement(...): Unit {
    val actvTagMap = getRuntimeContext.getMapState(actvTagsMapValue)
    val st = actvTagMap.entries() // this line produce an error
}

在访问状态期间出现以下错误

229797 [LabelShlfEvents -> Sink: Print to Std. Out (1/1)] WARN  
o.a.flink.runtime.taskmanager.Task - LabelShlfEvents -> Sink: Print to Std. Out (1/1) 
(d3154841fd8bd4cabc00e0145ac37ed8) switched from RUNNING to FAILED. 
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)

不允许我这样做?

你不能这样做。原因很简单,因为 MapState(还有 ValueStateListState 以及 here 中描述的更多)是一种称为键控状态的状态。此状态已分区并限定为当前元素的输入键。

广播元素不以任何方式键控或分区,因此没有 KeyedContext 附加到这些元素。当您尝试访问 processBroadcastElement 内的状态时,Flink 不知道此请求的范围是哪个键,这就是为什么您会得到一个异常。

另一方面,您可以在 KeyedBroadcastProcessFunctionprocessElement 中安全地使用键控状态,因为这些元素将分配键,并且在键控状态的情况下范围是已知的。

如果您需要对非广播状态的广播元素使用状态,则需要按照文档中的描述将其实现为运算符状态。

这行不通,Dominik 在 中解释了原因。

您在 processBroadcastElement 中可以做的是 access/modify/delete 所有键的键控状态,通过使用 applyToKeyedStateKeyedStateFunction。但是,您必须注意在所有并行实例中确定性地运行。否则,在恢复或重新缩放后,您可能会出现不一致。

这是一个示例,它在收到任何广播消息后为每个键发出 ValueState 的值。

public static class DumpFunction
        extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {
    private ValueStateDescriptor<TaxiRide> taxiDesc;
    private ValueState<TaxiRide> taxiState;
    
    @Override
    public void open(Configuration config) {
        taxiDesc = new ValueStateDescriptor<>("ride", TaxiRide.class);
        taxiState = getRuntimeContext().getState(taxiDescriptor);
    }
    
    @Override
    public void processElement(TaxiRide ride, ReadOnlyContext ctx, 
            Collector< TaxiRide> out) throws Exception {
        taxiState.update(ride);
    }
            
    @Override
    public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) {
        ctx.applyToKeyedState(taxiDesc, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
            @Override
            public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {
                out.collect(taxiState.value());
            }
        });
    }
}