Kafka Stream 根据特定条件生成自定义消息列表
Kafka Stream producing custom list of messages based on certain conditions
我们有以下流处理需求。
Source Stream ->
transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
output kafka topic
Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false
A,B,C -> A,B,C -> Sink Kafka Topic
有没有办法在 Kafka 流中实现这一点?
您可以使用 flatMap()
或 flatMapValues()
方法。这些方法获取一条记录并生成零条、一条或多条记录。
flatMap()
可以修改键、值及其数据类型,而flatMapValues()
保留原始键并更改值和值数据类型。
这是一个伪代码示例,考虑到新消息 "C"、"D"、"E" 将有一个新密钥。
KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap(
(key,value)->{
List<KeyValue<byte[], String>> result = new LinkedList<>();
// If message value is "B". Otherwise place your condition based on data
if(value.equalsTo("B")){
result.add(KeyValue.pair("<new key for message C>","C"));
result.add(KeyValue.pair("<new key for message D>","D"));
result.add(KeyValue.pair("<new key for message E>","E"));
}else{
result.add(KeyValue.pair(key,value));
}
return result;
});
outStream.to("sinkTopic");
我们有以下流处理需求。
Source Stream ->
transform(condition check - If (true) then generate MULTIPLE ADDITIONAL messages else just transform the incoming message) ->
output kafka topic
Example:
If condition is true for message B(D,E,F are the additional messages produced)
A,B,C -> A,D,E,F,C -> Sink Kafka Topic
If condition is false
A,B,C -> A,B,C -> Sink Kafka Topic
有没有办法在 Kafka 流中实现这一点?
您可以使用 flatMap()
或 flatMapValues()
方法。这些方法获取一条记录并生成零条、一条或多条记录。
flatMap()
可以修改键、值及其数据类型,而flatMapValues()
保留原始键并更改值和值数据类型。
这是一个伪代码示例,考虑到新消息 "C"、"D"、"E" 将有一个新密钥。
KStream<byte[], String> inputStream = builder.stream("inputTopic");
KStream<byte[], String> outStream = inputStream.flatMap(
(key,value)->{
List<KeyValue<byte[], String>> result = new LinkedList<>();
// If message value is "B". Otherwise place your condition based on data
if(value.equalsTo("B")){
result.add(KeyValue.pair("<new key for message C>","C"));
result.add(KeyValue.pair("<new key for message D>","D"));
result.add(KeyValue.pair("<new key for message E>","E"));
}else{
result.add(KeyValue.pair(key,value));
}
return result;
});
outStream.to("sinkTopic");