Kafka Stream 和 KGlobalTable Join 问题
Kafka Stream and KGlobalTable Join issue
我在使用 GlobalKTable 加入 KStream 时遇到问题,非常感谢您的帮助。
给定两个 Kafka 主题 orders
和 customers
:
订单
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
客户
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
要求是用客户名称丰富订单流
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
我正在尝试以下操作:
- 根据
orders
主题构建 KStream
- 根据
customers
主题构建一个 GlobalKTable
- 构建另一个连接订单和客户的流(在客户 table 中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
但是它没有给出任何结果,也没有抛出任何异常。
使用 leftJoin
时,我收到了客户的 NullPointer 异常。
如果您遇到类似问题,请告诉我并建议如何解决这个问题。
@deepak 你可能需要具体化你的 KTable
builder.table(customers, Materialized.as(customerStore));
然后流式传输订单并构建您的连接。
我们仔细看看你copy-paste的内容:
在 customers
主题中:
"100" {"CustID":"100","CustName":"Customer1"}
你可以注意到键是一个字符串,这个字符串包含double-quotes:"100"
。通常,字符串键不带 double-quotes 打印。我宁愿期待看到:
100 {"CustID":"100","CustName":"Customer1"}
换句话说,您的密钥的 Java 字符串表示形式是 ""100""
(或 "\"100\""
),而不是我们预期的 "100"
。
另一方面,您的 orders
主题中的值是一个 Json {"ID":"1","Name":"Myorder1","CustID":"100"}
,属性 CustID
是一个字符串,这次用 Java "100"
.
当您加入 orders
和 customers
时,您尝试将订单 CustID 100
与客户键 "100"
匹配。这将失败,因为 CustID 中缺少密钥中的 double-quotes。
我在使用 GlobalKTable 加入 KStream 时遇到问题,非常感谢您的帮助。
给定两个 Kafka 主题 orders
和 customers
:
订单
"1" {"ID":"1","Name":"Myorder1","CustID":"100"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200"}
客户
"100" {"CustID":"100","CustName":"Customer1"}
"200" {"CustID":"200","CustName":"Customer2"}
要求是用客户名称丰富订单流
"1" {"ID":"1","Name":"Myorder1","CustID":"100","CustName":"Customer1"}
"2" {"ID":"2","Name":"MyOrder2","CustID":"200","CustName":"Customer2"}}
我正在尝试以下操作:
- 根据
orders
主题构建 KStream - 根据
customers
主题构建一个 GlobalKTable - 构建另一个连接订单和客户的流(在客户 table 中查找 Order.CustID)
KStream<String, EnrichedOrder> enrichedstreams = orders.join(
customers,
new KeyValueMapper<String, Order, String>() {
@Override
public String apply(String key, Order value) {
return value.CustID;
}
},
new ValueJoiner<Order,Customer, EnrichedOrder>() {
@Override
public EnrichedOrder apply(Order order, Customer customer) {
EnrichedOrder eorder = new EnrichedOrder();
eorder.CustID = order.CustID;
eorder.CustName = customer.CustName;
eorder.ID = order.ID;
eorder.Name = order.Name;
return eorder;
}
}
);
但是它没有给出任何结果,也没有抛出任何异常。
使用 leftJoin
时,我收到了客户的 NullPointer 异常。
如果您遇到类似问题,请告诉我并建议如何解决这个问题。
@deepak 你可能需要具体化你的 KTable
builder.table(customers, Materialized.as(customerStore));
然后流式传输订单并构建您的连接。
我们仔细看看你copy-paste的内容:
在 customers
主题中:
"100" {"CustID":"100","CustName":"Customer1"}
你可以注意到键是一个字符串,这个字符串包含double-quotes:"100"
。通常,字符串键不带 double-quotes 打印。我宁愿期待看到:
100 {"CustID":"100","CustName":"Customer1"}
换句话说,您的密钥的 Java 字符串表示形式是 ""100""
(或 "\"100\""
),而不是我们预期的 "100"
。
另一方面,您的 orders
主题中的值是一个 Json {"ID":"1","Name":"Myorder1","CustID":"100"}
,属性 CustID
是一个字符串,这次用 Java "100"
.
当您加入 orders
和 customers
时,您尝试将订单 CustID 100
与客户键 "100"
匹配。这将失败,因为 CustID 中缺少密钥中的 double-quotes。