事件驱动的 Kafka 消费者有 Python API 吗?
Is there a Python API for event-driven Kafka consumer?
我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发,并通过将消息推送回 Kafka 流来响应。
我一直在寻找类似于 Spring 实现的东西:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}
我看过:
但是我在Python中找不到任何与事件驱动的实现方式相关的东西。
Kafka 消费者必须不断轮询以从代理检索数据。
Spring 给了你这个奇特的 API 但在幕后,它在循环中调用 poll 并且只在检索到记录时调用你的方法。
您可以使用您提到的任何 Python 客户端轻松构建类似的东西。就像在 Java 中一样,这不是由(大多数)Kafka 客户端直接公开的 API,而是由顶层提供的东西。这是你需要构建的东西。
这是@MickaelMaison . I used kafka-python 给出的想法的实现。
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
kafka_listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:\n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
轮询是在不同的线程中完成的。一旦收到消息,就会通过传递从 Kafka 检索到的数据来调用侦听器。
我一直在尝试构建一个以 Kafka 作为唯一界面的 Flask 应用程序。出于这个原因,我想要一个 Kafka 消费者,当相关主题的流中有新消息时触发,并通过将消息推送回 Kafka 流来响应。
我一直在寻找类似于 Spring 实现的东西:
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public void listen(String message) {
System.out.println("Received Messasge in group mygroup: " + message);
}
我看过:
但是我在Python中找不到任何与事件驱动的实现方式相关的东西。
Kafka 消费者必须不断轮询以从代理检索数据。
Spring 给了你这个奇特的 API 但在幕后,它在循环中调用 poll 并且只在检索到记录时调用你的方法。
您可以使用您提到的任何 Python 客户端轻松构建类似的东西。就像在 Java 中一样,这不是由(大多数)Kafka 客户端直接公开的 API,而是由顶层提供的东西。这是你需要构建的东西。
这是@MickaelMaison
from kafka import KafkaConsumer
import threading
BOOTSTRAP_SERVERS = ['localhost:9092']
def register_kafka_listener(topic, listener):
# Poll kafka
def poll():
# Initialize consumer Instance
consumer = KafkaConsumer(topic, bootstrap_servers=BOOTSTRAP_SERVERS)
print("About to start polling for topic:", topic)
consumer.poll(timeout_ms=6000)
print("Started Polling for topic:", topic)
for msg in consumer:
print("Entered the loop\nKey: ",msg.key," Value:", msg.value)
kafka_listener(msg)
print("About to register listener to topic:", topic)
t1 = threading.Thread(target=poll)
t1.start()
print("started a background thread")
def kafka_listener(data):
print("Image Ratings:\n", data.value.decode("utf-8"))
register_kafka_listener('topic1', kafka_listener)
轮询是在不同的线程中完成的。一旦收到消息,就会通过传递从 Kafka 检索到的数据来调用侦听器。