ConcurrentModificationException Error in ThreadPoolTaskExecutor

I create a ThreadPoolTaskExecutor with a configuration class like this:

public class AsyncConfiguration implements AsyncConfigurer, EnvironmentAware {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private RelaxedPropertyResolver propertyResolver;

    @Override
    public void setEnvironment(Environment environment) {
        this.propertyResolver = new RelaxedPropertyResolver(environment, "async.");
    }

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(propertyResolver.getProperty("corePoolSize", Integer.class, 30));
        executor.setMaxPoolSize(propertyResolver.getProperty("maxPoolSize", Integer.class, 150));
        executor.setQueueCapacity(propertyResolver.getProperty("queueCapacity", Integer.class, 10000));
        executor.setThreadNamePrefix("app-Executor-");
        return new ExceptionHandlingAsyncTaskExecutor(executor);
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

I use the executor in this class:

@Component
public class TrapReceiver extends Thread implements CommandResponder {

    @Inject
    private ApplicationContext applicationContext;

    @Inject
    private Executor executor;

    public TrapReceiver(){
    }

    List<PDUv1> listPdu = new ArrayList<PDUv1>();

    @PostConstruct
    public void init() {
        this.start();
    }

    public synchronized void processPdu(CommandResponderEvent cmdRespEvent) {
        PDUv1 pdu = (PDUv1) cmdRespEvent.getPDU();
        listPdu.add(pdu);
        if (pdu != null) {
            if(listPdu.size() == 3){ //3 pdu per thread
                List<PDUv1> temp = new ArrayList<PDUv1>();
                temp.addAll(listPdu);
                TrapInsertor trapInsertor = (TrapInsertor) applicationContext.getBean("trapInsertor");
                trapInsertor.setProperty(temp);
                executor.execute(trapInsertor);
                listPdu.clear();
            }
        }
    }

    // (remaining methods omitted in the original post)
}

This is my thread class:

public class TrapInsertor implements Runnable {

    @Inject
    private TrapProcessorService trapProcessorService;

    private List<PDUv1> listPdu;

    public void setProperty(List<PDUv1> listPdu){
        this.listPdu = listPdu;
    }

    @Override
    public void run() {
        try{
            System.out.println(Thread.currentThread().getName()+" Start process "+listPdu.size()+" PDU");
            for(PDUv1 pdu : listPdu){
                String[] varBinding = pdu.getVariableBindings().toString().replace("[", "").replace("]", "").split(", ");
                trapProcessorService.processTrap(varBinding);
            }
            listPdu.clear();
        }catch (Exception e) {
            e.printStackTrace();
        }finally{
        }
    }
}

But sometimes I get an error like this:

java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:859)
    at java.util.ArrayList$Itr.next(ArrayList.java:831)
    at app.snmp.test.TrapInsertor.run(TrapInsertor.java:37)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

And also one like this:

org.springframework.dao.CannotAcquireLockException: could not execute statement; SQL [n/a]; nested exception is org.hibernate.exception.LockAcquisitionException: could not execute statement
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:239)
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:214)
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:521)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:497)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:277)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:96)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:653)
    at app.snmp.test.service.TrapProcessorService$$EnhancerBySpringCGLIB$d040253.processTrap(<generated>)
    at app.snmp.test.TrapInsertor.run(TrapInsertor.java:39)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.hibernate.exception.LockAcquisitionException: could not execute statement
    at org.hibernate.dialect.MySQLDialect.convert(MySQLDialect.java:451)
    at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:49)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:126)
    at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:112)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:211)
    at org.hibernate.engine.jdbc.batch.internal.NonBatchingBatch.addToBatch(NonBatchingBatch.java:62)
    at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3281)
    at org.hibernate.persister.entity.AbstractEntityPersister.updateOrInsert(AbstractEntityPersister.java:3183)
    at org.hibernate.persister.entity.AbstractEntityPersister.update(AbstractEntityPersister.java:3525)
    at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:159)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:463)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:349)
    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:350)
    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:56)
    at org.hibernate.internal.SessionImpl.flush(SessionImpl.java:1222)
    at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:425)
    at org.hibernate.engine.transaction.internal.jdbc.JdbcTransaction.beforeTransactionCommit(JdbcTransaction.java:101)
    at org.hibernate.engine.transaction.spi.AbstractTransactionImpl.commit(AbstractTransactionImpl.java:177)
    at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:77)
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517)
    ... 12 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:377)
    at com.mysql.jdbc.Util.getInstance(Util.java:360)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:985)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3887)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3823)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2435)
    at com.mysql.jdbc.ServerPreparedStatement.serverExecute(ServerPreparedStatement.java:1288)
    at com.mysql.jdbc.ServerPreparedStatement.executeInternal(ServerPreparedStatement.java:794)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2141)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2077)
    at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:2062)
    at com.zaxxer.hikari.proxy.PreparedStatementProxy.executeUpdate(PreparedStatementProxy.java:61)
    at com.zaxxer.hikari.proxy.PreparedStatementJavassistProxy.executeUpdate(PreparedStatementJavassistProxy.java)
    at org.hibernate.engine.jdbc.internal.ResultSetReturnImpl.executeUpdate(ResultSetReturnImpl.java:208)
    ... 27 more

What is wrong with my code? Is the error caused by the listPdu collection being shared between threads?

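I solved the ConcurrentModificationException by adding prototype scope in this configuration class. Prototype scope means a new bean instance is created every time the bean is requested. Thanks @Vladimir Sitnikov.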

@Configuration
public class TrapProcessorComponentConfiguration {

    @Bean
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public TrapInsertor trapInsertor(){
        return new TrapInsertor();
    }
}

The problem is in your Spring configuration. You are probably reusing the same trapInsertor instance, hence all the exceptions.

Is your "trapInsertor" bean a singleton? I believe beans are singletons by default in Spring.

Look at what happens:

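  1. You collect 3 items
  2. You call trapInsertor.setProperty(temp); // call this list arrayList1
  3. You call executor.execute(trapInsertor);, but trapInsertor has not started running yet (the thread pool may take a while to pick up the task)
  4. You collect another 3 items
  5. You call trapInsertor.setProperty(temp); // call this list arrayList2
  6. You call executor.execute(trapInsertor);
  7. Now the tasks submitted in #3 and #6 start running. Both may see the same arrayList2.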
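The sequence above can be reproduced without Spring. The following is a minimal sketch, not your actual code: SharedTask is a hypothetical stand-in for a singleton TrapInsertor, strings stand in for PDUs, and a latch keeps the single worker thread busy so that both submissions are still queued when the field is overwritten.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SharedTaskDemo {

    /** Hypothetical stand-in for a singleton TrapInsertor: one instance, one mutable field. */
    static final class SharedTask implements Runnable {
        private volatile List<String> batch;
        private final List<List<String>> seen;

        SharedTask(List<List<String>> seen) {
            this.seen = seen;
        }

        void setProperty(List<String> batch) {
            this.batch = batch;
        }

        @Override
        public void run() {
            // The field is read when the task runs, not when it was submitted.
            synchronized (seen) {
                seen.add(new ArrayList<>(batch));
            }
        }
    }

    static List<List<String>> runDemo() throws InterruptedException {
        List<List<String>> seen = new ArrayList<>();
        SharedTask task = new SharedTask(seen);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        try {
            // Keep the only worker busy so both submissions wait in the queue (steps 3 and 6).
            executor.execute(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            task.setProperty(Arrays.asList("a1", "a2", "a3")); // arrayList1
            executor.execute(task);
            task.setProperty(Arrays.asList("b1", "b2", "b3")); // arrayList2
            executor.execute(task);
            gate.countDown(); // step 7: both queued runs start now
        } finally {
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // prints [[b1, b2, b3], [b1, b2, b3]]
    }
}
```

Both runs process arrayList2 and arrayList1 is never handled. In the real application the outcome depends on timing, which is why the errors only show up sometimes.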

So you can end up with:

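  1. Data loss. arrayList1 is never processed at all!
  2. One thread iterates the list with for(PDUv1 pdu : listPdu){ (in TrapInsertor) while another thread executes listPdu.clear() (also in TrapInsertor). That results in ConcurrentModificationException.
  3. Both threads may write to the database at the same time and block each other, hence MySQLTransactionRollbackException: Deadlock found when trying to get lock;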
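To illustrate the second point, here is a small sketch in plain Java. The class name and the two latches are my own additions; the latches only force the unlucky interleaving (one thread calls clear() while the other is in the middle of its for-each loop) so the failure happens on every run instead of occasionally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class ClearWhileIteratingDemo {

    /** Returns true if the iterating thread hit a ConcurrentModificationException. */
    static boolean iterationFails() throws InterruptedException {
        List<String> shared = new ArrayList<>(Arrays.asList("pdu1", "pdu2", "pdu3"));
        CountDownLatch firstElementSeen = new CountDownLatch(1);
        CountDownLatch cleared = new CountDownLatch(1);
        boolean[] failed = {false};

        Thread iterating = new Thread(() -> {
            try {
                for (String pdu : shared) {
                    firstElementSeen.countDown();
                    cleared.await(); // pause in the middle of the iteration
                }
            } catch (ConcurrentModificationException e) {
                failed[0] = true; // thrown by the iterator's next() after the clear()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread clearing = new Thread(() -> {
            try {
                firstElementSeen.await();
                shared.clear(); // what the other run() does at its end
                cleared.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        iterating.start();
        clearing.start();
        iterating.join();
        clearing.join();
        return failed[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(iterationFails()); // prints true
    }
}
```

The stack trace from this sketch ends in ArrayList$Itr.checkForComodification, the same frame as in your trace.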

I suggest you create a new task instance for every submission to the executor. In other words, you need something like scope="prototype" (see How do I force a spring container not to return a singleton instance of a bean?).
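Outside the Spring container the same idea is simply "one new Runnable per batch". A minimal sketch, assuming a hypothetical BatchTask class with strings standing in for PDUs; the batch is passed to the constructor and stored in a final field, so a later submission cannot replace it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class TaskPerBatchDemo {

    /** Hypothetical task: each instance owns exactly one batch. */
    static final class BatchTask implements Runnable {
        private final List<String> batch;
        private final List<List<String>> processed;

        BatchTask(List<String> batch, List<List<String>> processed) {
            this.batch = new ArrayList<>(batch); // private copy, never reassigned
            this.processed = processed;
        }

        @Override
        public void run() {
            processed.add(batch); // stands in for trapProcessorService.processTrap(...)
        }
    }

    static List<List<String>> runDemo() throws InterruptedException {
        List<List<String>> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // A fresh task per submission: nothing is shared between the two runs.
        executor.execute(new BatchTask(Arrays.asList("a1", "a2", "a3"), processed));
        executor.execute(new BatchTask(Arrays.asList("b1", "b2", "b3"), processed));
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
        return processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo()); // both batches appear; their order may vary
    }
}
```

If the task needs injected dependencies such as TrapProcessorService, a prototype-scoped bean gives you the same one-instance-per-submission behavior while still letting Spring do the wiring.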

You can make a defensive copy of the list being processed. Just change TrapInsertor.setProperty() to something like this:

public void setProperty(List<PDUv1> listPdu) {
    this.listPdu = new ArrayList<PDUv1>(listPdu);
}

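This way each task works on its own list instance, so later changes the caller makes to its list cannot cause a concurrent modification. Note that the copy only helps if every submitted task is also a separate TrapInsertor instance; with a shared singleton the listPdu field would still be overwritten by the next call.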
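A small sketch of what the copy buys you, using a hypothetical Holder class and strings in place of PDUs. It compares storing the caller's list by reference with storing a copy, and then clears the caller's list the way processPdu() does after submitting.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class DefensiveCopyDemo {

    /** Hypothetical holder with both setter styles for comparison. */
    static final class Holder {
        private List<String> batch;

        void setByReference(List<String> batch) {
            this.batch = batch; // shares the caller's list
        }

        void setByCopy(List<String> batch) {
            this.batch = new ArrayList<>(batch); // independent snapshot
        }

        int size() {
            return batch.size();
        }
    }

    /** Returns {size seen by reference, size seen by copy} after the caller clears its list. */
    static int[] sizesAfterCallerClears() {
        List<String> callerList = new ArrayList<>(Arrays.asList("pdu1", "pdu2", "pdu3"));
        Holder byReference = new Holder();
        byReference.setByReference(callerList);
        Holder byCopy = new Holder();
        byCopy.setByCopy(callerList);

        callerList.clear(); // like listPdu.clear() in processPdu()

        return new int[] {byReference.size(), byCopy.size()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(sizesAfterCallerClears())); // prints [0, 3]
    }
}
```

The holder that kept the reference sees an empty list, while the one that copied still has its three items.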