How do we manage offsets in Spark Structured Streaming? (Issues with _spark_metadata)

Background: I have written a simple Spark Structured Streaming application that moves data from Kafka to S3. I found that, to support its exactly-once guarantee, Spark creates a _spark_metadata folder which eventually grows too large; when the streaming application runs for a long time the metadata folder gets so big that we start hitting OOM errors. I would like to get rid of Structured Streaming's metadata and checkpoint folders and manage the offsets myself.

How we manage offsets in Spark Streaming: I have used val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges to fetch offsets in (DStream-based) Spark Streaming. But I would like to know how to fetch offsets and other metadata in Spark Structured Streaming so that we can manage our own checkpoints. Do you have a sample program that implements such checkpointing? A Java sketch of the DStream pattern I am referring to follows below.
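For reference, this is the same pattern in the DStream Java API, as described in the Spark Streaming + Kafka integration guide (a minimal sketch; `stream` is assumed to be a `JavaInputDStream<ConsumerRecord<String, String>>` created with `KafkaUtils.createDirectStream`):

    import org.apache.spark.streaming.kafka010.CanCommitOffsets;
    import org.apache.spark.streaming.kafka010.HasOffsetRanges;
    import org.apache.spark.streaming.kafka010.OffsetRange;

    stream.foreachRDD(rdd -> {
        // Offsets are only available on the raw Kafka RDD, so read them before any shuffle.
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // ... process the batch ...

        // Commit back to Kafka (or write offsetRanges to your own store) once the batch has succeeded.
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });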

How do we manage offsets in Spark Structured Streaming? Looking at this JIRA, https://issues-test.apache.org/jira/browse/SPARK-18258, it seems the offsets are not exposed. What should we do?
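One way to at least observe the per-micro-batch offsets without parsing checkpoint files is a StreamingQueryListener; a minimal sketch (saveToMyOffsetStore is a hypothetical helper you would implement yourself):

    import org.apache.spark.sql.streaming.SourceProgress;
    import org.apache.spark.sql.streaming.StreamingQueryListener;

    spark.streams().addListener(new StreamingQueryListener() {
        @Override
        public void onQueryStarted(QueryStartedEvent event) { }

        @Override
        public void onQueryProgress(QueryProgressEvent event) {
            long batchId = event.progress().batchId();
            // Each source reports the offsets it covered in this batch as a JSON string,
            // e.g. {"topic1":{"0":1234,"1":5678}} for the Kafka source.
            for (SourceProgress source : event.progress().sources()) {
                // Hypothetical persistence call: store (batchId, startOffset, endOffset) yourself.
                saveToMyOffsetStore(batchId, source.startOffset(), source.endOffset());
            }
        }

        @Override
        public void onQueryTerminated(QueryTerminatedEvent event) { }
    });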

The problem is that within 6 hours the metadata grew to 45 MB, and it kept growing until it reached almost 13 GB. The driver memory allocated is 5 GB, and at that point the job crashed with OOM. I would like to know how to keep this metadata from growing so large, and how to make it record less information.
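For context, the size of the file-sink metadata log (_spark_metadata) and of the checkpoint logs is bounded by a handful of Spark SQL settings. The sketch below is a best-effort list of the relevant internal knobs as they exist in Spark 2.x (the builder call itself is only illustrative); verify the names and defaults against your Spark version:

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder()
        .appName("kafka-to-s3")
        // Minimum number of completed batches whose metadata must be kept recoverable (default 100).
        .config("spark.sql.streaming.minBatchesToRetain", 20)
        // File sink (_spark_metadata) log: allow deletion of superseded log files,
        // compact every N batches, and delay the actual cleanup so concurrent readers can finish.
        .config("spark.sql.streaming.fileSink.log.deletion", true)
        .config("spark.sql.streaming.fileSink.log.compactInterval", 10)
        .config("spark.sql.streaming.fileSink.log.cleanupDelay", "10m")
        .getOrCreate();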

Code:

1. Read records from the Kafka topic
   Dataset<Row> inputDf = spark
       .readStream()
       .format("kafka")
       .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
       .option("subscribe", "topic1")
       .option("startingOffsets", "earliest")
       .load();
2. Use from_json API from Spark to extract your data for further transformation in a dataset.
   Dataset<Row> dataDf = inputDf
       .select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       // ... (intermediate selects elided)
       .withColumn("oem_id", col("metadata.oem_id"));
3. Construct a temp table of above dataset using SQLContext
   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");
4. Flatten events since Parquet does not support hierarchical data.
5. Store output in parquet format on S3
   StreamingQuery query = flatDf.writeStream().format("parquet")

   Dataset<Row> dataDf = inputDf
       .select(from_json(col("value").cast("string"), EVENT_SCHEMA).alias("event"))
       .select("event.metadata", "event.data", "event.connection",
               "event.registration_event", "event.version_event");

   SQLContext sqlContext = new SQLContext(sparkSession);
   dataDf.createOrReplaceTempView("event");

   Dataset<Row> flatDf = sqlContext
       .sql("select date, time, id, " + flattenSchema(EVENT_SCHEMA, "event") + " from event");

   StreamingQuery query = flatDf
       .writeStream()
       .outputMode("append")
       .option("compression", "snappy")
       .format("parquet")
       .option("checkpointLocation", checkpointLocation)
       .option("path", outputPath)
       .partitionBy("date", "time", "id")
       .trigger(Trigger.ProcessingTime(triggerProcessingTime))
       .start();

   query.awaitTermination();
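flattenSchema is not shown in the question; purely as a hypothetical sketch of what such a helper could look like (step 4 above), it would walk the nested EVENT_SCHEMA and emit "parent.child as parent_child" projections so that Parquet only sees flat leaf columns:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical helper: turns a nested schema into
    // "event.metadata.oem_id as event_metadata_oem_id, ..." for use in the select above.
    private static String flattenSchema(StructType schema, String prefix) {
        List<String> columns = new ArrayList<>();
        for (StructField field : schema.fields()) {
            String name = (prefix == null) ? field.name() : prefix + "." + field.name();
            if (field.dataType() instanceof StructType) {
                columns.add(flattenSchema((StructType) field.dataType(), name));
            } else {
                columns.add(name + " as " + name.replace(".", "_"));
            }
        }
        return String.join(", ", columns);
    }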

For the non-batch (streaming) Spark Structured Streaming + Kafka integration:

Quote:

Structured Streaming ignores the offsets commits in Apache Kafka.

Instead, it relies on its own offsets management on the driver side which is responsible for distributing offsets to executors and for checkpointing them at the end of the processing round (epoch or micro-batch).

If you follow the Spark Kafka integration guide, there is nothing to worry about.
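Concretely, those driver-side offsets live under the checkpointLocation you pass to writeStream. Roughly (contents simplified, offset numbers illustrative):

    checkpointLocation/
      metadata        // query id
      offsets/        // one file per micro-batch, written before the batch runs,
        0             // containing the Kafka offsets for that batch,
        1             // e.g. {"topic1":{"0":4235,"1":4310}}
      commits/        // written after the batch and its sink output have completed
        0
        1
      state/          // only present for stateful operators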

Excellent reference: https://www.waitingforcode.com/apache-spark-structured-streaming/apache-spark-structured-streaming-apache-kafka-offsets-management/read

The batch case is different: there you need to manage and store the offsets yourself.
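A minimal sketch of that batch case, following the Kafka integration guide (spark.read supports explicit per-partition ranges; the JSON values and where you persist the ending offsets for the next run are up to you):

    Dataset<Row> batchDf = spark
        .read()
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("subscribe", "topic1")
        // Exact per-partition range for this run; -2 means earliest, -1 means latest.
        .option("startingOffsets", "{\"topic1\":{\"0\":1000,\"1\":1000}}")
        .option("endingOffsets", "{\"topic1\":{\"0\":2000,\"1\":2000}}")
        .load();
    // After a successful run, store the ending offsets yourself (e.g. in a small table)
    // and use them as the startingOffsets of the next run.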

UPDATE: Based on the comments I suggest the question is slightly different. In addition to your updated comments and the fact that there is no error, I suggest you consult this article on metadata for Spark Structured Streaming: https://www.waitingforcode.com/apache-spark-structured-streaming/checkpoint-storage-structured-streaming/read. Looking at the code, it is not my style, but I cannot see an obvious error.