Reactive Programming with Reactor and RabbitMQ
I recently wrote a demo program that combines Reactor and RabbitMQ to get started with reactive programming. Here is my demo code:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Delivery;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.OutboundMessageResult;
import reactor.rabbitmq.QueueSpecification;
import reactor.rabbitmq.ReactorRabbitMq;
import reactor.rabbitmq.Receiver;
import reactor.rabbitmq.SenderOptions;

public class FluxWithRabbitMQDemo {
    private static final String QUEUE = "demo_thong";
    private final reactor.rabbitmq.Sender sender;
    private final Receiver receiver;

    public FluxWithRabbitMQDemo() {
        this.sender = ReactorRabbitMq.createSender();
        this.receiver = ReactorRabbitMq.createReceiver();
    }

    public void run(int count) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.useNio();
        SenderOptions senderOptions = new SenderOptions()
                .connectionFactory(connectionFactory)
                .resourceCreationScheduler(Schedulers.elastic());
        // Note: this local variable shadows the sender field created in the constructor.
        reactor.rabbitmq.Sender sender = ReactorRabbitMq.createSender(senderOptions);

        // Start consuming from the queue once it has been declared.
        Mono<AMQP.Queue.DeclareOk> queueDeclaration = sender.declareQueue(QueueSpecification.queue(QUEUE));
        Flux<Delivery> messages = receiver.consumeAutoAck(QUEUE);
        queueDeclaration.thenMany(messages)
                .subscribe(m -> System.out.println("Get message " + new String(m.getBody())));

        // Build the outbound messages on the parallel scheduler, skipping the value 10.
        Flux<OutboundMessageResult> dataStream = sender.sendWithPublishConfirms(Flux.range(1, count)
                .filter(m -> !m.equals(10))
                .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(i -> System.out.println("Message " + i + " run on thread " + Thread.currentThread().getId()))
                .map(i -> new OutboundMessage("", QUEUE, ("Message " + i).getBytes())));

        // Declare the queue, then send and log each publish confirmation.
        sender.declareQueue(QueueSpecification.queue(QUEUE))
                .thenMany(dataStream)
                .doOnError(e -> System.out.println("Send failed" + e))
                .subscribe(m -> {
                    if (m != null) {
                        System.out.println("Sent successfully message " + new String(m.getOutboundMessage().getBody()));
                    }
                });

        // Keep the main thread alive while the asynchronous pipelines run.
        try {
            Thread.sleep(20000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        int count = 20;
        FluxWithRabbitMQDemo sender = new FluxWithRabbitMQDemo();
        sender.run(count);
    }
}
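I expected that as soon as the Flux emits an item, the Sender would send it to RabbitMQ, and that the Receiver would receive it as soon as it reaches RabbitMQ.
But everything happens sequentially, one phase after another. This is the output I get: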
Message 3 run on thread 25
Message 4 run on thread 26
Message 8 run on thread 26
Message 13 run on thread 26
Message 17 run on thread 26
Message 2 run on thread 24
Message 1 run on thread 23
Message 6 run on thread 24
Message 5 run on thread 23
Message 9 run on thread 23
Message 14 run on thread 23
Message 18 run on thread 23
Message 11 run on thread 24
Message 15 run on thread 24
Message 19 run on thread 24
Message 7 run on thread 25
Message 12 run on thread 25
Message 16 run on thread 25
Message 20 run on thread 25
Sent successfully message Message 3
Sent successfully message Message 1
Sent successfully message Message 2
Sent successfully message Message 4
Sent successfully message Message 5
Sent successfully message Message 6
Sent successfully message Message 8
Sent successfully message Message 9
Sent successfully message Message 11
Sent successfully message Message 13
Sent successfully message Message 14
Sent successfully message Message 15
Sent successfully message Message 17
Sent successfully message Message 18
Sent successfully message Message 19
Sent successfully message Message 7
Sent successfully message Message 12
Sent successfully message Message 16
Sent successfully message Message 20
Get message Message 3
Get message Message 1
Get message Message 2
Get message Message 4
Get message Message 5
Get message Message 6
Get message Message 8
Get message Message 9
Get message Message 11
Get message Message 13
Get message Message 14
Get message Message 15
Get message Message 17
Get message Message 18
Get message Message 19
Get message Message 7
Get message Message 12
Get message Message 16
Get message Message 20
I don't know what to change in my code to get the expected result. Can anyone help me? Thanks in advance!
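The messages are generated too fast. To see the interleaving, add a short delay to the dataStream pipeline. Thread.sleep throws a checked InterruptedException, so it has to be wrapped inside the lambda:
.doOnNext(i -> {
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
})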
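To illustrate why pacing matters, here is a minimal sketch that uses only the JDK, with a BlockingQueue standing in for RabbitMQ and two plain threads standing in for the Sender and Receiver. It is not the reactor-rabbitmq API; the class name InterleavingSketch, the run method, and the log strings are made up for this illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the demo: a queue plays the broker,
// the calling thread plays the sender, a second thread plays the receiver.
class InterleavingSketch {

    // Produces `count` items, sleeping `delayMillis` before each one,
    // while a consumer thread drains the queue. Returns the combined event log.
    public static List<String> run(int count, long delayMillis) throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();

        Thread consumer = new Thread(() -> {
            try {
                for (int n = 0; n < count; n++) {
                    int item = queue.take(); // blocks until an item is available
                    log.add("received " + item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        for (int i = 1; i <= count; i++) {
            Thread.sleep(delayMillis); // the pacing delay; 0 means "as fast as possible"
            log.add("produced " + i);
            queue.put(i);
        }

        consumer.join(); // wait until every item has been received
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("no delay:    " + run(5, 0));
        System.out.println("10 ms delay: " + run(5, 10));
    }
}
```

Without a delay, the exact ordering depends on thread scheduling, and the producer can easily finish before the consumer logs anything. With a delay, the consumer usually has time to handle each item before the next one is produced, so the log tends to alternate. The only ordering this sketch guarantees is that each item is logged as produced before it is logged as received.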