如果 2 个其他线程调用 signalAll,如何唤醒线程?

How to awake thread if 2 other threads called signalAll?

我有 2 个工作线程和 1 个处理线程。

当处理线程尝试处理某事而工作线程正在执行它们的工作时,处理线程应该等待,并在工作线程中完成的所有工作完成后唤醒。

我怎样才能唤醒这个线程?我将尝试在这个伪代码中演示我的意思

class Worker{
      /... variables ../
     private final Condition condition;
     public Worker(Condition condition){
        // constructor
     } 

     public void start(){
        Thread thread = new Thread(this::run);
        thread.start();
     }

     private void run(){
        try{
            ...
            condition.signalAll();    
        }
     }
}

和处理线程类似

      class Processing{
          private final Lock lock = new ReentrantLock();
          private final Condition condition = lock.newCondition();
          public void start(){
    
            Worker worker1 = new Worker(condition);
            Worker worket2 = new Worker(condition);
            worker1.start();
            worker2.start();
    
            // some code //
            while( // some flag indicating that workers are doing something){
                condition.await();
            }
            // this should be processed only after both worker1 and worker2 called signalAll
         }
   }

这样的事情可能吗?线程等待直到多个源调用 notifyAll() 而不是一次。我希望我把这个问题解释清楚了。

感谢帮助!

您可以使用 CountDownLatch 来实现该行为。

使用两个条件并让处理线程等待它们。

Is something like this possible? For thread to wait untill multiple sources called notifyAll() and not just once.

是的,您可以使用例如 CyclicBarrier:

A synchronization aid that allows a set of threads to all wait for each other to reach a common barrier point. CyclicBarriers are useful in programs involving a fixed sized party of threads that must occasionally wait for each other. The barrier is called cyclic because it can be re-used after the waiting threads are released. A CyclicBarrier supports an optional Runnable command that is run once per barrier point, after the last thread in the party arrives, but before any threads are released. This barrier action is useful for updating shared-state before any of the parties continue.

CountDownLatch

不同,CyclicBarrier 可以重复使用多次

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. A CountDownLatch is initialized with a given count. The await methods block until the current count reaches zero due to invocations of the countDown() method, after which all waiting threads are released and any subsequent invocations of await return immediately. This is a one-shot phenomenon -- the count cannot be reset. If you need a version that resets the count, consider using a CyclicBarrier.

class Worker{
      /... variables ../
     private final CyclicBarrier barrier;
     public Worker(CyclicBarrier condition){
        // constructor
     } 

     public void start(){
        Thread thread = new Thread(this::run);
        thread.start();
     }

     private void run(){
         try {
             ...
             barrier.await();
         } catch (InterruptedException | BrokenBarrierException e) {
            // Do Something
       }
     }
 }

      class Processing{
          final int total_threads = 3;
          CyclicBarrier barrier = new CyclicBarrier(total_threads);
          public void start(){
    
            Worker worker1 = new Worker(barrier);
            Worker worker2 = new Worker(barrier);
            worker1.start();
            worker2.start();
    
            // some code //
            try {
               barrier.await();
           } catch (InterruptedException | BrokenBarrierException e) {
              // Do Something
           }
   }

在此示例中,master 线程等待 worker1worker2 调用屏障,之后所有三个线程都恢复它们的工作工作。

对于这种简单的情况,您不需要使用条件(这不是那么简单,顺便说一句 https://www.baeldung.com/java-concurrent-locks#working-with-conditions)。还有几个其他琐碎的选项:

  1. 将您的 worker 定义为一个线程:
    class Worker extends Thread {
        public void run() {
            ...
        } 
    }

并在您的处理器中使用旧的好 .join() 来等待两个线程完成:

   class Processing {
       public void start() throws Exception {
           Worker worker1 = new Worker(condition);
           Worker worket2 = new Worker(condition);
           worker1.start();
           worker2.start();
           
           worker1.join();
           worker2.join();
           // this will be processed only after both worker1 and worker2 have their run() finished
        }
   }
  1. 使用 CountDownLatch:
    class Worker {
        private final CountDownLatch latch;
    
        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }
    
        private void run() {
            try {
             ...
            } finally {
               latch.countDown();
            }
        }
    }
    
    class Processing {
        public void start() throws Exception {
            CountDownLatch latch = new CountDownLatch(2);
            Worker worker1 = new Worker(latch);
            Worker worket2 = new Worker(latch);
            worker1.start();
            worker2.start();
            
            latch.await();
    
            // this will be processed only after both worker1 and worker2 called countDown()
        }
    }

当然,你可以使用CyclicBarrier,但为什么你需要这种可重复使用的东西?您甚至可以使用 Phaser https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Phaser.html,但在我看来,这也是过度工程:)