通过示例了解 java 中的移相器
Understanding phaser in java with an example
我正在尝试了解 java 中的 Phaser。我写了一个例子,提前卡在等待其他人到达。
据我了解,phaser 被用作可重用的线程同步(不像 CountdownLatch 不可重用)带有屏障操作的屏障(不像用于共享状态的 Cyclicbarrier,Phaser 不必共享状态在屏障作用中)。如果我错了请纠正我。
因此,在我的示例中,我试图在一定数量的 parties/threads 达到障碍后在每个线程中执行一些随机加法和减法代码。我做错了什么?
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
您使用值 1 初始化 Phaser:
Phaser phaser = new Phaser(1);
这意味着您的主线程是您正在等待的线程之一,但它从不调用 arrive()。
由于您的线程数是固定的,您应该使用线程数初始化 Phaser,并删除 register() 调用。
问题是您无法从正在注册的任务中调用phaser.register()
。使用移相器时始终遵循以下两个规则:
- 只有注册的任务才能注册其他任务。这意味着任务不能注册自己。
- 所有已注册的任务必须在结束前注销。 一个好的做法是使用移相器将代码包装在一个
finally
块周围,该块在最后注销(参见示例).
这是您的固定程序(注意创建移相器的行):
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
// since we know beforehand how many tasks we have, initialize the
// number of participants in the constructor; other wise register
// *before* launching the task
Phaser phaser = new Phaser(THREAD_POOL_SIZE);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
这是没有 phaser.register() 的工作代码:
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
//phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
我正在尝试了解 java 中的 Phaser。我写了一个例子,提前卡在等待其他人到达。
据我了解,phaser 被用作可重用的线程同步(不像 CountdownLatch 不可重用)带有屏障操作的屏障(不像用于共享状态的 Cyclicbarrier,Phaser 不必共享状态在屏障作用中)。如果我错了请纠正我。
因此,在我的示例中,我试图在一定数量的 parties/threads 达到障碍后在每个线程中执行一些随机加法和减法代码。我做错了什么?
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
您使用值 1 初始化 Phaser:
Phaser phaser = new Phaser(1);
这意味着您的主线程是您正在等待的线程之一,但它从不调用 arrive()。
由于您的线程数是固定的,您应该使用线程数初始化 Phaser,并删除 register() 调用。
问题是您无法从正在注册的任务中调用phaser.register()
。使用移相器时始终遵循以下两个规则:
- 只有注册的任务才能注册其他任务。这意味着任务不能注册自己。
- 所有已注册的任务必须在结束前注销。 一个好的做法是使用移相器将代码包装在一个
finally
块周围,该块在最后注销(参见示例).
这是您的固定程序(注意创建移相器的行):
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
// since we know beforehand how many tasks we have, initialize the
// number of participants in the constructor; other wise register
// *before* launching the task
Phaser phaser = new Phaser(THREAD_POOL_SIZE);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
这是没有 phaser.register() 的工作代码:
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
//phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}