Java 并行流:有一种导航二叉树的方法吗?

Java parallel streams: there's a way to navigate a binary tree?

我正在努力寻找从这个流中获得加速的正确方法:

    StreamSupport.stream(new BinaryTreeSpliterator(root), true)
                .parallel()
                .map(node -> processor.onerousFunction(node.getValue()))
                .mapToInt(i -> i.intValue())
                .sum()

onerousFunction()只是一个让线程工作一点的函数,returns是节点的int值。

无论我使用多少cpu,执行时间始终保持不变。 我认为问题出在我写的 Spliterator 中:

    public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {

        private LinkedBlockingQueue<Node> nodes = new LinkedBlockingQueue<>();

        public BinaryTreeSpliterator(Node root) {
            super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
            this.nodes.add(root);
        }

        @Override
         public boolean tryAdvance(Consumer<? super Node> action) {
            Node current = this.nodes.poll();
            if(current != null) {
                action.accept(current);
                if(current.getLeft() != null) 
                    this.nodes.offer(current.getLeft());
                if(current.getRight() != null)
                    this.nodes.offer(current.getRight());
                return true;
            }
            return false;
        }

    }

可是实在找不到好的解决方法

要并行处理数据,您需要 trySplit 实现 return 部分数据作为新的 Spliterator 实例。每个拆分器实例都由一个线程遍历。因此,顺便说一下,您不需要在拆分器中使用线程安全的集合。但是你的问题是你从 AbstractSpliterator 继承了 trySplit 实现,尽管对你的数据一无所知,它确实试图提供一些并行支持。

它通过顺序请求一些项目,将它们缓冲到一个数组中并 returning 一个新的基于数组的拆分器来做到这一点。不幸的是,它不能很好地处理“未知大小”(这同样适用于一般的并行流实现)。默认情况下它将缓冲 1024 个元素,如果有那么多元素,下次缓冲更多。更糟糕的是,流实现不会使用基于数组的拆分器的良好拆分功能,因为它将“未知大小”视为文字 Long.MAX_VALUE,得出的结论是您的拆分器的元素比数组中的 1024 个元素多得多,因此, 甚至不会尝试拆分基于数组的拆分器。

您的拆分器可以实现更合适的 trySplit 方法:

public class BinaryTreeSpliterator extends AbstractSpliterator<Node> {
    /**
     * a node that has not been traversed, but its children are only
     * traversed if contained in this.pending
     * (otherwise a different spliterator might be responsible)
     */
    private Node pendingNode;
    /** pending nodes needing full traversal */
    private ArrayDeque<Node> pending = new ArrayDeque<>();

    public BinaryTreeSpliterator(Node root) {
        super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
        push(root);
    }

    private BinaryTreeSpliterator(Node pending, Node next) {
        super(Long.MAX_VALUE, NONNULL | IMMUTABLE);
        pendingNode = pending;
        if(next!=null) this.pending.offer(next);
    }
    private void push(Node n) {
        if(pendingNode == null) {
            pendingNode = n;
            if(n != null) {
                if(n.getRight()!=null) pending.offerFirst(n.getRight());
                if(n.getLeft() !=null) pending.offerFirst(n.getLeft());
            }
        }
        else pending.offerFirst(n);
    }

    @Override
     public boolean tryAdvance(Consumer<? super Node> action) {
        Node current = pendingNode;
        if(current == null) {
            current = pending.poll();
            if(current == null) return false;
            push(current.getRight());
            push(current.getLeft());
        }
        else pendingNode = null;
        action.accept(current);
        return true;
    }

    @Override
    public void forEachRemaining(Consumer<? super Node> action) {
        Node current = pendingNode;
        if(current != null) {
            pendingNode = null;
            action.accept(current);
        }
        for(;;) {
            current = pending.poll();
            if(current == null) break;
            traverseLocal(action, current);
        }
    }
    private void traverseLocal(Consumer<? super Node> action, Node current) {
        do {
            action.accept(current);
            Node child = current.getLeft();
            if(child!=null) traverseLocal(action, child);
            current = current.getRight();
        } while(current != null);
    }

    @Override
    public Spliterator<Node> trySplit() {
        Node next = pending.poll();
        if(next == null) return null;
        if(pending.isEmpty()) {
            pending.offer(next);
            next = null;
        }
        if(pendingNode==null) return next==null? null: new BinaryTreeSpliterator(next);
        Spliterator<Node> s = new BinaryTreeSpliterator(pendingNode, next);
        pendingNode = null;
        return s;
    }
}

请注意,此拆分器也有资格作为 ORDERED 拆分器,保持从左上到右的顺序。完全无序的拆分器可以稍微简单地实现。

您可以实施比继承的默认方法更有效的 forEachRemaining 方法,例如

@Override
public void forEachRemaining(Consumer<? super Node> action) {
    Node current = pendingNode;
    if(current != null) {
        pendingNode = null;
        action.accept(current);
    }
    for(;;) {
        current = pending.poll();
        if(current == null) break;
        traverseLocal(action, current);
    }
}
private void traverseLocal(Consumer<? super Node> action, Node current) {
    do {
        action.accept(current);
        Node child = current.getLeft();
        if(child!=null) traverseLocal(action, child);
        current = current.getRight();
    } while(current != null);
}

但是如果您的应用程序必须处理不平衡的树(特别是本例中的左路径很长),此方法可能会导致 Whosebug 错误。