Java 8 流混合两个元素

Java 8 Stream mixing two elements

我在数组列表中有很多 Slot 类型的对象。

插槽class如下图-

Slot{
   int start;
   int end;
}

List<Slot> 类型的列表被调用 slots。插槽根据开始时间排序。一个时隙的结束时间可能等于下一个时隙的开始时间,但它们永远不会重叠。

有什么方法可以使用 Java 8 个流遍历此列表,如果一个的结束时间与下一个的开始时间匹配,则合并两个插槽并将它们输出到 ArrayList?

您可以使用 reduce() 方法来实现,而 U 是另一个 List<Slot>,但它比仅在 for 循环中实现要复杂得多,除非需要并行处理。

测试设置见答案结尾。

这里是 for 循环实现:

List<Slot> mixed = new ArrayList<>();
Slot last = null;
for (Slot slot : slots)
    if (last == null || last.end != slot.start)
        mixed.add(last = slot);
    else
        mixed.set(mixed.size() - 1, last = new Slot(last.start, slot.end));

输出

[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

这里是流化简实现:

List<Slot> mixed = slots.stream().reduce((List<Slot>)null, (list, slot) -> {
    System.out.println("accumulator.apply(" + list + ", " + slot + ")");
    if (list == null) {
        List<Slot> newList = new ArrayList<>();
        newList.add(slot);
        return newList;
    }
    Slot last = list.get(list.size() - 1);
    if (last.end != slot.start)
        list.add(slot);
    else
        list.set(list.size() - 1, new Slot(last.start, slot.end));
    return list;
}, (list1, list2) -> {
    System.out.println("combiner.apply(" + list1 + ", " + list2 + ")");
    if (list1 == null)
        return list2;
    if (list2 == null)
        return list1;
    Slot lastOf1 = list1.get(list1.size() - 1);
    Slot firstOf2 = list2.get(0);
    if (lastOf1.end != firstOf2.start)
        list1.addAll(list2);
    else {
        list1.set(list1.size() - 1, new Slot(lastOf1.start, firstOf2.end));
        list1.addAll(list2.subList(1, list2.size()));
    }
    return list1;
});

输出

accumulator.apply(null, 3-5)
accumulator.apply([3-5], 5-7)
accumulator.apply([3-7], 8-10)
accumulator.apply([3-7, 8-10], 10-11)
accumulator.apply([3-7, 8-11], 11-13)
[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

将其更改为并行(多线程)处理:

List<Slot> mixed = slots.stream().parallel().reduce(...

输出

accumulator.apply(null, 8-10)
accumulator.apply(null, 3-5)
accumulator.apply(null, 10-11)
accumulator.apply(null, 11-13)
combiner.apply([10-11], [11-13])
accumulator.apply(null, 5-7)
combiner.apply([3-5], [5-7])
combiner.apply([8-10], [10-13])
combiner.apply([3-7], [8-13])
[3-5, 5-7, 8-10, 10-11, 11-13]
[3-7, 8-13]

警告

如果 slots 是一个空列表,则 for 循环版本的结果是一个空列表,而流版本的结果是一个 null 值。


测试设置

以上所有代码都使用了以下Slot class:

class Slot {
    int start;
    int end;
    Slot(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    public String toString() {
        return this.start + "-" + this.end;
    }
}

slots 变量定义为:

List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

slots 和结果 mixed 都使用以下方式打印:

System.out.println(slots);
System.out.println(mixed);

由于这类问题经常出现,我认为编写一个可以通过谓词对相邻元素进行分组的收集器可能是一个有趣的练习。

假设我们可以将组合逻辑添加到 Slot class

boolean canCombine(Slot other) {
    return this.end == other.start;
}

Slot combine(Slot other) {
    if (!canCombine(other)) {
        throw new IllegalArgumentException();
    }
    return new Slot(this.start, other.end);
}

groupingAdjacent 收集器可以按如下方式使用:

List<Slot> combined = slots.stream()
    .collect(groupingAdjacent(
        Slot::canCombine,         // test to determine if two adjacent elements go together
        reducing(Slot::combine),  // collector to use for combining the adjacent elements
        mapping(Optional::get, toList())  // collector to group up combined elements
    ));

或者,第二个参数可以是 collectingAndThen(reducing(Slot::combine), Optional::get),第三个参数可以是 toList()

这是 groupingAdjacent 的来源。它可以处理 null 个元素并且是并行友好的。有点麻烦,可以用 Spliterator.

来完成类似的事情
public static <T, AI, I, AO, R> Collector<T, ?, R> groupingAdjacent(
        BiPredicate<? super T, ? super T> keepTogether,
        Collector<? super T, AI, ? extends I> inner,
        Collector<I, AO, R> outer
) {
    AI EMPTY = (AI) new Object();

    // Container to accumulate adjacent possibly null elements.  Adj can be in one of 3 states:
    // - Before first element: curGrp == EMPTY
    // - After first element but before first group boundary: firstGrp == EMPTY, curGrp != EMPTY
    // - After at least one group boundary: firstGrp != EMPTY, curGrp != EMPTY
    class Adj {

        T first, last;     // first and last elements added to this container
        AI firstGrp = EMPTY, curGrp = EMPTY;
        AO acc = outer.supplier().get();  // accumlator for completed groups

        void add(T t) {
            if (curGrp == EMPTY) /* first element */ {
                first = t;
                curGrp = inner.supplier().get();
            } else if (!keepTogether.test(last, t)) /* group boundary */ {
                addGroup(curGrp);
                curGrp = inner.supplier().get();
            }
            inner.accumulator().accept(curGrp, last = t);
        }

        void addGroup(AI group) /* group can be EMPTY, in which case this should do nothing */ {
            if (firstGrp == EMPTY) {
                firstGrp = group;
            } else if (group != EMPTY) {
                outer.accumulator().accept(acc, inner.finisher().apply(group));
            }
        }

        Adj merge(Adj other) {
            if (other.curGrp == EMPTY) /* other is empty */ {
                return this;
            } else if (this.curGrp == EMPTY) /* this is empty */ {
                return other;
            } else if (!keepTogether.test(last, other.first)) /* boundary between this and other*/ {
                addGroup(this.curGrp);
                addGroup(other.firstGrp);
            } else if (other.firstGrp == EMPTY) /* other container is single-group. prepend this.curGrp to other.curGrp*/ {
                other.curGrp = inner.combiner().apply(this.curGrp, other.curGrp);
            } else /* other Adj contains a boundary.  this.curGrp+other.firstGrp form a complete group. */ {
                addGroup(inner.combiner().apply(this.curGrp, other.firstGrp));
            }
            this.acc = outer.combiner().apply(this.acc, other.acc);
            this.curGrp = other.curGrp;
            this.last = other.last;
            return this;
        }

        R finish() {
            AO combined = outer.supplier().get();
            if (curGrp != EMPTY) {
                addGroup(curGrp);
                assert firstGrp != EMPTY;
                outer.accumulator().accept(combined, inner.finisher().apply(firstGrp));
            }
            return outer.finisher().apply(outer.combiner().apply(combined, acc));
        }
    }
    return Collector.of(Adj::new, Adj::add, Adj::merge, Adj::finish);
}

我的免费 StreamEx library which enhances standard Stream API. There's an intervalMap 中间操作完美支持这种情况,它能够将多个相邻的流元素折叠为单个元素。这是完整的示例:

// Slot class and sample data are taken from @Andreas answer
List<Slot> slots = Arrays.asList(new Slot(3, 5), new Slot(5, 7), 
                new Slot(8, 10), new Slot(10, 11), new Slot(11, 13));

List<Slot> result = StreamEx.of(slots)
        .intervalMap((s1, s2) -> s1.end == s2.start,
                     (s1, s2) -> new Slot(s1.start, s2.end))
        .toList();
System.out.println(result);
// Output: [3-7, 8-13]

intervalMap 方法有两个参数。第一个是 BiPredicate 从输入流中接受两个相邻的元素,如果它们必须合并则 returns 为真(这里的条件是 s1.end == s2.start)。第二个参数是 BiFunction ,它从合并的系列中获取第一个和最后一个元素并生成结果元素。

请注意,例如,如果您有 100 个应该合并为一个的相邻插槽,则此解决方案不会创建 100 个中间对象(就像@Misha 的回答一样,这仍然非常有趣),它会跟踪第一个和最后一个系列中的插槽立即忘记了中间的一次。当然这个解决方案是并行友好的。如果您有数千个输入槽,使用 .parallel() 可能会提高性能。

请注意,当前的实现将重新创建 Slot,即使它没有与任何东西合并。在这种情况下,BinaryOperator 两次接收相同的 Slot 参数。如果你想优化这种情况,你可以做额外的检查,比如 s1 == s2 ? s1 : ...:

List<Slot> result = StreamEx.of(slots)
        .intervalMap((s1, s2) -> s1.end == s2.start,
                     (s1, s2) -> s1 == s2 ? s1 : new Slot(s1.start, s2.end))
        .toList();

这是两班飞机:

List<Slot> condensed = new LinkedList<>();
slots.stream().reduce((a,b) -> {if (a.end == b.start) return new Slot(a.start, b.end); 
  condensed.add(a); return b;}).ifPresent(condensed::add);

如果插槽字段不可见,则必须将 a.end 更改为 a.getEnd(),等等


一些带有一些边缘情况的测试代码:

List<List<Slot>> tests = Arrays.asList(
        Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
        Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
        Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
        Collections.emptyList());
for (List<Slot> slots : tests) {
    List<Slot> condensed = new LinkedList<>();
    slots.stream().reduce((a, b) -> {if (a.end == b.start) return new Slot(a.start, b.end);
        condensed.add(a); return b;}).ifPresent(condensed::add);
    System.out.println(condensed);
}

输出:

[3-7, 8-13]
[3-7, 8-11, 12-13]
[3-7]
[]

如果您将以下方法添加到您的 Slot class

public boolean join(Slot s) {
    if(s.start != end)
        return false;
    end = s.end;
    return true;
}

您可以按照以下方式使用标准 API 执行整个操作

List<Slot> result = slots.stream().collect(ArrayList::new,
    (l, s)-> { if(l.isEmpty() || !l.get(l.size()-1).join(s)) l.add(s); },
    (l1, l2)-> l1.addAll(
        l1.isEmpty()||l2.isEmpty()||!l1.get(l1.size()-1).join(l2.get(0))?
        l2: l2.subList(1, l2.size()))
);

这遵守 API 的约定(不同于滥用 reduce),因此可以与并行流无缝协作(尽管您确实需要 大型源列出受益于并行执行的列表)。


但是,上面的解决方案使用 Slot 的就地连接,只有在您不再需要源 list/items 时才可以接受。否则,或者如果您仅使用不可变 Slot 实例,则必须创建新的 Slot 实例来表示关节槽。

一个可能的解决方案如下

BiConsumer<List<Slot>,Slot> joinWithList=(l,s) -> {
    if(!l.isEmpty()) {
        Slot old=l.get(l.size()-1);
        if(old.end==s.start) {
            l.set(l.size()-1, new Slot(old.start, s.end));
            return;
        }
    }
    l.add(s);
};
List<Slot> result = slots.stream().collect(ArrayList::new, joinWithList,
    (l1, l2)-> {
        if(!l2.isEmpty()) {
            joinWithList.accept(l1, l2.get(0));
            l1.addAll(l2.subList(1, l2.size()));
        }
    }
);

不需要任何新方法的干净(并行安全)解决方案:

List<Slot> condensed = slots.stream().collect(LinkedList::new,
  (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
    s : new Slot(l.removeLast().start, s.end)),
  (l, l2) -> {if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
    l.add(new Slot(l.removeLast().start, l2.removeFirst().end));} l.addAll(l2);});

这使用更合适的列表实现 LinkedList 来简化合并插槽时删除和访问列表最后一个元素的过程。


List<List<Slot>> tests = Arrays.asList(
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(11, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7), new Slot(8, 10), new Slot(10, 11), new Slot(12, 13)),
            Arrays.asList(new Slot(3, 5), new Slot(5, 7)),
            Collections.emptyList());
for (List<Slot> slots : tests) {
    List<Slot> condensed = slots.stream().collect(LinkedList::new,
      (l, s) -> l.add(l.isEmpty() || l.getLast().end != s.start ?
        s : new Slot(l.removeLast().start, s.end)),
      (l, l2) -> {if (!l.isEmpty() && !l2.isEmpty() && l.getLast().end == l2.getFirst().start) {
        l.add(new Slot(l.removeLast().start, l2.removeFirst().end));} l.addAll(l2);});
    System.out.println(condensed);
}

输出:

[[3, 7], [8, 13]]
[[3, 7], [8, 11], [12, 13]]
[[3, 7]]
[]