spring 引导处理大请求正文 - 通过多线程处理批量
spring boot handling big request body - processing bulk through multithreading
我有一个 spring 引导控制器,它接收了许多像下面这样的用户
样本json
{
"users": [
{ "name":"john", "age":18, "type":"1"},
{ "name":"kim", , "age":18, "type":"2"},
{ "name":"Fits", "age":18, "type","3"},
]
}
请求处理程序
@RequestMapping(value = "/users", method = RequestMethod.POST, headers = "Accept=application/json")
public void Add(@RequestBody List<user> users) throws Exception {
// Here I am iterating users and writing one by one to different message topic based on the type
// if any error in the given user while writing to message topic I am storing that user in other DB
}
当我在用户列表中有大约 100 个用户时,它工作得很好,但如果列表很大,比如 1000 等,它会花费太多时间。
那么是否有任何 spring 批处理作业可以分配给它来执行此操作?
我只想 return http 响应代码 202 请求并将此负载分配给 spring 批处理作业
一个选项是在单独的线程中使用 Spring Async Task
长时间 运行ning 进程,因此不会等待执行整个请求并发回响应。
首先像这样配置异步任务。
@Configuration
@EnableAsync
public class AsynchTaskConfiguration{
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("ProcessUsers-");
executor.initialize();
return executor;
}
}
并且您可以在您的服务中使用异步任务来处理用户
@Service
public class UserProcessingService {
private final AnotherUserProcessingService service;
@Autowired
public UserProcessingService (AnotherUserProcessingService service) {
this.service= service;
}
@Async
public CompletableFuture<List<User>> processUser(List<User> users) throws InterruptedException {
users.forEach(user -> logger.info("Processing " + user));
List<User> usersListResult = service.process(users);
// Artificial delay of 1s for demonstration purposes
Thread.sleep(1000L);
return CompletableFuture.completedFuture(usersListResult);
}
}
processUser(User user)
带有 @Async
注释,表示该方法将 运行 根据上面提供的 taskExecutor
配置在单独的线程中。 @EnableAsync
启用 Spring
到 运行 后台线程中用 @Async
注释的任何方法。
并确保使用异步任务处理用户的服务必须在 @Configuration
class 中创建或由 @ComponentScan
获取。您可以根据需要自定义 taskExecutor
。
在这里您可以找到 ThreadPoolTaskExecutor 的工作原理。
https://github.com/softnrajkumar1994/multithreading-example
重要
1)而不是在单个请求中有 1000 个用户,请将这些用户列表作为小块发送
public class ThreadManager {
private static ThreadPoolExecutor stpe = null;
static {
/**
*
* corePoolSize --->the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
*
* maximumPoolSize --- >the maximum number of threads to allow in the
* pool
*
* keepAliveTime---> when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
*
* unit the time unit for the {@code keepAliveTime} argument
*
* workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
*/
stpe = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1090));
System.out.println("THREAD MANAGER INTIALIZED SUCCESSFULLY");
}
public static void execute(Runnable task) {
stpe.execute(task);
}
}
上面的class会收到可运行的任务,用线程池的空闲线程执行。
示例用户 class:
public class User {
private String name;
private String mobile;
private String email;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}
@RestController
public class UserController {
@PostMapping("/users")
public void Add(@RequestBody List<User> users) throws Exception {
/**
* Here we are rotating user's list and assigning each and every user into a
* separate worker thread, so that work will be done parallely
*/
for (User user : users) {
try {
ThreadManager.execute(new UserWork(user));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
自定义 runnable worker class 用于处理用户 object.Do 您在 runnable 方法中的业务实现。
public class UserWork implements Runnable {
private User user;
public UserWork(User user) {
this.user = user;
}
@Override
public void run() {
// Please add your businees logic here
// Here I am iterating users and writing one by one to different message topic based on the type
// if any error in the given user while writing to message topic I am storing that user in other DB
}
}
我有一个 spring 引导控制器,它接收了许多像下面这样的用户
样本json
{
"users": [
{ "name":"john", "age":18, "type":"1"},
{ "name":"kim", , "age":18, "type":"2"},
{ "name":"Fits", "age":18, "type","3"},
]
}
请求处理程序
@RequestMapping(value = "/users", method = RequestMethod.POST, headers = "Accept=application/json")
public void Add(@RequestBody List<user> users) throws Exception {
// Here I am iterating users and writing one by one to different message topic based on the type
// if any error in the given user while writing to message topic I am storing that user in other DB
}
当我在用户列表中有大约 100 个用户时,它工作得很好,但如果列表很大,比如 1000 等,它会花费太多时间。 那么是否有任何 spring 批处理作业可以分配给它来执行此操作?
我只想 return http 响应代码 202 请求并将此负载分配给 spring 批处理作业
一个选项是在单独的线程中使用 Spring Async Task
长时间 运行ning 进程,因此不会等待执行整个请求并发回响应。
首先像这样配置异步任务。
@Configuration
@EnableAsync
public class AsynchTaskConfiguration{
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("ProcessUsers-");
executor.initialize();
return executor;
}
}
并且您可以在您的服务中使用异步任务来处理用户
@Service
public class UserProcessingService {
private final AnotherUserProcessingService service;
@Autowired
public UserProcessingService (AnotherUserProcessingService service) {
this.service= service;
}
@Async
public CompletableFuture<List<User>> processUser(List<User> users) throws InterruptedException {
users.forEach(user -> logger.info("Processing " + user));
List<User> usersListResult = service.process(users);
// Artificial delay of 1s for demonstration purposes
Thread.sleep(1000L);
return CompletableFuture.completedFuture(usersListResult);
}
}
processUser(User user)
带有 @Async
注释,表示该方法将 运行 根据上面提供的 taskExecutor
配置在单独的线程中。 @EnableAsync
启用 Spring
到 运行 后台线程中用 @Async
注释的任何方法。
并确保使用异步任务处理用户的服务必须在 @Configuration
class 中创建或由 @ComponentScan
获取。您可以根据需要自定义 taskExecutor
。
在这里您可以找到 ThreadPoolTaskExecutor 的工作原理。
https://github.com/softnrajkumar1994/multithreading-example
重要
1)而不是在单个请求中有 1000 个用户,请将这些用户列表作为小块发送
public class ThreadManager {
private static ThreadPoolExecutor stpe = null;
static {
/**
*
* corePoolSize --->the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
*
* maximumPoolSize --- >the maximum number of threads to allow in the
* pool
*
* keepAliveTime---> when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
*
* unit the time unit for the {@code keepAliveTime} argument
*
* workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
*/
stpe = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1090));
System.out.println("THREAD MANAGER INTIALIZED SUCCESSFULLY");
}
public static void execute(Runnable task) {
stpe.execute(task);
}
}
上面的class会收到可运行的任务,用线程池的空闲线程执行。
示例用户 class:
public class User {
private String name;
private String mobile;
private String email;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
}
@RestController
public class UserController {
@PostMapping("/users")
public void Add(@RequestBody List<User> users) throws Exception {
/**
* Here we are rotating user's list and assigning each and every user into a
* separate worker thread, so that work will be done parallely
*/
for (User user : users) {
try {
ThreadManager.execute(new UserWork(user));
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
自定义 runnable worker class 用于处理用户 object.Do 您在 runnable 方法中的业务实现。
public class UserWork implements Runnable {
private User user;
public UserWork(User user) {
this.user = user;
}
@Override
public void run() {
// Please add your businees logic here
// Here I am iterating users and writing one by one to different message topic based on the type
// if any error in the given user while writing to message topic I am storing that user in other DB
}
}