Spring 启动 Webflux/Netty - 检测关闭的连接

Spring Boot Webflux/Netty - Detect closed connection

我一直在使用 spring-boot 2.0.0.RC1 使用 webflux starter (spring-boot-starter-webflux)。我创建了一个 returns 无限通量的简单控制器。我希望发布者只有在有客户(订阅者)的情况下才会工作。假设我有一个这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}

现在,当我尝试 运行 该代码并使用 Chrome 访问端点 http://localhost:8080/ 时,我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 如何在关闭浏览器后terminate/cancel直播?

从这个我引用:

Currently with HTTP, the exact backpressure information is not transmitted over the network, since the HTTP protocol doesn't support this. This can change if we use a different wire protocol.

我假设,由于 HTTP 协议不支持背压,这意味着也不会发出取消请求。

进一步调查,通过分析网络流量,显示浏览器在我关闭浏览器后立即发送 TCP FIN。有没有办法配置 Netty(或其他东西),以便半关闭连接将触发发布者上的取消事件,从而停止 while 循环?

或者我是否必须编写自己的适配器,类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter 我在其中实现自己的订阅者?

感谢您的帮助。

编辑: 如果没有客户端,将在尝试将数据写入套接字时引发 IOException。正如您在 stack trace.

中看到的

但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此检测消失的客户端也需要相同的时间。正如 中指出的那样,这是 Reactor Netty 中的一个已知问题。我尝试通过将依赖项添加到 POM.xml 来使用 Tomcat。像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

虽然它取代了 Netty 并使用 Tomcat 代替,但由于浏览器不显示任何数据,它似乎没有反应性。但是,控制台中没有warning/info/exception。 此版本 (2.0.0.RC1) 的 spring-boot-starter-webflux 是否应该与 Tomcat 一起使用?

我不确定为什么会这样,但我怀疑是因为生成运算符的选择。我认为使用以下方法可行:

    return Flux.interval(Duration.ofMillis(500))
    .map(input -> {
        return "DATA";
    });

根据 to Reactor's reference documentation, you're probably hitting the key difference between generate and push(我相信使用生成的非常相似的方法也可能有效)。

我的评论是指背压信息(Subscriber 愿意接受多少元素),但 success/error 信息是通过网络传递的。

根据您选择的 Web 服务器(Reactor Netty、Tomcat、Jetty 等),关闭客户端连接可能会导致:

  • 服务器端收到取消信号(我认为这是 Netty 支持的)
  • 服务器在尝试写入已关闭的连接时收到错误信号(我相信 Servlet 规范不提供该回调,我们缺少取消信息)。

简而言之:您不需要做任何特别的事情,应该已经支持它了,但是您的 Flux 实施可能是这里的实际问题。

更新: this is a known issue in Reactor Netty

由于这是一个已知问题(参见 ),我最终使用一个 Flux 来获取我的真实数据并使用另一个来实现某种 ping/heartbeat 机制。结果,我将两者合并为 Flux.merge()

在这里您可以看到我的解决方案的简化版本:

@RestController
public class Demo {

    public interface Notification{}

    public static class MyData implements Notification{
        …
        public boolean isEmpty(){…}
    }

    @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
        return Flux.merge(getEventMessageStream(), getHeartbeatStream());
    }

    private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
        return Flux.interval(Duration.ofSeconds(2))
                .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                .doFinally(signalType ->System.out.println("END"));
    }

    private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
        return Flux.interval(Duration.ofSeconds(30))
                .map(i -> {

                    // TODO e.g. fetch data from somewhere,
                    // if there is no data return an empty object

                    return data;
                })
                .filter(data -> !data.isEmpty())
                .map(data -> ServerSentEvent
                        .builder(data)
                        .event("message").build());
    }
}

我把所有东西都打包成 ServerSentEvent<? extends Notification>Notification只是一个标记界面。我使用 ServerSentEvent class 中的 event 字段来区分数据和 ping 事件。由于心跳 Flux 不断地以很短的间隔发送事件,因此检测到客户端消失所花费的时间最多是该间隔的长度。请记住,我需要它,因为我可能需要一段时间才能获得一些可以发送的真实数据,因此,它也可能需要一段时间才能检测到客户端已经消失。这样一来ping不通(也可能是message事件)就会检测到client挂了。

关于标记界面的最后一点,我称之为通知。这不是真正必要的,但它提供了一些类型安全。否则,我们可以为 getNotificationStream() 方法编写 Flux<ServerSentEvent<?>> 而不是 Flux<ServerSentEvent<? extends Notification>> 作为 return 类型。或者也可以使 getHeartbeatStream() return Flux<ServerSentEvent<MyData>>。然而,像这样它会允许发送任何对象,这是我不想要的。因此,我添加了接口。