累加器在并行流中无法正常工作

Accumulator not working properly in parallel stream

我制作了可以将流简化为地图的收集器,该地图将键作为某些客户可以购买的商品,并将客户的姓名作为值,我的实现在顺序流中工作得很好 但是当我尝试使用 parallel 时它根本不起作用,结果集总是包含一个客户名称。

List<Customer> customerList = this.mall.getCustomerList();

Supplier<Object> supplier = ConcurrentHashMap<String,Set<String>>::new;

BiConsumer<Object, Customer> accumulator = ((o, customer) -> customer.getWantToBuy().stream().map(Item::getName).forEach(
            item -> ((ConcurrentHashMap<String,Set<String>>)o)
                    .merge(item,new HashSet<String>(Collections.singleton(customer.getName())),
                            (s,s2) -> {
                                HashSet<String> res = new HashSet<>(s);
                                res.addAll(s2);
                                return res;
                            })
    ));

BinaryOperator<Object> combiner = (o,o2) -> {
        ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>((ConcurrentHashMap<String,Set<String>>)o);
        res.putAll((ConcurrentHashMap<String,Set<String>>)o2);
        return res;
    };

Function<Object, Map<String, Set<String>>> finisher = (o) -> new HashMap<>((ConcurrentHashMap<String,Set<String>>)o);

Collector<Customer, ?, Map<String, Set<String>>> toItemAsKey =
        new CollectorImpl<>(supplier, accumulator, combiner, finisher, EnumSet.of(
            Collector.Characteristics.CONCURRENT,
            Collector.Characteristics.IDENTITY_FINISH));

Map<String, Set<String>> itemMap = customerList.stream().parallel().collect(toItemAsKey);

我的 accumulator 实现或另一个 Function 肯定有问题,但我无法解决!谁能建议我该怎么做?

您的组合器未正确实现。
您覆盖具有相同密钥的所有条目。您想要的是向现有键添加值。

BinaryOperator<ConcurrentHashMap<String,Set<String>>> combiner = (o,o2) -> {
        ConcurrentHashMap<String,Set<String>> res = new ConcurrentHashMap<>(o);
        o2.forEach((key, set) -> set.forEach(string -> res.computeIfAbsent(key, k -> new HashSet<>())
                                                          .add(string)));
        return res;
    };