仅限 TcpClient Returns Spring WebFlux 控制器中的第一个结果

TcpClient Only Returns First Result in Spring WebFlux Controller

我有一个 HTTP 服务公开了一个 GET 端点,该端点通过 TCP 连接到一个简单的回显服务器。 HTTP 服务在 Netty 上是 运行。

@RestController
public class OurTcpClient {

    private Connection connection;

    @GetMapping("echo1")
    public Mono<String> echo(@RequestParam("value") final String value) {
        this.connection.outbound()
            .sendString(Mono.just(String.format("%04d", value.length()) + value)) // prepend length
            .then()
            .subscribe();
        return this.connection.inbound()
            .receive()
            .asString()
            .next();
    }

    @PostConstruct
    public void init() {
        this.connection = TcpClient.create()
            .host("localhost")
            .port(10002)
            .wiretap(true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .connectNow();
    }
}

我的期望是,我可以在 http://localhost:8081/echo1?value=hi 处查询服务任意多次,并在每次响应中收到 "hi" 返回。这适用于第一个请求。第二个请求无限期挂起。如果我随后取消第二个请求并尝试另一个请求,我会收到以下错误:

{
    "timestamp": "2020-04-13T18:56:40.221+0000",
    "path": "/echo1",
    "status": 500,
    "error": "Internal Server Error",
    "message": "Only one connection receive subscriber allowed."
}

如有任何帮助,我们将不胜感激。

在您使用的示例中 next()

return this.connection.inbound()
            .receive()
            .asString()
            .next();

根据 Flux#next javadoc Emit only the first item emitted by this Flux, into a new Mono.,那么订阅将被取消。

在 Reactor Netty 的上下文中,当您使用 nexttimeouttake 等取消订阅的运算符时,这意味着连接将被关闭。