Cannot write to S3 inside EMR, getting NPE
Has anyone seen this before? It's an opaque NPE, so I have no idea what's causing it. I'm trying to write to S3 from an EMR cluster via Spark with .write.mode(SaveMode.Overwrite).parquet.
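For context, the call is roughly the following sketch (the DataFrame and bucket paths are placeholders, not from the actual job):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder.appName("emr-s3-write").getOrCreate()
// Source data; the bucket and prefix are hypothetical
val df = spark.read.parquet("s3://my-bucket/input/")
// Overwrite mode first deletes the existing output prefix, which is where the NPE below is thrown
df.write.mode(SaveMode.Overwrite).parquet("s3://my-bucket/output/")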
Exception in thread "main" java.lang.NullPointerException
at com.amazon.ws.emr.hadoop.fs.s3.lite.S3Errors.isHttp200WithErrorCode(S3Errors.java:57)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:100)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:127)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:364)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1380)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:337)
at org.apache.spark.internal.io.FileCommitProtocol.deleteWithJob(FileCommitProtocol.scala:124)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.deleteMatchingPartitions(InsertIntoHadoopFsRelationCommand.scala:223)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:122)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery.apply(SparkPlan.scala:156)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
...
I would recommend using s3n:/// or s3a:/// paths instead. Also make sure your SparkSession has these configurations:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder
  .appName(appName)
  // Disable batch deleteObjects calls; the NPE in the trace is thrown from a batch delete
  .config("spark.hadoop.fs.s3a.multiobjectdelete.enable", "false")
  // Stream uploads to S3 instead of buffering whole files on local disk
  .config("spark.hadoop.fs.s3a.fast.upload", "true")
  .config("spark.sql.parquet.filterPushdown", "true")
  .config("spark.sql.parquet.mergeSchema", "false")
  // Commit algorithm v2 moves task output directly to the destination, avoiding a slow rename step on S3
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  // Speculative execution can leave duplicate partial files on S3
  .config("spark.speculation", "false")
  .config("fs.s3a.retry.throttle.limit", "20")
  .config("fs.s3a.retry.throttle.interval", "500ms")
  .getOrCreate()
According to AWS, this is a known bug that was fixed in EMR 5.28.0, so upgrading EMR resolves it.