Apache Kafka - KStream 与 KStream Join 最新消息
Apache Kafka - KStream with KStream Join latest messages
我已经创建了 KStreams,我想将它们连接在一起。两个流的输出如下:
流 1:
2 {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}
流 2:
26 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}
我想创建这两个流的连接流(内部连接),所以我创建了以下 KStream:
KStream<String, String> s_joined = s_order
.join(s_order_item, (left,right) -> left + right,
JoinWindows.of(Duration.ofSeconds(30)))
.mapValues(value -> {
String[] arrOfstr = value.split("(?<=})");
JSONObject jl = new JSONObject(arrOfstr[0]);
JSONObject jr = new JSONObject(arrOfstr[1]);
JSONObject json = new JSONObject();
Iterator<String> keys = jl.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jl.get(key));
}
keys = jr.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jr.get(key));
}
return json.toString();
});
在这个 KStream 中,我只是使用了一个连接,我正在改变输出消息的格式,仅此而已。
通过一个例子我会解释我想做什么:
以下消息在window内发布:
流 1
9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}
流 2
9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
加入流
发布的内容
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
我要发布的内容
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
总而言之,我只想发布 window 中的最新消息,而不是全部。这可能吗?
您可以使用 groupByKey
函数,其中 returns KGroupedStream
然后使用 map/reduce
函数以所需的方式转换它。请参阅 Kafka Streams DSL 了解更多信息。
我找到了答案。实现我想要做的事情的方法是使用函数 suppress
。更详细地说,您 groupByKey()
KStream 然后使用 Window
函数。最后,聚合分组数据并使用 suppress
.
s_joined.toStream()
.groupByKey()
.WindowedBy(...)
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));
我已经创建了 KStreams,我想将它们连接在一起。两个流的输出如下:
流 1:
2 {"CODE":"AAAA96","STATUS":"SUBMITTED","ID":2}
流 2:
26 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100926","ID":26}
我想创建这两个流的连接流(内部连接),所以我创建了以下 KStream:
KStream<String, String> s_joined = s_order
.join(s_order_item, (left,right) -> left + right,
JoinWindows.of(Duration.ofSeconds(30)))
.mapValues(value -> {
String[] arrOfstr = value.split("(?<=})");
JSONObject jl = new JSONObject(arrOfstr[0]);
JSONObject jr = new JSONObject(arrOfstr[1]);
JSONObject json = new JSONObject();
Iterator<String> keys = jl.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jl.get(key));
}
keys = jr.keys();
while(keys.hasNext()) {
String key = keys.next();
json.put(key, jr.get(key));
}
return json.toString();
});
在这个 KStream 中,我只是使用了一个连接,我正在改变输出消息的格式,仅此而已。
通过一个例子我会解释我想做什么:
以下消息在window内发布:
流 1
9 {"CODE":"AAAA98","STATUS":"CANCELED","ID":"9"}
流 2
9 {"DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
加入流
发布的内容
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":3,"ID_CUSTOMER_ORDER":"GR0100121","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":0,"ID_CUSTOMER_ORDER":"GR0100480","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":1,"ID_CUSTOMER_ORDER":"GR0100606","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":7,"ID_CUSTOMER_ORDER":"GR0100339","ID":"9"}
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
我要发布的内容
9 {"CODE":"AAAA98","STATUS":"CANCELED","DESCRIPTION":"blah blah blah","QUANTITY":6,"ID_CUSTOMER_ORDER":"GR0100911","ID":"9"}
总而言之,我只想发布 window 中的最新消息,而不是全部。这可能吗?
您可以使用 groupByKey
函数,其中 returns KGroupedStream
然后使用 map/reduce
函数以所需的方式转换它。请参阅 Kafka Streams DSL 了解更多信息。
我找到了答案。实现我想要做的事情的方法是使用函数 suppress
。更详细地说,您 groupByKey()
KStream 然后使用 Window
函数。最后,聚合分组数据并使用 suppress
.
s_joined.toStream()
.groupByKey()
.WindowedBy(...)
.aggregate(...)
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));