我怎样才能重写这个主线程 - 工作线程同步
How can I rewrite this main thread - worker threads synchronization
我有一个程序是这样的
public class Test implements Runnable
{
public int local_counter
public static int global_counter
// Barrier waits for as many threads as we launch + main thread
public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);
/* Constructors etc. */
public void run()
{
for (int i=0; i<100; i++)
{
thread_barrier.await();
local_counter = 0;
for(int j=0 ; j = 20 ; j++)
local_counter++;
thread_barrier.await();
}
}
public void main()
{
/* Create and launch some threads, stored on thread_array */
for(int i=0 ; i<100 ; i++)
{
thread_barrier.await();
thread_barrier.await();
for (int t=1; t<thread_array.length; t++)
{
global_counter += thread_array[t].local_counter;
}
}
}
}
基本上,我有几个线程有自己的本地计数器,我正在这样做(在一个循环中)
|----| | |----|
|main| | |pool|
|----| | |----|
|
-------------------------------------------------------
barrier (get local counters before they're overwritten)
-------------------------------------------------------
|
| 1. reset local counter
| 2. do some computations
| involving local counter
|
-------------------------------------------------------
barrier (synchronize all threads)
-------------------------------------------------------
|
1. update global counter |
using each thread's |
local counter |
这应该一切都很好,但事实证明这并不能很好地扩展。在 16 个物理节点集群上,6-8 个线程后的加速可以忽略不计,因此我必须摆脱其中一个等待。我试过使用 CyclicBarrier,它的扩展性非常好,Semaphores,它做的一样多,还有一个自定义库 (jbarrier),它工作得很好,直到线程多于物理内核,此时它的性能比顺序版本差。但是我无法想出一种不停止所有线程两次的方法。
编辑:虽然我感谢您对我的程序中任何其他可能的瓶颈的所有和任何见解,但我正在寻找有关此特定问题的答案。如果需要我可以提供更具体的例子
嗯。我不确定是否完全理解,但我认为您的主要问题是您尝试过多地重复使用一组预定义的线程。你应该让 Java 处理这个(这就是 executors/fork-join 池的用途)。为了解决您的问题,split/process/merge(或map/reduce)似乎适合我。自 java 8 以来,这是一种非常简单的实现方法(感谢 stream/fork-join pool/completable 未来的 API)。我在这里提出 2 个备选方案:
Java 8 流
对我来说,你的问题看起来可以恢复为 map/reduce 问题。并且如果你可以使用 Java 8 个流,你可以将性能问题委托给它。我会做什么:
1. 创建一个并行流,包含您的处理输入(您甚至可以使用方法动态生成输入)。请注意,您可以实现自己的 Spliterator,以完全控制输入的浏览和拆分(网格上的单元格?)。
2.使用map处理输入。
3. 使用reduce 方法合并所有先前计算的结果。
简单示例(基于您的示例):
// Create a pool with wanted number of threads
final ForkJoinPool pool = new ForkJoinPool(4);
// We give the entire procedure to the thread pool
final int result = pool.submit(() -> {
// Generate a hundred counters, initialized on 0 value
return IntStream.generate(() -> 0)
.limit(100)
// Specify we want it processed in a parallel way
.parallel()
// The map will register processing method
.map(in -> incrementMultipleTimes(in, 20))
// We ask the merge of processing results
.reduce((first, second) -> first + second)
.orElseThrow(() -> new IllegalArgumentException("Empty dataset"));
})
// Wait for the overall result
.get();
System.out.println("RESULT: " + result);
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
一些注意事项:
1. 默认情况下,并行流在 JVM Common fork-join pool 上执行任务,可以限制执行器的数量。但是有一些方法可以使用你自己的池:see this answer.
2.如果配置好,我认为这是最好的方法,因为并行逻辑已经被JDK开发者自己处理了。
移相器
如果您不能使用 java8 功能(或者我误解了您的问题,或者您真的想自己处理低级管理),我可以给您的最后一条线索是:Phaser 对象。
正如文档所述,它是循环屏障和倒计时锁存器的可重复使用组合。我已经多次使用它。它使用起来很复杂,但也非常强大。它可以用作循环屏障,所以我认为它适合你的情况。
一些修复:您对线程的迭代应该是 for(int t=0;...) 假设您的线程数组 [0] 应该参与全局计数器总和。我们可以猜测是一个Test数组,而不是线程。 local_counter 应该是可变的,否则你可能看不到跨测试线程和主线程的真实值。
好的,现在,您有一个正确的 2 阶段循环,afaict。在每个循环中带有新倒计时锁存器的移相器或 1 循环屏障等任何其他东西都只是同一主题的变体:让多个线程同意让主线程恢复,并让主线程一次性恢复多个线程。
更精简的实现可能涉及重入锁、到达测试线程的计数器、恢复所有测试线程测试的条件以及恢复主线程的条件。 --count==0 时到达的测试线程应该发出主要恢复条件的信号。所有测试线程都等待测试恢复条件。 main 应将计数器重置为 N 并在测试恢复条件下发出 signalAll,然后在 main 条件下等待。线程(测试和主线程)每个循环只等待一次。
最后,如果最终目标是由任何线程更新的总和,您应该查看 LongAdder(如果不是 AtomicLong)以同时执行对 long 的加法,而不必停止所有线程(它们会战斗并添加,而不是涉及主体)。
否则,您可以让线程将它们的 material 传送到主线程读取的阻塞队列。这样做的方式太多了;我很难理解为什么要挂起所有线程来收集数据。那是 all.The 问题过于简单化了,我们没有足够的约束来证明你在做什么。
不用担心 CyclicBarrier,它是通过可重入锁、计数器和触发所有等待线程的 signalAll() 的条件来实现的。这是严格编码的,afaict。如果你想要无锁版本,你将面临太多繁忙的自旋循环,浪费 cpu 时间,尤其是当你担心线程数多于内核数时的扩展。
与此同时,您是否有可能实际上有 8 个看起来像 16 cpu 的超线程内核?
清理后,您的代码如下所示:
package tests;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;
public class Test implements Runnable {
static final int n_threads = 8;
static final long LOOPS = 10000;
public static int global_counter;
public static CyclicBarrier thread_barrier = new CyclicBarrier(n_threads + 1);
public volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException | BrokenBarrierException e) {
//
}
}
void runImpl() throws InterruptedException, BrokenBarrierException {
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
local_counter = 0;
for (int j=0; j<20; j++)
local_counter++;
thread_barrier.await();
}
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
Test[] ra = new Test[n_threads];
Thread[] ta = new Thread[n_threads];
for(int i=0; i<n_threads; i++)
(ta[i] = new Thread(ra[i]=new Test()).start();
long nanos = System.nanoTime();
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
thread_barrier.await();
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
Stream.of(ta).forEach(t -> t.interrupt());
}
}
我的带 1 个锁的版本如下所示:
package tests;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
public class TwoPhaseCycle implements Runnable {
static final boolean DEBUG = false;
static final int N = 8;
static final int LOOPS = 10000;
static ReentrantLock lock = new ReentrantLock();
static Condition testResume = lock.newCondition();
static volatile long cycle = -1;
static Condition mainResume = lock.newCondition();
static volatile int testLeft = 0;
static void p(Object msg) {
System.out.println(Thread.currentThread().getName()+"] "+msg);
}
//-----
volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException e) {
p("interrupted; ending.");
}
}
public void runImpl() throws InterruptedException {
lock.lock();
try {
if(DEBUG) p("waiting for 1st testResumed");
while(cycle<0) {
testResume.await();
}
} finally {
lock.unlock();
}
long localCycle = 0;//for (int i = 0; i < LOOPS; i++) {
while(true) {
if(DEBUG) p("working");
local_counter = 0;
for (int j = 0; j<20; j++)
local_counter++;
localCycle++;
lock.lock();
try {
if(DEBUG) p("done");
if(--testLeft <=0)
mainResume.signalAll(); //could have been just .signal() since only main is waiting, but safety first.
if(DEBUG) p("waiting for cycle "+localCycle+" testResumed");
while(cycle < localCycle) {
testResume.await();
}
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
TwoPhaseCycle[] ra = new TwoPhaseCycle[N];
Thread[] ta = new Thread[N];
for(int i=0; i<N; i++)
(ta[i] = new Thread(ra[i]=new TwoPhaseCycle(), "\t\t\t\t\t\t\t\t".substring(0, i%8)+"\tT"+i)).start();
long nanos = System.nanoTime();
int global_counter = 0;
for (int i=0; i<LOOPS; i++) {
lock.lock();
try {
if(DEBUG) p("gathering");
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
testLeft = N;
cycle = i;
if(DEBUG) p("resuming cycle "+cycle+" tests");
testResume.signalAll();
if(DEBUG) p("waiting for main resume");
while(testLeft>0) {
mainResume.await();
}
} finally {
lock.unlock();
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
p(global_counter);
Stream.of(ta).forEach(t -> t.interrupt());
}
}
当然,这绝不是一个稳定的微基准,但趋势表明它更快。希望你喜欢。 (我放弃了一些最喜欢的调试技巧,值得将调试变为现实......)
您真的可以考虑遵循 (CyclicBarrier
) documentation:
中的 'official' 示例
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
你的情况
processRow()
会生成部分生成(任务被分成 N 份,工人可以在初始化时获得他们的编号,或者只使用 return 编辑的编号 barrier.await()
(在这种情况下,工作人员应该从等待开始)
mergeRows()
,在构造时传递给屏障的匿名Runnable
,是整代准备好的地方,你可以把它打印在屏幕上什么的(也许交换一些'currentGen' 和 'nextGen' 缓冲区)。当此方法 returns(或更准确地说是 run()
)时,barrier.await()
也会在 worker 中调用 return 并开始计算下一代(或不计算,请参阅下一个要点)
done()
决定线程何时退出(而不是产生新一代)。它可以是 'real' 方法,但 static volatile boolean
变量也可以工作
waitUntilDone()
可能是所有线程的循环,join()
-ing 它们。或者只是等待程序应该退出时可以触发的东西(来自 'mergeRows')
我有一个程序是这样的
public class Test implements Runnable
{
public int local_counter
public static int global_counter
// Barrier waits for as many threads as we launch + main thread
public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);
/* Constructors etc. */
public void run()
{
for (int i=0; i<100; i++)
{
thread_barrier.await();
local_counter = 0;
for(int j=0 ; j = 20 ; j++)
local_counter++;
thread_barrier.await();
}
}
public void main()
{
/* Create and launch some threads, stored on thread_array */
for(int i=0 ; i<100 ; i++)
{
thread_barrier.await();
thread_barrier.await();
for (int t=1; t<thread_array.length; t++)
{
global_counter += thread_array[t].local_counter;
}
}
}
}
基本上,我有几个线程有自己的本地计数器,我正在这样做(在一个循环中)
|----| | |----|
|main| | |pool|
|----| | |----|
|
-------------------------------------------------------
barrier (get local counters before they're overwritten)
-------------------------------------------------------
|
| 1. reset local counter
| 2. do some computations
| involving local counter
|
-------------------------------------------------------
barrier (synchronize all threads)
-------------------------------------------------------
|
1. update global counter |
using each thread's |
local counter |
这应该一切都很好,但事实证明这并不能很好地扩展。在 16 个物理节点集群上,6-8 个线程后的加速可以忽略不计,因此我必须摆脱其中一个等待。我试过使用 CyclicBarrier,它的扩展性非常好,Semaphores,它做的一样多,还有一个自定义库 (jbarrier),它工作得很好,直到线程多于物理内核,此时它的性能比顺序版本差。但是我无法想出一种不停止所有线程两次的方法。
编辑:虽然我感谢您对我的程序中任何其他可能的瓶颈的所有和任何见解,但我正在寻找有关此特定问题的答案。如果需要我可以提供更具体的例子
嗯。我不确定是否完全理解,但我认为您的主要问题是您尝试过多地重复使用一组预定义的线程。你应该让 Java 处理这个(这就是 executors/fork-join 池的用途)。为了解决您的问题,split/process/merge(或map/reduce)似乎适合我。自 java 8 以来,这是一种非常简单的实现方法(感谢 stream/fork-join pool/completable 未来的 API)。我在这里提出 2 个备选方案:
Java 8 流
对我来说,你的问题看起来可以恢复为 map/reduce 问题。并且如果你可以使用 Java 8 个流,你可以将性能问题委托给它。我会做什么:
1. 创建一个并行流,包含您的处理输入(您甚至可以使用方法动态生成输入)。请注意,您可以实现自己的 Spliterator,以完全控制输入的浏览和拆分(网格上的单元格?)。
2.使用map处理输入。
3. 使用reduce 方法合并所有先前计算的结果。
简单示例(基于您的示例):
// Create a pool with wanted number of threads
final ForkJoinPool pool = new ForkJoinPool(4);
// We give the entire procedure to the thread pool
final int result = pool.submit(() -> {
// Generate a hundred counters, initialized on 0 value
return IntStream.generate(() -> 0)
.limit(100)
// Specify we want it processed in a parallel way
.parallel()
// The map will register processing method
.map(in -> incrementMultipleTimes(in, 20))
// We ask the merge of processing results
.reduce((first, second) -> first + second)
.orElseThrow(() -> new IllegalArgumentException("Empty dataset"));
})
// Wait for the overall result
.get();
System.out.println("RESULT: " + result);
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
一些注意事项:
1. 默认情况下,并行流在 JVM Common fork-join pool 上执行任务,可以限制执行器的数量。但是有一些方法可以使用你自己的池:see this answer.
2.如果配置好,我认为这是最好的方法,因为并行逻辑已经被JDK开发者自己处理了。
移相器
如果您不能使用 java8 功能(或者我误解了您的问题,或者您真的想自己处理低级管理),我可以给您的最后一条线索是:Phaser 对象。 正如文档所述,它是循环屏障和倒计时锁存器的可重复使用组合。我已经多次使用它。它使用起来很复杂,但也非常强大。它可以用作循环屏障,所以我认为它适合你的情况。
一些修复:您对线程的迭代应该是 for(int t=0;...) 假设您的线程数组 [0] 应该参与全局计数器总和。我们可以猜测是一个Test数组,而不是线程。 local_counter 应该是可变的,否则你可能看不到跨测试线程和主线程的真实值。
好的,现在,您有一个正确的 2 阶段循环,afaict。在每个循环中带有新倒计时锁存器的移相器或 1 循环屏障等任何其他东西都只是同一主题的变体:让多个线程同意让主线程恢复,并让主线程一次性恢复多个线程。
更精简的实现可能涉及重入锁、到达测试线程的计数器、恢复所有测试线程测试的条件以及恢复主线程的条件。 --count==0 时到达的测试线程应该发出主要恢复条件的信号。所有测试线程都等待测试恢复条件。 main 应将计数器重置为 N 并在测试恢复条件下发出 signalAll,然后在 main 条件下等待。线程(测试和主线程)每个循环只等待一次。
最后,如果最终目标是由任何线程更新的总和,您应该查看 LongAdder(如果不是 AtomicLong)以同时执行对 long 的加法,而不必停止所有线程(它们会战斗并添加,而不是涉及主体)。
否则,您可以让线程将它们的 material 传送到主线程读取的阻塞队列。这样做的方式太多了;我很难理解为什么要挂起所有线程来收集数据。那是 all.The 问题过于简单化了,我们没有足够的约束来证明你在做什么。
不用担心 CyclicBarrier,它是通过可重入锁、计数器和触发所有等待线程的 signalAll() 的条件来实现的。这是严格编码的,afaict。如果你想要无锁版本,你将面临太多繁忙的自旋循环,浪费 cpu 时间,尤其是当你担心线程数多于内核数时的扩展。
与此同时,您是否有可能实际上有 8 个看起来像 16 cpu 的超线程内核?
清理后,您的代码如下所示:
package tests;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;
public class Test implements Runnable {
static final int n_threads = 8;
static final long LOOPS = 10000;
public static int global_counter;
public static CyclicBarrier thread_barrier = new CyclicBarrier(n_threads + 1);
public volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException | BrokenBarrierException e) {
//
}
}
void runImpl() throws InterruptedException, BrokenBarrierException {
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
local_counter = 0;
for (int j=0; j<20; j++)
local_counter++;
thread_barrier.await();
}
}
public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
Test[] ra = new Test[n_threads];
Thread[] ta = new Thread[n_threads];
for(int i=0; i<n_threads; i++)
(ta[i] = new Thread(ra[i]=new Test()).start();
long nanos = System.nanoTime();
for (int i = 0; i < LOOPS; i++) {
thread_barrier.await();
thread_barrier.await();
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
Stream.of(ta).forEach(t -> t.interrupt());
}
}
我的带 1 个锁的版本如下所示:
package tests;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
public class TwoPhaseCycle implements Runnable {
static final boolean DEBUG = false;
static final int N = 8;
static final int LOOPS = 10000;
static ReentrantLock lock = new ReentrantLock();
static Condition testResume = lock.newCondition();
static volatile long cycle = -1;
static Condition mainResume = lock.newCondition();
static volatile int testLeft = 0;
static void p(Object msg) {
System.out.println(Thread.currentThread().getName()+"] "+msg);
}
//-----
volatile int local_counter;
@Override
public void run() {
try {
runImpl();
} catch (InterruptedException e) {
p("interrupted; ending.");
}
}
public void runImpl() throws InterruptedException {
lock.lock();
try {
if(DEBUG) p("waiting for 1st testResumed");
while(cycle<0) {
testResume.await();
}
} finally {
lock.unlock();
}
long localCycle = 0;//for (int i = 0; i < LOOPS; i++) {
while(true) {
if(DEBUG) p("working");
local_counter = 0;
for (int j = 0; j<20; j++)
local_counter++;
localCycle++;
lock.lock();
try {
if(DEBUG) p("done");
if(--testLeft <=0)
mainResume.signalAll(); //could have been just .signal() since only main is waiting, but safety first.
if(DEBUG) p("waiting for cycle "+localCycle+" testResumed");
while(cycle < localCycle) {
testResume.await();
}
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
TwoPhaseCycle[] ra = new TwoPhaseCycle[N];
Thread[] ta = new Thread[N];
for(int i=0; i<N; i++)
(ta[i] = new Thread(ra[i]=new TwoPhaseCycle(), "\t\t\t\t\t\t\t\t".substring(0, i%8)+"\tT"+i)).start();
long nanos = System.nanoTime();
int global_counter = 0;
for (int i=0; i<LOOPS; i++) {
lock.lock();
try {
if(DEBUG) p("gathering");
for (int t=0; t<ra.length; t++) {
global_counter += ra[t].local_counter;
}
testLeft = N;
cycle = i;
if(DEBUG) p("resuming cycle "+cycle+" tests");
testResume.signalAll();
if(DEBUG) p("waiting for main resume");
while(testLeft>0) {
mainResume.await();
}
} finally {
lock.unlock();
}
}
System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");
p(global_counter);
Stream.of(ta).forEach(t -> t.interrupt());
}
}
当然,这绝不是一个稳定的微基准,但趋势表明它更快。希望你喜欢。 (我放弃了一些最喜欢的调试技巧,值得将调试变为现实......)
您真的可以考虑遵循 (CyclicBarrier
) documentation:
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(...);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
你的情况
processRow()
会生成部分生成(任务被分成 N 份,工人可以在初始化时获得他们的编号,或者只使用 return 编辑的编号barrier.await()
(在这种情况下,工作人员应该从等待开始)mergeRows()
,在构造时传递给屏障的匿名Runnable
,是整代准备好的地方,你可以把它打印在屏幕上什么的(也许交换一些'currentGen' 和 'nextGen' 缓冲区)。当此方法 returns(或更准确地说是run()
)时,barrier.await()
也会在 worker 中调用 return 并开始计算下一代(或不计算,请参阅下一个要点)done()
决定线程何时退出(而不是产生新一代)。它可以是 'real' 方法,但static volatile boolean
变量也可以工作waitUntilDone()
可能是所有线程的循环,join()
-ing 它们。或者只是等待程序应该退出时可以触发的东西(来自 'mergeRows')