webflux:内部事件总线和异步、松散耦合的事件监听器
webflux: internal event bus and async, loosley coupled event listeners
如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?
我想要一个服务来发出一个事件:
@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
}
发布者服务不知道的另一个组件应该能够决定对该事件做出反应。
@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
// do stuff
}
}
在 MVC 应用程序中,我将使用 ApplicationEventPublisher
发布事件 (publishEvent
) 并在处理程序 (onDeleteEntry
) 上使用 @EventListener
+@Async
.
反应堆中的等价物是什么?
我考虑的另一个选择是 运行 嵌入式消息服务,因为这应该暗示异步语义。但是对于一个简单的场景来说,这感觉开销很大。
我找到了这些 SO 线程
- Execute Asynchronous call after return statement Reactive Core + Spring Boot
但他们不回答这种情况,因为他们假设听众是发布者已知的。但是我需要松耦合。
我也发现了这些 spring 问题
- https://github.com/spring-projects/spring-framework/issues/21025
- https://github.com/spring-projects/spring-framework/issues/21831
具体请参阅 this comment 有希望的建议:
Mono.fromRunnable(() -> context.publishEvent(...))
据我所知,我可以只使用 @EventListener
,因为我完全可以不传播反应性上下文。
但是有人可以解释线程绑定的含义吗?这在反应式堆栈中是否合法?
更新
从测试来看,这样做感觉很好:
@Service
class FeedServiceImpl(
val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
@EventListener
@Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler started")
runBlocking {
// do stuff that takes some time
delay(1000)
}
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
}
请注意,handle
不是挂起函数,因为 @EventListener
必须有一个参数,协程在后台引入了连续参数。然后处理程序启动一个新的 blocking 协程范围,这很好,因为它在不同的线程上,因为 @Async
.
输出为:
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : ThreadId: 38
2021-05-13 12:15:20.755 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler started
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 12:15:21.758 INFO 20252 --- [ task-1] ...FeedServiceImpl : ThreadId: 54
2021-05-13 12:15:21.759 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler done
更新 2
另一种不使用@Async 的方法是这样的:
@EventListener
// @Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler start")
log.info("Handler ThreadId: ${Thread.currentThread().id}")
runBlocking {
log.info("Handler block start")
delay(1000)
log.info("Handler block ThreadId: ${Thread.currentThread().id}")
log.info("Handler block end")
}
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
feedRepository.deleteById(entryId)
Mono.fromRunnable<Unit> {
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
log.info("Publisher ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher ThreadId: 38
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler start
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler ThreadId: 53
2021-05-13 13:06:54.505 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block start
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block ThreadId: 53
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block end
2021-05-13 13:06:55.540 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler done
但是,我仍然不理解负载下的应用程序的含义,将反应性操作与执行 runBlocking { }
的处理程序混合感觉是错误的。
Reactor 提供 Sink。您可以像使用事件总线一样使用它。看看下面的例子。
@Configuration
public class EventNotificationConfig {
@Bean
public Sinks.Many<EventNotification> eventNotifications() {
return Sinks.many().replay().latest();
}
}
您在 @Configuration
class 中创建了一个 Sink Bean。这可用于发出新事件,并可将其转变为供订阅者使用的 Flux。
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {
private final @NonNull Sinks.Many<EventNotification> eventNotifications;
/**
* Provide a flux with our notifications.
*
* @return a Flux
*/
public Flux<EventNotification> getNotifications() {
return eventNotifications.asFlux();
}
/**
* Emit a new event to the sink.
*
* @param eventId
* @param status
* @param payload
*/
public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
eventNotifications.tryEmitNext(EventNotification.builder()
.eventId(eventId)
.status(status)
.payload(payload).build());
}
}
您的应用程序中最多可以保留一个 Sink 实例。订阅不同类型的事件可以通过各种订阅者可以应用于 Flux 的过滤器来实现。
@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {
private final @NonNull NotificationUsecase notificationUsecase;
/**
* Start listening to events as soon as class EventListener
* has been constructed.
*
* Listening will continue until the Flux emits a 'completed'
* signal.
*/
@PostConstruct
public void init() {
this.listenToPings()
.subscribe();
this.listenToDataFetched()
.subscribe();
}
public Flux<EventNotification> listenToPings() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
.doOnNext(notification -> log.info("received PING: {}", notification));
}
public Flux<EventNotification> listenToDataFetched() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {}", notification));
}
}
public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.flatMap(notification -> reactiveMongoRepository
.saveAndReturnNewObject(notification)
.doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
.zipWith(Mono.just(notification)))
.map(tuple->tuple.getT2())
.filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {} - saved ", notification));
}
发出新事件同样简单。只需调用发射方法:
notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);
如何实现内部事件总线以在 webflux spring 堆栈中执行异步操作?
我想要一个服务来发出一个事件:
@Service
class FeedServiceImpl(/*...dependencies...*/) : FeedService {
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
}
发布者服务不知道的另一个组件应该能够决定对该事件做出反应。
@Service
class CommentServiceImpl(/*...dependencies...*/): CommentService {
override suspend fun onDeleteEntry(event: FeedEntryDeletedEvent) {
// do stuff
}
}
在 MVC 应用程序中,我将使用 ApplicationEventPublisher
发布事件 (publishEvent
) 并在处理程序 (onDeleteEntry
) 上使用 @EventListener
+@Async
.
反应堆中的等价物是什么?
我考虑的另一个选择是 运行 嵌入式消息服务,因为这应该暗示异步语义。但是对于一个简单的场景来说,这感觉开销很大。
我找到了这些 SO 线程
- Execute Asynchronous call after return statement Reactive Core + Spring Boot
但他们不回答这种情况,因为他们假设听众是发布者已知的。但是我需要松耦合。
我也发现了这些 spring 问题
- https://github.com/spring-projects/spring-framework/issues/21025
- https://github.com/spring-projects/spring-framework/issues/21831
具体请参阅 this comment 有希望的建议:
Mono.fromRunnable(() -> context.publishEvent(...))
据我所知,我可以只使用 @EventListener
,因为我完全可以不传播反应性上下文。
但是有人可以解释线程绑定的含义吗?这在反应式堆栈中是否合法?
更新
从测试来看,这样做感觉很好:
@Service
class FeedServiceImpl(
val applicationEventPublisher: ApplicationEventPublisher,
) : FeedService {
@EventListener
@Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler started")
runBlocking {
// do stuff that takes some time
delay(1000)
}
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
entryRepository.deleteById(entryId)
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
log.info("ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
}
请注意,handle
不是挂起函数,因为 @EventListener
必须有一个参数,协程在后台引入了连续参数。然后处理程序启动一个新的 blocking 协程范围,这很好,因为它在不同的线程上,因为 @Async
.
输出为:
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : ThreadId: 38
2021-05-13 12:15:20.755 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler started
2021-05-13 12:15:20.755 INFO 20252 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 12:15:21.758 INFO 20252 --- [ task-1] ...FeedServiceImpl : ThreadId: 54
2021-05-13 12:15:21.759 INFO 20252 --- [ task-1] ...FeedServiceImpl : Handler done
更新 2
另一种不使用@Async 的方法是这样的:
@EventListener
// @Async
override fun handle(e: FeedEntryDeletedEvent) {
log.info("Handler start")
log.info("Handler ThreadId: ${Thread.currentThread().id}")
runBlocking {
log.info("Handler block start")
delay(1000)
log.info("Handler block ThreadId: ${Thread.currentThread().id}")
log.info("Handler block end")
}
log.info("Handler done")
}
override suspend fun deleteEntry(entryId: Long) {
feedRepository.deleteById(entryId)
Mono.fromRunnable<Unit> {
applicationEventPublisher.publishEvent(
FeedEntryDeletedEvent(
timestamp = time.utcMillis(),
entryId = entryId,
)
)
}
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
log.info("Publisher ThreadId: ${Thread.currentThread().id}")
log.info("Publisher done")
}
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher ThreadId: 38
2021-05-13 13:06:54.503 INFO 23326 --- [-1 @coroutine#6] ...FeedServiceImpl : Publisher done
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler start
2021-05-13 13:06:54.504 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler ThreadId: 53
2021-05-13 13:06:54.505 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block start
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block ThreadId: 53
2021-05-13 13:06:55.539 INFO 23326 --- [-1 @coroutine#7] ...FeedServiceImpl : Handler block end
2021-05-13 13:06:55.540 INFO 23326 --- [oundedElastic-1] ...FeedServiceImpl : Handler done
但是,我仍然不理解负载下的应用程序的含义,将反应性操作与执行 runBlocking { }
的处理程序混合感觉是错误的。
Reactor 提供 Sink。您可以像使用事件总线一样使用它。看看下面的例子。
@Configuration
public class EventNotificationConfig {
@Bean
public Sinks.Many<EventNotification> eventNotifications() {
return Sinks.many().replay().latest();
}
}
您在 @Configuration
class 中创建了一个 Sink Bean。这可用于发出新事件,并可将其转变为供订阅者使用的 Flux。
@Component
@RequiredArgsConstructor
@Slf4j
public class NotificationUsecase {
private final @NonNull Sinks.Many<EventNotification> eventNotifications;
/**
* Provide a flux with our notifications.
*
* @return a Flux
*/
public Flux<EventNotification> getNotifications() {
return eventNotifications.asFlux();
}
/**
* Emit a new event to the sink.
*
* @param eventId
* @param status
* @param payload
*/
public void emitNotification(final String eventId, final EventNotification.Status status, final Map<String, Object> payload) {
eventNotifications.tryEmitNext(EventNotification.builder()
.eventId(eventId)
.status(status)
.payload(payload).build());
}
}
您的应用程序中最多可以保留一个 Sink 实例。订阅不同类型的事件可以通过各种订阅者可以应用于 Flux 的过滤器来实现。
@Component
@RequiredArgsConstructor
@Slf4j
public class EventListener {
private final @NonNull NotificationUsecase notificationUsecase;
/**
* Start listening to events as soon as class EventListener
* has been constructed.
*
* Listening will continue until the Flux emits a 'completed'
* signal.
*/
@PostConstruct
public void init() {
this.listenToPings()
.subscribe();
this.listenToDataFetched()
.subscribe();
}
public Flux<EventNotification> listenToPings() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.PING))
.doOnNext(notification -> log.info("received PING: {}", notification));
}
public Flux<EventNotification> listenToDataFetched() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.filter(notification -> notification.getStatus().equals(EventNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {}", notification));
}
}
public Flux<EventNotification> listenToDataFetchedAndWriteToDatabase() {
return this.notificationUsecase.getNotifications()
.subscribeOn(Schedulers.boundedElastic())
.flatMap(notification -> reactiveMongoRepository
.saveAndReturnNewObject(notification)
.doOnNext(log.info("I just saved something and returned an instance of NewObject!"))
.zipWith(Mono.just(notification)))
.map(tuple->tuple.getT2())
.filter(notification -> notification.getStatus().equals(PlanningNotification.Status.DATA_FETCHED))
.doOnNext(notification -> log.info("received data: {} - saved ", notification));
}
发出新事件同样简单。只需调用发射方法:
notificationUsecase.emitNotification(eventId, EventNotification.Status.PING, payload);