Spark DataFrame java.lang.OutOfMemoryError: GC overhead limit exceeded on long loop run
I'm running a Spark application (on a Spark 1.6.3 cluster) that does some computations on 2 small data sets and writes the result to S3 Parquet files.
Here is my code:
public void doWork(JavaSparkContext sc, Date writeStartDate, Date writeEndDate, String[] extraArgs) throws Exception {
    SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);
    S3Client s3Client = new S3Client(ConfigTestingUtils.getBasicAWSCredentials());

    boolean clearOutputBeforeSaving = false;
    if (extraArgs != null && extraArgs.length > 0) {
        if (extraArgs[0].equals("clearOutput")) {
            clearOutputBeforeSaving = true;
        } else {
            logger.warn("Unknown param " + extraArgs[0]);
        }
    }

    Date currRunDate = new Date(writeStartDate.getTime());
    while (currRunDate.getTime() < writeEndDate.getTime()) {
        try {
            SparkReader<FirstData> sparkReader = new SparkReader<>(sc);
            JavaRDD<FirstData> data1 = sparkReader.readDataPoints(
                    inputDir,
                    currRunDate,
                    getMinOfEndDateAndNextDay(currRunDate, writeEndDate));
            // Normalize to 1 hours & 0.25 degrees
            JavaRDD<FirstData> distinctData1 = data1.distinct();
            // Floor all (distinct) values to 6 hour windows
            JavaRDD<FirstData> basicData1BySixHours = distinctData1.map(d1 -> new FirstData(
                    d1.getId(),
                    TimeUtils.floorTimePerSixHourWindow(d1.getTimeStamp()),
                    d1.getLatitude(),
                    d1.getLongitude()));

            // Convert Data1 to Dataframes
            DataFrame data1DF = sqlContext.createDataFrame(basicData1BySixHours, FirstData.class);
            data1DF.registerTempTable("data1");

            // Read Data2 DataFrame
            String currDateString = TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            String inputS3Path = basedirInput + "/dt=" + currDateString;
            DataFrame data2DF = sqlContext.read().parquet(inputS3Path);
            data2DF.registerTempTable("data2");

            // Join data1 and data2
            DataFrame mergedDataDF = sqlContext.sql("SELECT D1.Id,D2.beaufort,COUNT(1) AS hours " +
                    "FROM data1 as D1,data2 as D2 " +
                    "WHERE D1.latitude=D2.latitude AND D1.longitude=D2.longitude AND D1.timeStamp=D2.dataTimestamp " +
                    "GROUP BY D1.Id,D1.timeStamp,D1.longitude,D1.latitude,D2.beaufort");

            // Create histogram per ID
            JavaPairRDD<String, Iterable<Row>> mergedDataRows = mergedDataDF.toJavaRDD().groupBy(md -> md.getAs("Id"));
            JavaRDD<MergedHistogram> mergedHistogram = mergedDataRows.map(new MergedHistogramCreator());

            logger.info("Number of data1 results: " + data1DF.select("lId").distinct().count());
            logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());
            logger.info("Number of results with beaufort histograms: " + mergedDataDF.select("Id").distinct().count());

            // Save to parquet
            String outputS3Path = basedirOutput + "/dt=" + TimeUtils.getSimpleDailyStringFromDate(currRunDate);
            if (clearOutputBeforeSaving) {
                writeWithCleanup(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext, s3Client);
            } else {
                write(outputS3Path, mergedHistogram, MergedHistogram.class, sqlContext);
            }
        } finally {
            TimeUtils.progressToNextDay(currRunDate);
        }
    }
}

public void write(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass, SQLContext sqlContext) {
    // Apply a schema to an RDD of JavaBeans and save it as Parquet.
    DataFrame fullDataDF = sqlContext.createDataFrame(outputRDD, outputClass);
    fullDataDF.write().parquet(outputS3Path);
}

public void writeWithCleanup(String outputS3Path, JavaRDD<MergedHistogram> outputRDD, Class outputClass,
                             SQLContext sqlContext, S3Client s3Client) {
    String fileKey = S3Utils.getS3Key(outputS3Path);
    String bucket = S3Utils.getS3Bucket(outputS3Path);
    logger.info("Deleting existing dir: " + outputS3Path);
    s3Client.deleteAll(bucket, fileKey);
    write(outputS3Path, outputRDD, outputClass, sqlContext);
}

public Date getMinOfEndDateAndNextDay(Date startTime, Date proposedEndTime) {
    long endOfDay = startTime.getTime() - startTime.getTime() % MILLIS_PER_DAY + MILLIS_PER_DAY;
    if (endOfDay < proposedEndTime.getTime()) {
        return new Date(endOfDay);
    }
    return proposedEndTime;
}
data1 is about 150,000 records and data2 is about 500,000.
What my code basically does is some data manipulation, merging the 2 data objects, some more manipulation, printing a few statistics, and saving to Parquet.
Spark has 25GB of memory per server, and the code runs fine.
Each iteration takes about 2-3 minutes.
The problem starts when I run it over a large set of dates.
After a while, I get an OutOfMemoryError:
java.lang.OutOfMemoryError: GC overhead limit exceeded
at scala.collection.immutable.List.$colon$colon$colon(List.scala:127)
at org.json4s.JsonDSL$JsonListAssoc.$tilde(JsonDSL.scala:98)
at org.apache.spark.util.JsonProtocol$.taskEndToJson(JsonProtocol.scala:139)
at org.apache.spark.util.JsonProtocol$.sparkEventToJson(JsonProtocol.scala:72)
at org.apache.spark.scheduler.EventLoggingListener.logEvent(EventLoggingListener.scala:144)
at org.apache.spark.scheduler.EventLoggingListener.onTaskEnd(EventLoggingListener.scala:164)
at org.apache.spark.scheduler.SparkListenerBus$class.onPostEvent(SparkListenerBus.scala:42)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.scheduler.LiveListenerBus.onPostEvent(LiveListenerBus.scala:31)
at org.apache.spark.util.ListenerBus$class.postToAll(ListenerBus.scala:55)
at org.apache.spark.util.AsynchronousListenerBus.postToAll(AsynchronousListenerBus.scala:38)
at org.apache.spark.util.AsynchronousListenerBus$$anon$$anonfun$run$$anonfun$apply$mcV$sp.apply$mcV$sp(AsynchronousListenerBus.scala:87)
at org.apache.spark.util.AsynchronousListenerBus$$anon$$anonfun$run$$anonfun$apply$mcV$sp.apply(AsynchronousListenerBus.scala:72)
at org.apache.spark.util.AsynchronousListenerBus$$anon$$anonfun$run$$anonfun$apply$mcV$sp.apply(AsynchronousListenerBus.scala:72)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.util.AsynchronousListenerBus$$anon$$anonfun$run.apply$mcV$sp(AsynchronousListenerBus.scala:71)
at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1181)
at org.apache.spark.util.AsynchronousListenerBus$$anon.run(AsynchronousListenerBus.scala:70)
On the last run, it crashed after 233 iterations.
The line it crashed on was this one:
logger.info("Number of coordinates with data: " + data1DF.select("longitude","latitude").distinct().count());
Can anyone tell me what might be causing the eventual crash?
This error occurs when GC takes up more than 98% of the total execution time of the process. You can monitor the GC time in the Spark Web UI by going to the Stages tab at http://master:4040.
Try increasing the driver/executor memory (whichever one is producing this error) with spark.{driver/executor}.memory when submitting the Spark application.
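For example, a minimal spark-submit sketch (the master, main class, jar name and memory sizes below are placeholders, not tested values; --driver-memory and --executor-memory are the command-line equivalents of spark.driver.memory and spark.executor.memory):

spark-submit \
  --master yarn \
  --class com.example.MyJob \
  --driver-memory 8g \
  --executor-memory 16g \
  my-application.jar clearOutput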
Another thing to try is changing the garbage collector that Java is using. Read this article: https://databricks.com/blog/2015/05/28/tuning-java-garbage-collection-for-spark-applications.html. It explains very clearly why the GC overhead error occurs and which garbage collector is best suited for your application.
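As a rough sketch of that (the G1 flags below are only a generic starting point in the spirit of the article, not values tuned for this workload), the collector can be switched through the JVM options passed at submit time:

spark-submit \
  --driver-java-options "-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
  my-application.jar

The PrintGC flags only add GC logging so you can see how long each collection takes; they do not change behavior.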
I'm not sure everyone will find this solution viable, but upgrading the Spark cluster to 2.2.0 seems to have resolved the issue.
I have been running my application for several days now, and it hasn't crashed yet.