Spring 引导多线程异步不工作

Spring boot multithreaded async not working

任务是调用数据库,检索某些记录更新并保存它们。 如果记录量很大,我们想做这个异步,但是,这似乎没有正确实现。

主要class:

@SpringBootApplication
@EnableAsync
MainApplication() {

    @Bean("threadPoolExecutor")
    public TaskExecutor getAsyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setThreadNamePrefix("RetryEnhancement-");
            return executor;
        }
    }

第一个服务中的方法:

@Service
public class FirstService() {
    @Transactional
    public void fullProcess() {
        for(int counter = 0; counter < ConfigFile.getTHREADS(); counter++){
            secondaryService.threads();
         }
    }
}

第二个服务中的方法:

@Service
public class SecondService () {
    @Async("threadPoolExecutor")
    public void threads() {
        while(thirdService.threadMethod()) {
            //doNothing
        }
    }
}

第三个服务中的方法:

@Service
public class ThirdService() {
    @Transactional
    public boolean threads() {
        Record record = repository.fetchRecord();
        if(record!=null) {
            updateRecord(record);
            saveRecord(record);
            return true;
        } else {
            return false;
        }
    }
}

存储库:

public interface repository extends CrudRepository<Record, long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Record fetchRecord();
}

我发现的问题是,虽然代码执行得非常好,但它似乎是同步执行的(通过添加 .sleep 并在记录器中观察执行发现)。 单独的线程似乎在等待另一个线程被执行。 我可能做错了什么,如果另一个线程已经解释了这个问题,请参考它,尽管我无法在不同的线程中找到这个问题。

您的解决方案太复杂了。放弃所有这些,只需注入 TaskExecutor 并在单独的线程中执行 updateRecord (您可能需要再次检索它,因为您现在使用的是不同的线程和连接。

像这样应该可以解决问题

private final TaskExecutor executor; // injected through constructor

public void process() {
  Stream<Record> records = repository.fetchRecords(); // Using a stream gives you a lazy cursor!
  records.forEach(this::processRecord);
}

private void processRecord(Record record) {
  executor.submit({
    updateRecord(record);
    saveRecord(record);
  });
}

您可能希望将 processRecord 放入另一个对象并使其成为 @Transactional 或将其包装在 TransactionTemplate 中以获得该行为。