Phaser 同步使用
Phaser Synchronization Usage
一般问题
众所周知,Phaser 可用于同步所有任务的开始时间,如 Niklas Schlimm JavaDocs and this blog 中所述。
Niklas 绘制了一幅非常容易理解的同步图像:
|Phaser |Phaser |Phaser |
Task 1 | ------> | ------> | ------> | ...
Task 2 | ------> | ------> | ------> | ...
...
现在假设有一个任务层次结构:
|Phaser |Phaser |Phaser |Phaser |Phaser |Phaser |Phaser | ...
Master | | | ------> | | | ------> | | ...
Task 1.1 | ----------------> | | ----------------> | | ----------> ...
Task 1.2 | ----------------> | | ----------------> | | ----------> ...
... | | | | | | | | ...
Task 2.1 | ------> | | | ------> | | | ------> | ...
Task 2.2 | ------> | | | ------> | | | ------> | ...
... | | | | | | | | ...
Task 3.1 | | ------> | | | ------> | | | ...
Task 3.2 | | ------> | | | ------> | | | ...
... | | | | | | | | ...
所以依赖关系树是这样的:
Master
/-----------/ \-----------\
| Task 2
Task 1 |
| Task 3
\-----------\ /-----------/
Master'
在一般情况下,需要解决一棵依赖关系树(假设在游戏管道中,有些是 AI/游戏逻辑/渲染任务)。幸运的是有一个 "big" 同步点并且树是固定的(但不是各方的数量)。用几个移相器解决是微不足道的。但是可以只使用一个移相器吗?
一个特例
具体我做了一个程序来解决下面的问题
|phasers[0]|phasers[1]|phasers[2]|phasers[0]|phasers[1]|phasers[2]| ...
Task 1 | -------> | | | -------> | | | ...
Task 2 | -------> | | | -------> | | | ...
Task 3 | | | -------> | | | -------> | ...
Task 4 | | -------> | | | -------> | | ...
代码在这里:
public class VolatileTester {
private int a = 0, b = 0; // change to volatile here
private int c = 0;
private final int TEST_COUNT = 100_000;
private int[] testResult = new int[TEST_COUNT];
private static void printResult(int[] result) {
final Map<Integer, Integer> countMap = new HashMap<>();
for (final int n : result) {
countMap.put(n, countMap.getOrDefault(n, 0) + 1);
}
countMap.forEach((n, count) -> {
System.out.format("%d -> %d%n", n, count);
});
}
private void runTask1() {
a = 5;
b = 10;
}
private void runTask2() {
if (b == 10) {
if (a == 5) {
c = 1;
} else {
c = 2;
}
} else {
if (a == 5) {
c = 3;
} else {
c = 4;
}
}
}
private void runTask3() {
// "reset task"
a = 0;
b = 0;
c = 0;
}
private static class PhaserRunner implements Runnable {
private final Phaser loopStartPhaser;
private final Phaser loopEndPhaser;
private final Runnable runnable;
public PhaserRunner(Phaser loopStartPhaser, Phaser loopEndPhaser, Runnable runnable) {
this.loopStartPhaser = loopStartPhaser;
this.loopEndPhaser = loopEndPhaser;
this.runnable = runnable;
}
@Override
public void run() {
while (loopStartPhaser.arriveAndAwaitAdvance() >= 0) {
runnable.run();
loopEndPhaser.arrive();
}
}
}
void runTest() throws InterruptedException {
final Phaser[] phasers = new Phaser[]{new Phaser(3), new Phaser(3), new Phaser(2)};
final Thread[] threads = new Thread[]{
// build tree of dependencies here
new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask1)),
new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask2)),
new Thread(new PhaserRunner(phasers[2], phasers[0], this::runTask3))
};
try {
for (Thread thread : threads) {
thread.start();
}
phasers[0].arrive(); // phaser of last round
for (int i = 0; i < TEST_COUNT; i++) {
phasers[1].arriveAndAwaitAdvance();
// Task4 here
testResult[i] = c;
phasers[2].arrive();
}
} finally {
for (Phaser phaser : phasers) {
phaser.forceTermination();
}
}
for (Thread thread : threads) {
thread.join();
}
printResult(testResult);
}
}
可以看到使用了多个Phaser
。保留多个移相器(如上)还是只使用一个大移相器更好?或者Java推荐的其他同步方法?
是的,你可以用一个 Phaser
。 CyclicBarrier
在每个循环后有 Runnable barrierAction
即 运行。 Phaser
支持覆盖 onAdvance
的类似功能。
When the final party for a given phase arrives, an optional action is
performed and the phase advances. These actions are performed by the
party triggering a phase advance, and are arranged by overriding
method onAdvance(int, int), which also controls termination.
Overriding this method is similar to, but more flexible than,
providing a barrier action to a CyclicBarrier.
基本上
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) {
// Signal Master thread to perform its task and wait for it to finish
}
};
您应用程序中的所有任务都以逐步方式进行,这意味着单个移相器就足够了。要求是任务可以循环跳过阶段,例如,对于每三个阶段,给定的任务应该 运行 两个阶段,然后跳过一个阶段(空闲一个阶段)。综上所述,
- 任务应该在每个工作步骤之前执行
arriveAndAwaitAdvance()
。
- 一个任务应该只调用
arriveAndAwaitAdvance()
来跳过一个阶段。
为此,每个任务都可以使用一个布尔数组,在名为 enabled
的示例中,指定它是否在给定的阶段编号启用。
通过使用模块化代数 (enabled[phase % enabled.length]
),我们可以定义循环模式。例如,要指定任务应该 运行 三分之一,我们将 enabled
声明为 new boolean[]{true, false, false}
.
请记住,无论是否执行任何实际工作,任务都必须推进阶段。
我相应地修正了你的例子:
import java.util.concurrent.*;
import java.util.*;
public class VolatileTester {
private int a = 0, b = 0; // change to volatile here
private int c = 0;
private final int TEST_COUNT = 100;
private int[] testResult = new int[TEST_COUNT];
private static void printResult(int[] result) {
final Map<Integer, Integer> countMap = new HashMap<>();
for (final int n : result) {
countMap.put(n, countMap.getOrDefault(n, 0) + 1);
}
countMap.forEach((n, count) -> {
System.out.format("%d -> %d%n", n, count);
});
}
private void runTask1() {
a = 5;
b = 10;
}
private void runTask2() {
if (b == 10) {
if (a == 5) {
c = 1;
} else {
c = 2;
}
} else {
if (a == 5) {
c = 3;
} else {
c = 4;
}
}
}
private void runTask3() {
// "reset task"
a = 0;
b = 0;
c = 0;
}
private static class PhaserRunner implements Runnable {
private final Phaser phaser;
private final Runnable runnable;
private boolean[] enabled;
public PhaserRunner(Phaser phaser, boolean[] enabled, Runnable runnable) {
this.phaser = phaser;
this.runnable = runnable;
this.enabled = enabled;
}
@Override
public void run() {
int phase;
for (;;) {
phase = phaser.arriveAndAwaitAdvance();
if (phase < 0) {
break;
} else if (enabled[phase % enabled.length]) {
System.out.println("I'm running: " + Thread.currentThread());
runnable.run();
}
}
}
}
public void runTest() throws InterruptedException {
final Phaser phaser = new Phaser(4);
final Thread[] threads = new Thread[]{
// build tree of dependencies here
new Thread(new PhaserRunner(phaser, new boolean[]{true, false, false}, this::runTask1), "T1"),
new Thread(new PhaserRunner(phaser, new boolean[]{false, false, true}, this::runTask2), "T2"),
new Thread(new PhaserRunner(phaser, new boolean[]{false, true, false}, this::runTask3), "T3")
};
try {
for (Thread thread : threads) {
thread.start();
}
for (int i = 0; i < TEST_COUNT; i++) {
testResult[i] = c;
phaser.arriveAndAwaitAdvance();
}
} finally {
phaser.forceTermination();
}
for (Thread thread : threads) {
thread.join();
}
printResult(testResult);
}
public static void main(String[]args) throws Exception {
new VolatileTester().runTest();
}
}
一般问题
众所周知,Phaser 可用于同步所有任务的开始时间,如 Niklas Schlimm JavaDocs and this blog 中所述。
Niklas 绘制了一幅非常容易理解的同步图像:
|Phaser |Phaser |Phaser |
Task 1 | ------> | ------> | ------> | ...
Task 2 | ------> | ------> | ------> | ...
...
现在假设有一个任务层次结构:
|Phaser |Phaser |Phaser |Phaser |Phaser |Phaser |Phaser | ...
Master | | | ------> | | | ------> | | ...
Task 1.1 | ----------------> | | ----------------> | | ----------> ...
Task 1.2 | ----------------> | | ----------------> | | ----------> ...
... | | | | | | | | ...
Task 2.1 | ------> | | | ------> | | | ------> | ...
Task 2.2 | ------> | | | ------> | | | ------> | ...
... | | | | | | | | ...
Task 3.1 | | ------> | | | ------> | | | ...
Task 3.2 | | ------> | | | ------> | | | ...
... | | | | | | | | ...
所以依赖关系树是这样的:
Master
/-----------/ \-----------\
| Task 2
Task 1 |
| Task 3
\-----------\ /-----------/
Master'
在一般情况下,需要解决一棵依赖关系树(假设在游戏管道中,有些是 AI/游戏逻辑/渲染任务)。幸运的是有一个 "big" 同步点并且树是固定的(但不是各方的数量)。用几个移相器解决是微不足道的。但是可以只使用一个移相器吗?
一个特例
具体我做了一个程序来解决下面的问题
|phasers[0]|phasers[1]|phasers[2]|phasers[0]|phasers[1]|phasers[2]| ...
Task 1 | -------> | | | -------> | | | ...
Task 2 | -------> | | | -------> | | | ...
Task 3 | | | -------> | | | -------> | ...
Task 4 | | -------> | | | -------> | | ...
代码在这里:
public class VolatileTester {
private int a = 0, b = 0; // change to volatile here
private int c = 0;
private final int TEST_COUNT = 100_000;
private int[] testResult = new int[TEST_COUNT];
private static void printResult(int[] result) {
final Map<Integer, Integer> countMap = new HashMap<>();
for (final int n : result) {
countMap.put(n, countMap.getOrDefault(n, 0) + 1);
}
countMap.forEach((n, count) -> {
System.out.format("%d -> %d%n", n, count);
});
}
private void runTask1() {
a = 5;
b = 10;
}
private void runTask2() {
if (b == 10) {
if (a == 5) {
c = 1;
} else {
c = 2;
}
} else {
if (a == 5) {
c = 3;
} else {
c = 4;
}
}
}
private void runTask3() {
// "reset task"
a = 0;
b = 0;
c = 0;
}
private static class PhaserRunner implements Runnable {
private final Phaser loopStartPhaser;
private final Phaser loopEndPhaser;
private final Runnable runnable;
public PhaserRunner(Phaser loopStartPhaser, Phaser loopEndPhaser, Runnable runnable) {
this.loopStartPhaser = loopStartPhaser;
this.loopEndPhaser = loopEndPhaser;
this.runnable = runnable;
}
@Override
public void run() {
while (loopStartPhaser.arriveAndAwaitAdvance() >= 0) {
runnable.run();
loopEndPhaser.arrive();
}
}
}
void runTest() throws InterruptedException {
final Phaser[] phasers = new Phaser[]{new Phaser(3), new Phaser(3), new Phaser(2)};
final Thread[] threads = new Thread[]{
// build tree of dependencies here
new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask1)),
new Thread(new PhaserRunner(phasers[0], phasers[1], this::runTask2)),
new Thread(new PhaserRunner(phasers[2], phasers[0], this::runTask3))
};
try {
for (Thread thread : threads) {
thread.start();
}
phasers[0].arrive(); // phaser of last round
for (int i = 0; i < TEST_COUNT; i++) {
phasers[1].arriveAndAwaitAdvance();
// Task4 here
testResult[i] = c;
phasers[2].arrive();
}
} finally {
for (Phaser phaser : phasers) {
phaser.forceTermination();
}
}
for (Thread thread : threads) {
thread.join();
}
printResult(testResult);
}
}
可以看到使用了多个Phaser
。保留多个移相器(如上)还是只使用一个大移相器更好?或者Java推荐的其他同步方法?
是的,你可以用一个 Phaser
。 CyclicBarrier
在每个循环后有 Runnable barrierAction
即 运行。 Phaser
支持覆盖 onAdvance
的类似功能。
When the final party for a given phase arrives, an optional action is performed and the phase advances. These actions are performed by the party triggering a phase advance, and are arranged by overriding method onAdvance(int, int), which also controls termination. Overriding this method is similar to, but more flexible than, providing a barrier action to a CyclicBarrier.
基本上
Phaser phaser = new Phaser() {
protected boolean onAdvance(int phase, int parties) {
// Signal Master thread to perform its task and wait for it to finish
}
};
您应用程序中的所有任务都以逐步方式进行,这意味着单个移相器就足够了。要求是任务可以循环跳过阶段,例如,对于每三个阶段,给定的任务应该 运行 两个阶段,然后跳过一个阶段(空闲一个阶段)。综上所述,
- 任务应该在每个工作步骤之前执行
arriveAndAwaitAdvance()
。 - 一个任务应该只调用
arriveAndAwaitAdvance()
来跳过一个阶段。
为此,每个任务都可以使用一个布尔数组,在名为 enabled
的示例中,指定它是否在给定的阶段编号启用。
通过使用模块化代数 (enabled[phase % enabled.length]
),我们可以定义循环模式。例如,要指定任务应该 运行 三分之一,我们将 enabled
声明为 new boolean[]{true, false, false}
.
请记住,无论是否执行任何实际工作,任务都必须推进阶段。
我相应地修正了你的例子:
import java.util.concurrent.*;
import java.util.*;
public class VolatileTester {
private int a = 0, b = 0; // change to volatile here
private int c = 0;
private final int TEST_COUNT = 100;
private int[] testResult = new int[TEST_COUNT];
private static void printResult(int[] result) {
final Map<Integer, Integer> countMap = new HashMap<>();
for (final int n : result) {
countMap.put(n, countMap.getOrDefault(n, 0) + 1);
}
countMap.forEach((n, count) -> {
System.out.format("%d -> %d%n", n, count);
});
}
private void runTask1() {
a = 5;
b = 10;
}
private void runTask2() {
if (b == 10) {
if (a == 5) {
c = 1;
} else {
c = 2;
}
} else {
if (a == 5) {
c = 3;
} else {
c = 4;
}
}
}
private void runTask3() {
// "reset task"
a = 0;
b = 0;
c = 0;
}
private static class PhaserRunner implements Runnable {
private final Phaser phaser;
private final Runnable runnable;
private boolean[] enabled;
public PhaserRunner(Phaser phaser, boolean[] enabled, Runnable runnable) {
this.phaser = phaser;
this.runnable = runnable;
this.enabled = enabled;
}
@Override
public void run() {
int phase;
for (;;) {
phase = phaser.arriveAndAwaitAdvance();
if (phase < 0) {
break;
} else if (enabled[phase % enabled.length]) {
System.out.println("I'm running: " + Thread.currentThread());
runnable.run();
}
}
}
}
public void runTest() throws InterruptedException {
final Phaser phaser = new Phaser(4);
final Thread[] threads = new Thread[]{
// build tree of dependencies here
new Thread(new PhaserRunner(phaser, new boolean[]{true, false, false}, this::runTask1), "T1"),
new Thread(new PhaserRunner(phaser, new boolean[]{false, false, true}, this::runTask2), "T2"),
new Thread(new PhaserRunner(phaser, new boolean[]{false, true, false}, this::runTask3), "T3")
};
try {
for (Thread thread : threads) {
thread.start();
}
for (int i = 0; i < TEST_COUNT; i++) {
testResult[i] = c;
phaser.arriveAndAwaitAdvance();
}
} finally {
phaser.forceTermination();
}
for (Thread thread : threads) {
thread.join();
}
printResult(testResult);
}
public static void main(String[]args) throws Exception {
new VolatileTester().runTest();
}
}