Twisted python 从 kafka 读取并写入 elasticsearch

Twisted python to read from kafka and write to elasticsearch

我是 Twisted 的新手,这是我的第一个程序。

我找不到使用 kafka-python 库中的 KafkaConsumer 并使用 treq 触发对 elasticsearch 的 post 请求的方法。

我可以将问题分解成小块: 创建一个kafka消费者迭代器并从中读取数据(题目可能很大)

def consumeKafka():
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
    consumer.subscribe(['kafkapipeline'])
    for v in consumer:
        v.value

post 使用 treq

进行弹性搜索
def post(self):
    d = treq.post('http://es:9200/pro/pr/', self.data)
    d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))

启动反应堆

from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()

知道如何进行这项工作吗?

我根本不使用 Kafka,所以我不确定这是否适合你。另外,我假设您同时遇到 运行 Kafka 和 treq 问题。我在 Twisted 中处理迭代器的一种通用方法是使用 inlineCallbacks 等待结果,然后对结果做一些事情。

from twisted.internet import defer

@defer.inlineCallbacks
def consumeKafka():
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
    consumer.subscribe(['kafkapipeline'])
    for v in consumer:
        value = yield v.value
        # do stuff with value

然后你可以简单地调用这个函数,reactor 会处理剩下的事情。所以您的主要部分将如下所示:

consumeKafka()
reactor.run()

请注意 consumeKafka() 函数 returns a Deferred 因此您可以根据需要添加回调和错误反馈。熟悉此模型后,请查看 Cooperator 对象以获得更多功能。