两个线程完成执行后清理

Cleaning up after two threads finish execution

我有一个应用程序是 运行 个作业,每个作业需要两个线程。这两个线程通常做一些工作并在彼此之后不久完成。然后在第二个线程完成后我需要做一些清理,但是由于线程正在做一些网络 IO,一个线程可能会被阻塞很长时间。在这种情况下,我希望在第一个线程完成后几秒钟进行清理。

我在回调中使用以下代码实现了此行为 class:

private boolean first = true;

public synchronized void done() throws InterruptedException {
    if (first) {
        first = false;
        wait(3000);
        // cleanup here, as soon as possible
    }
    else {
        notify();
    }
}

两个线程在完成时都会调用 done() 方法。第一个线程将在 wait() 中阻塞最多 3 秒,但当第二个线程调用 done() 方法时将立即收到通知。

我已经测试了这个实现,它似乎运行良好,但我很好奇是否有更好的方法来做到这一点。尽管这个实现看起来并不太复杂,但我担心我的程序会死锁或出现一些意想不到的同步问题。

由于done方法是同步的,所以一次只能执行一个线程,这一秒会等待发送通知,直到第一个完成它的整个工作,这可能会导致性能瓶颈。

我宁愿用简短的同步块来设计它,它主要更新boolean first

希望我了解您的需求。您想等待线程 a 完成,然后等待 3 秒或等待线程 b 结束。

最好使用较新的 Concurrent 工具而不是旧的 wait/notify,因为它们有很多边缘情况。

// Two threads running so count down from 2.
CountDownLatch wait = new CountDownLatch(2);

class TestRun implements Runnable {

    private final long waitTime;

    public TestRun(long waitTime) {
        this.waitTime = waitTime;
    }

    @Override
    public void run() {
        try {
            // Wait a few seconds.
            Thread.sleep(waitTime);
            // Finished! Count me down.
            wait.countDown();
            System.out.println(new Date() + ": " + Thread.currentThread().getName() + " - Finished");
        } catch (InterruptedException ex) {
            System.out.println(Thread.currentThread().getName() + " - Interrupted");
        }
    }

}

public void test() throws InterruptedException {
    // ThreadA
    Thread threadA = new Thread(new TestRun(10000), "Thread A");
    // ThreadB
    Thread threadB = new Thread(new TestRun(30000), "Thread B");
    // Fire them up.
    threadA.start();
    threadB.start();
    // Wait for all to finish but threadA must finish.
    threadA.join();
    // Wait up to 3 seconds for B.
    wait.await(3, TimeUnit.SECONDS);
    System.out.println(new Date() + ": Done");
    threadB.join();
}

愉快地打印:

Tue Sep 15 16:59:37 BST 2015: Thread A - Finished
Tue Sep 15 16:59:40 BST 2015: Done
Tue Sep 15 16:59:57 BST 2015: Thread B - Finished

已添加

有了新的清晰度——任何线程的结束都会启动计时器——我们可以使用第三个线程进行清理。每个线程在完成时必须调用一个方法来触发清理机制。

// Two threads running so count down from 2.
CountDownLatch wait = new CountDownLatch(2);

class TestRun implements Runnable {

    private final long waitTime;

    public TestRun(long waitTime) {
        this.waitTime = waitTime;
    }

    @Override
    public void run() {
        try {
            // Wait a few seconds.
            Thread.sleep(waitTime);
            // Finished! Count me down.
            wait.countDown();
            System.out.println(new Date() + ": " + Thread.currentThread().getName() + " - Finished");
            // Record that I've finished.
            finished();
        } catch (InterruptedException ex) {
            System.out.println(Thread.currentThread().getName() + " - Interrupted");
        }
    }

}

Runnable cleanup = new Runnable() {

    @Override
    public void run() {
        try {
            // Wait up to 3 seconds for both threads to clear.
            wait.await(3, TimeUnit.SECONDS);
            // Do your cleanup stuff here.
            // ...
            System.out.println(new Date() + ": " + Thread.currentThread().getName() + " - Finished");
        } catch (InterruptedException ex) {
            System.out.println(Thread.currentThread().getName() + " - Interrupted");
        }
    }

};

final AtomicBoolean cleanupStarted = new AtomicBoolean(false);

private void finished() {
    // Make sure I only start the cleanup once.
    if (cleanupStarted.compareAndSet(false, true)) {
        new Thread(cleanup, "Cleanup").start();
    }
}

public void test() throws InterruptedException {
    // ThreadA
    Thread threadA = new Thread(new TestRun(10000), "Thread A");
    // ThreadB
    Thread threadB = new Thread(new TestRun(30000), "Thread B");
    // Fire them up.
    threadA.start();
    threadB.start();
    System.out.println(new Date() + ": Done");
}