使用 10 个线程处理一个数组
Using 10 Threads to Process an Array
我正在努力提高我的 java 技能,但不确定如何处理这个多线程应用程序。基本上,该程序读取一个文本文件并找到最大的数字。我在我的搜索算法中添加了一个 for 循环来创建 10 个线程,但我不确定它是否真的创建了 10 个线程。这个想法是为了缩短执行时间,或者至少这是我认为应该发生的事情。有没有办法检查我是否做对了,执行时间是否确实缩短了?
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class ProcessDataFile {
public static void main(String[] args) throws IOException {
int max = Integer.MIN_VALUE;
int i = 0;
int[] numbers = new int[100000];
String datafile = "dataset529.txt"; //string which contains datafile
String line; //current line of text file
try (BufferedReader br = new BufferedReader(new FileReader(datafile))) { //reads in the datafile
while ((line = br.readLine()) != null) { //reads through each line
numbers[i++] = Integer.parseInt(line); //pulls out the number of each line and puts it in numbers[]
}
}
for (i = 0; i < 10000; i++){ //loop to go through each number in the file and compare it to find the largest int.
for(int j = 0; j < 10; j++) { //creates 10 threads
new Thread();
}
if (max < numbers[i]) //As max gets bigger it checks the array and keeps increasing it as it finds a larger int.
max = numbers[i]; //Sets max equal to the final highest value found.
}
System.out.println("The largest number in DataSet529 is: " + max);
}
}
这是一个非常基本的示例,它演示了创建和 运行 线程的基本概念,这些线程处理来自特定数组的给定值范围。该示例做了一些假设(例如只有偶数个元素)。这个例子也有点冗长,是故意这样做的,目的是为了演示需要的基本步骤
首先查看 the Concurrency Trail 了解更多详情
import java.util.Random;
public class ThreadExample {
public static void main(String[] args) {
int[] numbers = new int[100000];
Random rnd = new Random();
for (int index = 0; index < numbers.length; index++) {
numbers[index] = rnd.nextInt();
}
Thread[] threads = new Thread[10];
Worker[] workers = new Worker[10];
int range = numbers.length / 10;
for (int index = 0; index < 10; index++) {
int startAt = index * range;
int endAt = startAt + range;
workers[index] = new Worker(startAt, endAt, numbers);
}
for (int index = 0; index < 10; index++) {
threads[index] = new Thread(workers[index]);
threads[index].start();
}
boolean isProcessing = false;
do {
isProcessing = false;
for (Thread t : threads) {
if (t.isAlive()) {
isProcessing = true;
break;
}
}
} while (isProcessing);
for (Worker worker : workers) {
System.out.println("Max = " + worker.getMax());
}
}
public static class Worker implements Runnable {
private int startAt;
private int endAt;
private int numbers[];
private int max = Integer.MIN_VALUE;
public Worker(int startAt, int endAt, int[] numbers) {
this.startAt = startAt;
this.endAt = endAt;
this.numbers = numbers;
}
@Override
public void run() {
for (int index = startAt; index < endAt; index++) {
max = Math.max(numbers[index], max);
}
}
public int getMax() {
return max;
}
}
}
一个稍微简单的解决方案将涉及 ExecutorService
API,它允许您向服务提供一系列 Callable
,然后 return List
个,共 Future
个。这里的好处是,在所有 Callable
完成(或失败)之前,服务不会 return,因此您不需要经常检查线程的状态
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadExample {
public static void main(String[] args) {
int[] numbers = new int[100000];
Random rnd = new Random();
for (int index = 0; index < numbers.length; index++) {
numbers[index] = rnd.nextInt();
}
ExecutorService executor = Executors.newFixedThreadPool(10);
Worker[] workers = new Worker[10];
int range = numbers.length / 10;
for (int index = 0; index < 10; index++) {
int startAt = index * range;
int endAt = startAt + range;
workers[index] = new Worker(startAt, endAt, numbers);
}
try {
List<Future<Integer>> results = executor.invokeAll(Arrays.asList(workers));
for (Future<Integer> future : results) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
}
public static class Worker implements Callable<Integer> {
private int startAt;
private int endAt;
private int numbers[];
public Worker(int startAt, int endAt, int[] numbers) {
this.startAt = startAt;
this.endAt = endAt;
this.numbers = numbers;
}
@Override
public Integer call() throws Exception {
int max = Integer.MIN_VALUE;
for (int index = startAt; index < endAt; index++) {
max = Math.max(numbers[index], max);
}
return max;
}
}
}
我知道这个回答有点晚了,但您也可以在使用 ExecutorService 时使用 lambda 表达式 而不是创建新的 class 实现 Runnable。
下面是一个完整的示例,您可以使用 THREAD_SIZE 和 RANDOM_ARRAY_SIZE 变量。
import org.apache.log4j.Logger;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentMaximumTest {
static final int THREAD_SIZE = 10;
static final int RANDOM_ARRAY_SIZE = 8999;
static final SecureRandom RAND = new SecureRandom();
private static Logger logger = Logger.getLogger(ConcurrentMaximumTest.class);
public static void main(String[] args) throws InterruptedException, ExecutionException {
int[] array = generateRandomIntArray(RANDOM_ARRAY_SIZE);
Map<Integer, Integer> positionMap = calculatePositions(array.length, THREAD_SIZE);
ExecutorService threads = Executors.newFixedThreadPool(THREAD_SIZE);
List<Callable<Integer>> toRun = new ArrayList<>(THREAD_SIZE);
for (Map.Entry<Integer, Integer> entry : positionMap.entrySet())
toRun.add(() -> findMax(array, entry.getKey(), entry.getValue()));
int result = Integer.MIN_VALUE;
List<Future<Integer>> futures = threads.invokeAll(toRun);
for (Future<Integer> future : futures) {
Integer localMax = future.get();
if(localMax > result)
result = localMax;
}
threads.shutdownNow();
logger.info("Max value calculated with " + THREAD_SIZE + " threads:" + result);
Arrays.sort(array);
int resultCrosscheck = array[array.length - 1];
logger.info("Max value calculated with sorting: " + resultCrosscheck);
assert result != resultCrosscheck : "Crosscheck failed";
}
/* Calculates start and end positions of each chunk(for simplicity). It can also be calculated on the fly.*/
private static Map<Integer, Integer> calculatePositions(int size, int numThreads){
int lengthOfChunk = size / numThreads;
int remainder = size % numThreads;
int start = 0;
Map<Integer,Integer> result = new LinkedHashMap<>();
for(int i = 0; i < numThreads -1; i++){
result.put(start, lengthOfChunk);
start += lengthOfChunk;
}
result.put(start, lengthOfChunk+remainder);
return result;
}
/*Find maximum value of given part of an array, from start position and chunk size.*/
private static int findMax(int[] wholeArray, int position, int size){
int end = (position + size);
int max = Integer.MIN_VALUE;
logger.info("Starting read for interval [" + position + "," + end + ")");
for(int i = position; i < (position + size); i++)
if(wholeArray[i] > max)
max = wholeArray[i];
logger.info("Finishing finding maximum for interval [" + position + "," + end + ")" + ". Calculated local maximum is " + max);
return max;
}
/* Helper function for generating random int array */
private static int[] generateRandomIntArray(int size){
int[] result = new int[size];
for (int i = 0; i < size; i++)
result[i] = RAND.nextInt(Integer.MAX_VALUE);
return result;
}
}
我正在努力提高我的 java 技能,但不确定如何处理这个多线程应用程序。基本上,该程序读取一个文本文件并找到最大的数字。我在我的搜索算法中添加了一个 for 循环来创建 10 个线程,但我不确定它是否真的创建了 10 个线程。这个想法是为了缩短执行时间,或者至少这是我认为应该发生的事情。有没有办法检查我是否做对了,执行时间是否确实缩短了?
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
public class ProcessDataFile {
public static void main(String[] args) throws IOException {
int max = Integer.MIN_VALUE;
int i = 0;
int[] numbers = new int[100000];
String datafile = "dataset529.txt"; //string which contains datafile
String line; //current line of text file
try (BufferedReader br = new BufferedReader(new FileReader(datafile))) { //reads in the datafile
while ((line = br.readLine()) != null) { //reads through each line
numbers[i++] = Integer.parseInt(line); //pulls out the number of each line and puts it in numbers[]
}
}
for (i = 0; i < 10000; i++){ //loop to go through each number in the file and compare it to find the largest int.
for(int j = 0; j < 10; j++) { //creates 10 threads
new Thread();
}
if (max < numbers[i]) //As max gets bigger it checks the array and keeps increasing it as it finds a larger int.
max = numbers[i]; //Sets max equal to the final highest value found.
}
System.out.println("The largest number in DataSet529 is: " + max);
}
}
这是一个非常基本的示例,它演示了创建和 运行 线程的基本概念,这些线程处理来自特定数组的给定值范围。该示例做了一些假设(例如只有偶数个元素)。这个例子也有点冗长,是故意这样做的,目的是为了演示需要的基本步骤
首先查看 the Concurrency Trail 了解更多详情
import java.util.Random;
public class ThreadExample {
public static void main(String[] args) {
int[] numbers = new int[100000];
Random rnd = new Random();
for (int index = 0; index < numbers.length; index++) {
numbers[index] = rnd.nextInt();
}
Thread[] threads = new Thread[10];
Worker[] workers = new Worker[10];
int range = numbers.length / 10;
for (int index = 0; index < 10; index++) {
int startAt = index * range;
int endAt = startAt + range;
workers[index] = new Worker(startAt, endAt, numbers);
}
for (int index = 0; index < 10; index++) {
threads[index] = new Thread(workers[index]);
threads[index].start();
}
boolean isProcessing = false;
do {
isProcessing = false;
for (Thread t : threads) {
if (t.isAlive()) {
isProcessing = true;
break;
}
}
} while (isProcessing);
for (Worker worker : workers) {
System.out.println("Max = " + worker.getMax());
}
}
public static class Worker implements Runnable {
private int startAt;
private int endAt;
private int numbers[];
private int max = Integer.MIN_VALUE;
public Worker(int startAt, int endAt, int[] numbers) {
this.startAt = startAt;
this.endAt = endAt;
this.numbers = numbers;
}
@Override
public void run() {
for (int index = startAt; index < endAt; index++) {
max = Math.max(numbers[index], max);
}
}
public int getMax() {
return max;
}
}
}
一个稍微简单的解决方案将涉及 ExecutorService
API,它允许您向服务提供一系列 Callable
,然后 return List
个,共 Future
个。这里的好处是,在所有 Callable
完成(或失败)之前,服务不会 return,因此您不需要经常检查线程的状态
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadExample {
public static void main(String[] args) {
int[] numbers = new int[100000];
Random rnd = new Random();
for (int index = 0; index < numbers.length; index++) {
numbers[index] = rnd.nextInt();
}
ExecutorService executor = Executors.newFixedThreadPool(10);
Worker[] workers = new Worker[10];
int range = numbers.length / 10;
for (int index = 0; index < 10; index++) {
int startAt = index * range;
int endAt = startAt + range;
workers[index] = new Worker(startAt, endAt, numbers);
}
try {
List<Future<Integer>> results = executor.invokeAll(Arrays.asList(workers));
for (Future<Integer> future : results) {
System.out.println(future.get());
}
} catch (InterruptedException | ExecutionException ex) {
ex.printStackTrace();
}
}
public static class Worker implements Callable<Integer> {
private int startAt;
private int endAt;
private int numbers[];
public Worker(int startAt, int endAt, int[] numbers) {
this.startAt = startAt;
this.endAt = endAt;
this.numbers = numbers;
}
@Override
public Integer call() throws Exception {
int max = Integer.MIN_VALUE;
for (int index = startAt; index < endAt; index++) {
max = Math.max(numbers[index], max);
}
return max;
}
}
}
我知道这个回答有点晚了,但您也可以在使用 ExecutorService 时使用 lambda 表达式 而不是创建新的 class 实现 Runnable。
下面是一个完整的示例,您可以使用 THREAD_SIZE 和 RANDOM_ARRAY_SIZE 变量。
import org.apache.log4j.Logger;
import java.security.SecureRandom;
import java.util.*;
import java.util.concurrent.*;
public class ConcurrentMaximumTest {
static final int THREAD_SIZE = 10;
static final int RANDOM_ARRAY_SIZE = 8999;
static final SecureRandom RAND = new SecureRandom();
private static Logger logger = Logger.getLogger(ConcurrentMaximumTest.class);
public static void main(String[] args) throws InterruptedException, ExecutionException {
int[] array = generateRandomIntArray(RANDOM_ARRAY_SIZE);
Map<Integer, Integer> positionMap = calculatePositions(array.length, THREAD_SIZE);
ExecutorService threads = Executors.newFixedThreadPool(THREAD_SIZE);
List<Callable<Integer>> toRun = new ArrayList<>(THREAD_SIZE);
for (Map.Entry<Integer, Integer> entry : positionMap.entrySet())
toRun.add(() -> findMax(array, entry.getKey(), entry.getValue()));
int result = Integer.MIN_VALUE;
List<Future<Integer>> futures = threads.invokeAll(toRun);
for (Future<Integer> future : futures) {
Integer localMax = future.get();
if(localMax > result)
result = localMax;
}
threads.shutdownNow();
logger.info("Max value calculated with " + THREAD_SIZE + " threads:" + result);
Arrays.sort(array);
int resultCrosscheck = array[array.length - 1];
logger.info("Max value calculated with sorting: " + resultCrosscheck);
assert result != resultCrosscheck : "Crosscheck failed";
}
/* Calculates start and end positions of each chunk(for simplicity). It can also be calculated on the fly.*/
private static Map<Integer, Integer> calculatePositions(int size, int numThreads){
int lengthOfChunk = size / numThreads;
int remainder = size % numThreads;
int start = 0;
Map<Integer,Integer> result = new LinkedHashMap<>();
for(int i = 0; i < numThreads -1; i++){
result.put(start, lengthOfChunk);
start += lengthOfChunk;
}
result.put(start, lengthOfChunk+remainder);
return result;
}
/*Find maximum value of given part of an array, from start position and chunk size.*/
private static int findMax(int[] wholeArray, int position, int size){
int end = (position + size);
int max = Integer.MIN_VALUE;
logger.info("Starting read for interval [" + position + "," + end + ")");
for(int i = position; i < (position + size); i++)
if(wholeArray[i] > max)
max = wholeArray[i];
logger.info("Finishing finding maximum for interval [" + position + "," + end + ")" + ". Calculated local maximum is " + max);
return max;
}
/* Helper function for generating random int array */
private static int[] generateRandomIntArray(int size){
int[] result = new int[size];
for (int i = 0; i < size; i++)
result[i] = RAND.nextInt(Integer.MAX_VALUE);
return result;
}
}