java.io.EOFException: null at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_252]

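I sometimes get the following error in my Tomcat logs, and it prevents my Java process from generating files (what my code does is explained below the error message). I am not sure why this happens. When it does, I see one message pending in the ActiveMQ web console. To work around it, I have to stop Tomcat and start it again, which basically redeploys my Java process's WAR file; the message is then consumed and the file is generated. This happens often. Does anything in the code below look wrong, especially in the sendMessage method of RequestDaoImpl.java, where the file generation logic is written? Please advise. Thanks!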

 java.io.EOFException: null
            at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_252]
            at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.15.8.jar:5.15.8]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
    
    2020-07-07 09:24:14.406  WARN 3138 --- [0.1:61616@52927] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
    
    java.io.EOFException: null
            at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_252]
            at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.15.8.jar:5.15.8]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
    
    2020-07-07 09:24:14.406  WARN 3138 --- [0.1:61616@52929] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}
    
    java.io.EOFException: null
            at java.io.DataInputStream.readInt(DataInputStream.java:392) ~[na:1.8.0_252]
            at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.15.8.jar:5.15.8]
            at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.15.8.jar:5.15.8]
            at java.lang.Thread.run(Thread.java:748) [na:1.8.0_252]
    
    2020-07-07 09:24:14.407  WARN 3138 --- [0.1:61616@52932] o.a.a.t.failover.FailoverTransport       : Transport (tcp://localhost:61616) failed , attempting to automatically reconnect: {}

Below is the code of my Java process deployed in Tomcat. It listens to ActiveMQ continuously and generates the files as soon as it sees the COMPLETE status.

 @Component
  public class DownloadConsumer {
    
    @Autowired
    private JavaMailSender javaMailSender;
    
    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();
    
            
    
    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
        public void processBrokerQueues(String message) throws DaoException {
            
        
         try {
            
            RequestDao requestDao = (RequestDao) context.getBean("requestDao");
            
            String receivedStatus = requestDao.getRequestStatus(message);
            
            
             
            //Retrieve Username from the message to include in an email
             String[] parts = message.split("#");
             String userName = parts[1].trim();
             
            //Retrieve personnelID from the message to include in the webservice calls
            
             String personnelID = parts[3].trim();
            
            
            
            
            //Before sending this message, do the check for COMPLETE or ERROR etc
            if(receivedStatus.equals("COMPLETE")) {
                
                
                
                String latestUUID = requestDao.getUUID();
                
                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);
                
                requestDao.sendMessage(message,latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
                String recipientEmail = userName+"@organization.com";
                
                
                
                
                /*****************************************************\
                // START: EMAIL Related Code
                
                 *******************************************************/
                
                MimeMessage msg = javaMailSender.createMimeMessage();
                 MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                 helper.setFrom("ABCResearch@organization.com");
                 helper.setTo(recipientEmail);
                 helper.setSubject("Requested Files !");
                 helper.setText(emailMessage,true);
                 
                 javaMailSender.send(msg);
                 
                
                    
                                
            }
            else {
                
                
                // Getting JMS connection from the server and starting it
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                Connection connection = connectionFactory.createConnection();
                connection.start();
                
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                
                // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
                Destination destination = session.createQueue(subject);
                
                
                MessageProducer producer = session.createProducer(destination);
                
                //Sending message to the queue
                TextMessage toSendMessage = session.createTextMessage(message);
                
                long delay = 300 * 1000;
                
                toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                
                producer.send(toSendMessage);
                
                
                
                
            }
            
            }
            catch(Throwable th){
                th.printStackTrace();   
                
            }
            
         }
   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"
    
    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
    
    

}

My RequestDao interface looks like this:

public interface RequestDao {
    
    public void sendMessage(String msg,String uuid) throws DaoException;
    public String getRequestStatus(String msg)throws DaoException;
    public String getUUID() throws DaoException;

}

And RequestDaoImpl.java looks like this:

public class RequestDaoImpl implements RequestDao {
    
    public void setDataSource(DataSource dataSource) 
    {       
        jdbcTemplate = new JdbcTemplate(dataSource);                                    
    }
    
    //Method to get latest UUID once the status is COMPLETE
    @Override
    public String getUUID() throws DaoException {
        DataSource ds = null;
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        String uuid = null;
        
         try {
               ds = jdbcTemplate.getDataSource();
               conn = ds.getConnection();   
               pstmt = conn.prepareStatement(getUUIDSQL);
               rs = pstmt.executeQuery();   
               logger.info("The UUID received is as follows:");
               if(rs.next()) {
                    uuid = rs.getString("UUID");
                    logger.info(uuid);
                }else {
                    logger.info("Cannot Retrieve UUID");
                }
                                
         }
         catch(Throwable th) {
                throw new DaoException(th.getMessage(), th);
            }
            finally {
                if (rs != null) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); }}
                if (pstmt != null) { try { pstmt.close(); } catch(SQLException sqe) { sqe.printStackTrace(); }}
                if (conn != null) { try { conn.close(); } catch (SQLException sqle) { sqle.printStackTrace(); }}
                
            }   
        
        return uuid;
    }
    
    @Override
    public String getRequestStatus(String msg) throws DaoException {
        DataSource ds = null;
        Connection conn = null;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        String requestStatus = null;
        
         try {
               
                ds = jdbcTemplate.getDataSource();
                conn = ds.getConnection();  
                
                
                 String[] parts =   msg.split("#");
                 String requestID = parts[0].trim();
                 String userName =  parts[1].trim();
                 String applicationName = parts[2].trim();
                 
                
                
                /*===========================================================================*/
                /*    Code to get the request status from the Request Log table              */ 
                /*===========================================================================*/
                pstmt = conn.prepareStatement(getRequestLogSQL);
                pstmt.setString(1,userName);
                pstmt.setString(2,applicationName);
                pstmt.setString(3, requestID);
                rs = pstmt.executeQuery();  
                
                logger.info("The status received is as follows:");
                
            
                if(rs.next()) {
                    
                    requestStatus = rs.getString("REQUEST_STATUS");
                    logger.info(requestStatus);
                    
                }else {
                    logger.info("Cannot Retrieve REQUEST_STATUS for user "+userName);
                }
                                
         }
         catch(Throwable th) {
                throw new DaoException(th.getMessage(), th);
            }
            finally {
                if (rs != null) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); }}
                if (pstmt != null) { try { pstmt.close(); } catch(SQLException sqe) { sqe.printStackTrace(); }}
                if (conn != null) { try { conn.close(); } catch (SQLException sqle) { sqle.printStackTrace(); }}
                
            }   
        
        return requestStatus;
    }

    @Override
    public void sendMessage(String msg, String uuid) throws DaoException {
        
        DataSource ds = null;
        Connection conn = null;
        PreparedStatement pstmt = null;
        PreparedStatement pstmtDept3 = null;
        PreparedStatement pstmtDept1 = null; 
        PreparedStatement pstmtDept2 = null; 
        
        ResultSet rs = null;
        ResultSet rsDemo = null;
        ResultSet rsFacts = null;
        ResultSet rsEncounters = null;
        
        
        logger.info("Getting message from ActiveMQ: "+msg+" and UUID :- "+uuid+" in daoImpl");
        
         String[] parts = msg.split("#");
         String requestID = parts[0].trim();
         String userName = parts[1].trim();
         String applicationName = parts[2].trim();
         String personnelID = parts[3].trim();
        
        logger.info("Request ID "+requestID);
        logger.info("User Name "+userName);
        logger.info("Application Name "+applicationName);
        logger.info("Personnel ID "+personnelID);
        
        
        
         try {
               
                ds = jdbcTemplate.getDataSource();
                conn = ds.getConnection();  
                            
                pstmtDept3 = conn.prepareStatement(getPersonDemographics);
                pstmtDept3.setString(1, requestID);
                rsDemo = pstmtDept3.executeQuery();
                
                pstmtDept1 = conn.prepareStatement(getPersonFacts);
                pstmtDept1.setString(1, requestID);
                rsFacts = pstmtDept1.executeQuery();
                
                pstmtDept2 = conn.prepareStatement(getPersonEncounters);
                pstmtDept2.setString(1, requestID);
                rsEncounters = pstmtDept2.executeQuery();
                        
                        
                 Path dir = Paths.get("/mnt/nfs/Data/dev/downloader/person_data_downloader", userName);
                 
                 Files.createDirectories(dir);
                 
                 
                 OutputStream fos = Files.newOutputStream(dir.resolve("mnt_nfs_Data_dev_downloader_person_data_downloader_"+ userName +"_"+ uuid +".zip"));
                 BufferedOutputStream bos = new BufferedOutputStream(fos);
                 ZipOutputStream zos = new ZipOutputStream(bos);
                                     
                Path file = dir.resolve("department1_person_downloader_" + uuid + ".csv");
                 Path filefacts = dir.resolve("department2_person_downloader_" + uuid + ".csv");
                 Path fileEncounters = dir.resolve("department3_person_downloader_" + uuid + ".csv");
                 
                 
                 
                 Map<String,ResultSet> dataMap = new HashMap<>();
                 dataMap.put(file.getFileName().toString(),rsDemo);
                 dataMap.put(filefacts.getFileName().toString(),rsFacts);
                 dataMap.put(fileEncounters.getFileName().toString(),rsEncounters);

                 for (Map.Entry<String,ResultSet> e : dataMap.entrySet()){
                   zos.putNextEntry(new ZipEntry(e.getKey()));
                   CSVWriter writer = new CSVWriter(new OutputStreamWriter(zos,StandardCharsets.UTF_8));
                   writer.writeAll(e.getValue(), true);
                   writer.flush();
                   zos.closeEntry();
                      
                 }
                 zos.close();
                             
                
                }
            catch(Throwable th) {
                throw new DaoException(th.getMessage(), th);
            }
            finally {
                if (rs != null) { try { rs.close(); } catch (SQLException e) { e.printStackTrace(); }}
                if (pstmt != null) { try { pstmt.close(); } catch(SQLException sqe) { sqe.printStackTrace(); }}
                if (pstmtDept3 != null) { try { pstmtDept3.close(); } catch(SQLException sqe1) { sqe1.printStackTrace(); }}
                if (pstmtDept1 != null) { try { pstmtDept1.close(); } catch(SQLException sqefacts) { sqefacts.printStackTrace(); }}
                if (pstmtDept2 != null) { try { pstmtDept2.close(); } catch(SQLException sqeencounters) { sqeencounters.printStackTrace(); }}
                if (rsDemo != null) { try { rsDemo.close(); } catch (SQLException ed) { ed.printStackTrace(); }}
                if (rsFacts != null) { try { rsFacts.close(); } catch (SQLException efacts) { efacts.printStackTrace(); }}
                if (rsEncounters != null) { try { rsEncounters.close(); } catch (SQLException eEncounters) { eEncounters.printStackTrace(); }}
                if (conn != null) { try { conn.close(); } catch (SQLException sqle) { sqle.printStackTrace(); }}
                
            }   
        
        
        
    }
    
    

    private String getRequestLogSQL =  Utilities.getFileSQL("sql"+
            File.separator+"getRequestLog.sql", logger);
    
    private String getPersonDemographics =  Utilities.getFileSQL("sql"+
            File.separator+"getPersonDemographics.sql", logger);
    
    private String getPersonFacts =  Utilities.getFileSQL("sql"+
            File.separator+"getPersonFacts.sql", logger);
    
    private String getPersonEncounters =  Utilities.getFileSQL("sql"+
            File.separator+"getPersonEncounters.sql", logger);
    
    private String getUUIDSQL =  Utilities.getFileSQL("sql"+
            File.separator+"getUUID.sql", logger);
    
    private JdbcTemplate jdbcTemplate;  
    
    long unixTimestamp = Instant.now().getEpochSecond();
    
    private static final Logger logger = LoggerFactory.getLogger(RequestDaoImpl.class);

    

    

}
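
A side note on the file generation logic in sendMessage above: the zip stream is closed only on the success path, so an exception in the middle of writing would leave the file handle open, and a HashMap does not guarantee the order of the entries. The following is a minimal sketch of the same multi-entry zip pattern using try-with-resources and a LinkedHashMap. It is not the original method: plain strings stand in for the CSV produced from each ResultSet, and the names ZipSketch and writeZip are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

final class ZipSketch {

    // Writes one zip entry per map entry, in insertion order.
    // The zip stream (and the stream it wraps) is closed even if a write fails.
    static void writeZip(OutputStream out, Map<String, String> entries) throws IOException {
        try (ZipOutputStream zos = new ZipOutputStream(out)) {
            for (Map.Entry<String, String> e : entries.entrySet()) {
                zos.putNextEntry(new ZipEntry(e.getKey()));
                // Do not close this writer: closing it would close zos as well.
                Writer w = new OutputStreamWriter(zos, StandardCharsets.UTF_8);
                w.write(e.getValue());
                w.flush(); // push buffered characters into the current entry
                zos.closeEntry();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumption: hard-coded CSV text replaces the ResultSet data.
        Map<String, String> entries = new LinkedHashMap<>();
        entries.put("department1.csv", "id,name\n1,alice\n");
        entries.put("department2.csv", "id,fact\n1,example\n");

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        writeZip(buffer, entries);
        System.out.println("zip size in bytes: " + buffer.size());
    }
}
```

In the real method the same try-with-resources block could wrap the stream returned by Files.newOutputStream, so the file is closed whether or not the CSV writing succeeds.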

In the application.properties file, I have defined the following:

spring.activemq.broker-url=failover://tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin

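Edited to add more detail about how my setup works:

I have 2 Spring Boot applications deployed in Tomcat on RHEL server 1, and ActiveMQ is also installed on RHEL server 1.

app1 sends messages to ActiveMQ, and app2 consumes messages from ActiveMQ. app2 sometimes seems to have connection problems on tcp://localhost:61616.

So, if I decide to use a second URI for a fallback broker, and that broker lives on another server (RHEL server 2), then given the two Spring Boot applications described above, a fallback broker on another server might not work.

What is the best strategy in this case?

Note: the code in this post is for Spring Boot app2.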

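This error indicates that the client has lost its connection to the remote broker instance. This can happen for many reasons, such as a load balancer in between, strict firewall rules, or simply a general network failure. The client can be configured to use the failover transport so that it reconnects to the broker automatically when the connection is lost.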
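
As an illustration of the failover transport, here is a small sketch that assembles a failover URI with a second broker and a few reconnect options. The helper class FailoverUriSketch and the host name rhel-server-2 are hypothetical. The options randomize, initialReconnectDelay and maxReconnectAttempts are failover transport options, but the values shown are only assumptions to adapt to your environment.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

final class FailoverUriSketch {

    // Builds "failover:(uri1,uri2,...)?key=value&..." from broker URIs and options.
    static String build(List<String> brokers, Map<String, String> options) {
        StringJoiner uris = new StringJoiner(",", "failover:(", ")");
        for (String broker : brokers) {
            uris.add(broker);
        }
        if (options.isEmpty()) {
            return uris.toString();
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return uris.toString() + query.toString();
    }

    public static void main(String[] args) {
        // Assumption: rhel-server-2 is a placeholder for a second broker host.
        List<String> brokers = Arrays.asList(
                "tcp://localhost:61616",
                "tcp://rhel-server-2:61616");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("randomize", "false");             // try brokers in the listed order
        options.put("initialReconnectDelay", "1000");  // milliseconds before the first retry
        options.put("maxReconnectAttempts", "-1");     // -1 means retry indefinitely

        System.out.println(build(brokers, options));
        // prints failover:(tcp://localhost:61616,tcp://rhel-server-2:61616)?randomize=false&initialReconnectDelay=1000&maxReconnectAttempts=-1
    }
}
```

The resulting string could be used as the value of spring.activemq.broker-url in application.properties, or passed to the ActiveMQConnectionFactory constructor. With a single broker, as in the question, the list would contain only tcp://localhost:61616, and the options would only control how reconnection is attempted.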