使用 Spring 集成中的 Java 配置从 SFTP 复制和移动文件

Copy and move a file from SFTP using Java config in Spring Integration

我是 Spring 集成的新手。我有这个要求,首先将文件从 /files 文件夹移动到 SFTP 位置的 /process 文件夹,然后将该文件复制到本地。 建议我使用网关,配置必须在 java 中使用注释。 我曾尝试在 Whosebug 上寻找答案,但找不到相关内容。 但是,我能够使用@InboundChannelAdapter 并通过配置其他 bean 来复制文件。

下面是我目前写的代码

配置 public class SftpConfiguration {

@Value("${ftp.file.host}")
private String host;

@Value("${ftp.file.port")
private int port;

@Value("${ftp.file.user")
private String user;

@Value("${ftp.file.password")
private String password;

@Value("${directorry.file.remote")
private String remoteDir;

@Value("${directorry.file.in.pollerDelay")
final private String pollerDelay = "1000";

@Value("${directory.file.remote.move}")
private String toMoveDirectory;

@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(port);
    factory.setUser(user);
    factory.setPassword(password);
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<LsEntry>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(remoteDir);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = pollerDelay))
public MessageSource<File> sftpMessageSource() {
    SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
            sftpInboundFileSynchronizer());
    source.setLocalDirectory(new File("ftp-inbound"));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<File>());

    return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            try {
                new FtpOrderRequestHandler().handle((File) message.getPayload());
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

    };
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOut() {
    return new SftpOutboundGateway(sftpSessionFactory(), "mv", toMoveDirectory);
}

}

我将不胜感激任何提示或建议。 谢谢

对了,你需要使用@Bean@InboundChannelAdapterSftpInboundFileSynchronizingMessageSource从远程/process文件夹下载文件到本地目录。这是在轮询器的基础上完成的,是一个单独的过程,与移动操作完全无关。您可以通过 FtpOutboundGateway 使用 MV 命令执行该移动逻辑:

The mv command has no options.

The expression attribute defines the "from" path and the rename-expression attribute defines the "to" path. By default, the rename-expression is headers['file_renameTo']. This expression must not evaluate to null, or an empty String. If necessary, any remote directories needed will be created. The payload of the result message is Boolean.TRUE. The original remote directory is provided in the file_remoteDirectory header, and the filename is provided in the file_remoteFile header. The new path is in the file_renameTo header.

这个你必须通过以下方式使用:

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    return new SftpOutboundGateway(ftpSessionFactory(), "mv", "'my_remote_dir/my_file'");
}

我试过了,下面的对我有用。

@Configuration
@EnableIntegration
public class SftpConfiguration {

    @Value("${config.adapter.ftp.host}")
    private String host;

    @Value("${config.adapter.ftp.port}")
    private int port;

    @Value("${config.adapter.ftp.user}")
    private String user;

    @Value("${config.adapter.ftp.password}")
    private String password;

    @Autowired
    private FtpOrderRequestHandler handler;

    @Autowired
    ConfigurableApplicationContext context;

    @Value("${config.adapter.ftp.excel.sourceFolder}")
    private String sourceDir;

    @Value("${config.adapter.ftp.excel.localFolder}")
    private String localDir;

    @Value("${config.adapter.ftp.excel.backupFolder}")
    private String backupDir;

    @Value("${config.adapter.ftp.statusExcel.destinationFolder}")
    private String statusDest;

    private static final Logger LOGGER = LoggerFactory.getLogger(SftpConfiguration.class);


    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(true);
        fileSynchronizer.setRemoteDirectory(sourceDir);
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(cron = "${config.cron.cataligent.order}"), autoStartup = "true")
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
                sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File(localDir));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        return source;
    }


    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handlerOrder() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                try {
                    SFTPGateway gateway = context.getBean(SFTPGateway.class);
                    final File orderFile = (File) message.getPayload();
                    gateway.sendToSftp(orderFile);
                    LOGGER.debug("{} is picked by scheduler and moved to {} folder",orderFile.getName(),backupDir);
                    handler.handle((File) message.getPayload());
                    handler.deleteFileFromLocal((File) message.getPayload());
                    LOGGER.info("Processing of file {} is completed",orderFile.getName());
                } catch (IOException e) {
                    LOGGER.error("Error occurred while processing order file exception is {}",e.getMessage());
                }
            }

        };
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannelDest")
    public MessageHandler handlerOrderBackUp() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(backupDir));
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannelStatus")
    public MessageHandler handlerOrderStatusUS() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpression(new LiteralExpression(statusDest));
        return handler;
    }



    @MessagingGateway
    public interface SFTPGateway {
        @Gateway(requestChannel = "sftpChannelDest")
        void sendToSftp(File file);

        @Gateway(requestChannel = "sftpChannelStatus")
        void sendToSftpOrderStatus(File file);

    }

}

我用它从 SFTP 位置获取文件。保存在本地。然后将文件移动到备份文件夹,然后处理整个文件。处理后从本地删除文件。

我使用网关(@MessagingGateway) 进行文件移动,我在该界面中有两种不同的方法。一种用于将同一文件移动到备份文件夹,另一种方法是将我的代码中的另一个文件(状态文件)移动到所需的 SFTP 位置。为了更好地理解,我尽量保持代码整洁。