终止服务器上的特定 Flux 流

Terminate particular Flux stream on the server

我正在尝试使用 Spring WebFlux 构建一个简单的聊天服务器。这很简单,而且工作正常。我现在要实现的是在服务器端终止 Flux 流。想象一下,有一个像这样暴露的无限通量:

@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user)

我有 10 个 clients/subscribers 连接到该事件流。现在我想终止一个特定客户端的连接,因为例如用户在聊天中咒骂。任何。是否可以 manage/identify 此类端点的订阅者?

您可以使用 .takeUntilOther(Publisher) 运算符构建一些东西,并在用户应该断开连接时发出给定的 Publisher...

@GetMapping(produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
public Flux<Object> join(@PathVariable String user) {
    return Flux.from(/* your logic to build your flux */)
               .takeUntilOther(disconnector.onDisconnectUser(user));
}

onDisconnectUser(String user) 的一个可能实现是过滤一个 "global" 热点 Flux<String>,它发出用户名以通过给定的用户名断开连接。也许是这样的:

public class UserDisconnecter {

    private final FluxProcessor<String, String> processor;
    private final FluxSink<String> sink;

    public UserDisconnecter() {
        this.processor = DirectProcessor.create();
        this.sink = this.processor.sink();
    }

    /**
     * Signals that all existing streams for this user should be disconnected.
     */
    public void disconnectUser(String user) {
        this.sink.next(user);
    }

    /**
     * Returns a Mono that emits when the given user should be disconnected.
     */
    public Mono<String> onDisconnectUser(String user) {
        return processor
            .filter(user::equals)
            .next();
    }
}

这是一个简单的实现,但应该可以帮助您入门。