Exchanger 的实际用例是什么?
What's a realistic use case for Exchanger?
java.util.Exchanger 是同步器的最佳选择的实际用例是什么?
我看过来自 GitHub 和教程网站的片段,但它们似乎总是做作,最好用 TransferQueue
解决
Exchanger
和TransferQueue
有很大的不同,基本上TransferQueue
如果消费者很慢就可以填满你所有的内存,而Exchanger
使用常量内存,产生一个配对线程准备就绪时同步。
显然,使用其中一种会对同步和工作流程的效率产生巨大影响(但也会影响资源使用的确定性)。
注意:生产者和消费者请求同步的原因可能是不同的,不仅是他们的缓冲区已满,还可能是一个正在等待另一个。此外,使用 Exchange
producer/consumer 会感到困惑(消费者可以将结果交付给生产者),而使用 TransferQueue
您应该创建其他结构。
举个例子,将您的并发进程想象成一个管道基础设施,节点上下移动水(节点包含水泵)。
此外,泵送效应在所有基础设施中移动(如波浪),认为您的并发进程使用一个线程结果用于另一个线程等等。
(代码中的注释)
package com.computermind.sandbox.concurrent;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;
public class ExchangerTransferQueue {
// helper for wait
@SneakyThrows
public static void _wait() {
Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
}
// the pumping capacity is the work a node must to do, this work
// is moving across threads then, the `PumpingCapacity` "P1" will
// be used by the producer but later (when water move) "P1" will be
// used by consumer and so (the `exchange` sync move PumpingCapacity
// instances)
@AllArgsConstructor
static
class PumpingCapacity {
String name;
int from;
int to;
// do "up" work (if any)
boolean up(String node) {
if(from < 1) return false;
System.out.printf("~ node %s pump up %s%n", node, name);
from -= 1; to += 1;
return true;
}
// do "down" work (if any)
boolean down(String node) {
if(to < 1) return false;
System.out.printf("~ node %s pump down %s%n", node, name);
from += 1; to -= 1;
return true;
}
}
// the producer have a initial PumpingCapacity and
// create the exchange to the next node
static class WaterPumpProducer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpProducer(PumpingCapacity p) {
this.p = p;
}
@SneakyThrows
@Override
public void run() {
// for ever
while(true) {
// do work
while (p.up("Producer")) _wait();
// and exchange
System.out.println("Producer need change");
p = b.exchange(p);
}
}
}
// an interemediate node have two PumpingCapacity one working
// with the predecessor and other with the successor
static class WaterPumpNode implements Runnable {
PumpingCapacity p, q;
final Exchanger<PumpingCapacity> a;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
this.p = p;
this.q = q;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Node")) _wait();
while (q.up("Node")) _wait();
System.out.println("Node need change");
p = a.exchange(p);
q = b.exchange(q);
}
}
}
static class WaterPumpConsumer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> a;
WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
p = initialCapacity;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Consumer")) _wait();
System.out.println("Consumer need change");
p = a.exchange(p);
}
}
}
@SneakyThrows
public static void main(String... args) {
WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);
// consumer run first, the consumer do job!
new Thread(consumer).start();
// wait to see consumer wait
Thread.sleep(15_000);
new Thread(node).start();
// wait to see node wait
Thread.sleep(15_000);
new Thread(producer).start();
// see how PumpingCapacities up and down across all pipe infrastructure
}
}
有输出(有注释)
~ node Consumer pump down P4 <-- only consumer is working
~ node Consumer pump down P4
Consumer need change <-- and stop since need change
~ node Node pump down P2 <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change <-- and need change and wait producer
~ node Producer pump up P1 <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change <-- here all nodes work and
~ node Producer pump up P2 PumpingCapacities go up and down
~ node Consumer pump down P3 moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...
内存使用量恒定(如果您使用TransferQueue
则不会)。
然后
What would be a realistic use case where java.util.Exchanger would be the best choice of synchronizer?
任何两个进程相互同步以执行一个任务的任务,其中一个进程影响另一个进程,反之亦然。 producer/consumer 生产者必须等待消费者(即因此不会积累过多的工作)的模式可能是一个很好的例子。
rather than move it along to another thread for further action?
因为两个线程正在共享信息并且您希望这些操作同时完成,所以您不能只将它从一个线程传递到另一个线程。
假设你有以下递归函数
G{i+1} = g( H{i}, G{i} )
H{i+1} = h( H{i}, G{i} )
你可以开始每一步两个线程到运行并行g
和h
或者你可以使用Exchanger
并且只启动一次两个线程。
当然您可以使用许多其他结构,但您会发现在这些结构中您必须考虑死锁的可能性,而 Exchanger
使交换变得简单和安全。
例如,假设我们要进行某种生物模拟(即https://en.wikipedia.org/wiki/Nicholson%E2%80%93Bailey_model)
static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
// H and P exchange their values (H get p and P get H)
final Exchanger<Double> e = new Exchanger<>();
// H function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
h = k * h * Math.exp(-a * p); // expensive
p = e.exchange(h);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
// P function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
System.out.printf("(H, P) := (%e, %e)%n", h, p);
p = c * h * (1 - Math.exp(-a * p)); // expensive
h = e.exchange(p);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}
@SneakyThrows
public static void main(String... args) {
AtomicBoolean stop = new AtomicBoolean(false);
double k = 2, a = 0.02, c = 1;
NicholsonBailey(Math.log(k) / a + 0.3, (k * Math.log(k)) / ((k - 1) * a * c) + 0.3, k, a, c, stop);
// run simulation until stop
Thread.sleep(100);
stop.set(true);
}
只需使用 Exchanger
我们就可以轻松同步两个计算。
java.util.Exchanger 是同步器的最佳选择的实际用例是什么?
我看过来自 GitHub 和教程网站的片段,但它们似乎总是做作,最好用 TransferQueue
解决Exchanger
和TransferQueue
有很大的不同,基本上TransferQueue
如果消费者很慢就可以填满你所有的内存,而Exchanger
使用常量内存,产生一个配对线程准备就绪时同步。
显然,使用其中一种会对同步和工作流程的效率产生巨大影响(但也会影响资源使用的确定性)。
注意:生产者和消费者请求同步的原因可能是不同的,不仅是他们的缓冲区已满,还可能是一个正在等待另一个。此外,使用 Exchange
producer/consumer 会感到困惑(消费者可以将结果交付给生产者),而使用 TransferQueue
您应该创建其他结构。
举个例子,将您的并发进程想象成一个管道基础设施,节点上下移动水(节点包含水泵)。
此外,泵送效应在所有基础设施中移动(如波浪),认为您的并发进程使用一个线程结果用于另一个线程等等。
(代码中的注释)
package com.computermind.sandbox.concurrent;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ThreadLocalRandom;
public class ExchangerTransferQueue {
// helper for wait
@SneakyThrows
public static void _wait() {
Thread.sleep(ThreadLocalRandom.current().nextInt(1_000, 5_000));
}
// the pumping capacity is the work a node must to do, this work
// is moving across threads then, the `PumpingCapacity` "P1" will
// be used by the producer but later (when water move) "P1" will be
// used by consumer and so (the `exchange` sync move PumpingCapacity
// instances)
@AllArgsConstructor
static
class PumpingCapacity {
String name;
int from;
int to;
// do "up" work (if any)
boolean up(String node) {
if(from < 1) return false;
System.out.printf("~ node %s pump up %s%n", node, name);
from -= 1; to += 1;
return true;
}
// do "down" work (if any)
boolean down(String node) {
if(to < 1) return false;
System.out.printf("~ node %s pump down %s%n", node, name);
from += 1; to -= 1;
return true;
}
}
// the producer have a initial PumpingCapacity and
// create the exchange to the next node
static class WaterPumpProducer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpProducer(PumpingCapacity p) {
this.p = p;
}
@SneakyThrows
@Override
public void run() {
// for ever
while(true) {
// do work
while (p.up("Producer")) _wait();
// and exchange
System.out.println("Producer need change");
p = b.exchange(p);
}
}
}
// an interemediate node have two PumpingCapacity one working
// with the predecessor and other with the successor
static class WaterPumpNode implements Runnable {
PumpingCapacity p, q;
final Exchanger<PumpingCapacity> a;
final Exchanger<PumpingCapacity> b = new Exchanger<>();
WaterPumpNode(PumpingCapacity p, PumpingCapacity q, Exchanger<PumpingCapacity> a) {
this.p = p;
this.q = q;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Node")) _wait();
while (q.up("Node")) _wait();
System.out.println("Node need change");
p = a.exchange(p);
q = b.exchange(q);
}
}
}
static class WaterPumpConsumer implements Runnable {
PumpingCapacity p;
final Exchanger<PumpingCapacity> a;
WaterPumpConsumer(PumpingCapacity initialCapacity, Exchanger<PumpingCapacity> a) {
p = initialCapacity;
this.a = a;
}
@SneakyThrows
@Override
public void run() {
while(true) {
while (p.down("Consumer")) _wait();
System.out.println("Consumer need change");
p = a.exchange(p);
}
}
}
@SneakyThrows
public static void main(String... args) {
WaterPumpProducer producer = new WaterPumpProducer(new PumpingCapacity("P1", 5, 0));
WaterPumpNode node = new WaterPumpNode(new PumpingCapacity("P2", 0, 3), new PumpingCapacity("P3", 3, 0), producer.b);
WaterPumpConsumer consumer = new WaterPumpConsumer(new PumpingCapacity("P4", 0, 2), node.b);
// consumer run first, the consumer do job!
new Thread(consumer).start();
// wait to see consumer wait
Thread.sleep(15_000);
new Thread(node).start();
// wait to see node wait
Thread.sleep(15_000);
new Thread(producer).start();
// see how PumpingCapacities up and down across all pipe infrastructure
}
}
有输出(有注释)
~ node Consumer pump down P4 <-- only consumer is working
~ node Consumer pump down P4
Consumer need change <-- and stop since need change
~ node Node pump down P2 <-- when node start do job
~ node Node pump down P2
~ node Node pump down P2
~ node Node pump up P3
~ node Node pump up P3
~ node Node pump up P3
Node need change <-- and need change and wait producer
~ node Producer pump up P1 <-- when producer start do job
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
~ node Producer pump up P1
Producer need change <-- here all nodes work and
~ node Producer pump up P2 PumpingCapacities go up and down
~ node Consumer pump down P3 moving water
~ node Node pump down P1
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
~ node Consumer pump down P3
~ node Producer pump up P2
~ node Node pump down P1
Consumer need change
Producer need change
~ node Node pump down P1
~ node Node pump up P4
~ node Node pump up P4
Node need change
~ node Node pump down P2
~ node Consumer pump down P4
~ node Producer pump up P1
...
内存使用量恒定(如果您使用TransferQueue
则不会)。
然后
What would be a realistic use case where java.util.Exchanger would be the best choice of synchronizer?
任何两个进程相互同步以执行一个任务的任务,其中一个进程影响另一个进程,反之亦然。 producer/consumer 生产者必须等待消费者(即因此不会积累过多的工作)的模式可能是一个很好的例子。
rather than move it along to another thread for further action?
因为两个线程正在共享信息并且您希望这些操作同时完成,所以您不能只将它从一个线程传递到另一个线程。
假设你有以下递归函数
G{i+1} = g( H{i}, G{i} )
H{i+1} = h( H{i}, G{i} )
你可以开始每一步两个线程到运行并行g
和h
或者你可以使用Exchanger
并且只启动一次两个线程。
当然您可以使用许多其他结构,但您会发现在这些结构中您必须考虑死锁的可能性,而 Exchanger
使交换变得简单和安全。
例如,假设我们要进行某种生物模拟(即https://en.wikipedia.org/wiki/Nicholson%E2%80%93Bailey_model)
static void NicholsonBailey(double H, double P, double k, double a, double c, AtomicBoolean stop) {
// H and P exchange their values (H get p and P get H)
final Exchanger<Double> e = new Exchanger<>();
// H function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
h = k * h * Math.exp(-a * p); // expensive
p = e.exchange(h);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
// P function
new Thread(() -> { try {
double h = H, p = P;
while(!stop.get()) {
System.out.printf("(H, P) := (%e, %e)%n", h, p);
p = c * h * (1 - Math.exp(-a * p)); // expensive
h = e.exchange(p);
}
e.exchange(0., 1, TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException ex) { /* end */ }}).start();
}
@SneakyThrows
public static void main(String... args) {
AtomicBoolean stop = new AtomicBoolean(false);
double k = 2, a = 0.02, c = 1;
NicholsonBailey(Math.log(k) / a + 0.3, (k * Math.log(k)) / ((k - 1) * a * c) + 0.3, k, a, c, stop);
// run simulation until stop
Thread.sleep(100);
stop.set(true);
}
只需使用 Exchanger
我们就可以轻松同步两个计算。