Spring 集成 ftp 适配器和批处理
Spring integration ftp adapter and batch processing
我需要轮询 FTP 服务器并处理新的或更改的文件。我将 Spring Integration 5.3.2 与入站 FTP 适配器和固定速率为 5 秒的轮询器一起使用。所有文件立即下载到本地目录,但集成流底层处理程序每 5 秒为每个文件调用一次。我想在并发线程中立即处理下载的文件列表,但在流程结束后每 5 秒轮询一次 ftp。我该怎么做?
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
.regexFilter(".*\.zip$")
) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
.get()
}
设置maxMessagesPerPoll
- 默认为1; -1 表示无穷大。
Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
听起来更像是您想使用 FtpOutboundGateway
中的 MGET
命令。因此,您仍然可以使用带有空字符串负载的普通轮询器来发送,例如 IntegrationFlows.from(() -> "")
和所需的轮询器选项。
使用 MGET
命令对远程目录调用 Ftp.outboundGateway()
会给你一个 List<File>
作为回复消息。因此,您可以自由地批量处理文件。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway
我需要轮询 FTP 服务器并处理新的或更改的文件。我将 Spring Integration 5.3.2 与入站 FTP 适配器和固定速率为 5 秒的轮询器一起使用。所有文件立即下载到本地目录,但集成流底层处理程序每 5 秒为每个文件调用一次。我想在并发线程中立即处理下载的文件列表,但在流程结束后每 5 秒轮询一次 ftp。我该怎么做?
@Bean
fun ftpInboundFlow(): IntegrationFlow {
return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
.preserveTimestamp(true)
.maxFetchSize(ftpProperties.maxFetchSize)
.remoteDirectory(ftpProperties.remoteDirectory)
.localDirectory(File(ftpProperties.downloadDirectory))
.filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
.regexFilter(".*\.zip$")
) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
.channel(MessageChannels.executor(Executors.newWorkStealingPool()))
.transform(unZipTransformer())
.handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
.get()
}
设置maxMessagesPerPoll
- 默认为1; -1 表示无穷大。
Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)
听起来更像是您想使用 FtpOutboundGateway
中的 MGET
命令。因此,您仍然可以使用带有空字符串负载的普通轮询器来发送,例如 IntegrationFlows.from(() -> "")
和所需的轮询器选项。
使用 MGET
命令对远程目录调用 Ftp.outboundGateway()
会给你一个 List<File>
作为回复消息。因此,您可以自由地批量处理文件。
有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway