Kafka Stream 和 KGlobalTable Join 问题

Kafka Stream and KGlobalTable Join issue

我在使用 GlobalKTable 加入 KStream 时遇到问题,非常感谢您的帮助。

给定两个 Kafka 主题 orderscustomers

订单

"1"     {"ID":"1","Name":"Myorder1","CustID":"100"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200"}

客户

"100"   {"CustID":"100","CustName":"Customer1"}

"200"   {"CustID":"200","CustName":"Customer2"}

要求是用客户名称丰富订单流

"1"     {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}

"2"     {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}

我正在尝试以下操作:

  1. 根据 orders 主题构建 KStream
  2. 根据 customers 主题构建一个 GlobalKTable
  3. 构建另一个连接订单和客户的流(在客户 table 中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
    customers,
    new KeyValueMapper<String, Order, String>() {            
        @Override
        public String apply(String key, Order value) {
           return value.CustID;
        }
    },
    new ValueJoiner<Order,Customer, EnrichedOrder>() {
        @Override
        public EnrichedOrder apply(Order order, Customer customer) {
            EnrichedOrder eorder = new EnrichedOrder();
            eorder.CustID = order.CustID;
            eorder.CustName = customer.CustName;
            eorder.ID = order.ID;
            eorder.Name = order.Name;           
            return eorder;
        }
    }
);

但是它没有给出任何结果,也没有抛出任何异常。

使用 leftJoin 时,我收到了客户的 NullPointer 异常。

如果您遇到类似问题,请告诉我并建议如何解决这个问题。

@deepak 你可能需要具体化你的 KTable

builder.table(customers, Materialized.as(customerStore));

然后流式传输订单并构建您的连接。

我们仔细看看你copy-paste的内容:

customers 主题中:

"100"   {"CustID":"100","CustName":"Customer1"}

你可以注意到键是一个字符串,这个字符串包含double-quotes"100"。通常,字符串键不带 double-quotes 打印。我宁愿期待看到:

 100    {"CustID":"100","CustName":"Customer1"}

换句话说,您的密钥的 Java 字符串表示形式是 ""100""(或 "\"100\""),而不是我们预期的 "100"

另一方面,您的 orders 主题中的值是一个 Json {"ID":"1","Name":"Myorder1","CustID":"100"},属性 CustID 是一个字符串,这次用 Java "100".

当您加入 orderscustomers 时,您尝试将订单 CustID 100 与客户键 "100" 匹配。这将失败,因为 CustID 中缺少密钥中的 double-quotes。