订阅 Hot Observable 源,然后在同一源上发信号

Subscribing to Hot Observable source, then signalling on the same source

Tl;博士:

如何在我订阅了同一个热 Observable 后在 HotObservable 上发出 onNext 信号?

更长的版本:

我的目标是创建 REST 端点,它将发出事件,然后在返回响应之前等待(这本身就是一个事件)- 基本上是事件驱动的 REST 端点 Command/Response.

因为我还没有找到任何解决方案,所以我决定通过反应 java 来实现,通过将 spring 事件映射到 DirectProcessor 发布者(热可观察)。我有:

https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive

@Slf4j
@Component
class EventDriver {
    private static DirectProcessor<Object> EVENTS = DirectProcessor.create();

    @EventListener(Object.class)
    public void onEvent(Object event) {
        log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
        EVENTS.onNext(event);
    }

    // Not important, but it does show that my events are captured & processed via 'normal' subscription
    @EventListener(ApplicationStartedEvent.class)
    public void logger() {
        EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
    }

    public Flux<Object> getEvents() {
        log.info("Requesting processor.");
        return EVENTS
                .doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
    }
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
    private final ApplicationEventPublisher eventDispatcher;
    private final EventDriver eventDriver;

    @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
    public Flux<String> performAction(@PathVariable String name) {
        return eventDriver.getEvents()
                .doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
                .map(Object::toString)
                .log();
    }

    @AllArgsConstructor
    @NoArgsConstructor
    @Data
    class MyEvent {
        private String name;
    }
}

我的理解是:当我打开 REST 端点 \wait-for-event\any 时,Reactive Web 方法 performAction 应该首先订阅事件,然后调度事件(从而触发管道),因为doOnSubscribe(当 {@link Flux} 完成订阅时触发添加行为(副作用))

不幸的是,事件卡住了 - 订阅看不到我的事件,这是日志

 : Requesting processor.
 : Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
 : Event ReservationRestController.MyEvent(name=as3) received by processor.
 : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
 : | request(1)

据我所知,我在实际调用 onSubscribe 之前进行调度。

当然,之后发生的任何其他事件都将被正确捕获。

一段时间后我又回到了这个问题;解决方案是先订阅事件,然后使用调度程序压缩:

public String wtfperformActionPathVariable String name) {
        return Mono.from(
                eventDriver.getEvents()
                        .map(Object::toString)
                        .filter(b -> b.contains(name))
                        .log("Event source")
        )
                .zipWith(Mono.just(name)
                        .map(MyEvent::new)
                        .doOnNext(eventDispatcher::publishEvent)
                        .log("Publisher"))
                .map(Tuple2::getT1)
                .log("Result")
                .block();
    }