订阅 Hot Observable 源,然后在同一源上发信号
Subscribing to Hot Observable source, then signalling on the same source
Tl;博士:
如何在我订阅了同一个热 Observable 后在 HotObservable 上发出 onNext 信号?
更长的版本:
我的目标是创建 REST 端点,它将发出事件,然后在返回响应之前等待(这本身就是一个事件)- 基本上是事件驱动的 REST 端点 Command/Response.
因为我还没有找到任何解决方案,所以我决定通过反应 java 来实现,通过将 spring 事件映射到 DirectProcessor 发布者(热可观察)。我有:
https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive
@Slf4j
@Component
class EventDriver {
private static DirectProcessor<Object> EVENTS = DirectProcessor.create();
@EventListener(Object.class)
public void onEvent(Object event) {
log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
EVENTS.onNext(event);
}
// Not important, but it does show that my events are captured & processed via 'normal' subscription
@EventListener(ApplicationStartedEvent.class)
public void logger() {
EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
}
public Flux<Object> getEvents() {
log.info("Requesting processor.");
return EVENTS
.doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
}
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
private final ApplicationEventPublisher eventDispatcher;
private final EventDriver eventDriver;
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
public Flux<String> performAction(@PathVariable String name) {
return eventDriver.getEvents()
.doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
.map(Object::toString)
.log();
}
@AllArgsConstructor
@NoArgsConstructor
@Data
class MyEvent {
private String name;
}
}
我的理解是:当我打开 REST 端点 \wait-for-event\any
时,Reactive Web 方法 performAction
应该首先订阅事件,然后调度事件(从而触发管道),因为doOnSubscribe
(当 {@link Flux} 完成订阅时触发添加行为(副作用))
不幸的是,事件卡住了 - 订阅看不到我的事件,这是日志
: Requesting processor.
: Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
: Event ReservationRestController.MyEvent(name=as3) received by processor.
: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
: | request(1)
据我所知,我在实际调用 onSubscribe 之前进行调度。
当然,之后发生的任何其他事件都将被正确捕获。
一段时间后我又回到了这个问题;解决方案是先订阅事件,然后使用调度程序压缩:
public String wtfperformActionPathVariable String name) {
return Mono.from(
eventDriver.getEvents()
.map(Object::toString)
.filter(b -> b.contains(name))
.log("Event source")
)
.zipWith(Mono.just(name)
.map(MyEvent::new)
.doOnNext(eventDispatcher::publishEvent)
.log("Publisher"))
.map(Tuple2::getT1)
.log("Result")
.block();
}
Tl;博士:
如何在我订阅了同一个热 Observable 后在 HotObservable 上发出 onNext 信号?
更长的版本:
我的目标是创建 REST 端点,它将发出事件,然后在返回响应之前等待(这本身就是一个事件)- 基本上是事件驱动的 REST 端点 Command/Response.
因为我还没有找到任何解决方案,所以我决定通过反应 java 来实现,通过将 spring 事件映射到 DirectProcessor 发布者(热可观察)。我有:
https://github.com/Venthe/Exploratory-Projects/tree/reactive/reactive
@Slf4j
@Component
class EventDriver {
private static DirectProcessor<Object> EVENTS = DirectProcessor.create();
@EventListener(Object.class)
public void onEvent(Object event) {
log.info(MessageFormat.format("Event {0} is ready to be pushed into processor.", event.toString()));
EVENTS.onNext(event);
}
// Not important, but it does show that my events are captured & processed via 'normal' subscription
@EventListener(ApplicationStartedEvent.class)
public void logger() {
EVENTS.map(Object::toString).subscribe(e -> log.info(MessageFormat.format("Event {0} received by processor.", e)));
}
public Flux<Object> getEvents() {
log.info("Requesting processor.");
return EVENTS
.doOnEach(l -> log.info(MessageFormat.format("Processing subscribed event {0}", l.toString())));
}
}
@RestController
@RequiredArgsConstructor
class ReservationRestController {
private final ApplicationEventPublisher eventDispatcher;
private final EventDriver eventDriver;
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/wait-for-event/{name}")
public Flux<String> performAction(@PathVariable String name) {
return eventDriver.getEvents()
.doOnSubscribe(s -> eventDispatcher.publishEvent(new MyEvent(name)))
.map(Object::toString)
.log();
}
@AllArgsConstructor
@NoArgsConstructor
@Data
class MyEvent {
private String name;
}
}
我的理解是:当我打开 REST 端点 \wait-for-event\any
时,Reactive Web 方法 performAction
应该首先订阅事件,然后调度事件(从而触发管道),因为doOnSubscribe
(当 {@link Flux} 完成订阅时触发添加行为(副作用))
不幸的是,事件卡住了 - 订阅看不到我的事件,这是日志
: Requesting processor.
: Event ReservationRestController.MyEvent(name=as3) is ready to be pushed into processor.
: Event ReservationRestController.MyEvent(name=as3) received by processor.
: | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
: | request(1)
据我所知,我在实际调用 onSubscribe 之前进行调度。
当然,之后发生的任何其他事件都将被正确捕获。
一段时间后我又回到了这个问题;解决方案是先订阅事件,然后使用调度程序压缩:
public String wtfperformActionPathVariable String name) {
return Mono.from(
eventDriver.getEvents()
.map(Object::toString)
.filter(b -> b.contains(name))
.log("Event source")
)
.zipWith(Mono.just(name)
.map(MyEvent::new)
.doOnNext(eventDispatcher::publishEvent)
.log("Publisher"))
.map(Tuple2::getT1)
.log("Result")
.block();
}