从流中收集某些元素

Collect certain elements from stream

如何根据位置从 Stream 中提取两个元素?例如,我试图从 Stream<String> 中提取元素 0 和 1(这些数字是任意的!)。一个天真的方法是这样的:

List<String> strings = Arrays.asList("s0", "s1", "s2", "s3", "s4");
Consumer<String> c0 = s -> System.out.println("c0.accept(" + s + ")");
Consumer<String> c1 = s -> System.out.println("c1.accept(" + s + ")");
strings.stream().skip(0).peek(c0).skip(1).peek(c1).findAny();

这会产生以下输出:

c0.accept(s0)
c0.accept(s1)
c1.accept(s1)

我理解是因为s0会进入流,遇到skip(0),然后peek(c0)(给出第一行)然后skip(1),这将跳过此元素,然后显然继续流开头的下一个元素。

我以为我可以使用这些消费者来提取字符串,但是 c0 会被第二个元素覆盖:

String[] extracted = new String[2];
c0 = s -> extracted[0];
c1 = s -> extracted[1];

编辑:

这些是流的特征:

你可以这样写:

public static void main(String[] args) throws Exception {
    List<String> strings = Arrays.asList("s0", "s1", "s2", "s3", "s4");
    System.out.println(getNthElement(strings.stream(), 0)); // prints "s0"
    System.out.println(getNthElement(strings.stream(), 1)); // prints "s1"
}

private static <T> T getNthElement(Stream<T> stream, int n) {
    return stream.skip(n).findFirst().get();
}

请注意,如果流中的元素少于 n,这将引发异常。此外,只有当 Stream 不是并行时才有意义。

考虑到您的限制,您可以像这样将 limit() 与自定义收集器结合使用:

public static <T, A, R> Collector<T, ?, R> collectByIndex(Set<Integer> wantedIndices, 
                                                          Collector<T, A, R> downstream) {
    class Acc {
        int pos;
        A acc = downstream.supplier().get();
    }
    return Collector.of(Acc::new, (acc, t) -> {
        if(wantedIndices.contains(acc.pos++))
            downstream.accumulator().accept(acc.acc, t);
    }, (a, b) -> {throw new UnsupportedOperationException();}, // combining not supported
       acc -> downstream.finisher().apply(acc.acc));
}

这里Set<Integer> wantedIndices是包含想要元素索引的集合(不限于2)。用法:

Set<Integer> wantedIndices = new HashSet<>(Arrays.asList(1, 3));
Stream<String> input = Stream.of("s0", "s1", "s2", "s3", "s4");
List<String> result = input.limit(Collections.max(wantedIndices)+1)
            .collect(collectByIndex(wantedIndices, Collectors.toList()));
// [s1, s3]

这是一个(不是很漂亮,但简单有效的)解决方案:

List<String> strings = Arrays.asList("s0", "s1", "s2", "s3", "s4");
String[] extracted = new String[2];
Consumer<String> c0 = s -> extracted[0] = extracted[0] == null ? s : extracted[0];
Consumer<String> c1 = s -> extracted[1] = extracted[1] == null ? s : extracted[1];
strings.stream().skip(0).peek(c0).skip(1 - 0).peek(c1).findAny();

此解决方案来自 Federico Peralta Schaffner 的评论:

public String[] collect(Stream<String> stream, int... positions) {
    String[] collect = new String[positions.length];
    Iterator<String> iterator = stream.iterator();
    int skipped = 0;
    for (int pos = 0; pos < positions.length; pos++) {
        while (skipped++ < positions[pos]) {
            iterator.next();
        }
        collect[pos] = iterator.next();
    }
    return collect;
}

这是最直接最直接的想法,效果很好。

这是我在其他答案中没有看到的一种方法。它使用无处不在的 Pair class:

的变体
class Pair<T> {
    final T first;
    final T last;
    Pair(T t1, T t2) { first = t1; last = t2; }
    Pair(T t) { first = last = t; }
    Pair<T> merge(Pair<T> other) { return new Pair<>(this.first, other.last); }
}

有了这个之后,您就可以轻松获取流的第一个和最后一个元素。给定无限流和所需索引,您可以使用 skip()limit() 到 trim 流以仅包含所需元素:

static <T> Pair<T> firstAndLast(Stream<T> stream, int firstIndex, int lastIndex) {
    // ensure indexes >= 0 and firstIndex <= lastIndex
    return stream.skip(firstIndex)
                 .limit(lastIndex - firstIndex + 1)
                 .map(Pair::new)
                 .reduce(Pair::merge)
                 .orElseThrow(() -> new IllegalArgumentException("nonexistent"));
}

其他变体包括将构造或合并逻辑内联到流操作中,而不是将其放在 Pair class 上。重构品味。

你会像这样使用它:

    Stream<String> str = Stream.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j");
    Pair<String> pair = firstAndLast(str, 4, 5);
    System.out.println(pair.first + " " + pair.last);

    e f

主要障碍是 Streams 的一次性使用性质,可以规避:

static <T> List<T> get(Stream<? extends T> s, int... positions) {
    Spliterator<? extends T> sp=s.spliterator();
    ArrayList<T> list=new ArrayList<>(positions.length);
    int current=0;
    for(int i: positions) {
        if(i<current) throw new IllegalArgumentException("positions not ascending");
        Optional<? extends T> o
            =StreamSupport.stream(sp, false).skip(i-current).findFirst();
        if(!o.isPresent()) break;
        current=i+1;
        list.add(o.get());
    }
    return list;
}

虽然不知道喜不喜欢……