获取反应流的排列

Getting permutation for reactive streams

我有两个应该异步工作的反应流。因此,我应该调用 foo() 函数,该函数将接受两个参数,并将使用来自两个初始反应流的元素的所有可能排列进行调用。 如果在整个过程中发生任何异常,它应该使一切失败。 使用 reactor-core 实现此目的的最佳方法是什么?

示例:

    String[] aInitial = {"a","b","c"};
    String[] bInitial = {"0","1"};

    Flux<String> fluxA = Flux.fromArray(aInitial);
    Flux<String> fluxB = Flux.fromArray(bInitial);

    ...

    private void foo(String a, String b){
        System.out.println(a + ", " + b); 
    }

预期结果(顺序无关紧要):

a, 0 a, 1 b, 0 b, 1 c, 0 c, 1

不是在执行转换的方法中调用操作(在您的例子中是 System.out.println),而是将其拆分为一个函数,该函数结合了两者的元素,另一个函数对数据进行操作。

public static void main(String[] args) {

    String[] aInitial = {"a", "b", "c"};
    String[] bInitial = {"0", "1"};

    Flux<String> fluxA = Flux.fromArray(aInitial);
    Flux<String> fluxB = Flux.fromArray(bInitial);

    fluxA
            .flatMap(input1 -> fluxB.map(input2 -> foo(input1, input2)))
            .doOnNext(System.out::println)
            .blockLast();
}

private static String foo(String a, String b) {
    return a + ", " + b;
}

如您所见,我们迭代第一个通量,并使用第二个通量创建各种嵌套循环。

请注意,这不适用于热通量,只有冷通量在这里起作用,因为我们需要为从第一个元素发出的每个元素重新订阅第二个通量。