使用 Streams API 对 Collection 中的 n 个随机不同元素执行操作

Perform operation on n random distinct elements from Collection using Streams API

我正在尝试使用 Java 8 中的流 API 从 Collection 中检索 n 个唯一的随机元素以进行进一步处理,但是,运气不佳。

更准确地说,我想要这样的东西:

Set<Integer> subList = new HashSet<>();
Queue<Integer> collection = new PriorityQueue<>();
collection.addAll(Arrays.asList(1,2,3,4,5,6,7,8,9));
Random random = new Random();
int n = 4;
while (subList.size() < n) {
  subList.add(collection.get(random.nextInt()));
}
sublist.forEach(v -> v.doSomethingFancy());

我想尽可能高效。

这可以做到吗?

编辑:我的第二次尝试——虽然不完全是我的目标:

List<Integer> sublist = new ArrayList<>(collection);
Collections.shuffle(sublist);
sublist.stream().limit(n).forEach(v -> v.doSomethingFancy());

编辑:第三次尝试(受Holger启发),如果coll.size()很大且n很小,这将消除很多随机播放的开销:

int n = // unique element count
List<Integer> sublist = new ArrayList<>(collection);   
Random r = new Random();
for(int i = 0; i < n; i++)
    Collections.swap(sublist, i, i + r.nextInt(source.size() - i));
sublist.stream().limit(n).forEach(v -> v.doSomethingFancy());

你可以使用limit来解决你的问题。

http://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#limit-long-

Collections.shuffle(collection); 

int howManyDoYouWant = 10;
List<Integer> smallerCollection = collection
    .stream()
    .limit(howManyDoYouWant)
    .collect(Collectors.toList());

您始终可以创建一个 "dumb" 比较器,它将随机比较列表中的元素。调用 distinct() 将确保元素是唯一的(来自队列)。

像这样:

static List<Integer> nDistinct(Collection<Integer> queue, int n) {
    final Random rand = new Random();
    return queue.stream()
                .distinct()
                .sorted(Comparator.comparingInt(a -> rand.nextInt()))
                .limit(n)
                .collect(Collectors.toList());
}

但是我不确定将元素放入列表中,将其打乱并 return 一个子列表会更有效率。

static List<Integer> nDistinct(Collection<Integer> queue, int n) {
    List<Integer> list = new ArrayList<>(queue);
    Collections.shuffle(list);
    return list.subList(0, n);
}

哦,return a Set 而不是 List 在语义上可能更好,因为元素是不同的。这些方法也被设计为采用 Integers,但将它们设计为通用的并不困难。 :)

请注意,Stream API 看起来像一个工具箱,我们可以使用它来处理所有事情,但情况并非总是如此。如您所见,第二种方法更具可读性(IMO),可能更高效并且没有更多代码(甚至更少!)。

应该清楚流式传输集合不是您想要的。

使用generate()limit方法:

