ForkJoinPool: different latency with different styles of the same code
I am trying to use parallelStream with a custom ForkJoinPool; the tasks perform network calls. When I use the following style
pool.submit(() -> {
    ioDelays.parallelStream().forEach(n -> {
        induceRandomSleep(n);
    });
}).get();
it takes almost 11 times longer than if I loop and submit the tasks one by one, as shown below:
for (final Integer num : ioDelays) {
    ForkJoinTask<Integer> task = pool.submit(() -> {
        return induceRandomSleep(num);
    });
    tasks.add(task);
}
int count = 0;
final List<Integer> returnVals = new ArrayList<>();
tasks.forEach(task -> {
    try {
        returnVals.add(task.get());
    } catch (InterruptedException e) {
        // Restore the interrupt flag instead of swallowing it
        Thread.currentThread().interrupt();
    } catch (ExecutionException e) {
        // The task itself threw; report it
        e.printStackTrace();
    }
});
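The same one-task-per-call pattern can also be written with ForkJoinPool.invokeAll, which submits a collection of Callables to the pool and returns once all of them have completed. This is a minimal sketch, assuming the work is the same sleep-based stand-in for a network call; the class InvokeAllSketch and the helpers fakeCall and sumAll are hypothetical names, not part of the original program.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

public class InvokeAllSketch {

    // Hypothetical stand-in for a blocking network call
    static int fakeCall(int delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
        }
        return delayMs;
    }

    // Submits one task per delay to the given pool, waits for all of them and sums the results
    static int sumAll(ForkJoinPool pool, List<Integer> delays)
            throws InterruptedException, ExecutionException {
        List<Callable<Integer>> calls = new ArrayList<>();
        for (Integer d : delays) {
            calls.add(() -> fakeCall(d));
        }
        int sum = 0;
        for (Future<Integer> f : pool.invokeAll(calls)) { // invokeAll returns once every task is done
            sum += f.get();
        }
        return sum;
    }

    public static void main(String[] args) throws Exception {
        ForkJoinPool pool = new ForkJoinPool(200);
        List<Integer> delays = new ArrayList<>();
        for (int i = 0; i < 400; i++) {
            delays.add(50);
        }
        long start = System.currentTimeMillis();
        int sum = sumAll(pool, delays);
        System.out.println("sum=" + sum + " in " + (System.currentTimeMillis() - start) + " ms");
        pool.shutdown();
    }
}
```

Because every task is submitted from outside the pool, each one is queued in the custom pool directly and no parallel-stream splitting is involved.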
When parallelStream is used, is ForkJoinPool.common involved? Here is the complete program that simulates both styles:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

public class FJTPExperiment {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        // Custom pool sized for blocking I/O, far larger than the CPU count
        ForkJoinPool pool = new ForkJoinPool(200);

        // 2000 simulated network calls, each taking 200-499 ms
        List<Integer> ioDelays = new ArrayList<>();
        for (int i = 0; i < 2000; i++) {
            ioDelays.add((int) (300 * Math.random() + 200));
        }
        int originalCount = 0;
        for (Integer val : ioDelays) {
            originalCount += val;
        }
        System.out.println("Expected " + originalCount);
        System.out.println(Thread.currentThread().getName()
                + " ::::Number of threads in common pool :" + ForkJoinPool.getCommonPoolParallelism());

        // Style 1: a single task that runs a parallel stream inside the custom pool
        long beginTimestamp = System.currentTimeMillis();
        pool.submit(() -> {
            ioDelays.parallelStream().forEach(n -> {
                induceRandomSleep(n);
            });
        }).get();
        long endTimestamp = System.currentTimeMillis();
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");

        // Style 2: one task per delay, each submitted to the custom pool individually
        List<ForkJoinTask<Integer>> tasks = new ArrayList<>();
        beginTimestamp = System.currentTimeMillis();
        for (final Integer num : ioDelays) {
            ForkJoinTask<Integer> task = pool.submit(() -> {
                return induceRandomSleep(num);
            });
            tasks.add(task);
        }
        int count = 0;
        final List<Integer> returnVals = new ArrayList<>();
        tasks.forEach(task -> {
            try {
                returnVals.add(task.get());
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                // The task itself threw; report it
                e.printStackTrace();
            }
        });
        endTimestamp = System.currentTimeMillis();
        for (Integer val : returnVals) {
            count += val;
        }
        System.out.println("Count " + count);
        System.out.println("Took " + (endTimestamp - beginTimestamp) + " ms");
        pool.shutdown();
    }

    // Simulates a blocking network call by sleeping for the given interval
    public static int induceRandomSleep(int sleepInterval) {
        System.out.println(Thread.currentThread().getName() + " ::::sleeping for " + sleepInterval + " ms");
        try {
            Thread.sleep(sleepInterval);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sleepInterval;
    }
}
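I eventually found the answer. There are two parts to the question:
1) How can multiple threads be spawned when only one task is submitted to the ForkJoinPool?
Looking at the JDK implementation, it seems that when parallelStream is invoked, its subtasks are forked with ForkJoinTask.fork(), which checks whether the current thread is a ForkJoinWorkerThread. If it is, the subtask is pushed to the queue of that thread's pool (here, the custom ForkJoinPool); if not, it is pushed to ForkJoinPool.common. This was also confirmed through the logs.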
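The routing described in part 1 can be checked with a small experiment: run the same parallel stream once from a plain thread and once from inside a custom pool, and record which pool hosts each thread that processes an element. This is a sketch under the assumption that ForkJoinTask.getPool() (which returns null on a non-worker thread) is a fair proxy for "which pool ran this element"; the class PoolRoutingDemo and the labels it records are hypothetical.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;

public class PoolRoutingDemo {

    // Runs a parallel stream and records which pool hosts each thread that processes an element
    static Set<String> poolsSeen(ForkJoinPool custom, boolean runInsideCustomPool) {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        Runnable work = () -> IntStream.range(0, 10_000).parallel().forEach(i -> {
            // getPool() returns null when the current thread is not a ForkJoinWorkerThread
            ForkJoinPool p = ForkJoinTask.getPool();
            if (p == null) {
                seen.add("caller");
            } else if (p == custom) {
                seen.add("custom");
            } else if (p == ForkJoinPool.commonPool()) {
                seen.add("common");
            } else {
                seen.add("other");
            }
        });
        if (runInsideCustomPool) {
            custom.submit(work).join(); // the stream starts on a custom-pool worker
        } else {
            work.run();                 // the stream starts on the calling, non-worker thread
        }
        return seen;
    }

    public static void main(String[] args) {
        ForkJoinPool custom = new ForkJoinPool(16);
        System.out.println("started outside any pool: " + poolsSeen(custom, false));
        System.out.println("started inside custom:    " + poolsSeen(custom, true));
        custom.shutdown();
    }
}
```

Started from a plain thread, the elements are processed by that thread and by common-pool workers; started from a custom-pool worker, every forked subtask stays in the custom pool.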
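2) If that works, why is it so slow?
It is slow because the degree of splitting is derived not from the parallelism of the custom ForkJoinPool but from the parallelism of ForkJoinPool.common, which by default is the number of CPU cores - 1. In the JDK implementation, the stream's leaf size is computed from LEAF_TARGET, which java.util.stream.AbstractTask derives from the common pool (in JDK 8, ForkJoinPool.getCommonPoolParallelism() << 2). For this to work properly, there would need to be a branch that derives LEAF_TARGET from the parallelism of the custom thread pool.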
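To make part 2 concrete, here is a sketch that models how a sized source such as the 2000-element ioDelays list gets split. It assumes JDK 8's rules: the size threshold is sizeEstimate / LEAF_TARGET (at least 1), LEAF_TARGET is the parallelism shifted left by 2, and ArrayList's spliterator halves its range until a piece is at or below the threshold. The class LeafSplitModel and its methods are hypothetical; the parallelism-200 row approximates what the proposed branch (deriving LEAF_TARGET from the custom pool) would give.

```java
import java.util.concurrent.ForkJoinPool;

public class LeafSplitModel {

    // Models JDK 8's AbstractTask: LEAF_TARGET = parallelism << 2,
    // and the size threshold is sizeEstimate / LEAF_TARGET, but at least 1
    static long sizeThreshold(long size, int parallelism) {
        long est = size / ((long) parallelism << 2);
        return est > 0L ? est : 1L;
    }

    // Halves a range (as ArrayList's spliterator does) until it is at or below the threshold;
    // returns {number of leaves, size of the largest leaf}
    static long[] leaves(long size, long threshold) {
        if (size <= threshold || size < 2) {
            return new long[] {1, size};
        }
        long left = size / 2; // left half is [lo, mid) with mid = (lo + hi) >>> 1
        long[] a = leaves(left, threshold);
        long[] b = leaves(size - left, threshold);
        return new long[] {a[0] + b[0], Math.max(a[1], b[1])};
    }

    public static void main(String[] args) {
        long n = 2000; // size of ioDelays in the question
        int[] parallelisms = {ForkJoinPool.getCommonPoolParallelism(), 200};
        for (int p : parallelisms) {
            long t = sizeThreshold(n, p);
            long[] r = leaves(n, t);
            System.out.println("parallelism " + p + ": threshold " + t
                    + ", leaves " + r[0] + ", largest leaf " + r[1]);
        }
    }
}
```

For example, with a common-pool parallelism of 3 (a 4-core machine, assumed for illustration), the threshold is 166 and the list splits into 16 leaves of 125 elements, so at most 16 threads ever sleep at the same time. With an average delay of roughly 350 ms, the slowest leaf takes about 125 × 350 ms ≈ 44 s, whereas 2000 individually submitted tasks spread over 200 threads take about 10 × 350 ms ≈ 3.5 s. This is a rough estimate, not a measurement, but it is in the same range as the slowdown reported in the question.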