如何使用可查询状态客户端获取 flink 中多个 keyBy 的状态?
How to get state for multiple keyBy in flink using queryable state client?
我使用的是Flink 1.4.2,有一个场景需要使用两个key。
例如
KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));
值描述会
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
descriptor.setQueryable(queryableStateName);
谁能建议我在 flink 中使用可查询状态客户端获取多个键的状态?
下面的 QueryableClient 适用于单个键 'clusterId'。
kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);
多个键的type_info应该是什么?任何 suggestion/example 或与此相关的参考资料都会很有帮助吗?
我找到了解决方案。
我已经在 valueStateDescription 中给出了 TypeHint。
在 Flink 作业中:
TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
在客户端:
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
我有两个密钥,所以我使用了 Tuple2 class 并设置了我的密钥的值,如下所示。
注意:如果你有两个以上的键,那么你必须根据你的键select Tuple3, Tuple4 class。
Tuple2<String, String> tuple = new Tuple2<>();
tuple.f0 = clusterId;
tuple.f1 = ssid;
那我提供了TypeHint。
TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};
CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);
在上面的代码中,getState 方法将 return ImmutableValueState
所以我需要像下面这样获取我的 pojo。
ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();
totalUsage = state.value();
我使用的是Flink 1.4.2,有一个场景需要使用两个key。 例如
KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));
值描述会
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
descriptor.setQueryable(queryableStateName);
谁能建议我在 flink 中使用可查询状态客户端获取多个键的状态?
下面的 QueryableClient 适用于单个键 'clusterId'。
kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);
多个键的type_info应该是什么?任何 suggestion/example 或与此相关的参考资料都会很有帮助吗?
我找到了解决方案。
我已经在 valueStateDescription 中给出了 TypeHint。
在 Flink 作业中:
TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
在客户端:
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);
我有两个密钥,所以我使用了 Tuple2 class 并设置了我的密钥的值,如下所示。 注意:如果你有两个以上的键,那么你必须根据你的键select Tuple3, Tuple4 class。
Tuple2<String, String> tuple = new Tuple2<>();
tuple.f0 = clusterId;
tuple.f1 = ssid;
那我提供了TypeHint。
TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};
CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);
在上面的代码中,getState 方法将 return ImmutableValueState 所以我需要像下面这样获取我的 pojo。
ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();
totalUsage = state.value();