如何在kafka流中动态处理并发送到不同的主题

How to process dynamically in kafka streams and send to different topic

我正在创建一个流处理应用程序。它应该创建一个 kafka 流连接。当消息到达时,我需要做的事情如下:

要求:

  1. 在处理下一个消息之前,应在流功能的帮助下处理和发送每个消息,而不是使用另一个 kafka 生产者
  2. 如果达到第一个要求,那么我应该可以将消息发送到将根据类型动态决定的主题。

如果您想检查类型,您实际上是在 filter 匹配那些类型的事件。

因此,您不需要 map 或 foreach,使用 filter(...).to(topic}

会更好
    final ObjectMapper mapper = Util.getObjectMapper();
    KStream notTestEvents = input.filter((key, value) -> {
        //check type and ask object from factory
        try {
            JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use JSONDeserializer instead, which does this for you
            String type = msg.get("type").toString();
            System.out.println("OUT");
            return !type.equalsIgnoreCase("test");     
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    );
    notTestEvents.to("notStream");

另一种选择是分支

KStream<String, String>[] branches = events.branch(
    (k, v) -> { 
       return !mapper
          .readValue(value, JSONObject.class)
          .get("type").toString();
          .equalsIgnoreCase("test")
    },
    (k, v) -> true
);
branches[0].map(...).to("notStream");
branches[1].map(...).to("output_topic");
  1. Each msg should be processed and sent before processing the next msg with the help of stream function and not using another kafka producer

默认情况下无论如何都会发生这种情况。

  1. If the requirement one is achieved then I should be able to send msg to topics that will be decided dynamically according to type.

首先,为了简化根据事件类型处理事件的步骤,请查看 branch()branch() 函数允许您提供固定数量的谓词以将消息路由到不同的子流中。然后您可以独立处理这些子流,例如使用 map() 函数。最后,您可以将每个子流发送到一个单独的主题,to().

KStream<String, Event>[] branches = events.branch(
    (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
    (id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
branches[0].map(...).to(suspiciousTransactionsTopicName);
branches[1].map(...).to(validatedTransactionsTopicName);

您还可以根据事件负载中的内容在 to() 中做出真正动态的路由决策。这里,输出主题的名称来源于事件数据。

myStream.to(
  (eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
);

此外,如果动态路由决策需要事件中无法直接获得的信息,您可以选择使用与路由相关的信息动态丰富原始事件(例如,通过将原始事件流与table 与路由相关的信息),然后通过 to() 进行动态路由。有关详细信息,请参阅 https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/