仅限 TcpClient Returns Spring WebFlux 控制器中的第一个结果
TcpClient Only Returns First Result in Spring WebFlux Controller
我有一个 HTTP 服务公开了一个 GET 端点,该端点通过 TCP 连接到一个简单的回显服务器。 HTTP 服务在 Netty 上是 运行。
@RestController
public class OurTcpClient {
private Connection connection;
@GetMapping("echo1")
public Mono<String> echo(@RequestParam("value") final String value) {
this.connection.outbound()
.sendString(Mono.just(String.format("%04d", value.length()) + value)) // prepend length
.then()
.subscribe();
return this.connection.inbound()
.receive()
.asString()
.next();
}
@PostConstruct
public void init() {
this.connection = TcpClient.create()
.host("localhost")
.port(10002)
.wiretap(true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.connectNow();
}
}
我的期望是,我可以在 http://localhost:8081/echo1?value=hi 处查询服务任意多次,并在每次响应中收到 "hi" 返回。这适用于第一个请求。第二个请求无限期挂起。如果我随后取消第二个请求并尝试另一个请求,我会收到以下错误:
{
"timestamp": "2020-04-13T18:56:40.221+0000",
"path": "/echo1",
"status": 500,
"error": "Internal Server Error",
"message": "Only one connection receive subscriber allowed."
}
如有任何帮助,我们将不胜感激。
在您使用的示例中 next()
return this.connection.inbound()
.receive()
.asString()
.next();
根据 Flux#next
javadoc Emit only the first item emitted by this Flux, into a new Mono.,那么订阅将被取消。
在 Reactor Netty 的上下文中,当您使用 next
、timeout
、take
等取消订阅的运算符时,这意味着连接将被关闭。
我有一个 HTTP 服务公开了一个 GET 端点,该端点通过 TCP 连接到一个简单的回显服务器。 HTTP 服务在 Netty 上是 运行。
@RestController
public class OurTcpClient {
private Connection connection;
@GetMapping("echo1")
public Mono<String> echo(@RequestParam("value") final String value) {
this.connection.outbound()
.sendString(Mono.just(String.format("%04d", value.length()) + value)) // prepend length
.then()
.subscribe();
return this.connection.inbound()
.receive()
.asString()
.next();
}
@PostConstruct
public void init() {
this.connection = TcpClient.create()
.host("localhost")
.port(10002)
.wiretap(true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
.option(ChannelOption.SO_KEEPALIVE, true)
.connectNow();
}
}
我的期望是,我可以在 http://localhost:8081/echo1?value=hi 处查询服务任意多次,并在每次响应中收到 "hi" 返回。这适用于第一个请求。第二个请求无限期挂起。如果我随后取消第二个请求并尝试另一个请求,我会收到以下错误:
{
"timestamp": "2020-04-13T18:56:40.221+0000",
"path": "/echo1",
"status": 500,
"error": "Internal Server Error",
"message": "Only one connection receive subscriber allowed."
}
如有任何帮助,我们将不胜感激。
在您使用的示例中 next()
return this.connection.inbound()
.receive()
.asString()
.next();
根据 Flux#next
javadoc Emit only the first item emitted by this Flux, into a new Mono.,那么订阅将被取消。
在 Reactor Netty 的上下文中,当您使用 next
、timeout
、take
等取消订阅的运算符时,这意味着连接将被关闭。