Kafka Stream 按两个字段聚合
Kafka Stream aggregation by two fields
我正在使用 kafka 流为仓库中的项目创建聚合(总和)。
可以添加(例如从供应商处购买)或删除(例如出售的物品)物品。
应用中,一个仓库可以服务多个门店,公司有多个仓库。
在这种情况下,我需要使用两个字段对交易进行汇总和分组:商品名称和商店名称。
仅使用商品名称(一个字段)求和很简单,但是如何使用额外的分组(例如,每个商店的每个商品的总库存)或(每个仓库的总库存,每个商品名称)?
我的(过于简化的)代码
InventoryKafkaMessage.java
public class InventoryKafkaMessage {
private String warehouseId; // warehouse ID
private String itemName; // item name
private long quantity; // always positive
private String type; // ADD or REMOVE
private String storeLocation; // store ID
private long transactionTimestamp;
// ... some others, but not relevant for this question
}
使用项目名称作为关键字发送到源主题的消息。
InventoryAggregatorStream.java
流是
var inventorySerde = new JsonSerde<>(InventoryKafkaMessage.class);
var sourceStream = builder.stream("supplychain-warehouse-inventory", Consumed.with(Serdes.String(), inventorySerde));
// aggregating by key (item name)
logisticStream.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
.groupByKey()
.aggregate(() -> 0l, (aggKey, newValue, aggValue) -> aggValue + newValue,
Materialized.with(Serdes.String(), Serdes.Long()))
.toStream().to("stream-supplychain-wharehouse-inventory-total", Produced.with(Serdes.String(), Serdes.Long()));
要对多个属性进行分组,您可以定义一个包含这两个属性的组合类型并将其设置为键。例如,您可以定义一个类型:
public class GroupingKey {
private String warehouseId;
private String itemName;
public GroupingKey(String warehouseId, String itemName) {
// set fields
}
// etc
}
// usage:
sourceStream = builder.stream("supplychain-warehouse-inventory",
Consumed.with(Serdes.String(), inventorySerde));
newKeyStream = sourceStream.selectKey((k, v) -> new GroupingKey(v.warehouseId, v.itemName));
newKeyStream.groupByKey()...
我正在使用 kafka 流为仓库中的项目创建聚合(总和)。
可以添加(例如从供应商处购买)或删除(例如出售的物品)物品。
应用中,一个仓库可以服务多个门店,公司有多个仓库。
在这种情况下,我需要使用两个字段对交易进行汇总和分组:商品名称和商店名称。
仅使用商品名称(一个字段)求和很简单,但是如何使用额外的分组(例如,每个商店的每个商品的总库存)或(每个仓库的总库存,每个商品名称)?
我的(过于简化的)代码
InventoryKafkaMessage.java
public class InventoryKafkaMessage {
private String warehouseId; // warehouse ID
private String itemName; // item name
private long quantity; // always positive
private String type; // ADD or REMOVE
private String storeLocation; // store ID
private long transactionTimestamp;
// ... some others, but not relevant for this question
}
使用项目名称作为关键字发送到源主题的消息。
InventoryAggregatorStream.java
流是
var inventorySerde = new JsonSerde<>(InventoryKafkaMessage.class);
var sourceStream = builder.stream("supplychain-warehouse-inventory", Consumed.with(Serdes.String(), inventorySerde));
// aggregating by key (item name)
logisticStream.mapValues((k, v) -> v.getType().equalsIgnoreCase("ADD") ? v.getQuantity() : -1 * v.getQuantity())
.groupByKey()
.aggregate(() -> 0l, (aggKey, newValue, aggValue) -> aggValue + newValue,
Materialized.with(Serdes.String(), Serdes.Long()))
.toStream().to("stream-supplychain-wharehouse-inventory-total", Produced.with(Serdes.String(), Serdes.Long()));
要对多个属性进行分组,您可以定义一个包含这两个属性的组合类型并将其设置为键。例如,您可以定义一个类型:
public class GroupingKey {
private String warehouseId;
private String itemName;
public GroupingKey(String warehouseId, String itemName) {
// set fields
}
// etc
}
// usage:
sourceStream = builder.stream("supplychain-warehouse-inventory",
Consumed.with(Serdes.String(), inventorySerde));
newKeyStream = sourceStream.selectKey((k, v) -> new GroupingKey(v.warehouseId, v.itemName));
newKeyStream.groupByKey()...