不同池中线程之间的通信 java

communication between threads in the different pools java

我有两个本地线程池,一个池有 4 个线程,第二个池有 5 个线程。

我希望这两个池相互通信。

例如,第一个池的第二个线程 (1.2) 与第二个池的第五个线程 (2.5) 通信,即

1.2 -> 2.5
1.1 -> 2.2
1.3 -> 2.1
1.4 -> 2.3

1.2 finished sending the message to 2.5 and wants to send the other message to the second pool, but 2.5 is still busy, but 2.4 if free to process messages from 1.2

如何让第一个池中的线程与第二个池中的第一个空闲线程通信?

如何在 java 中实现它?

也许我应该使用消息代理或类似的东西? (或BlockingQueue,Exchanger/Pipereader

谢谢

(您的示例不清楚,但我认为您要求的方案是一个池中的线程不关心另一个池中的哪个线程获取消息。)

可能有很多方法可以做到这一点,但一个简单的方法是:

  1. 为每个池创建一个有界消息队列
  2. 每个池中的每个线程都从其池的队列中读取消息
  3. 一个池中的线程通过将消息添加到另一个池的队列来将消息发送到另一个池。

消息代理也可以工作,但它可能有点过头了。您很可能不想要成熟消息代理的可靠性/持久性/分布。

How do I make threads from first pool communicate to the first free thread from second pool?

我不确定您是否有任何其他特定需求,但如果两个池都是本地的并且您只是愿意实施典型的生产者 - 消费者模式,其中 N 线程(作为池的一部分)充当生产者和另一个 M 线程(作为另一个池的一部分)充当消费者,您不关心第二个池的哪个线程实例处理消息,我会通过 - BlockingQueue 实现。

你拿一个 BlockingQueue 的实例(比如 ArrayBlockingQueue OR LinkedBlockingQueue OR PriorityBlockingQueue 并且包 java.util.concurrent 中还有一些实现)并分享这个实例在实际池线程中限制 - take() 只能由消费者线程和任何消费者线程完成。

How can I implement it in java?

您可以像下面这样创建池,

ExecutorService pool_1 = Executors.newFixedThreadPool(4);

ExecutorService pool_2 = Executors.newFixedThreadPool(4);

然后您将实际线程提供给共享阻塞队列的这些池。线程可以像下面这样创建——它只是一个伪代码。

public class Pool1Runnable implements Runnable {

   private final BlockingQueue queue;

   public Pool1Runnable(BlockingQueue queue){
     this.queue=queue;
   }

    @Override
    public void run() {
        System.out.println("Pool1Runnable");
    }

}

现在您为 pool2 编写线程实现并确保它们的 run() 实现在队列上使用 take()

您创建池实例、线程实例 - 为生产者和消费者分开(为所有线程提供单个队列实例,使其充当通信通道),然后使用池执行这些线程实例。

希望对您有所帮助!!

其他人指出的最直接的方法是在池之间设置 BlockingQueue。如果我没记错的话,你的问题与多个生产者和多个消费者分别发送和处理消息是一样的。

这是您可以构建的一种实现。添加注释的参数很少,您可以根据您的问题场景调整它们。基本上,您有 2 个池和一个用于并行调用生产者和消费者的池。

public class MultiProducerConsumer {

private static final int MAX_PRODUCERS = 4;
private static final int MAX_CONSUMERS = 5;

private ExecutorService producerPool = new ThreadPoolExecutor(2, MAX_PRODUCERS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
private ExecutorService consumerPool = new ThreadPoolExecutor(2, MAX_CONSUMERS, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

//ThreadPool for holding the main threads for consumer and producer
private ExecutorService mainPool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

/**
 * Indicates the stopping condition for the consumer, without this it has no idea when to stop
 */
private AtomicBoolean readerComplete = new AtomicBoolean(false);

/**
 * This is the queue for passing message from producer to consumer.
 * Keep queue size depending on how slow is your consumer relative to producer, or base it on resource constraints
 */
private BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);

public static void main(String[] args) throws InterruptedException {
    long startTime = System.currentTimeMillis();
    MultiProducerConsumer multiProducerConsumer = new MultiProducerConsumer();
    multiProducerConsumer.process();
    System.out.println("Time taken in seconds - " + (System.currentTimeMillis() - startTime)/1000f);
}

private void process() throws InterruptedException {
    mainPool.execute(this::consume);
    mainPool.execute(this::produce);
    Thread.sleep(10); // allow the pool to get initiated
    mainPool.shutdown();
    mainPool.awaitTermination(5, TimeUnit.SECONDS);
}

private void consume() {
    try {
        while (!readerComplete.get()) { //wait for reader to complete
            consumeAndExecute();
        }
        while (!queue.isEmpty()) { //process any residue tasks
            consumeAndExecute();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        try {
            consumerPool.shutdown();
            consumerPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

private void consumeAndExecute() throws InterruptedException {
    if (!queue.isEmpty()) {
        String msg = queue.take(); //takes or waits if queue is empty
        consumerPool.execute(() -> {
            System.out.println("c-" + Thread.currentThread().getName() + "-" + msg);
        });
    }
}


private void produce() {
    try {
        for (int i = 0; i < MAX_PRODUCERS; i++) {
            producerPool.execute(() -> {
                try {
                    String random = getRandomNumber() + "";
                    queue.put(random);
                    System.out.println("p-" + Thread.currentThread().getName() + "-" + random);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    } finally {
        try {
            Thread.sleep(10); //allow pool to get initiated
            producerPool.shutdown();
            producerPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        readerComplete.set(true); //mark producer as done, so that consumer can exit
    }
}

private int getRandomNumber() {
    return (int) (Math.random() * 50 + 1);
}

}

这是输出:

p-pool-1-thread-2-43
p-pool-1-thread-2-32
p-pool-1-thread-2-12
c-pool-2-thread-1-43
c-pool-2-thread-1-12
c-pool-2-thread-2-32
p-pool-1-thread-1-3
c-pool-2-thread-1-3
Time taken in seconds - 0.1