多个主机未拾取相同文件
same file not picked up by multiple hosts
我有一个应用程序托管多个主机侦听单个远程 SFTP 位置。我应该如何确保相同的文件没有被其他已经被其他人拾取的主机拾取?我对 spring 集成还很陌生。感谢有人可以分享例子
}
编辑:
这是我的集成流程,从 sftp 获取文件并放置在本地目录中,在转换器中执行业务逻辑并返回文件并将其发送到远程 sftp。
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
LOGGER.debug(" Creating SFTP Session Factory -Start");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setUser(sftpUser);
factory.setPort(port);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sftpInboundDirectory);
fileSynchronizer.setFilter(new SftpPersistentAcceptOnceFileListFilter(store(), "*.json"));
return fileSynchronizer;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(5000));
return pollerMetadata;
}
@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(localDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
IntegrationFlow integrationFlow() {
return IntegrationFlows.from(this.sftpMessageSource()).channel(fileInputChannel()).
transform(this::messageTransformer).channel(fileOutputChannel()).
handle(orderOutMessageHandler()).get();
}
@Bean
@ServiceActivator(inputChannel = "fileOutputChannel")
public SftpMessageHandler orderOutMessageHandler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
LOGGER.debug(" Creating SFTP MessageHandler - Start ");
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpOutboundDirectory));
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("Expected Input is File.");
}
}
});
LOGGER.debug(" Creating SFTP MessageHandler - End ");
return handler;
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "fileInputChannel", outputChannel = "fileOutputChannel")
public Transformer messageTransformer() {
return message -> {
File file=orderTransformer.transformInboundMessage(message);
return (Message<?>) file;
};
}
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance().getMap("idempotentReceiverMetadataStore"));
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config().setProperty("hazelcast.logging.type", "slf4j"));
参见 SftpPersistentAcceptOnceFileListFilter
注入 SFTP 入站通道适配器。这个必须提供一个基于共享数据库的MetadataStore
。
在文档中查看更多信息:
https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-inbound
我有一个应用程序托管多个主机侦听单个远程 SFTP 位置。我应该如何确保相同的文件没有被其他已经被其他人拾取的主机拾取?我对 spring 集成还很陌生。感谢有人可以分享例子
}
编辑: 这是我的集成流程,从 sftp 获取文件并放置在本地目录中,在转换器中执行业务逻辑并返回文件并将其发送到远程 sftp。
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
LOGGER.debug(" Creating SFTP Session Factory -Start");
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setUser(sftpUser);
factory.setPort(port);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(true);
fileSynchronizer.setRemoteDirectory(sftpInboundDirectory);
fileSynchronizer.setFilter(new SftpPersistentAcceptOnceFileListFilter(store(), "*.json"));
return fileSynchronizer;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(5000));
return pollerMetadata;
}
@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(localDirectory);
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
IntegrationFlow integrationFlow() {
return IntegrationFlows.from(this.sftpMessageSource()).channel(fileInputChannel()).
transform(this::messageTransformer).channel(fileOutputChannel()).
handle(orderOutMessageHandler()).get();
}
@Bean
@ServiceActivator(inputChannel = "fileOutputChannel")
public SftpMessageHandler orderOutMessageHandler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
LOGGER.debug(" Creating SFTP MessageHandler - Start ");
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpOutboundDirectory));
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
if (message.getPayload() instanceof File) {
return ((File) message.getPayload()).getName();
} else {
throw new IllegalArgumentException("Expected Input is File.");
}
}
});
LOGGER.debug(" Creating SFTP MessageHandler - End ");
return handler;
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "fileInputChannel", outputChannel = "fileOutputChannel")
public Transformer messageTransformer() {
return message -> {
File file=orderTransformer.transformInboundMessage(message);
return (Message<?>) file;
};
}
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance().getMap("idempotentReceiverMetadataStore"));
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config().setProperty("hazelcast.logging.type", "slf4j"));
参见 SftpPersistentAcceptOnceFileListFilter
注入 SFTP 入站通道适配器。这个必须提供一个基于共享数据库的MetadataStore
。
在文档中查看更多信息:
https://docs.spring.io/spring-integration/docs/current/reference/html/sftp.html#sftp-inbound