如何修改来自 Twitter API 保存在一个 kafka 主题中的消息并将其发送到另一个 kafka 主题

How to modify message from Twitter API saved in one kafka topic and send it to another kafka topic

我已经创建了 Kafka 生产者,它使用 hbc-core 从 Twitter API 为一个主题生成消息,我想修改这些消息,因为我只需要很少的字段,比如推文创建时间、id 字符串、一些基本的有关该推文中的用户和文本的信息。我尝试使用 Kafka Streams 和 POJO 模型,但我在提取文本时遇到了问题,因为全文可能位于不同的命名字段中,具体取决于推文是否被转发、是否有超过 140 个标志等。 我的 POJO 模型:

  "type": "object",
  "properties": {
    "created_at": { "type": "string" },
    "id_str": { "type": "string" },
    "user": {
      "type": "object",
      "properties": {
        "location": { "type": "string" },
        "followers_count": { "type": "integer" },
        "friends_count": { "type": "integer" },
        "created_at": { "type": "string" }
      }
    },
    "text": { "type": "string" }
  }
}

这是使用 Kafka Streams 的正确方法还是有更好的解决方案来提取这些字段并放入另一个主题?

无需中间客户端、系统、kafka 流花哨的实用程序或奇迹框架,专注于旧的简单方法:重用生产者和生成和发送完整 POJOS 的代码。

Producers 是线程安全的,所以使用同一个生产者实例通过不同线程生产两个或多个主题是完全没问题的。

这只是一种简化,因为我不知道您的实施细节。我假设消息 (POJO) 是一个简单的 String。发挥一些想象力,相信不同的字母是字段。从 fullPojo,您想发送一条仅包含两个字段的消息,表示为 yv,到另一个主题。

  String fullPojo = "xxxxxyxxxxv";
  //some logic to extract the desired fields
  String shortPojo = getDesiredFields(fullPojo);
  /* shortPojo="yv" */

在您的 Kafka 集群上创建一个新主题,对于此示例,它将被称为 shortPojoTopic

只需使用将完整数据发送到原始主题的相同 producer,方法是进行第二次调用,以便使用仅包含过滤值的消息填充短主题:

producer.send(new ProducerRecord<String, String>(fullPojoTopic,  fullPojo));
producer.send(new ProducerRecord<String, String>(shortPojoTopic, shortPojo));

第二次调用也可以从另一个辅助线程完成。如果你想在这里完成多线程,你可以定义第二个线程来进行“过滤”工作。只需将原始 producer 引用传递给第二个线程,link 这两个线程都带有类似 FIFO 结构(deques, queues,...)的东西,其中包含 fullPojos。

  • 原线程将fullPojo发送到fullPojoTopic主题,并将fullPojo推入队列
  • 这个辅助“过滤器”线程将从 queue/deque 中删除顶部消息,提取创建 shortPojo 的所需字段,并将其发送到 shortPojoTopic 使用相同 producer,无需担心生产者同步问题)。

如果其中一个主题处于错误状态并且不能接受更多消息,或者其中一个主题位于刚刚失败的不同 Kafka 集群上(在此如果您还需要两个不同的生产者),或者即使过滤过程在过滤某些格式错误的消息时发现一些困难。 例如,即使 shortPojoTopicout,也不会影响第一个线程的性能,因为它将继续发送他的 fullPojos 而没有 issues/delays.

始终注意内存使用:queue/deque 大小应该是 limited/controlled 以某种方式避免 OOM 如果第二个线程被大量时间卡住,或者如果它不能跟随第一个线程的节奏。如果发生这种情况,它将无法read/remove 足够快地发送消息,从而产生可能导致提到的 OOM 问题的延迟

此外,即使 topic/broker 没有问题,这种分离也会提高总体性能,因为原始线程不必等待每次迭代时在他的线程中发生的过滤过程.

第一个线程只发送 POJO;第二个线程只是过滤并发送短 POJO。 简单的职责,全部并行。

假设您可以控制生产者及其发送的内容,我建议将逻辑直接放在那里,以避免其他中间系统(流,...)。只需提取核心代码中的字段,并使用相同的生成器将恢复的 Pojo 生成到另一个主题。只使用一个线程或根据需要使用多少。

我敢打赌,这比你能想到的任何流实用程序都要快得多。

如果您无权访问该代码,您可以创建中间消费者-生产者服务,在下一节中恢复。


  • 如果原始POJO生成和生产的代码无法访问

如果您只能访问完整的 POJO 主题,而不能访问上一步(生成消息并将它们发送到主题的代码),则第二个选项可能是创建一个中间的 kafka 消费者生产者,它使用来自 fullPojoTopic 的消息,提取字段并将过滤后的 shortPojo 生成到 shortPojoTopic.

请注意,逻辑与第一种方法相同,但此解决方案意味着更大的资源浪费新的生产者和消费者线程(相信我,他们创建了很多辅助线程),一个新的消费者组来管理,在线路上双重传输 fullPOJO 消息,等等。

我的意见是,仅当您无法直接访问以第一种方式生成和生成完整 POJOS 的代码,或者您希望更好地控制过滤的微服务时,才应使用此选项完整的数据并将所需的字段发送到另一个主题。