重新启动 PySpark 作业不会获取在 pyspark 消费者关闭时插入到 Kafka 主题中的记录
Restarting a PySpark job doesn't get the records which were inserted into Kafka Topic while the pyspark consumer is down
我正在 运行 执行 pyspark 作业,数据流来自 Kafka。
我正在尝试在我的 windows 系统中复制一个场景,以了解当数据持续输入 Kafka 时消费者出现故障时会发生什么。
这是我所期望的。
- 生产者已启动并生成消息 1、2 和 3。
- 消费者在线并消费消息 1、2 和 3。
- 现在,当生产者生成消息 4、5 和 6 等时,消费者由于某种原因宕机...
- 当消费者出现时,我希望它应该从停止的地方开始阅读。因此消费者必须能够从消息 4、5、6 等中读取....
我的 pyspark 应用程序无法达到我的预期。这是我创建 Spark Session 的方法。
session.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "clickapijson")
.option("startingoffsets" , "latest") \
.load()
我用谷歌搜索并收集了相当多的信息。似乎 groupID 在这里是相关的。 Kafka 维护特定 groupID 中每个消费者读取的偏移量的跟踪。如果消费者订阅了一个带有 groupId 的主题,比如 G1,kafka 会注册这个组和 consumerID 并跟踪这个 groupID 和 ConsumerID。如果消费者因为某种原因必须关闭,并以相同的 groupID 重新启动,那么 kafka 将拥有已读取偏移量的信息,因此消费者将从它停止的地方读取数据。
当我使用以下命令在 CLI 中调用消费者作业时,就会发生这种情况。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test
现在,当我的生产者产生消息 1,2 和 3 时,消费者就可以消费了。在读取第 3 条消息后,我终止了 运行ning 消费者作业(CLI .bat 文件)。我的制作人制作消息 4、5 和 6 等等……
现在我带回了我的消费者工作(CLI .bat 文件),它能够从它停止的地方(从消息 4)读取数据。这符合我的预期。
我无法在 pyspark 中做同样的事情。
当我包含 option("group.id" , "test")
时,它会抛出一条错误消息,指出 Kafka 选项 group.id
不受支持,因为用户指定的消费者组未用于跟踪偏移量。
观察控制台输出,每次启动我的 pyspark 消费者作业时,它都会创建一个新的 groupID。如果我的 pyspark 作业之前有 运行 和 groupID 但失败了,当它重新启动时它不会选择相同的旧 groupID。它随机获得一个新的 groupID。 Kafka有之前groupID的offset信息,没有当前新生成的groupID。因此,我的 pyspark 应用程序在停机时无法读取输入 Kafka 的数据。
如果是这种情况,那么当消费者作业由于某些故障而停止时,我不会丢失我的数据吗?
如何将我自己的 groupid 提供给 pyspark 应用程序或如何使用相同的旧 groupid 重新启动我的 pyspark 应用程序?
在当前的 Spark 版本 (2.4.5) 中,无法提供您自己的 group.id
,因为它是由 Spark 自动创建的(正如您已经观察到的那样)。 here 给出了有关从 Kafka 读取的 Spark 偏移量管理的完整详细信息,并总结如下:
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
enable.auto.commit: Kafka source doesn’t commit any offset.
为了让 Spark 能够记住它从 Kafka 读取的位置,您需要启用检查点并提供一个路径位置来存储检查点文件。在 Python 这看起来像:
aggDF \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
Recovering from Failures with Checkpointing 上的 Spark 文档中提供了有关检查点的更多详细信息。
我正在 运行 执行 pyspark 作业,数据流来自 Kafka。 我正在尝试在我的 windows 系统中复制一个场景,以了解当数据持续输入 Kafka 时消费者出现故障时会发生什么。
这是我所期望的。
- 生产者已启动并生成消息 1、2 和 3。
- 消费者在线并消费消息 1、2 和 3。
- 现在,当生产者生成消息 4、5 和 6 等时,消费者由于某种原因宕机...
- 当消费者出现时,我希望它应该从停止的地方开始阅读。因此消费者必须能够从消息 4、5、6 等中读取....
我的 pyspark 应用程序无法达到我的预期。这是我创建 Spark Session 的方法。
session.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "clickapijson")
.option("startingoffsets" , "latest") \
.load()
我用谷歌搜索并收集了相当多的信息。似乎 groupID 在这里是相关的。 Kafka 维护特定 groupID 中每个消费者读取的偏移量的跟踪。如果消费者订阅了一个带有 groupId 的主题,比如 G1,kafka 会注册这个组和 consumerID 并跟踪这个 groupID 和 ConsumerID。如果消费者因为某种原因必须关闭,并以相同的 groupID 重新启动,那么 kafka 将拥有已读取偏移量的信息,因此消费者将从它停止的地方读取数据。
当我使用以下命令在 CLI 中调用消费者作业时,就会发生这种情况。
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic "clickapijson" --consumer-property group.id=test
现在,当我的生产者产生消息 1,2 和 3 时,消费者就可以消费了。在读取第 3 条消息后,我终止了 运行ning 消费者作业(CLI .bat 文件)。我的制作人制作消息 4、5 和 6 等等…… 现在我带回了我的消费者工作(CLI .bat 文件),它能够从它停止的地方(从消息 4)读取数据。这符合我的预期。
我无法在 pyspark 中做同样的事情。
当我包含 option("group.id" , "test")
时,它会抛出一条错误消息,指出 Kafka 选项 group.id
不受支持,因为用户指定的消费者组未用于跟踪偏移量。
观察控制台输出,每次启动我的 pyspark 消费者作业时,它都会创建一个新的 groupID。如果我的 pyspark 作业之前有 运行 和 groupID 但失败了,当它重新启动时它不会选择相同的旧 groupID。它随机获得一个新的 groupID。 Kafka有之前groupID的offset信息,没有当前新生成的groupID。因此,我的 pyspark 应用程序在停机时无法读取输入 Kafka 的数据。
如果是这种情况,那么当消费者作业由于某些故障而停止时,我不会丢失我的数据吗?
如何将我自己的 groupid 提供给 pyspark 应用程序或如何使用相同的旧 groupid 重新启动我的 pyspark 应用程序?
在当前的 Spark 版本 (2.4.5) 中,无法提供您自己的 group.id
,因为它是由 Spark 自动创建的(正如您已经观察到的那样)。 here 给出了有关从 Kafka 读取的 Spark 偏移量管理的完整详细信息,并总结如下:
Note that the following Kafka params cannot be set and the Kafka source or sink will throw an exception:
group.id: Kafka source will create a unique group id for each query automatically.
auto.offset.reset: Set the source option startingOffsets to specify where to start instead. Structured Streaming manages which offsets are consumed internally, rather than rely on the kafka Consumer to do it. This will ensure that no data is missed when new topics/partitions are dynamically subscribed. Note that startingOffsets only applies when a new streaming query is started, and that resuming will always pick up from where the query left off.
enable.auto.commit: Kafka source doesn’t commit any offset.
为了让 Spark 能够记住它从 Kafka 读取的位置,您需要启用检查点并提供一个路径位置来存储检查点文件。在 Python 这看起来像:
aggDF \
.writeStream \
.outputMode("complete") \
.option("checkpointLocation", "path/to/HDFS/dir") \
.format("memory") \
.start()
Recovering from Failures with Checkpointing 上的 Spark 文档中提供了有关检查点的更多详细信息。