Spring 引导多线程异步不工作
Spring boot multithreaded async not working
任务是调用数据库,检索某些记录更新并保存它们。
如果记录量很大,我们想做这个异步,但是,这似乎没有正确实现。
主要class:
@SpringBootApplication
@EnableAsync
MainApplication() {
@Bean("threadPoolExecutor")
public TaskExecutor getAsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("RetryEnhancement-");
return executor;
}
}
第一个服务中的方法:
@Service
public class FirstService() {
@Transactional
public void fullProcess() {
for(int counter = 0; counter < ConfigFile.getTHREADS(); counter++){
secondaryService.threads();
}
}
}
第二个服务中的方法:
@Service
public class SecondService () {
@Async("threadPoolExecutor")
public void threads() {
while(thirdService.threadMethod()) {
//doNothing
}
}
}
第三个服务中的方法:
@Service
public class ThirdService() {
@Transactional
public boolean threads() {
Record record = repository.fetchRecord();
if(record!=null) {
updateRecord(record);
saveRecord(record);
return true;
} else {
return false;
}
}
}
存储库:
public interface repository extends CrudRepository<Record, long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
Record fetchRecord();
}
我发现的问题是,虽然代码执行得非常好,但它似乎是同步执行的(通过添加 .sleep 并在记录器中观察执行发现)。
单独的线程似乎在等待另一个线程被执行。
我可能做错了什么,如果另一个线程已经解释了这个问题,请参考它,尽管我无法在不同的线程中找到这个问题。
您的解决方案太复杂了。放弃所有这些,只需注入 TaskExecutor
并在单独的线程中执行 updateRecord
(您可能需要再次检索它,因为您现在使用的是不同的线程和连接。
像这样应该可以解决问题
private final TaskExecutor executor; // injected through constructor
public void process() {
Stream<Record> records = repository.fetchRecords(); // Using a stream gives you a lazy cursor!
records.forEach(this::processRecord);
}
private void processRecord(Record record) {
executor.submit({
updateRecord(record);
saveRecord(record);
});
}
您可能希望将 processRecord
放入另一个对象并使其成为 @Transactional
或将其包装在 TransactionTemplate
中以获得该行为。
任务是调用数据库,检索某些记录更新并保存它们。 如果记录量很大,我们想做这个异步,但是,这似乎没有正确实现。
主要class:
@SpringBootApplication
@EnableAsync
MainApplication() {
@Bean("threadPoolExecutor")
public TaskExecutor getAsyncExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setThreadNamePrefix("RetryEnhancement-");
return executor;
}
}
第一个服务中的方法:
@Service
public class FirstService() {
@Transactional
public void fullProcess() {
for(int counter = 0; counter < ConfigFile.getTHREADS(); counter++){
secondaryService.threads();
}
}
}
第二个服务中的方法:
@Service
public class SecondService () {
@Async("threadPoolExecutor")
public void threads() {
while(thirdService.threadMethod()) {
//doNothing
}
}
}
第三个服务中的方法:
@Service
public class ThirdService() {
@Transactional
public boolean threads() {
Record record = repository.fetchRecord();
if(record!=null) {
updateRecord(record);
saveRecord(record);
return true;
} else {
return false;
}
}
}
存储库:
public interface repository extends CrudRepository<Record, long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
Record fetchRecord();
}
我发现的问题是,虽然代码执行得非常好,但它似乎是同步执行的(通过添加 .sleep 并在记录器中观察执行发现)。 单独的线程似乎在等待另一个线程被执行。 我可能做错了什么,如果另一个线程已经解释了这个问题,请参考它,尽管我无法在不同的线程中找到这个问题。
您的解决方案太复杂了。放弃所有这些,只需注入 TaskExecutor
并在单独的线程中执行 updateRecord
(您可能需要再次检索它,因为您现在使用的是不同的线程和连接。
像这样应该可以解决问题
private final TaskExecutor executor; // injected through constructor
public void process() {
Stream<Record> records = repository.fetchRecords(); // Using a stream gives you a lazy cursor!
records.forEach(this::processRecord);
}
private void processRecord(Record record) {
executor.submit({
updateRecord(record);
saveRecord(record);
});
}
您可能希望将 processRecord
放入另一个对象并使其成为 @Transactional
或将其包装在 TransactionTemplate
中以获得该行为。