Java 并行流填充数组
Java parallel stream to fill array
我有一个非常大的 hashmap,里面全是素数。
var mapA = new HashMap<Integer, Long>();
我需要对其进行大量计算,所以我使用并行流:
var res = new ArrayList<Integer();
mapA.entrySet()
.parallelStream()
.forEach( x -> {
var values = mapA.entrySet()
.parallelStream()
.filter( /*conditions*/ )
.map(y -> y.getKey())
.toArray();
Arrays.stream(values)
.parallel()
.sorted()
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
});
如您所见,res
是一个超出流范围的数组。我需要并行循环,否则计算可能需要几分钟。这个有必要吗?
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
我添加了 synchronized
因为流是并行运行的,所以我不想在 2 个或更多线程同时在 res
中添加数据时出现竞争条件时间.
我已尝试删除同步 (this),程序仍然运行良好。但是我怎么能确定它会一直正常工作呢?
谢谢
如果需要,我将在此处添加整个代码:
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DiffieHellman {
private static final int LIMIT = 65536;
private final long p;
private final long g;
public DiffieHellman(long p, long g) {
this.p = p;
this.g = g;
}
public List<Integer> tryBruteForce(long publicA, long publicB) {
List<Integer> res = new ArrayList<Integer>();
var mapA = new HashMap<Integer, Long>(
IntStream
.rangeClosed(0, LIMIT)
.parallel()
.boxed()
.collect(
Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
)
);
var mapB = new HashMap<Integer, Long>(
IntStream
.rangeClosed(0, LIMIT)
.parallel()
.boxed()
.collect(
Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
)
);
mapA.entrySet()
.parallelStream()
.forEach( x -> {
var values = mapB.entrySet()
.parallelStream()
.filter( y -> y.getValue().equals(x.getValue()))
.map(Map.Entry::getKey)
.toArray(Integer[]::new);
Arrays.stream(values)
.parallel()
.sorted()
.forEach(val -> {
res.add(x.getKey());
res.add((Integer) val);
});
});
return res;
}
}
当然,您可以像其他答案指出的那样简单地使用同步集合,但是由于争用,这可能性能不够,而且编写起来仍然很麻烦。
相反,您可以通过惯用地使用 Stream API 以稍微不同的方式解决问题。
首先,嵌套操作可以在单个流管道中完成:
mapB.entrySet()
.parallelStream()
.filter(y -> y.getValue().equals(x.getValue()))
.map(y -> y.getKey())
.sorted()
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
其次,为了避免并发问题,最简单的方法是放弃命令式方法并利用 Stream API 的声明性。
为了做到这一点,人们不会手动 for-each
然后 add
结果元素,而是让 Stream 管理它。
您在这里要做的是通过将 mapA
entrySet() 的每个元素替换为自定义序列来创建一个新序列:
List<Integer> res = mapA.entrySet()
.parallelStream()
.flatMap(x -> mapB.entrySet().stream()
.filter(y -> y.getValue().equals(x.getValue()))
.map(Map.Entry::getKey)
.sorted()
.flatMap(v -> Stream.of(x.getKey(), v)))
.collect(Collectors.toList());
可以省略嵌套 parallelStream
,因为 flatMap
无论如何都会在 Stream 上调用 sequential()
。
我有一个非常大的 hashmap,里面全是素数。
var mapA = new HashMap<Integer, Long>();
我需要对其进行大量计算,所以我使用并行流:
var res = new ArrayList<Integer();
mapA.entrySet()
.parallelStream()
.forEach( x -> {
var values = mapA.entrySet()
.parallelStream()
.filter( /*conditions*/ )
.map(y -> y.getKey())
.toArray();
Arrays.stream(values)
.parallel()
.sorted()
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
});
如您所见,res
是一个超出流范围的数组。我需要并行循环,否则计算可能需要几分钟。这个有必要吗?
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
我添加了 synchronized
因为流是并行运行的,所以我不想在 2 个或更多线程同时在 res
中添加数据时出现竞争条件时间.
我已尝试删除同步 (this),程序仍然运行良好。但是我怎么能确定它会一直正常工作呢?
谢谢
如果需要,我将在此处添加整个代码:
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DiffieHellman {
private static final int LIMIT = 65536;
private final long p;
private final long g;
public DiffieHellman(long p, long g) {
this.p = p;
this.g = g;
}
public List<Integer> tryBruteForce(long publicA, long publicB) {
List<Integer> res = new ArrayList<Integer>();
var mapA = new HashMap<Integer, Long>(
IntStream
.rangeClosed(0, LIMIT)
.parallel()
.boxed()
.collect(
Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
)
);
var mapB = new HashMap<Integer, Long>(
IntStream
.rangeClosed(0, LIMIT)
.parallel()
.boxed()
.collect(
Collectors.toMap(x -> x, x -> DiffieHellmanUtils.modPow(publicB, x, p))
)
);
mapA.entrySet()
.parallelStream()
.forEach( x -> {
var values = mapB.entrySet()
.parallelStream()
.filter( y -> y.getValue().equals(x.getValue()))
.map(Map.Entry::getKey)
.toArray(Integer[]::new);
Arrays.stream(values)
.parallel()
.sorted()
.forEach(val -> {
res.add(x.getKey());
res.add((Integer) val);
});
});
return res;
}
}
当然,您可以像其他答案指出的那样简单地使用同步集合,但是由于争用,这可能性能不够,而且编写起来仍然很麻烦。
相反,您可以通过惯用地使用 Stream API 以稍微不同的方式解决问题。
首先,嵌套操作可以在单个流管道中完成:
mapB.entrySet()
.parallelStream()
.filter(y -> y.getValue().equals(x.getValue()))
.map(y -> y.getKey())
.sorted()
.forEach(val -> {
synchronized (this) {
res.add(x.getKey());
res.add((Integer) val);
}
});
其次,为了避免并发问题,最简单的方法是放弃命令式方法并利用 Stream API 的声明性。
为了做到这一点,人们不会手动 for-each
然后 add
结果元素,而是让 Stream 管理它。
您在这里要做的是通过将 mapA
entrySet() 的每个元素替换为自定义序列来创建一个新序列:
List<Integer> res = mapA.entrySet()
.parallelStream()
.flatMap(x -> mapB.entrySet().stream()
.filter(y -> y.getValue().equals(x.getValue()))
.map(Map.Entry::getKey)
.sorted()
.flatMap(v -> Stream.of(x.getKey(), v)))
.collect(Collectors.toList());
可以省略嵌套 parallelStream
,因为 flatMap
无论如何都会在 Stream 上调用 sequential()
。