Java 8 Stream: groupingBy with multiple Collectors
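I want to use a Java 8 Stream and group by one classifier, but with multiple Collector functions. So while grouping, I would like to compute, for example, both the average and the sum of one field (or possibly of another field).
I'll try to simplify this a bit with an example: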
public void test() {
    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Person One", 1, 18));
    persons.add(new Person("Person Two", 1, 20));
    persons.add(new Person("Person Three", 1, 30));
    persons.add(new Person("Person Four", 2, 30));
    persons.add(new Person("Person Five", 2, 29));
    persons.add(new Person("Person Six", 3, 18));
    // multiCollector is the missing piece this question asks about
    Map<Integer, Data> result = persons.stream().collect(
        groupingBy(person -> person.group, multiCollector)
    );
}
class Person {
    String name;
    int group;
    int age;
    // Constructor, getter and setter
}
class Data {
    long average;
    long sum;
    public Data(long average, long sum) {
        this.average = average;
        this.sum = sum;
    }
    // Getter and setter
}
The result should be a Map that associates each group with its result, like:
1 => Data(average(18, 20, 30), sum(18, 20, 30))
2 => Data(average(30, 29), sum(30, 29))
3 => ....
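This works fine with a single function such as Collectors.counting(), but I would like to chain more than one (ideally an arbitrary number, taken from a list):
List<Collector<Person, ?, ?>>
Is it possible to do something like this?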
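You could chain them.
A collector can only produce one object, but that object can hold multiple values. For example, you could return a Map that has one entry for each collector whose result you are returning.
You can use Collector.of(HashMap::new, accumulator, combiner);
Your accumulator would hold a Map of collectors, where the keys of the Map it produces match the names of the collectors. The combiner needs a way to combine multiple partial results, especially when the stream is processed in parallel.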
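The idea above can be sketched roughly as follows. This is only one possible reading of the suggestion, not the answerer's own code: the helper `named`, the class name `NamedCollectorsSketch`, and the keys "average" and "sum" are made up for illustration, and it uses the four-argument form of Collector.of so that each nested finisher is applied at the end.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class NamedCollectorsSketch {

    // Minimal stand-in for the Person class from the question.
    static class Person {
        final String name;
        final int group;
        final int age;

        Person(String name, int group, int age) {
            this.name = name;
            this.group = group;
            this.age = age;
        }

        int getGroup() { return group; }
        int getAge() { return age; }
    }

    // Hypothetical helper: merges named collectors into one collector
    // whose result maps each name to that collector's result.
    @SuppressWarnings("unchecked")
    static <T> Collector<T, Map<String, Object>, Map<String, Object>> named(
            Map<String, Collector<T, ?, ?>> collectors) {
        return Collector.of(
            // Supplier: one fresh accumulation container per named collector.
            () -> {
                Map<String, Object> state = new LinkedHashMap<>();
                collectors.forEach((name, c) -> state.put(name, c.supplier().get()));
                return state;
            },
            // Accumulator: feed the element to every nested accumulator.
            (state, element) -> collectors.forEach((name, c) ->
                ((BiConsumer<Object, T>) c.accumulator()).accept(state.get(name), element)),
            // Combiner: merge partial states pairwise (needed for parallel streams).
            (left, right) -> {
                collectors.forEach((name, c) -> left.put(name,
                    ((BinaryOperator<Object>) c.combiner()).apply(left.get(name), right.get(name))));
                return left;
            },
            // Finisher: replace each container with the nested collector's final result.
            state -> {
                collectors.forEach((name, c) -> state.put(name,
                    ((Function<Object, Object>) c.finisher()).apply(state.get(name))));
                return state;
            });
    }

    static List<Person> samplePersons() {
        return Arrays.asList(
            new Person("Person One", 1, 18),
            new Person("Person Two", 1, 20),
            new Person("Person Three", 1, 30),
            new Person("Person Four", 2, 30),
            new Person("Person Five", 2, 29),
            new Person("Person Six", 3, 18));
    }

    static Map<Integer, Map<String, Object>> statsByGroup(List<Person> persons) {
        Map<String, Collector<Person, ?, ?>> collectors = new LinkedHashMap<>();
        collectors.put("average", Collectors.averagingInt(Person::getAge));
        collectors.put("sum", Collectors.summingInt(Person::getAge));
        return persons.stream()
            .collect(Collectors.groupingBy(Person::getGroup, named(collectors)));
    }

    public static void main(String[] args) {
        Map<Integer, Map<String, Object>> result = statsByGroup(samplePersons());
        System.out.println(result.get(2)); // {average=29.5, sum=59}
    }
}
```

The values come back as Object, so callers have to cast them; that is the price of accepting an open-ended set of collectors.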
Generally, the built-in collectors use a dedicated data type for complex results.
From Collectors:
public static <T>
Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) {
    return new CollectorImpl<T, DoubleSummaryStatistics, DoubleSummaryStatistics>(
            DoubleSummaryStatistics::new,
            (r, t) -> r.accept(mapper.applyAsDouble(t)),
            (l, r) -> { l.combine(r); return l; }, CH_ID);
}
and in its own class:
public class DoubleSummaryStatistics implements DoubleConsumer {
    private long count;
    private double sum;
    private double sumCompensation; // Low order bits of sum
    private double simpleSum; // Used to compute right sum for non-finite inputs
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;
For the specific problem of summing and averaging, use collectingAndThen along with summarizingDouble:
Map<Integer, Data> result = persons.stream().collect(
        groupingBy(Person::getGroup,
                collectingAndThen(summarizingDouble(Person::getAge),
                        dss -> new Data((long)dss.getAverage(), (long)dss.getSum()))));
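For anyone who wants to run this, here is a self-contained variant. It is a sketch under a few assumptions: it swaps in summarizingInt (the age is an int, so the sum stays an exact long), it adds a toString to Data purely for printing, and the class name `SummarizingSketch` is made up. Note that the (long) cast truncates the average instead of rounding it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class SummarizingSketch {

    // Minimal stand-ins for the Person and Data classes from the question.
    static class Person {
        final String name;
        final int group;
        final int age;

        Person(String name, int group, int age) {
            this.name = name;
            this.group = group;
            this.age = age;
        }

        int getGroup() { return group; }
        int getAge() { return age; }
    }

    static class Data {
        final long average;
        final long sum;

        Data(long average, long sum) {
            this.average = average;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "Data(average=" + average + ", sum=" + sum + ")";
        }
    }

    static List<Person> samplePersons() {
        return Arrays.asList(
            new Person("Person One", 1, 18),
            new Person("Person Two", 1, 20),
            new Person("Person Three", 1, 30),
            new Person("Person Four", 2, 30),
            new Person("Person Five", 2, 29),
            new Person("Person Six", 3, 18));
    }

    static Map<Integer, Data> averageAndSumByGroup(List<Person> persons) {
        return persons.stream().collect(
            Collectors.groupingBy(Person::getGroup,
                Collectors.collectingAndThen(
                    // One pass per group collects count, sum, min, max and average.
                    Collectors.summarizingInt(Person::getAge),
                    // The cast truncates: an average of 22.67 becomes 22.
                    stats -> new Data((long) stats.getAverage(), stats.getSum()))));
    }

    public static void main(String[] args) {
        System.out.println(averageAndSumByGroup(samplePersons()));
        // {1=Data(average=22, sum=68), 2=Data(average=29, sum=59), 3=Data(average=18, sum=18)}
    }
}
```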
For the more general problem (collecting various things about your Persons), you can create a complex collector like this:
// Individual collectors are defined here
List<Collector<Person, ?, ?>> collectors = Arrays.asList(
        Collectors.averagingInt(Person::getAge),
        Collectors.summingInt(Person::getAge));
@SuppressWarnings("unchecked")
Collector<Person, List<Object>, List<Object>> complexCollector = Collector.of(
    // supplier: one accumulation container per nested collector
    () -> collectors.stream().map(Collector::supplier)
        .map(Supplier::get).collect(toList()),
    // accumulator: pass the element to every nested accumulator
    (list, e) -> IntStream.range(0, collectors.size()).forEach(
        i -> ((BiConsumer<Object, Person>) collectors.get(i).accumulator()).accept(list.get(i), e)),
    // combiner: merge the containers position by position
    (l1, l2) -> {
        IntStream.range(0, collectors.size()).forEach(
            i -> l1.set(i, ((BinaryOperator<Object>) collectors.get(i).combiner()).apply(l1.get(i), l2.get(i))));
        return l1;
    },
    // finisher: replace each container with its final result
    list -> {
        IntStream.range(0, collectors.size()).forEach(
            i -> list.set(i, ((Function<Object, Object>)collectors.get(i).finisher()).apply(list.get(i))));
        return list;
    });
Map<Integer, List<Object>> result = persons.stream().collect(
        groupingBy(Person::getGroup, complexCollector));
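The map values are lists, where the first element is the result of applying the first collector, and so on. You can add a custom finisher step using Collectors.collectingAndThen(complexCollector, list -> ...) to convert this list into something more appropriate.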
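Instead of chaining the collectors, you should build an abstraction that aggregates collectors: implement the Collector interface with a class that accepts a list of collectors and delegates each method invocation to every one of them. Then, at the end, you return new Data() built from all the results the nested collectors produced.
You can avoid creating a custom class with all the method declarations by using Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics... characteristics). The finisher lambda calls the finisher of each nested collector and then returns the Data instance.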
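As a rough illustration of the Collector.of route, here is a sketch specialized to exactly two nested collectors, which keeps everything type-safe without casts. The helper name `both`, the local class `Pair`, and the class name `PairingCollectorSketch` are made up for this example; a version that accepts a whole list of collectors would have to fall back to Object and unchecked casts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class PairingCollectorSketch {

    // Minimal stand-ins for the Person and Data classes from the question.
    static class Person {
        final String name;
        final int group;
        final int age;

        Person(String name, int group, int age) {
            this.name = name;
            this.group = group;
            this.age = age;
        }

        int getGroup() { return group; }
        int getAge() { return age; }
    }

    static class Data {
        final long average;
        final long sum;

        Data(long average, long sum) {
            this.average = average;
            this.sum = sum;
        }
    }

    // Hypothetical helper: runs two collectors side by side and merges their results.
    static <T, A1, A2, R1, R2, R> Collector<T, ?, R> both(
            Collector<T, A1, R1> first,
            Collector<T, A2, R2> second,
            BiFunction<R1, R2, R> merger) {
        // Holds the two intermediate accumulation containers.
        class Pair {
            A1 left = first.supplier().get();
            A2 right = second.supplier().get();

            void add(T element) {
                first.accumulator().accept(left, element);
                second.accumulator().accept(right, element);
            }

            Pair combine(Pair other) {
                left = first.combiner().apply(left, other.left);
                right = second.combiner().apply(right, other.right);
                return this;
            }

            // Calls the finisher of each nested collector, then builds the result.
            R finish() {
                return merger.apply(first.finisher().apply(left), second.finisher().apply(right));
            }
        }
        return Collector.of(Pair::new, Pair::add, Pair::combine, Pair::finish);
    }

    static Map<Integer, Data> averageAndSumByGroup(List<Person> persons) {
        return persons.stream().collect(Collectors.groupingBy(Person::getGroup,
            both(Collectors.averagingInt(Person::getAge),
                 Collectors.summingInt(Person::getAge),
                 // longValue() truncates the average, as the (long) cast would.
                 (Double average, Integer sum) -> new Data(average.longValue(), sum.longValue()))));
    }

    public static void main(String[] args) {
        Data groupOne = averageAndSumByGroup(Arrays.asList(
            new Person("Person One", 1, 18),
            new Person("Person Two", 1, 20),
            new Person("Person Three", 1, 30))).get(1);
        System.out.println(groupOne.average + " " + groupOne.sum); // 22 68
    }
}
```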
By using a map as the output type, you can have a potentially unlimited list of reducers, each producing its own statistic and adding it to the map.
public static <K, V> Map<K, V> addMap(Map<K, V> map, K k, V v) {
    // Copy instead of mutating, so the reduction stays free of side effects
    Map<K, V> mapout = new HashMap<K, V>();
    mapout.putAll(map);
    mapout.put(k, v);
    return mapout;
}
...
List<Person> persons = new ArrayList<>();
persons.add(new Person("Person One", 1, 18));
persons.add(new Person("Person Two", 1, 20));
persons.add(new Person("Person Three", 1, 30));
persons.add(new Person("Person Four", 2, 30));
persons.add(new Person("Person Five", 2, 29));
persons.add(new Person("Person Six", 3, 18));
List<BiFunction<Map<String, Integer>, Person, Map<String, Integer>>> listOfReducers = new ArrayList<>();
listOfReducers.add((m, p) -> addMap(m, "Count", Optional.ofNullable(m.get("Count")).orElse(0) + 1));
// "Sum" adds up the group field (the second constructor argument), hence Sum=10 below
listOfReducers.add((m, p) -> addMap(m, "Sum", Optional.ofNullable(m.get("Sum")).orElse(0) + p.group));
BiFunction<Map<String, Integer>, Person, Map<String, Integer>> applyList
        = (mapin, p) -> {
            Map<String, Integer> mapout = mapin;
            for (BiFunction<Map<String, Integer>, Person, Map<String, Integer>> f : listOfReducers) {
                mapout = f.apply(mapout, p);
            }
            return mapout;
        };
BinaryOperator<Map<String, Integer>> combineMaps
        = (map1, map2) -> {
            Map<String, Integer> mapout = new HashMap<>();
            mapout.putAll(map1);
            // Add the partial results per key; a plain putAll would overwrite them in a parallel stream
            map2.forEach((k, v) -> mapout.merge(k, v, Integer::sum));
            return mapout;
        };
Map<String, Integer> map
        = persons
            .stream()
            .reduce(new HashMap<String, Integer>(),
                    applyList, combineMaps);
System.out.println("map = " + map);
This produces:
map = {Sum=10, Count=6}
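In Java 12, the Collectors API was extended with a static teeing(...) function:
teeing(Collector<? super T,?,R1> downstream1,
       Collector<? super T,?,R2> downstream2,
       BiFunction<? super R1,? super R2,R> merger)
This provides built-in functionality to use two collectors on one Stream and merge the results into a single object.
Below is a small example where a list of employees is split into age groups, and for each group two Collectors.summarizingInt() collectors, performed on age and salary, are returned as a list of IntSummaryStatistics: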
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class CollectorTeeingTest {
    public static void main(String... args){
        NavigableSet<Integer> age_groups = new TreeSet<>();
        age_groups.addAll(List.of(30,40,50,60,Integer.MAX_VALUE)); //we don't want to map to null
        Function<Integer,Integer> to_age_groups = age -> age_groups.higher(age);
        List<Employee> employees = List.of( new Employee("A",21,2000),
                                            new Employee("B",24,2400),
                                            new Employee("C",32,3000),
                                            new Employee("D",40,4000),
                                            new Employee("E",41,4100),
                                            new Employee("F",61,6100)
        );
        Map<Integer,List<IntSummaryStatistics>> stats = employees.stream()
                .collect(Collectors.groupingBy(
                    employee -> to_age_groups.apply(employee.getAge()),
                    Collectors.teeing(
                        Collectors.summarizingInt(Employee::getAge),
                        Collectors.summarizingInt(Employee::getSalary),
                        (stat1, stat2) -> List.of(stat1,stat2))));
        stats.entrySet().stream().forEach(entry -> {
            System.out.println("Age-group: <"+entry.getKey()+"\n"+entry.getValue());
        });
    }
    public static class Employee{
        private final String name;
        private final int age;
        private final int salary;
        public Employee(String name, int age, int salary){
            this.name = name;
            this.age = age;
            this.salary = salary;
        }
        public String getName(){return this.name;}
        public int getAge(){return this.age;}
        public int getSalary(){return this.salary;}
    }
}
Output:
Age-group: <2147483647
[IntSummaryStatistics{count=1, sum=61, min=61, average=61,000000, max=61}, IntSummaryStatistics{count=1, sum=6100, min=6100, average=6100,000000, max=6100}]
Age-group: <50
[IntSummaryStatistics{count=2, sum=81, min=40, average=40,500000, max=41}, IntSummaryStatistics{count=2, sum=8100, min=4000, average=4050,000000, max=4100}]
Age-group: <40
[IntSummaryStatistics{count=1, sum=32, min=32, average=32,000000, max=32}, IntSummaryStatistics{count=1, sum=3000, min=3000, average=3000,000000, max=3000}]
Age-group: <30
[IntSummaryStatistics{count=2, sum=45, min=21, average=22,500000, max=24}, IntSummaryStatistics{count=2, sum=4400, min=2000, average=2200,000000, max=2400}]
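Applied to the Person/Data example from the question, teeing might look like the sketch below. It assumes Java 12 or later; the class name `TeeingSketch` and the minimal Person and Data classes are stand-ins written for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class TeeingSketch {

    // Minimal stand-ins for the Person and Data classes from the question.
    static class Person {
        final String name;
        final int group;
        final int age;

        Person(String name, int group, int age) {
            this.name = name;
            this.group = group;
            this.age = age;
        }

        int getGroup() { return group; }
        int getAge() { return age; }
    }

    static class Data {
        final long average;
        final long sum;

        Data(long average, long sum) {
            this.average = average;
            this.sum = sum;
        }

        @Override
        public String toString() {
            return "Data(average=" + average + ", sum=" + sum + ")";
        }
    }

    static List<Person> samplePersons() {
        return Arrays.asList(
            new Person("Person One", 1, 18),
            new Person("Person Two", 1, 20),
            new Person("Person Three", 1, 30),
            new Person("Person Four", 2, 30),
            new Person("Person Five", 2, 29),
            new Person("Person Six", 3, 18));
    }

    static Map<Integer, Data> averageAndSumByGroup(List<Person> persons) {
        return persons.stream().collect(
            Collectors.groupingBy(Person::getGroup,
                Collectors.teeing(
                    Collectors.averagingInt(Person::getAge),  // yields a Double
                    Collectors.summingInt(Person::getAge),    // yields an Integer
                    // longValue() truncates the average: 29.5 becomes 29.
                    (Double average, Integer sum) ->
                        new Data(average.longValue(), sum.longValue()))));
    }

    public static void main(String[] args) {
        System.out.println(averageAndSumByGroup(samplePersons()).get(2));
        // Data(average=29, sum=59)
    }
}
```

Since teeing takes exactly two collectors, combining three or more means nesting teeing calls or falling back to one of the list-based approaches above.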