使用 Spring 集成中的 Java 配置从 SFTP 复制和移动文件
Copy and move a file from SFTP using Java config in Spring Integration
我是 Spring 集成的新手。我有这个要求,首先将文件从 /files 文件夹移动到 SFTP 位置的 /process 文件夹,然后将该文件复制到本地。
建议我使用网关,配置必须在 java 中使用注释。
我曾尝试在 Whosebug 上寻找答案,但找不到相关内容。
但是,我能够使用@InboundChannelAdapter 并通过配置其他 bean 来复制文件。
下面是我目前写的代码
配置
public class SftpConfiguration {
@Value("${ftp.file.host}")
private String host;
@Value("${ftp.file.port")
private int port;
@Value("${ftp.file.user")
private String user;
@Value("${ftp.file.password")
private String password;
@Value("${directorry.file.remote")
private String remoteDir;
@Value("${directorry.file.in.pollerDelay")
final private String pollerDelay = "1000";
@Value("${directory.file.remote.move}")
private String toMoveDirectory;
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(remoteDir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = pollerDelay))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
new FtpOrderRequestHandler().handle((File) message.getPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOut() {
return new SftpOutboundGateway(sftpSessionFactory(), "mv", toMoveDirectory);
}
}
我将不胜感激任何提示或建议。
谢谢
对了,你需要使用@Bean
和@InboundChannelAdapter
为SftpInboundFileSynchronizingMessageSource
从远程/process
文件夹下载文件到本地目录。这是在轮询器的基础上完成的,是一个单独的过程,与移动操作完全无关。您可以通过 FtpOutboundGateway
使用 MV
命令执行该移动逻辑:
The mv command has no options.
The expression attribute defines the "from" path and the rename-expression attribute defines the "to" path.
By default, the rename-expression is headers['file_renameTo']
.
This expression must not evaluate to null, or an empty String
.
If necessary, any remote directories needed will be created.
The payload of the result message is Boolean.TRUE
.
The original remote directory is provided in the file_remoteDirectory
header, and the filename is provided in the file_remoteFile
header.
The new path is in the file_renameTo
header.
这个你必须通过以下方式使用:
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "mv", "'my_remote_dir/my_file'");
}
我试过了,下面的对我有用。
@Configuration
@EnableIntegration
public class SftpConfiguration {
@Value("${config.adapter.ftp.host}")
private String host;
@Value("${config.adapter.ftp.port}")
private int port;
@Value("${config.adapter.ftp.user}")
private String user;
@Value("${config.adapter.ftp.password}")
private String password;
@Autowired
private FtpOrderRequestHandler handler;
@Autowired
ConfigurableApplicationContext context;
@Value("${config.adapter.ftp.excel.sourceFolder}")
private String sourceDir;
@Value("${config.adapter.ftp.excel.localFolder}")
private String localDir;
@Value("${config.adapter.ftp.excel.backupFolder}")
private String backupDir;
@Value("${config.adapter.ftp.statusExcel.destinationFolder}")
private String statusDest;
private static final Logger LOGGER = LoggerFactory.getLogger(SftpConfiguration.class);
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sourceDir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(cron = "${config.cron.cataligent.order}"), autoStartup = "true")
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(localDir));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOrder() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
SFTPGateway gateway = context.getBean(SFTPGateway.class);
final File orderFile = (File) message.getPayload();
gateway.sendToSftp(orderFile);
LOGGER.debug("{} is picked by scheduler and moved to {} folder",orderFile.getName(),backupDir);
handler.handle((File) message.getPayload());
handler.deleteFileFromLocal((File) message.getPayload());
LOGGER.info("Processing of file {} is completed",orderFile.getName());
} catch (IOException e) {
LOGGER.error("Error occurred while processing order file exception is {}",e.getMessage());
}
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpChannelDest")
public MessageHandler handlerOrderBackUp() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(backupDir));
return handler;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannelStatus")
public MessageHandler handlerOrderStatusUS() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(statusDest));
return handler;
}
@MessagingGateway
public interface SFTPGateway {
@Gateway(requestChannel = "sftpChannelDest")
void sendToSftp(File file);
@Gateway(requestChannel = "sftpChannelStatus")
void sendToSftpOrderStatus(File file);
}
}
我用它从 SFTP 位置获取文件。保存在本地。然后将文件移动到备份文件夹,然后处理整个文件。处理后从本地删除文件。
我使用网关(@MessagingGateway) 进行文件移动,我在该界面中有两种不同的方法。一种用于将同一文件移动到备份文件夹,另一种方法是将我的代码中的另一个文件(状态文件)移动到所需的 SFTP 位置。为了更好地理解,我尽量保持代码整洁。
我是 Spring 集成的新手。我有这个要求,首先将文件从 /files 文件夹移动到 SFTP 位置的 /process 文件夹,然后将该文件复制到本地。 建议我使用网关,配置必须在 java 中使用注释。 我曾尝试在 Whosebug 上寻找答案,但找不到相关内容。 但是,我能够使用@InboundChannelAdapter 并通过配置其他 bean 来复制文件。
下面是我目前写的代码
配置 public class SftpConfiguration {
@Value("${ftp.file.host}")
private String host;
@Value("${ftp.file.port")
private int port;
@Value("${ftp.file.user")
private String user;
@Value("${ftp.file.password")
private String password;
@Value("${directorry.file.remote")
private String remoteDir;
@Value("${directorry.file.in.pollerDelay")
final private String pollerDelay = "1000";
@Value("${directory.file.remote.move}")
private String toMoveDirectory;
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory(remoteDir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = pollerDelay))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
new FtpOrderRequestHandler().handle((File) message.getPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOut() {
return new SftpOutboundGateway(sftpSessionFactory(), "mv", toMoveDirectory);
}
}
我将不胜感激任何提示或建议。 谢谢
对了,你需要使用@Bean
和@InboundChannelAdapter
为SftpInboundFileSynchronizingMessageSource
从远程/process
文件夹下载文件到本地目录。这是在轮询器的基础上完成的,是一个单独的过程,与移动操作完全无关。您可以通过 FtpOutboundGateway
使用 MV
命令执行该移动逻辑:
The mv command has no options.
The expression attribute defines the "from" path and the rename-expression attribute defines the "to" path. By default, the rename-expression is
headers['file_renameTo']
. This expression must not evaluate to null, or an emptyString
. If necessary, any remote directories needed will be created. The payload of the result message isBoolean.TRUE
. The original remote directory is provided in thefile_remoteDirectory
header, and the filename is provided in thefile_remoteFile
header. The new path is in thefile_renameTo
header.
这个你必须通过以下方式使用:
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "mv", "'my_remote_dir/my_file'");
}
我试过了,下面的对我有用。
@Configuration
@EnableIntegration
public class SftpConfiguration {
@Value("${config.adapter.ftp.host}")
private String host;
@Value("${config.adapter.ftp.port}")
private int port;
@Value("${config.adapter.ftp.user}")
private String user;
@Value("${config.adapter.ftp.password}")
private String password;
@Autowired
private FtpOrderRequestHandler handler;
@Autowired
ConfigurableApplicationContext context;
@Value("${config.adapter.ftp.excel.sourceFolder}")
private String sourceDir;
@Value("${config.adapter.ftp.excel.localFolder}")
private String localDir;
@Value("${config.adapter.ftp.excel.backupFolder}")
private String backupDir;
@Value("${config.adapter.ftp.statusExcel.destinationFolder}")
private String statusDest;
private static final Logger LOGGER = LoggerFactory.getLogger(SftpConfiguration.class);
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(host);
factory.setPort(port);
factory.setUser(user);
factory.setPassword(password);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sourceDir);
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xlsx"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(cron = "${config.cron.cataligent.order}"), autoStartup = "true")
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(
sftpInboundFileSynchronizer());
source.setLocalDirectory(new File(localDir));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handlerOrder() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
SFTPGateway gateway = context.getBean(SFTPGateway.class);
final File orderFile = (File) message.getPayload();
gateway.sendToSftp(orderFile);
LOGGER.debug("{} is picked by scheduler and moved to {} folder",orderFile.getName(),backupDir);
handler.handle((File) message.getPayload());
handler.deleteFileFromLocal((File) message.getPayload());
LOGGER.info("Processing of file {} is completed",orderFile.getName());
} catch (IOException e) {
LOGGER.error("Error occurred while processing order file exception is {}",e.getMessage());
}
}
};
}
@Bean
@ServiceActivator(inputChannel = "sftpChannelDest")
public MessageHandler handlerOrderBackUp() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(backupDir));
return handler;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannelStatus")
public MessageHandler handlerOrderStatusUS() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(statusDest));
return handler;
}
@MessagingGateway
public interface SFTPGateway {
@Gateway(requestChannel = "sftpChannelDest")
void sendToSftp(File file);
@Gateway(requestChannel = "sftpChannelStatus")
void sendToSftpOrderStatus(File file);
}
}
我用它从 SFTP 位置获取文件。保存在本地。然后将文件移动到备份文件夹,然后处理整个文件。处理后从本地删除文件。
我使用网关(@MessagingGateway) 进行文件移动,我在该界面中有两种不同的方法。一种用于将同一文件移动到备份文件夹,另一种方法是将我的代码中的另一个文件(状态文件)移动到所需的 SFTP 位置。为了更好地理解,我尽量保持代码整洁。