如果在提供给 kafka 的数据中遇到意外格式,当您重新启动 spark 作业时会发生什么
What happens when you restart a spark job if it encounters unexpected format in the data fed to kafka
我有一个关于 Spark Structured Streaming with Kafka 的问题。
假设我是 运行 一个 spark job 并且每件事都运行得很好。
有一天,我的 spark 作业失败了,因为提供给 kafka 的数据不一致。不一致可能是数据格式问题或 spark 无法处理的垃圾字符。在这种情况下,我们如何解决问题?有没有办法进入kafka主题,手动修改数据?
如果我们不修复数据问题并重新启动 spark 作业,它将读取导致失败的相同旧行,因为我们尚未提交检查点。那么我们如何摆脱这个循环。如何解决 Kafka 主题中的数据问题以恢复中止的 spark 作业?
除非您真的知道自己在做什么,否则我会避免尝试手动更改 Kafka 主题中的一条消息。
为防止将来发生这种情况,您可能需要考虑为您的数据使用模式(结合模式注册表)。
为了缓解您描述的问题,我看到了以下选项:
- 手动更改结构化流应用程序的消费者组的偏移量
- 创建一个从特定偏移量开始读取的“新”流作业
手动更改偏移量
使用 Sparks 结构化流时,消费者组由 Spark 自动设置。根据 code 消费者组将被定义为:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
您可以使用 kafka-consumer-groups
工具更改偏移量。首先通过
识别消费组的实际名称
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
然后为特定主题的消费者组设置偏移量(例如偏移量 100)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100
如果您只需要更改特定分区的偏移量,您可以查看该工具的帮助功能以了解如何执行此操作。
创建新的流媒体作业
您可以使用 startingOffsets
中描述的 Spark 选项 Spark + Kafka integration guide:
Option: startingOffsets
value: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
default: "latest" for streaming, "earliest" for batch
meaning: The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
要实现这一点,重要的是要有一个“新”查询。这意味着您需要删除现有作业的检查点文件或创建完整的新应用程序。
我有一个关于 Spark Structured Streaming with Kafka 的问题。 假设我是 运行 一个 spark job 并且每件事都运行得很好。 有一天,我的 spark 作业失败了,因为提供给 kafka 的数据不一致。不一致可能是数据格式问题或 spark 无法处理的垃圾字符。在这种情况下,我们如何解决问题?有没有办法进入kafka主题,手动修改数据?
如果我们不修复数据问题并重新启动 spark 作业,它将读取导致失败的相同旧行,因为我们尚未提交检查点。那么我们如何摆脱这个循环。如何解决 Kafka 主题中的数据问题以恢复中止的 spark 作业?
除非您真的知道自己在做什么,否则我会避免尝试手动更改 Kafka 主题中的一条消息。
为防止将来发生这种情况,您可能需要考虑为您的数据使用模式(结合模式注册表)。
为了缓解您描述的问题,我看到了以下选项:
- 手动更改结构化流应用程序的消费者组的偏移量
- 创建一个从特定偏移量开始读取的“新”流作业
手动更改偏移量
使用 Sparks 结构化流时,消费者组由 Spark 自动设置。根据 code 消费者组将被定义为:
val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
您可以使用 kafka-consumer-groups
工具更改偏移量。首先通过
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
然后为特定主题的消费者组设置偏移量(例如偏移量 100)
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --execute --reset-offsets --group spark-kafka-source-1337 --topic topic1 --to-offset 100
如果您只需要更改特定分区的偏移量,您可以查看该工具的帮助功能以了解如何执行此操作。
创建新的流媒体作业
您可以使用 startingOffsets
中描述的 Spark 选项 Spark + Kafka integration guide:
Option: startingOffsets
value: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
default: "latest" for streaming, "earliest" for batch
meaning: The start point when a query is started, either "earliest" which is from the earliest offsets, "latest" which is just from the latest offsets, or a json string specifying a starting offset for each TopicPartition. In the json, -2 as an offset can be used to refer to earliest, -1 to latest. Note: For batch queries, latest (either implicitly or by using -1 in json) is not allowed. For streaming queries, this only applies when a new query is started, and that resuming will always pick up from where the query left off. Newly discovered partitions during a query will start at earliest.
要实现这一点,重要的是要有一个“新”查询。这意味着您需要删除现有作业的检查点文件或创建完整的新应用程序。