我如何合并活跃的反应流?

How could I merge active reactive streams?

我刚刚开始探索项目反应堆,我目前对其主要目的的理解是以异步方式从多个来源获取信息。例如,我有一个源可以快速生成数据,另一个源可以非常慢地生成数据。我想尽快合并这两个来源和来自它们的 return 信息。 我试过这样的代码:

@RestController
public class ReactiveController {

@RequestMapping("/info")
public Flux<String> getInfoFromServices() {
    return Flux.merge(getDataFromSlowSource(), getDataFromFastSource()).take(10);
}

private Flux<String> getDataFromFastSource() {
    return Flux.generate(sink -> {
        sink.next("data from fast source\n");
        try {
            Thread.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

private Flux<String> getDataFromSlowSource() {
    return Flux.generate(sink -> {
        sink.next("data from slow source\n");
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    });
}

我期望服务器的回答是这样的:

data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source
data from fast source
data from fast source
data from slow source

但我有:

data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source
data from slow source

那么,我能否以某种方式在源生成数据后同时从两个源获取数据?

您的问题是 Thread.sleep,它阻塞了当前线程。这就是为什么你会看到这种行为。在快源和慢源中不使用 Thread.sleep,而是使用 delayElements,您会看到预期的行为。

private Flux<Object> getDataFromFastSource() {
    return Flux.generate(sink -> {
        sink.next("data from fast source\n");
    }).delayElements(Duration.ofSeconds(1));
}

private Flux<Object> getDataFromSlowSource() {
    return Flux.generate(sink -> {
        sink.next("data from slow source\n");
    }).delayElements(Duration.ofSeconds(2));
}

注:

始终使整个反应器链成为非阻塞的,并使用适当的调度程序来做到这一点。更多信息在这里。

http://www.vinsguru.com/reactive-programming-schedulers/

您的问题是合并操作、慢源和快源都在同一个线程上 运行。因此,这两个来源之间没有竞争。

如果您像这样修改代码,使慢速和快速源 运行 在不同的线程(调度程序)上运行,您将看到预期的结果:

@RequestMapping("/info")
public Flux<String> getInfoFromServices() {
    return Flux.merge(
      getDataFromSlowSource().subscribeOn(Schedulers.boundedElastic(),
      getDataFromFastSource().subscribeOn(Schedulers.boundedElastic()
    ).take(10);
}

关于vins的回答:delayElements(duration)方法也使用了一个调度器。