Spring 集成 ftp 适配器和批处理

Spring integration ftp adapter and batch processing

我需要轮询 FTP 服务器并处理新的或更改的文件。我将 Spring Integration 5.3.2 与入站 FTP 适配器和固定速率为 5 秒的轮询器一起使用。所有文件立即下载到本地目录,但集成流底层处理程序每​​ 5 秒为每个文件调用一次。我想在并发线程中立即处理下载的文件列表,但在流程结束后每 5 秒轮询一次 ftp。我该怎么做?

@Bean
fun ftpInboundFlow(): IntegrationFlow {
    return IntegrationFlows.from(Ftp.inboundAdapter(ftpSessionFactory())
            .preserveTimestamp(true)
            .maxFetchSize(ftpProperties.maxFetchSize)
            .remoteDirectory(ftpProperties.remoteDirectory)
            .localDirectory(File(ftpProperties.downloadDirectory))
            .filter(FtpPersistentAcceptOnceFileListFilter(SimpleMetadataStore(), "ftp-"))
            .regexFilter(".*\.zip$")
    ) { e -> e.poller(Pollers.fixedRate(Duration.ofSeconds(5))) }
            .channel(MessageChannels.executor(Executors.newWorkStealingPool()))
            .transform(unZipTransformer())
            .handle { m -> LOGGER.info("Unzipped {}", m.headers[FileHeaders.FILENAME]) }
            .get()
}

设置maxMessagesPerPoll - 默认为1; -1 表示无穷大。

Pollers.fixedRate(Duration.ofSeconds(5)).maxMessagesPerPoll(-1)

听起来更像是您想使用 FtpOutboundGateway 中的 MGET 命令。因此,您仍然可以使用带有空字符串负载的普通轮询器来发送,例如 IntegrationFlows.from(() -> "") 和所需的轮询器选项。

使用 MGET 命令对远程目录调用 Ftp.outboundGateway() 会给你一个 List<File> 作为回复消息。因此,您可以自由地批量处理文件。

有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.2.RELEASE/reference/html/ftp.html#ftp-outbound-gateway