等待使用 Spring 数据 MongoDB 激活反应式更改流订阅?
Wait for reactive change stream subscription to be active with Spring Data MongoDB?
当订阅 change streams using the blocking Spring Data Mongo implementation one can call await 等待订阅生效时:
Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);
startBlockingMongoChangeStream
的实现方式如下:
public Subscription startBlockingMongoChangeStream() {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());
return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}
如果在上面的示例中没有使用 await
,则有可能(如果 JVM 很热,则几乎有 100% 的机会)someDocument
写在 之前 订阅处于活动状态,因此错过了 someDocument
。所以添加 await
可以缓解这个问题。
我正在寻找一种方法来在使用反应式实现时实现相同的目的。代码现在看起来像这样:
Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)
这里的问题又是,someDocument
是在 startReactiveMongoChangeStream
返回的订阅开始之前写入的,因此文档丢失了。
另请注意,这是一个有点人为的示例,因为在我的实际应用程序中 writeDocumentToMongoDb
(2) 不知道 startReactiveMongoChangeStream
订阅 (1),所以我不能简单地 flatMap
(1) 并调用 (2)。 startReactiveMongoChangeStream
方法的实现方式如下:
public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
return reactiveTemplate.changeStream(String.class)
.watchCollection("user")
.listen();
}
如何在反应式实现中“模拟”阻塞式实现中可用的 await
功能?
TL;DR
反应式中没有同步方法API
说明
首先,让我们看看这两种实现方式以了解这是为什么。
阻塞实现使用 MongoDB 的游标 API 来获取游标。获取游标包括与服务器的对话。在 MessageListenerContainer
获得游标后,它将订阅任务切换为活动状态,这意味着您已经等待第一个游标被获取的阶段。
响应式实现在 ChangeStreamPublisher
上运行。从反应流协议中,可以在发出元素时、流完成或失败时收到通知。服务器端 activity 启动或完成时没有可用的通知。因此,您不能等到反应式 API 收到第一个游标。由于游标可能为空,因此第一个游标可能根本不会发出任何值。
我认为 MongoDB 驱动程序 可以 提供回调样式 API 以获取有关流处于活动状态的通知。然而,这是要在 MongoDB issue tracker.
中报告的内容
当订阅 change streams using the blocking Spring Data Mongo implementation one can call await 等待订阅生效时:
Subscription subscription = startBlockingMongoChangeStream();
subscription.await(Duration.of(2, SECONDS));
Document someDocument = ..
writeDocumentToMongoDb(someDocument);
startBlockingMongoChangeStream
的实现方式如下:
public Subscription startBlockingMongoChangeStream() {
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();
MessageListener<ChangeStreamDocument<Document>, Document> listener = System.out::println;
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("user", ChangeStreamOptions.empty());
return container.register(new ChangeStreamRequest<>(listener, options), Document.class);
}
如果在上面的示例中没有使用 await
,则有可能(如果 JVM 很热,则几乎有 100% 的机会)someDocument
写在 之前 订阅处于活动状态,因此错过了 someDocument
。所以添加 await
可以缓解这个问题。
我正在寻找一种方法来在使用反应式实现时实现相同的目的。代码现在看起来像这样:
Disposable disposable = startReactiveMongoChangeStream().subscribe(); // (1)
Document someDocument = ..
writeDocumentToMongoDb(someDocument).subscribe(); // (2)
这里的问题又是,someDocument
是在 startReactiveMongoChangeStream
返回的订阅开始之前写入的,因此文档丢失了。
另请注意,这是一个有点人为的示例,因为在我的实际应用程序中 writeDocumentToMongoDb
(2) 不知道 startReactiveMongoChangeStream
订阅 (1),所以我不能简单地 flatMap
(1) 并调用 (2)。 startReactiveMongoChangeStream
方法的实现方式如下:
public Flux<ChangeStreamEvent<String>> startReactiveMongoChangeStream() {
return reactiveTemplate.changeStream(String.class)
.watchCollection("user")
.listen();
}
如何在反应式实现中“模拟”阻塞式实现中可用的 await
功能?
TL;DR
反应式中没有同步方法API
说明
首先,让我们看看这两种实现方式以了解这是为什么。
阻塞实现使用 MongoDB 的游标 API 来获取游标。获取游标包括与服务器的对话。在 MessageListenerContainer
获得游标后,它将订阅任务切换为活动状态,这意味着您已经等待第一个游标被获取的阶段。
响应式实现在 ChangeStreamPublisher
上运行。从反应流协议中,可以在发出元素时、流完成或失败时收到通知。服务器端 activity 启动或完成时没有可用的通知。因此,您不能等到反应式 API 收到第一个游标。由于游标可能为空,因此第一个游标可能根本不会发出任何值。
我认为 MongoDB 驱动程序 可以 提供回调样式 API 以获取有关流处于活动状态的通知。然而,这是要在 MongoDB issue tracker.
中报告的内容