Twisted python 从 kafka 读取并写入 elasticsearch
Twisted python to read from kafka and write to elasticsearch
我是 Twisted 的新手,这是我的第一个程序。
我找不到使用 kafka-python 库中的 KafkaConsumer 并使用 treq 触发对 elasticsearch 的 post 请求的方法。
我可以将问题分解成小块:
创建一个kafka消费者迭代器并从中读取数据(题目可能很大)
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
v.value
post 使用 treq
进行弹性搜索
def post(self):
d = treq.post('http://es:9200/pro/pr/', self.data)
d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
启动反应堆
from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
知道如何进行这项工作吗?
我根本不使用 Kafka,所以我不确定这是否适合你。另外,我假设您同时遇到 运行 Kafka 和 treq 问题。我在 Twisted 中处理迭代器的一种通用方法是使用 inlineCallbacks
等待结果,然后对结果做一些事情。
from twisted.internet import defer
@defer.inlineCallbacks
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
value = yield v.value
# do stuff with value
然后你可以简单地调用这个函数,reactor 会处理剩下的事情。所以您的主要部分将如下所示:
consumeKafka()
reactor.run()
请注意 consumeKafka()
函数 returns a Deferred
因此您可以根据需要添加回调和错误反馈。熟悉此模型后,请查看 Cooperator
对象以获得更多功能。
我是 Twisted 的新手,这是我的第一个程序。
我找不到使用 kafka-python 库中的 KafkaConsumer 并使用 treq 触发对 elasticsearch 的 post 请求的方法。
我可以将问题分解成小块: 创建一个kafka消费者迭代器并从中读取数据(题目可能很大)
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
v.value
post 使用 treq
进行弹性搜索def post(self):
d = treq.post('http://es:9200/pro/pr/', self.data)
d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x))
启动反应堆
from twisted.internet import reactor
reactor.callWhenRunning(consumeKafka)
reactor.run()
知道如何进行这项工作吗?
我根本不使用 Kafka,所以我不确定这是否适合你。另外,我假设您同时遇到 运行 Kafka 和 treq 问题。我在 Twisted 中处理迭代器的一种通用方法是使用 inlineCallbacks
等待结果,然后对结果做一些事情。
from twisted.internet import defer
@defer.inlineCallbacks
def consumeKafka():
consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest')
consumer.subscribe(['kafkapipeline'])
for v in consumer:
value = yield v.value
# do stuff with value
然后你可以简单地调用这个函数,reactor 会处理剩下的事情。所以您的主要部分将如下所示:
consumeKafka()
reactor.run()
请注意 consumeKafka()
函数 returns a Deferred
因此您可以根据需要添加回调和错误反馈。熟悉此模型后,请查看 Cooperator
对象以获得更多功能。