如何在kafka流中动态处理并发送到不同的主题
How to process dynamically in kafka streams and send to different topic
我正在创建一个流处理应用程序。它应该创建一个 kafka 流连接。当消息到达时,我需要做的事情如下:
- 检查消息类型
- 通过调用特定类型处理服务来处理消息
以根据消息类型决定的特定主题结束
public java.util.function.Consumer<KStream<String, String>> process() {
String topic;
return input ->
input.map((key, value) -> {
//check type and ask object from factory
try {
JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class);
String type = msg.get("type").toString();
if(type.equalsIgnoreCase("test")){
//processing started
msgTypeHandlerFactory
.createInstance(type)
.enrichAndRelay(msg);
System.out.println("IN");
}
else{
input.to("notStream");
System.out.println("OUT");
}
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
return KeyValue.pair(key, value);
})
.to("output_topic");
}
上面代码的问题是我正在使用 map 函数,它使我能够使用 .to() 函数发送流。
我想要的是检查每条消息的类型,然后进行处理,相应地发送到另一个流。
为此,我应该使用没有给我 .to() 函数的 foreach 函数,所以我必须创建另一个 Kafka Producer 来完成这项工作。
要求:
- 在处理下一个消息之前,应在流功能的帮助下处理和发送每个消息,而不是使用另一个 kafka 生产者
- 如果达到第一个要求,那么我应该可以将消息发送到将根据类型动态决定的主题。
如果您想检查类型,您实际上是在 filter
匹配那些类型的事件。
因此,您不需要 map 或 foreach,使用 filter(...).to(topic}
会更好
final ObjectMapper mapper = Util.getObjectMapper();
KStream notTestEvents = input.filter((key, value) -> {
//check type and ask object from factory
try {
JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use JSONDeserializer instead, which does this for you
String type = msg.get("type").toString();
System.out.println("OUT");
return !type.equalsIgnoreCase("test");
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
);
notTestEvents.to("notStream");
另一种选择是分支
KStream<String, String>[] branches = events.branch(
(k, v) -> {
return !mapper
.readValue(value, JSONObject.class)
.get("type").toString();
.equalsIgnoreCase("test")
},
(k, v) -> true
);
branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");
- Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer
默认情况下无论如何都会发生这种情况。
- If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.
首先,为了简化根据事件类型处理事件的步骤,请查看 branch()
。 branch()
函数允许您提供固定数量的谓词以将消息路由到不同的子流中。然后您可以独立处理这些子流,例如使用 map()
函数。最后,您可以将每个子流发送到一个单独的主题,to()
.
KStream<String, Event>[] branches = events.branch(
(id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
(id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);
您还可以根据事件负载中的内容在 to()
中做出真正动态的路由决策。这里,输出主题的名称来源于事件数据。
myStream.to(
(eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);
此外,如果动态路由决策需要事件中无法直接获得的信息,您可以选择使用与路由相关的信息动态丰富原始事件(例如,通过将原始事件流与table 与路由相关的信息),然后通过 to()
进行动态路由。有关详细信息,请参阅 https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/。
我正在创建一个流处理应用程序。它应该创建一个 kafka 流连接。当消息到达时,我需要做的事情如下:
- 检查消息类型
- 通过调用特定类型处理服务来处理消息
以根据消息类型决定的特定主题结束
public java.util.function.Consumer<KStream<String, String>> process() { String topic; return input -> input.map((key, value) -> { //check type and ask object from factory try { JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class); String type = msg.get("type").toString(); if(type.equalsIgnoreCase("test")){ //processing started msgTypeHandlerFactory .createInstance(type) .enrichAndRelay(msg); System.out.println("IN"); } else{ input.to("notStream"); System.out.println("OUT"); } } catch (JsonProcessingException e) { e.printStackTrace(); } catch (Exception e) { e.printStackTrace(); } return KeyValue.pair(key, value); }) .to("output_topic"); }
上面代码的问题是我正在使用 map 函数,它使我能够使用 .to() 函数发送流。 我想要的是检查每条消息的类型,然后进行处理,相应地发送到另一个流。 为此,我应该使用没有给我 .to() 函数的 foreach 函数,所以我必须创建另一个 Kafka Producer 来完成这项工作。
要求:
- 在处理下一个消息之前,应在流功能的帮助下处理和发送每个消息,而不是使用另一个 kafka 生产者
- 如果达到第一个要求,那么我应该可以将消息发送到将根据类型动态决定的主题。
如果您想检查类型,您实际上是在 filter
匹配那些类型的事件。
因此,您不需要 map 或 foreach,使用 filter(...).to(topic}
final ObjectMapper mapper = Util.getObjectMapper();
KStream notTestEvents = input.filter((key, value) -> {
//check type and ask object from factory
try {
JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use JSONDeserializer instead, which does this for you
String type = msg.get("type").toString();
System.out.println("OUT");
return !type.equalsIgnoreCase("test");
} catch (JsonProcessingException e) {
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
);
notTestEvents.to("notStream");
另一种选择是分支
KStream<String, String>[] branches = events.branch(
(k, v) -> {
return !mapper
.readValue(value, JSONObject.class)
.get("type").toString();
.equalsIgnoreCase("test")
},
(k, v) -> true
);
branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");
- Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer
默认情况下无论如何都会发生这种情况。
- If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.
首先,为了简化根据事件类型处理事件的步骤,请查看 branch()
。 branch()
函数允许您提供固定数量的谓词以将消息路由到不同的子流中。然后您可以独立处理这些子流,例如使用 map()
函数。最后,您可以将每个子流发送到一个单独的主题,to()
.
KStream<String, Event>[] branches = events.branch(
(id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
(id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);
您还可以根据事件负载中的内容在 to()
中做出真正动态的路由决策。这里,输出主题的名称来源于事件数据。
myStream.to(
(eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);
此外,如果动态路由决策需要事件中无法直接获得的信息,您可以选择使用与路由相关的信息动态丰富原始事件(例如,通过将原始事件流与table 与路由相关的信息),然后通过 to()
进行动态路由。有关详细信息,请参阅 https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/。