使用和不使用 keyBy 的 RichFlatMap 中的状态管理

State management in RichFlatMap with and without keyBy

我有一个像这样的流应用程序:

DataStream<MyObject> stream1 = source
                .keyBy("clientip")
                .flatMap(new MyFlatMapFunction())
                .name("Stream1");

//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {

    private transient ValueState<Boolean> valueState;

    @Override
    public void open(Configuration parameters)
    {
        StateTtlConfig ttlConfig = StateTtlConfig
                .newBuilder(Time.minutes(12))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
                .build();

        ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
                "valueState",
                Types.BOOLEAN);

        valueStateeDescriptor.enableTimeToLive(ttlConfig);
        valueState = getRuntimeContext().getState(valueState);
    }

    @Override
    public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
    {
       // get value from value state, check if it is matched with something
       // if matches some condition, then collector.collect(myObject)
       // update state for each myObject
    }
}

不是:有 3 个工人在不同的 3 台机器上,并行度为 16。总并行度为 48。

当我实现这段代码时,我一直假设“如果 ip 地址 1.2.3.4 符合条件,那么来自同一 ip 地址 1.2.3.4 的后续请求始终符合条件,直到状态被清除” .这个说法正确吗?

我从flink文档了解到,如果ip地址1.2.3.4转到machine1(通过生成clientip的哈希值),那么所有来自ip地址1.2.3.4的请求总是转到machine1?

open()方法在 taskmanager jvm 中被调用一次。因此 flink 创建了 48 个 flatMapOperation 实例(48 个实例中的 1-15 个驻留在 machine1 中,48 个中的 16-32 个实例驻留在 machine2 中,48 个实例中的 33-48 个实例驻留在 machine3 中)并且每个 flatMapInstance 将 运行 打开方法。这意味着 open 方法需要 48 运行s?

最后,所有 48 个实例都访问相同的状态,但值不同(因为状态是本地的)。我的意思是,一部分实例组(比如说机器 1 上的 16 个实例)将获得相同的状态值。

最后,如果FlatMap之前没有keyBy,那么ip地址1.2.3.4的请求可以随机到machine1,machine2或machine3吗?

  1. 由于您执行了 keyBy("clientip"),因此该字段具有相同值的所有记录都将由相同的 MyFlatMapFunction sub-task 处理。所以所有记录的集合被划分为 48 sub-tasks,假设 IP 地址的计数均匀分布,每个 sub-task 将获得大约所有记录的 1/48。
  2. 是的,将实例化 48 个 MyFlatMapFunction 实例,因此对 open().
  3. 进行 48 次调用
  4. all of 48 instances access the same state。不,状态是 per-unique 键,因此状态按键值在 48 sub-task 之间分区。
  5. 如果没有 keyBy(),则 MyFlatMapFunction 运算符的每个 sub-task 将从源中获取分区中的任何数据。这取决于您的数据源,例如如果您正在阅读 Kafka 主题,并且该主题有 48 个分区,则存在从 Kafka 分区到 MyFlatMapFunction sub-task 的一对一映射。如果您的 Kafka 分区少于 48 个,则您的 MyFlatMapFunction sub-task 中的某些分区将无法获取任何数据。如果要将传入记录重新分配给所有 sub-task,则可以执行 rebalance()。但请注意,您将无法维持 per-IP 地址状态。