Spring 启动 Webflux/Netty - 检测关闭的连接
Spring Boot Webflux/Netty - Detect closed connection
我一直在使用 spring-boot 2.0.0.RC1
使用 webflux starter (spring-boot-starter-webflux
)。我创建了一个 returns 无限通量的简单控制器。我希望发布者只有在有客户(订阅者)的情况下才会工作。假设我有一个这样的控制器:
@RestController
public class Demo {
@GetMapping(value = "/")
public Flux<String> getEvents(){
return Flux.create((FluxSink<String> sink) -> {
while(!sink.isCancelled()){
// TODO e.g. fetch data from somewhere
sink.next("DATA");
}
sink.complete();
}).doFinally(signal -> System.out.println("END"));
}
}
现在,当我尝试 运行 该代码并使用 Chrome 访问端点 http://localhost:8080/ 时,我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 如何在关闭浏览器后terminate/cancel直播?
从这个我引用:
Currently with HTTP, the exact backpressure information is not
transmitted over the network, since the HTTP protocol doesn't support
this. This can change if we use a different wire protocol.
我假设,由于 HTTP 协议不支持背压,这意味着也不会发出取消请求。
进一步调查,通过分析网络流量,显示浏览器在我关闭浏览器后立即发送 TCP FIN
。有没有办法配置 Netty(或其他东西),以便半关闭连接将触发发布者上的取消事件,从而停止 while 循环?
或者我是否必须编写自己的适配器,类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter
我在其中实现自己的订阅者?
感谢您的帮助。
编辑:
如果没有客户端,将在尝试将数据写入套接字时引发 IOException
。正如您在 stack trace.
中看到的
但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此检测消失的客户端也需要相同的时间。正如 中指出的那样,这是 Reactor Netty 中的一个已知问题。我尝试通过将依赖项添加到 POM.xml
来使用 Tomcat。像这样:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
虽然它取代了 Netty 并使用 Tomcat 代替,但由于浏览器不显示任何数据,它似乎没有反应性。但是,控制台中没有warning/info/exception。 此版本 (2.0.0.RC1
) 的 spring-boot-starter-webflux
是否应该与 Tomcat 一起使用?
我不确定为什么会这样,但我怀疑是因为生成运算符的选择。我认为使用以下方法可行:
return Flux.interval(Duration.ofMillis(500))
.map(input -> {
return "DATA";
});
根据 to Reactor's reference documentation, you're probably hitting the key difference between generate
and push
(我相信使用生成的非常相似的方法也可能有效)。
我的评论是指背压信息(Subscriber
愿意接受多少元素),但 success/error 信息是通过网络传递的。
根据您选择的 Web 服务器(Reactor Netty、Tomcat、Jetty 等),关闭客户端连接可能会导致:
- 服务器端收到取消信号(我认为这是 Netty 支持的)
- 服务器在尝试写入已关闭的连接时收到错误信号(我相信 Servlet 规范不提供该回调,我们缺少取消信息)。
简而言之:您不需要做任何特别的事情,应该已经支持它了,但是您的 Flux
实施可能是这里的实际问题。
由于这是一个已知问题(参见 ),我最终使用一个 Flux
来获取我的真实数据并使用另一个来实现某种 ping/heartbeat 机制。结果,我将两者合并为 Flux.merge()
。
在这里您可以看到我的解决方案的简化版本:
@RestController
public class Demo {
public interface Notification{}
public static class MyData implements Notification{
…
public boolean isEmpty(){…}
}
@GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
return Flux.merge(getEventMessageStream(), getHeartbeatStream());
}
private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
return Flux.interval(Duration.ofSeconds(2))
.map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
.doFinally(signalType ->System.out.println("END"));
}
private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
return Flux.interval(Duration.ofSeconds(30))
.map(i -> {
// TODO e.g. fetch data from somewhere,
// if there is no data return an empty object
return data;
})
.filter(data -> !data.isEmpty())
.map(data -> ServerSentEvent
.builder(data)
.event("message").build());
}
}
我把所有东西都打包成 ServerSentEvent<? extends Notification>
。 Notification
只是一个标记界面。我使用 ServerSentEvent
class 中的 event
字段来区分数据和 ping 事件。由于心跳 Flux
不断地以很短的间隔发送事件,因此检测到客户端消失所花费的时间最多是该间隔的长度。请记住,我需要它,因为我可能需要一段时间才能获得一些可以发送的真实数据,因此,它也可能需要一段时间才能检测到客户端已经消失。这样一来ping不通(也可能是message事件)就会检测到client挂了。
关于标记界面的最后一点,我称之为通知。这不是真正必要的,但它提供了一些类型安全。否则,我们可以为 getNotificationStream() 方法编写 Flux<ServerSentEvent<?>>
而不是 Flux<ServerSentEvent<? extends Notification>>
作为 return 类型。或者也可以使 getHeartbeatStream() return Flux<ServerSentEvent<MyData>>
。然而,像这样它会允许发送任何对象,这是我不想要的。因此,我添加了接口。
我一直在使用 spring-boot 2.0.0.RC1
使用 webflux starter (spring-boot-starter-webflux
)。我创建了一个 returns 无限通量的简单控制器。我希望发布者只有在有客户(订阅者)的情况下才会工作。假设我有一个这样的控制器:
@RestController
public class Demo {
@GetMapping(value = "/")
public Flux<String> getEvents(){
return Flux.create((FluxSink<String> sink) -> {
while(!sink.isCancelled()){
// TODO e.g. fetch data from somewhere
sink.next("DATA");
}
sink.complete();
}).doFinally(signal -> System.out.println("END"));
}
}
现在,当我尝试 运行 该代码并使用 Chrome 访问端点 http://localhost:8080/ 时,我可以看到数据。但是,一旦我关闭浏览器,while 循环就会继续,因为没有触发取消事件。 如何在关闭浏览器后terminate/cancel直播?
从这个
Currently with HTTP, the exact backpressure information is not transmitted over the network, since the HTTP protocol doesn't support this. This can change if we use a different wire protocol.
我假设,由于 HTTP 协议不支持背压,这意味着也不会发出取消请求。
进一步调查,通过分析网络流量,显示浏览器在我关闭浏览器后立即发送 TCP FIN
。有没有办法配置 Netty(或其他东西),以便半关闭连接将触发发布者上的取消事件,从而停止 while 循环?
或者我是否必须编写自己的适配器,类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter
我在其中实现自己的订阅者?
感谢您的帮助。
编辑:
如果没有客户端,将在尝试将数据写入套接字时引发 IOException
。正如您在 stack trace.
但这还不够好,因为可能需要一段时间才能准备好发送下一个数据块,因此检测消失的客户端也需要相同的时间。正如 POM.xml
来使用 Tomcat。像这样:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
虽然它取代了 Netty 并使用 Tomcat 代替,但由于浏览器不显示任何数据,它似乎没有反应性。但是,控制台中没有warning/info/exception。 此版本 (2.0.0.RC1
) 的 spring-boot-starter-webflux
是否应该与 Tomcat 一起使用?
我不确定为什么会这样,但我怀疑是因为生成运算符的选择。我认为使用以下方法可行:
return Flux.interval(Duration.ofMillis(500))
.map(input -> {
return "DATA";
});
根据 to Reactor's reference documentation, you're probably hitting the key difference between generate
and push
(我相信使用生成的非常相似的方法也可能有效)。
我的评论是指背压信息(Subscriber
愿意接受多少元素),但 success/error 信息是通过网络传递的。
根据您选择的 Web 服务器(Reactor Netty、Tomcat、Jetty 等),关闭客户端连接可能会导致:
- 服务器端收到取消信号(我认为这是 Netty 支持的)
- 服务器在尝试写入已关闭的连接时收到错误信号(我相信 Servlet 规范不提供该回调,我们缺少取消信息)。
简而言之:您不需要做任何特别的事情,应该已经支持它了,但是您的 Flux
实施可能是这里的实际问题。
由于这是一个已知问题(参见 Flux
来获取我的真实数据并使用另一个来实现某种 ping/heartbeat 机制。结果,我将两者合并为 Flux.merge()
。
在这里您可以看到我的解决方案的简化版本:
@RestController
public class Demo {
public interface Notification{}
public static class MyData implements Notification{
…
public boolean isEmpty(){…}
}
@GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
return Flux.merge(getEventMessageStream(), getHeartbeatStream());
}
private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
return Flux.interval(Duration.ofSeconds(2))
.map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
.doFinally(signalType ->System.out.println("END"));
}
private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
return Flux.interval(Duration.ofSeconds(30))
.map(i -> {
// TODO e.g. fetch data from somewhere,
// if there is no data return an empty object
return data;
})
.filter(data -> !data.isEmpty())
.map(data -> ServerSentEvent
.builder(data)
.event("message").build());
}
}
我把所有东西都打包成 ServerSentEvent<? extends Notification>
。 Notification
只是一个标记界面。我使用 ServerSentEvent
class 中的 event
字段来区分数据和 ping 事件。由于心跳 Flux
不断地以很短的间隔发送事件,因此检测到客户端消失所花费的时间最多是该间隔的长度。请记住,我需要它,因为我可能需要一段时间才能获得一些可以发送的真实数据,因此,它也可能需要一段时间才能检测到客户端已经消失。这样一来ping不通(也可能是message事件)就会检测到client挂了。
关于标记界面的最后一点,我称之为通知。这不是真正必要的,但它提供了一些类型安全。否则,我们可以为 getNotificationStream() 方法编写 Flux<ServerSentEvent<?>>
而不是 Flux<ServerSentEvent<? extends Notification>>
作为 return 类型。或者也可以使 getHeartbeatStream() return Flux<ServerSentEvent<MyData>>
。然而,像这样它会允许发送任何对象,这是我不想要的。因此,我添加了接口。