Exchanger 的实际用例是什么?

What's a realistic use case for Exchanger?

java.util.Exchanger 是同步器的最佳选择的实际用例是什么?

我看过来自 GitHub 和教程网站的片段,但它们似乎总是做作,最好用 TransferQueue

解决

ExchangerTransferQueue有很大的不同,基本上TransferQueue如果消费者很慢就可以填满你所有的内存,而Exchanger使用常量内存,产生一个配对线程准备就绪时同步。

显然,使用其中一种会对同步和工作流程的效率产生巨大影响(但也会影响资源使用的确定性)。

注意:生产者和消费者请求同步的原因可能是不同的,不仅是他们的缓冲区已满,还可能是一个正在等待另一个。此外,使用 Exchange producer/consumer 会感到困惑(消费者可以将结果交付给生产者),而使用 TransferQueue 您应该创建其他结构。

举个例子,将您的并发进程想象成一个管道基础设施,节点上下移动水(节点包含水泵)。

此外,泵送效应在所有基础设施中移动(如波浪),认为您的并发进程使用一个线程结果用于另一个线程等等。

(代码中的注释)

package com.computermind.sandbox.concurrent;

import lombok.AllArgsConstructor;
import lombok.SneakyThrows;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;

public class ExchangerTransferQueue {
    
    // helper for wait
    @SneakyThrows
    public static void _wait() {
        Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
    }

    // the pumping capacity is the work a node must to do, this work
    // is moving across threads then, the `PumpingCapacity` "P1" will
    // be used by the producer but later (when water move) "P1" will be
    // used by consumer and so (the `exchange` sync move PumpingCapacity
    // instances)
    @AllArgsConstructor
    static
    class PumpingCapacity {
        String name;
        int from;
        int to;

        // do "up" work (if any)
        boolean up(String node) {
            if(from < 1) return false;
            System.out.printf("~ node %s pump up %s%n", node, name);
            from -= 1; to += 1;
            return true;
        }
        // do "down" work (if any)
        boolean down(String node) {
            if(to < 1) return false;
            System.out.printf("~ node %s pump down %s%n", node, name);
            from += 1; to -= 1;
            return true;
        }
    }

    // the producer have a initial PumpingCapacity and
    // create the exchange to the next node
    static class WaterPumpProducer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpProducer(PumpingCapacity p) {
            this.p = p;
        }

        @SneakyThrows
        @Override
        public void run() {
            // for ever
            while(true) {
                // do work
                while (p.up("Producer")) _wait();
                // and exchange
                System.out.println("Producer need change");
                p = b.exchange(p);
            }
        }
    }

    // an interemediate node have two PumpingCapacity one working
    // with the predecessor and other with the successor
    static class WaterPumpNode implements Runnable {
        PumpingCapacity p, q;
        final Exchanger<PumpingCapacity> a;
        final Exchanger<PumpingCapacity> b = new Exchanger<>();

        WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
            this.p = p;
            this.q = q;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Node")) _wait();
                while (q.up("Node")) _wait();
                System.out.println("Node need change");
                p = a.exchange(p);
                q = b.exchange(q);
            }
        }
    }

    static class WaterPumpConsumer implements Runnable {
        PumpingCapacity p;
        final Exchanger<PumpingCapacity> a;

        WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
            p = initialCapacity;
            this.a = a;
        }

        @SneakyThrows
        @Override
        public void run() {
            while(true) {
                while (p.down("Consumer")) _wait();
                System.out.println("Consumer need change");
                p = a.exchange(p);
            }
        }
    }

    @SneakyThrows
    public static void main(String... args) {

        WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
        WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
        WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);

        // consumer run first, the consumer do job!
        new Thread(consumer).start();

        // wait to see consumer wait
        Thread.sleep(15_000);

        new Thread(node).start();

        // wait to see node wait
        Thread.sleep(15_000);

        new Thread(producer).start();

        // see how PumpingCapacities up and down across all pipe infrastructure
    }

}

有输出(有注释)

~ node Consumer pump down P4  <-- only consumer is working
~ node Consumer pump down P4
Consumer need change          <-- and stop since need change
~ node Node pump down P2      <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change              <-- and need change and wait producer
~ node Producer pump up P1    <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change          <-- here all nodes work and
~ node Producer pump up P2        PumpingCapacities go up and down
~ node Consumer pump down P3      moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...

内存使用量恒定(如果您使用TransferQueue则不会)。

然后

What would be a realistic use case where java.util.Exchanger would be the best choice of synchronizer?

任何两个进程相互同步以执行一个任务的任务,其中一个进程影响另一个进程,反之亦然。 producer/consumer 生产者必须等待消费者(即因此不会积累过多的工作)的模式可能是一个很好的例子。

rather than move it along to another thread for further action?

因为两个线程正在共享信息并且您希望这些操作同时完成,所以您不能只将它从一个线程传递到另一个线程。

假设你有以下递归函数

G{i+1} = g( H{i}, G{i} )
H{i+1} = h( H{i}, G{i} )

你可以开始每一步两个线程到运行并行gh或者你可以使用Exchanger 并且只启动一次两个线程。

当然您可以使用许多其他结构,但您会发现在这些结构中您必须考虑死锁的可能性,而 Exchanger 使交换变得简单和安全。

例如,假设我们要进行某种生物模拟(即https://en.wikipedia.org/wiki/Nicholson%E2%80%93Bailey_model

static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
    // H and P exchange their values (H get p and P get H)
    final Exchanger<Double> e = new Exchanger<>();

    // H function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            h = k * h * Math.exp(-a * p); // expensive
            p = e.exchange(h);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();

    // P function
    new Thread(() -> { try {
        double h = H, p = P;
        while(!stop.get()) {
            System.out.printf("(H, P) := (%e, %e)%n", h, p);
            p = c * h * (1 - Math.exp(-a * p)); // expensive
            h = e.exchange(p);
        }
        e.exchange(0., 1, TimeUnit.MILLISECONDS);
    } catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}

@SneakyThrows
public static void main(String... args) {
    AtomicBoolean stop = new AtomicBoolean(false);
    double k = 2, a = 0.02, c = 1;
    NicholsonBailey(Math.log(k) / a + 0.3, (k * Math.log(k)) / ((k - 1) * a * c) + 0.3, k, a, c, stop);

    // run simulation until stop
    Thread.sleep(100);
    stop.set(true);
}

只需使用 Exchanger 我们就可以轻松同步两个计算。