Queryble过滤的KTable
Queryble filtered KTable
这是我构建的示例 KTable
,它是一个简单的聚合:
String name = stream
.groupByKey()
.aggregate(
() -> new Aggregate(config),
(key, value, aggregate) -> aggregate.addAndReturn(value),
Materialized
.<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.ObjectSerde()))
.filter(((key, value) -> value.isStateChanged()))
.filter((key, value) -> !value.getRecentlyViewed().isEmpty())
.queryableStoreName();
我需要做的是将最终的 KTable
(应用过滤后)存储在状态存储中而不是初始的 KTable
。目前 KTable.queryableStoreName()
returns null
。
我目前的解决方案是应用 filter()
,然后使用 KTable.toStream()
转换为流,最后再次存储为 KTable
,我认为这是低效的。还有其他解决办法吗
您可以通过提供可查询的商店名称强制实现 KTable
:
.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));
根据您使用的版本,您可能需要指定一些泛型才能编译:
Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))
这是我构建的示例 KTable
,它是一个简单的聚合:
String name = stream
.groupByKey()
.aggregate(
() -> new Aggregate(config),
(key, value, aggregate) -> aggregate.addAndReturn(value),
Materialized
.<String, Aggregate>as(Stores.inMemoryKeyValueStore(config.OutputStore()))
.withCachingEnabled()
.withKeySerde(Serdes.String())
.withValueSerde(CustomSerdes.ObjectSerde()))
.filter(((key, value) -> value.isStateChanged()))
.filter((key, value) -> !value.getRecentlyViewed().isEmpty())
.queryableStoreName();
我需要做的是将最终的 KTable
(应用过滤后)存储在状态存储中而不是初始的 KTable
。目前 KTable.queryableStoreName()
returns null
。
我目前的解决方案是应用 filter()
,然后使用 KTable.toStream()
转换为流,最后再次存储为 KTable
,我认为这是低效的。还有其他解决办法吗
您可以通过提供可查询的商店名称强制实现 KTable
:
.aggregate()
.filter(..., Materialized.as("your-custom-store-name"));
根据您使用的版本,您可能需要指定一些泛型才能编译:
Materialized<KEY_TYPE, VALUE_TYPE, KeyValueStore<Bytes, byte[]>>.as("your-custom-store-name"))