合并热通量源
Merging Hot Flux Sources
在 Spring Boot 2 with Reactor 中,我试图合并两个 Flux
热源。但是,merge
似乎只报告 merge
中两个 Flux
参数中的第一个。如何让 merge
识别第二个 Flux
。
在下面的示例中,当 outgoing1a
是第一个参数时,B-2
中的 System.err
甚至不会打印。如果我先设置 outgoing2
,则 A-2
不会打印。
下面是完整的例子;
package com.example.demo;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();
// Assume Spring @Repository "A-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Repository "B-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"MOS", "TLV"}) {
queue2.add(new Weather(s, d));
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue.take());
System.err.println("1 " + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-1"));
// Assume Spring @Service "B-2" = real-time MOS, TLV
Flux<Weather> outgoing2 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue2.take());
System.err.println("2 " + queue2.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-2"));
// Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1a = Flux.from(outgoing1)
.groupBy(c -> c.city)
.flatMap(g -> g
.sample(Duration.ofSeconds(5))
)
.log("C");
// Assume Spring @Service "C" - merges "A-3" and "B-2"
// only prints outgoing1a
Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println);
// only prints outgoing2
//Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println);
}
}
这里有一些事情在起作用。
- 请注意
.merge
操作员的以下建议...
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
您的出站通量使用 .publishOn
,但这只会影响在 .publishOn
运算符 之后链接 的运算符。即它不会影响之前 .publishOn
的任何内容。具体来说,它不会影响传递给 Flux.create
的 lambda 代码执行的线程。如果在每个出站 Fluxes 中的 .publishOn
之前添加 .log()
before,您可以看到这一点。
您传递给 Flux.create
的 lambda 调用了一个阻塞方法 (queue.take
)。
由于您在 main
线程中对合并的 Flux 调用 subscribe(...)
,您传递给 Flux.create
的 lambda 在 main
线程中执行,并阻止它。
最简单的解决方法是使用 .subscribeOn
而不是 .publishOn
,这样传递给 Flux.create
的 lambda 中的代码可以在不同的线程上运行(main
除外).这将防止 main
线程阻塞,并允许交错两个出站流的合并输出。
在 Spring Boot 2 with Reactor 中,我试图合并两个 Flux
热源。但是,merge
似乎只报告 merge
中两个 Flux
参数中的第一个。如何让 merge
识别第二个 Flux
。
在下面的示例中,当 outgoing1a
是第一个参数时,B-2
中的 System.err
甚至不会打印。如果我先设置 outgoing2
,则 A-2
不会打印。
下面是完整的例子;
package com.example.demo;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
public class Weather {
String city;
Integer temperature;
public Weather(String city, Integer temperature) {
this.city = city;
this.temperature = temperature;
}
@Override
public String toString() {
return "Weather [city=" + city + ", temperature=" + temperature + "]";
}
public static void main(String[] args) {
BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();
BlockingQueue<Weather> queue2 = new LinkedBlockingQueue<>();
// Assume Spring @Repository "A-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
queue.add(new Weather(s, d));
try { Thread.sleep(250); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Repository "B-1"
new Thread(() -> {
for (int d = 1; d < 1000; d += 1) {
for (String s: new String[] {"MOS", "TLV"}) {
queue2.add(new Weather(s, d));
try { Thread.sleep(1000); } catch (InterruptedException e) {}
}
}
}).start();
// Assume Spring @Service "A-2" = real-time LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue.take());
System.err.println("1 " + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-1"));
// Assume Spring @Service "B-2" = real-time MOS, TLV
Flux<Weather> outgoing2 = Flux.<Weather>create(
sink -> {
for (int i = 0; i < 1000; i++) {
try {
sink.next(queue2.take());
System.err.println("2 " + queue2.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
sink.complete();
}
).publishOn(Schedulers.newSingle("outgoing-2"));
// Assume Spring @Service "A-3" = 5 second summary of LDN, NYC, PAR, ZUR
Flux<Weather> outgoing1a = Flux.from(outgoing1)
.groupBy(c -> c.city)
.flatMap(g -> g
.sample(Duration.ofSeconds(5))
)
.log("C");
// Assume Spring @Service "C" - merges "A-3" and "B-2"
// only prints outgoing1a
Flux.merge(outgoing1a, outgoing2).subscribe(System.out::println);
// only prints outgoing2
//Flux.merge(outgoing2, outgoing1a).subscribe(System.out::println);
}
}
这里有一些事情在起作用。
- 请注意
.merge
操作员的以下建议...
Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to another source.
您的出站通量使用
.publishOn
,但这只会影响在.publishOn
运算符 之后链接 的运算符。即它不会影响之前.publishOn
的任何内容。具体来说,它不会影响传递给Flux.create
的 lambda 代码执行的线程。如果在每个出站 Fluxes 中的.publishOn
之前添加.log()
before,您可以看到这一点。您传递给
Flux.create
的 lambda 调用了一个阻塞方法 (queue.take
)。
由于您在 main
线程中对合并的 Flux 调用 subscribe(...)
,您传递给 Flux.create
的 lambda 在 main
线程中执行,并阻止它。
最简单的解决方法是使用 .subscribeOn
而不是 .publishOn
,这样传递给 Flux.create
的 lambda 中的代码可以在不同的线程上运行(main
除外).这将防止 main
线程阻塞,并允许交错两个出站流的合并输出。