Reduce a parallel stream of arrays into a single array
I am trying to reduce a parallel Stream of array lists into a single ArrayList, so
I use the reduce method with an accumulator and a combiner, as follows:
public static void main(String [] args) {
ArrayList<String> l1 = new ArrayList<>();
l1.add("a1");
l1.add("a2");
List<String> l2 = new ArrayList<>();
l2.add("a3");
l2.add("a4");
List<List<String>> l = new ArrayList<>();
l.add(l1);
l.add(l2);
Stream<List<String>> stream = l.stream();
join(stream).forEach(System.out::println);
}
private static <T> List<T> join(Stream<List<T>> stream) {
return stream.parallel().reduce(new ArrayList<>(), (total, element) -> {
System.out.println("total: " + total);
System.out.println("element: " + element);
total.addAll(element);
return total;
}, (total1, total2) -> {
System.out.println("total1: " + total1);
System.out.println("total2: " + total2);
total1.addAll(total2);
return total1;
});
}
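I know the combiner is used to merge the partial results of a parallel stream, but it is not working as I expected,
because I get duplicated results, as follows: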
total: []
element: [a3, a4]
total: []
element: [a1, a2]
total1: [a3, a4, a1, a2]
total2: [a3, a4, a1, a2]
a3
a4
a1
a2
a3
a4
a1
a2
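So why are the results duplicated? Also, is it thread-safe to use an ArrayList in the accumulator?
You should just use flatMap, which is documented as follows: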
Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. Each mapped stream is closed after its contents have been placed into this stream. (If a mapped stream is null an empty stream is used, instead.)
This is an intermediate operation.
l.stream().flatMap(x -> x.stream()).collect(Collectors.toList()); // is [a1, a2, a3, a4]
or
l.stream().flatMap(List::stream).collect(Collectors.toList());
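For reference, the flatMap approach can be wrapped in the same join signature as in the question. This is only a minimal sketch: the class name FlatMapJoin is made up for illustration, and List.of assumes Java 9 or later.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical class name, used only for this illustration.
class FlatMapJoin {
    // Flattens a stream of lists into one list, keeping encounter order.
    static <T> List<T> join(Stream<List<T>> stream) {
        return stream.flatMap(List::stream).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<String>> l = List.of(List.of("a1", "a2"), List.of("a3", "a4"));
        System.out.println(join(l.stream())); // prints [a1, a2, a3, a4]
    }
}
```

No accumulator or combiner has to be written by hand here, so there is no shared list to mutate by accident.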
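The problem with your code is that you are mixing functional-style code with side effects, which rarely ends well. reduce expects an identity value and functions that do not modify their arguments. Here the single ArrayList passed as the identity is shared by every thread, so both partial results are that same list, and the combiner then appends the list to itself, which is where the duplicates come from. ArrayList is also not thread-safe, so mutating one instance from a parallel stream is unsafe. If you remove the side effects, the output is as expected: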
private static <T> List<T> join(Stream<List<T>> stream) {
return stream.parallel().reduce(new ArrayList<>(), (total, element) -> {
System.out.println("total: " + total);
System.out.println("element: " + element);
//total.addAll(element);
//return total;
var list = new ArrayList<T>(total);
list.addAll(element);
return list;
}, (total1, total2) -> {
System.out.println("total1: " + total1);
System.out.println("total2: " + total2);
//total1.addAll(total2);
//return total1;
var list = new ArrayList<T>(total1);
list.addAll(total2);
return list;
});
}
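If you do want to accumulate into a mutable list, the three-argument Stream.collect is the operation designed for that: it asks a supplier for a fresh container for each parallel chunk instead of sharing one identity object. The following is a rough sketch of that alternative, not part of the code above; the class name CollectJoin is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical class name, used only for this illustration.
class CollectJoin {
    static <T> List<T> join(Stream<List<T>> stream) {
        return stream.parallel().collect(
                () -> new ArrayList<T>(),             // supplier: a new list per chunk, never shared
                (acc, list) -> acc.addAll(list),      // accumulator: mutates only its own container
                (left, right) -> left.addAll(right)); // combiner: merges two distinct containers
    }

    public static void main(String[] args) {
        List<List<String>> l = List.of(List.of("a1", "a2"), List.of("a3", "a4"));
        System.out.println(join(l.stream())); // prints [a1, a2, a3, a4]
    }
}
```

Compared with the copy-based reduce, this avoids allocating a new list on every accumulation step while still keeping each thread's container separate.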
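You should also avoid parallel() unless you have a clear, objective reason to use it. Parallelism has overhead and only pays off when there is heavy work to do. Otherwise, the coordination overhead outweighs any gain.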