Spring 集成 - 处理陈旧的 sftp 会话
Spring Integration - Handling stale sftp sessions
我实现了以下场景:
- 一个 queueChannel 以 byte[] 的形式保存消息
- 一个 MessageHandler,轮询队列通道并通过 sftp 上传文件
- 一个 Transformer,侦听 errorChannel 并将从失败消息中提取的有效负载发送回 queueChannel(被认为是处理失败消息的错误处理程序,因此不会丢失任何内容)
如果 sftp 服务器在线,则一切正常。
如果 sftp 服务器关闭,则作为转换器到达的错误消息是:
org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session
转换器对此无能为力,因为有效负载的 failedMessage 为 null 并且本身会抛出异常。变压器丢失消息。
如何配置我的流程,使转换程序获得正确的消息以及未成功上传文件的相应负载?
我的配置:
@Bean
public MessageChannel toSftpChannel() {
final QueueChannel channel = new QueueChannel();
channel.setLoggingEnabled(true);
return new QueueChannel();
}
@Bean
public MessageChannel toSplitter() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
} else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
final Properties jschProps = new Properties();
jschProps.put("StrictHostKeyChecking", "no");
jschProps.put("PreferredAuthentications", "publickey,password");
factory.setSessionConfig(jschProps);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
if (sftpPrivateKey != null) {
factory.setPrivateKey(sftpPrivateKey);
factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
} else {
factory.setPassword(sftpPasword);
}
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@Splitter(inputChannel = "toSplitter")
public DmsDocumentMessageSplitter splitter() {
final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
splitter.setOutputChannelName("toSftpChannel");
return splitter;
}
@Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
return MessageBuilder.withPayload(failedMessage)
.copyHeadersIfAbsent(failedMessage.getHeaders())
.build();
}
@MessagingGateway
public interface UploadGateway {
@Gateway(requestChannel = "toSplitter")
void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
}
谢谢..
更新
@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
PollerMetadata poller() {
return Pollers
.fixedRate(5000)
.maxMessagesPerPoll(1)
.receiveTimeout(500)
.taskExecutor(taskExecutor())
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.get();
}
@Bean
@ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toSftpChannel");
return bridgeHandler;
}
null
failedMessage
是一个错误;转载INT-4421.
我不建议在这种情况下使用 QueueChannel
。如果您使用直接渠道,您可以配置 retry advice 来尝试重新投递。当重试次数耗尽时(如果这样配置),异常将被抛回给调用线程。
将建议添加到 SftpMessageHandler
的 adviceChain
属性。
编辑
您可以通过在可轮询通道和 sftp 适配器之间插入桥接来解决 "missing" 失败消息:
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toRealSftpChannel");
return bridgeHandler;
}
@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
}
else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
我实现了以下场景:
- 一个 queueChannel 以 byte[] 的形式保存消息
- 一个 MessageHandler,轮询队列通道并通过 sftp 上传文件
- 一个 Transformer,侦听 errorChannel 并将从失败消息中提取的有效负载发送回 queueChannel(被认为是处理失败消息的错误处理程序,因此不会丢失任何内容)
如果 sftp 服务器在线,则一切正常。
如果 sftp 服务器关闭,则作为转换器到达的错误消息是:
org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session
转换器对此无能为力,因为有效负载的 failedMessage 为 null 并且本身会抛出异常。变压器丢失消息。
如何配置我的流程,使转换程序获得正确的消息以及未成功上传文件的相应负载?
我的配置:
@Bean
public MessageChannel toSftpChannel() {
final QueueChannel channel = new QueueChannel();
channel.setLoggingEnabled(true);
return new QueueChannel();
}
@Bean
public MessageChannel toSplitter() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
} else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
final Properties jschProps = new Properties();
jschProps.put("StrictHostKeyChecking", "no");
jschProps.put("PreferredAuthentications", "publickey,password");
factory.setSessionConfig(jschProps);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
if (sftpPrivateKey != null) {
factory.setPrivateKey(sftpPrivateKey);
factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
} else {
factory.setPassword(sftpPasword);
}
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@Splitter(inputChannel = "toSplitter")
public DmsDocumentMessageSplitter splitter() {
final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
splitter.setOutputChannelName("toSftpChannel");
return splitter;
}
@Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
return MessageBuilder.withPayload(failedMessage)
.copyHeadersIfAbsent(failedMessage.getHeaders())
.build();
}
@MessagingGateway
public interface UploadGateway {
@Gateway(requestChannel = "toSplitter")
void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
}
谢谢..
更新
@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
PollerMetadata poller() {
return Pollers
.fixedRate(5000)
.maxMessagesPerPoll(1)
.receiveTimeout(500)
.taskExecutor(taskExecutor())
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.get();
}
@Bean
@ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toSftpChannel");
return bridgeHandler;
}
null
failedMessage
是一个错误;转载INT-4421.
我不建议在这种情况下使用 QueueChannel
。如果您使用直接渠道,您可以配置 retry advice 来尝试重新投递。当重试次数耗尽时(如果这样配置),异常将被抛回给调用线程。
将建议添加到 SftpMessageHandler
的 adviceChain
属性。
编辑
您可以通过在可轮询通道和 sftp 适配器之间插入桥接来解决 "missing" 失败消息:
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toRealSftpChannel");
return bridgeHandler;
}
@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
}
else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}