如何使用 ExecutorService 划分线程?
How can I divide into threads using ExecuterService?
我有一个包含约 5 000 个对象的数组。我计划为每个对象执行一些逻辑。每个对象的逻辑都是相同的。我想尽快完成,所以我来到了 java.util.cuncurrent 中的 ExecutorService。我决定在一个线程中处理 100 个对象,所以我将总数 (~5k) 划分为间隔
private List<Integer> divideIntoIntervals(Integer total)
{
List<Integer> intervals = new ArrayList<Integer>();
intervals.add(0);
int n = total / PART_SIZE;
int leftover = total % PART_SIZE;
if(n!=0)
{
intervals.add(PART_SIZE);
for (int i=2; i<=n; i++)
{
intervals.add(PART_SIZE*i);
}
intervals.add(PART_SIZE*n+leftover);
}
return intervals;
}
因此,数组将是:0、100、200、300、...、5000、5041。
你会建议什么当前列表?
我打算创建逻辑,它会寻找我的 cuncurrent 数组用于另一个时间间隔。
您可能不需要并发列表。只要您在线程工作时不修改列表,您就可以创建一个单独的 Runnable
在其自己的范围内工作,并将这些可运行对象提交给具有适当数量的 ThreadPoolExecutorService
线程(在您的情况下约为 50 个)。任务将自动在线程之间平均分配:
ExecutorService executor = Executors.newFixedThreadPool(list.size() / 100 + 1);
// (+1 in case there are less than 100 items)
for (int i = 0; i < list.size(); i += 100) {
final int start = i;
executor.execute(() -> {
int end = start + 100;
if (end > list.size()) {
end = list.size();
}
for (int j = start; j < end; ++j) {
list.get(j).doYourLogicHere();
}
});
}
如果您不能完全确定您不会在这些任务之外修改列表,则应根据您要处理这些修改的方式更改代码。例如,如果在处理过程中可以将新项目附加到列表中,并且您不关心这些新项目是否在这个阶段被处理,那么您可能希望使用 CopyOnWriteArrayList
并更改上面的内部循环使用迭代器而不是基于 int 的索引。这将导致代码使用创建迭代器时获取的列表快照(如果在迭代时没有修改,则不会有任何实际复制)。根据附加新项目的时间,此快照可能包含也可能不包含它们,但至少它是一致的并且没有任何中断。
您也可以使用newCachedThreadPool
方法。这会创建一个线程池,该线程池会根据需要创建新线程,但会在可用时重用以前构建的线程。 Source
我如何使用它的例子:
// Create an executor service with a thread pool that creates new threads as needed,
// but will reuse previously constructed threads when they are available
ExecutorService executorService = Executors.newCachedThreadPool();
Integer finalMinValue = minValue;
Integer finalMaxValue = maxValue;
executorService.execute(() -> {
// Initialise buckets
int bucketCount = (finalMaxValue - finalMinValue) / bucketSize + 1;
List<List<Integer>> buckets = new ArrayList<>(bucketCount);
for (int i = 0; i < bucketCount; i++) {
buckets.add(new ArrayList<>());
}
// Distribute input array values into buckets
for (Integer anArrayElement : arrayToSort) {
buckets.get((anArrayElement - finalMinValue) / bucketSize).add(anArrayElement);
}
// Sort buckets and place back into input array
// Loop through the contents of each bucket
for (List<Integer> bucket : buckets) {
Integer[] bucketArray = new Integer[bucket.size()];
bucketArray = bucket.toArray(bucketArray);
InsertionSort.sort(bucketArray);
for (Integer aBucketArray : bucketArray) {
arrayToSort[currentIndex] = aBucketArray;
incrementSync();
}
}
});
有关此实现的更多信息,请访问 Github
我有一个包含约 5 000 个对象的数组。我计划为每个对象执行一些逻辑。每个对象的逻辑都是相同的。我想尽快完成,所以我来到了 java.util.cuncurrent 中的 ExecutorService。我决定在一个线程中处理 100 个对象,所以我将总数 (~5k) 划分为间隔
private List<Integer> divideIntoIntervals(Integer total)
{
List<Integer> intervals = new ArrayList<Integer>();
intervals.add(0);
int n = total / PART_SIZE;
int leftover = total % PART_SIZE;
if(n!=0)
{
intervals.add(PART_SIZE);
for (int i=2; i<=n; i++)
{
intervals.add(PART_SIZE*i);
}
intervals.add(PART_SIZE*n+leftover);
}
return intervals;
}
因此,数组将是:0、100、200、300、...、5000、5041。 你会建议什么当前列表? 我打算创建逻辑,它会寻找我的 cuncurrent 数组用于另一个时间间隔。
您可能不需要并发列表。只要您在线程工作时不修改列表,您就可以创建一个单独的 Runnable
在其自己的范围内工作,并将这些可运行对象提交给具有适当数量的 ThreadPoolExecutorService
线程(在您的情况下约为 50 个)。任务将自动在线程之间平均分配:
ExecutorService executor = Executors.newFixedThreadPool(list.size() / 100 + 1);
// (+1 in case there are less than 100 items)
for (int i = 0; i < list.size(); i += 100) {
final int start = i;
executor.execute(() -> {
int end = start + 100;
if (end > list.size()) {
end = list.size();
}
for (int j = start; j < end; ++j) {
list.get(j).doYourLogicHere();
}
});
}
如果您不能完全确定您不会在这些任务之外修改列表,则应根据您要处理这些修改的方式更改代码。例如,如果在处理过程中可以将新项目附加到列表中,并且您不关心这些新项目是否在这个阶段被处理,那么您可能希望使用 CopyOnWriteArrayList
并更改上面的内部循环使用迭代器而不是基于 int 的索引。这将导致代码使用创建迭代器时获取的列表快照(如果在迭代时没有修改,则不会有任何实际复制)。根据附加新项目的时间,此快照可能包含也可能不包含它们,但至少它是一致的并且没有任何中断。
您也可以使用newCachedThreadPool
方法。这会创建一个线程池,该线程池会根据需要创建新线程,但会在可用时重用以前构建的线程。 Source
我如何使用它的例子:
// Create an executor service with a thread pool that creates new threads as needed,
// but will reuse previously constructed threads when they are available
ExecutorService executorService = Executors.newCachedThreadPool();
Integer finalMinValue = minValue;
Integer finalMaxValue = maxValue;
executorService.execute(() -> {
// Initialise buckets
int bucketCount = (finalMaxValue - finalMinValue) / bucketSize + 1;
List<List<Integer>> buckets = new ArrayList<>(bucketCount);
for (int i = 0; i < bucketCount; i++) {
buckets.add(new ArrayList<>());
}
// Distribute input array values into buckets
for (Integer anArrayElement : arrayToSort) {
buckets.get((anArrayElement - finalMinValue) / bucketSize).add(anArrayElement);
}
// Sort buckets and place back into input array
// Loop through the contents of each bucket
for (List<Integer> bucket : buckets) {
Integer[] bucketArray = new Integer[bucket.size()];
bucketArray = bucket.toArray(bucketArray);
InsertionSort.sort(bucketArray);
for (Integer aBucketArray : bucketArray) {
arrayToSort[currentIndex] = aBucketArray;
incrementSync();
}
}
});
有关此实现的更多信息,请访问 Github