pySpark Kafka Direct Streaming 更新 Zookeeper / Kafka Offset
pySpark Kafka Direct Streaming update Zookeeper / Kafka Offset
目前我正在使用 Kafka / Zookeeper 和 pySpark (1.6.0)。
我已经成功创建了一个使用 KafkaUtils.createDirectStream()
.
的 kafka 消费者
所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。
由于我们需要更新主题以在此处进行监控,这有点奇怪。
在 Spark 的文档中我发现了这条评论:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
我在 Scala 中找到了解决方案,但找不到 python 的等效解决方案。
这是 Scala 示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
问题
但问题是,我如何才能从那时起更新动物园管理员?
我遇到了类似的问题。
你是对的,通过使用 directStream,意味着直接使用 kafka low-level API,它没有更新 reader 偏移量。
scala/java 周围有几个例子,但 python 没有。
但是自己做很容易,你需要做的是:
- 从开头的偏移量读取
- 最后保存偏移量
例如,我通过执行以下操作在 redis 中保存每个分区的偏移量:
stream.foreachRDD(lambda rdd: save_offset(rdd))
def save_offset(rdd):
ranges = rdd.offsetRanges()
for rng in ranges:
rng.untilOffset # save offset somewhere
那么在开始时,您可以使用:
fromoffset = {}
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition]= int(value) #the value of int read from where you store previously.
对于一些使用zk跟踪偏移量的工具,最好将偏移量保存在zookeeper中。
这一页:
https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html
描述如何设置偏移量,基本上,zk 节点是:
/consumers/[consumer_name]/offsets/[主题名称]/[分区 ID]
因为我们用的是directStream,所以你得自己编一个consumer name
我用 python kazoo 库编写了一些函数来保存和读取 Kafka 偏移量。
获取Kazoo客户端单例的第一个函数:
ZOOKEEPER_SERVERS = "127.0.0.1:2181"
def get_zookeeper_instance():
from kazoo.client import KazooClient
if 'KazooSingletonInstance' not in globals():
globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
globals()['KazooSingletonInstance'].start()
return globals()['KazooSingletonInstance']
然后函数读写偏移量:
def read_offsets(zk, topics):
from pyspark.streaming.kafka import TopicAndPartition
from_offsets = {}
for topic in topics:
for partition in zk.get_children(f'/consumers/{topic}'):
topic_partion = TopicAndPartition(topic, int(partition))
offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
from_offsets[topic_partion] = offset
return from_offsets
def save_offsets(rdd):
zk = get_zookeeper_instance()
for offset in rdd.offsetRanges():
path = f"/consumers/{offset.topic}/{offset.partition}"
zk.ensure_path(path)
zk.set(path, str(offset.untilOffset).encode())
然后在开始流式传输之前,您可以从 zookeeper 读取偏移量并将它们传递给 createDirectStream
对于 fromOffsets
参数。:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
sc = SparkContext(appName="PythonStreamingSaveOffsets")
ssc = StreamingContext(sc, 2)
zk = get_zookeeper_instance()
from_offsets = read_offsets(zk, topics)
directKafkaStream = KafkaUtils.createDirectStream(
ssc, topics, {"metadata.broker.list": brokers},
fromOffsets=from_offsets)
directKafkaStream.foreachRDD(save_offsets)
if __name__ == "__main__":
main()
目前我正在使用 Kafka / Zookeeper 和 pySpark (1.6.0)。
我已经成功创建了一个使用 KafkaUtils.createDirectStream()
.
所有流媒体都没有问题,但我认识到,在我消费了一些消息后,我的 Kafka 主题没有更新到当前偏移量。
由于我们需要更新主题以在此处进行监控,这有点奇怪。
在 Spark 的文档中我发现了这条评论:
offsetRanges = []
def storeOffsetRanges(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
return rdd
def printOffsetRanges(rdd):
for o in offsetRanges:
print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
directKafkaStream\
.transform(storeOffsetRanges)\
.foreachRDD(printOffsetRanges)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
我在 Scala 中找到了解决方案,但找不到 python 的等效解决方案。 这是 Scala 示例:http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
问题
但问题是,我如何才能从那时起更新动物园管理员?
我遇到了类似的问题。 你是对的,通过使用 directStream,意味着直接使用 kafka low-level API,它没有更新 reader 偏移量。 scala/java 周围有几个例子,但 python 没有。 但是自己做很容易,你需要做的是:
- 从开头的偏移量读取
- 最后保存偏移量
例如,我通过执行以下操作在 redis 中保存每个分区的偏移量:
stream.foreachRDD(lambda rdd: save_offset(rdd))
def save_offset(rdd):
ranges = rdd.offsetRanges()
for rng in ranges:
rng.untilOffset # save offset somewhere
那么在开始时,您可以使用:
fromoffset = {}
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition]= int(value) #the value of int read from where you store previously.
对于一些使用zk跟踪偏移量的工具,最好将偏移量保存在zookeeper中。 这一页: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html 描述如何设置偏移量,基本上,zk 节点是: /consumers/[consumer_name]/offsets/[主题名称]/[分区 ID] 因为我们用的是directStream,所以你得自己编一个consumer name
我用 python kazoo 库编写了一些函数来保存和读取 Kafka 偏移量。
获取Kazoo客户端单例的第一个函数:
ZOOKEEPER_SERVERS = "127.0.0.1:2181"
def get_zookeeper_instance():
from kazoo.client import KazooClient
if 'KazooSingletonInstance' not in globals():
globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
globals()['KazooSingletonInstance'].start()
return globals()['KazooSingletonInstance']
然后函数读写偏移量:
def read_offsets(zk, topics):
from pyspark.streaming.kafka import TopicAndPartition
from_offsets = {}
for topic in topics:
for partition in zk.get_children(f'/consumers/{topic}'):
topic_partion = TopicAndPartition(topic, int(partition))
offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
from_offsets[topic_partion] = offset
return from_offsets
def save_offsets(rdd):
zk = get_zookeeper_instance()
for offset in rdd.offsetRanges():
path = f"/consumers/{offset.topic}/{offset.partition}"
zk.ensure_path(path)
zk.set(path, str(offset.untilOffset).encode())
然后在开始流式传输之前,您可以从 zookeeper 读取偏移量并将它们传递给 createDirectStream
对于 fromOffsets
参数。:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
sc = SparkContext(appName="PythonStreamingSaveOffsets")
ssc = StreamingContext(sc, 2)
zk = get_zookeeper_instance()
from_offsets = read_offsets(zk, topics)
directKafkaStream = KafkaUtils.createDirectStream(
ssc, topics, {"metadata.broker.list": brokers},
fromOffsets=from_offsets)
directKafkaStream.foreachRDD(save_offsets)
if __name__ == "__main__":
main()