Kafka 流 return 所有记录,其中 fieldx = 某个值

Kafka streams return all records where fieldx = some value

我有多个非唯一字段进入 kafka 的记录,我们称它们为 Field1 ... Field n。

我想对 return fieldx = 某个值的所有记录编写查询。让我们来看下面这个简单的例子。假设订单进入系统,订单中的字段之一是 customerId。一个基本操作是获取特定客户的所有订单。如何使用 Kafka Streams 执行此操作?

我已经有了一个 KTable 和所有记录的物化视图,所以我可以遍历视图中的所有记录并挑选出我想要的记录,但这似乎效率低下且成本高昂.

我真的很想创建一个物化视图,其中视图包含按 fieldx 分组的记录,但我看不出有什么方法可以做到这一点。看来您只能将 groupby 与聚合、计数、归约等一起使用。

关于如何做到这一点有什么想法吗?

这是一个按客户 ID 过滤订单的示例。对于此查询,无需创建 KTable 来进行分组或聚合。然而,由于 Kafka 主题是 append-only 没有二级索引的日志,您确实需要遍历所有消息以找到与您的客户 ID 匹配的订单流。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
orderStream.filter((k,v) -> "customer-1".equals(v.customerId));

请注意,上面的代码假设您的订单流也有字符串类型的键,但这些键被忽略了。

另请注意,您需要指定 Kafka Streams 如何将消息反序列化到您的订单中 class。您可以使用 Consumed.with(...) 指定反序列化器。

有关完整示例,请参阅 github 上的 Kafka Streams 示例存储库:https://github.com/confluentinc/kafka-streams-examples

并不是说这些类型的查询也可以使用 KSQL 编写:https://www.confluent.io/stream-processing-cookbook/

您应该在 "customerID" 上对您的订单流进行分组,并将所有订单汇总到一个列表中。结果 KTable 将有 <CustomerId, [List of Order]> 类型的事件。

使用交互式查询,您可以查询状态存储,

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Order> orderStream = builder.stream("orders");
KTable<String,ArrayList<Order>> orderTable = orderStream
      .groupBy((key,value)-> value .get("customerId"))
      .aggregate(()-> new ArrayList<Order>(),
                 (key,val,agg)-> agg.add(val),
                  Materialized.as("customer-orders")
                  .withValueSerde(ArrayListSerde())          
       ); 

它将创建一个物化视图"customer-orders",您可以通过 rest 端点查询它。

您可以按照以下 link 将 KTables 公开为 Rest 端点:

https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html