Preload elements for Flux.generate(...)
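I'm creating a Flux with Flux.generate(). The generator (a Consumer) actually reads from a message queue. The problem is that this call takes quite a long time (sometimes even 1-2 seconds), which stalls the processing of the flux.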
package com.github.loa.vault.service.listener;

import com.github.loa.document.service.domain.DocumentType;
import com.github.loa.queue.service.QueueManipulator;
import com.github.loa.queue.service.domain.Queue;
import com.github.loa.queue.service.domain.message.DocumentArchivingMessage;
import com.github.loa.vault.service.domain.DocumentArchivingContext;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import reactor.core.publisher.SynchronousSink;

import java.util.function.Consumer;

@Slf4j
@Service
@RequiredArgsConstructor
public class VaultQueueConsumer implements Consumer<SynchronousSink<DocumentArchivingContext>> {

    private final QueueManipulator queueManipulator;

    @Override
    public void accept(final SynchronousSink<DocumentArchivingContext> documentSourceItemSynchronousSink) {
        // Blocking read from the queue; this is the slow call (1-2 seconds).
        final DocumentArchivingMessage documentArchivingMessage = (DocumentArchivingMessage)
                queueManipulator.readMessage(Queue.DOCUMENT_ARCHIVING_QUEUE);

        documentSourceItemSynchronousSink.next(
                DocumentArchivingContext.builder()
                        .type(DocumentType.valueOf(documentArchivingMessage.getType()))
                        .source(documentArchivingMessage.getSource())
                        .content(documentArchivingMessage.getContent())
                        .build()
        );
    }
}
Apparently adding parallel doesn't help, because the generator is still called one element at a time.
Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();
Does anyone know how to make the generator run in parallel? I don't want to use Flux.create() because then I would lose backpressure.
As I understand it, you tried:
Flux.generate(vaultQueueConsumer)
    .parallel()
    .runOn(Schedulers.parallel())
    .flatMap(vaultDocumentManager::archiveDocument)
    .subscribe();
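The problem is that vaultQueueConsumer contains the slow operation. So the solution is to move this slow operation out of generate and into a map, which can be parallelized.
As an idea, you can generate only the name of the queue the messages must be consumed from, and do the actual message consumption in the map method after making the flux parallel: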
String queue = "test";
Flux.<String>generate(synchronousSink -> synchronousSink.next(queue))
    .parallel()
    .runOn(Schedulers.parallel())
    .map(queueManipulator::readMessage)
    .doOnNext(log::info)
    .subscribe();
A fake QueueManipulator which sleeps for 1-2 seconds before returning a message:
public class QueueManipulator {
    private final AtomicLong counter = new AtomicLong();

    public String readMessage(String queue) {
        sleep(); //sleep 1-2 seconds
        return queue + " " + counter.incrementAndGet();
    }
    //...
}
This way the message consumption is done in parallel:
12:49:22.362 [parallel-4] - test 2
12:49:22.362 [parallel-3] - test 4
12:49:22.362 [parallel-2] - test 1
12:49:22.362 [parallel-1] - test 3
12:49:23.369 [parallel-3] - test 6
12:49:23.369 [parallel-1] - test 5
12:49:23.369 [parallel-2] - test 7
12:49:23.369 [parallel-4] - test 8
The solution above is simple and might look like a "hack".
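To see why moving the blocking read off the single generator thread helps, the timing can be reproduced without Reactor. The sketch below is only an illustration under stated assumptions: it uses a plain java.util.concurrent thread pool as a stand-in for the parallel rails, and the class ParallelReadSketch, the method timeReads and the delay values are all hypothetical names chosen for this example, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class ParallelReadSketch {
    private static final AtomicLong COUNTER = new AtomicLong();

    // Hypothetical stand-in for a slow, blocking queue read.
    static String readMessage(String queue, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return queue + " " + COUNTER.incrementAndGet();
    }

    // Performs `count` reads on `workers` threads and returns the elapsed milliseconds.
    static long timeReads(int count, int workers, long delayMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        long start = System.nanoTime();
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                pending.add(pool.submit(() -> readMessage("test", delayMillis)));
            }
            for (Future<String> result : pending) {
                result.get(); // wait for every read to finish
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    public static void main(String[] args) {
        // One worker mimics the serial generator; four workers mimic four rails.
        System.out.println("1 worker:  " + timeReads(8, 1, 100) + " ms");
        System.out.println("4 workers: " + timeReads(8, 4, 100) + " ms");
    }
}
```

With one worker the reads queue up behind each other, just as they do inside generate; with several workers the waiting overlaps, which is the effect visible in the log timestamps above.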
Another idea is to call Flux.generate inside a flatMap:
String queue = "test";
int parallelism = 5;
Flux.range(0, parallelism)
    .flatMap(i ->
        Flux.<String>generate(synchronousSink -> {
            synchronousSink.next(queueManipulator.readMessage(queue));
        }).subscribeOn(Schedulers.parallel()))
    .doOnNext(log::info)
    .subscribe();
Mono.just(1).repeat() // create infinite flux, maybe there is a nicer way for that?
    .flatMap(this::readFromQueue, 100) // define queue polling concurrency
    .flatMap(this::archiveDocument)
    .subscribe();

private Mono<String> readFromQueue(Integer ignore) {
    return Mono.fromCallable(() -> {
        Thread.sleep(1500); // your actual blocking queue polling here
        return "queue_element";
    }).subscribeOn(Schedulers.boundedElastic()); // dedicate blocking call to a thread pool meant for blocking work
}
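The second argument to flatMap caps how many inner publishers are subscribed at once, so at most that many queue polls are in flight while the upstream is asked for more only as slots free up. The same idea can be sketched in plain Java with a Semaphore. This is an analogy rather than Reactor code, and BoundedPollingSketch, maxInFlight and the delay are hypothetical names and values used only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPollingSketch {

    // Runs `total` simulated blocking polls with at most `limit` in flight and
    // returns the highest number of polls observed running at the same time.
    static int maxInFlight(int total, int limit, long delayMillis) {
        ExecutorService pool = Executors.newCachedThreadPool();
        Semaphore permits = new Semaphore(limit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < total; i++) {
                permits.acquire(); // the producer waits once `limit` polls are running
                pool.execute(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(delayMillis); // hypothetical blocking queue poll
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free a slot for the next poll
                    }
                });
            }
            pool.shutdown();
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent polls: " + maxInFlight(20, 3, 50));
    }
}
```

Because a permit is taken before each poll starts and returned only after it finishes, the producer loop is slowed down by the consumers, which is a rough counterpart of the demand-driven behaviour the question wants to keep.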