在 processBroadcastElement 函数中访问 flink 状态
Access flink state inside processBroadcastElement function
打算在 processBroadcastElement()
函数中进行一些状态管理。
final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])
override def processBroadcastElement(...): Unit {
val actvTagMap = getRuntimeContext.getMapState(actvTagsMapValue)
val st = actvTagMap.entries() // this line produce an error
}
在访问状态期间出现以下错误
229797 [LabelShlfEvents -> Sink: Print to Std. Out (1/1)] WARN
o.a.flink.runtime.taskmanager.Task - LabelShlfEvents -> Sink: Print to Std. Out (1/1)
(d3154841fd8bd4cabc00e0145ac37ed8) switched from RUNNING to FAILED.
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
不允许我这样做?
你不能这样做。原因很简单,因为 MapState
(还有 ValueState
、ListState
以及 here 中描述的更多)是一种称为键控状态的状态。此状态已分区并限定为当前元素的输入键。
广播元素不以任何方式键控或分区,因此没有 KeyedContext
附加到这些元素。当您尝试访问 processBroadcastElement
内的状态时,Flink 不知道此请求的范围是哪个键,这就是为什么您会得到一个异常。
另一方面,您可以在 KeyedBroadcastProcessFunction
的 processElement
中安全地使用键控状态,因为这些元素将分配键,并且在键控状态的情况下范围是已知的。
如果您需要对非广播状态的广播元素使用状态,则需要按照文档中的描述将其实现为运算符状态。
这行不通,Dominik 在 中解释了原因。
您在 processBroadcastElement
中可以做的是 access/modify/delete 所有键的键控状态,通过使用 applyToKeyedState
和 KeyedStateFunction
。但是,您必须注意在所有并行实例中确定性地运行。否则,在恢复或重新缩放后,您可能会出现不一致。
这是一个示例,它在收到任何广播消息后为每个键发出 ValueState
的值。
public static class DumpFunction
extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {
private ValueStateDescriptor<TaxiRide> taxiDesc;
private ValueState<TaxiRide> taxiState;
@Override
public void open(Configuration config) {
taxiDesc = new ValueStateDescriptor<>("ride", TaxiRide.class);
taxiState = getRuntimeContext().getState(taxiDescriptor);
}
@Override
public void processElement(TaxiRide ride, ReadOnlyContext ctx,
Collector< TaxiRide> out) throws Exception {
taxiState.update(ride);
}
@Override
public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) {
ctx.applyToKeyedState(taxiDesc, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
@Override
public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {
out.collect(taxiState.value());
}
});
}
}
打算在 processBroadcastElement()
函数中进行一些状态管理。
final val actvTagsMapValue = new MapStateDescriptor[String, List[String]]("actvTagsMapValue", classOf[String], classOf[List[String]])
override def processBroadcastElement(...): Unit {
val actvTagMap = getRuntimeContext.getMapState(actvTagsMapValue)
val st = actvTagMap.entries() // this line produce an error
}
在访问状态期间出现以下错误
229797 [LabelShlfEvents -> Sink: Print to Std. Out (1/1)] WARN
o.a.flink.runtime.taskmanager.Task - LabelShlfEvents -> Sink: Print to Std. Out (1/1)
(d3154841fd8bd4cabc00e0145ac37ed8) switched from RUNNING to FAILED.
java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
不允许我这样做?
你不能这样做。原因很简单,因为 MapState
(还有 ValueState
、ListState
以及 here 中描述的更多)是一种称为键控状态的状态。此状态已分区并限定为当前元素的输入键。
广播元素不以任何方式键控或分区,因此没有 KeyedContext
附加到这些元素。当您尝试访问 processBroadcastElement
内的状态时,Flink 不知道此请求的范围是哪个键,这就是为什么您会得到一个异常。
另一方面,您可以在 KeyedBroadcastProcessFunction
的 processElement
中安全地使用键控状态,因为这些元素将分配键,并且在键控状态的情况下范围是已知的。
如果您需要对非广播状态的广播元素使用状态,则需要按照文档中的描述将其实现为运算符状态。
这行不通,Dominik 在
您在 processBroadcastElement
中可以做的是 access/modify/delete 所有键的键控状态,通过使用 applyToKeyedState
和 KeyedStateFunction
。但是,您必须注意在所有并行实例中确定性地运行。否则,在恢复或重新缩放后,您可能会出现不一致。
这是一个示例,它在收到任何广播消息后为每个键发出 ValueState
的值。
public static class DumpFunction
extends KeyedBroadcastProcessFunction<Long, TaxiRide, String, TaxiRide> {
private ValueStateDescriptor<TaxiRide> taxiDesc;
private ValueState<TaxiRide> taxiState;
@Override
public void open(Configuration config) {
taxiDesc = new ValueStateDescriptor<>("ride", TaxiRide.class);
taxiState = getRuntimeContext().getState(taxiDescriptor);
}
@Override
public void processElement(TaxiRide ride, ReadOnlyContext ctx,
Collector< TaxiRide> out) throws Exception {
taxiState.update(ride);
}
@Override
public void processBroadcastElement(String msg, Context ctx, Collector<TaxiRide> out) {
ctx.applyToKeyedState(taxiDesc, new KeyedStateFunction<Long, ValueState<TaxiRide>>() {
@Override
public void process(Long taxiId, ValueState<TaxiRide> taxiState) throws Exception {
out.collect(taxiState.value());
}
});
}
}