加入时 spark 结构化流中的巨大 hadoop 应用程序数据 mongo

Huge hadoop appdata in spark structured streaming when joining mongo

我想加入一些来自基于 json 的 kafka 源的事件,其中 url 字段在 Mongodb 集合中具有相关数据。然后聚合它们,包括额外的 Mongodb 数据并将数据输出到 GCS 接收器。

当我 运行 我的结构化流式 spark 应用程序时,我的 spark 集群开始无限制地填充磁盘 space。我已将水印配置为 0 秒(因为我仅聚合来自当前处理批次的事件),因此最大状态应为 1 或 2 个批次。但是我有这个可用磁盘 space 图(杀死应用程序时稳定):

几乎所有填满我硬盘的数据都位于: /hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id

如果我禁用 mongodb 加入,可用磁盘会随着时间的推移保持稳定,但我需要加入的数据。

我想加入的 mongodb 集合大约有 11 GB,我输入的 kafka 主题大约有 3k records/sec。

我的代码如下所示:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

object Main {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("My awesome joined app")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
      .set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
      .set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")

      // Mongo config
      .set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")

    val session = SparkSession
      .builder
      .config(conf)
      .getOrCreate()

    val topic = "events"
    val kafkaStream = session
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092")
      .option("subscribe", topic)
      .option("startingOffsets" , "latest")
      .option("maxOffsetsPerTrigger", 10000000)
      .option("failOnDataLoss", false)
      .option("kafka.max.partition.fetch.bytes", 10485760)
      .option("kafka.receive.buffer.bytes", 16000000)
      .load()

    val urlsDf = MongoSpark.load(session).toDF

    import session.implicits._
    stream
      .selectExpr("CAST (value AS STRING)", " CAST (timestamp AS STRING)").as[(String, String)]
      .withColumn("name", json_tuple('value, "name"))
      .withColumn("url", json_tuple('value, "url"))

      .withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
      .withColumn("b_name", when(col("name") === "b", 1).otherwise(0))

      .withColumn("date", json_tuple('value, "date"))

      // We don't care about reinjecting old data, fake watermarking
      .withColumn("server_time", current_timestamp)
      .withWatermark("server_time", "0 seconds")

      .groupBy($"url", $"server_time")
      .agg(
        sum(s"a_name") as "a_name",
        sum(s"b_name") as "b_name"
      )

      // Join with entities
      .join(urlsDf, $"url" === $"_id", "left_outer")

      .select(
        "url",
        "some_value_from_mongo", // from the joined stream
        s"a_name",
        s"b_name"
      )

      .coalesce(24)
      .writeStream
      .format("parquet")
      .outputMode("append")
      .option("path", "gs://my-custom-data")
      .option("checkpointLocation", "/my-custom-data/checkpoints")
      .trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes

      .start
      .awaitTermination
  }
}

问题已解决,将 dynamicAllocation 设置为 false。

为此,您可以在 spark-submit 中设置以下配置:

  --conf "spark.dynamicAllocation.enabled=false" \

在执行程序存在之前,Yarn 或 Spark 没有删除不需要的状态似乎存在问题。 https://issues.apache.org/jira/browse/YARN-7070.

除此之外,加入一个 11gb 不断增长的 mongo 集合是一个糟糕的架构设计。随着集合的增加,将需要增加 spark 执行器。在我的例子中,由于工作人员之间的大量洗牌,这也会导致大量的 GC 时间。