使用 spring 入站文件适配器的并发处理
Concurrent processing with spring inbound file adapter
我有一个文件系统目录,我想在其中轮询文件,然后并发处理每个文件,每个文件一个线程。我的印象是,在幕后,InboundFileAdapter
将每个文件放入一个队列中,这样我就可以使用下游的执行程序通道来同时处理以后的调用。我在 Java Config 中实现如下:
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
.scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
.filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
e -> e.poller(Pollers
.fixedDelay(fileSystemPollDelay)
.get()))
.channel(MessageChannels.executor(executor).get())
.transform(File::toPath)
.enrichHeaders(cleanUpConfigurer)
.get()
执行器通道下游的每个通道本身都是直接通道。
但是,我发现下游服务的并发性很差。使用缓存线程池,我看到同一个线程基本上串行执行下游代码,而如果我使用固定池执行程序,我看到不同的线程权衡仍然串行执行。
我也试过在轮询器和执行器通道之间架起一座桥梁,但无济于事。
那只是因为 SourcePollingChannelAdapterFactoryBean
幕后:
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE){
// the default is 1 since a source might return
// a non-null and non-interruptible value every time it is invoked
this.pollerMetadata.setMaxMessagesPerPoll(1);
}
因此,每个 .fixedDelay(fileSystemPollDelay)
只有一个 File
从队列中轮询以进行处理。
因此,只需将 .maxMessagesPerPoll()
增加到适合您的系统值的某个值,并享受并发性!
顺便说一句,没有理由在轮询适配器之后引入ExecutorChannel
。完全出于相同的并发原因,您可以将 .taskExecutor()
用于 .poller()
。
我有一个文件系统目录,我想在其中轮询文件,然后并发处理每个文件,每个文件一个线程。我的印象是,在幕后,InboundFileAdapter
将每个文件放入一个队列中,这样我就可以使用下游的执行程序通道来同时处理以后的调用。我在 Java Config 中实现如下:
return IntegrationFlows
.from(s -> s.file(inboundMessageDirectory.toFile(), Comparator.comparing(File::lastModified)) // serve oldest first
.scanner(directoryScanner) // we know the directory structure, so we can take advantage of that with a custom scanner
.filter(new AcceptOnceFileListFilter<>(MAX_FILTER_CAPACITY)), // limit number of references in memory
e -> e.poller(Pollers
.fixedDelay(fileSystemPollDelay)
.get()))
.channel(MessageChannels.executor(executor).get())
.transform(File::toPath)
.enrichHeaders(cleanUpConfigurer)
.get()
执行器通道下游的每个通道本身都是直接通道。
但是,我发现下游服务的并发性很差。使用缓存线程池,我看到同一个线程基本上串行执行下游代码,而如果我使用固定池执行程序,我看到不同的线程权衡仍然串行执行。
我也试过在轮询器和执行器通道之间架起一座桥梁,但无济于事。
那只是因为 SourcePollingChannelAdapterFactoryBean
幕后:
if (this.pollerMetadata.getMaxMessagesPerPoll() == Integer.MIN_VALUE){
// the default is 1 since a source might return
// a non-null and non-interruptible value every time it is invoked
this.pollerMetadata.setMaxMessagesPerPoll(1);
}
因此,每个 .fixedDelay(fileSystemPollDelay)
只有一个 File
从队列中轮询以进行处理。
因此,只需将 .maxMessagesPerPoll()
增加到适合您的系统值的某个值,并享受并发性!
顺便说一句,没有理由在轮询适配器之后引入ExecutorChannel
。完全出于相同的并发原因,您可以将 .taskExecutor()
用于 .poller()
。