Stream.generate(() -> list.get(new Random().nextInt(list.size())).limit(3).forEach(...);
List<Integer> collection = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
int n = 4;
Random random = ThreadLocalRandom.current();

random.ints(0, collection.size())
        .distinct()
        .limit(n)
        .mapToObj(collection::get)
        .forEach(System.out::println);

这当然会有中间索引集的开销,如果 n > collection.size(),它将永远挂起。

如果你想避免任何非 constatn 开销,你必须创建一个有状态的 Predicate

正如 fge in a and by ZouZou in another 所建议的那样,改组方法工作得相当好。这是洗牌方法的通用版本:

static <E> List<E> shuffleSelectN(Collection<? extends E> coll, int n) {
    assert n <= coll.size();
    List<E> list = new ArrayList<>(coll);
    Collections.shuffle(list);
    return list.subList(0, n);
}

我会注意到,使用 subList 比获取流然后调用 limit(n) 更可取,如其他一些答案所示,因为生成的流具有已知大小并且可以拆分更高效。

改组方法有几个缺点。它需要复制出所有元素,然后需要对所有元素进行洗牌。如果元素总数很大而要选择的元素数量很少,这可能会非常昂贵。

OP 和其他几个答案建议的一种方法是随机选择元素,同时拒绝重复元素,直到选择了所需数量的唯一元素。如果要选择的元素数量相对于总数而言较小,则此方法效果很好,但随着选择数量的增加,速度会变慢很多,因为选择重复项的可能性也会增加。

如果有一种方法可以单次遍历 space 个输入元素并准确选择所需的数字,并且随机统一选择,那不是很好吗?事实证明,和往常一样,答案可以在 Knuth 身上找到。参见 TAOCP 第 2 卷,第 3.4.2 节,随机采样和混洗,算法 S.

简而言之,该算法就是访问每个元素,根据访问的元素个数和选择的元素个数来决定是否选择它。在 Knuth 的表示法中,假设您有 N 个元素,并且您想随机选择其中的 n 个。下一个元素的选择概率为

(n - m) / (N - t)

其中t是到目前为止访问的元素数,m是到目前为止选择的元素数。

这一点并不明显,这将使所选元素均匀分布,但显然确实如此。证明作为练习留给 reader;请参阅本节的练习 3。

鉴于此算法,在 "conventional" Java 中实现它非常简单,方法是循环遍历集合并添加到基于随机测试的结果列表。 OP 询问有关使用流的问题,所以这是一个镜头。

算法 S 显然不适合 Java 流操作。它完全按顺序描述,关于是否 select 当前元素的决定取决于随机决定加上从所有先前决定派生的状态。这可能使它看起来本质上是顺序的,但我以前错了。我只想说,如何使这个算法 运行 并行并不是很明显。

不过,有一种方法可以使该算法适应流。我们需要的是一个状态谓词。这个谓词将 return 一个基于当前状态确定的概率的随机结果,并且状态将根据这个随机结果更新——是的,变异的。这似乎很难 运行 并行,但至少它很容易使线程安全,以防它是来自并行流的 运行 :只需使其同步。不过,如果流是并行的,它将按顺序降级为 运行ning。

实现非常简单。 Knuth 的描述使用了 0 到 1 之间的随机数,但是 Java Random class 让我们在半开区间内选择一个随机整数。因此,我们需要做的就是记录有多少元素需要访问以及有多少元素可供选择,et voila:

/**
 * A stateful predicate that, given a total number
 * of items and the number to choose, will return 'true'
 * the chosen number of times distributed randomly
 * across the total number of calls to its test() method.
 */
static class Selector implements Predicate<Object> {
    int total;  // total number items remaining
    int remain; // number of items remaining to select
    Random random = new Random();

    Selector(int total, int remain) {
        this.total = total;
        this.remain = remain;
    }

    @Override
    public synchronized boolean test(Object o) {
        assert total > 0;
        if (random.nextInt(total--) < remain) {
            remain--;
            return true;
        } else {
            return false;
        }
    }
}

现在我们有了谓词,它很容易在流中使用:

static <E> List<E> randomSelectN(Collection<? extends E> coll, int n) {
    assert n <= coll.size();
    return coll.stream()
        .filter(new Selector(coll.size(), n))
        .collect(toList());
}

在 Knuth 的同一部分中也提到了另一种选择,建议以 n / N 的恒定概率随机选择一个元素。如果您不需要恰好选择 n 个元素,这将很有用。它平均会选择n个元素,当然会有一些变化。如果这是可以接受的,有状态谓词就会变得简单得多。我们可以简单地创建随机状态并从局部变量中捕获它,而不是编写整个 class:

/**
 * Returns a predicate that evaluates to true with a probability
 * of toChoose/total.
 */
static Predicate<Object> randomPredicate(int total, int toChoose) {
    Random random = new Random();
    return obj -> random.nextInt(total) < toChoose;
}

要使用它,请将上面流管道中的 filter 行替换为

        .filter(randomPredicate(coll.size(), n))

最后,为了比较,下面是使用常规 Java 编写的 selection 算法的实现,即使用 for 循环并添加到集合中:

static <E> List<E> conventionalSelectN(Collection<? extends E> coll, int remain) {
    assert remain <= coll.size();
    int total = coll.size();
    List<E> result = new ArrayList<>(remain);
    Random random = new Random();

    for (E e : coll) {
        if (random.nextInt(total--) < remain) {
            remain--;
            result.add(e);
        }
    }            

    return result;
}

这很简单,并没有什么错。它比流方法更简单、更独立。不过,流方法说明了一些可能在其他情况下有用的有趣技术。


参考:

Knuth, Donald E. 计算机编程艺术:第 2 卷,半数值算法,第 2 版。 版权所有 1981,1969 Addison-Wesley。

作为已接受答案的 shuffle 方法的附录:

如果您只想 select 大型列表中的少数项目,并且希望避免重新排列整个列表的开销,您可以按如下方式解决任务:

public static <T> List<T> getRandom(List<T> source, int num) {
    Random r=new Random();
    for(int i=0; i<num; i++)
        Collections.swap(source, i, i+r.nextInt(source.size()-i));
    return source.subList(0, num);
}

它的作用与 shuffle 的作用非常相似,但它将其作用减少为只有 num 个随机元素而不是 source.size() 个随机元素……

如果您想轻松处理整个流,只需使用 Collectors.collectingAndThen():

创建您自己的收集器
public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
    return Collectors.collectingAndThen(
      toList(),
      list -> {
          Collections.shuffle(list);
          return list.stream();
      });
}

