如何使用可查询状态客户端获取 flink 中多个 keyBy 的状态?

How to get state for multiple keyBy in flink using queryable state client?

我使用的是Flink 1.4.2,有一个场景需要使用两个key。 例如

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

值描述会

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
        descriptor.setQueryable(queryableStateName);

谁能建议我在 flink 中使用可查询状态客户端获取多个键的状态?

下面的 QueryableClient 适用于单个键 'clusterId'。

kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

多个键的type_info应该是什么?任何 suggestion/example 或与此相关的参考资料都会很有帮助吗?

我找到了解决方案。

我已经在 valueStateDescription 中给出了 TypeHint。

在 Flink 作业中:

TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

在客户端:

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

我有两个密钥,所以我使用了 Tuple2 class 并设置了我的密钥的值,如下所示。 注意:如果你有两个以上的键,那么你必须根据你的键select Tuple3, Tuple4 class。

 Tuple2<String, String> tuple = new Tuple2<>();
 tuple.f0 = clusterId;
 tuple.f1 = ssid;

那我提供了TypeHint。

TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};

CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);

在上面的代码中,getState 方法将 return ImmutableValueState 所以我需要像下面这样获取我的 pojo。

ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();

totalUsage = state.value();