Apache Kafka Streams Interactive Queries - 如何创建值是实体而不是聚合的存储
Apache Kafka Streams Interactive Queries - How to create a store where the value is an entity and not an aggregation
我有一个主题接收具有以下信息的事件:
key -> orderId
(整数)
值 -> {"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"}
(JSON 包含订单的全部信息)
我想实现一个交互式查询以通过orderId 获取完整的订单信息。这个想法是能够从物化视图(Kafka Streams 存储)中获取订单的当前状态。
首先我创建主题的KStream:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC);
然后我创建一个 KTable
以将其分配给商店。问题是显然我只能创建值是聚合的商店,例如:stream.groupByKey().count("myStore");
我需要的商店应该有完整的订单信息,而不是聚合。这可能吗?
您也可以直接阅读主题 KTable
:
KTable<Integer, JsonNode> stream = kStreamBuilder.table(integerSerde, jsonSerde, STREAMING_TOPIC, "store-name-for-IQ");
此常见问题解答也可能有所帮助:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step
我有一个主题接收具有以下信息的事件:
key -> orderId
(整数)
值 -> {"orderId" : aaa, "productId" : xxx, "userId" : yyy, "state" : "zzz"}
(JSON 包含订单的全部信息)
我想实现一个交互式查询以通过orderId 获取完整的订单信息。这个想法是能够从物化视图(Kafka Streams 存储)中获取订单的当前状态。
首先我创建主题的KStream:
KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, STREAMING_TOPIC);
然后我创建一个 KTable
以将其分配给商店。问题是显然我只能创建值是聚合的商店,例如:stream.groupByKey().count("myStore");
我需要的商店应该有完整的订单信息,而不是聚合。这可能吗?
您也可以直接阅读主题 KTable
:
KTable<Integer, JsonNode> stream = kStreamBuilder.table(integerSerde, jsonSerde, STREAMING_TOPIC, "store-name-for-IQ");
此常见问题解答也可能有所帮助:http://docs.confluent.io/current/streams/faq.html#how-can-i-convert-a-kstream-to-a-ktable-without-an-aggregation-step