Spark Structured Streaming with Kafka 不支持 startingOffset="earliest"
Spark Structured Streaming with Kafka doesn't honor startingOffset="earliest"
我已经设置了 Spark Structured Streaming (Spark 2.3.2) 以从 Kafka (2.0.0) 读取数据。如果消息在 Spark 流作业启动之前进入主题,我将无法从主题的开头使用。 Spark Streaming 的这种预期行为是否忽略了在 Spark Stream 作业的初始 运行 之前生成的 Kafka 消息(即使使用 .option("stratingOffsets","earliest"))?
重现步骤
在开始流作业之前,创建 test
主题(单个代理、单个分区)并向该主题生成消息(在我的示例中为 3 条消息)。
使用以下命令启动spark-shell:spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
执行下面的spark scala代码。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
预期与实际产出
我希望流从 offset=1 开始。但是,它从 offset=3 开始读取。可以看到kafka client其实是在重新设置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
我可以看到 spark 流处理我在启动流作业后生成的消息。
Spark Streaming 的这种预期行为是否忽略了在初始 运行 Spark Stream 作业(即使 .option("stratingOffsets","earliest")
)之前生成的 Kafka 消息?
2019-06-18 21:22:57 INFO AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
Spark Batch 模式
我能够确认批处理模式从头开始读取 - 因此 Kafka 保留配置没有问题
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("subscribe", "test")
.load()
df.count // Long = 3
您可以通过两种方式执行此操作。从 kafka 加载数据到流数据帧或从 kafka 加载数据到静态数据帧(用于测试)。
我认为您没有看到数据是因为 group-id。 kafka 会将消费者组和偏移量提交给一个内部主题。确保组名称对于每次读取都是唯一的。
这里有两个选项。
选项 1:从 kafka 读取数据到流数据帧
// spark streaming with kafka
import org.apache.spark.sql.streaming.ProcessingTime
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger","6000")
.load()
val ds2 = ds1.select(from_json($"value".cast(StringType), dataSchema).as("data")).select("data.*")
val ds3 = ds2.groupBy("TABLE_NAME").count()
ds3.writeStream
.trigger(ProcessingTime("10 seconds"))
.queryName("query1").format("console")
.outputMode("complete")
.start()
.awaitTermination()
方案二:从kafka读取数据到静态dataframe(为了测试,会从头加载)
// Subscribe to 1 topic defaults to the earliest and latest offsets
val ds1 = spark.read.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("spark.streaming.kafka.consumer.cache.enabled","false")
.load()
val ds2 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp")
val ds3 = ds2.select("value").rdd.map(x => x.toString)
ds3.count()
哈哈这是一个简单的错字:"stratingOffsets"应该是"startingOffsets"
我已经设置了 Spark Structured Streaming (Spark 2.3.2) 以从 Kafka (2.0.0) 读取数据。如果消息在 Spark 流作业启动之前进入主题,我将无法从主题的开头使用。 Spark Streaming 的这种预期行为是否忽略了在 Spark Stream 作业的初始 运行 之前生成的 Kafka 消息(即使使用 .option("stratingOffsets","earliest"))?
重现步骤
在开始流作业之前,创建
test
主题(单个代理、单个分区)并向该主题生成消息(在我的示例中为 3 条消息)。使用以下命令启动spark-shell:
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2.3.1.0.0-78 --repositories http://repo.hortonworks.com/content/repositories/releases/
执行下面的spark scala代码。
// Local
val df = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("failOnDataLoss","false")
.option("stratingOffsets","earliest")
.option("subscribe", "test")
.load()
// Sink Console
val ds = df.writeStream.format("console").queryName("Write to console")
.trigger(org.apache.spark.sql.streaming.Trigger.ProcessingTime("10 second"))
.start()
预期与实际产出
我希望流从 offset=1 开始。但是,它从 offset=3 开始读取。可以看到kafka client其实是在重新设置起始偏移量:2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
我可以看到 spark 流处理我在启动流作业后生成的消息。
Spark Streaming 的这种预期行为是否忽略了在初始 运行 Spark Stream 作业(即使 .option("stratingOffsets","earliest")
)之前生成的 Kafka 消息?
2019-06-18 21:22:57 INFO AppInfoParser:109 - Kafka version : 2.0.0.3.1.0.0-78
2019-06-18 21:22:57 INFO AppInfoParser:110 - Kafka commitId : 0f47b27cde30d177
2019-06-18 21:22:57 INFO MicroBatchExecution:54 - Starting new streaming query.
2019-06-18 21:22:57 INFO Metadata:273 - Cluster ID: LqofSZfjTu29BhZm6hsgsg
2019-06-18 21:22:57 INFO AbstractCoordinator:677 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Discovered group coordinator localhost:9097 (id: 2147483647 rack: null)
2019-06-18 21:22:57 INFO ConsumerCoordinator:462 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Revoking previously assigned partitions []
2019-06-18 21:22:57 INFO AbstractCoordinator:509 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] (Re-)joining group
2019-06-18 21:22:57 INFO AbstractCoordinator:473 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Successfully joined group with generation 1
2019-06-18 21:22:57 INFO ConsumerCoordinator:280 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Setting newly assigned partitions [test-0]
2019-06-18 21:22:57 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO KafkaSource:54 - Initial offsets: {"test":{"0":3}}
2019-06-18 21:22:58 INFO Fetcher:583 - [Consumer clientId=consumer-2, groupId=spark-kafka-source-e948eee9-3024-4f14-bcb8-75b80d43cbb1--181544888-driver-0] Resetting offset for partition test-0 to offset 3.
2019-06-18 21:22:58 INFO MicroBatchExecution:54 - Committed offsets for batch 0. Metadata OffsetSeqMetadata(0,1560910978083,Map(spark.sql.shuffle.partitions -> 200, spark.sql.streaming.stateStore.providerClass -> org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider))
2019-06-18 21:22:58 INFO KafkaSource:54 - GetBatch called with start = None, end = {"test":{"0":3}}
Spark Batch 模式
我能够确认批处理模式从头开始读取 - 因此 Kafka 保留配置没有问题
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9097")
.option("subscribe", "test")
.load()
df.count // Long = 3
您可以通过两种方式执行此操作。从 kafka 加载数据到流数据帧或从 kafka 加载数据到静态数据帧(用于测试)。
我认为您没有看到数据是因为 group-id。 kafka 会将消费者组和偏移量提交给一个内部主题。确保组名称对于每次读取都是唯一的。
这里有两个选项。
选项 1:从 kafka 读取数据到流数据帧
// spark streaming with kafka
import org.apache.spark.sql.streaming.ProcessingTime
val ds1 = spark.readStream.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("startingOffsets","earliest")
.option("maxOffsetsPerTrigger","6000")
.load()
val ds2 = ds1.select(from_json($"value".cast(StringType), dataSchema).as("data")).select("data.*")
val ds3 = ds2.groupBy("TABLE_NAME").count()
ds3.writeStream
.trigger(ProcessingTime("10 seconds"))
.queryName("query1").format("console")
.outputMode("complete")
.start()
.awaitTermination()
方案二:从kafka读取数据到静态dataframe(为了测试,会从头加载)
// Subscribe to 1 topic defaults to the earliest and latest offsets
val ds1 = spark.read.format("kafka")
.option("kafka.bootstrap.servers","app01.app.test.net:9097,app02.app.test.net:9097")
.option("subscribe", "kafka-testing-topic")
.option("kafka.security.protocol", "SASL_PLAINTEXT")
.option("spark.streaming.kafka.consumer.cache.enabled","false")
.load()
val ds2 = ds1.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)","topic","partition","offset","timestamp")
val ds3 = ds2.select("value").rdd.map(x => x.toString)
ds3.count()
哈哈这是一个简单的错字:"stratingOffsets"应该是"startingOffsets"