事件驱动的 Kafka 消费者有 Python API 吗?

Is there a Python API for event-driven Kafka consumer?

我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发,并通过将消息推送回 Kafka 流来响应。

我一直在寻找类似于 Spring 实现的东西:

@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
    System.out.println("Received Messasge in group mygroup: " + message);
}

我看过:

  1. kafka-python
  2. pykafka
  3. confluent-kafka

但是我在Python中找不到任何与事件驱动的实现方式相关的东西。

Kafka 消费者必须不断轮询以从代理检索数据。

Spring 给了你这个奇特的 API 但在幕后,它在循环中调用 poll 并且只在检索到记录时调用你的方法。

您可以使用您提到的任何 Python 客户端轻松构建类似的东西。就像在 Java 中一样,这不是由(大多数)Kafka 客户端直接公开的 API,而是由顶层提供的东西。这是需要构建的东西。

这是@MickaelMaison . I used kafka-python 给出的想法的实现。

from kafka import KafkaConsumer
import threading

BOOTSTRAP_SERVERS = ['localhost:9092']

def register_kafka_listener(topic, listener):
# Poll kafka
    def poll():
        # Initialize consumer Instance
        consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)

        print("About to start polling for topic:", topic)
        consumer.poll(timeout_ms=6000)
        print("Started Polling for topic:", topic)
        for msg in consumer:
            print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
            kafka_listener(msg)
    print("About to register listener to topic:", topic)
    t1 = threading.Thread(target=poll)
    t1.start()
    print("started a background thread")

def kafka_listener(data):
    print("Image Ratings:\n", data.value.decode("utf-8"))

register_kafka_listener('topic1', kafka_listener)

轮询是在不同的线程中完成的。一旦收到消息,就会通过传递从 Kafka 检索到的数据来调用侦听器。