Spark Error: I/O error constructing remote block reader. java.nio.channels.ClosedByInterruptException at java.nio.channels.ClosedByInterruptException

Spark Error: I/O error constructing remote block reader. java.nio.channels.ClosedByInterruptException at java.nio.channels.ClosedByInterruptException

在单元测试中本地执行正常,但当 Spark Streaming 执行传播到真正的集群执行器时失败,就像它们无声地崩溃并且不再可用于上下文:

stream execution thread for kafkaDataGeneratorInactiveESP_02/Distance [id = 438f45a0-acd6-4729-953f-5a18ae208f1f, runId = a98c6d39-fe14-4ed5-b7fe-7e4009de51b2]] impl.BlockReaderFactory (BlockReaderFactory.java:getRemoteBlockReaderFromTcp(765)) - I/O error constructing remote block reader.
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:656)
        at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:192)
        at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533)
        at org.apache.hadoop.hdfs.DFSClient.newConnectedPeer(DFSClient.java:2940)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.nextTcpPeer(BlockReaderFactory.java:822)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.getRemoteBlockReaderFromTcp(BlockReaderFactory.java:747)
        at org.apache.hadoop.hdfs.client.impl.BlockReaderFactory.build(BlockReaderFactory.java:380)
        at org.apache.hadoop.hdfs.DFSInputStream.getBlockReader(DFSInputStream.java:644)
        at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:575)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:757)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:829)
        at java.io.DataInputStream.read(DataInputStream.java:149)
        at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
        at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
        at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
        at java.io.InputStreamReader.read(InputStreamReader.java:184)
        at java.io.BufferedReader.fill(BufferedReader.java:161)
        at java.io.BufferedReader.readLine(BufferedReader.java:324)
        at java.io.BufferedReader.readLine(BufferedReader.java:389)
        at scala.io.BufferedSource$BufferedLineIterator.hasNext(BufferedSource.scala:74)
        at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:56)
        at org.apache.spark.sql.execution.streaming.CommitLog.deserialize(CommitLog.scala:48)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.get(HDFSMetadataLog.scala:153)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.$anonfun$getLatest(HDFSMetadataLog.scala:190)
        at scala.runtime.java8.JFunction1$mcVJ$sp.apply(JFunction1$mcVJ$sp.java:23)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofLong.foreach(ArrayOps.scala:258)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:189)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.populateStartOffsets(MicroBatchExecution.scala:300)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:194)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:352)
        at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:350)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream(MicroBatchExecution.scala:191)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:245)

我尝试的第一件事是更改查询名称,使用斜杠和 space:kafkaDataGeneratorInactiveESP_02/Distance

中将其重新调整为正确的
.queryName("kafkaDataGeneratorInactive" + currentIter.metadata.getString("label"))

100% 证明字符串中有 / 或 space,错误没有消失。

失败的原因其实是query name和checkpoint location path重名了(但不是第一次尝试改进的部分)。后来又发现了一个错误日志:

2021-12-01 15:05:46,906 WARN  [main] streaming.StreamingQueryManager (Logging.scala:logWarning(69)) - Stopping existing streaming query [id=b13a69d7-5a2f-461e-91a7-a9138c4aa716, runId=9cb31852-d276-42d8-ade6-9839fa97f85c], as a new run is being started.

为什么查询停止了?那是因为在 Scala 中,我在循环中创建流式查询,迭代集合,同时保持所有查询名称和所有检查点名称相同。使它们唯一后(我只是使用集合中的字符串值),失败问题就消失了。