终止服务器上的特定 Flux 流
Terminate particular Flux stream on the server
我正在尝试使用 Spring WebFlux 构建一个简单的聊天服务器。这很简单,而且工作正常。我现在要实现的是在服务器端终止 Flux 流。想象一下,有一个像这样暴露的无限通量:
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)
我有 10 个 clients/subscribers 连接到该事件流。现在我想终止一个特定客户端的连接,因为例如用户在聊天中咒骂。任何。是否可以 manage/identify 此类端点的订阅者?
您可以使用 .takeUntilOther(Publisher)
运算符构建一些东西,并在用户应该断开连接时发出给定的 Publisher
...
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
return Flux.from(/* your logic to build your flux */)
.takeUntilOther(disconnector.onDisconnectUser(user));
}
onDisconnectUser(String user)
的一个可能实现是过滤一个 "global" 热点 Flux<String>
,它发出用户名以通过给定的用户名断开连接。也许是这样的:
public class UserDisconnecter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public UserDisconnecter() {
this.processor = DirectProcessor.create();
this.sink = this.processor.sink();
}
/**
* Signals that all existing streams for this user should be disconnected.
*/
public void disconnectUser(String user) {
this.sink.next(user);
}
/**
* Returns a Mono that emits when the given user should be disconnected.
*/
public Mono<String> onDisconnectUser(String user) {
return processor
.filter(user::equals)
.next();
}
}
这是一个简单的实现,但应该可以帮助您入门。
我正在尝试使用 Spring WebFlux 构建一个简单的聊天服务器。这很简单,而且工作正常。我现在要实现的是在服务器端终止 Flux 流。想象一下,有一个像这样暴露的无限通量:
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)
我有 10 个 clients/subscribers 连接到该事件流。现在我想终止一个特定客户端的连接,因为例如用户在聊天中咒骂。任何。是否可以 manage/identify 此类端点的订阅者?
您可以使用 .takeUntilOther(Publisher)
运算符构建一些东西,并在用户应该断开连接时发出给定的 Publisher
...
@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
return Flux.from(/* your logic to build your flux */)
.takeUntilOther(disconnector.onDisconnectUser(user));
}
onDisconnectUser(String user)
的一个可能实现是过滤一个 "global" 热点 Flux<String>
,它发出用户名以通过给定的用户名断开连接。也许是这样的:
public class UserDisconnecter {
private final FluxProcessor<String, String> processor;
private final FluxSink<String> sink;
public UserDisconnecter() {
this.processor = DirectProcessor.create();
this.sink = this.processor.sink();
}
/**
* Signals that all existing streams for this user should be disconnected.
*/
public void disconnectUser(String user) {
this.sink.next(user);
}
/**
* Returns a Mono that emits when the given user should be disconnected.
*/
public Mono<String> onDisconnectUser(String user) {
return processor
.filter(user::equals)
.next();
}
}
这是一个简单的实现,但应该可以帮助您入门。