Java 网络应用程序中的任务执行
Task execution in Java web application
我正在开发 Spring MVC 网络应用程序。它的功能之一是文件转换(上传文件 -> 转换 -> 存储在服务器上)。
有些文件可能太大而无法即时转换,所以我决定在上传后将它们放在共享队列中。
文件将根据上传时间优先转换,即先进先出。
我的想法是在上传后将任务添加到控制器中的队列中。
还会有服务执行队列中的所有任务,如果为空,则等待添加新任务。我不需要调度 - 当队列不为空时,任务应该始终执行。
我读过 ExecutorService
,但没有找到适合我的例子。
如果有任何建议,我将不胜感激。
编辑
感谢您的回答,我需要澄清我的问题:
基本上,我知道如何执行任务,我需要管理任务队列。用户应该能够查看队列并暂停、恢复或从队列中删除任务。
我的任务class:
public class ConvertTask implements Callable<String> {
private Converter converter;
private File source;
private File target;
private State state;
private User user;
public ConvertTask(Converter converter, File source, File target, User user) {
this.converter = converter;
this.source = source;
this.target = target;
this.user = user;
this.state = State.READY;
}
@Override
public String call() throws Exception {
if (this.state == State.READY) {
BaseConverterService converterService = ConverterUtils.getConverterService(this.converter);
converterService.convert(this.source, this.target);
MailSendServiceUtil.send(user.getEmail(), target.getName());
return "success";
}
return "task not ready";
}
}
我还创建了 class 负责管理 queue/tasks 然后是您的建议:
@Component
public class MyExecutorService {
private LinkedBlockingQueue<ConvertTask> converterQueue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void add(ConvertTask task) throws InterruptedException {
converterQueue.put(task);
}
public void execute() throws InterruptedException, ExecutionException {
while (!converterQueue.isEmpty()) {
ConvertTask task = converterQueue.peek();
Future<String> statusFuture = executorService.submit(task);
String status = statusFuture.get();
converterQueue.take();
}
}
}
我的观点是,如果队列不为空,如何执行任务并在添加新任务且队列先前为空时恢复。我想到了一些适合 add(ConvertTask task)
方法的代码。
上传后您应该return立即回复。客户端不能等待资源太久。但是,您可以在客户端设置中更改它。无论如何,如果你是 运行 一个后台任务,你可以在不与客户端交互的情况下完成它,或者在执行过程中通知客户端。这是执行器服务使用的可调用演示示例
/**
* Created by Roma on 17.02.2015.
*/
class SumTask implements Callable<Integer> {
private int num = 0;
public SumTask(int num){
this.num = num;
}
@Override
public Integer call() throws Exception {
int result = 0;
for(int i=1;i<=num;i++){
result+=i;
}
return result;
}
}
public class CallableDemo {
Integer result;
Integer num;
public Integer getNumValue() {
return 123;
}
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public Integer getResult() {
return result;
}
public void setResult(Integer result) {
this.result = result;
}
ExecutorService service = Executors.newSingleThreadExecutor();
public String execute() {
try{
Future<Integer> future = service.submit(new SumTask(num));
result = future.get();
//System.out.println(result);
service.shutdown();
}
catch(Exception e)
{
e.printStackTrace();
}
return "showlinks";
}
}
问题更新后编辑
您不需要为任务创建任何队列,因为 ThreadPoolExecutor
实现有自己的队列。这是 Oracle Java 8
实现 newSingleThreadExecutor()
方法的源代码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
所以你只要直接提交一个新任务,它就会被 ThreadPoolExecutor
排队
@Component
public class MyExecutorService {
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void add(ConvertTask task) throws InterruptedException {
Future<String> statusFuture = executorService.submit(task);
}
}
如果您担心队列的边界,您可以显式创建一个队列实例并将其提供给 ThreadPoolExecutor
构造函数。
private executorService = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(MAX_SIZE));
请注意我已经删除了行
String status = statusFuture.get();
因为 get()
调用正在阻塞。如果您在提交的同一线程中有此行,则您的代码不再是异步的。您应该存储 Future
对象并在不同的线程中异步检查它们。或者可以考虑使用Java 8
中介绍的CompletableFuture
。查看 this post.
我正在开发 Spring MVC 网络应用程序。它的功能之一是文件转换(上传文件 -> 转换 -> 存储在服务器上)。 有些文件可能太大而无法即时转换,所以我决定在上传后将它们放在共享队列中。 文件将根据上传时间优先转换,即先进先出。
我的想法是在上传后将任务添加到控制器中的队列中。 还会有服务执行队列中的所有任务,如果为空,则等待添加新任务。我不需要调度 - 当队列不为空时,任务应该始终执行。
我读过 ExecutorService
,但没有找到适合我的例子。
如果有任何建议,我将不胜感激。
编辑 感谢您的回答,我需要澄清我的问题: 基本上,我知道如何执行任务,我需要管理任务队列。用户应该能够查看队列并暂停、恢复或从队列中删除任务。 我的任务class:
public class ConvertTask implements Callable<String> {
private Converter converter;
private File source;
private File target;
private State state;
private User user;
public ConvertTask(Converter converter, File source, File target, User user) {
this.converter = converter;
this.source = source;
this.target = target;
this.user = user;
this.state = State.READY;
}
@Override
public String call() throws Exception {
if (this.state == State.READY) {
BaseConverterService converterService = ConverterUtils.getConverterService(this.converter);
converterService.convert(this.source, this.target);
MailSendServiceUtil.send(user.getEmail(), target.getName());
return "success";
}
return "task not ready";
}
}
我还创建了 class 负责管理 queue/tasks 然后是您的建议:
@Component
public class MyExecutorService {
private LinkedBlockingQueue<ConvertTask> converterQueue = new LinkedBlockingQueue<>();
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void add(ConvertTask task) throws InterruptedException {
converterQueue.put(task);
}
public void execute() throws InterruptedException, ExecutionException {
while (!converterQueue.isEmpty()) {
ConvertTask task = converterQueue.peek();
Future<String> statusFuture = executorService.submit(task);
String status = statusFuture.get();
converterQueue.take();
}
}
}
我的观点是,如果队列不为空,如何执行任务并在添加新任务且队列先前为空时恢复。我想到了一些适合 add(ConvertTask task)
方法的代码。
上传后您应该return立即回复。客户端不能等待资源太久。但是,您可以在客户端设置中更改它。无论如何,如果你是 运行 一个后台任务,你可以在不与客户端交互的情况下完成它,或者在执行过程中通知客户端。这是执行器服务使用的可调用演示示例
/**
* Created by Roma on 17.02.2015.
*/
class SumTask implements Callable<Integer> {
private int num = 0;
public SumTask(int num){
this.num = num;
}
@Override
public Integer call() throws Exception {
int result = 0;
for(int i=1;i<=num;i++){
result+=i;
}
return result;
}
}
public class CallableDemo {
Integer result;
Integer num;
public Integer getNumValue() {
return 123;
}
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public Integer getResult() {
return result;
}
public void setResult(Integer result) {
this.result = result;
}
ExecutorService service = Executors.newSingleThreadExecutor();
public String execute() {
try{
Future<Integer> future = service.submit(new SumTask(num));
result = future.get();
//System.out.println(result);
service.shutdown();
}
catch(Exception e)
{
e.printStackTrace();
}
return "showlinks";
}
}
问题更新后编辑
您不需要为任务创建任何队列,因为 ThreadPoolExecutor
实现有自己的队列。这是 Oracle Java 8
实现 newSingleThreadExecutor()
方法的源代码:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
所以你只要直接提交一个新任务,它就会被 ThreadPoolExecutor
@Component
public class MyExecutorService {
private ExecutorService executorService = Executors.newSingleThreadExecutor();
public void add(ConvertTask task) throws InterruptedException {
Future<String> statusFuture = executorService.submit(task);
}
}
如果您担心队列的边界,您可以显式创建一个队列实例并将其提供给 ThreadPoolExecutor
构造函数。
private executorService = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(MAX_SIZE));
请注意我已经删除了行
String status = statusFuture.get();
因为 get()
调用正在阻塞。如果您在提交的同一线程中有此行,则您的代码不再是异步的。您应该存储 Future
对象并在不同的线程中异步检查它们。或者可以考虑使用Java 8
中介绍的CompletableFuture
。查看 this post.