Java 中的线程和计算
threads in Java and computation
我是 java 的新手,我正在尝试编写一个带有两个参数的程序:
- 我们必须对素数求和的数字
- 我们必须执行此操作的线程数
所以我使用了一个名为 Eratosthene 的方法,它存储一个 boolean 数组,如果一个数字是素数,我们将其标记为真,并且之后我们将这个数字的所有倍数标记为假。
我尝试将我的数组分成每个线程的子数组,并在每个子数组中进行操作,最后将所有子数组的结果相加。
但我不知道我哪里做错了:有时程序没有给出好的结果。
所以这是我的代码:
SumPrime.java
import java.util.*;
import java.util.concurrent.*;
public class SumPrimes {
private boolean array[];
private int numberOfWorkers;
private Semaphore allFinished;
public SumPrimes(int num, int threads){
array = new boolean[num];
numberOfWorkers = threads;
for (int i = 2; i < num; i++)
array[i] = true;
}
private class SumParallel extends Thread {
int min;
int max;
long sum;
SumParallel(int min, int max){
this.min = min;
this.max = max;
sum = 0;
}
public void run() {
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
}
sum += i;
}
}
allFinished.release();
}
public long getSum() {
return sum;
}
}
public void SumInParallel() {
allFinished = new Semaphore(0);
List<SumParallel> workers = new ArrayList<SumParallel>();
int lengthOfOneWorker = array.length / numberOfWorkers;
for (int i = 0; i < numberOfWorkers; i++) {
int start = i * lengthOfOneWorker;
int end = (i+1) * lengthOfOneWorker;
if (i == numberOfWorkers - 1)
end = array.length;
SumParallel worker = new SumParallel(start, end);
workers.add(worker);
worker.start();
}
try {
allFinished.acquire(numberOfWorkers);
} catch (InterruptedException ignored) {}
int sum = 0;
for (SumParallel w : workers){
sum += w.getSum();
}
System.out.println("The sum of prime numbers is: " + sum);
}
public static void main(String[] args) {
int limitNum = Integer.parseInt(args[0]);
int threadNum = Integer.parseInt(args[1]);
SumPrimes sum_primes = new SumPrimes(limitNum, threadNum);
sum_primes.SumInParallel();
}
}
你可以运行这样的程序:
java SumPrimes 1000 3
我愿意接受任何改进我的代码的建议。
您需要完全重新考虑线程的逻辑。
各个线程无法访问array
的相同范围,例如如果线程有 min = 100
和 max = 150
,则只能使用 100 到 149(含)范围内的元素 and/or 更改。
您的代码:
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
开始 i = 100, j = 100
,这使得 i*j = 10000
。如果数组真的那么大,这意味着您访问 array[10000]
,但那是 不允许的 。当然,数组并没有那么大,所以 代码什么都不做。
啊,你说,第一个线程有 min = 0
和 max = 50
,所以它会从索引 0 (0*0) 到 2401 (49*49) 更改值,并且由于数组比那个小,它会更新整个数组,但那是不允许的.
现在,再考虑一下。
如果范围是min = 100, max = 150
,那么你需要首先清除该范围内的所有偶数,然后是所有能被3整除的数字,然后是所有...等等,但仅限于那个范围.
我会让你重新思考逻辑。
更新
要将 Sieve of Eratosthenes 应用于某个范围,我们需要不超过该范围最大值的平方根的素数。
如果范围是min = 150, max = 200
,那么maxPrime = sqrt(200) = 14
,所以我们需要从2到14(含)的素数,那么我们可以更新范围150-199。
假设我们首先更新 array
以找到 2-14 范围内的所有素数,我们可以使用它来迭代目标范围内这些素数的 倍数 (150-199)。为此,我们需要从 >= min 的素数的最低倍数开始,因此我们需要将 min
向上舍入到 prime
.
的下一个倍数
使用整数数学,round up to next multiple,我们计算:
lower = (min + prime - 1) / prime * prime
这给了我们主要逻辑:
maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
if (array[prime]) {
int lower = (min + prime - 1) / prime * prime;
for (int i = lower; i < max; i += prime)
array[i] = false
我们还应该让每个线程负责首先设置范围内的所有布尔值,这样这部分也变成多线程的。
主逻辑现在必须首先在主线程中找到 2-sqrt(N) 范围内的素数,然后在线程之间拆分剩余范围。
这是我的尝试:
public static long sumPrimes(int n, int threadCount) {
// Find and sum the "seed" primes needed by the threads
int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
boolean[] seedPrime = new boolean[maxSeedPrime + 1];
AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime, seedPrime, 0, maxSeedPrime));
// Split remaining into ranges and start threads to calculate sums
Thread[] threads = new Thread[threadCount];
for (int t = 0, rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
int min = rangeMin;
int max = min + (n - min + 1) / (threadCount - t) - 1;
threads[t] = new Thread(() ->
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);
threads[t].start();
rangeMin = max + 1;
}
// Wait for threads to end
for (int t = 0; t < threadCount; t++) {
try {
threads[t].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Return the calculated sum
return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime, boolean[] rangePrime, int min, int max/*inclusive*/) {
// Initialize range
for (int i = Math.max(min, 2); i <= max; i++) {
rangePrime[i - min] = true;
}
// Mark non-primes in range
int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
for (int prime = 2; prime <= maxPrime; prime++) {
if (seedPrime[prime]) {
int minMultiple = (min + prime - 1) / prime * prime;
if (minMultiple <= prime)
minMultiple = prime * 2;
for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
rangePrime[multiple - min] = false;
}
}
}
// Sum the primes
long sum = 0;
for (int prime = min; prime <= max; prime++) {
if (rangePrime[prime - min]) {
sum += prime;
}
}
return sum;
}
测试
public static void main(String[] args) {
test(1000, 3);
test(100000000, 4);
}
public static void test(int n, int threadCount) {
long start = System.nanoTime();
long sum = sumPrimes(n, threadCount);
long end = System.nanoTime();
System.out.printf("sumPrimes(%,d, %d) = %,d (%.9f seconds)%n",
n, threadCount, sum, (end - start) / 1e9);
}
输出
sumPrimes(1,000, 3) = 76,127 (0.005595600 seconds)
sumPrimes(100,000,000, 4) = 279,209,790,387,276 (0.686881000 seconds)
更新 2
上面的代码使用了 lambda 表达式:
threads[t] = new Thread(() ->
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);
如果您不想使用 lambda 表达式,例如所以它将在 Java 上 运行 7,您可以使用匿名 class 代替:
threads[t] = new Thread() {
@Override
public void run() {
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max));
}
};
我认为你的问题是这段代码:
public void run() {
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
}
sum += i;
}
}
allFinished.release();
}
想象一下您后面的线程之一,在列表末尾附近工作。第一项不是素数,但确定它不是素数的工作尚未完成——它来自不同的线程,而该线程才刚刚开始。所以你认为这个值是质数(它还没有标记为非质数)并相应地工作。
如果你提供了一个产生不好结果的例子,我们可以很容易地测试这个理论。
多线程通常也意味着你想做的更快。因此,首先可能值得回顾一下您的初始设计并使其在单线程上更快。那就是要击败的目标。此外,为了比较 运行 时间而不编写精炼的基准测试,您需要 运行 长度为 "visible" 的时间。
在我的机器上,"setting"
int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end
你的原始代码,
for(int i=2;i<max;i++)
if(!sieve[i]) {
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
sum+=i;
}
运行s 持续 24-28 秒。正如下面@Andreas 的 post 和后来的评论中所讨论的(是的,现在我看到它被接受并且大部分讨论都消失了),内部循环做了很多额外的检查(因为它做了一个比较一直,即使它不会真正开始)。所以外部循环可以分为两部分:首先筛选和求和(直到 max
的最后一个 "unknown" 除数,不超过其平方根),然后对其余部分求和:
int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
sum+=i;
}
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
这个 运行s 在我的机器上持续 14-16 秒。显着的收获,还没有涉及线程。
然后是线程,if(!sieve[i])
的问题:在计算总和时,在低于 i
的素数的内循环超过 [= 之前,不得进行此类检查22=],所以 sieve[i]
真的可以判断它是否是质数。因为例如,如果一个线程 运行ning 像 for(int i=4;i<10001;i+=2)sieve[i]=true;
,而另一个线程同时正在检查 sieve[10000]
,它仍然会是 false
,并且 10000
会被误认为质数。
第一次尝试可能是在一个线程上进行筛选(它的外循环 "only" 无论如何都会得到 max
的平方根),然后并行求和:
for(int i=2;i<=maxunique;i++)
if(!sieve[i])
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
long ii=i;
Thread t=sumt[i]=new Thread(new Runnable() {
public void run() {
int from=(int)Math.max(ii*max/numt,2);
int to=(int)Math.min((ii+1)*max/numt,max);
long sum=0;
for(int i=from;i<to;i++)
if(!sieve[i])
sum+=i;
sums[(int)ii]=sum;
}
});
t.start();
}
for(int i=0;i<sumt.length;i++) {
sumt[i].join();
sum+=sums[i];
}
这有点整洁,所有线程(我有 4 个核心)检查相同数量的候选人,结果更快。有时将近一秒,但大部分时间约为一半(~0.4 ... ~0.8 秒)。所以这个真的不值得付出努力,筛分循环是这里真正耗时的部分。
可以决定允许冗余工作,并为筛选中遇到的每个质数启动一个线程,即使它不是真正的质数,只是还没有被剔除:
List<Thread> threads=new ArrayList<>();
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
int ii=i;
Thread t=new Thread(new Runnable() {
public void run() {
for(int j=ii*2;j<max;j+=ii)
sieve[j]=true;
}
});
t.start();
threads.add(t);
}
//System.out.println(threads.size());
for(int i=0;i<threads.size();i++)
threads.get(i).join();
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
注释 println()
会告诉(在我的机器上)创建了 3500-3700 个线程(而如果有人在原始循环中放置一个计数器,结果表明 3401 是最小值,即在单线程筛环中遇到许多素数)。虽然超调不是灾难性的,但线程数量相当多,增益也不是太明显,尽管它比之前的尝试更明显:运行 时间是 10-11 秒(当然可以通过使用并行求和循环再降低半秒)。
当循环被证明是在非质数上过滤时,可以通过关闭循环来解决一些冗余工作:
for(int j=ii*2;j<max && !sieve[ii];j+=ii)
这个确实有点效果,我用了 8.6-10.1 秒 运行 时间。
由于创建 3401 个线程并不比创建 3700 个线程疯狂多少,因此限制它们的数量可能是个好主意,这也是更容易挥手告别 Thread
的地方秒。虽然从技术上讲可以对它们进行计数,但有各种内置基础设施可以帮我们做到这一点。
Executors
可以帮助将线程数限制为固定数量 (newFixedThreadPool()
),或者更好的是,限制为可用的 CPU 数量 (newWorkStealingPool()
):
ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es);
int count=0;
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
int ii=i;
count++;
ecs.submit(new Callable<Object>() {
public Object call() throws Exception {
// if(!sieve[ii])
for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii)
sieve[j]=true;
return null;
}
});
}
System.out.println(count);
while(count-->0)
ecs.take();
es.shutdown();
long sum=0;
for(int i=2;i<max;i++)
if(!sieve[i])
sum+=i;
这样它产生的结果与前一个 (8.6-10.5s) 相似。但是,对于低 CPU 计数(4 核),交换条件会导致一些加速(取消注释 if
并在循环中注释相同的条件,在 /**/
之间),因为任务是运行ning 的提交顺序,因此大部分冗余循环可以在一开始就退出,使得重复检查浪费时间。然后对我来说是 8.5-9.3 秒,击败了直接穿线尝试的最佳和最差时间。但是,如果您的 CPU 计数很高(我 运行 它也在超级计算节点上,根据 Runtime.availableProcessors()
有 32 个可用内核),任务将重叠更多,并且非欺骗版本(所以总是做检查的人)会更快。
如果你想要一个小的加速,具有相当好的可读性,你可以并行化内部循环(这也可以用 Thread
s,只是非常乏味),使用流:
long sum=0;
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
sum+=i;
int ii=i;
IntStream.range(1, (max-1)/i).parallel().forEach(
j -> sieve[ii+j*ii]=true);
}
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
这个很像原来的优化循环对,速度还是有的,对我来说是9.4-10.0秒。所以它比其他的慢(约 10% 左右),但它要简单得多。
更新:
我修复了一系列差错:xy<maxunique
现在是 xy<=maxunique
。虽然它 un/fortunately 没有影响巨大的结果,但它确实在 max=9
这样简单的情况下失败了(当 maxunique=3
和 xy<3
循环时,9 将保持素数,总和是 26 而不是 17)。嗯。也修复了几个连续循环(因此它们现在从 maxunique+1
继续)。
创建无限数量的子任务困扰着我,幸运的是找到了一个倒置的设计,我们不检查是否达到 sqrt(max)
(即 maxunique
),但是我们知道,如果我们完成了对低于某个 limit
的数字的筛选,我们可以继续检查直到 limit*limit
的数字,因为在 运行ge 中剩下的任何东西都是素数(limit
... limit*limit
) 确实是一个素数(我们仍然可以记住这个上限以 maxunique
为界)。因此这些可以并行筛选。
基本算法,仅用于检查(单线程):
int limit=2;
do {
int upper=Math.min(maxunique+1,limit*limit);
for(int i=limit;i<upper;i++)
if(!sieve[i]) {
sum+=i;
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
}
limit=upper;
} while(limit<=maxunique);
for(int i=limit;i<max;i++)
if(!sieve[i])
sum+=i;
出于某种原因,它比原来的双循环变体稍慢(13.8-14.5 秒对 13.7-14.0 秒,min/max 共 20 运行 秒),但我对无论如何并行化。
可能是因为素数分布不均,使用并行流效果不佳(我认为它只是将工作预先划分为看似相等的部分),但基于Executor
的方法效果很好:
ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es);
int limit=2;
int count=0;
do {
int upper=Math.min(maxunique+1,limit*limit);
for(int i=limit;i<upper;i++)
if(!sieve[i]) {
sum+=i;
int ii=i;
count++;
ecs.submit(new Callable<Object>() {
public Object call() throws Exception {
for(int j=ii*2;j<max;j+=ii)
sieve[j]=true;
return null;
}
});
}
while(count>0) {
count--;
ecs.take();
}
limit=upper;
} while(limit<=maxunique);
es.shutdown();
for(int i=limit;i<max;i++)
if(!sieve[i])
sum+=i;
对于低 CPU 计数环境,这是迄今为止最快的(7.4-9.0 秒对比 "infinite number of threads" 的 8.7-9.9 秒和 8.5-9.2 秒另一个基于 Executor
的)。然而,一开始它 运行 的并行任务数量很少(当 limit=2
时,它只启动两个并行循环,对于 2 和 3),最重要的是,那些是最长的 运行ning 循环(步数最小),并且由于在高 CPU 计数环境中它仅次于原始的基于 Executor
的循环,2.9-3.6 秒对2.7-3.2 秒)。
一开始当然可以实现一个单独的 ramp-up,明确收集必要数量的素数以使可用内核饱和,然后切换到这种基于 limit
的方法,然后结果可能会击败其他方法与核心数量无关。不过我觉得我暂时可以抵挡住诱惑。
我是 java 的新手,我正在尝试编写一个带有两个参数的程序:
- 我们必须对素数求和的数字
- 我们必须执行此操作的线程数
所以我使用了一个名为 Eratosthene 的方法,它存储一个 boolean 数组,如果一个数字是素数,我们将其标记为真,并且之后我们将这个数字的所有倍数标记为假。
我尝试将我的数组分成每个线程的子数组,并在每个子数组中进行操作,最后将所有子数组的结果相加。
但我不知道我哪里做错了:有时程序没有给出好的结果。
所以这是我的代码:
SumPrime.java
import java.util.*;
import java.util.concurrent.*;
public class SumPrimes {
private boolean array[];
private int numberOfWorkers;
private Semaphore allFinished;
public SumPrimes(int num, int threads){
array = new boolean[num];
numberOfWorkers = threads;
for (int i = 2; i < num; i++)
array[i] = true;
}
private class SumParallel extends Thread {
int min;
int max;
long sum;
SumParallel(int min, int max){
this.min = min;
this.max = max;
sum = 0;
}
public void run() {
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
}
sum += i;
}
}
allFinished.release();
}
public long getSum() {
return sum;
}
}
public void SumInParallel() {
allFinished = new Semaphore(0);
List<SumParallel> workers = new ArrayList<SumParallel>();
int lengthOfOneWorker = array.length / numberOfWorkers;
for (int i = 0; i < numberOfWorkers; i++) {
int start = i * lengthOfOneWorker;
int end = (i+1) * lengthOfOneWorker;
if (i == numberOfWorkers - 1)
end = array.length;
SumParallel worker = new SumParallel(start, end);
workers.add(worker);
worker.start();
}
try {
allFinished.acquire(numberOfWorkers);
} catch (InterruptedException ignored) {}
int sum = 0;
for (SumParallel w : workers){
sum += w.getSum();
}
System.out.println("The sum of prime numbers is: " + sum);
}
public static void main(String[] args) {
int limitNum = Integer.parseInt(args[0]);
int threadNum = Integer.parseInt(args[1]);
SumPrimes sum_primes = new SumPrimes(limitNum, threadNum);
sum_primes.SumInParallel();
}
}
你可以运行这样的程序:
java SumPrimes 1000 3
我愿意接受任何改进我的代码的建议。
您需要完全重新考虑线程的逻辑。
各个线程无法访问array
的相同范围,例如如果线程有 min = 100
和 max = 150
,则只能使用 100 到 149(含)范围内的元素 and/or 更改。
您的代码:
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
开始 i = 100, j = 100
,这使得 i*j = 10000
。如果数组真的那么大,这意味着您访问 array[10000]
,但那是 不允许的 。当然,数组并没有那么大,所以 代码什么都不做。
啊,你说,第一个线程有 min = 0
和 max = 50
,所以它会从索引 0 (0*0) 到 2401 (49*49) 更改值,并且由于数组比那个小,它会更新整个数组,但那是不允许的.
现在,再考虑一下。
如果范围是min = 100, max = 150
,那么你需要首先清除该范围内的所有偶数,然后是所有能被3整除的数字,然后是所有...等等,但仅限于那个范围.
我会让你重新思考逻辑。
更新
要将 Sieve of Eratosthenes 应用于某个范围,我们需要不超过该范围最大值的平方根的素数。
如果范围是min = 150, max = 200
,那么maxPrime = sqrt(200) = 14
,所以我们需要从2到14(含)的素数,那么我们可以更新范围150-199。
假设我们首先更新 array
以找到 2-14 范围内的所有素数,我们可以使用它来迭代目标范围内这些素数的 倍数 (150-199)。为此,我们需要从 >= min 的素数的最低倍数开始,因此我们需要将 min
向上舍入到 prime
.
使用整数数学,round up to next multiple,我们计算:
lower = (min + prime - 1) / prime * prime
这给了我们主要逻辑:
maxPrime = (int) Math.sqrt(max);
for (int prime = 2; prime <= maxPrime; prime++) {
if (array[prime]) {
int lower = (min + prime - 1) / prime * prime;
for (int i = lower; i < max; i += prime)
array[i] = false
我们还应该让每个线程负责首先设置范围内的所有布尔值,这样这部分也变成多线程的。
主逻辑现在必须首先在主线程中找到 2-sqrt(N) 范围内的素数,然后在线程之间拆分剩余范围。
这是我的尝试:
public static long sumPrimes(int n, int threadCount) {
// Find and sum the "seed" primes needed by the threads
int maxSeedPrime = (int) Math.sqrt(n + 2); // extra to be sure no "float errors" occur
boolean[] seedPrime = new boolean[maxSeedPrime + 1];
AtomicLong totalSum = new AtomicLong(sumPrimes(seedPrime, seedPrime, 0, maxSeedPrime));
// Split remaining into ranges and start threads to calculate sums
Thread[] threads = new Thread[threadCount];
for (int t = 0, rangeMin = maxSeedPrime + 1; t < threadCount; t++) {
int min = rangeMin;
int max = min + (n - min + 1) / (threadCount - t) - 1;
threads[t] = new Thread(() ->
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);
threads[t].start();
rangeMin = max + 1;
}
// Wait for threads to end
for (int t = 0; t < threadCount; t++) {
try {
threads[t].join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// Return the calculated sum
return totalSum.get();
}
private static long sumPrimes(boolean[] seedPrime, boolean[] rangePrime, int min, int max/*inclusive*/) {
// Initialize range
for (int i = Math.max(min, 2); i <= max; i++) {
rangePrime[i - min] = true;
}
// Mark non-primes in range
int maxPrime = (int) Math.sqrt(max + 1); // extra to be sure no "float errors" occur
for (int prime = 2; prime <= maxPrime; prime++) {
if (seedPrime[prime]) {
int minMultiple = (min + prime - 1) / prime * prime;
if (minMultiple <= prime)
minMultiple = prime * 2;
for (int multiple = minMultiple; multiple <= max ; multiple += prime) {
rangePrime[multiple - min] = false;
}
}
}
// Sum the primes
long sum = 0;
for (int prime = min; prime <= max; prime++) {
if (rangePrime[prime - min]) {
sum += prime;
}
}
return sum;
}
测试
public static void main(String[] args) {
test(1000, 3);
test(100000000, 4);
}
public static void test(int n, int threadCount) {
long start = System.nanoTime();
long sum = sumPrimes(n, threadCount);
long end = System.nanoTime();
System.out.printf("sumPrimes(%,d, %d) = %,d (%.9f seconds)%n",
n, threadCount, sum, (end - start) / 1e9);
}
输出
sumPrimes(1,000, 3) = 76,127 (0.005595600 seconds)
sumPrimes(100,000,000, 4) = 279,209,790,387,276 (0.686881000 seconds)
更新 2
上面的代码使用了 lambda 表达式:
threads[t] = new Thread(() ->
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max))
);
如果您不想使用 lambda 表达式,例如所以它将在 Java 上 运行 7,您可以使用匿名 class 代替:
threads[t] = new Thread() {
@Override
public void run() {
totalSum.addAndGet(sumPrimes(seedPrime, new boolean[max - min + 1], min, max));
}
};
我认为你的问题是这段代码:
public void run() {
for (int i = min; i < max; i++) {
if (array[i]) {
for (int j = min; j*i < array.length; j++) {
array[i*j] = false;
}
sum += i;
}
}
allFinished.release();
}
想象一下您后面的线程之一,在列表末尾附近工作。第一项不是素数,但确定它不是素数的工作尚未完成——它来自不同的线程,而该线程才刚刚开始。所以你认为这个值是质数(它还没有标记为非质数)并相应地工作。
如果你提供了一个产生不好结果的例子,我们可以很容易地测试这个理论。
多线程通常也意味着你想做的更快。因此,首先可能值得回顾一下您的初始设计并使其在单线程上更快。那就是要击败的目标。此外,为了比较 运行 时间而不编写精炼的基准测试,您需要 运行 长度为 "visible" 的时间。
在我的机器上,"setting"
int max = 1_000_000_000;
boolean sieve[] = new boolean[max];
long sum = 0; // will be 24739512092254535 at the end
你的原始代码,
for(int i=2;i<max;i++)
if(!sieve[i]) {
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
sum+=i;
}
运行s 持续 24-28 秒。正如下面@Andreas 的 post 和后来的评论中所讨论的(是的,现在我看到它被接受并且大部分讨论都消失了),内部循环做了很多额外的检查(因为它做了一个比较一直,即使它不会真正开始)。所以外部循环可以分为两部分:首先筛选和求和(直到 max
的最后一个 "unknown" 除数,不超过其平方根),然后对其余部分求和:
int maxunique=(int)Math.sqrt(max);
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
sum+=i;
}
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
这个 运行s 在我的机器上持续 14-16 秒。显着的收获,还没有涉及线程。
然后是线程,if(!sieve[i])
的问题:在计算总和时,在低于 i
的素数的内循环超过 [= 之前,不得进行此类检查22=],所以 sieve[i]
真的可以判断它是否是质数。因为例如,如果一个线程 运行ning 像 for(int i=4;i<10001;i+=2)sieve[i]=true;
,而另一个线程同时正在检查 sieve[10000]
,它仍然会是 false
,并且 10000
会被误认为质数。
第一次尝试可能是在一个线程上进行筛选(它的外循环 "only" 无论如何都会得到 max
的平方根),然后并行求和:
for(int i=2;i<=maxunique;i++)
if(!sieve[i])
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
int numt=4;
Thread sumt[]=new Thread[numt];
long sums[]=new long[numt];
for(int i=0;i<numt;i++) {
long ii=i;
Thread t=sumt[i]=new Thread(new Runnable() {
public void run() {
int from=(int)Math.max(ii*max/numt,2);
int to=(int)Math.min((ii+1)*max/numt,max);
long sum=0;
for(int i=from;i<to;i++)
if(!sieve[i])
sum+=i;
sums[(int)ii]=sum;
}
});
t.start();
}
for(int i=0;i<sumt.length;i++) {
sumt[i].join();
sum+=sums[i];
}
这有点整洁,所有线程(我有 4 个核心)检查相同数量的候选人,结果更快。有时将近一秒,但大部分时间约为一半(~0.4 ... ~0.8 秒)。所以这个真的不值得付出努力,筛分循环是这里真正耗时的部分。
可以决定允许冗余工作,并为筛选中遇到的每个质数启动一个线程,即使它不是真正的质数,只是还没有被剔除:
List<Thread> threads=new ArrayList<>();
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
int ii=i;
Thread t=new Thread(new Runnable() {
public void run() {
for(int j=ii*2;j<max;j+=ii)
sieve[j]=true;
}
});
t.start();
threads.add(t);
}
//System.out.println(threads.size());
for(int i=0;i<threads.size();i++)
threads.get(i).join();
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
注释 println()
会告诉(在我的机器上)创建了 3500-3700 个线程(而如果有人在原始循环中放置一个计数器,结果表明 3401 是最小值,即在单线程筛环中遇到许多素数)。虽然超调不是灾难性的,但线程数量相当多,增益也不是太明显,尽管它比之前的尝试更明显:运行 时间是 10-11 秒(当然可以通过使用并行求和循环再降低半秒)。
当循环被证明是在非质数上过滤时,可以通过关闭循环来解决一些冗余工作:
for(int j=ii*2;j<max && !sieve[ii];j+=ii)
这个确实有点效果,我用了 8.6-10.1 秒 运行 时间。
由于创建 3401 个线程并不比创建 3700 个线程疯狂多少,因此限制它们的数量可能是个好主意,这也是更容易挥手告别 Thread
的地方秒。虽然从技术上讲可以对它们进行计数,但有各种内置基础设施可以帮我们做到这一点。
Executors
可以帮助将线程数限制为固定数量 (newFixedThreadPool()
),或者更好的是,限制为可用的 CPU 数量 (newWorkStealingPool()
):
ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<Object>(es);
int count=0;
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
int ii=i;
count++;
ecs.submit(new Callable<Object>() {
public Object call() throws Exception {
// if(!sieve[ii])
for(int j=ii*2;j<max /**/ && !sieve[ii] /**/;j+=ii)
sieve[j]=true;
return null;
}
});
}
System.out.println(count);
while(count-->0)
ecs.take();
es.shutdown();
long sum=0;
for(int i=2;i<max;i++)
if(!sieve[i])
sum+=i;
这样它产生的结果与前一个 (8.6-10.5s) 相似。但是,对于低 CPU 计数(4 核),交换条件会导致一些加速(取消注释 if
并在循环中注释相同的条件,在 /**/
之间),因为任务是运行ning 的提交顺序,因此大部分冗余循环可以在一开始就退出,使得重复检查浪费时间。然后对我来说是 8.5-9.3 秒,击败了直接穿线尝试的最佳和最差时间。但是,如果您的 CPU 计数很高(我 运行 它也在超级计算节点上,根据 Runtime.availableProcessors()
有 32 个可用内核),任务将重叠更多,并且非欺骗版本(所以总是做检查的人)会更快。
如果你想要一个小的加速,具有相当好的可读性,你可以并行化内部循环(这也可以用 Thread
s,只是非常乏味),使用流:
long sum=0;
for(int i=2;i<=maxunique;i++)
if(!sieve[i]) {
sum+=i;
int ii=i;
IntStream.range(1, (max-1)/i).parallel().forEach(
j -> sieve[ii+j*ii]=true);
}
for(int i=maxunique+1;i<max;i++)
if(!sieve[i])
sum+=i;
这个很像原来的优化循环对,速度还是有的,对我来说是9.4-10.0秒。所以它比其他的慢(约 10% 左右),但它要简单得多。
更新:
我修复了一系列差错:
xy<maxunique
现在是xy<=maxunique
。虽然它 un/fortunately 没有影响巨大的结果,但它确实在max=9
这样简单的情况下失败了(当maxunique=3
和xy<3
循环时,9 将保持素数,总和是 26 而不是 17)。嗯。也修复了几个连续循环(因此它们现在从maxunique+1
继续)。创建无限数量的子任务困扰着我,幸运的是找到了一个倒置的设计,我们不检查是否达到
sqrt(max)
(即maxunique
),但是我们知道,如果我们完成了对低于某个limit
的数字的筛选,我们可以继续检查直到limit*limit
的数字,因为在 运行ge 中剩下的任何东西都是素数(limit
...limit*limit
) 确实是一个素数(我们仍然可以记住这个上限以maxunique
为界)。因此这些可以并行筛选。
基本算法,仅用于检查(单线程):
int limit=2;
do {
int upper=Math.min(maxunique+1,limit*limit);
for(int i=limit;i<upper;i++)
if(!sieve[i]) {
sum+=i;
for(int j=i*2;j<max;j+=i)
sieve[j]=true;
}
limit=upper;
} while(limit<=maxunique);
for(int i=limit;i<max;i++)
if(!sieve[i])
sum+=i;
出于某种原因,它比原来的双循环变体稍慢(13.8-14.5 秒对 13.7-14.0 秒,min/max 共 20 运行 秒),但我对无论如何并行化。
可能是因为素数分布不均,使用并行流效果不佳(我认为它只是将工作预先划分为看似相等的部分),但基于Executor
的方法效果很好:
ExecutorService es=Executors.newWorkStealingPool();
ExecutorCompletionService<Object> ecs=new ExecutorCompletionService<>(es);
int limit=2;
int count=0;
do {
int upper=Math.min(maxunique+1,limit*limit);
for(int i=limit;i<upper;i++)
if(!sieve[i]) {
sum+=i;
int ii=i;
count++;
ecs.submit(new Callable<Object>() {
public Object call() throws Exception {
for(int j=ii*2;j<max;j+=ii)
sieve[j]=true;
return null;
}
});
}
while(count>0) {
count--;
ecs.take();
}
limit=upper;
} while(limit<=maxunique);
es.shutdown();
for(int i=limit;i<max;i++)
if(!sieve[i])
sum+=i;
对于低 CPU 计数环境,这是迄今为止最快的(7.4-9.0 秒对比 "infinite number of threads" 的 8.7-9.9 秒和 8.5-9.2 秒另一个基于 Executor
的)。然而,一开始它 运行 的并行任务数量很少(当 limit=2
时,它只启动两个并行循环,对于 2 和 3),最重要的是,那些是最长的 运行ning 循环(步数最小),并且由于在高 CPU 计数环境中它仅次于原始的基于 Executor
的循环,2.9-3.6 秒对2.7-3.2 秒)。
一开始当然可以实现一个单独的 ramp-up,明确收集必要数量的素数以使可用内核饱和,然后切换到这种基于 limit
的方法,然后结果可能会击败其他方法与核心数量无关。不过我觉得我暂时可以抵挡住诱惑。