Java 网络应用程序中的任务执行

Task execution in Java web application

我正在开发 Spring MVC 网络应用程序。它的功能之一是文件转换(上传文件 -> 转换 -> 存储在服务器上)。 有些文件可能太大而无法即时转换,所以我决定在上传后将它们放在共享队列中。 文件将根据上传时间优先转换,即先进先出。

我的想法是在上传后将任务添加到控制器中的队列中。 还会有服务执行队列中的所有任务,如果为空,则等待添加新任务。我不需要调度 - 当队列不为空时,任务应该始终执行。

我读过 ExecutorService,但没有找到适合我的例子。

如果有任何建议,我将不胜感激。

编辑 感谢您的回答,我需要澄清我的问题: 基本上,我知道如何执行任务,我需要管理任务队列。用户应该能够查看队列并暂停、恢复或从队列中删除任务。 我的任务class:

public class ConvertTask implements Callable<String> {

    private Converter converter;
    private File source;
    private File target;
    private State state;
    private User user;

    public ConvertTask(Converter converter, File source, File target, User user) {
        this.converter = converter;
        this.source = source;
        this.target = target;
        this.user = user;
        this.state = State.READY;
    }

    @Override
    public String call() throws Exception {
        if (this.state == State.READY) {
            BaseConverterService converterService = ConverterUtils.getConverterService(this.converter);
            converterService.convert(this.source, this.target);
            MailSendServiceUtil.send(user.getEmail(), target.getName());
            return "success";
        }
        return "task not ready";
    }
}

我还创建了 class 负责管理 queue/tasks 然后是您的建议:

@Component
public class MyExecutorService {
    private LinkedBlockingQueue<ConvertTask> converterQueue = new LinkedBlockingQueue<>();

    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void add(ConvertTask task) throws InterruptedException {
        converterQueue.put(task);
    }

    public void execute() throws InterruptedException, ExecutionException {
        while (!converterQueue.isEmpty()) {
            ConvertTask task = converterQueue.peek();
            Future<String> statusFuture = executorService.submit(task);
            String status = statusFuture.get();
            converterQueue.take();
        }
    }
}

我的观点是,如果队列不为空,如何执行任务并在添加新任务且队列先前为空时恢复。我想到了一些适合 add(ConvertTask task) 方法的代码。

上传后您应该return立即回复。客户端不能等待资源太久。但是,您可以在客户端设置中更改它。无论如何,如果你是 运行 一个后台任务,你可以在不与客户端交互的情况下完成它,或者在执行过程中通知客户端。这是执行器服务使用的可调用演示示例

/**
 * Created by Roma on 17.02.2015.
 */
class SumTask implements Callable<Integer> {
  private int num = 0;
  public SumTask(int num){
    this.num = num;
  }
  @Override
  public Integer call() throws Exception {
    int result = 0;
    for(int i=1;i<=num;i++){
      result+=i;
    }
    return result;
  }
}

public class CallableDemo {

  Integer result;
  Integer num;

  public Integer getNumValue() {
    return 123;
  }
  public Integer getNum() {
    return num;
  }

  public void setNum(Integer num) {
    this.num = num;
  }

  public Integer getResult() {
    return result;
  }

  public void setResult(Integer result) {
    this.result = result;
  }

  ExecutorService service =  Executors.newSingleThreadExecutor();

  public String  execute() {
    try{

      Future<Integer> future = service.submit(new SumTask(num));
      result = future.get();
      //System.out.println(result);
      service.shutdown();
    }
    catch(Exception e)
    {
      e.printStackTrace();
    }

    return "showlinks";
  }
}

问题更新后编辑

您不需要为任务创建任何队列,因为 ThreadPoolExecutor 实现有自己的队列。这是 Oracle Java 8 实现 newSingleThreadExecutor() 方法的源代码:

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

所以你只要直接提交一个新任务,它就会被 ThreadPoolExecutor

排队
@Component
public class MyExecutorService {
    private ExecutorService executorService = Executors.newSingleThreadExecutor();

    public void add(ConvertTask task) throws InterruptedException {
        Future<String> statusFuture = executorService.submit(task);
    }
}

如果您担心队列的边界,您可以显式创建一个队列实例并将其提供给 ThreadPoolExecutor 构造函数。

private executorService = new ThreadPoolExecutor(1, 1, 
                 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(MAX_SIZE));

请注意我已经删除了行

String status = statusFuture.get();

因为 get() 调用正在阻塞。如果您在提交的同一线程中有此行,则您的代码不再是异步的。您应该存储 Future 对象并在不同的线程中异步检查它们。或者可以考虑使用Java 8中介绍的CompletableFuture。查看 this post.