How to cancel a ScheduledFuture and wait for the runnable to stop, if the runnable is in progress at the moment of cancellation?

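When a command is scheduled at a fixed rate on any ScheduledExecutorService, the call returns a ScheduledFuture, which can also be cancelled. But "cancel" does not guarantee that the command is no longer executing after cancel returns, for example because the command was already in the middle of execution when "cancel" was called.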
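The behavior described above can be observed with a small experiment. This is only a sketch: the class name `CancelDoesNotWaitDemo`, the method `runningAfterCancel` and the latches are hypothetical names introduced for illustration, and the `release` latch merely simulates a long-running command.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDoesNotWaitDemo {

    /** Returns true if the command was still running when cancel(false) returned. */
    static boolean runningAfterCancel() throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean running = new AtomicBoolean(false);

        Runnable command = () -> {
            running.set(true);
            started.countDown();
            try {
                release.await();              // simulate work that is still in progress
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.set(false);
            }
        };

        try {
            ScheduledFuture<?> future =
                    scheduler.scheduleAtFixedRate(command, 0, 10, TimeUnit.MILLISECONDS);
            started.await();                  // the command is now executing
            future.cancel(false);             // returns without waiting for the command
            return running.get();             // the command is still blocked on the latch
        } finally {
            release.countDown();              // let the command finish
            scheduler.shutdown();
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("still running after cancel: " + runningAfterCancel());
    }
}
```

Because the command cannot finish until `release` is counted down, the flag is read while the command is still executing, which is exactly the window the question is about.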

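For most use cases this is sufficient. But I have a use case where, after cancellation, the current thread must block if the command is already in progress and wait until the command completes. In other words, the thread that called cancel should not proceed while the command is still executing. Cancelling with mayInterruptIfRunning=true is not suitable either, because I do not want to disrupt the current execution; I just need to wait for it to complete normally.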

I did not find a way to meet this requirement with standard JDK classes. Question 1: Am I wrong, and does such functionality exist?
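For comparison, the nearest standard facility works at the executor level, not per future: `shutdown()` followed by `awaitTermination()`. The sketch below assumes the command has a dedicated executor, which is an assumption not made in the question, so it does not help when a single future must be cancelled on a shared scheduler. The names `DedicatedExecutorStopSketch` and `stopAndWait` are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class DedicatedExecutorStopSketch {

    /** Returns true if the in-progress run completed normally before the wait ended. */
    static boolean stopAndWait() throws InterruptedException {
        // Assumption: this executor runs only the one periodic command.
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch started = new CountDownLatch(1);
        AtomicBoolean finishedNormally = new AtomicBoolean(false);

        Runnable command = () -> {
            started.countDown();
            try {
                Thread.sleep(200);            // simulate work in progress
                finishedNormally.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        // A long period keeps the demo to a single run.
        scheduler.scheduleAtFixedRate(command, 0, 1, TimeUnit.HOURS);
        started.await();                      // the first run is now in progress

        // shutdown() does not interrupt the running command; with the default
        // policies, periodic tasks are not rescheduled after shutdown.
        scheduler.shutdown();
        boolean terminated = scheduler.awaitTermination(10, TimeUnit.SECONDS);
        return terminated && finishedNormally.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("waited for normal completion: " + stopAndWait());
    }
}
```

The calling thread blocks in `awaitTermination` until the current run ends, but the whole executor is stopped, not one task.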

So I decided to implement it myself:

import java.util.concurrent.*;

public class GracefullyStoppingScheduledFutureDecorator implements ScheduledFuture {

/**
 * @return a scheduled future with a special implementation of the "cancel" method which,
 * in addition to the standard behavior,
 * strongly guarantees that the command is not in the middle of execution when "cancel" returns
 */
public static ScheduledFuture schedule(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFutureDecorator(future, cancellableCommand);
}

private GracefullyStoppingScheduledFutureDecorator(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
    runnable.cancel();
    return targetFuture.cancel(mayInterruptIfRunning);
}

@Override
public long getDelay(TimeUnit unit) {
    return targetFuture.getDelay(unit);
}

@Override
public int compareTo(Delayed o) {
    return targetFuture.compareTo(o);
}

@Override
public boolean isCancelled() {
    return targetFuture.isCancelled();
}

@Override
public boolean isDone() {
    return targetFuture.isDone();
}

@Override
public Object get() throws InterruptedException, ExecutionException {
    return targetFuture.get();
}

@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    return targetFuture.get(timeout, unit);
}

private static class CancellableCommand implements Runnable {

    private final Object monitor = new Object();
    private final Runnable target;
    private boolean cancelled = false;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

    public void cancel() {
        synchronized (monitor) {
            cancelled = true;
        }
    }

    @Override
    public void run() {
        synchronized (monitor) {
            if (!cancelled) {
                target.run();
            }
        }
    }

}

}

Question 2: Can anybody find errors in the code above?

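There is a hypothetical deadlock, which can be reproduced by the following scenario:

  1. Thread T1 holds monitor M1.
  2. The scheduled task is executing on thread T2 (holding its monitor M2) and wants to enter M1, so T2 has to wait until T1 exits monitor M1.
  3. T1 decides to cancel the task, but because the task's monitor M2 is locked by the task itself, we end up in a deadlock.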
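The three steps above can be sketched as a small reproduction. `LockingCommand` is a simplified, hypothetical stand-in for the synchronized `CancellableCommand`, and the latches exist only to force the interleaving. The sketch uses `ThreadMXBean.findMonitorDeadlockedThreads()` to observe the deadlock, and daemon threads so the JVM can still exit.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

final class CancelDeadlockSketch {

    /** Simplified stand-in for the synchronized command wrapper. */
    static final class LockingCommand implements Runnable {
        private final Object monitor = new Object();    // plays the role of M2
        private final Runnable target;
        private boolean cancelled;

        LockingCommand(Runnable target) {
            this.target = target;
        }

        void cancel() {
            synchronized (monitor) {
                cancelled = true;
            }
        }

        @Override
        public void run() {
            synchronized (monitor) {
                if (!cancelled) {
                    target.run();
                }
            }
        }
    }

    /** Returns true if the JVM reports a monitor deadlock between T1 and T2. */
    static boolean deadlockDetected() throws InterruptedException {
        Object m1 = new Object();                        // plays the role of M1
        CountDownLatch t1HoldsM1 = new CountDownLatch(1);
        CountDownLatch taskHoldsM2 = new CountDownLatch(1);

        LockingCommand command = new LockingCommand(() -> {
            taskHoldsM2.countDown();
            synchronized (m1) {                          // step 2: T2 waits for M1
                // never reached while T1 holds M1
            }
        });

        Thread t1 = new Thread(() -> {
            synchronized (m1) {                          // step 1: T1 holds M1
                t1HoldsM1.countDown();
                try {
                    taskHoldsM2.await();
                } catch (InterruptedException e) {
                    return;
                }
                command.cancel();                        // step 3: T1 waits for M2
            }
        }, "T1");

        Thread t2 = new Thread(() -> {
            try {
                t1HoldsM1.await();
            } catch (InterruptedException e) {
                return;
            }
            command.run();                               // takes M2, then wants M1
        }, "T2");

        t1.setDaemon(true);
        t2.setDaemon(true);
        t1.start();
        t2.start();

        // Poll for up to about five seconds until the deadlock is reported.
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        for (int i = 0; i < 100; i++) {
            if (threads.findMonitorDeadlockedThreads() != null) {
                return true;
            }
            Thread.sleep(50);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlock detected: " + deadlockDetected());
    }
}
```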

Most likely the scenario above is unrealistic, but to rule out every possible case, I decided to rewrite the code in a lock-free manner:

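import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class GracefullyStoppingScheduledFuture {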

/**
 * @return a wrapper around the scheduled future whose "cancelAndBeSureOfTermination" method,
 * in addition to the standard cancellation,
 * strongly guarantees that the command is not in the middle of execution when it returns
 */
public static GracefullyStoppingScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit, ScheduledExecutorService scheduler) {
    CancellableCommand cancellableCommand = new CancellableCommand(command);
    ScheduledFuture future = scheduler.scheduleAtFixedRate(cancellableCommand, initialDelay, period, unit);
    return new GracefullyStoppingScheduledFuture(future, cancellableCommand);
}

private GracefullyStoppingScheduledFuture(ScheduledFuture targetFuture, CancellableCommand command) {
    this.targetFuture = targetFuture;
    this.runnable = command;
}

private final ScheduledFuture targetFuture;
private final CancellableCommand runnable;

public void cancelAndBeSureOfTermination(boolean mayInterruptIfRunning) throws InterruptedException, ExecutionException {
    try {
        targetFuture.cancel(mayInterruptIfRunning);
    } finally {
        runnable.cancel();
    }
}

private static class CancellableCommand implements Runnable {

    private static final int NOT_EXECUTING = 0;
    private static final int IN_PROGRESS = 1;
    private static final int CANCELLED_WITHOUT_OBSTRUCTION = 2;
    private static final int CANCELLED_IN_MIDDLE_OF_PROGRESS = 3;

    private final AtomicInteger state = new AtomicInteger(NOT_EXECUTING);
    private final AtomicReference<Thread> executionThread = new AtomicReference<>();
    private final CompletableFuture<Void> cancellationFuture = new CompletableFuture<>();
    private final Runnable target;

    private CancellableCommand(Runnable target) {
        this.target = target;
    }

    public void cancel() throws ExecutionException, InterruptedException {
        if (executionThread.get() == Thread.currentThread()) {
            // cancel method was called from target by itself
            state.set(CANCELLED_IN_MIDDLE_OF_PROGRESS);
            return;
        }
        while (true) {
            if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
                return;
            }
            if (state.get() == CANCELLED_IN_MIDDLE_OF_PROGRESS) {
                cancellationFuture.get();
                return;
            }
            if (state.compareAndSet(NOT_EXECUTING, CANCELLED_WITHOUT_OBSTRUCTION)) {
                return;
            }
            if (state.compareAndSet(IN_PROGRESS, CANCELLED_IN_MIDDLE_OF_PROGRESS)) {
                cancellationFuture.get();
                return;
            }
        }
    }

    @Override
    public void run() {
        if (!state.compareAndSet(NOT_EXECUTING, IN_PROGRESS)) {
            notifyWaiters();
            return;
        }

        try {
            executionThread.set(Thread.currentThread());
            target.run();
        } finally {
            executionThread.set(null);
            if (!state.compareAndSet(IN_PROGRESS, NOT_EXECUTING)) {
                notifyWaiters();
            }
        }
    }

    private void notifyWaiters() {
        if (state.get() == CANCELLED_WITHOUT_OBSTRUCTION) {
            // no need to notify anything
            return;
        }
        // someone is waiting for the cancellation to complete
        cancellationFuture.complete(null);
    }

}

}