如何在不关闭流的情况下从助焊剂中收集

How do I collect from a flux without closing the stream

我的用例是创建一个反应式端点,如下所示:

public Flux<ServerEvent> getEventFlux(Long forId){
    ServicePoller poller = new ServicePollerImpl();
    Map<String,Object> params =  new HashMap<>();
    params.put("id", forId);

    Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
    Flux<ServerEvent> serverEventFlux =
            Flux.fromStream(
                    poller.getEventStream(url, params) //poll a given endpoint after a fixed duration.
            );
    Flux<ServerEvent> sourceFlux= Flux.zip(interval, serverEventFlux)
             .map(Tuple2::getT2); // Zip the two streams. 
/* Here I want to store data from sourceFlux into a collection whenever some data arrives without disturbing the downstream processing in Spring. So that I can access collection later on without polling again */

这会在数据可用时立即将数据发送回前端,但是我的第二个用例是在数据到达时将其集中到一个单独的集合中,这样如果以后有类似的请求到达,我可以从池中卸载整个数据而无需再次访问服务。

我尝试在控制器从原始流量返回之前订阅流量、缓冲区、缓存和收集流量,但所有这些似乎都关闭了流,因此 Spring 无法处理它。

我有什么选择可以在不关闭通量流的情况下利用通量并将值存储到集合中?

遇到异常:

java.lang.IllegalStateException: stream has already been operated upon or closed at java.util.stream.AbstractPipeline.spliterator(AbstractPipeline.java:343) ~[na:1.8.0_171] at java.util.stream.ReferencePipeline.iterator(ReferencePipeline.java:139) ~[na:1.8.0_171] at reactor.core.publisher.FluxStream.subscribe(FluxStream.java:57) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6873) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.FluxZip$ZipCoordinator.subscribe(FluxZip.java:573) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE] at reactor.core.publisher.FluxZip.handleBoth(FluxZip.java:326) ~[reactor-core-3.1.7.RELEASE.jar:3.1.7.RELEASE]

poller.getEventStream returns 一个只能消耗一次的 Java 8 流。您可以先将流转换为集合,也可以使用供应商延迟 poller.getEventStream 的执行:

Flux.fromStream(
  () -> poller.getEventStream(url, params)
);

@a better oliver 建议的对我有用的解决方案

public Flux<ServerEvent> getEventFlux(Long forId){
        ServicePoller poller = new ServicePollerImpl();
        Map<String,Object> params =  new HashMap<>();
        params.put("id", forId);

        Flux<Long> interval = Flux.interval(Duration.ofMillis(pollDuration));
        Flux<ServerEvent> serverEventFlux =
                Flux.fromStream(
                        ()->{
                        return poller.getEventStream(url, params).peek((se)->{reactSink.addtoSink(forId, se);});
                        }
                );
        Flux<ServerEvent> sourceFlux= Flux.zip(interval, serverEventFlux)
                 .map(Tuple2::getT2);

        return sourceFlux;

    }