Flink 可重扩展键控流状态函数

Flink re-scalable keyed stream stateful function

我有以下 Flink 作业,我尝试使用后端类型为 RockDB 的键控流状态函数 (MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")

MyRichMapFunction 是一个有状态函数,它扩展了 RichMapFunction,它具有以下代码,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

将来我想重新缩放并行度(从2到4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后我可以获得相应的缓存键控数据到相应的任务槽。我试图探索这个,在那里我找到了一个文档 here。据此,可以通过使用为此提供 snapshotState/restoreState 方法的 ListCheckPointed 接口来实现可重新缩放的运算符状态。但不确定如何实现可重新缩放的键控状态(MyRichMapFunction)?我是否需要为我的 MyRichMapFunction class 实现 ListCheckPointed 接口?如果是,我如何根据 restoreState 方法上的新并行密钥散列重新分配缓存(我的 MapState 将在启用 TTL 的情况下保存大量密钥,假设最多它将在任何时间点保存 10 亿个密钥)?有人可以帮我解决这个问题吗,或者如果你给我指出任何例子也很好。

您编写的代码已经可以重新缩放; Flink 的 managed keyed state 在设计上是可扩展的。通过重新平衡键对实例的分配来重新缩放键控状态。 (您可以将键控状态视为分片 key/value 存储。从技术上讲,一致性哈希用于将键映射到 键组 ,并且每个并行实例负责一些关键组。重新缩放只涉及在实例之间重新分配关键组。)

ListCheckpointed 接口用于在非键控上下文中使用的状态,因此它不适合您正在做的事情。另请注意,ListCheckpointed 将在 Flink 1.11 中弃用,取而代之的是更通用的 CheckpointedFunction.

还有一件事:如果 MyKeyExtractor 通过 value.getEventId() 键控,那么您可以使用 ValueState<Boolean> 作为缓存,而不是 MapState<String, Boolean>。这是有效的,因为对于键控状态,每个键都有一个单独的 ValueState 值。当您需要为流中的每个键存储多个 attribute/value 对时,您只需要使用 MapState。

大部分内容在 Hands-on Training, which includes an example 下的 Flink 文档中讨论,这与您正在做的非常接近。