但是如果您想 limit() 结果流,这将不会很好地执行。为了克服这个问题,可以创建一个自定义的 Spliterator:

package com.pivovarit.stream;

import java.util.List;
import java.util.Random;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class ImprovedRandomSpliterator<T> implements Spliterator<T> {

    private final Random random;
    private final T[] source;
    private int size;

    ImprovedRandomSpliterator(List<T> source, Supplier<? extends Random> random) {
        if (source.isEmpty()) {
            throw new IllegalArgumentException("RandomSpliterator can't be initialized with an empty collection");
        }
        this.source = (T[]) source.toArray();
        this.random = random.get();
        this.size = this.source.length;
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action) {
        int nextIdx = random.nextInt(size);
        int lastIdx = size - 1;

        action.accept(source[nextIdx]);
        source[nextIdx] = source[lastIdx];
        source[lastIdx] = null; // let object be GCed
        return --size > 0;
    }

    @Override
    public Spliterator<T> trySplit() {
        return null;
    }

    @Override
    public long estimateSize() {
        return source.length;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}

然后:

public final class RandomCollectors {

    private RandomCollectors() {
    }

    public static <T> Collector<T, ?, Stream<T>> toImprovedLazyShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> !list.isEmpty()
            ? StreamSupport.stream(new ImprovedRandomSpliterator<>(list, Random::new), false)
            : Stream.empty());
    }

    public static <T> Collector<T, ?, Stream<T>> toEagerShuffledStream() {
        return Collectors.collectingAndThen(
          toCollection(ArrayList::new),
          list -> {
              Collections.shuffle(list);
              return list.stream();
          });
    }
}

然后你可以像这样使用它:

stream
  .collect(toLazyShuffledStream()) // or toEagerShuffledStream() depending on the use case
  .distinct()
  .limit(42)
  .forEach( ... );

可以找到详细的解释here

如果您想要从流中随机抽样元素,洗牌的惰性替代方法可能是基于均匀分布的过滤器:

...
import org.apache.commons.lang3.RandomUtils

// If you don't know ntotal, just use a 0-1 ratio 
var relativeSize = nsample / ntotal;

Stream.of (...) // or any other stream
.parallel() // can work in parallel
.filter ( e -> Math.random() < relativeSize )
// or any other stream operation
.forEach ( e -> System.out.println ( "I've got: " + e ) );