Spring 5 执行fluxSink.next时FluxSink不发送数据
Spring 5 FluxSink does not send data when fluxSink.next is executed
我有一个生成 Randon INT 并发送回客户端的 GET 方法示例
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
}
卷曲:
curl -v -H "Accept: text/event-stream" -X GET
'http://localhost:8080/stream
使用此代码使用 CURL 命令时,我只能在 fluxSink.complete 发生时在我的客户端中看到数字。
现在,如果我将代码更改为:
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Thread t = new Thread(() -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
t.start();
});
}
将进程包装到一个线程中,它工作正常。当 fluxSink.next 发生时,我可以看到数据传输正常。
谁能解释一下这个效果?如何在不显式使用线程的情况下查看数据流动?
谢谢!
Flux.create()
大约是 "emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API"。在您的情况下,您正在同步发射所有元素,并且您没有机会让 Reactor 工作,因为在完成所有操作之前您永远不会退出该方法。简而言之,您正在阻塞当前线程,这在 Reactor 中是禁止的。
使用线程让 Reactor 有机会分派这些信号。但是 Flux.create()
的精神(如其 javadoc 所示)是关于适应回调。
在这种情况下,也许 Flux.generate
更合适。尝试用 Thread
模拟延迟不是一个好主意。您可以改为执行以下操作:
Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));
我有一个生成 Randon INT 并发送回客户端的 GET 方法示例
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
}
卷曲:
curl -v -H "Accept: text/event-stream" -X GET 'http://localhost:8080/stream
使用此代码使用 CURL 命令时,我只能在 fluxSink.complete 发生时在我的客户端中看到数字。
现在,如果我将代码更改为:
@GetMapping
public Flux<String> search() {
return Flux.create(fluxSink -> {
Thread t = new Thread(() -> {
Random r = new Random();
int n;
for (int i = 0; i < 10; i++) {
n= r.nextInt(1000);
System.out.println("Creating:"+n);
fluxSink.next(String.valueOf(n));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
fluxSink.complete();
});
t.start();
});
}
将进程包装到一个线程中,它工作正常。当 fluxSink.next 发生时,我可以看到数据传输正常。
谁能解释一下这个效果?如何在不显式使用线程的情况下查看数据流动?
谢谢!
Flux.create()
大约是 "emitting multiple elements in a synchronous or asynchronous manner through the FluxSink API"。在您的情况下,您正在同步发射所有元素,并且您没有机会让 Reactor 工作,因为在完成所有操作之前您永远不会退出该方法。简而言之,您正在阻塞当前线程,这在 Reactor 中是禁止的。
使用线程让 Reactor 有机会分派这些信号。但是 Flux.create()
的精神(如其 javadoc 所示)是关于适应回调。
在这种情况下,也许 Flux.generate
更合适。尝试用 Thread
模拟延迟不是一个好主意。您可以改为执行以下操作:
Flux<String> response = Flux.just(aListOfElements).delayElements(Duration.ofSeconds(2));