加入时 spark 结构化流中的巨大 hadoop 应用程序数据 mongo
Huge hadoop appdata in spark structured streaming when joining mongo
我想加入一些来自基于 json 的 kafka 源的事件,其中 url 字段在 Mongodb 集合中具有相关数据。然后聚合它们,包括额外的 Mongodb 数据并将数据输出到 GCS 接收器。
当我 运行 我的结构化流式 spark 应用程序时,我的 spark 集群开始无限制地填充磁盘 space。我已将水印配置为 0 秒(因为我仅聚合来自当前处理批次的事件),因此最大状态应为 1 或 2 个批次。但是我有这个可用磁盘 space 图(杀死应用程序时稳定):
几乎所有填满我硬盘的数据都位于:
/hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id
如果我禁用 mongodb 加入,可用磁盘会随着时间的推移保持稳定,但我需要加入的数据。
我想加入的 mongodb 集合大约有 11 GB,我输入的 kafka 主题大约有 3k records/sec。
我的代码如下所示:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object Main {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("My awesome joined app")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
// Mongo config
.set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")
val session = SparkSession
.builder
.config(conf)
.getOrCreate()
val topic = "events"
val kafkaStream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", topic)
.option("startingOffsets" , "latest")
.option("maxOffsetsPerTrigger", 10000000)
.option("failOnDataLoss", false)
.option("kafka.max.partition.fetch.bytes", 10485760)
.option("kafka.receive.buffer.bytes", 16000000)
.load()
val urlsDf = MongoSpark.load(session).toDF
import session.implicits._
stream
.selectExpr("CAST (value AS STRING)", " CAST (timestamp AS STRING)").as[(String, String)]
.withColumn("name", json_tuple('value, "name"))
.withColumn("url", json_tuple('value, "url"))
.withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
.withColumn("b_name", when(col("name") === "b", 1).otherwise(0))
.withColumn("date", json_tuple('value, "date"))
// We don't care about reinjecting old data, fake watermarking
.withColumn("server_time", current_timestamp)
.withWatermark("server_time", "0 seconds")
.groupBy($"url", $"server_time")
.agg(
sum(s"a_name") as "a_name",
sum(s"b_name") as "b_name"
)
// Join with entities
.join(urlsDf, $"url" === $"_id", "left_outer")
.select(
"url",
"some_value_from_mongo", // from the joined stream
s"a_name",
s"b_name"
)
.coalesce(24)
.writeStream
.format("parquet")
.outputMode("append")
.option("path", "gs://my-custom-data")
.option("checkpointLocation", "/my-custom-data/checkpoints")
.trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes
.start
.awaitTermination
}
}
问题已解决,将 dynamicAllocation 设置为 false。
为此,您可以在 spark-submit 中设置以下配置:
--conf "spark.dynamicAllocation.enabled=false" \
在执行程序存在之前,Yarn 或 Spark 没有删除不需要的状态似乎存在问题。 https://issues.apache.org/jira/browse/YARN-7070.
除此之外,加入一个 11gb 不断增长的 mongo 集合是一个糟糕的架构设计。随着集合的增加,将需要增加 spark 执行器。在我的例子中,由于工作人员之间的大量洗牌,这也会导致大量的 GC 时间。
我想加入一些来自基于 json 的 kafka 源的事件,其中 url 字段在 Mongodb 集合中具有相关数据。然后聚合它们,包括额外的 Mongodb 数据并将数据输出到 GCS 接收器。
当我 运行 我的结构化流式 spark 应用程序时,我的 spark 集群开始无限制地填充磁盘 space。我已将水印配置为 0 秒(因为我仅聚合来自当前处理批次的事件),因此最大状态应为 1 或 2 个批次。但是我有这个可用磁盘 space 图(杀死应用程序时稳定):
几乎所有填满我硬盘的数据都位于:
/hadoop/yarn/nm-local-dir/usercache/myuser/appcache/myapplication-id
如果我禁用 mongodb 加入,可用磁盘会随着时间的推移保持稳定,但我需要加入的数据。
我想加入的 mongodb 集合大约有 11 GB,我输入的 kafka 主题大约有 3k records/sec。
我的代码如下所示:
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql._
import org.apache.spark.SparkConf
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
object Main {
def main(args: Array[String]) {
val conf = new SparkConf()
.setAppName("My awesome joined app")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
.set("spark.streaming.receiver.writeAheadLog.closeFileAfterWrite", "true")
.set("spark.streaming.driver.writeAheadLog.closeFileAfterWrite", "true")
// Mongo config
.set("spark.mongodb.input.uri", "mongodb://mongo/urls.urls")
val session = SparkSession
.builder
.config(conf)
.getOrCreate()
val topic = "events"
val kafkaStream = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("subscribe", topic)
.option("startingOffsets" , "latest")
.option("maxOffsetsPerTrigger", 10000000)
.option("failOnDataLoss", false)
.option("kafka.max.partition.fetch.bytes", 10485760)
.option("kafka.receive.buffer.bytes", 16000000)
.load()
val urlsDf = MongoSpark.load(session).toDF
import session.implicits._
stream
.selectExpr("CAST (value AS STRING)", " CAST (timestamp AS STRING)").as[(String, String)]
.withColumn("name", json_tuple('value, "name"))
.withColumn("url", json_tuple('value, "url"))
.withColumn("a_name", when(col("name") === "a", 1).otherwise(0))
.withColumn("b_name", when(col("name") === "b", 1).otherwise(0))
.withColumn("date", json_tuple('value, "date"))
// We don't care about reinjecting old data, fake watermarking
.withColumn("server_time", current_timestamp)
.withWatermark("server_time", "0 seconds")
.groupBy($"url", $"server_time")
.agg(
sum(s"a_name") as "a_name",
sum(s"b_name") as "b_name"
)
// Join with entities
.join(urlsDf, $"url" === $"_id", "left_outer")
.select(
"url",
"some_value_from_mongo", // from the joined stream
s"a_name",
s"b_name"
)
.coalesce(24)
.writeStream
.format("parquet")
.outputMode("append")
.option("path", "gs://my-custom-data")
.option("checkpointLocation", "/my-custom-data/checkpoints")
.trigger(Trigger.ProcessingTime(10 * 60 * 1000)) // 10 minutes
.start
.awaitTermination
}
}
问题已解决,将 dynamicAllocation 设置为 false。
为此,您可以在 spark-submit 中设置以下配置:
--conf "spark.dynamicAllocation.enabled=false" \
在执行程序存在之前,Yarn 或 Spark 没有删除不需要的状态似乎存在问题。 https://issues.apache.org/jira/browse/YARN-7070.
除此之外,加入一个 11gb 不断增长的 mongo 集合是一个糟糕的架构设计。随着集合的增加,将需要增加 spark 执行器。在我的例子中,由于工作人员之间的大量洗牌,这也会导致大量的 GC 时间。