Reactor:创建 Monos 到 Flux 的函数
Reactor: function creating Monos to Flux
基本上,我正在 Spring 引导中制作一个队列处理器,并希望将 Reactor 用于异步。我做了一个函数需要永远循环,因为它是从队列中拉出然后将项目标记为已处理的函数。
这是有效的阻塞版本 订阅 returns 一个 Mono
while(true) {
manager.Subscribe().block()
}
我不确定如何将它变成 Flux 我查看了间隔、生成、创建等,如果不调用 block()
这是我尝试过的示例
Flux.generate(() -> manager,
(state, sink) -> {
state.Subscribe().block();
sink.next("done");
return state;
}));
作为 Reactor 的新手,我还没有找到任何关于循环和无阻塞地同步处理 Monos 的信息。
下面是订阅方法使用 AWS Java SDK v2 的作用:
public Mono Subscribe() {
return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
.waitTimeSeconds(10)
.queueUrl(_queueUrl)
.build()))
.filter(x -> x.messages() != null)
.flatMap(x -> Mono.when(x.messages()
.stream()
.map(y -> {
_log.warn(y.body());
return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(_queueUrl)
.receiptHandle(y.receiptHandle())
.build()));
}).collect(Collectors.toList())));
}
基本上,我只是轮询一个 SQS 队列,删除消息然后我想再做一次。这对我来说只是探索。
谢谢!
您需要两件事:一种在循环中订阅的方法和一种确保在每次迭代中有效调用 Subscribe()
方法的方法(因为 Future
需要重新创建)。
repeat()
是内置运算符,它将在源完成后重新订阅其源。如果源错误,则重复循环停止。最简单的变体继续这样做 Long.MAX_VALUE
次。
唯一的问题是,在您的情况下,Subscribe()
中的 Mono
必须在每次迭代时重新创建 。
为此,您可以将 Subscribe()
调用包装在 defer
中:它会在每次发生新订阅时重新调用该方法,其中包括每次重复尝试:
Flux<Stuff> repeated = Mono
.defer(manager::Subscribe)
.repeat();
基本上,我正在 Spring 引导中制作一个队列处理器,并希望将 Reactor 用于异步。我做了一个函数需要永远循环,因为它是从队列中拉出然后将项目标记为已处理的函数。
这是有效的阻塞版本 订阅 returns 一个 Mono
while(true) {
manager.Subscribe().block()
}
我不确定如何将它变成 Flux 我查看了间隔、生成、创建等,如果不调用 block()
这是我尝试过的示例
Flux.generate(() -> manager,
(state, sink) -> {
state.Subscribe().block();
sink.next("done");
return state;
}));
作为 Reactor 的新手,我还没有找到任何关于循环和无阻塞地同步处理 Monos 的信息。
下面是订阅方法使用 AWS Java SDK v2 的作用:
public Mono Subscribe() {
return Mono.fromFuture(_client.receiveMessage(ReceiveMessageRequest.builder()
.waitTimeSeconds(10)
.queueUrl(_queueUrl)
.build()))
.filter(x -> x.messages() != null)
.flatMap(x -> Mono.when(x.messages()
.stream()
.map(y -> {
_log.warn(y.body());
return Mono.fromFuture(_client.deleteMessage(DeleteMessageRequest.builder()
.queueUrl(_queueUrl)
.receiptHandle(y.receiptHandle())
.build()));
}).collect(Collectors.toList())));
}
基本上,我只是轮询一个 SQS 队列,删除消息然后我想再做一次。这对我来说只是探索。
谢谢!
您需要两件事:一种在循环中订阅的方法和一种确保在每次迭代中有效调用 Subscribe()
方法的方法(因为 Future
需要重新创建)。
repeat()
是内置运算符,它将在源完成后重新订阅其源。如果源错误,则重复循环停止。最简单的变体继续这样做 Long.MAX_VALUE
次。
唯一的问题是,在您的情况下,Subscribe()
中的 Mono
必须在每次迭代时重新创建 。
为此,您可以将 Subscribe()
调用包装在 defer
中:它会在每次发生新订阅时重新调用该方法,其中包括每次重复尝试:
Flux<Stuff> repeated = Mono
.defer(manager::Subscribe)
.repeat();