使用和不使用 keyBy 的 RichFlatMap 中的状态管理
State management in RichFlatMap with and without keyBy
我有一个像这样的流应用程序:
DataStream<MyObject> stream1 = source
.keyBy("clientip")
.flatMap(new MyFlatMapFunction())
.name("Stream1");
//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {
private transient ValueState<Boolean> valueState;
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(12))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
.build();
ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
"valueState",
Types.BOOLEAN);
valueStateeDescriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(valueState);
}
@Override
public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
{
// get value from value state, check if it is matched with something
// if matches some condition, then collector.collect(myObject)
// update state for each myObject
}
}
不是:有 3 个工人在不同的 3 台机器上,并行度为 16。总并行度为 48。
当我实现这段代码时,我一直假设“如果 ip 地址 1.2.3.4 符合条件,那么来自同一 ip 地址 1.2.3.4 的后续请求始终符合条件,直到状态被清除” .这个说法正确吗?
我从flink文档了解到,如果ip地址1.2.3.4转到machine1(通过生成clientip的哈希值),那么所有来自ip地址1.2.3.4的请求总是转到machine1?
open()
方法在 taskmanager jvm 中被调用一次。因此 flink 创建了 48 个 flatMapOperation 实例(48 个实例中的 1-15 个驻留在 machine1 中,48 个中的 16-32 个实例驻留在 machine2 中,48 个实例中的 33-48 个实例驻留在 machine3 中)并且每个 flatMapInstance 将 运行 打开方法。这意味着 open 方法需要 48 运行s?
最后,所有 48 个实例都访问相同的状态,但值不同(因为状态是本地的)。我的意思是,一部分实例组(比如说机器 1 上的 16 个实例)将获得相同的状态值。
最后,如果FlatMap之前没有keyBy,那么ip地址1.2.3.4的请求可以随机到machine1,machine2或machine3吗?
- 由于您执行了
keyBy("clientip")
,因此该字段具有相同值的所有记录都将由相同的 MyFlatMapFunction
sub-task 处理。所以所有记录的集合被划分为 48 sub-tasks,假设 IP 地址的计数均匀分布,每个 sub-task 将获得大约所有记录的 1/48。
- 是的,将实例化 48 个
MyFlatMapFunction
实例,因此对 open()
. 进行 48 次调用
all of 48 instances access the same state
。不,状态是 per-unique 键,因此状态按键值在 48 sub-task 之间分区。
- 如果没有
keyBy()
,则 MyFlatMapFunction
运算符的每个 sub-task 将从源中获取分区中的任何数据。这取决于您的数据源,例如如果您正在阅读 Kafka 主题,并且该主题有 48 个分区,则存在从 Kafka 分区到 MyFlatMapFunction
sub-task 的一对一映射。如果您的 Kafka 分区少于 48 个,则您的 MyFlatMapFunction
sub-task 中的某些分区将无法获取任何数据。如果要将传入记录重新分配给所有 sub-task,则可以执行 rebalance()
。但请注意,您将无法维持 per-IP 地址状态。
我有一个像这样的流应用程序:
DataStream<MyObject> stream1 = source
.keyBy("clientip")
.flatMap(new MyFlatMapFunction())
.name("Stream1");
//...
public class MyFlatMapFunction extends RichFlatMapFunction<MyObject, MyObject> {
private transient ValueState<Boolean> valueState;
@Override
public void open(Configuration parameters)
{
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(12))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp).cleanupInBackground()
.build();
ValueStateDescriptor<Boolean> valueStateeDescriptor = new ValueStateDescriptor<>(
"valueState",
Types.BOOLEAN);
valueStateeDescriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(valueState);
}
@Override
public void flatMap(MyObject myObject, Collector<MyObject> collector) throws Exception
{
// get value from value state, check if it is matched with something
// if matches some condition, then collector.collect(myObject)
// update state for each myObject
}
}
不是:有 3 个工人在不同的 3 台机器上,并行度为 16。总并行度为 48。
当我实现这段代码时,我一直假设“如果 ip 地址 1.2.3.4 符合条件,那么来自同一 ip 地址 1.2.3.4 的后续请求始终符合条件,直到状态被清除” .这个说法正确吗?
我从flink文档了解到,如果ip地址1.2.3.4转到machine1(通过生成clientip的哈希值),那么所有来自ip地址1.2.3.4的请求总是转到machine1?
open()
方法在 taskmanager jvm 中被调用一次。因此 flink 创建了 48 个 flatMapOperation 实例(48 个实例中的 1-15 个驻留在 machine1 中,48 个中的 16-32 个实例驻留在 machine2 中,48 个实例中的 33-48 个实例驻留在 machine3 中)并且每个 flatMapInstance 将 运行 打开方法。这意味着 open 方法需要 48 运行s?
最后,所有 48 个实例都访问相同的状态,但值不同(因为状态是本地的)。我的意思是,一部分实例组(比如说机器 1 上的 16 个实例)将获得相同的状态值。
最后,如果FlatMap之前没有keyBy,那么ip地址1.2.3.4的请求可以随机到machine1,machine2或machine3吗?
- 由于您执行了
keyBy("clientip")
,因此该字段具有相同值的所有记录都将由相同的MyFlatMapFunction
sub-task 处理。所以所有记录的集合被划分为 48 sub-tasks,假设 IP 地址的计数均匀分布,每个 sub-task 将获得大约所有记录的 1/48。 - 是的,将实例化 48 个
MyFlatMapFunction
实例,因此对open()
. 进行 48 次调用
all of 48 instances access the same state
。不,状态是 per-unique 键,因此状态按键值在 48 sub-task 之间分区。- 如果没有
keyBy()
,则MyFlatMapFunction
运算符的每个 sub-task 将从源中获取分区中的任何数据。这取决于您的数据源,例如如果您正在阅读 Kafka 主题,并且该主题有 48 个分区,则存在从 Kafka 分区到MyFlatMapFunction
sub-task 的一对一映射。如果您的 Kafka 分区少于 48 个,则您的MyFlatMapFunction
sub-task 中的某些分区将无法获取任何数据。如果要将传入记录重新分配给所有 sub-task,则可以执行rebalance()
。但请注意,您将无法维持 per-IP 地址状态。