How to skip even lines of a Stream<String> obtained from Files.lines

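In this case, only the odd lines contain meaningful data, and no character uniquely identifies those lines. My intention is to get something equivalent to the following example: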

Stream<DomainObject> res = Files.lines(src)
     .filter(line -> isOddLine())
     .map(line -> toDomainObject(line));

Is there a "clean" way to do this without sharing global state?

No, the API gives you no convenient way to do this. (It is basically the same reason there is no easy way to get a zipWithIndex; see "Is there a concise way to iterate over a stream with indices in Java 8?".)
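For comparison, here is a minimal sketch of the index-based workaround that the linked question revolves around. It is not from the answers here, and it assumes the whole file fits in memory, so it gives up the laziness of Files.lines. IndexedOddLines and oddLines are hypothetical names used only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class IndexedOddLines {
    // Hypothetical helper: keeps the elements at 0-based indices 0, 2, 4, ...,
    // which are lines 1, 3, 5, ... when lines are counted from 1.
    static List<String> oddLines(List<String> all) {
        return IntStream.range(0, all.size())
                .filter(i -> i % 2 == 0)      // 0-based even index == 1-based odd line
                .mapToObj(all::get)
                .collect(Collectors.toList());
    }
}
```

With a file, the input list could come from Files.readAllLines(src), for example IndexedOddLines.oddLines(Files.readAllLines(src)).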

You can still use a Stream, but go through an iterator:

Iterator<String> iter = Files.lines(src).iterator();
while (iter.hasNext()) {
    toDomainObject(iter.next());      // use the odd line (1st, 3rd, ... counting from 1)
    if (iter.hasNext()) iter.next();  // discard the even line, if there is one
}

(Although you will probably want to use try-with-resources on that stream.)
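A minimal sketch of what that could look like, assuming lines are counted from 1 so that the 1st, 3rd, 5th, ... lines are the ones to keep. OddLineReader and readOddLines are hypothetical names, and the lines are collected into a list here only to keep the example self-contained.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;

class OddLineReader {
    // Hypothetical helper: returns lines 1, 3, 5, ... (1-based) of the file.
    static List<String> readOddLines(Path src) throws IOException {
        List<String> result = new ArrayList<>();
        // try-with-resources closes the underlying file even if iteration throws
        try (Stream<String> lines = Files.lines(src)) {
            Iterator<String> iter = lines.iterator();
            while (iter.hasNext()) {
                result.add(iter.next());   // keep the odd line
                if (iter.hasNext()) {
                    iter.next();           // discard the even line, if there is one
                }
            }
        }
        return result;
    }
}
```

The second hasNext() check matters when the file has an odd number of lines; without it the trailing next() would throw NoSuchElementException.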

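A clean way is to go one level deeper and implement a Spliterator. At this level you control the iteration over the stream elements and can simply iterate over two items whenever the downstream requests one: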

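import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class OddLines<T> extends Spliterators.AbstractSpliterator<T>
    implements Consumer<T> {

    public static <T> Stream<T> oddLines(Stream<T> source) {
        // onClose forwards close() to the source so a file-backed stream is released
        return StreamSupport.stream(new OddLines<>(source.spliterator()), false)
                            .onClose(source::close);
    }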
    private static long odd(long l) { return l==Long.MAX_VALUE? l: (l+1)/2; }
    Spliterator<T> originalLines;

    OddLines(Spliterator<T> source) {
        super(odd(source.estimateSize()), source.characteristics());
        originalLines=source;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        if(originalLines==null || !originalLines.tryAdvance(action))
            return false;
        if(!originalLines.tryAdvance(this)) originalLines=null;
        return true;
    }

    @Override
    public void accept(T t) {}
}

Then you can use it like this:

Stream<DomainObject> res = OddLines.oddLines(Files.lines(src))
    .map(line -> toDomainObject(line));

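This solution has no side effects and retains most advantages of the Stream API, such as lazy evaluation. However, it should be clear that it has no useful semantics for unordered stream processing (beware of subtle aspects such as using forEachOrdered rather than forEach when performing a terminal action on all elements), and while it supports parallel processing in principle, it is unlikely to be very efficient.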

As another answer said, there isn't a convenient way to do this, but there are several inconvenient ways. :-)

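Here's another spliterator-based approach. Unlike the one that wraps another spliterator, this spliterator does the I/O itself. This gives greater control over things like ordering, but it also means that it has to deal with IOException and with closing the file. I also threw in a LongPredicate parameter on the 1-based line number, which lets you decide which lines get passed through.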

static class LineSpliterator extends Spliterators.AbstractSpliterator<String>
        implements AutoCloseable {
    final BufferedReader br;
    final LongPredicate pred;
    long count = 0L;

    public LineSpliterator(Path path, LongPredicate pred) throws IOException {
        super(Long.MAX_VALUE, Spliterator.ORDERED);
        br = Files.newBufferedReader(path);
        this.pred = pred;
    }

    @Override
    public boolean tryAdvance(Consumer<? super String> action) {
        try {
            String s;
            while ((s = br.readLine()) != null) {
                if (pred.test(++count)) {
                    action.accept(s);
                    return true;
                }
            }
            return false;
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    @Override
    public void close() {
        try {
            br.close();
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
    }

    public static Stream<String> lines(Path path, LongPredicate pred) throws IOException {
        LineSpliterator ls = new LineSpliterator(path, pred);
        return StreamSupport.stream(ls, false)
                            .onClose(() -> ls.close());
    }
}

You would use it in a try-with-resources block to ensure the file gets closed even if an exception occurs:

static void printOddLines() throws IOException {
    try (Stream<String> lines = LineSpliterator.lines(PATH, x -> (x & 1L) == 1L)) {
        lines.forEach(System.out::println);
    }
}
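Since the predicate receives the 1-based line number, other selections are a matter of swapping the lambda. The following sketch shows a few candidates and which line numbers they would let through; LinePredicates, its constants, and selected are hypothetical names for illustration and are not part of the answer's code.

```java
import java.util.List;
import java.util.function.LongPredicate;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

class LinePredicates {
    // Odd line numbers: 1, 3, 5, ...
    static final LongPredicate ODD = n -> (n & 1L) == 1L;
    // Every third line: 3, 6, 9, ...
    static final LongPredicate EVERY_THIRD = n -> n % 3 == 0;
    // Odd lines, but skipping line 1 (for example a header): 3, 5, 7, ...
    static final LongPredicate ODD_AFTER_HEADER = ODD.and(n -> n > 1);

    // Lists which line numbers in 1..max the predicate would pass through.
    static List<Long> selected(LongPredicate pred, long max) {
        return LongStream.rangeClosed(1, max)
                .filter(pred)
                .boxed()
                .collect(Collectors.toList());
    }
}
```

Any of these could be passed as the second argument of LineSpliterator.lines in place of x -> (x & 1L) == 1L.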

You can do this with a custom spliterator:

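import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.StreamSupport;

public class EvenOdd {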
    public static final class EvenSpliterator<T> implements Spliterator<T> {
        private final Spliterator<T> underlying;
        boolean even;

        public EvenSpliterator(Spliterator<T> underlying, boolean even) {
            this.underlying = underlying;
            this.even = even;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (even) {
                even = false;

                return underlying.tryAdvance(action);
            }
            if (!underlying.tryAdvance(t -> {})) {
                return false;
            }
            return underlying.tryAdvance(action);
        }

        @Override
        public Spliterator<T> trySplit() {
            if (!hasCharacteristics(SUBSIZED)) {
                return null;
            }
            final Spliterator<T> newUnderlying = underlying.trySplit();
            if (newUnderlying == null) {
                return null;
            }
            final boolean oldEven = even;

            if ((newUnderlying.estimateSize() & 1) == 1) {
                even = !even;
            }

            return new EvenSpliterator<>(newUnderlying, oldEven);
        }

        @Override
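        public long estimateSize() {
            final long size = underlying.estimateSize();
            if (size == Long.MAX_VALUE) {
                return size; // unknown or unbounded
            }
            // when the first remaining element is kept, an odd count rounds up
            return even ? (size + 1) >> 1 : size >> 1;
        }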

        @Override
        public int characteristics() {
            return underlying.characteristics();
        }
    }

    public static void main(String[] args) {
        // The source holds 1..99999; with even == false the first element is skipped,
        // so the values 2, 4, ..., 99998 are kept.
        final EvenSpliterator<Integer> spliterator = new EvenSpliterator<>(
                IntStream.range(1, 100000).parallel().boxed().spliterator(), false);
        final List<Integer> result = StreamSupport.stream(spliterator, true).collect(Collectors.toList());
        final List<Integer> expected = IntStream.range(1, 100000 / 2).mapToObj(i -> i * 2).collect(Collectors.toList());
        if (result.equals(expected)) {
            System.out.println("Yay! Expected result.");
        } else {
            System.out.println("Unexpected result.");
        }
    }
}
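The parity hand-off in trySplit is the subtle part: when the split-off prefix has an odd number of elements, the remaining part must start with the opposite keep/skip state. Here is a small sketch that checks this rule in isolation on plain lists. ParityHandoff, select, and selectSplit are hypothetical names for illustration, and takeFirst plays the role of the even flag.

```java
import java.util.ArrayList;
import java.util.List;

class ParityHandoff {
    // Keeps every other element, starting with the first one if takeFirst is true.
    static List<Integer> select(List<Integer> part, boolean takeFirst) {
        List<Integer> out = new ArrayList<>();
        boolean take = takeFirst;
        for (Integer x : part) {
            if (take) {
                out.add(x);
            }
            take = !take;
        }
        return out;
    }

    // Splits the list at splitAt, selects from both halves independently,
    // and flips the starting state of the suffix when the prefix size is odd.
    static List<Integer> selectSplit(List<Integer> all, int splitAt, boolean takeFirst) {
        List<Integer> prefix = all.subList(0, splitAt);
        List<Integer> suffix = all.subList(splitAt, all.size());
        boolean suffixTakeFirst = (prefix.size() & 1) == 1 ? !takeFirst : takeFirst;
        List<Integer> out = new ArrayList<>(select(prefix, takeFirst));
        out.addAll(select(suffix, suffixTakeFirst));
        return out;
    }
}
```

For any split point, selectSplit should return the same elements as select on the unsplit list, which is the property a parallel run of the spliterator relies on.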

Following the iterator-based algorithm, here's another spliterator-based approach, like those proposed in the other answers but more concise, even if less efficient.

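public static <T> Stream<T> filterOdd(Stream<T> src) {
    Spliterator<T> iter = src.spliterator();
    Spliterators.AbstractSpliterator<T> res =
        new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED)
    {
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            boolean advanced = iter.tryAdvance(action); // use the odd item (1st, 3rd, ...)
            if (advanced) iter.tryAdvance(item -> {});  // discard the even item, if any
            return advanced;
        }
    };
    // forward close() so a file-backed source such as Files.lines is released
    return StreamSupport.stream(res, false).onClose(src::close);
}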

Then you can use it like this:

Stream<DomainObject> res = filterOdd(Files.lines(src))
    .map(line -> toDomainObject(line));