Spark JobEnd Listener moving source file from HDFS path causes FileNotFoundException
Spark version: 2.3
A Spark Structured Streaming application is streaming from an HDFS path:
// Structured Streaming: pick up text files as they appear under the HDFS path
Dataset<Row> lines = spark
    .readStream()
    .format("text")
    .load("path");
After some transformations on the data, the job for a single file should reach a succeeded state.
A job listener was added for job end; when it fires, it moves the file:
@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) {
    // Move the source file whose processing has finished to another location.
}
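For reference, a minimal sketch of what such a listener might look like using the Hadoop FileSystem API. The FileMoveListener name, the constructor parameters, and the way the listener learns which file a job processed are assumptions for illustration, not from the original code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;

// Hypothetical listener: archives one processed input file when a job ends.
public class FileMoveListener extends SparkListener {
    private final Path sourceFile;  // file whose processing just finished (assumed known)
    private final Path archiveDir;  // destination directory, assumed to exist

    public FileMoveListener(Path sourceFile, Path archiveDir) {
        this.sourceFile = sourceFile;
        this.archiveDir = archiveDir;
    }

    @Override
    public void onJobEnd(SparkListenerJobEnd jobEnd) {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            // Within a single HDFS filesystem, rename() is effectively a move.
            fs.rename(sourceFile, new Path(archiveDir, sourceFile.getName()));
        } catch (IOException e) {
            // A listener must not throw; log and continue.
            System.err.println("Failed to archive " + sourceFile + ": " + e);
        }
    }
}

Such a listener would be registered via spark.sparkContext().addSparkListener(new FileMoveListener(src, archive)). As the answer below explains, though, moving the file at onJobEnd still races against Spark's own file handling on affected versions.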
The file is moved to the other location successfully, but at the same moment (the exact same timestamp) Spark throws the FileNotFoundException below. This happens randomly and cannot be reproduced on demand, but it happens often.
Somehow, Spark is still referencing the file even though that particular job has ended.
How can I make sure the file is no longer referenced by Spark once the job has ended, so that this FileNotFoundException is avoided?
All I could find on SparkListenerJobEnd is here:
SparkListenerJobEnd
DAGScheduler does cleanUpAfterSchedulerStop, handleTaskCompletion, failJobAndIndependentStages, and markMapStageJobAsFinished.
- Same question with a different approach
Exception:
java.io.FileNotFoundException: File does not exist: <filename>
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1932)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1873)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1853)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1825)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:559)
at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:87)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:363)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2044)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2040)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
at org.apache.hadoop.ipc.RemoteExc
It is a bug in Spark:
https://issues.apache.org/jira/browse/SPARK-24364
Mail thread: http://mail-archives.apache.org/mod_mbox/spark-issues/201805.mbox/%3cJIRA.13161366.1527070555000.13725.1527070560256@Atlassian.JIRA%3e
Fix: https://github.com/apache/spark/pull/21408/commits/c52d972e4ca09e0ede1bb9e60d3c07f80f605f88
Fixed versions: 2.3.1 / 2.4.0
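If upgrading right away is not an option, one possible mitigation (my assumption, not part of the fix above, and it is unclear whether it covers this exact race in the streaming source) is to let file-based reads skip files that vanish between listing and reading:

import org.apache.spark.sql.SparkSession;

// Hypothetical mitigation for pre-2.3.1 clusters: ignore files that
// disappear between directory listing and read. Rows from a moved file
// may then be silently skipped, so upgrading remains the real fix.
SparkSession spark = SparkSession.builder()
        .appName("streaming-app")  // assumed application name
        .config("spark.sql.files.ignoreMissingFiles", "true")
        .getOrCreate();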