spring 引导处理大请求正文 - 通过多线程处理批量

spring boot handling big request body - processing bulk through multithreading

我有一个 spring 引导控制器,它接收了许多像下面这样的用户

样本json

{
  "users": [
    { "name":"john", "age":18, "type":"1"},
    { "name":"kim", , "age":18, "type":"2"},
    { "name":"Fits", "age":18, "type","3"},
  ]
 }

请求处理程序

@RequestMapping(value = "/users", method = RequestMethod.POST, headers = "Accept=application/json")
public void Add(@RequestBody List<user> users) throws Exception {

 // Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB


}

当我在用户列表中有大约 100 个用户时,它工作得很好,但如果列表很大,比如 1000 等,它会花费太多时间。 那么是否有任何 spring 批处理作业可以分配给它来执行此操作?

我只想 return http 响应代码 202 请求并将此负载分配给 spring 批处理作业

一个选项是在单独的线程中使用 Spring Async Task 长时间 运行ning 进程,因此不会等待执行整个请求并发回响应。

首先像这样配置异步任务。

@Configuration
@EnableAsync
public class AsynchTaskConfiguration{

    @Bean
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("ProcessUsers-");
        executor.initialize();
        return executor;
    }
}

并且您可以在您的服务中使用异步任务来处理用户

@Service
public class UserProcessingService {


    private final AnotherUserProcessingService service;
    @Autowired
    public UserProcessingService (AnotherUserProcessingService service) {
        this.service= service;
    }

    @Async
    public CompletableFuture<List<User>> processUser(List<User> users) throws InterruptedException {

        users.forEach(user -> logger.info("Processing " + user));
        List<User> usersListResult = service.process(users);
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(usersListResult);
    }

}

processUser(User user) 带有 @Async 注释,表示该方法将 运行 根据上面提供的 taskExecutor 配置在单独的线程中。 @EnableAsync 启用 Spring 到 运行 后台线程中用 @Async 注释的任何方法。 并确保使用异步任务处理用户的服务必须在 @Configuration class 中创建或由 @ComponentScan 获取。您可以根据需要自定义 taskExecutor

在这里您可以找到 ThreadPoolTaskExecutor 的工作原理。

https://github.com/softnrajkumar1994/multithreading-example

重要

1)而不是在单个请求中有 1000 个用户,请将这些用户列表作为小块发送

public class ThreadManager {

    private static ThreadPoolExecutor stpe = null;


    static {
         /**
         *
         *  corePoolSize --->the number of threads to keep in the pool, even
         *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
         *        
         *  maximumPoolSize --- >the maximum number of threads to allow in the
         *        pool
         *        
         *  keepAliveTime---> when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         *        
         *  unit the time unit for the {@code keepAliveTime} argument
         *  
         *  workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the {@code Runnable}
         *        tasks submitted by the {@code execute} method.

         */
        stpe = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1090));
        System.out.println("THREAD MANAGER INTIALIZED SUCCESSFULLY");
    }

    public static void execute(Runnable task) {
        stpe.execute(task);
    }
}

上面的class会收到可运行的任务,用线程池的空闲线程执行。

示例用户 class:

 public class User {

    private String name;
    private String mobile;
    private String email;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMobile() {
        return mobile;
    }

    public void setMobile(String mobile) {
        this.mobile = mobile;
    }

    public String getEmail() {
        return email;
    }

    public void setEmail(String email) {
        this.email = email;
    }

}

@RestController
public class UserController {

    @PostMapping("/users")
    public void Add(@RequestBody List<User> users) throws Exception {
        /**
         * Here we are rotating user's list and assigning each and every user into a
         * separate worker thread, so that work will be done parallely
         */
        for (User user : users) {
            try {
                ThreadManager.execute(new UserWork(user));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

}

自定义 runnable worker class 用于处理用户 object.Do 您在 runnable 方法中的业务实现。

  public class UserWork implements Runnable {



    private User user;

    public UserWork(User user) {
        this.user = user;
    }

    @Override
    public void run() {
        // Please add your businees logic here
// Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB
    }

}